import array import codecs import locale import os import pickle import sys import threading import time import unittest import warnings import weakref from collections import UserList from test import support from test.support import os_helper, threading_helper from test.support.script_helper import assert_python_ok from .utils import CTestCase, PyTestCase import io # C implementation of io import _pyio as pyio # Python implementation of io def _default_chunk_size(): """Get the default TextIOWrapper chunk size""" with open(__file__, "r", encoding="latin-1") as f: return f._CHUNK_SIZE class BadIndex: def __index__(self): 1/0 # To fully exercise seek/tell, the StatefulIncrementalDecoder has these # properties: # - A single output character can correspond to many bytes of input. # - The number of input bytes to complete the character can be # undetermined until the last input byte is received. # - The number of input bytes can vary depending on previous input. # - A single input byte can correspond to many characters of output. # - The number of output characters can be undetermined until the # last input byte is received. # - The number of output characters can vary depending on previous input. class StatefulIncrementalDecoder(codecs.IncrementalDecoder): """ For testing seek/tell behavior with a stateful, buffering decoder. Input is a sequence of words. Words may be fixed-length (length set by input) or variable-length (period-terminated). In variable-length mode, extra periods are ignored. Possible words are: - 'i' followed by a number sets the input length, I (maximum 99). When I is set to 0, words are space-terminated. - 'o' followed by a number sets the output length, O (maximum 99). - Any other word is converted into a word followed by a period on the output. The output word consists of the input word truncated or padded out with hyphens to make its length equal to O. If O is 0, the word is output verbatim without truncating or padding. I and O are initially set to 1. When I changes, any buffered input is re-scanned according to the new I. EOF also terminates the last word. """ def __init__(self, errors='strict'): codecs.IncrementalDecoder.__init__(self, errors) self.reset() def __repr__(self): return '' % id(self) def reset(self): self.i = 1 self.o = 1 self.buffer = bytearray() def getstate(self): i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset() return bytes(self.buffer), i*100 + o def setstate(self, state): buffer, io = state self.buffer = bytearray(buffer) i, o = divmod(io, 100) self.i, self.o = i ^ 1, o ^ 1 def decode(self, input, final=False): output = '' for b in input: if self.i == 0: # variable-length, terminated with period if b == ord('.'): if self.buffer: output += self.process_word() else: self.buffer.append(b) else: # fixed-length, terminate after self.i bytes self.buffer.append(b) if len(self.buffer) == self.i: output += self.process_word() if final and self.buffer: # EOF terminates the last word output += self.process_word() return output def process_word(self): output = '' if self.buffer[0] == ord('i'): self.i = min(99, int(self.buffer[1:] or 0)) # set input length elif self.buffer[0] == ord('o'): self.o = min(99, int(self.buffer[1:] or 0)) # set output length else: output = self.buffer.decode('ascii') if len(output) < self.o: output += '-'*self.o # pad out with hyphens if self.o: output = output[:self.o] # truncate to output length output += '.' self.buffer = bytearray() return output codecEnabled = False # bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak # when registering codecs and cleanup functions. def lookupTestDecoder(name): if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder': latin1 = codecs.lookup('latin-1') return codecs.CodecInfo( name='test_decoder', encode=latin1.encode, decode=None, incrementalencoder=None, streamreader=None, streamwriter=None, incrementaldecoder=StatefulIncrementalDecoder) class StatefulIncrementalDecoderTest(unittest.TestCase): """ Make sure the StatefulIncrementalDecoder actually works. """ test_cases = [ # I=1, O=1 (fixed-length input == fixed-length output) (b'abcd', False, 'a.b.c.d.'), # I=0, O=0 (variable-length input, variable-length output) (b'oiabcd', True, 'abcd.'), # I=0, O=0 (should ignore extra periods) (b'oi...abcd...', True, 'abcd.'), # I=0, O=6 (variable-length input, fixed-length output) (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'), # I=2, O=6 (fixed-length input < fixed-length output) (b'i.i2.o6xyz', True, 'xy----.z-----.'), # I=6, O=3 (fixed-length input > fixed-length output) (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'), # I=0, then 3; O=29, then 15 (with longer output) (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True, 'a----------------------------.' + 'b----------------------------.' + 'cde--------------------------.' + 'abcdefghijabcde.' + 'a.b------------.' + '.c.------------.' + 'd.e------------.' + 'k--------------.' + 'l--------------.' + 'm--------------.') ] def test_decoder(self): # Try a few one-shot test cases. for input, eof, output in self.test_cases: d = StatefulIncrementalDecoder() self.assertEqual(d.decode(input, eof), output) # Also test an unfinished decode, followed by forcing EOF. d = StatefulIncrementalDecoder() self.assertEqual(d.decode(b'oiabcd'), '') self.assertEqual(d.decode(b'', 1), 'abcd.') class TextIOWrapperTest: def setUp(self): self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n" self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii") os_helper.unlink(os_helper.TESTFN) codecs.register(lookupTestDecoder) self.addCleanup(codecs.unregister, lookupTestDecoder) def tearDown(self): os_helper.unlink(os_helper.TESTFN) def test_constructor(self): r = self.BytesIO(b"\xc3\xa9\n\n") b = self.BufferedReader(r, 1000) t = self.TextIOWrapper(b, encoding="utf-8") t.__init__(b, encoding="latin-1", newline="\r\n") self.assertEqual(t.encoding, "latin-1") self.assertEqual(t.line_buffering, False) t.__init__(b, encoding="utf-8", line_buffering=True) self.assertEqual(t.encoding, "utf-8") self.assertEqual(t.line_buffering, True) self.assertEqual("\xe9\n", t.readline()) invalid_type = TypeError if self.is_C else ValueError with self.assertRaises(invalid_type): t.__init__(b, encoding=42) with self.assertRaises(UnicodeEncodeError): t.__init__(b, encoding='\udcfe') with self.assertRaises(ValueError): t.__init__(b, encoding='utf-8\0') with self.assertRaises(invalid_type): t.__init__(b, encoding="utf-8", errors=42) if support.Py_DEBUG or sys.flags.dev_mode or self.is_C: with self.assertRaises(UnicodeEncodeError): t.__init__(b, encoding="utf-8", errors='\udcfe') if support.Py_DEBUG or sys.flags.dev_mode or self.is_C: with self.assertRaises(ValueError): t.__init__(b, encoding="utf-8", errors='replace\0') with self.assertRaises(TypeError): t.__init__(b, encoding="utf-8", newline=42) with self.assertRaises(ValueError): t.__init__(b, encoding="utf-8", newline='\udcfe') with self.assertRaises(ValueError): t.__init__(b, encoding="utf-8", newline='\n\0') with self.assertRaises(ValueError): t.__init__(b, encoding="utf-8", newline='xyzzy') def test_uninitialized(self): t = self.TextIOWrapper.__new__(self.TextIOWrapper) del t t = self.TextIOWrapper.__new__(self.TextIOWrapper) self.assertRaises(Exception, repr, t) self.assertRaisesRegex((ValueError, AttributeError), 'uninitialized|has no attribute', t.read, 0) t.__init__(self.MockRawIO(), encoding="utf-8") self.assertEqual(t.read(0), '') def test_non_text_encoding_codecs_are_rejected(self): # Ensure the constructor complains if passed a codec that isn't # marked as a text encoding # http://bugs.python.org/issue20404 r = self.BytesIO() b = self.BufferedWriter(r) with self.assertRaisesRegex(LookupError, "is not a text encoding"): self.TextIOWrapper(b, encoding="hex") def test_detach(self): r = self.BytesIO() b = self.BufferedWriter(r) t = self.TextIOWrapper(b, encoding="ascii") self.assertIs(t.detach(), b) t = self.TextIOWrapper(b, encoding="ascii") t.write("howdy") self.assertFalse(r.getvalue()) t.detach() self.assertEqual(r.getvalue(), b"howdy") self.assertRaises(ValueError, t.detach) # Operations independent of the detached stream should still work repr(t) self.assertEqual(t.encoding, "ascii") self.assertEqual(t.errors, "strict") self.assertFalse(t.line_buffering) self.assertFalse(t.write_through) def test_repr(self): raw = self.BytesIO("hello".encode("utf-8")) b = self.BufferedReader(raw) t = self.TextIOWrapper(b, encoding="utf-8") modname = self.TextIOWrapper.__module__ self.assertRegex(repr(t), r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname) raw.name = "dummy" self.assertRegex(repr(t), r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname) t.mode = "r" self.assertRegex(repr(t), r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname) raw.name = b"dummy" self.assertRegex(repr(t), r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname) t.buffer.detach() repr(t) # Should not raise an exception def test_recursive_repr(self): # Issue #25455 raw = self.BytesIO() t = self.TextIOWrapper(raw, encoding="utf-8") with support.swap_attr(raw, 'name', t), support.infinite_recursion(25): with self.assertRaises(RuntimeError): repr(t) # Should not crash def test_subclass_repr(self): class TestSubclass(self.TextIOWrapper): pass f = TestSubclass(self.StringIO()) self.assertIn(TestSubclass.__name__, repr(f)) def test_line_buffering(self): r = self.BytesIO() b = self.BufferedWriter(r, 1000) t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True) t.write("X") self.assertEqual(r.getvalue(), b"") # No flush happened t.write("Y\nZ") self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed t.write("A\rB") self.assertEqual(r.getvalue(), b"XY\nZA\rB") def test_reconfigure_line_buffering(self): r = self.BytesIO() b = self.BufferedWriter(r, 1000) t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False) t.write("AB\nC") self.assertEqual(r.getvalue(), b"") t.reconfigure(line_buffering=True) # implicit flush self.assertEqual(r.getvalue(), b"AB\nC") t.write("DEF\nG") self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") t.write("H") self.assertEqual(r.getvalue(), b"AB\nCDEF\nG") t.reconfigure(line_buffering=False) # implicit flush self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") t.write("IJ") self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH") # Keeping default value t.reconfigure() t.reconfigure(line_buffering=None) self.assertEqual(t.line_buffering, False) t.reconfigure(line_buffering=True) t.reconfigure() t.reconfigure(line_buffering=None) self.assertEqual(t.line_buffering, True) @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled") def test_default_encoding(self): with os_helper.EnvironmentVarGuard() as env: # try to get a user preferred encoding different than the current # locale encoding to check that TextIOWrapper() uses the current # locale encoding and not the user preferred encoding env.unset('LC_ALL', 'LANG', 'LC_CTYPE') current_locale_encoding = locale.getencoding() b = self.BytesIO() with warnings.catch_warnings(): warnings.simplefilter("ignore", EncodingWarning) t = self.TextIOWrapper(b) self.assertEqual(t.encoding, current_locale_encoding) def test_encoding(self): # Check the encoding attribute is always set, and valid b = self.BytesIO() t = self.TextIOWrapper(b, encoding="utf-8") self.assertEqual(t.encoding, "utf-8") with warnings.catch_warnings(): warnings.simplefilter("ignore", EncodingWarning) t = self.TextIOWrapper(b) self.assertIsNotNone(t.encoding) codecs.lookup(t.encoding) def test_encoding_errors_reading(self): # (1) default b = self.BytesIO(b"abc\n\xff\n") t = self.TextIOWrapper(b, encoding="ascii") self.assertRaises(UnicodeError, t.read) # (2) explicit strict b = self.BytesIO(b"abc\n\xff\n") t = self.TextIOWrapper(b, encoding="ascii", errors="strict") self.assertRaises(UnicodeError, t.read) # (3) ignore b = self.BytesIO(b"abc\n\xff\n") t = self.TextIOWrapper(b, encoding="ascii", errors="ignore") self.assertEqual(t.read(), "abc\n\n") # (4) replace b = self.BytesIO(b"abc\n\xff\n") t = self.TextIOWrapper(b, encoding="ascii", errors="replace") self.assertEqual(t.read(), "abc\n\ufffd\n") def test_encoding_errors_writing(self): # (1) default b = self.BytesIO() t = self.TextIOWrapper(b, encoding="ascii") self.assertRaises(UnicodeError, t.write, "\xff") # (2) explicit strict b = self.BytesIO() t = self.TextIOWrapper(b, encoding="ascii", errors="strict") self.assertRaises(UnicodeError, t.write, "\xff") # (3) ignore b = self.BytesIO() t = self.TextIOWrapper(b, encoding="ascii", errors="ignore", newline="\n") t.write("abc\xffdef\n") t.flush() self.assertEqual(b.getvalue(), b"abcdef\n") # (4) replace b = self.BytesIO() t = self.TextIOWrapper(b, encoding="ascii", errors="replace", newline="\n") t.write("abc\xffdef\n") t.flush() self.assertEqual(b.getvalue(), b"abc?def\n") def test_newlines(self): input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ] tests = [ [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ], [ '', input_lines ], [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ], [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ], [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ], ] encodings = ( 'utf-8', 'latin-1', 'utf-16', 'utf-16-le', 'utf-16-be', 'utf-32', 'utf-32-le', 'utf-32-be', ) # Try a range of buffer sizes to test the case where \r is the last # character in TextIOWrapper._pending_line. for encoding in encodings: # XXX: str.encode() should return bytes data = bytes(''.join(input_lines).encode(encoding)) for do_reads in (False, True): for bufsize in range(1, 10): for newline, exp_lines in tests: bufio = self.BufferedReader(self.BytesIO(data), bufsize) textio = self.TextIOWrapper(bufio, newline=newline, encoding=encoding) if do_reads: got_lines = [] while True: c2 = textio.read(2) if c2 == '': break self.assertEqual(len(c2), 2) got_lines.append(c2 + textio.readline()) else: got_lines = list(textio) for got_line, exp_line in zip(got_lines, exp_lines): self.assertEqual(got_line, exp_line) self.assertEqual(len(got_lines), len(exp_lines)) def test_newlines_input(self): testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG" normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n") for newline, expected in [ (None, normalized.decode("ascii").splitlines(keepends=True)), ("", testdata.decode("ascii").splitlines(keepends=True)), ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]), ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]), ]: buf = self.BytesIO(testdata) txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) self.assertEqual(txt.readlines(), expected) txt.seek(0) self.assertEqual(txt.read(), "".join(expected)) def test_newlines_output(self): testdict = { "": b"AAA\nBBB\nCCC\nX\rY\r\nZ", "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ", "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ", "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ", } tests = [(None, testdict[os.linesep])] + sorted(testdict.items()) for newline, expected in tests: buf = self.BytesIO() txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline) txt.write("AAA\nB") txt.write("BB\nCCC\n") txt.write("X\rY\r\nZ") txt.flush() self.assertEqual(buf.closed, False) self.assertEqual(buf.getvalue(), expected) def test_destructor(self): l = [] base = self.BytesIO class MyBytesIO(base): def close(self): l.append(self.getvalue()) base.close(self) b = MyBytesIO() t = self.TextIOWrapper(b, encoding="ascii") t.write("abc") del t support.gc_collect() self.assertEqual([b"abc"], l) def test_override_destructor(self): record = [] class MyTextIO(self.TextIOWrapper): def __del__(self): record.append(1) try: f = super().__del__ except AttributeError: pass else: f() def close(self): record.append(2) super().close() def flush(self): record.append(3) super().flush() b = self.BytesIO() t = MyTextIO(b, encoding="ascii") del t support.gc_collect() self.assertEqual(record, [1, 2, 3]) def test_error_through_destructor(self): # Test that the exception state is not modified by a destructor, # even if close() fails. rawio = self.CloseFailureIO() with support.catch_unraisable_exception() as cm: with self.assertRaises(AttributeError): self.TextIOWrapper(rawio, encoding="utf-8").xyzzy self.assertEqual(cm.unraisable.exc_type, OSError) # Systematic tests of the text I/O API def test_basic_io(self): for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65): for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le": f = self.open(os_helper.TESTFN, "w+", encoding=enc) f._CHUNK_SIZE = chunksize self.assertEqual(f.write("abc"), 3) f.close() f = self.open(os_helper.TESTFN, "r+", encoding=enc) f._CHUNK_SIZE = chunksize self.assertEqual(f.tell(), 0) self.assertEqual(f.read(), "abc") cookie = f.tell() self.assertEqual(f.seek(0), 0) self.assertEqual(f.read(None), "abc") f.seek(0) self.assertEqual(f.read(2), "ab") self.assertEqual(f.read(1), "c") self.assertEqual(f.read(1), "") self.assertEqual(f.read(), "") self.assertEqual(f.tell(), cookie) self.assertEqual(f.seek(0), 0) self.assertEqual(f.seek(0, 2), cookie) self.assertEqual(f.write("def"), 3) self.assertEqual(f.seek(cookie), cookie) self.assertEqual(f.read(), "def") if enc.startswith("utf"): self.multi_line_test(f, enc) f.close() def multi_line_test(self, f, enc): f.seek(0) f.truncate() sample = "s\xff\u0fff\uffff" wlines = [] for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000): chars = [] for i in range(size): chars.append(sample[i % len(sample)]) line = "".join(chars) + "\n" wlines.append((f.tell(), line)) f.write(line) f.seek(0) rlines = [] while True: pos = f.tell() line = f.readline() if not line: break rlines.append((pos, line)) self.assertEqual(rlines, wlines) def test_telling(self): f = self.open(os_helper.TESTFN, "w+", encoding="utf-8") p0 = f.tell() f.write("\xff\n") p1 = f.tell() f.write("\xff\n") p2 = f.tell() f.seek(0) self.assertEqual(f.tell(), p0) self.assertEqual(f.readline(), "\xff\n") self.assertEqual(f.tell(), p1) self.assertEqual(f.readline(), "\xff\n") self.assertEqual(f.tell(), p2) f.seek(0) for line in f: self.assertEqual(line, "\xff\n") self.assertRaises(OSError, f.tell) self.assertEqual(f.tell(), p2) f.close() def test_seeking(self): chunk_size = _default_chunk_size() prefix_size = chunk_size - 2 u_prefix = "a" * prefix_size prefix = bytes(u_prefix.encode("utf-8")) self.assertEqual(len(u_prefix), len(prefix)) u_suffix = "\u8888\n" suffix = bytes(u_suffix.encode("utf-8")) line = prefix + suffix with self.open(os_helper.TESTFN, "wb") as f: f.write(line*2) with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: s = f.read(prefix_size) self.assertEqual(s, str(prefix, "ascii")) self.assertEqual(f.tell(), prefix_size) self.assertEqual(f.readline(), u_suffix) def test_seeking_too(self): # Regression test for a specific bug data = b'\xe0\xbf\xbf\n' with self.open(os_helper.TESTFN, "wb") as f: f.write(data) with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f: f._CHUNK_SIZE # Just test that it exists f._CHUNK_SIZE = 2 f.readline() f.tell() def test_seek_and_tell(self): #Test seek/tell using the StatefulIncrementalDecoder. # Make test faster by doing smaller seeks CHUNK_SIZE = 128 def test_seek_and_tell_with_data(data, min_pos=0): """Tell/seek to various points within a data stream and ensure that the decoded data returned by read() is consistent.""" f = self.open(os_helper.TESTFN, 'wb') f.write(data) f.close() f = self.open(os_helper.TESTFN, encoding='test_decoder') f._CHUNK_SIZE = CHUNK_SIZE decoded = f.read() f.close() for i in range(min_pos, len(decoded) + 1): # seek positions for j in [1, 5, len(decoded) - i]: # read lengths f = self.open(os_helper.TESTFN, encoding='test_decoder') self.assertEqual(f.read(i), decoded[:i]) cookie = f.tell() self.assertEqual(f.read(j), decoded[i:i + j]) f.seek(cookie) self.assertEqual(f.read(), decoded[i:]) f.close() # Enable the test decoder. StatefulIncrementalDecoder.codecEnabled = 1 # Run the tests. try: # Try each test case. for input, _, _ in StatefulIncrementalDecoderTest.test_cases: test_seek_and_tell_with_data(input) # Position each test case so that it crosses a chunk boundary. for input, _, _ in StatefulIncrementalDecoderTest.test_cases: offset = CHUNK_SIZE - len(input)//2 prefix = b'.'*offset # Don't bother seeking into the prefix (takes too long). min_pos = offset*2 test_seek_and_tell_with_data(prefix + input, min_pos) # Ensure our test decoder won't interfere with subsequent tests. finally: StatefulIncrementalDecoder.codecEnabled = 0 def test_multibyte_seek_and_tell(self): f = self.open(os_helper.TESTFN, "w", encoding="euc_jp") f.write("AB\n\u3046\u3048\n") f.close() f = self.open(os_helper.TESTFN, "r", encoding="euc_jp") self.assertEqual(f.readline(), "AB\n") p0 = f.tell() self.assertEqual(f.readline(), "\u3046\u3048\n") p1 = f.tell() f.seek(p0) self.assertEqual(f.readline(), "\u3046\u3048\n") self.assertEqual(f.tell(), p1) f.close() def test_seek_with_encoder_state(self): f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004") f.write("\u00e6\u0300") p0 = f.tell() f.write("\u00e6") f.seek(p0) f.write("\u0300") f.close() f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004") self.assertEqual(f.readline(), "\u00e6\u0300\u0300") f.close() def test_encoded_writes(self): data = "1234567890" tests = ("utf-16", "utf-16-le", "utf-16-be", "utf-32", "utf-32-le", "utf-32-be") for encoding in tests: buf = self.BytesIO() f = self.TextIOWrapper(buf, encoding=encoding) # Check if the BOM is written only once (see issue1753). f.write(data) f.write(data) f.seek(0) self.assertEqual(f.read(), data * 2) f.seek(0) self.assertEqual(f.read(), data * 2) self.assertEqual(buf.getvalue(), (data * 2).encode(encoding)) def test_unreadable(self): class UnReadable(self.BytesIO): def readable(self): return False txt = self.TextIOWrapper(UnReadable(), encoding="utf-8") self.assertRaises(OSError, txt.read) def test_read_one_by_one(self): txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8") reads = "" while True: c = txt.read(1) if not c: break reads += c self.assertEqual(reads, "AA\nBB") def test_readlines(self): txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8") self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"]) txt.seek(0) self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"]) txt.seek(0) self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"]) # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128. def test_read_by_chunk(self): # make sure "\r\n" straddles 128 char boundary. txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8") reads = "" while True: c = txt.read(128) if not c: break reads += c self.assertEqual(reads, "A"*127+"\nB") def test_writelines(self): l = ['ab', 'cd', 'ef'] buf = self.BytesIO() txt = self.TextIOWrapper(buf, encoding="utf-8") txt.writelines(l) txt.flush() self.assertEqual(buf.getvalue(), b'abcdef') def test_writelines_userlist(self): l = UserList(['ab', 'cd', 'ef']) buf = self.BytesIO() txt = self.TextIOWrapper(buf, encoding="utf-8") txt.writelines(l) txt.flush() self.assertEqual(buf.getvalue(), b'abcdef') def test_writelines_error(self): txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8") self.assertRaises(TypeError, txt.writelines, [1, 2, 3]) self.assertRaises(TypeError, txt.writelines, None) self.assertRaises(TypeError, txt.writelines, b'abc') def test_issue1395_1(self): txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") # read one char at a time reads = "" while True: c = txt.read(1) if not c: break reads += c self.assertEqual(reads, self.normalized) def test_issue1395_2(self): txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") txt._CHUNK_SIZE = 4 reads = "" while True: c = txt.read(4) if not c: break reads += c self.assertEqual(reads, self.normalized) def test_issue1395_3(self): txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") txt._CHUNK_SIZE = 4 reads = txt.read(4) reads += txt.read(4) reads += txt.readline() reads += txt.readline() reads += txt.readline() self.assertEqual(reads, self.normalized) def test_issue1395_4(self): txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") txt._CHUNK_SIZE = 4 reads = txt.read(4) reads += txt.read() self.assertEqual(reads, self.normalized) def test_issue1395_5(self): txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") txt._CHUNK_SIZE = 4 reads = txt.read(4) pos = txt.tell() txt.seek(0) txt.seek(pos) self.assertEqual(txt.read(4), "BBB\n") def test_issue2282(self): buffer = self.BytesIO(self.testdata) txt = self.TextIOWrapper(buffer, encoding="ascii") self.assertEqual(buffer.seekable(), txt.seekable()) def test_append_bom(self): # The BOM is not written again when appending to a non-empty file filename = os_helper.TESTFN for charset in ('utf-8-sig', 'utf-16', 'utf-32'): with self.open(filename, 'w', encoding=charset) as f: f.write('aaa') pos = f.tell() with self.open(filename, 'rb') as f: self.assertEqual(f.read(), 'aaa'.encode(charset)) with self.open(filename, 'a', encoding=charset) as f: f.write('xxx') with self.open(filename, 'rb') as f: self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) def test_seek_bom(self): # Same test, but when seeking manually filename = os_helper.TESTFN for charset in ('utf-8-sig', 'utf-16', 'utf-32'): with self.open(filename, 'w', encoding=charset) as f: f.write('aaa') pos = f.tell() with self.open(filename, 'r+', encoding=charset) as f: f.seek(pos) f.write('zzz') f.seek(0) f.write('bbb') with self.open(filename, 'rb') as f: self.assertEqual(f.read(), 'bbbzzz'.encode(charset)) def test_seek_append_bom(self): # Same test, but first seek to the start and then to the end filename = os_helper.TESTFN for charset in ('utf-8-sig', 'utf-16', 'utf-32'): with self.open(filename, 'w', encoding=charset) as f: f.write('aaa') with self.open(filename, 'a', encoding=charset) as f: f.seek(0) f.seek(0, self.SEEK_END) f.write('xxx') with self.open(filename, 'rb') as f: self.assertEqual(f.read(), 'aaaxxx'.encode(charset)) def test_errors_property(self): with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f: self.assertEqual(f.errors, "strict") with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f: self.assertEqual(f.errors, "replace") @support.no_tracing @threading_helper.requires_working_threading() def test_threads_write(self): # Issue6750: concurrent writes could duplicate data event = threading.Event() with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f: def run(n): text = "Thread%03d\n" % n event.wait() f.write(text) threads = [threading.Thread(target=run, args=(x,)) for x in range(20)] with threading_helper.start_threads(threads, event.set): time.sleep(0.02) with self.open(os_helper.TESTFN, encoding="utf-8") as f: content = f.read() for n in range(20): self.assertEqual(content.count("Thread%03d\n" % n), 1) def test_flush_error_on_close(self): # Test that text file is closed despite failed flush # and that flush() is called before file closed. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") closed = [] def bad_flush(): closed[:] = [txt.closed, txt.buffer.closed] raise OSError() txt.flush = bad_flush self.assertRaises(OSError, txt.close) # exception not swallowed self.assertTrue(txt.closed) self.assertTrue(txt.buffer.closed) self.assertTrue(closed) # flush() called self.assertFalse(closed[0]) # flush() called before file closed self.assertFalse(closed[1]) txt.flush = lambda: None # break reference loop def test_close_error_on_close(self): buffer = self.BytesIO(self.testdata) def bad_flush(): raise OSError('flush') def bad_close(): raise OSError('close') buffer.close = bad_close txt = self.TextIOWrapper(buffer, encoding="ascii") txt.flush = bad_flush with self.assertRaises(OSError) as err: # exception not swallowed txt.close() self.assertEqual(err.exception.args, ('close',)) self.assertIsInstance(err.exception.__context__, OSError) self.assertEqual(err.exception.__context__.args, ('flush',)) self.assertFalse(txt.closed) # Silence destructor error buffer.close = lambda: None txt.flush = lambda: None def test_nonnormalized_close_error_on_close(self): # Issue #21677 buffer = self.BytesIO(self.testdata) def bad_flush(): raise non_existing_flush def bad_close(): raise non_existing_close buffer.close = bad_close txt = self.TextIOWrapper(buffer, encoding="ascii") txt.flush = bad_flush with self.assertRaises(NameError) as err: # exception not swallowed txt.close() self.assertIn('non_existing_close', str(err.exception)) self.assertIsInstance(err.exception.__context__, NameError) self.assertIn('non_existing_flush', str(err.exception.__context__)) self.assertFalse(txt.closed) # Silence destructor error buffer.close = lambda: None txt.flush = lambda: None def test_multi_close(self): txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") txt.close() txt.close() txt.close() self.assertRaises(ValueError, txt.flush) def test_unseekable(self): txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8") self.assertRaises(self.UnsupportedOperation, txt.tell) self.assertRaises(self.UnsupportedOperation, txt.seek, 0) def test_readonly_attributes(self): txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii") buf = self.BytesIO(self.testdata) with self.assertRaises(AttributeError): txt.buffer = buf def test_rawio(self): # Issue #12591: TextIOWrapper must work with raw I/O objects, so # that subprocess.Popen() can have the required unbuffered # semantics with universal_newlines=True. raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') # Reads self.assertEqual(txt.read(4), 'abcd') self.assertEqual(txt.readline(), 'efghi\n') self.assertEqual(list(txt), ['jkl\n', 'opq\n']) def test_rawio_write_through(self): # Issue #12591: with write_through=True, writes don't need a flush raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n']) txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n', write_through=True) txt.write('1') txt.write('23\n4') txt.write('5') self.assertEqual(b''.join(raw._write_stack), b'123\n45') def test_bufio_write_through(self): # Issue #21396: write_through=True doesn't force a flush() # on the underlying binary buffered object. flush_called, write_called = [], [] class BufferedWriter(self.BufferedWriter): def flush(self, *args, **kwargs): flush_called.append(True) return super().flush(*args, **kwargs) def write(self, *args, **kwargs): write_called.append(True) return super().write(*args, **kwargs) rawio = self.BytesIO() data = b"a" bufio = BufferedWriter(rawio, len(data)*2) textio = self.TextIOWrapper(bufio, encoding='ascii', write_through=True) # write to the buffered io but don't overflow the buffer text = data.decode('ascii') textio.write(text) # buffer.flush is not called with write_through=True self.assertFalse(flush_called) # buffer.write *is* called with write_through=True self.assertTrue(write_called) self.assertEqual(rawio.getvalue(), b"") # no flush write_called = [] # reset textio.write(text * 10) # total content is larger than bufio buffer self.assertTrue(write_called) self.assertEqual(rawio.getvalue(), data * 11) # all flushed def test_reconfigure_write_through(self): raw = self.MockRawIO([]) t = self.TextIOWrapper(raw, encoding='ascii', newline='\n') t.write('1') t.reconfigure(write_through=True) # implied flush self.assertEqual(t.write_through, True) self.assertEqual(b''.join(raw._write_stack), b'1') t.write('23') self.assertEqual(b''.join(raw._write_stack), b'123') t.reconfigure(write_through=False) self.assertEqual(t.write_through, False) t.write('45') t.flush() self.assertEqual(b''.join(raw._write_stack), b'12345') # Keeping default value t.reconfigure() t.reconfigure(write_through=None) self.assertEqual(t.write_through, False) t.reconfigure(write_through=True) t.reconfigure() t.reconfigure(write_through=None) self.assertEqual(t.write_through, True) def test_read_nonbytes(self): # Issue #17106 # Crash when underlying read() returns non-bytes t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") self.assertRaises(TypeError, t.read, 1) t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") self.assertRaises(TypeError, t.readline) t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8") self.assertRaises(TypeError, t.read) def test_illegal_encoder(self): # Issue 31271: Calling write() while the return value of encoder's # encode() is invalid shouldn't cause an assertion failure. rot13 = codecs.lookup("rot13") with support.swap_attr(rot13, '_is_text_encoding', True): t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13") self.assertRaises(TypeError, t.write, 'bar') def test_illegal_decoder(self): # Issue #17106 # Bypass the early encoding check added in issue 20404 def _make_illegal_wrapper(): quopri = codecs.lookup("quopri") quopri._is_text_encoding = True try: t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n', encoding="quopri") finally: quopri._is_text_encoding = False return t # Crash when decoder returns non-string t = _make_illegal_wrapper() self.assertRaises(TypeError, t.read, 1) t = _make_illegal_wrapper() self.assertRaises(TypeError, t.readline) t = _make_illegal_wrapper() self.assertRaises(TypeError, t.read) # Issue 31243: calling read() while the return value of decoder's # getstate() is invalid should neither crash the interpreter nor # raise a SystemError. def _make_very_illegal_wrapper(getstate_ret_val): class BadDecoder: def getstate(self): return getstate_ret_val def _get_bad_decoder(dummy): return BadDecoder() quopri = codecs.lookup("quopri") with support.swap_attr(quopri, 'incrementaldecoder', _get_bad_decoder): return _make_illegal_wrapper() t = _make_very_illegal_wrapper(42) self.assertRaises(TypeError, t.read, 42) t = _make_very_illegal_wrapper(()) self.assertRaises(TypeError, t.read, 42) t = _make_very_illegal_wrapper((1, 2)) self.assertRaises(TypeError, t.read, 42) def _check_create_at_shutdown(self, **kwargs): # Issue #20037: creating a TextIOWrapper at shutdown # shouldn't crash the interpreter. iomod = self.io.__name__ code = """if 1: import codecs import {iomod} as io # Avoid looking up codecs at shutdown codecs.lookup('utf-8') class C: def __del__(self): io.TextIOWrapper(io.BytesIO(), **{kwargs}) print("ok") c = C() """.format(iomod=iomod, kwargs=kwargs) return assert_python_ok("-c", code) def test_create_at_shutdown_without_encoding(self): rc, out, err = self._check_create_at_shutdown() if err: # Can error out with a RuntimeError if the module state # isn't found. self.assertIn(self.shutdown_error, err.decode()) else: self.assertEqual("ok", out.decode().strip()) def test_create_at_shutdown_with_encoding(self): rc, out, err = self._check_create_at_shutdown(encoding='utf-8', errors='strict') self.assertFalse(err) self.assertEqual("ok", out.decode().strip()) def test_read_byteslike(self): r = MemviewBytesIO(b'Just some random string\n') t = self.TextIOWrapper(r, 'utf-8') # TextIOwrapper will not read the full string, because # we truncate it to a multiple of the native int size # so that we can construct a more complex memoryview. bytes_val = _to_memoryview(r.getvalue()).tobytes() self.assertEqual(t.read(200), bytes_val.decode('utf-8')) def test_issue22849(self): class F(object): def readable(self): return True def writable(self): return True def seekable(self): return True for i in range(10): try: self.TextIOWrapper(F(), encoding='utf-8') except Exception: pass F.tell = lambda x: 0 t = self.TextIOWrapper(F(), encoding='utf-8') def test_reconfigure_locale(self): wrapper = self.TextIOWrapper(self.BytesIO(b"test")) wrapper.reconfigure(encoding="locale") def test_reconfigure_encoding_read(self): # latin1 -> utf8 # (latin1 can decode utf-8 encoded string) data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8') raw = self.BytesIO(data) txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') self.assertEqual(txt.readline(), 'abc\xe9\n') with self.assertRaises(self.UnsupportedOperation): txt.reconfigure(encoding='utf-8') with self.assertRaises(self.UnsupportedOperation): txt.reconfigure(newline=None) def test_reconfigure_write_fromascii(self): # ascii has a specific encodefunc in the C implementation, # but utf-8-sig has not. Make sure that we get rid of the # cached encodefunc when we switch encoders. raw = self.BytesIO() txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') txt.write('foo\n') txt.reconfigure(encoding='utf-8-sig') txt.write('\xe9\n') txt.flush() self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n') def test_reconfigure_write(self): # latin -> utf8 raw = self.BytesIO() txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n') txt.write('abc\xe9\n') txt.reconfigure(encoding='utf-8') self.assertEqual(raw.getvalue(), b'abc\xe9\n') txt.write('d\xe9f\n') txt.flush() self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n') # ascii -> utf-8-sig: ensure that no BOM is written in the middle of # the file raw = self.BytesIO() txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') txt.write('abc\n') txt.reconfigure(encoding='utf-8-sig') txt.write('d\xe9f\n') txt.flush() self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n') def test_reconfigure_write_non_seekable(self): raw = self.BytesIO() raw.seekable = lambda: False raw.seek = None txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n') txt.write('abc\n') txt.reconfigure(encoding='utf-8-sig') txt.write('d\xe9f\n') txt.flush() # If the raw stream is not seekable, there'll be a BOM self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n') def test_reconfigure_defaults(self): txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n') txt.reconfigure(encoding=None) self.assertEqual(txt.encoding, 'ascii') self.assertEqual(txt.errors, 'replace') txt.write('LF\n') txt.reconfigure(newline='\r\n') self.assertEqual(txt.encoding, 'ascii') self.assertEqual(txt.errors, 'replace') txt.reconfigure(errors='ignore') self.assertEqual(txt.encoding, 'ascii') self.assertEqual(txt.errors, 'ignore') txt.write('CRLF\n') txt.reconfigure(encoding='utf-8', newline=None) self.assertEqual(txt.errors, 'strict') txt.seek(0) self.assertEqual(txt.read(), 'LF\nCRLF\n') self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n') def test_reconfigure_errors(self): txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\r') with self.assertRaises(TypeError): # there was a crash txt.reconfigure(encoding=42) if self.is_C: with self.assertRaises(UnicodeEncodeError): txt.reconfigure(encoding='\udcfe') with self.assertRaises(LookupError): txt.reconfigure(encoding='locale\0') # TODO: txt.reconfigure(encoding='utf-8\0') # TODO: txt.reconfigure(encoding='nonexisting') with self.assertRaises(TypeError): txt.reconfigure(errors=42) if self.is_C: with self.assertRaises(UnicodeEncodeError): txt.reconfigure(errors='\udcfe') # TODO: txt.reconfigure(errors='ignore\0') # TODO: txt.reconfigure(errors='nonexisting') with self.assertRaises(TypeError): txt.reconfigure(newline=42) with self.assertRaises(ValueError): txt.reconfigure(newline='\udcfe') with self.assertRaises(ValueError): txt.reconfigure(newline='xyz') if not self.is_C: # TODO: Should fail in C too. with self.assertRaises(ValueError): txt.reconfigure(newline='\n\0') if self.is_C: # TODO: Use __bool__(), not __index__(). with self.assertRaises(ZeroDivisionError): txt.reconfigure(line_buffering=BadIndex()) with self.assertRaises(OverflowError): txt.reconfigure(line_buffering=2**1000) with self.assertRaises(ZeroDivisionError): txt.reconfigure(write_through=BadIndex()) with self.assertRaises(OverflowError): txt.reconfigure(write_through=2**1000) with self.assertRaises(ZeroDivisionError): # there was a crash txt.reconfigure(line_buffering=BadIndex(), write_through=BadIndex()) self.assertEqual(txt.encoding, 'ascii') self.assertEqual(txt.errors, 'replace') self.assertIs(txt.line_buffering, False) self.assertIs(txt.write_through, False) txt.reconfigure(encoding='latin1', errors='ignore', newline='\r\n', line_buffering=True, write_through=True) self.assertEqual(txt.encoding, 'latin1') self.assertEqual(txt.errors, 'ignore') self.assertIs(txt.line_buffering, True) self.assertIs(txt.write_through, True) def test_reconfigure_newline(self): raw = self.BytesIO(b'CR\rEOF') txt = self.TextIOWrapper(raw, 'ascii', newline='\n') txt.reconfigure(newline=None) self.assertEqual(txt.readline(), 'CR\n') raw = self.BytesIO(b'CR\rEOF') txt = self.TextIOWrapper(raw, 'ascii', newline='\n') txt.reconfigure(newline='') self.assertEqual(txt.readline(), 'CR\r') raw = self.BytesIO(b'CR\rLF\nEOF') txt = self.TextIOWrapper(raw, 'ascii', newline='\r') txt.reconfigure(newline='\n') self.assertEqual(txt.readline(), 'CR\rLF\n') raw = self.BytesIO(b'LF\nCR\rEOF') txt = self.TextIOWrapper(raw, 'ascii', newline='\n') txt.reconfigure(newline='\r') self.assertEqual(txt.readline(), 'LF\nCR\r') raw = self.BytesIO(b'CR\rCRLF\r\nEOF') txt = self.TextIOWrapper(raw, 'ascii', newline='\r') txt.reconfigure(newline='\r\n') self.assertEqual(txt.readline(), 'CR\rCRLF\r\n') txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r') txt.reconfigure(newline=None) txt.write('linesep\n') txt.reconfigure(newline='') txt.write('LF\n') txt.reconfigure(newline='\n') txt.write('LF\n') txt.reconfigure(newline='\r') txt.write('CR\n') txt.reconfigure(newline='\r\n') txt.write('CRLF\n') expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n' self.assertEqual(txt.detach().getvalue().decode('ascii'), expected) def test_issue25862(self): # Assertion failures occurred in tell() after read() and write(). t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') t.read(1) t.read() t.tell() t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii') t.read(1) t.write('x') t.tell() def test_issue35928(self): p = self.BufferedRWPair(self.BytesIO(b'foo\nbar\n'), self.BytesIO()) f = self.TextIOWrapper(p) res = f.readline() self.assertEqual(res, 'foo\n') f.write(res) self.assertEqual(res + f.readline(), 'foo\nbar\n') def test_pickling_subclass(self): global MyTextIO class MyTextIO(self.TextIOWrapper): def __init__(self, raw, tag): super().__init__(raw) self.tag = tag def __getstate__(self): return self.tag, self.buffer.getvalue() def __setstate__(slf, state): tag, value = state slf.__init__(self.BytesIO(value), tag) raw = self.BytesIO(b'data') txt = MyTextIO(raw, 'ham') for proto in range(pickle.HIGHEST_PROTOCOL + 1): with self.subTest(protocol=proto): pickled = pickle.dumps(txt, proto) newtxt = pickle.loads(pickled) self.assertEqual(newtxt.buffer.getvalue(), b'data') self.assertEqual(newtxt.tag, 'ham') del MyTextIO @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") def test_read_non_blocking(self): import os r, w = os.pipe() try: os.set_blocking(r, False) with self.io.open(r, 'rt') as textfile: r = None # Nothing has been written so a non-blocking read raises a BlockingIOError exception. with self.assertRaises(BlockingIOError): textfile.read() finally: if r is not None: os.close(r) os.close(w) class MemviewBytesIO(io.BytesIO): '''A BytesIO object whose read method returns memoryviews rather than bytes''' def read1(self, len_): return _to_memoryview(super().read1(len_)) def read(self, len_): return _to_memoryview(super().read(len_)) def _to_memoryview(buf): '''Convert bytes-object *buf* to a non-trivial memoryview''' arr = array.array('i') idx = len(buf) - len(buf) % arr.itemsize arr.frombytes(buf[:idx]) return memoryview(arr) class CTextIOWrapperTest(TextIOWrapperTest, CTestCase): shutdown_error = "LookupError: unknown encoding: ascii" def test_initialization(self): r = self.BytesIO(b"\xc3\xa9\n\n") b = self.BufferedReader(r, 1000) t = self.TextIOWrapper(b, encoding="utf-8") self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy') self.assertRaises(ValueError, t.read) t = self.TextIOWrapper.__new__(self.TextIOWrapper) self.assertRaises(Exception, repr, t) def test_garbage_collection(self): # C TextIOWrapper objects are collected, and collecting them flushes # all data to disk. # The Python version has __del__, so it ends in gc.garbage instead. with warnings.catch_warnings(): warnings.simplefilter("ignore", ResourceWarning) rawio = self.FileIO(os_helper.TESTFN, "wb") b = self.BufferedWriter(rawio) t = self.TextIOWrapper(b, encoding="ascii") t.write("456def") t.x = t wr = weakref.ref(t) del t support.gc_collect() self.assertIsNone(wr(), wr) with self.open(os_helper.TESTFN, "rb") as f: self.assertEqual(f.read(), b"456def") def test_rwpair_cleared_before_textio(self): # Issue 13070: TextIOWrapper's finalization would crash when called # after the reference to the underlying BufferedRWPair's writer got # cleared by the GC. for i in range(1000): b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) t1 = self.TextIOWrapper(b1, encoding="ascii") b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO()) t2 = self.TextIOWrapper(b2, encoding="ascii") # circular references t1.buddy = t2 t2.buddy = t1 support.gc_collect() def test_del__CHUNK_SIZE_SystemError(self): t = self.TextIOWrapper(self.BytesIO(), encoding='ascii') with self.assertRaises(AttributeError): del t._CHUNK_SIZE def test_internal_buffer_size(self): # bpo-43260: TextIOWrapper's internal buffer should not store # data larger than chunk size. chunk_size = 8192 # default chunk size, updated later class MockIO(self.MockRawIO): def write(self, data): if len(data) > chunk_size: raise RuntimeError return super().write(data) buf = MockIO() t = self.TextIOWrapper(buf, encoding="ascii") chunk_size = t._CHUNK_SIZE t.write("abc") t.write("def") # default chunk size is 8192 bytes so t don't write data to buf. self.assertEqual([], buf._write_stack) with self.assertRaises(RuntimeError): t.write("x"*(chunk_size+1)) self.assertEqual([b"abcdef"], buf._write_stack) t.write("ghi") t.write("x"*chunk_size) self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack) def test_issue119506(self): chunk_size = 8192 class MockIO(self.MockRawIO): written = False def write(self, data): if not self.written: self.written = True t.write("middle") return super().write(data) buf = MockIO() t = self.TextIOWrapper(buf) t.write("abc") t.write("def") # writing data which size >= chunk_size cause flushing buffer before write. t.write("g" * chunk_size) t.flush() self.assertEqual([b"abcdef", b"middle", b"g"*chunk_size], buf._write_stack) class PyTextIOWrapperTest(TextIOWrapperTest, PyTestCase): shutdown_error = "LookupError: unknown encoding: ascii" class IncrementalNewlineDecoderTest: def check_newline_decoding_utf8(self, decoder): # UTF-8 specific tests for a newline decoder def _check_decode(b, s, **kwargs): # We exercise getstate() / setstate() as well as decode() state = decoder.getstate() self.assertEqual(decoder.decode(b, **kwargs), s) decoder.setstate(state) self.assertEqual(decoder.decode(b, **kwargs), s) _check_decode(b'\xe8\xa2\x88', "\u8888") _check_decode(b'\xe8', "") _check_decode(b'\xa2', "") _check_decode(b'\x88', "\u8888") _check_decode(b'\xe8', "") _check_decode(b'\xa2', "") _check_decode(b'\x88', "\u8888") _check_decode(b'\xe8', "") self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True) decoder.reset() _check_decode(b'\n', "\n") _check_decode(b'\r', "") _check_decode(b'', "\n", final=True) _check_decode(b'\r', "\n", final=True) _check_decode(b'\r', "") _check_decode(b'a', "\na") _check_decode(b'\r\r\n', "\n\n") _check_decode(b'\r', "") _check_decode(b'\r', "\n") _check_decode(b'\na', "\na") _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n") _check_decode(b'\xe8\xa2\x88', "\u8888") _check_decode(b'\n', "\n") _check_decode(b'\xe8\xa2\x88\r', "\u8888") _check_decode(b'\n', "\n") def check_newline_decoding(self, decoder, encoding): result = [] if encoding is not None: encoder = codecs.getincrementalencoder(encoding)() def _decode_bytewise(s): # Decode one byte at a time for b in encoder.encode(s): result.append(decoder.decode(bytes([b]))) else: encoder = None def _decode_bytewise(s): # Decode one char at a time for c in s: result.append(decoder.decode(c)) self.assertEqual(decoder.newlines, None) _decode_bytewise("abc\n\r") self.assertEqual(decoder.newlines, '\n') _decode_bytewise("\nabc") self.assertEqual(decoder.newlines, ('\n', '\r\n')) _decode_bytewise("abc\r") self.assertEqual(decoder.newlines, ('\n', '\r\n')) _decode_bytewise("abc") self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n')) _decode_bytewise("abc\r") self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc") decoder.reset() input = "abc" if encoder is not None: encoder.reset() input = encoder.encode(input) self.assertEqual(decoder.decode(input), "abc") self.assertEqual(decoder.newlines, None) def test_newline_decoder(self): encodings = ( # None meaning the IncrementalNewlineDecoder takes unicode input # rather than bytes input None, 'utf-8', 'latin-1', 'utf-16', 'utf-16-le', 'utf-16-be', 'utf-32', 'utf-32-le', 'utf-32-be', ) for enc in encodings: decoder = enc and codecs.getincrementaldecoder(enc)() decoder = self.IncrementalNewlineDecoder(decoder, translate=True) self.check_newline_decoding(decoder, enc) decoder = codecs.getincrementaldecoder("utf-8")() decoder = self.IncrementalNewlineDecoder(decoder, translate=True) self.check_newline_decoding_utf8(decoder) self.assertRaises(TypeError, decoder.setstate, 42) def test_newline_bytes(self): # Issue 5433: Excessive optimization in IncrementalNewlineDecoder def _check(dec): self.assertEqual(dec.newlines, None) self.assertEqual(dec.decode("\u0D00"), "\u0D00") self.assertEqual(dec.newlines, None) self.assertEqual(dec.decode("\u0A00"), "\u0A00") self.assertEqual(dec.newlines, None) dec = self.IncrementalNewlineDecoder(None, translate=False) _check(dec) dec = self.IncrementalNewlineDecoder(None, translate=True) _check(dec) def test_translate(self): # issue 35062 for translate in (-2, -1, 1, 2): decoder = codecs.getincrementaldecoder("utf-8")() decoder = self.IncrementalNewlineDecoder(decoder, translate) self.check_newline_decoding_utf8(decoder) decoder = codecs.getincrementaldecoder("utf-8")() decoder = self.IncrementalNewlineDecoder(decoder, translate=0) self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n") class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase): IncrementalNewlineDecoder = io.IncrementalNewlineDecoder @support.cpython_only def test_uninitialized(self): uninitialized = self.IncrementalNewlineDecoder.__new__( self.IncrementalNewlineDecoder) self.assertRaises(ValueError, uninitialized.decode, b'bar') self.assertRaises(ValueError, uninitialized.getstate) self.assertRaises(ValueError, uninitialized.setstate, (b'foo', 0)) self.assertRaises(ValueError, uninitialized.reset) class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase): IncrementalNewlineDecoder = pyio.IncrementalNewlineDecoder