mirror of
https://github.com/python/cpython.git
synced 2025-11-11 02:52:04 +00:00
Revert my commit 3555cf6f9c98: "Issue #8796: codecs.open() calls the builtin
open() function instead of using StreamReaderWriter. Deprecate StreamReader, StreamWriter, StreamReaderWriter, StreamRecoder and EncodedFile() of the codec module. Use the builtin open() function or io.TextIOWrapper instead." "It has not been approved !" wrote Marc-Andre Lemburg.
This commit is contained in:
parent
4f2dab5c33
commit
0501070669
4 changed files with 59 additions and 148 deletions
|
|
@ -1,10 +1,7 @@
|
|||
from test import support
|
||||
import _testcapi
|
||||
import codecs
|
||||
import io
|
||||
import sys
|
||||
import unittest
|
||||
import warnings
|
||||
import codecs
|
||||
import sys, _testcapi, io
|
||||
|
||||
class Queue(object):
|
||||
"""
|
||||
|
|
@ -66,9 +63,7 @@ def check_partial(self, input, partialresults):
|
|||
# the StreamReader and check that the results equal the appropriate
|
||||
# entries from partialresults.
|
||||
q = Queue(b"")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
r = codecs.getreader(self.encoding)(q)
|
||||
r = codecs.getreader(self.encoding)(q)
|
||||
result = ""
|
||||
for (c, partialresult) in zip(input.encode(self.encoding), partialresults):
|
||||
q.write(bytes([c]))
|
||||
|
|
@ -111,9 +106,7 @@ def getreader(input):
|
|||
return codecs.getreader(self.encoding)(stream)
|
||||
|
||||
def readalllines(input, keepends=True, size=None):
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = getreader(input)
|
||||
reader = getreader(input)
|
||||
lines = []
|
||||
while True:
|
||||
line = reader.readline(size=size, keepends=keepends)
|
||||
|
|
@ -222,18 +215,14 @@ def test_bug1175396(self):
|
|||
' \r\n',
|
||||
]
|
||||
stream = io.BytesIO("".join(s).encode(self.encoding))
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = codecs.getreader(self.encoding)(stream)
|
||||
reader = codecs.getreader(self.encoding)(stream)
|
||||
for (i, line) in enumerate(reader):
|
||||
self.assertEqual(line, s[i])
|
||||
|
||||
def test_readlinequeue(self):
|
||||
q = Queue(b"")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
writer = codecs.getwriter(self.encoding)(q)
|
||||
reader = codecs.getreader(self.encoding)(q)
|
||||
writer = codecs.getwriter(self.encoding)(q)
|
||||
reader = codecs.getreader(self.encoding)(q)
|
||||
|
||||
# No lineends
|
||||
writer.write("foo\r")
|
||||
|
|
@ -264,9 +253,7 @@ def test_bug1098990_a(self):
|
|||
|
||||
s = (s1+s2+s3).encode(self.encoding)
|
||||
stream = io.BytesIO(s)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = codecs.getreader(self.encoding)(stream)
|
||||
reader = codecs.getreader(self.encoding)(stream)
|
||||
self.assertEqual(reader.readline(), s1)
|
||||
self.assertEqual(reader.readline(), s2)
|
||||
self.assertEqual(reader.readline(), s3)
|
||||
|
|
@ -281,9 +268,7 @@ def test_bug1098990_b(self):
|
|||
|
||||
s = (s1+s2+s3+s4+s5).encode(self.encoding)
|
||||
stream = io.BytesIO(s)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = codecs.getreader(self.encoding)(stream)
|
||||
reader = codecs.getreader(self.encoding)(stream)
|
||||
self.assertEqual(reader.readline(), s1)
|
||||
self.assertEqual(reader.readline(), s2)
|
||||
self.assertEqual(reader.readline(), s3)
|
||||
|
|
@ -305,9 +290,7 @@ def test_only_one_bom(self):
|
|||
_,_,reader,writer = codecs.lookup(self.encoding)
|
||||
# encode some stream
|
||||
s = io.BytesIO()
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = writer(s)
|
||||
f = writer(s)
|
||||
f.write("spam")
|
||||
f.write("spam")
|
||||
d = s.getvalue()
|
||||
|
|
@ -315,22 +298,16 @@ def test_only_one_bom(self):
|
|||
self.assertTrue(d == self.spamle or d == self.spambe)
|
||||
# try to read it back
|
||||
s = io.BytesIO(d)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = reader(s)
|
||||
f = reader(s)
|
||||
self.assertEqual(f.read(), "spamspam")
|
||||
|
||||
def test_badbom(self):
|
||||
s = io.BytesIO(4*b"\xff")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
self.assertRaises(UnicodeError, f.read)
|
||||
|
||||
s = io.BytesIO(8*b"\xff")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
self.assertRaises(UnicodeError, f.read)
|
||||
|
||||
def test_partial(self):
|
||||
|
|
@ -477,9 +454,7 @@ def test_only_one_bom(self):
|
|||
_,_,reader,writer = codecs.lookup(self.encoding)
|
||||
# encode some stream
|
||||
s = io.BytesIO()
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = writer(s)
|
||||
f = writer(s)
|
||||
f.write("spam")
|
||||
f.write("spam")
|
||||
d = s.getvalue()
|
||||
|
|
@ -487,22 +462,16 @@ def test_only_one_bom(self):
|
|||
self.assertTrue(d == self.spamle or d == self.spambe)
|
||||
# try to read it back
|
||||
s = io.BytesIO(d)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = reader(s)
|
||||
f = reader(s)
|
||||
self.assertEqual(f.read(), "spamspam")
|
||||
|
||||
def test_badbom(self):
|
||||
s = io.BytesIO(b"\xff\xff")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
self.assertRaises(UnicodeError, f.read)
|
||||
|
||||
s = io.BytesIO(b"\xff\xff\xff\xff")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
f = codecs.getreader(self.encoding)(s)
|
||||
self.assertRaises(UnicodeError, f.read)
|
||||
|
||||
def test_partial(self):
|
||||
|
|
@ -548,8 +517,7 @@ def test_bug691291(self):
|
|||
self.addCleanup(support.unlink, support.TESTFN)
|
||||
with open(support.TESTFN, 'wb') as fp:
|
||||
fp.write(s)
|
||||
with codecs.open(support.TESTFN, 'U',
|
||||
encoding=self.encoding) as reader:
|
||||
with codecs.open(support.TESTFN, 'U', encoding=self.encoding) as reader:
|
||||
self.assertEqual(reader.read(), s1)
|
||||
|
||||
class UTF16LETest(ReadTest):
|
||||
|
|
@ -737,9 +705,7 @@ def test_stream_bom(self):
|
|||
reader = codecs.getreader("utf-8-sig")
|
||||
for sizehint in [None] + list(range(1, 11)) + \
|
||||
[64, 128, 256, 512, 1024]:
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
istream = reader(io.BytesIO(bytestring))
|
||||
istream = reader(io.BytesIO(bytestring))
|
||||
ostream = io.StringIO()
|
||||
while 1:
|
||||
if sizehint is not None:
|
||||
|
|
@ -761,9 +727,7 @@ def test_stream_bare(self):
|
|||
reader = codecs.getreader("utf-8-sig")
|
||||
for sizehint in [None] + list(range(1, 11)) + \
|
||||
[64, 128, 256, 512, 1024]:
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
istream = reader(io.BytesIO(bytestring))
|
||||
istream = reader(io.BytesIO(bytestring))
|
||||
ostream = io.StringIO()
|
||||
while 1:
|
||||
if sizehint is not None:
|
||||
|
|
@ -785,9 +749,7 @@ def test_empty(self):
|
|||
class RecodingTest(unittest.TestCase):
|
||||
def test_recoding(self):
|
||||
f = io.BytesIO()
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
|
||||
f2 = codecs.EncodedFile(f, "unicode_internal", "utf-8")
|
||||
f2.write("a")
|
||||
f2.close()
|
||||
# Python used to crash on this at exit because of a refcount
|
||||
|
|
@ -1164,9 +1126,7 @@ def test_builtin_encode(self):
|
|||
self.assertEqual("pyth\xf6n.org.".encode("idna"), b"xn--pythn-mua.org.")
|
||||
|
||||
def test_stream(self):
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
r = codecs.getreader("idna")(io.BytesIO(b"abc"))
|
||||
r = codecs.getreader("idna")(io.BytesIO(b"abc"))
|
||||
r.read(3)
|
||||
self.assertEqual(r.read(), "")
|
||||
|
||||
|
|
@ -1273,24 +1233,18 @@ def test_getwriter(self):
|
|||
class StreamReaderTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
self.reader = codecs.getreader('utf-8')
|
||||
self.reader = codecs.getreader('utf-8')
|
||||
self.stream = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
|
||||
|
||||
def test_readlines(self):
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
f = self.reader(self.stream)
|
||||
f = self.reader(self.stream)
|
||||
self.assertEqual(f.readlines(), ['\ud55c\n', '\uae00'])
|
||||
|
||||
class EncodedFileTest(unittest.TestCase):
|
||||
|
||||
def test_basic(self):
|
||||
f = io.BytesIO(b'\xed\x95\x9c\n\xea\xb8\x80')
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
|
||||
ef = codecs.EncodedFile(f, 'utf-16-le', 'utf-8')
|
||||
self.assertEqual(ef.read(), b'\\\xd5\n\x00\x00\xae')
|
||||
|
||||
f = io.BytesIO()
|
||||
|
|
@ -1434,9 +1388,7 @@ def test_basics(self):
|
|||
if encoding not in broken_unicode_with_streams:
|
||||
# check stream reader/writer
|
||||
q = Queue(b"")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
writer = codecs.getwriter(encoding)(q)
|
||||
writer = codecs.getwriter(encoding)(q)
|
||||
encodedresult = b""
|
||||
for c in s:
|
||||
writer.write(c)
|
||||
|
|
@ -1444,9 +1396,7 @@ def test_basics(self):
|
|||
self.assertTrue(type(chunk) is bytes, type(chunk))
|
||||
encodedresult += chunk
|
||||
q = Queue(b"")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = codecs.getreader(encoding)(q)
|
||||
reader = codecs.getreader(encoding)(q)
|
||||
decodedresult = ""
|
||||
for c in encodedresult:
|
||||
q.write(bytes([c]))
|
||||
|
|
@ -1520,9 +1470,7 @@ def test_seek(self):
|
|||
continue
|
||||
if encoding in broken_unicode_with_streams:
|
||||
continue
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
|
||||
reader = codecs.getreader(encoding)(io.BytesIO(s.encode(encoding)))
|
||||
for t in range(5):
|
||||
# Test that calling seek resets the internal codec state and buffers
|
||||
reader.seek(0, 0)
|
||||
|
|
@ -1591,19 +1539,15 @@ def test_decode_with_string_map(self):
|
|||
class WithStmtTest(unittest.TestCase):
|
||||
def test_encodedfile(self):
|
||||
f = io.BytesIO(b"\xc3\xbc")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
with codecs.EncodedFile(f, "latin-1", "utf-8") as ef:
|
||||
self.assertEqual(ef.read(), b"\xfc")
|
||||
with codecs.EncodedFile(f, "latin-1", "utf-8") as ef:
|
||||
self.assertEqual(ef.read(), b"\xfc")
|
||||
|
||||
def test_streamreaderwriter(self):
|
||||
f = io.BytesIO(b"\xc3\xbc")
|
||||
info = codecs.lookup("utf-8")
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
with codecs.StreamReaderWriter(f, info.streamreader,
|
||||
info.streamwriter, 'strict') as srw:
|
||||
self.assertEqual(srw.read(), "\xfc")
|
||||
with codecs.StreamReaderWriter(f, info.streamreader,
|
||||
info.streamwriter, 'strict') as srw:
|
||||
self.assertEqual(srw.read(), "\xfc")
|
||||
|
||||
class TypesTest(unittest.TestCase):
|
||||
def test_decode_unicode(self):
|
||||
|
|
@ -1700,15 +1644,15 @@ def test_seek0(self):
|
|||
|
||||
# (StreamWriter) Check that the BOM is written after a seek(0)
|
||||
with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
|
||||
f.write(data[0])
|
||||
self.assertNotEqual(f.tell(), 0)
|
||||
f.seek(0)
|
||||
f.write(data)
|
||||
f.writer.write(data[0])
|
||||
self.assertNotEqual(f.writer.tell(), 0)
|
||||
f.writer.seek(0)
|
||||
f.writer.write(data)
|
||||
f.seek(0)
|
||||
self.assertEqual(f.read(), data)
|
||||
|
||||
# Check that the BOM is not written after a seek() at a
|
||||
# position different than the start
|
||||
# Check that the BOM is not written after a seek() at a position
|
||||
# different than the start
|
||||
with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
|
||||
f.write(data)
|
||||
f.seek(f.tell())
|
||||
|
|
@ -1716,12 +1660,12 @@ def test_seek0(self):
|
|||
f.seek(0)
|
||||
self.assertEqual(f.read(), data * 2)
|
||||
|
||||
# (StreamWriter) Check that the BOM is not written after a
|
||||
# seek() at a position different than the start
|
||||
# (StreamWriter) Check that the BOM is not written after a seek()
|
||||
# at a position different than the start
|
||||
with codecs.open(support.TESTFN, 'w+', encoding=encoding) as f:
|
||||
f.write(data)
|
||||
f.seek(f.tell())
|
||||
f.write(data)
|
||||
f.writer.write(data)
|
||||
f.writer.seek(f.writer.tell())
|
||||
f.writer.write(data)
|
||||
f.seek(0)
|
||||
self.assertEqual(f.read(), data * 2)
|
||||
|
||||
|
|
@ -1760,9 +1704,7 @@ def test_basics(self):
|
|||
def test_read(self):
|
||||
for encoding in bytes_transform_encodings:
|
||||
sin = codecs.encode(b"\x80", encoding)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = codecs.getreader(encoding)(io.BytesIO(sin))
|
||||
reader = codecs.getreader(encoding)(io.BytesIO(sin))
|
||||
sout = reader.read()
|
||||
self.assertEqual(sout, b"\x80")
|
||||
|
||||
|
|
@ -1771,9 +1713,7 @@ def test_readline(self):
|
|||
if encoding in ['uu_codec', 'zlib_codec']:
|
||||
continue
|
||||
sin = codecs.encode(b"\x80", encoding)
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("ignore", DeprecationWarning)
|
||||
reader = codecs.getreader(encoding)(io.BytesIO(sin))
|
||||
reader = codecs.getreader(encoding)(io.BytesIO(sin))
|
||||
sout = reader.readline()
|
||||
self.assertEqual(sout, b"\x80")
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue