mirror of
https://github.com/python/cpython.git
synced 2025-12-08 06:10:17 +00:00
They could raise SystemError or crash when getting the "closed" attribute or converting it to boolean raises an exception.
1547 lines
56 KiB
Python
1547 lines
56 KiB
Python
import array
|
|
import pickle
|
|
import random
|
|
import sys
|
|
import threading
|
|
import time
|
|
import unittest
|
|
import warnings
|
|
import weakref
|
|
from collections import deque, UserList
|
|
from itertools import cycle, count
|
|
from test import support
|
|
from test.support import os_helper, threading_helper
|
|
from .utils import byteslike, CTestCase, PyTestCase
|
|
|
|
|
|
import io # C implementation.
|
|
import _pyio as pyio # Python implementation.
|
|
|
|
|
|
class CommonBufferedTests:
|
|
# Tests common to BufferedReader, BufferedWriter and BufferedRandom
|
|
|
|
def test_detach(self):
|
|
raw = self.MockRawIO()
|
|
buf = self.tp(raw)
|
|
self.assertIs(buf.detach(), raw)
|
|
self.assertRaises(ValueError, buf.detach)
|
|
|
|
repr(buf) # Should still work
|
|
|
|
def test_fileno(self):
|
|
rawio = self.MockRawIO()
|
|
bufio = self.tp(rawio)
|
|
|
|
self.assertEqual(42, bufio.fileno())
|
|
|
|
def test_invalid_args(self):
|
|
rawio = self.MockRawIO()
|
|
bufio = self.tp(rawio)
|
|
# Invalid whence
|
|
self.assertRaises(ValueError, bufio.seek, 0, -1)
|
|
self.assertRaises(ValueError, bufio.seek, 0, 9)
|
|
|
|
def test_override_destructor(self):
|
|
tp = self.tp
|
|
record = []
|
|
class MyBufferedIO(tp):
|
|
def __del__(self):
|
|
record.append(1)
|
|
try:
|
|
f = super().__del__
|
|
except AttributeError:
|
|
pass
|
|
else:
|
|
f()
|
|
def close(self):
|
|
record.append(2)
|
|
super().close()
|
|
def flush(self):
|
|
record.append(3)
|
|
super().flush()
|
|
rawio = self.MockRawIO()
|
|
bufio = MyBufferedIO(rawio)
|
|
del bufio
|
|
support.gc_collect()
|
|
self.assertEqual(record, [1, 2, 3])
|
|
|
|
def test_context_manager(self):
|
|
# Test usability as a context manager
|
|
rawio = self.MockRawIO()
|
|
bufio = self.tp(rawio)
|
|
def _with():
|
|
with bufio:
|
|
pass
|
|
_with()
|
|
# bufio should now be closed, and using it a second time should raise
|
|
# a ValueError.
|
|
self.assertRaises(ValueError, _with)
|
|
|
|
def test_error_through_destructor(self):
|
|
# Test that the exception state is not modified by a destructor,
|
|
# even if close() fails.
|
|
rawio = self.CloseFailureIO()
|
|
with support.catch_unraisable_exception() as cm:
|
|
with self.assertRaises(AttributeError):
|
|
self.tp(rawio).xyzzy
|
|
|
|
self.assertEqual(cm.unraisable.exc_type, OSError)
|
|
|
|
def test_repr(self):
|
|
raw = self.MockRawIO()
|
|
b = self.tp(raw)
|
|
clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
|
|
self.assertRegex(repr(b), "<%s>" % clsname)
|
|
raw.name = "dummy"
|
|
self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
|
|
raw.name = b"dummy"
|
|
self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
|
|
|
|
def test_recursive_repr(self):
|
|
# Issue #25455
|
|
raw = self.MockRawIO()
|
|
b = self.tp(raw)
|
|
with support.swap_attr(raw, 'name', b), support.infinite_recursion(25):
|
|
with self.assertRaises(RuntimeError):
|
|
repr(b) # Should not crash
|
|
|
|
def test_flush_error_on_close(self):
|
|
# Test that buffered file is closed despite failed flush
|
|
# and that flush() is called before file closed.
|
|
raw = self.MockRawIO()
|
|
closed = []
|
|
def bad_flush():
|
|
closed[:] = [b.closed, raw.closed]
|
|
raise OSError()
|
|
raw.flush = bad_flush
|
|
b = self.tp(raw)
|
|
self.assertRaises(OSError, b.close) # exception not swallowed
|
|
self.assertTrue(b.closed)
|
|
self.assertTrue(raw.closed)
|
|
self.assertTrue(closed) # flush() called
|
|
self.assertFalse(closed[0]) # flush() called before file closed
|
|
self.assertFalse(closed[1])
|
|
raw.flush = lambda: None # break reference loop
|
|
|
|
def test_close_error_on_close(self):
|
|
raw = self.MockRawIO()
|
|
def bad_flush():
|
|
raise OSError('flush')
|
|
def bad_close():
|
|
raise OSError('close')
|
|
raw.close = bad_close
|
|
b = self.tp(raw)
|
|
b.flush = bad_flush
|
|
with self.assertRaises(OSError) as err: # exception not swallowed
|
|
b.close()
|
|
self.assertEqual(err.exception.args, ('close',))
|
|
self.assertIsInstance(err.exception.__context__, OSError)
|
|
self.assertEqual(err.exception.__context__.args, ('flush',))
|
|
self.assertFalse(b.closed)
|
|
|
|
# Silence destructor error
|
|
raw.close = lambda: None
|
|
b.flush = lambda: None
|
|
|
|
def test_nonnormalized_close_error_on_close(self):
|
|
# Issue #21677
|
|
raw = self.MockRawIO()
|
|
def bad_flush():
|
|
raise non_existing_flush
|
|
def bad_close():
|
|
raise non_existing_close
|
|
raw.close = bad_close
|
|
b = self.tp(raw)
|
|
b.flush = bad_flush
|
|
with self.assertRaises(NameError) as err: # exception not swallowed
|
|
b.close()
|
|
self.assertIn('non_existing_close', str(err.exception))
|
|
self.assertIsInstance(err.exception.__context__, NameError)
|
|
self.assertIn('non_existing_flush', str(err.exception.__context__))
|
|
self.assertFalse(b.closed)
|
|
|
|
# Silence destructor error
|
|
b.flush = lambda: None
|
|
raw.close = lambda: None
|
|
|
|
def test_multi_close(self):
|
|
raw = self.MockRawIO()
|
|
b = self.tp(raw)
|
|
b.close()
|
|
b.close()
|
|
b.close()
|
|
self.assertRaises(ValueError, b.flush)
|
|
|
|
def test_unseekable(self):
|
|
bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
|
|
self.assertRaises(self.UnsupportedOperation, bufio.tell)
|
|
self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
|
|
|
|
def test_readonly_attributes(self):
|
|
raw = self.MockRawIO()
|
|
buf = self.tp(raw)
|
|
x = self.MockRawIO()
|
|
with self.assertRaises(AttributeError):
|
|
buf.raw = x
|
|
|
|
def test_pickling_subclass(self):
|
|
global MyBufferedIO
|
|
class MyBufferedIO(self.tp):
|
|
def __init__(self, raw, tag):
|
|
super().__init__(raw)
|
|
self.tag = tag
|
|
def __getstate__(self):
|
|
return self.tag, self.raw.getvalue()
|
|
def __setstate__(slf, state):
|
|
tag, value = state
|
|
slf.__init__(self.BytesIO(value), tag)
|
|
|
|
raw = self.BytesIO(b'data')
|
|
buf = MyBufferedIO(raw, tag='ham')
|
|
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
|
|
with self.subTest(protocol=proto):
|
|
pickled = pickle.dumps(buf, proto)
|
|
newbuf = pickle.loads(pickled)
|
|
self.assertEqual(newbuf.raw.getvalue(), b'data')
|
|
self.assertEqual(newbuf.tag, 'ham')
|
|
del MyBufferedIO
|
|
|
|
|
|
class SizeofTest:
|
|
|
|
@support.cpython_only
|
|
def test_sizeof(self):
|
|
bufsize1 = 4096
|
|
bufsize2 = 8192
|
|
rawio = self.MockRawIO()
|
|
bufio = self.tp(rawio, buffer_size=bufsize1)
|
|
size = sys.getsizeof(bufio) - bufsize1
|
|
rawio = self.MockRawIO()
|
|
bufio = self.tp(rawio, buffer_size=bufsize2)
|
|
self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
|
|
|
|
@support.cpython_only
|
|
def test_buffer_freeing(self) :
|
|
bufsize = 4096
|
|
rawio = self.MockRawIO()
|
|
bufio = self.tp(rawio, buffer_size=bufsize)
|
|
size = sys.getsizeof(bufio) - bufsize
|
|
bufio.close()
|
|
self.assertEqual(sys.getsizeof(bufio), size)
|
|
|
|
class BufferedReaderTest(CommonBufferedTests):
|
|
read_mode = "rb"
|
|
|
|
def test_constructor(self):
|
|
rawio = self.MockRawIO([b"abc"])
|
|
bufio = self.tp(rawio)
|
|
bufio.__init__(rawio)
|
|
bufio.__init__(rawio, buffer_size=1024)
|
|
bufio.__init__(rawio, buffer_size=16)
|
|
self.assertEqual(b"abc", bufio.read())
|
|
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
|
|
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
|
|
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
|
|
rawio = self.MockRawIO([b"abc"])
|
|
bufio.__init__(rawio)
|
|
self.assertEqual(b"abc", bufio.read())
|
|
|
|
def test_uninitialized(self):
|
|
bufio = self.tp.__new__(self.tp)
|
|
del bufio
|
|
bufio = self.tp.__new__(self.tp)
|
|
self.assertRaisesRegex((ValueError, AttributeError),
|
|
'uninitialized|has no attribute',
|
|
bufio.read, 0)
|
|
bufio.__init__(self.MockRawIO())
|
|
self.assertEqual(bufio.read(0), b'')
|
|
|
|
def test_read(self):
|
|
for arg in (None, 7):
|
|
rawio = self.MockRawIO((b"abc", b"d", b"efg"))
|
|
bufio = self.tp(rawio)
|
|
self.assertEqual(b"abcdefg", bufio.read(arg))
|
|
# Invalid args
|
|
self.assertRaises(ValueError, bufio.read, -2)
|
|
|
|
def test_read1(self):
|
|
rawio = self.MockRawIO((b"abc", b"d", b"efg"))
|
|
bufio = self.tp(rawio)
|
|
self.assertEqual(b"a", bufio.read(1))
|
|
self.assertEqual(b"b", bufio.read1(1))
|
|
self.assertEqual(rawio._reads, 1)
|
|
self.assertEqual(b"", bufio.read1(0))
|
|
self.assertEqual(b"c", bufio.read1(100))
|
|
self.assertEqual(rawio._reads, 1)
|
|
self.assertEqual(b"d", bufio.read1(100))
|
|
self.assertEqual(rawio._reads, 2)
|
|
self.assertEqual(b"efg", bufio.read1(100))
|
|
self.assertEqual(rawio._reads, 3)
|
|
self.assertEqual(b"", bufio.read1(100))
|
|
self.assertEqual(rawio._reads, 4)
|
|
|
|
def test_read1_arbitrary(self):
|
|
rawio = self.MockRawIO((b"abc", b"d", b"efg"))
|
|
bufio = self.tp(rawio)
|
|
self.assertEqual(b"a", bufio.read(1))
|
|
self.assertEqual(b"bc", bufio.read1())
|
|
self.assertEqual(b"d", bufio.read1())
|
|
self.assertEqual(b"efg", bufio.read1(-1))
|
|
self.assertEqual(rawio._reads, 3)
|
|
self.assertEqual(b"", bufio.read1())
|
|
self.assertEqual(rawio._reads, 4)
|
|
|
|
def test_readinto(self):
|
|
rawio = self.MockRawIO((b"abc", b"d", b"efg"))
|
|
bufio = self.tp(rawio)
|
|
b = bytearray(2)
|
|
self.assertEqual(bufio.readinto(b), 2)
|
|
self.assertEqual(b, b"ab")
|
|
self.assertEqual(bufio.readinto(b), 2)
|
|
self.assertEqual(b, b"cd")
|
|
self.assertEqual(bufio.readinto(b), 2)
|
|
self.assertEqual(b, b"ef")
|
|
self.assertEqual(bufio.readinto(b), 1)
|
|
self.assertEqual(b, b"gf")
|
|
self.assertEqual(bufio.readinto(b), 0)
|
|
self.assertEqual(b, b"gf")
|
|
rawio = self.MockRawIO((b"abc", None))
|
|
bufio = self.tp(rawio)
|
|
self.assertEqual(bufio.readinto(b), 2)
|
|
self.assertEqual(b, b"ab")
|
|
self.assertEqual(bufio.readinto(b), 1)
|
|
self.assertEqual(b, b"cb")
|
|
|
|
def test_readinto1(self):
|
|
buffer_size = 10
|
|
rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
|
|
bufio = self.tp(rawio, buffer_size=buffer_size)
|
|
b = bytearray(2)
|
|
self.assertEqual(bufio.peek(3), b'abc')
|
|
self.assertEqual(rawio._reads, 1)
|
|
self.assertEqual(bufio.readinto1(b), 2)
|
|
self.assertEqual(b, b"ab")
|
|
self.assertEqual(rawio._reads, 1)
|
|
self.assertEqual(bufio.readinto1(b), 1)
|
|
self.assertEqual(b[:1], b"c")
|
|
self.assertEqual(rawio._reads, 1)
|
|
self.assertEqual(bufio.readinto1(b), 2)
|
|
self.assertEqual(b, b"de")
|
|
self.assertEqual(rawio._reads, 2)
|
|
b = bytearray(2*buffer_size)
|
|
self.assertEqual(bufio.peek(3), b'fgh')
|
|
self.assertEqual(rawio._reads, 3)
|
|
self.assertEqual(bufio.readinto1(b), 6)
|
|
self.assertEqual(b[:6], b"fghjkl")
|
|
self.assertEqual(rawio._reads, 4)
|
|
|
|
def test_readinto_array(self):
|
|
buffer_size = 60
|
|
data = b"a" * 26
|
|
rawio = self.MockRawIO((data,))
|
|
bufio = self.tp(rawio, buffer_size=buffer_size)
|
|
|
|
# Create an array with element size > 1 byte
|
|
b = array.array('i', b'x' * 32)
|
|
assert len(b) != 16
|
|
|
|
# Read into it. We should get as many *bytes* as we can fit into b
|
|
# (which is more than the number of elements)
|
|
n = bufio.readinto(b)
|
|
self.assertGreater(n, len(b))
|
|
|
|
# Check that old contents of b are preserved
|
|
bm = memoryview(b).cast('B')
|
|
self.assertLess(n, len(bm))
|
|
self.assertEqual(bm[:n], data[:n])
|
|
self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
|
|
|
|
def test_readinto1_array(self):
|
|
buffer_size = 60
|
|
data = b"a" * 26
|
|
rawio = self.MockRawIO((data,))
|
|
bufio = self.tp(rawio, buffer_size=buffer_size)
|
|
|
|
# Create an array with element size > 1 byte
|
|
b = array.array('i', b'x' * 32)
|
|
assert len(b) != 16
|
|
|
|
# Read into it. We should get as many *bytes* as we can fit into b
|
|
# (which is more than the number of elements)
|
|
n = bufio.readinto1(b)
|
|
self.assertGreater(n, len(b))
|
|
|
|
# Check that old contents of b are preserved
|
|
bm = memoryview(b).cast('B')
|
|
self.assertLess(n, len(bm))
|
|
self.assertEqual(bm[:n], data[:n])
|
|
self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
|
|
|
|
def test_readlines(self):
|
|
def bufio():
|
|
rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
|
|
return self.tp(rawio)
|
|
self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
|
|
self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
|
|
self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
|
|
|
|
def test_buffering(self):
|
|
data = b"abcdefghi"
|
|
dlen = len(data)
|
|
|
|
tests = [
|
|
[ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
|
|
[ 100, [ 3, 3, 3], [ dlen ] ],
|
|
[ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
|
|
]
|
|
|
|
for bufsize, buf_read_sizes, raw_read_sizes in tests:
|
|
rawio = self.MockFileIO(data)
|
|
bufio = self.tp(rawio, buffer_size=bufsize)
|
|
pos = 0
|
|
for nbytes in buf_read_sizes:
|
|
self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
|
|
pos += nbytes
|
|
# this is mildly implementation-dependent
|
|
self.assertEqual(rawio.read_history, raw_read_sizes)
|
|
|
|
def test_read_non_blocking(self):
|
|
# Inject some None's in there to simulate EWOULDBLOCK
|
|
rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
|
|
bufio = self.tp(rawio)
|
|
self.assertEqual(b"abcd", bufio.read(6))
|
|
self.assertEqual(b"e", bufio.read(1))
|
|
self.assertEqual(b"fg", bufio.read())
|
|
self.assertEqual(b"", bufio.peek(1))
|
|
self.assertIsNone(bufio.read())
|
|
self.assertEqual(b"", bufio.read())
|
|
|
|
rawio = self.MockRawIO((b"a", None, None))
|
|
self.assertEqual(b"a", rawio.readall())
|
|
self.assertIsNone(rawio.readall())
|
|
|
|
def test_read_past_eof(self):
|
|
rawio = self.MockRawIO((b"abc", b"d", b"efg"))
|
|
bufio = self.tp(rawio)
|
|
|
|
self.assertEqual(b"abcdefg", bufio.read(9000))
|
|
|
|
def test_read_all(self):
|
|
rawio = self.MockRawIO((b"abc", b"d", b"efg"))
|
|
bufio = self.tp(rawio)
|
|
|
|
self.assertEqual(b"abcdefg", bufio.read())
|
|
|
|
@threading_helper.requires_working_threading()
|
|
@support.requires_resource('cpu')
|
|
def test_threads(self):
|
|
try:
|
|
# Write out many bytes with exactly the same number of 0's,
|
|
# 1's... 255's. This will help us check that concurrent reading
|
|
# doesn't duplicate or forget contents.
|
|
N = 1000
|
|
l = list(range(256)) * N
|
|
random.shuffle(l)
|
|
s = bytes(bytearray(l))
|
|
with self.open(os_helper.TESTFN, "wb") as f:
|
|
f.write(s)
|
|
with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
|
|
bufio = self.tp(raw, 8)
|
|
errors = []
|
|
results = []
|
|
def f():
|
|
try:
|
|
# Intra-buffer read then buffer-flushing read
|
|
for n in cycle([1, 19]):
|
|
s = bufio.read(n)
|
|
if not s:
|
|
break
|
|
# list.append() is atomic
|
|
results.append(s)
|
|
except Exception as e:
|
|
errors.append(e)
|
|
raise
|
|
threads = [threading.Thread(target=f) for x in range(20)]
|
|
with threading_helper.start_threads(threads):
|
|
time.sleep(0.02) # yield
|
|
self.assertFalse(errors,
|
|
"the following exceptions were caught: %r" % errors)
|
|
s = b''.join(results)
|
|
for i in range(256):
|
|
c = bytes(bytearray([i]))
|
|
self.assertEqual(s.count(c), N)
|
|
finally:
|
|
os_helper.unlink(os_helper.TESTFN)
|
|
|
|
def test_unseekable(self):
|
|
bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
|
|
self.assertRaises(self.UnsupportedOperation, bufio.tell)
|
|
self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
|
|
bufio.read(1)
|
|
self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
|
|
self.assertRaises(self.UnsupportedOperation, bufio.tell)
|
|
|
|
def test_misbehaved_io(self):
|
|
rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
|
|
bufio = self.tp(rawio)
|
|
self.assertRaises(OSError, bufio.seek, 0)
|
|
self.assertRaises(OSError, bufio.tell)
|
|
|
|
# Silence destructor error
|
|
bufio.close = lambda: None
|
|
|
|
def test_no_extraneous_read(self):
|
|
# Issue #9550; when the raw IO object has satisfied the read request,
|
|
# we should not issue any additional reads, otherwise it may block
|
|
# (e.g. socket).
|
|
bufsize = 16
|
|
for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
|
|
rawio = self.MockRawIO([b"x" * n])
|
|
bufio = self.tp(rawio, bufsize)
|
|
self.assertEqual(bufio.read(n), b"x" * n)
|
|
# Simple case: one raw read is enough to satisfy the request.
|
|
self.assertEqual(rawio._extraneous_reads, 0,
|
|
"failed for {}: {} != 0".format(n, rawio._extraneous_reads))
|
|
# A more complex case where two raw reads are needed to satisfy
|
|
# the request.
|
|
rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
|
|
bufio = self.tp(rawio, bufsize)
|
|
self.assertEqual(bufio.read(n), b"x" * n)
|
|
self.assertEqual(rawio._extraneous_reads, 0,
|
|
"failed for {}: {} != 0".format(n, rawio._extraneous_reads))
|
|
|
|
def test_read_on_closed(self):
|
|
# Issue #23796
|
|
b = self.BufferedReader(self.BytesIO(b"12"))
|
|
b.read(1)
|
|
b.close()
|
|
with self.subTest('peek'):
|
|
self.assertRaises(ValueError, b.peek)
|
|
with self.subTest('read1'):
|
|
self.assertRaises(ValueError, b.read1, 1)
|
|
with self.subTest('read'):
|
|
self.assertRaises(ValueError, b.read)
|
|
with self.subTest('readinto'):
|
|
self.assertRaises(ValueError, b.readinto, bytearray())
|
|
with self.subTest('readinto1'):
|
|
self.assertRaises(ValueError, b.readinto1, bytearray())
|
|
with self.subTest('flush'):
|
|
self.assertRaises(ValueError, b.flush)
|
|
with self.subTest('truncate'):
|
|
self.assertRaises(ValueError, b.truncate)
|
|
with self.subTest('seek'):
|
|
self.assertRaises(ValueError, b.seek, 0)
|
|
|
|
def test_truncate_on_read_only(self):
|
|
rawio = self.MockFileIO(b"abc")
|
|
bufio = self.tp(rawio)
|
|
self.assertFalse(bufio.writable())
|
|
self.assertRaises(self.UnsupportedOperation, bufio.truncate)
|
|
self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
|
|
|
|
def test_tell_character_device_file(self):
|
|
# GH-95782
|
|
# For the (former) bug in BufferedIO to manifest, the wrapped IO obj
|
|
# must be able to produce at least 2 bytes.
|
|
raw = self.MockCharPseudoDevFileIO(b"12")
|
|
buf = self.tp(raw)
|
|
self.assertEqual(buf.tell(), 0)
|
|
self.assertEqual(buf.read(1), b"1")
|
|
self.assertEqual(buf.tell(), 0)
|
|
|
|
def test_seek_character_device_file(self):
|
|
raw = self.MockCharPseudoDevFileIO(b"12")
|
|
buf = self.tp(raw)
|
|
self.assertEqual(buf.seek(0, io.SEEK_CUR), 0)
|
|
self.assertEqual(buf.seek(1, io.SEEK_SET), 0)
|
|
self.assertEqual(buf.seek(0, io.SEEK_CUR), 0)
|
|
self.assertEqual(buf.read(1), b"1")
|
|
|
|
# In the C implementation, tell() sets the BufferedIO's abs_pos to 0,
|
|
# which means that the next seek() could return a negative offset if it
|
|
# does not sanity-check:
|
|
self.assertEqual(buf.tell(), 0)
|
|
self.assertEqual(buf.seek(0, io.SEEK_CUR), 0)
|
|
|
|
|
|
class CBufferedReaderTest(BufferedReaderTest, SizeofTest, CTestCase):
|
|
tp = io.BufferedReader
|
|
|
|
def test_initialization(self):
|
|
rawio = self.MockRawIO([b"abc"])
|
|
bufio = self.tp(rawio)
|
|
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
|
|
self.assertRaises(ValueError, bufio.read)
|
|
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
|
|
self.assertRaises(ValueError, bufio.read)
|
|
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
|
|
self.assertRaises(ValueError, bufio.read)
|
|
|
|
def test_misbehaved_io_read(self):
|
|
rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
|
|
bufio = self.tp(rawio)
|
|
# _pyio.BufferedReader seems to implement reading different, so that
|
|
# checking this is not so easy.
|
|
self.assertRaises(OSError, bufio.read, 10)
|
|
|
|
def test_garbage_collection(self):
|
|
# C BufferedReader objects are collected.
|
|
# The Python version has __del__, so it ends into gc.garbage instead
|
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
|
# Note that using warnings_helper.check_warnings() will keep the
|
|
# file alive due to the `source` argument to warn(). So, use
|
|
# catch_warnings() instead.
|
|
with warnings.catch_warnings():
|
|
warnings.simplefilter("ignore", ResourceWarning)
|
|
rawio = self.FileIO(os_helper.TESTFN, "w+b")
|
|
f = self.tp(rawio)
|
|
f.f = f
|
|
wr = weakref.ref(f)
|
|
del f
|
|
support.gc_collect()
|
|
self.assertIsNone(wr(), wr)
|
|
|
|
def test_args_error(self):
|
|
# Issue #17275
|
|
with self.assertRaisesRegex(TypeError, "BufferedReader"):
|
|
self.tp(self.BytesIO(), 1024, 1024, 1024)
|
|
|
|
def test_bad_readinto_value(self):
|
|
rawio = self.tp(self.BytesIO(b"12"))
|
|
rawio.readinto = lambda buf: -1
|
|
bufio = self.tp(rawio)
|
|
with self.assertRaises(OSError) as cm:
|
|
bufio.readline()
|
|
self.assertIsNone(cm.exception.__cause__)
|
|
|
|
def test_bad_readinto_type(self):
|
|
rawio = self.tp(self.BytesIO(b"12"))
|
|
rawio.readinto = lambda buf: b''
|
|
bufio = self.tp(rawio)
|
|
with self.assertRaises(OSError) as cm:
|
|
bufio.readline()
|
|
self.assertIsInstance(cm.exception.__cause__, TypeError)
|
|
|
|
|
|
class PyBufferedReaderTest(BufferedReaderTest, PyTestCase):
|
|
tp = pyio.BufferedReader
|
|
|
|
|
|
class BufferedWriterTest(CommonBufferedTests):
|
|
write_mode = "wb"
|
|
|
|
def test_constructor(self):
|
|
rawio = self.MockRawIO()
|
|
bufio = self.tp(rawio)
|
|
bufio.__init__(rawio)
|
|
bufio.__init__(rawio, buffer_size=1024)
|
|
bufio.__init__(rawio, buffer_size=16)
|
|
self.assertEqual(3, bufio.write(b"abc"))
|
|
bufio.flush()
|
|
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
|
|
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
|
|
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
|
|
bufio.__init__(rawio)
|
|
self.assertEqual(3, bufio.write(b"ghi"))
|
|
bufio.flush()
|
|
self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
|
|
|
|
def test_uninitialized(self):
|
|
bufio = self.tp.__new__(self.tp)
|
|
del bufio
|
|
bufio = self.tp.__new__(self.tp)
|
|
self.assertRaisesRegex((ValueError, AttributeError),
|
|
'uninitialized|has no attribute',
|
|
bufio.write, b'')
|
|
bufio.__init__(self.MockRawIO())
|
|
self.assertEqual(bufio.write(b''), 0)
|
|
|
|
def test_detach_flush(self):
|
|
raw = self.MockRawIO()
|
|
buf = self.tp(raw)
|
|
buf.write(b"howdy!")
|
|
self.assertFalse(raw._write_stack)
|
|
buf.detach()
|
|
self.assertEqual(raw._write_stack, [b"howdy!"])
|
|
|
|
def test_write(self):
|
|
# Write to the buffered IO but don't overflow the buffer.
|
|
writer = self.MockRawIO()
|
|
bufio = self.tp(writer, 8)
|
|
bufio.write(b"abc")
|
|
self.assertFalse(writer._write_stack)
|
|
buffer = bytearray(b"def")
|
|
bufio.write(buffer)
|
|
buffer[:] = b"***" # Overwrite our copy of the data
|
|
bufio.flush()
|
|
self.assertEqual(b"".join(writer._write_stack), b"abcdef")
|
|
|
|
def test_write_overflow(self):
|
|
writer = self.MockRawIO()
|
|
bufio = self.tp(writer, 8)
|
|
contents = b"abcdefghijklmnop"
|
|
for n in range(0, len(contents), 3):
|
|
bufio.write(contents[n:n+3])
|
|
flushed = b"".join(writer._write_stack)
|
|
# At least (total - 8) bytes were implicitly flushed, perhaps more
|
|
# depending on the implementation.
|
|
self.assertStartsWith(flushed, contents[:-8])
|
|
|
|
def check_writes(self, intermediate_func):
|
|
# Lots of writes, test the flushed output is as expected.
|
|
contents = bytes(range(256)) * 1000
|
|
n = 0
|
|
writer = self.MockRawIO()
|
|
bufio = self.tp(writer, 13)
|
|
# Generator of write sizes: repeat each N 15 times then proceed to N+1
|
|
def gen_sizes():
|
|
for size in count(1):
|
|
for i in range(15):
|
|
yield size
|
|
sizes = gen_sizes()
|
|
while n < len(contents):
|
|
size = min(next(sizes), len(contents) - n)
|
|
self.assertEqual(bufio.write(contents[n:n+size]), size)
|
|
intermediate_func(bufio)
|
|
n += size
|
|
bufio.flush()
|
|
self.assertEqual(contents, b"".join(writer._write_stack))
|
|
|
|
def test_writes(self):
|
|
self.check_writes(lambda bufio: None)
|
|
|
|
def test_writes_and_flushes(self):
|
|
self.check_writes(lambda bufio: bufio.flush())
|
|
|
|
def test_writes_and_seeks(self):
|
|
def _seekabs(bufio):
|
|
pos = bufio.tell()
|
|
bufio.seek(pos + 1, 0)
|
|
bufio.seek(pos - 1, 0)
|
|
bufio.seek(pos, 0)
|
|
self.check_writes(_seekabs)
|
|
def _seekrel(bufio):
|
|
pos = bufio.seek(0, 1)
|
|
bufio.seek(+1, 1)
|
|
bufio.seek(-1, 1)
|
|
bufio.seek(pos, 0)
|
|
self.check_writes(_seekrel)
|
|
|
|
def test_writes_and_truncates(self):
|
|
self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
|
|
|
|
def test_write_non_blocking(self):
|
|
raw = self.MockNonBlockWriterIO()
|
|
bufio = self.tp(raw, 8)
|
|
|
|
self.assertEqual(bufio.write(b"abcd"), 4)
|
|
self.assertEqual(bufio.write(b"efghi"), 5)
|
|
# 1 byte will be written, the rest will be buffered
|
|
raw.block_on(b"k")
|
|
self.assertEqual(bufio.write(b"jklmn"), 5)
|
|
|
|
# 8 bytes will be written, 8 will be buffered and the rest will be lost
|
|
raw.block_on(b"0")
|
|
try:
|
|
bufio.write(b"opqrwxyz0123456789")
|
|
except self.BlockingIOError as e:
|
|
written = e.characters_written
|
|
else:
|
|
self.fail("BlockingIOError should have been raised")
|
|
self.assertEqual(written, 16)
|
|
self.assertEqual(raw.pop_written(),
|
|
b"abcdefghijklmnopqrwxyz")
|
|
|
|
self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
|
|
s = raw.pop_written()
|
|
# Previously buffered bytes were flushed
|
|
self.assertStartsWith(s, b"01234567A")
|
|
|
|
def test_write_and_rewind(self):
|
|
raw = self.BytesIO()
|
|
bufio = self.tp(raw, 4)
|
|
self.assertEqual(bufio.write(b"abcdef"), 6)
|
|
self.assertEqual(bufio.tell(), 6)
|
|
bufio.seek(0, 0)
|
|
self.assertEqual(bufio.write(b"XY"), 2)
|
|
bufio.seek(6, 0)
|
|
self.assertEqual(raw.getvalue(), b"XYcdef")
|
|
self.assertEqual(bufio.write(b"123456"), 6)
|
|
bufio.flush()
|
|
self.assertEqual(raw.getvalue(), b"XYcdef123456")
|
|
|
|
def test_flush(self):
|
|
writer = self.MockRawIO()
|
|
bufio = self.tp(writer, 8)
|
|
bufio.write(b"abc")
|
|
bufio.flush()
|
|
self.assertEqual(b"abc", writer._write_stack[0])
|
|
|
|
def test_writelines(self):
|
|
l = [b'ab', b'cd', b'ef']
|
|
writer = self.MockRawIO()
|
|
bufio = self.tp(writer, 8)
|
|
bufio.writelines(l)
|
|
bufio.flush()
|
|
self.assertEqual(b''.join(writer._write_stack), b'abcdef')
|
|
|
|
def test_writelines_userlist(self):
|
|
l = UserList([b'ab', b'cd', b'ef'])
|
|
writer = self.MockRawIO()
|
|
bufio = self.tp(writer, 8)
|
|
bufio.writelines(l)
|
|
bufio.flush()
|
|
self.assertEqual(b''.join(writer._write_stack), b'abcdef')
|
|
|
|
def test_writelines_error(self):
|
|
writer = self.MockRawIO()
|
|
bufio = self.tp(writer, 8)
|
|
self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
|
|
self.assertRaises(TypeError, bufio.writelines, None)
|
|
self.assertRaises(TypeError, bufio.writelines, 'abc')
|
|
|
|
def test_destructor(self):
|
|
writer = self.MockRawIO()
|
|
bufio = self.tp(writer, 8)
|
|
bufio.write(b"abc")
|
|
del bufio
|
|
support.gc_collect()
|
|
self.assertEqual(b"abc", writer._write_stack[0])
|
|
|
|
def test_truncate(self):
|
|
# Truncate implicitly flushes the buffer.
|
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
|
with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
|
|
bufio = self.tp(raw, 8)
|
|
bufio.write(b"abcdef")
|
|
self.assertEqual(bufio.truncate(3), 3)
|
|
self.assertEqual(bufio.tell(), 6)
|
|
with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
|
|
self.assertEqual(f.read(), b"abc")
|
|
|
|
def test_truncate_after_write(self):
|
|
# Ensure that truncate preserves the file position after
|
|
# writes longer than the buffer size.
|
|
# Issue: https://bugs.python.org/issue32228
|
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
|
with self.open(os_helper.TESTFN, "wb") as f:
|
|
# Fill with some buffer
|
|
f.write(b'\x00' * 10000)
|
|
buffer_sizes = [8192, 4096, 200]
|
|
for buffer_size in buffer_sizes:
|
|
with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
|
|
f.write(b'\x00' * (buffer_size + 1))
|
|
# After write write_pos and write_end are set to 0
|
|
f.read(1)
|
|
# read operation makes sure that pos != raw_pos
|
|
f.truncate()
|
|
self.assertEqual(f.tell(), buffer_size + 2)
|
|
|
|
@threading_helper.requires_working_threading()
|
|
@support.requires_resource('cpu')
|
|
def test_threads(self):
|
|
try:
|
|
# Write out many bytes from many threads and test they were
|
|
# all flushed.
|
|
N = 1000
|
|
contents = bytes(range(256)) * N
|
|
sizes = cycle([1, 19])
|
|
n = 0
|
|
queue = deque()
|
|
while n < len(contents):
|
|
size = next(sizes)
|
|
queue.append(contents[n:n+size])
|
|
n += size
|
|
del contents
|
|
# We use a real file object because it allows us to
|
|
# exercise situations where the GIL is released before
|
|
# writing the buffer to the raw streams. This is in addition
|
|
# to concurrency issues due to switching threads in the middle
|
|
# of Python code.
|
|
with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
|
|
bufio = self.tp(raw, 8)
|
|
errors = []
|
|
def f():
|
|
try:
|
|
while True:
|
|
try:
|
|
s = queue.popleft()
|
|
except IndexError:
|
|
return
|
|
bufio.write(s)
|
|
except Exception as e:
|
|
errors.append(e)
|
|
raise
|
|
threads = [threading.Thread(target=f) for x in range(20)]
|
|
with threading_helper.start_threads(threads):
|
|
time.sleep(0.02) # yield
|
|
self.assertFalse(errors,
|
|
"the following exceptions were caught: %r" % errors)
|
|
bufio.close()
|
|
with self.open(os_helper.TESTFN, "rb") as f:
|
|
s = f.read()
|
|
for i in range(256):
|
|
self.assertEqual(s.count(bytes([i])), N)
|
|
finally:
|
|
os_helper.unlink(os_helper.TESTFN)
|
|
|
|
def test_misbehaved_io(self):
|
|
rawio = self.MisbehavedRawIO()
|
|
bufio = self.tp(rawio, 5)
|
|
self.assertRaises(OSError, bufio.seek, 0)
|
|
self.assertRaises(OSError, bufio.tell)
|
|
self.assertRaises(OSError, bufio.write, b"abcdef")
|
|
|
|
# Silence destructor error
|
|
bufio.close = lambda: None
|
|
|
|
def test_max_buffer_size_removal(self):
|
|
with self.assertRaises(TypeError):
|
|
self.tp(self.MockRawIO(), 8, 12)
|
|
|
|
def test_write_error_on_close(self):
|
|
raw = self.MockRawIO()
|
|
def bad_write(b):
|
|
raise OSError()
|
|
raw.write = bad_write
|
|
b = self.tp(raw)
|
|
b.write(b'spam')
|
|
self.assertRaises(OSError, b.close) # exception not swallowed
|
|
self.assertTrue(b.closed)
|
|
|
|
@threading_helper.requires_working_threading()
|
|
def test_slow_close_from_thread(self):
|
|
# Issue #31976
|
|
rawio = self.SlowFlushRawIO()
|
|
bufio = self.tp(rawio, 8)
|
|
t = threading.Thread(target=bufio.close)
|
|
t.start()
|
|
rawio.in_flush.wait()
|
|
self.assertRaises(ValueError, bufio.write, b'spam')
|
|
self.assertTrue(bufio.closed)
|
|
t.join()
|
|
|
|
|
|
class CBufferedWriterTest(BufferedWriterTest, SizeofTest, CTestCase):
|
|
tp = io.BufferedWriter
|
|
|
|
def test_initialization(self):
|
|
rawio = self.MockRawIO()
|
|
bufio = self.tp(rawio)
|
|
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
|
|
self.assertRaises(ValueError, bufio.write, b"def")
|
|
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
|
|
self.assertRaises(ValueError, bufio.write, b"def")
|
|
self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
|
|
self.assertRaises(ValueError, bufio.write, b"def")
|
|
|
|
def test_garbage_collection(self):
|
|
# C BufferedWriter objects are collected, and collecting them flushes
|
|
# all data to disk.
|
|
# The Python version has __del__, so it ends into gc.garbage instead
|
|
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
|
# Note that using warnings_helper.check_warnings() will keep the
|
|
# file alive due to the `source` argument to warn(). So, use
|
|
# catch_warnings() instead.
|
|
with warnings.catch_warnings():
|
|
warnings.simplefilter("ignore", ResourceWarning)
|
|
rawio = self.FileIO(os_helper.TESTFN, "w+b")
|
|
f = self.tp(rawio)
|
|
f.write(b"123xxx")
|
|
f.x = f
|
|
wr = weakref.ref(f)
|
|
del f
|
|
support.gc_collect()
|
|
self.assertIsNone(wr(), wr)
|
|
with self.open(os_helper.TESTFN, "rb") as f:
|
|
self.assertEqual(f.read(), b"123xxx")
|
|
|
|
def test_args_error(self):
|
|
# Issue #17275
|
|
with self.assertRaisesRegex(TypeError, "BufferedWriter"):
|
|
self.tp(self.BytesIO(), 1024, 1024, 1024)
|
|
|
|
def test_non_boolean_closed_attr(self):
|
|
# gh-140650: check TypeError is raised
|
|
class MockRawIOWithoutClosed(self.MockRawIO):
|
|
closed = NotImplemented
|
|
|
|
bufio = self.tp(MockRawIOWithoutClosed())
|
|
self.assertRaises(TypeError, bufio.write, b"")
|
|
self.assertRaises(TypeError, bufio.flush)
|
|
self.assertRaises(TypeError, bufio.close)
|
|
|
|
def test_closed_attr_raises(self):
|
|
class MockRawIOClosedRaises(self.MockRawIO):
|
|
@property
|
|
def closed(self):
|
|
raise ValueError("test")
|
|
|
|
bufio = self.tp(MockRawIOClosedRaises())
|
|
self.assertRaisesRegex(ValueError, "test", bufio.write, b"")
|
|
self.assertRaisesRegex(ValueError, "test", bufio.flush)
|
|
self.assertRaisesRegex(ValueError, "test", bufio.close)
|
|
|
|
|
|
class PyBufferedWriterTest(BufferedWriterTest, PyTestCase):
|
|
tp = pyio.BufferedWriter
|
|
|
|
class BufferedRWPairTest:
|
|
|
|
def test_constructor(self):
|
|
pair = self.tp(self.MockRawIO(), self.MockRawIO())
|
|
self.assertFalse(pair.closed)
|
|
|
|
def test_uninitialized(self):
|
|
pair = self.tp.__new__(self.tp)
|
|
del pair
|
|
pair = self.tp.__new__(self.tp)
|
|
self.assertRaisesRegex((ValueError, AttributeError),
|
|
'uninitialized|has no attribute',
|
|
pair.read, 0)
|
|
self.assertRaisesRegex((ValueError, AttributeError),
|
|
'uninitialized|has no attribute',
|
|
pair.write, b'')
|
|
pair.__init__(self.MockRawIO(), self.MockRawIO())
|
|
self.assertEqual(pair.read(0), b'')
|
|
self.assertEqual(pair.write(b''), 0)
|
|
|
|
def test_detach(self):
|
|
pair = self.tp(self.MockRawIO(), self.MockRawIO())
|
|
self.assertRaises(self.UnsupportedOperation, pair.detach)
|
|
|
|
def test_constructor_max_buffer_size_removal(self):
|
|
with self.assertRaises(TypeError):
|
|
self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
|
|
|
|
def test_constructor_with_not_readable(self):
|
|
class NotReadable(self.MockRawIO):
|
|
def readable(self):
|
|
return False
|
|
|
|
self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
|
|
|
|
def test_constructor_with_not_writeable(self):
|
|
class NotWriteable(self.MockRawIO):
|
|
def writable(self):
|
|
return False
|
|
|
|
self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
|
|
|
|
def test_read(self):
|
|
pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
|
|
|
|
self.assertEqual(pair.read(3), b"abc")
|
|
self.assertEqual(pair.read(1), b"d")
|
|
self.assertEqual(pair.read(), b"ef")
|
|
pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
|
|
self.assertEqual(pair.read(None), b"abc")
|
|
|
|
def test_readlines(self):
|
|
pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
|
|
self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
|
|
self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
|
|
self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
|
|
|
|
def test_read1(self):
|
|
# .read1() is delegated to the underlying reader object, so this test
|
|
# can be shallow.
|
|
pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
|
|
|
|
self.assertEqual(pair.read1(3), b"abc")
|
|
self.assertEqual(pair.read1(), b"def")
|
|
|
|
def test_readinto(self):
|
|
for method in ("readinto", "readinto1"):
|
|
with self.subTest(method):
|
|
pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
|
|
|
|
data = byteslike(b'\0' * 5)
|
|
self.assertEqual(getattr(pair, method)(data), 5)
|
|
self.assertEqual(bytes(data), b"abcde")
|
|
|
|
# gh-138720: C BufferedRWPair would destruct in a bad order resulting in
|
|
# an unraisable exception.
|
|
support.gc_collect()
|
|
|
|
def test_write(self):
|
|
w = self.MockRawIO()
|
|
pair = self.tp(self.MockRawIO(), w)
|
|
|
|
pair.write(b"abc")
|
|
pair.flush()
|
|
buffer = bytearray(b"def")
|
|
pair.write(buffer)
|
|
buffer[:] = b"***" # Overwrite our copy of the data
|
|
pair.flush()
|
|
self.assertEqual(w._write_stack, [b"abc", b"def"])
|
|
|
|
def test_peek(self):
|
|
pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
|
|
|
|
self.assertStartsWith(pair.peek(3), b"abc")
|
|
self.assertEqual(pair.read(3), b"abc")
|
|
|
|
def test_readable(self):
|
|
pair = self.tp(self.MockRawIO(), self.MockRawIO())
|
|
self.assertTrue(pair.readable())
|
|
|
|
def test_writeable(self):
|
|
pair = self.tp(self.MockRawIO(), self.MockRawIO())
|
|
self.assertTrue(pair.writable())
|
|
|
|
def test_seekable(self):
|
|
# BufferedRWPairs are never seekable, even if their readers and writers
|
|
# are.
|
|
pair = self.tp(self.MockRawIO(), self.MockRawIO())
|
|
self.assertFalse(pair.seekable())
|
|
|
|
# .flush() is delegated to the underlying writer object and has been
|
|
# tested in the test_write method.
|
|
|
|
def test_close_and_closed(self):
|
|
pair = self.tp(self.MockRawIO(), self.MockRawIO())
|
|
self.assertFalse(pair.closed)
|
|
pair.close()
|
|
self.assertTrue(pair.closed)
|
|
|
|
def test_reader_close_error_on_close(self):
|
|
def reader_close():
|
|
reader_non_existing
|
|
reader = self.MockRawIO()
|
|
reader.close = reader_close
|
|
writer = self.MockRawIO()
|
|
pair = self.tp(reader, writer)
|
|
with self.assertRaises(NameError) as err:
|
|
pair.close()
|
|
self.assertIn('reader_non_existing', str(err.exception))
|
|
self.assertTrue(pair.closed)
|
|
self.assertFalse(reader.closed)
|
|
self.assertTrue(writer.closed)
|
|
|
|
# Silence destructor error
|
|
reader.close = lambda: None
|
|
|
|
def test_writer_close_error_on_close(self):
|
|
def writer_close():
|
|
writer_non_existing
|
|
reader = self.MockRawIO()
|
|
writer = self.MockRawIO()
|
|
writer.close = writer_close
|
|
pair = self.tp(reader, writer)
|
|
with self.assertRaises(NameError) as err:
|
|
pair.close()
|
|
self.assertIn('writer_non_existing', str(err.exception))
|
|
self.assertFalse(pair.closed)
|
|
self.assertTrue(reader.closed)
|
|
self.assertFalse(writer.closed)
|
|
|
|
# Silence destructor error
|
|
writer.close = lambda: None
|
|
writer = None
|
|
|
|
# Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
|
|
with support.catch_unraisable_exception():
|
|
# Ignore BufferedRWPair unraisable exception
|
|
with support.catch_unraisable_exception():
|
|
pair = None
|
|
support.gc_collect()
|
|
support.gc_collect()
|
|
|
|
def test_reader_writer_close_error_on_close(self):
|
|
def reader_close():
|
|
reader_non_existing
|
|
def writer_close():
|
|
writer_non_existing
|
|
reader = self.MockRawIO()
|
|
reader.close = reader_close
|
|
writer = self.MockRawIO()
|
|
writer.close = writer_close
|
|
pair = self.tp(reader, writer)
|
|
with self.assertRaises(NameError) as err:
|
|
pair.close()
|
|
self.assertIn('reader_non_existing', str(err.exception))
|
|
self.assertIsInstance(err.exception.__context__, NameError)
|
|
self.assertIn('writer_non_existing', str(err.exception.__context__))
|
|
self.assertFalse(pair.closed)
|
|
self.assertFalse(reader.closed)
|
|
self.assertFalse(writer.closed)
|
|
|
|
# Silence destructor error
|
|
reader.close = lambda: None
|
|
writer.close = lambda: None
|
|
|
|
def test_isatty(self):
|
|
class SelectableIsAtty(self.MockRawIO):
|
|
def __init__(self, isatty):
|
|
super().__init__()
|
|
self._isatty = isatty
|
|
|
|
def isatty(self):
|
|
return self._isatty
|
|
|
|
pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
|
|
self.assertFalse(pair.isatty())
|
|
|
|
pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
|
|
self.assertTrue(pair.isatty())
|
|
|
|
pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
|
|
self.assertTrue(pair.isatty())
|
|
|
|
pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
|
|
self.assertTrue(pair.isatty())
|
|
|
|
def test_weakref_clearing(self):
|
|
brw = self.tp(self.MockRawIO(), self.MockRawIO())
|
|
ref = weakref.ref(brw)
|
|
brw = None
|
|
ref = None # Shouldn't segfault.
|
|
|
|
class CBufferedRWPairTest(BufferedRWPairTest, CTestCase):
|
|
tp = io.BufferedRWPair
|
|
|
|
class PyBufferedRWPairTest(BufferedRWPairTest, PyTestCase):
|
|
tp = pyio.BufferedRWPair
|
|
|
|
|
|
class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
|
|
read_mode = "rb+"
|
|
write_mode = "wb+"
|
|
|
|
def test_constructor(self):
|
|
BufferedReaderTest.test_constructor(self)
|
|
BufferedWriterTest.test_constructor(self)
|
|
|
|
def test_uninitialized(self):
|
|
BufferedReaderTest.test_uninitialized(self)
|
|
BufferedWriterTest.test_uninitialized(self)
|
|
|
|
def test_read_and_write(self):
|
|
raw = self.MockRawIO((b"asdf", b"ghjk"))
|
|
rw = self.tp(raw, 8)
|
|
|
|
self.assertEqual(b"as", rw.read(2))
|
|
rw.write(b"ddd")
|
|
rw.write(b"eee")
|
|
self.assertFalse(raw._write_stack) # Buffer writes
|
|
self.assertEqual(b"ghjk", rw.read())
|
|
self.assertEqual(b"dddeee", raw._write_stack[0])
|
|
|
|
def test_seek_and_tell(self):
|
|
raw = self.BytesIO(b"asdfghjkl")
|
|
rw = self.tp(raw)
|
|
|
|
self.assertEqual(b"as", rw.read(2))
|
|
self.assertEqual(2, rw.tell())
|
|
rw.seek(0, 0)
|
|
self.assertEqual(b"asdf", rw.read(4))
|
|
|
|
rw.write(b"123f")
|
|
rw.seek(0, 0)
|
|
self.assertEqual(b"asdf123fl", rw.read())
|
|
self.assertEqual(9, rw.tell())
|
|
rw.seek(-4, 2)
|
|
self.assertEqual(5, rw.tell())
|
|
rw.seek(2, 1)
|
|
self.assertEqual(7, rw.tell())
|
|
self.assertEqual(b"fl", rw.read(11))
|
|
rw.flush()
|
|
self.assertEqual(b"asdf123fl", raw.getvalue())
|
|
|
|
self.assertRaises(TypeError, rw.seek, 0.0)
|
|
|
|
def check_flush_and_read(self, read_func):
|
|
raw = self.BytesIO(b"abcdefghi")
|
|
bufio = self.tp(raw)
|
|
|
|
self.assertEqual(b"ab", read_func(bufio, 2))
|
|
bufio.write(b"12")
|
|
self.assertEqual(b"ef", read_func(bufio, 2))
|
|
self.assertEqual(6, bufio.tell())
|
|
bufio.flush()
|
|
self.assertEqual(6, bufio.tell())
|
|
self.assertEqual(b"ghi", read_func(bufio))
|
|
raw.seek(0, 0)
|
|
raw.write(b"XYZ")
|
|
# flush() resets the read buffer
|
|
bufio.flush()
|
|
bufio.seek(0, 0)
|
|
self.assertEqual(b"XYZ", read_func(bufio, 3))
|
|
|
|
def test_flush_and_read(self):
|
|
self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
|
|
|
|
def test_flush_and_readinto(self):
|
|
def _readinto(bufio, n=-1):
|
|
b = bytearray(n if n >= 0 else 9999)
|
|
n = bufio.readinto(b)
|
|
return bytes(b[:n])
|
|
self.check_flush_and_read(_readinto)
|
|
|
|
def test_flush_and_peek(self):
|
|
def _peek(bufio, n=-1):
|
|
# This relies on the fact that the buffer can contain the whole
|
|
# raw stream, otherwise peek() can return less.
|
|
b = bufio.peek(n)
|
|
if n != -1:
|
|
b = b[:n]
|
|
bufio.seek(len(b), 1)
|
|
return b
|
|
self.check_flush_and_read(_peek)
|
|
|
|
def test_flush_and_write(self):
|
|
raw = self.BytesIO(b"abcdefghi")
|
|
bufio = self.tp(raw)
|
|
|
|
bufio.write(b"123")
|
|
bufio.flush()
|
|
bufio.write(b"45")
|
|
bufio.flush()
|
|
bufio.seek(0, 0)
|
|
self.assertEqual(b"12345fghi", raw.getvalue())
|
|
self.assertEqual(b"12345fghi", bufio.read())
|
|
|
|
def test_threads(self):
|
|
BufferedReaderTest.test_threads(self)
|
|
BufferedWriterTest.test_threads(self)
|
|
|
|
def test_writes_and_peek(self):
|
|
def _peek(bufio):
|
|
bufio.peek(1)
|
|
self.check_writes(_peek)
|
|
def _peek(bufio):
|
|
pos = bufio.tell()
|
|
bufio.seek(-1, 1)
|
|
bufio.peek(1)
|
|
bufio.seek(pos, 0)
|
|
self.check_writes(_peek)
|
|
|
|
def test_writes_and_reads(self):
|
|
def _read(bufio):
|
|
bufio.seek(-1, 1)
|
|
bufio.read(1)
|
|
self.check_writes(_read)
|
|
|
|
def test_writes_and_read1s(self):
|
|
def _read1(bufio):
|
|
bufio.seek(-1, 1)
|
|
bufio.read1(1)
|
|
self.check_writes(_read1)
|
|
|
|
def test_writes_and_readintos(self):
|
|
def _read(bufio):
|
|
bufio.seek(-1, 1)
|
|
bufio.readinto(bytearray(1))
|
|
self.check_writes(_read)
|
|
|
|
def test_write_after_readahead(self):
|
|
# Issue #6629: writing after the buffer was filled by readahead should
|
|
# first rewind the raw stream.
|
|
for overwrite_size in [1, 5]:
|
|
raw = self.BytesIO(b"A" * 10)
|
|
bufio = self.tp(raw, 4)
|
|
# Trigger readahead
|
|
self.assertEqual(bufio.read(1), b"A")
|
|
self.assertEqual(bufio.tell(), 1)
|
|
# Overwriting should rewind the raw stream if it needs so
|
|
bufio.write(b"B" * overwrite_size)
|
|
self.assertEqual(bufio.tell(), overwrite_size + 1)
|
|
# If the write size was smaller than the buffer size, flush() and
|
|
# check that rewind happens.
|
|
bufio.flush()
|
|
self.assertEqual(bufio.tell(), overwrite_size + 1)
|
|
s = raw.getvalue()
|
|
self.assertEqual(s,
|
|
b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
|
|
|
|
def test_write_rewind_write(self):
|
|
# Various combinations of reading / writing / seeking backwards / writing again
|
|
def mutate(bufio, pos1, pos2):
|
|
assert pos2 >= pos1
|
|
# Fill the buffer
|
|
bufio.seek(pos1)
|
|
bufio.read(pos2 - pos1)
|
|
bufio.write(b'\x02')
|
|
# This writes earlier than the previous write, but still inside
|
|
# the buffer.
|
|
bufio.seek(pos1)
|
|
bufio.write(b'\x01')
|
|
|
|
b = b"\x80\x81\x82\x83\x84"
|
|
for i in range(0, len(b)):
|
|
for j in range(i, len(b)):
|
|
raw = self.BytesIO(b)
|
|
bufio = self.tp(raw, 100)
|
|
mutate(bufio, i, j)
|
|
bufio.flush()
|
|
expected = bytearray(b)
|
|
expected[j] = 2
|
|
expected[i] = 1
|
|
self.assertEqual(raw.getvalue(), expected,
|
|
"failed result for i=%d, j=%d" % (i, j))
|
|
|
|
def test_truncate_after_read_or_write(self):
|
|
raw = self.BytesIO(b"A" * 10)
|
|
bufio = self.tp(raw, 100)
|
|
self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
|
|
self.assertEqual(bufio.truncate(), 2)
|
|
self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
|
|
self.assertEqual(bufio.truncate(), 4)
|
|
|
|
def test_misbehaved_io(self):
|
|
BufferedReaderTest.test_misbehaved_io(self)
|
|
BufferedWriterTest.test_misbehaved_io(self)
|
|
|
|
def test_interleaved_read_write(self):
|
|
# Test for issue #12213
|
|
with self.BytesIO(b'abcdefgh') as raw:
|
|
with self.tp(raw, 100) as f:
|
|
f.write(b"1")
|
|
self.assertEqual(f.read(1), b'b')
|
|
f.write(b'2')
|
|
self.assertEqual(f.read1(1), b'd')
|
|
f.write(b'3')
|
|
buf = bytearray(1)
|
|
f.readinto(buf)
|
|
self.assertEqual(buf, b'f')
|
|
f.write(b'4')
|
|
self.assertEqual(f.peek(1), b'h')
|
|
f.flush()
|
|
self.assertEqual(raw.getvalue(), b'1b2d3f4h')
|
|
|
|
with self.BytesIO(b'abc') as raw:
|
|
with self.tp(raw, 100) as f:
|
|
self.assertEqual(f.read(1), b'a')
|
|
f.write(b"2")
|
|
self.assertEqual(f.read(1), b'c')
|
|
f.flush()
|
|
self.assertEqual(raw.getvalue(), b'a2c')
|
|
|
|
def test_read1_after_write(self):
|
|
with self.BytesIO(b'abcdef') as raw:
|
|
with self.tp(raw, 3) as f:
|
|
f.write(b"1")
|
|
self.assertEqual(f.read1(1), b'b')
|
|
f.flush()
|
|
self.assertEqual(raw.getvalue(), b'1bcdef')
|
|
with self.BytesIO(b'abcdef') as raw:
|
|
with self.tp(raw, 3) as f:
|
|
f.write(b"1")
|
|
self.assertEqual(f.read1(), b'bcd')
|
|
f.flush()
|
|
self.assertEqual(raw.getvalue(), b'1bcdef')
|
|
with self.BytesIO(b'abcdef') as raw:
|
|
with self.tp(raw, 3) as f:
|
|
f.write(b"1")
|
|
# XXX: read(100) returns different numbers of bytes
|
|
# in Python and C implementations.
|
|
self.assertEqual(f.read1(100)[:3], b'bcd')
|
|
f.flush()
|
|
self.assertEqual(raw.getvalue(), b'1bcdef')
|
|
|
|
def test_interleaved_readline_write(self):
|
|
with self.BytesIO(b'ab\ncdef\ng\n') as raw:
|
|
with self.tp(raw) as f:
|
|
f.write(b'1')
|
|
self.assertEqual(f.readline(), b'b\n')
|
|
f.write(b'2')
|
|
self.assertEqual(f.readline(), b'def\n')
|
|
f.write(b'3')
|
|
self.assertEqual(f.readline(), b'\n')
|
|
f.flush()
|
|
self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
|
|
|
|
# You can't construct a BufferedRandom over a non-seekable stream.
|
|
test_unseekable = None
|
|
|
|
# writable() returns True, so there's no point to test it over
|
|
# a writable stream.
|
|
test_truncate_on_read_only = None
|
|
|
|
|
|
class CBufferedRandomTest(BufferedRandomTest, SizeofTest, CTestCase):
|
|
tp = io.BufferedRandom
|
|
|
|
def test_garbage_collection(self):
|
|
CBufferedReaderTest.test_garbage_collection(self)
|
|
CBufferedWriterTest.test_garbage_collection(self)
|
|
|
|
def test_args_error(self):
|
|
# Issue #17275
|
|
with self.assertRaisesRegex(TypeError, "BufferedRandom"):
|
|
self.tp(self.BytesIO(), 1024, 1024, 1024)
|
|
|
|
|
|
class PyBufferedRandomTest(BufferedRandomTest, PyTestCase):
|
|
tp = pyio.BufferedRandom
|
|
|
|
|
|
# Simple test to ensure that optimizations in the IO library deliver the
|
|
# expected results. For best testing, run this under a debug-build Python too
|
|
# (to exercise asserts in the C code).
|
|
|
|
lengths = list(range(1, 257)) + [512, 1000, 1024, 2048, 4096, 8192, 10000,
|
|
16384, 32768, 65536, 1000000]
|
|
|
|
class BufferSizeTest:
|
|
def try_one(self, s):
|
|
# Write s + "\n" + s to file, then open it and ensure that successive
|
|
# .readline()s deliver what we wrote.
|
|
|
|
# Ensure we can open TESTFN for writing.
|
|
os_helper.unlink(os_helper.TESTFN)
|
|
|
|
# Since C doesn't guarantee we can write/read arbitrary bytes in text
|
|
# files, use binary mode.
|
|
f = self.open(os_helper.TESTFN, "wb")
|
|
try:
|
|
# write once with \n and once without
|
|
f.write(s)
|
|
f.write(b"\n")
|
|
f.write(s)
|
|
f.close()
|
|
f = self.open(os_helper.TESTFN, "rb")
|
|
line = f.readline()
|
|
self.assertEqual(line, s + b"\n")
|
|
line = f.readline()
|
|
self.assertEqual(line, s)
|
|
line = f.readline()
|
|
self.assertFalse(line) # Must be at EOF
|
|
f.close()
|
|
finally:
|
|
os_helper.unlink(os_helper.TESTFN)
|
|
|
|
def drive_one(self, pattern):
|
|
for length in lengths:
|
|
# Repeat string 'pattern' as often as needed to reach total length
|
|
# 'length'. Then call try_one with that string, a string one larger
|
|
# than that, and a string one smaller than that. Try this with all
|
|
# small sizes and various powers of 2, so we exercise all likely
|
|
# stdio buffer sizes, and "off by one" errors on both sides.
|
|
q, r = divmod(length, len(pattern))
|
|
teststring = pattern * q + pattern[:r]
|
|
self.assertEqual(len(teststring), length)
|
|
self.try_one(teststring)
|
|
self.try_one(teststring + b"x")
|
|
self.try_one(teststring[:-1])
|
|
|
|
def test_primepat(self):
|
|
# A pattern with prime length, to avoid simple relationships with
|
|
# stdio buffer sizes.
|
|
self.drive_one(b"1234567890\00\01\02\03\04\05\06")
|
|
|
|
def test_nullpat(self):
|
|
self.drive_one(b'\0' * 1000)
|
|
|
|
|
|
class CBufferSizeTest(BufferSizeTest, unittest.TestCase):
|
|
open = io.open
|
|
|
|
class PyBufferSizeTest(BufferSizeTest, unittest.TestCase):
|
|
open = staticmethod(pyio.open)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|