This commit is contained in:
Serhiy Storchaka 2025-12-08 06:11:11 +02:00 committed by GitHub
commit e1852542e5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 254 additions and 0 deletions

View file

@ -1109,6 +1109,123 @@ def make_encoding_map(decoding_map):
m[v] = None
return m
_surrogates_re = None
def rehandle_surrogatepass(string, errors):
handler = None
global _surrogates_re
if not _surrogates_re:
import re
_surrogates_re = re.compile('[\ud800-\uefff]+')
pos = 0
res = []
while True:
m = _surrogates_re.search(string, pos)
if m:
if handler is None:
handler = lookup_error(errors)
res.append(string[pos: m.start()])
repl, pos = handler(UnicodeTranslateError(string, m.start(), m.end(),
'lone surrogates'))
res.append(repl)
elif pos:
res.append(string[pos:])
return ''.join(res)
else:
return string[:]
def rehandle_surrogateescape(string, errors):
handler = None
global _surrogates_re
if not _surrogates_re:
import re
_surrogates_re = re.compile('[\ud800-\uefff]+')
pos = 0
res = []
while True:
m = _surrogates_re.search(string, pos)
if m:
if handler is None:
handler = lookup_error(errors)
start = m.start()
res.append(string[pos: start])
try:
baddata = string[start: m.end()].encode('ascii', 'surrogateescape')
except UnicodeEncodeError as err:
raise UnicodeTranslateError(string,
err.start + start,err.end + start,
r'surrogates not in range \udc80-\udcff') from None
try:
repl, pos = handler(UnicodeDecodeError('unicode', baddata,
0, len(baddata),
'lone surrogates'))
except UnicodeDecodeError as err:
raise UnicodeTranslateError(string,
err.start + start,
err.end + start,
err.reason) from None
pos += start
res.append(repl)
elif pos:
res.append(string[pos:])
return ''.join(res)
else:
return string[:]
_astral_re = None
def handle_astrals(string, errors):
handler = None
global _astral_re
if not _astral_re:
import re
_astral_re = re.compile(r'[^\u0000-\uffff]+')
pos = 0
res = []
while True:
m = _astral_re.search(string, pos)
if m:
if handler is None:
handler = lookup_error(errors)
res.append(string[pos: m.start()])
repl, pos = handler(UnicodeTranslateError(string, m.start(), m.end(),
'astral characters'))
res.append(repl)
elif pos:
res.append(string[pos:])
return ''.join(res)
else:
return string[:]
def _decompose_astral(match):
res = []
for c in match.group():
k = ord(c) - 0x10000
res.append('%c%c' % (0xd800 + (k >> 10), 0xdc00 + (k & 0x3ff)))
return ''.join(res)
def decompose_astrals(string):
global _astral_re
if not _astral_re:
import re
_astral_re = re.compile(r'[^\u0000-\uffff]+')
return _astral_re.sub(_decompose_astral, string)
_surrogate_pair_re = None
def _compose_surrogate_pair(match):
hi, lo = match.group()
hi = ord(hi) - 0xd800
lo = ord(lo) - 0xdc00
return chr(0x10000 + (hi << 10) + lo)
def compose_surrogate_pairs(string):
global _surrogate_pair_re
if not _surrogate_pair_re:
import re
_surrogate_pair_re = re.compile(r'[\ud800-\udbff][\udc00-\udfff]')
return _surrogate_pair_re.sub(_compose_surrogate_pair, string)
### error handlers
strict_errors = lookup_error("strict")

View file

@ -1957,6 +1957,143 @@ def test_pickle(self):
self.assertFalse(unpickled_codec_info._is_text_encoding)
def test_rehandle_surrogatepass(self):
self.assertRaises(TypeError, codecs.rehandle_surrogatepass)
for s in ('', 'abc', '\xe0\xdf\xe7', '\u03b1\u03b2\u03b3',
'\U00010480\U0001d4ff'):
with self.subTest(str=s):
self.assertEqual(codecs.rehandle_surrogatepass(s, 'strict'), s)
with self.assertRaises(UnicodeTranslateError) as cm:
codecs.rehandle_surrogatepass('a\ud801\udc80b', 'strict')
self.assertEqual(cm.exception.encoding, None)
self.assertEqual(cm.exception.object, 'a\ud801\udc80b')
self.assertEqual(cm.exception.start, 1)
self.assertEqual(cm.exception.end, 3)
tests = [
('ignore', ('', '')),
('replace', ('\ufffd','\ufffd')),
('backslashreplace', ('\\ud801', '\\udc80')),
# ('namereplace', ('\\ud801', '\\udc80')),
# ('xmlcharrefreplace', ('&#55297;', '&#56448;')),
# ('surrogatepass', ('\ud801', '\udc80')),
]
for (error, args) in tests:
for tmpl in ('a{}b', 'a{}b{}', 'a{}{}c', 'a{}b{}c'):
data = tmpl.format('\ud801', '\udc80')
expected = tmpl.format(*args)
with self.subTest(error=error, data=data):
self.assertEqual(codecs.rehandle_surrogatepass(data, error),
expected)
def test_rehandle_surrogateescape(self):
self.assertRaises(TypeError, codecs.rehandle_surrogateescape)
for s in ('', 'abc', '\xe0\xdf\xe7', '\u03b1\u03b2\u03b3',
'\U00010480\U0001d4ff'):
with self.subTest(str=s):
self.assertEqual(codecs.rehandle_surrogateescape(s, 'strict'), s)
with self.assertRaises(UnicodeTranslateError) as cm:
codecs.rehandle_surrogateescape('a\udc80\udcffb', 'strict')
self.assertEqual(cm.exception.encoding, None)
self.assertEqual(cm.exception.object, 'a\udc80\udcffb')
self.assertEqual(cm.exception.start, 1)
self.assertEqual(cm.exception.end, 3)
with self.assertRaises(TypeError):
codecs.rehandle_surrogateescape('a\udc80b', 'namereplace')
with self.assertRaises(TypeError):
codecs.rehandle_surrogateescape('a\udc80b', 'xmlcharrefreplace')
with self.assertRaises(UnicodeTranslateError):
codecs.rehandle_surrogateescape('a\udc80b', 'surrogatepass')
tests = [
('ignore', ('', '')),
('replace', ('\ufffd','\ufffd')),
('backslashreplace', ('\\x80','\\xff')),
('surrogateescape', ('\udc80','\udcff')),
]
for (error, args) in tests:
for tmpl in ('a{}b', 'a{}b{}', 'a{}{}c', 'a{}b{}c'):
data = tmpl.format('\udc80', '\udcff')
expected = tmpl.format(*args)
if error == 'replace':
expected = expected.replace('\ufffd\ufffd', '\ufffd')
with self.subTest(error=error, data=data):
self.assertEqual(codecs.rehandle_surrogateescape(data, error),
expected)
for error in ('strict', 'ignore', 'replace',
'backslashreplace', 'namereplace', 'xmlcharrefreplace',
'surrogatepass', 'surrogateescape'):
with self.assertRaises(UnicodeTranslateError):
codecs.rehandle_surrogateescape('\udc7f', error)
with self.assertRaises(UnicodeTranslateError):
codecs.rehandle_surrogateescape('\udd00', error)
def test_handle_astrals(self):
self.assertRaises(TypeError, codecs.handle_astrals)
for s in ('', 'abc', '\xe0\xdf\xe7', '\u03b1\u03b2\u03b3',
'\ud801\udc80', '\udc80'):
with self.subTest(str=s):
self.assertEqual(codecs.handle_astrals(s, 'strict'), s)
with self.assertRaises(UnicodeTranslateError) as cm:
codecs.handle_astrals('a\U00010480\U0001d4ffb', 'strict')
self.assertEqual(cm.exception.encoding, None)
self.assertEqual(cm.exception.object, 'a\U00010480\U0001d4ffb')
self.assertEqual(cm.exception.start, 1)
self.assertEqual(cm.exception.end, 3)
# with self.assertRaises(UnicodeTranslateError):
# codecs.handle_astrals('a\U00010480b', 'surrogatepass')
with self.assertRaises(TypeError):
codecs.handle_astrals('a\U00010480b', 'surrogateescape')
tests = [
('ignore', ('', '')),
('replace', ('\ufffd','\ufffd')),
('backslashreplace', ('\\U00010480', '\\U0001d4ff')),
# ('namereplace', ('\\N{OSMANYA LETTER ALEF}',
# '\\N{MATHEMATICAL BOLD SCRIPT SMALL V}')),
# ('xmlcharrefreplace', ('&#66688;','&#120063;')),
]
for (error, args) in tests:
for tmpl in ('a{}b', 'a{}b{}', 'a{}{}c', 'a{}b{}c'):
data = tmpl.format('\U00010480', '\U0001d4ff')
expected = tmpl.format(*args)
with self.subTest(error=error, data=data):
self.assertEqual(codecs.handle_astrals(data, error),
expected)
def test_decompose_astrals(self):
self.assertRaises(TypeError, codecs.decompose_astrals)
tests = [
('abc', 'abc'),
('\xe0\xdf\xe7', '\xe0\xdf\xe7'),
('\u03b1\u03b2\u03b3', '\u03b1\u03b2\u03b3'),
('a\U00010480b', 'a\ud801\udc80b'),
('a\U00010480b\U0001d4ff', 'a\ud801\udc80b\ud835\udcff'),
('a\U00010480\U0001d4ffc', 'a\ud801\udc80\ud835\udcffc'),
('a\U00010480b\U0001d4ffc', 'a\ud801\udc80b\ud835\udcffc'),
('a\ud801\udc80b', 'a\ud801\udc80b'),
('a\udc80b', 'a\udc80b'),
]
for s, r in tests:
with self.subTest(str=s):
self.assertEqual(codecs.decompose_astrals(s), r)
def test_compose_surrogate_pairs(self):
self.assertRaises(TypeError, codecs.compose_surrogate_pairs)
tests = [
('abc', 'abc'),
('\xe0\xdf\xe7', '\xe0\xdf\xe7'),
('\u03b1\u03b2\u03b3', '\u03b1\u03b2\u03b3'),
('a\ud801\udc80b', 'a\U00010480b'),
('a\ud801\udc80b\ud835\udcff', 'a\U00010480b\U0001d4ff'),
('a\ud801\udc80\ud835\udcffc', 'a\U00010480\U0001d4ffc'),
('a\ud801\udc80b\ud835\udcffc', 'a\U00010480b\U0001d4ffc'),
('a\udc80\ud801\ud801\udc80b', 'a\udc80\ud801\U00010480b'),
('a\ud801\udc80\udc80\ud801b', 'a\U00010480\udc80\ud801b'),
('a\udc80b', 'a\udc80b'),
]
for s, r in tests:
with self.subTest(str=s):
self.assertEqual(codecs.compose_surrogate_pairs(s), r)
class StreamReaderTest(unittest.TestCase):
def setUp(self):