mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	when result of PyLong_AsLong() narrowed to int without checks. This is a backport of changesets 13e2e44db99d and 525407d89277.
		
			
				
	
	
		
			463 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			463 lines
		
	
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Adapted from test_file.py by Daniel Stutzbach
 | 
						|
 | 
						|
import sys
 | 
						|
import os
 | 
						|
import io
 | 
						|
import errno
 | 
						|
import unittest
 | 
						|
from array import array
 | 
						|
from weakref import proxy
 | 
						|
from functools import wraps
 | 
						|
import _testcapi
 | 
						|
 | 
						|
from test.support import TESTFN, check_warnings, run_unittest, make_bad_fd
 | 
						|
from collections import UserList
 | 
						|
 | 
						|
from _io import FileIO as _FileIO
 | 
						|
 | 
						|
class AutoFileTests(unittest.TestCase):
 | 
						|
    # file tests for which a test file is automatically set up
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        self.f = _FileIO(TESTFN, 'w')
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        if self.f:
 | 
						|
            self.f.close()
 | 
						|
        os.remove(TESTFN)
 | 
						|
 | 
						|
    def testWeakRefs(self):
 | 
						|
        # verify weak references
 | 
						|
        p = proxy(self.f)
 | 
						|
        p.write(bytes(range(10)))
 | 
						|
        self.assertEqual(self.f.tell(), p.tell())
 | 
						|
        self.f.close()
 | 
						|
        self.f = None
 | 
						|
        self.assertRaises(ReferenceError, getattr, p, 'tell')
 | 
						|
 | 
						|
    def testSeekTell(self):
 | 
						|
        self.f.write(bytes(range(20)))
 | 
						|
        self.assertEqual(self.f.tell(), 20)
 | 
						|
        self.f.seek(0)
 | 
						|
        self.assertEqual(self.f.tell(), 0)
 | 
						|
        self.f.seek(10)
 | 
						|
        self.assertEqual(self.f.tell(), 10)
 | 
						|
        self.f.seek(5, 1)
 | 
						|
        self.assertEqual(self.f.tell(), 15)
 | 
						|
        self.f.seek(-5, 1)
 | 
						|
        self.assertEqual(self.f.tell(), 10)
 | 
						|
        self.f.seek(-5, 2)
 | 
						|
        self.assertEqual(self.f.tell(), 15)
 | 
						|
 | 
						|
    def testAttributes(self):
 | 
						|
        # verify expected attributes exist
 | 
						|
        f = self.f
 | 
						|
 | 
						|
        self.assertEqual(f.mode, "wb")
 | 
						|
        self.assertEqual(f.closed, False)
 | 
						|
 | 
						|
        # verify the attributes are readonly
 | 
						|
        for attr in 'mode', 'closed':
 | 
						|
            self.assertRaises((AttributeError, TypeError),
 | 
						|
                              setattr, f, attr, 'oops')
 | 
						|
 | 
						|
    def testReadinto(self):
 | 
						|
        # verify readinto
 | 
						|
        self.f.write(bytes([1, 2]))
 | 
						|
        self.f.close()
 | 
						|
        a = array('b', b'x'*10)
 | 
						|
        self.f = _FileIO(TESTFN, 'r')
 | 
						|
        n = self.f.readinto(a)
 | 
						|
        self.assertEqual(array('b', [1, 2]), a[:n])
 | 
						|
 | 
						|
    def testWritelinesList(self):
 | 
						|
        l = [b'123', b'456']
 | 
						|
        self.f.writelines(l)
 | 
						|
        self.f.close()
 | 
						|
        self.f = _FileIO(TESTFN, 'rb')
 | 
						|
        buf = self.f.read()
 | 
						|
        self.assertEqual(buf, b'123456')
 | 
						|
 | 
						|
    def testWritelinesUserList(self):
 | 
						|
        l = UserList([b'123', b'456'])
 | 
						|
        self.f.writelines(l)
 | 
						|
        self.f.close()
 | 
						|
        self.f = _FileIO(TESTFN, 'rb')
 | 
						|
        buf = self.f.read()
 | 
						|
        self.assertEqual(buf, b'123456')
 | 
						|
 | 
						|
    def testWritelinesError(self):
 | 
						|
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
 | 
						|
        self.assertRaises(TypeError, self.f.writelines, None)
 | 
						|
        self.assertRaises(TypeError, self.f.writelines, "abc")
 | 
						|
 | 
						|
    def test_none_args(self):
 | 
						|
        self.f.write(b"hi\nbye\nabc")
 | 
						|
        self.f.close()
 | 
						|
        self.f = _FileIO(TESTFN, 'r')
 | 
						|
        self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
 | 
						|
        self.f.seek(0)
 | 
						|
        self.assertEqual(self.f.readline(None), b"hi\n")
 | 
						|
        self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])
 | 
						|
 | 
						|
    def test_reject(self):
 | 
						|
        self.assertRaises(TypeError, self.f.write, "Hello!")
 | 
						|
 | 
						|
    def testRepr(self):
 | 
						|
        self.assertEqual(repr(self.f), "<_io.FileIO name=%r mode=%r>"
 | 
						|
                                        % (self.f.name, self.f.mode))
 | 
						|
        del self.f.name
 | 
						|
        self.assertEqual(repr(self.f), "<_io.FileIO fd=%r mode=%r>"
 | 
						|
                                        % (self.f.fileno(), self.f.mode))
 | 
						|
        self.f.close()
 | 
						|
        self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")
 | 
						|
 | 
						|
    def testErrors(self):
 | 
						|
        f = self.f
 | 
						|
        self.assertTrue(not f.isatty())
 | 
						|
        self.assertTrue(not f.closed)
 | 
						|
        #self.assertEqual(f.name, TESTFN)
 | 
						|
        self.assertRaises(ValueError, f.read, 10) # Open for reading
 | 
						|
        f.close()
 | 
						|
        self.assertTrue(f.closed)
 | 
						|
        f = _FileIO(TESTFN, 'r')
 | 
						|
        self.assertRaises(TypeError, f.readinto, "")
 | 
						|
        self.assertTrue(not f.closed)
 | 
						|
        f.close()
 | 
						|
        self.assertTrue(f.closed)
 | 
						|
 | 
						|
    def testMethods(self):
 | 
						|
        methods = ['fileno', 'isatty', 'read', 'readinto',
 | 
						|
                   'seek', 'tell', 'truncate', 'write', 'seekable',
 | 
						|
                   'readable', 'writable']
 | 
						|
 | 
						|
        self.f.close()
 | 
						|
        self.assertTrue(self.f.closed)
 | 
						|
 | 
						|
        for methodname in methods:
 | 
						|
            method = getattr(self.f, methodname)
 | 
						|
            # should raise on closed file
 | 
						|
            self.assertRaises(ValueError, method)
 | 
						|
 | 
						|
    def testOpendir(self):
 | 
						|
        # Issue 3703: opening a directory should fill the errno
 | 
						|
        # Windows always returns "[Errno 13]: Permission denied
 | 
						|
        # Unix calls dircheck() and returns "[Errno 21]: Is a directory"
 | 
						|
        try:
 | 
						|
            _FileIO('.', 'r')
 | 
						|
        except IOError as e:
 | 
						|
            self.assertNotEqual(e.errno, 0)
 | 
						|
            self.assertEqual(e.filename, ".")
 | 
						|
        else:
 | 
						|
            self.fail("Should have raised IOError")
 | 
						|
 | 
						|
    @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
 | 
						|
    def testOpenDirFD(self):
 | 
						|
        fd = os.open('.', os.O_RDONLY)
 | 
						|
        with self.assertRaises(IOError) as cm:
 | 
						|
            _FileIO(fd, 'r')
 | 
						|
        os.close(fd)
 | 
						|
        self.assertEqual(cm.exception.errno, errno.EISDIR)
 | 
						|
 | 
						|
    #A set of functions testing that we get expected behaviour if someone has
 | 
						|
    #manually closed the internal file descriptor.  First, a decorator:
 | 
						|
    def ClosedFD(func):
 | 
						|
        @wraps(func)
 | 
						|
        def wrapper(self):
 | 
						|
            #forcibly close the fd before invoking the problem function
 | 
						|
            f = self.f
 | 
						|
            os.close(f.fileno())
 | 
						|
            try:
 | 
						|
                func(self, f)
 | 
						|
            finally:
 | 
						|
                try:
 | 
						|
                    self.f.close()
 | 
						|
                except IOError:
 | 
						|
                    pass
 | 
						|
        return wrapper
 | 
						|
 | 
						|
    def ClosedFDRaises(func):
 | 
						|
        @wraps(func)
 | 
						|
        def wrapper(self):
 | 
						|
            #forcibly close the fd before invoking the problem function
 | 
						|
            f = self.f
 | 
						|
            os.close(f.fileno())
 | 
						|
            try:
 | 
						|
                func(self, f)
 | 
						|
            except IOError as e:
 | 
						|
                self.assertEqual(e.errno, errno.EBADF)
 | 
						|
            else:
 | 
						|
                self.fail("Should have raised IOError")
 | 
						|
            finally:
 | 
						|
                try:
 | 
						|
                    self.f.close()
 | 
						|
                except IOError:
 | 
						|
                    pass
 | 
						|
        return wrapper
 | 
						|
 | 
						|
    @ClosedFDRaises
 | 
						|
    def testErrnoOnClose(self, f):
 | 
						|
        f.close()
 | 
						|
 | 
						|
    @ClosedFDRaises
 | 
						|
    def testErrnoOnClosedWrite(self, f):
 | 
						|
        f.write(b'a')
 | 
						|
 | 
						|
    @ClosedFDRaises
 | 
						|
    def testErrnoOnClosedSeek(self, f):
 | 
						|
        f.seek(0)
 | 
						|
 | 
						|
    @ClosedFDRaises
 | 
						|
    def testErrnoOnClosedTell(self, f):
 | 
						|
        f.tell()
 | 
						|
 | 
						|
    @ClosedFDRaises
 | 
						|
    def testErrnoOnClosedTruncate(self, f):
 | 
						|
        f.truncate(0)
 | 
						|
 | 
						|
    @ClosedFD
 | 
						|
    def testErrnoOnClosedSeekable(self, f):
 | 
						|
        f.seekable()
 | 
						|
 | 
						|
    @ClosedFD
 | 
						|
    def testErrnoOnClosedReadable(self, f):
 | 
						|
        f.readable()
 | 
						|
 | 
						|
    @ClosedFD
 | 
						|
    def testErrnoOnClosedWritable(self, f):
 | 
						|
        f.writable()
 | 
						|
 | 
						|
    @ClosedFD
 | 
						|
    def testErrnoOnClosedFileno(self, f):
 | 
						|
        f.fileno()
 | 
						|
 | 
						|
    @ClosedFD
 | 
						|
    def testErrnoOnClosedIsatty(self, f):
 | 
						|
        self.assertEqual(f.isatty(), False)
 | 
						|
 | 
						|
    def ReopenForRead(self):
 | 
						|
        try:
 | 
						|
            self.f.close()
 | 
						|
        except IOError:
 | 
						|
            pass
 | 
						|
        self.f = _FileIO(TESTFN, 'r')
 | 
						|
        os.close(self.f.fileno())
 | 
						|
        return self.f
 | 
						|
 | 
						|
    @ClosedFDRaises
 | 
						|
    def testErrnoOnClosedRead(self, f):
 | 
						|
        f = self.ReopenForRead()
 | 
						|
        f.read(1)
 | 
						|
 | 
						|
    @ClosedFDRaises
 | 
						|
    def testErrnoOnClosedReadall(self, f):
 | 
						|
        f = self.ReopenForRead()
 | 
						|
        f.readall()
 | 
						|
 | 
						|
    @ClosedFDRaises
 | 
						|
    def testErrnoOnClosedReadinto(self, f):
 | 
						|
        f = self.ReopenForRead()
 | 
						|
        a = array('b', b'x'*10)
 | 
						|
        f.readinto(a)
 | 
						|
 | 
						|
class OtherFileTests(unittest.TestCase):
 | 
						|
 | 
						|
    def testAbles(self):
 | 
						|
        try:
 | 
						|
            f = _FileIO(TESTFN, "w")
 | 
						|
            self.assertEqual(f.readable(), False)
 | 
						|
            self.assertEqual(f.writable(), True)
 | 
						|
            self.assertEqual(f.seekable(), True)
 | 
						|
            f.close()
 | 
						|
 | 
						|
            f = _FileIO(TESTFN, "r")
 | 
						|
            self.assertEqual(f.readable(), True)
 | 
						|
            self.assertEqual(f.writable(), False)
 | 
						|
            self.assertEqual(f.seekable(), True)
 | 
						|
            f.close()
 | 
						|
 | 
						|
            f = _FileIO(TESTFN, "a+")
 | 
						|
            self.assertEqual(f.readable(), True)
 | 
						|
            self.assertEqual(f.writable(), True)
 | 
						|
            self.assertEqual(f.seekable(), True)
 | 
						|
            self.assertEqual(f.isatty(), False)
 | 
						|
            f.close()
 | 
						|
 | 
						|
            if sys.platform != "win32":
 | 
						|
                try:
 | 
						|
                    f = _FileIO("/dev/tty", "a")
 | 
						|
                except EnvironmentError:
 | 
						|
                    # When run in a cron job there just aren't any
 | 
						|
                    # ttys, so skip the test.  This also handles other
 | 
						|
                    # OS'es that don't support /dev/tty.
 | 
						|
                    pass
 | 
						|
                else:
 | 
						|
                    self.assertEqual(f.readable(), False)
 | 
						|
                    self.assertEqual(f.writable(), True)
 | 
						|
                    if sys.platform != "darwin" and \
 | 
						|
                       'bsd' not in sys.platform and \
 | 
						|
                       not sys.platform.startswith('sunos'):
 | 
						|
                        # Somehow /dev/tty appears seekable on some BSDs
 | 
						|
                        self.assertEqual(f.seekable(), False)
 | 
						|
                    self.assertEqual(f.isatty(), True)
 | 
						|
                    f.close()
 | 
						|
        finally:
 | 
						|
            os.unlink(TESTFN)
 | 
						|
 | 
						|
    def testModeStrings(self):
 | 
						|
        # check invalid mode strings
 | 
						|
        for mode in ("", "aU", "wU+", "rw", "rt"):
 | 
						|
            try:
 | 
						|
                f = _FileIO(TESTFN, mode)
 | 
						|
            except ValueError:
 | 
						|
                pass
 | 
						|
            else:
 | 
						|
                f.close()
 | 
						|
                self.fail('%r is an invalid file mode' % mode)
 | 
						|
 | 
						|
    def testUnicodeOpen(self):
 | 
						|
        # verify repr works for unicode too
 | 
						|
        f = _FileIO(str(TESTFN), "w")
 | 
						|
        f.close()
 | 
						|
        os.unlink(TESTFN)
 | 
						|
 | 
						|
    def testBytesOpen(self):
 | 
						|
        # Opening a bytes filename
 | 
						|
        try:
 | 
						|
            fn = TESTFN.encode("ascii")
 | 
						|
        except UnicodeEncodeError:
 | 
						|
            # Skip test
 | 
						|
            return
 | 
						|
        f = _FileIO(fn, "w")
 | 
						|
        try:
 | 
						|
            f.write(b"abc")
 | 
						|
            f.close()
 | 
						|
            with open(TESTFN, "rb") as f:
 | 
						|
                self.assertEqual(f.read(), b"abc")
 | 
						|
        finally:
 | 
						|
            os.unlink(TESTFN)
 | 
						|
 | 
						|
    def testConstructorHandlesNULChars(self):
 | 
						|
        fn_with_NUL = 'foo\0bar'
 | 
						|
        self.assertRaises(TypeError, _FileIO, fn_with_NUL, 'w')
 | 
						|
        self.assertRaises(TypeError, _FileIO, bytes(fn_with_NUL, 'ascii'), 'w')
 | 
						|
 | 
						|
    def testInvalidFd(self):
 | 
						|
        self.assertRaises(ValueError, _FileIO, -10)
 | 
						|
        self.assertRaises(OSError, _FileIO, make_bad_fd())
 | 
						|
        if sys.platform == 'win32':
 | 
						|
            import msvcrt
 | 
						|
            self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd())
 | 
						|
        # Issue 15989
 | 
						|
        self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1)
 | 
						|
        self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1)
 | 
						|
 | 
						|
    def testBadModeArgument(self):
 | 
						|
        # verify that we get a sensible error message for bad mode argument
 | 
						|
        bad_mode = "qwerty"
 | 
						|
        try:
 | 
						|
            f = _FileIO(TESTFN, bad_mode)
 | 
						|
        except ValueError as msg:
 | 
						|
            if msg.args[0] != 0:
 | 
						|
                s = str(msg)
 | 
						|
                if TESTFN in s or bad_mode not in s:
 | 
						|
                    self.fail("bad error message for invalid mode: %s" % s)
 | 
						|
            # if msg.args[0] == 0, we're probably on Windows where there may be
 | 
						|
            # no obvious way to discover why open() failed.
 | 
						|
        else:
 | 
						|
            f.close()
 | 
						|
            self.fail("no error for invalid mode: %s" % bad_mode)
 | 
						|
 | 
						|
    def testTruncate(self):
 | 
						|
        f = _FileIO(TESTFN, 'w')
 | 
						|
        f.write(bytes(bytearray(range(10))))
 | 
						|
        self.assertEqual(f.tell(), 10)
 | 
						|
        f.truncate(5)
 | 
						|
        self.assertEqual(f.tell(), 10)
 | 
						|
        self.assertEqual(f.seek(0, io.SEEK_END), 5)
 | 
						|
        f.truncate(15)
 | 
						|
        self.assertEqual(f.tell(), 5)
 | 
						|
        self.assertEqual(f.seek(0, io.SEEK_END), 15)
 | 
						|
        f.close()
 | 
						|
 | 
						|
    def testTruncateOnWindows(self):
 | 
						|
        def bug801631():
 | 
						|
            # SF bug <http://www.python.org/sf/801631>
 | 
						|
            # "file.truncate fault on windows"
 | 
						|
            f = _FileIO(TESTFN, 'w')
 | 
						|
            f.write(bytes(range(11)))
 | 
						|
            f.close()
 | 
						|
 | 
						|
            f = _FileIO(TESTFN,'r+')
 | 
						|
            data = f.read(5)
 | 
						|
            if data != bytes(range(5)):
 | 
						|
                self.fail("Read on file opened for update failed %r" % data)
 | 
						|
            if f.tell() != 5:
 | 
						|
                self.fail("File pos after read wrong %d" % f.tell())
 | 
						|
 | 
						|
            f.truncate()
 | 
						|
            if f.tell() != 5:
 | 
						|
                self.fail("File pos after ftruncate wrong %d" % f.tell())
 | 
						|
 | 
						|
            f.close()
 | 
						|
            size = os.path.getsize(TESTFN)
 | 
						|
            if size != 5:
 | 
						|
                self.fail("File size after ftruncate wrong %d" % size)
 | 
						|
 | 
						|
        try:
 | 
						|
            bug801631()
 | 
						|
        finally:
 | 
						|
            os.unlink(TESTFN)
 | 
						|
 | 
						|
    def testAppend(self):
 | 
						|
        try:
 | 
						|
            f = open(TESTFN, 'wb')
 | 
						|
            f.write(b'spam')
 | 
						|
            f.close()
 | 
						|
            f = open(TESTFN, 'ab')
 | 
						|
            f.write(b'eggs')
 | 
						|
            f.close()
 | 
						|
            f = open(TESTFN, 'rb')
 | 
						|
            d = f.read()
 | 
						|
            f.close()
 | 
						|
            self.assertEqual(d, b'spameggs')
 | 
						|
        finally:
 | 
						|
            try:
 | 
						|
                os.unlink(TESTFN)
 | 
						|
            except:
 | 
						|
                pass
 | 
						|
 | 
						|
    def testInvalidInit(self):
 | 
						|
        self.assertRaises(TypeError, _FileIO, "1", 0, 0)
 | 
						|
 | 
						|
    def testWarnings(self):
 | 
						|
        with check_warnings(quiet=True) as w:
 | 
						|
            self.assertEqual(w.warnings, [])
 | 
						|
            self.assertRaises(TypeError, _FileIO, [])
 | 
						|
            self.assertEqual(w.warnings, [])
 | 
						|
            self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")
 | 
						|
            self.assertEqual(w.warnings, [])
 | 
						|
 | 
						|
    def testUnclosedFDOnException(self):
 | 
						|
        class MyException(Exception): pass
 | 
						|
        class MyFileIO(_FileIO):
 | 
						|
            def __setattr__(self, name, value):
 | 
						|
                if name == "name":
 | 
						|
                    raise MyException("blocked setting name")
 | 
						|
                return super(MyFileIO, self).__setattr__(name, value)
 | 
						|
        fd = os.open(__file__, os.O_RDONLY)
 | 
						|
        self.assertRaises(MyException, MyFileIO, fd)
 | 
						|
        os.close(fd)  # should not raise OSError(EBADF)
 | 
						|
 | 
						|
 | 
						|
def test_main():
 | 
						|
    # Historically, these tests have been sloppy about removing TESTFN.
 | 
						|
    # So get rid of it no matter what.
 | 
						|
    try:
 | 
						|
        run_unittest(AutoFileTests, OtherFileTests)
 | 
						|
    finally:
 | 
						|
        if os.path.exists(TESTFN):
 | 
						|
            os.unlink(TESTFN)
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    test_main()
 |