mirror of
https://github.com/python/cpython.git
synced 2025-12-08 06:10:17 +00:00
Merge a254fec40e into 7099af8f5e
This commit is contained in:
commit
83f5353598
8 changed files with 76 additions and 20 deletions
|
|
@ -8,14 +8,20 @@
|
||||||
import codecs
|
import codecs
|
||||||
import base64
|
import base64
|
||||||
|
|
||||||
|
### Codec Helpers
|
||||||
|
|
||||||
|
def _assert_strict(errors):
|
||||||
|
if errors != 'strict':
|
||||||
|
raise ValueError(f'Unsupported error handling mode: "{errors}" - must be "strict"')
|
||||||
|
|
||||||
### Codec APIs
|
### Codec APIs
|
||||||
|
|
||||||
def base64_encode(input, errors='strict'):
|
def base64_encode(input, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
return (base64.encodebytes(input), len(input))
|
return (base64.encodebytes(input), len(input))
|
||||||
|
|
||||||
def base64_decode(input, errors='strict'):
|
def base64_decode(input, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
return (base64.decodebytes(input), len(input))
|
return (base64.decodebytes(input), len(input))
|
||||||
|
|
||||||
class Codec(codecs.Codec):
|
class Codec(codecs.Codec):
|
||||||
|
|
@ -26,12 +32,12 @@ def decode(self, input, errors='strict'):
|
||||||
|
|
||||||
class IncrementalEncoder(codecs.IncrementalEncoder):
|
class IncrementalEncoder(codecs.IncrementalEncoder):
|
||||||
def encode(self, input, final=False):
|
def encode(self, input, final=False):
|
||||||
assert self.errors == 'strict'
|
_assert_strict(self.errors)
|
||||||
return base64.encodebytes(input)
|
return base64.encodebytes(input)
|
||||||
|
|
||||||
class IncrementalDecoder(codecs.IncrementalDecoder):
|
class IncrementalDecoder(codecs.IncrementalDecoder):
|
||||||
def decode(self, input, final=False):
|
def decode(self, input, final=False):
|
||||||
assert self.errors == 'strict'
|
_assert_strict(self.errors)
|
||||||
return base64.decodebytes(input)
|
return base64.decodebytes(input)
|
||||||
|
|
||||||
class StreamWriter(Codec, codecs.StreamWriter):
|
class StreamWriter(Codec, codecs.StreamWriter):
|
||||||
|
|
|
||||||
|
|
@ -10,14 +10,20 @@
|
||||||
import codecs
|
import codecs
|
||||||
import bz2 # this codec needs the optional bz2 module !
|
import bz2 # this codec needs the optional bz2 module !
|
||||||
|
|
||||||
|
### Codec Helpers
|
||||||
|
|
||||||
|
def _assert_strict(errors):
|
||||||
|
if errors != 'strict':
|
||||||
|
raise ValueError(f'Unsupported error handling mode: "{errors}" - must be "strict"')
|
||||||
|
|
||||||
### Codec APIs
|
### Codec APIs
|
||||||
|
|
||||||
def bz2_encode(input, errors='strict'):
|
def bz2_encode(input, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
return (bz2.compress(input), len(input))
|
return (bz2.compress(input), len(input))
|
||||||
|
|
||||||
def bz2_decode(input, errors='strict'):
|
def bz2_decode(input, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
return (bz2.decompress(input), len(input))
|
return (bz2.decompress(input), len(input))
|
||||||
|
|
||||||
class Codec(codecs.Codec):
|
class Codec(codecs.Codec):
|
||||||
|
|
@ -28,7 +34,7 @@ def decode(self, input, errors='strict'):
|
||||||
|
|
||||||
class IncrementalEncoder(codecs.IncrementalEncoder):
|
class IncrementalEncoder(codecs.IncrementalEncoder):
|
||||||
def __init__(self, errors='strict'):
|
def __init__(self, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
self.errors = errors
|
self.errors = errors
|
||||||
self.compressobj = bz2.BZ2Compressor()
|
self.compressobj = bz2.BZ2Compressor()
|
||||||
|
|
||||||
|
|
@ -44,7 +50,7 @@ def reset(self):
|
||||||
|
|
||||||
class IncrementalDecoder(codecs.IncrementalDecoder):
|
class IncrementalDecoder(codecs.IncrementalDecoder):
|
||||||
def __init__(self, errors='strict'):
|
def __init__(self, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
self.errors = errors
|
self.errors = errors
|
||||||
self.decompressobj = bz2.BZ2Decompressor()
|
self.decompressobj = bz2.BZ2Decompressor()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -8,14 +8,20 @@
|
||||||
import codecs
|
import codecs
|
||||||
import binascii
|
import binascii
|
||||||
|
|
||||||
|
### Codec Helpers
|
||||||
|
|
||||||
|
def _assert_strict(errors):
|
||||||
|
if errors != 'strict':
|
||||||
|
raise ValueError(f'Unsupported error handling mode: "{errors}" - must be "strict"')
|
||||||
|
|
||||||
### Codec APIs
|
### Codec APIs
|
||||||
|
|
||||||
def hex_encode(input, errors='strict'):
|
def hex_encode(input, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
return (binascii.b2a_hex(input), len(input))
|
return (binascii.b2a_hex(input), len(input))
|
||||||
|
|
||||||
def hex_decode(input, errors='strict'):
|
def hex_decode(input, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
return (binascii.a2b_hex(input), len(input))
|
return (binascii.a2b_hex(input), len(input))
|
||||||
|
|
||||||
class Codec(codecs.Codec):
|
class Codec(codecs.Codec):
|
||||||
|
|
@ -26,12 +32,12 @@ def decode(self, input, errors='strict'):
|
||||||
|
|
||||||
class IncrementalEncoder(codecs.IncrementalEncoder):
|
class IncrementalEncoder(codecs.IncrementalEncoder):
|
||||||
def encode(self, input, final=False):
|
def encode(self, input, final=False):
|
||||||
assert self.errors == 'strict'
|
_assert_strict(self.errors)
|
||||||
return binascii.b2a_hex(input)
|
return binascii.b2a_hex(input)
|
||||||
|
|
||||||
class IncrementalDecoder(codecs.IncrementalDecoder):
|
class IncrementalDecoder(codecs.IncrementalDecoder):
|
||||||
def decode(self, input, final=False):
|
def decode(self, input, final=False):
|
||||||
assert self.errors == 'strict'
|
_assert_strict(self.errors)
|
||||||
return binascii.a2b_hex(input)
|
return binascii.a2b_hex(input)
|
||||||
|
|
||||||
class StreamWriter(Codec, codecs.StreamWriter):
|
class StreamWriter(Codec, codecs.StreamWriter):
|
||||||
|
|
|
||||||
|
|
@ -7,15 +7,23 @@
|
||||||
import quopri
|
import quopri
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
|
||||||
|
### Codec Helpers
|
||||||
|
|
||||||
|
def _assert_strict(errors):
|
||||||
|
if errors != 'strict':
|
||||||
|
raise ValueError(f'Unsupported error handling mode: "{errors}" - must be "strict"')
|
||||||
|
|
||||||
|
### Codec APIs
|
||||||
|
|
||||||
def quopri_encode(input, errors='strict'):
|
def quopri_encode(input, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
f = BytesIO(input)
|
f = BytesIO(input)
|
||||||
g = BytesIO()
|
g = BytesIO()
|
||||||
quopri.encode(f, g, quotetabs=True)
|
quopri.encode(f, g, quotetabs=True)
|
||||||
return (g.getvalue(), len(input))
|
return (g.getvalue(), len(input))
|
||||||
|
|
||||||
def quopri_decode(input, errors='strict'):
|
def quopri_decode(input, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
f = BytesIO(input)
|
f = BytesIO(input)
|
||||||
g = BytesIO()
|
g = BytesIO()
|
||||||
quopri.decode(f, g)
|
quopri.decode(f, g)
|
||||||
|
|
|
||||||
|
|
@ -11,10 +11,16 @@
|
||||||
import binascii
|
import binascii
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
|
||||||
|
### Codec Helpers
|
||||||
|
|
||||||
|
def _assert_strict(errors):
|
||||||
|
if errors != 'strict':
|
||||||
|
raise ValueError(f'Unsupported error handling mode: "{errors}" - must be "strict"')
|
||||||
|
|
||||||
### Codec APIs
|
### Codec APIs
|
||||||
|
|
||||||
def uu_encode(input, errors='strict', filename='<data>', mode=0o666):
|
def uu_encode(input, errors='strict', filename='<data>', mode=0o666):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
infile = BytesIO(input)
|
infile = BytesIO(input)
|
||||||
outfile = BytesIO()
|
outfile = BytesIO()
|
||||||
read = infile.read
|
read = infile.read
|
||||||
|
|
@ -35,7 +41,7 @@ def uu_encode(input, errors='strict', filename='<data>', mode=0o666):
|
||||||
return (outfile.getvalue(), len(input))
|
return (outfile.getvalue(), len(input))
|
||||||
|
|
||||||
def uu_decode(input, errors='strict'):
|
def uu_decode(input, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
infile = BytesIO(input)
|
infile = BytesIO(input)
|
||||||
outfile = BytesIO()
|
outfile = BytesIO()
|
||||||
readline = infile.readline
|
readline = infile.readline
|
||||||
|
|
|
||||||
|
|
@ -8,14 +8,20 @@
|
||||||
import codecs
|
import codecs
|
||||||
import zlib # this codec needs the optional zlib module !
|
import zlib # this codec needs the optional zlib module !
|
||||||
|
|
||||||
|
### Codec Helpers
|
||||||
|
|
||||||
|
def _assert_strict(errors):
|
||||||
|
if errors != 'strict':
|
||||||
|
raise ValueError(f'Unsupported error handling mode: "{errors}" - must be "strict"')
|
||||||
|
|
||||||
### Codec APIs
|
### Codec APIs
|
||||||
|
|
||||||
def zlib_encode(input, errors='strict'):
|
def zlib_encode(input, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
return (zlib.compress(input), len(input))
|
return (zlib.compress(input), len(input))
|
||||||
|
|
||||||
def zlib_decode(input, errors='strict'):
|
def zlib_decode(input, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
return (zlib.decompress(input), len(input))
|
return (zlib.decompress(input), len(input))
|
||||||
|
|
||||||
class Codec(codecs.Codec):
|
class Codec(codecs.Codec):
|
||||||
|
|
@ -26,7 +32,7 @@ def decode(self, input, errors='strict'):
|
||||||
|
|
||||||
class IncrementalEncoder(codecs.IncrementalEncoder):
|
class IncrementalEncoder(codecs.IncrementalEncoder):
|
||||||
def __init__(self, errors='strict'):
|
def __init__(self, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
self.errors = errors
|
self.errors = errors
|
||||||
self.compressobj = zlib.compressobj()
|
self.compressobj = zlib.compressobj()
|
||||||
|
|
||||||
|
|
@ -42,7 +48,7 @@ def reset(self):
|
||||||
|
|
||||||
class IncrementalDecoder(codecs.IncrementalDecoder):
|
class IncrementalDecoder(codecs.IncrementalDecoder):
|
||||||
def __init__(self, errors='strict'):
|
def __init__(self, errors='strict'):
|
||||||
assert errors == 'strict'
|
_assert_strict(errors)
|
||||||
self.errors = errors
|
self.errors = errors
|
||||||
self.decompressobj = zlib.decompressobj()
|
self.decompressobj = zlib.decompressobj()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3129,6 +3129,20 @@ def test_uu_invalid(self):
|
||||||
# Missing "begin" line
|
# Missing "begin" line
|
||||||
self.assertRaises(ValueError, codecs.decode, b"", "uu-codec")
|
self.assertRaises(ValueError, codecs.decode, b"", "uu-codec")
|
||||||
|
|
||||||
|
def test_invalid_error_input(self):
|
||||||
|
for encoding in bytes_transform_encodings:
|
||||||
|
with self.subTest(encoding=encoding):
|
||||||
|
encoder = codecs.getencoder(encoding)
|
||||||
|
decoder = codecs.getdecoder(encoding)
|
||||||
|
|
||||||
|
self.assertRaises(ValueError, encoder, 'in', errors='notstrict')
|
||||||
|
self.assertRaises(ValueError, decoder, 'in', errors='notstrict')
|
||||||
|
|
||||||
|
incdev = codecs.getincrementaldecoder(encoding)
|
||||||
|
if encoding not in ('base64_codec', 'uu_codec', 'quopri_codec', 'hex_codec'):
|
||||||
|
self.assertRaises(ValueError, incdev, errors='notstrict')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# The codec system tries to add notes to exceptions in order to ensure
|
# The codec system tries to add notes to exceptions in order to ensure
|
||||||
# the error mentions the operation being performed and the codec involved.
|
# the error mentions the operation being performed and the codec involved.
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,4 @@
|
||||||
|
The ``base64_codec``, ``uu_codec``, ``quopri_codec``, ``hex_codec``,
|
||||||
|
``zlib_codec`` and ``bz2_codec`` now raise a :exc:`ValueError` when their
|
||||||
|
decoder/encoder is provided an *errors* parameter that is not equal to
|
||||||
|
``'strict'``.
|
||||||
Loading…
Add table
Add a link
Reference in a new issue