mirror of
https://github.com/python/cpython.git
synced 2025-12-08 06:10:17 +00:00
gh-63014: Add utilities to transform surrogate codes and astral characters
This commit is contained in:
parent
af8c3d7a26
commit
d33addde0c
2 changed files with 254 additions and 0 deletions
117
Lib/codecs.py
117
Lib/codecs.py
|
|
@ -1107,6 +1107,123 @@ def make_encoding_map(decoding_map):
|
||||||
m[v] = None
|
m[v] = None
|
||||||
return m
|
return m
|
||||||
|
|
||||||
|
# Lazily compiled pattern shared by rehandle_surrogatepass() and
# rehandle_surrogateescape(); None until one of them is first called.
_surrogates_re = None


def _get_surrogates_re():
    """Return the cached pattern matching runs of surrogate codes.

    The pattern is compiled (and the re module imported) on first use only.
    """
    global _surrogates_re
    if _surrogates_re is None:
        import re
        # Surrogate code points are exactly U+D800..U+DFFF.  A wider upper
        # bound would also catch private use characters (U+E000 and up),
        # which are ordinary characters and must be left alone.
        _surrogates_re = re.compile('[\ud800-\udfff]+')
    return _surrogates_re


def rehandle_surrogatepass(string, errors):
    """Apply the error handler *errors* to surrogate codes in *string*.

    Every maximal run of surrogate code points (U+D800-U+DFFF) is reported
    to the handler registered under the name *errors* as a
    UnicodeTranslateError with the reason 'lone surrogates'.  The run is
    replaced with the string returned by the handler and scanning resumes
    at the position returned by the handler.

    Return the resulting string; if *string* contains no surrogate codes,
    return ``string[:]`` and do not look the handler up at all.

    Exceptions raised by lookup_error() or by the handler itself (for
    example UnicodeTranslateError for 'strict') propagate to the caller.
    """
    pattern = _get_surrogates_re()
    handler = None
    pos = 0
    res = []
    while True:
        m = pattern.search(string, pos)
        if m:
            if handler is None:
                # Resolved lazily: input without surrogates never needs it.
                handler = lookup_error(errors)
            res.append(string[pos:m.start()])
            repl, pos = handler(UnicodeTranslateError(
                string, m.start(), m.end(), 'lone surrogates'))
            res.append(repl)
        elif pos:
            # At least one run was handled: emit the remaining tail.
            res.append(string[pos:])
            return ''.join(res)
        else:
            return string[:]


def rehandle_surrogateescape(string, errors):
    """Apply the decoding error handler *errors* to escaped bytes in *string*.

    Every maximal run of surrogate codes is converted back to bytes with
    the 'surrogateescape' encode handler and reported to the handler
    registered under the name *errors* as a UnicodeDecodeError (encoding
    'unicode', reason 'lone surrogates') spanning all of those bytes.  The
    run is replaced with the string returned by the handler; the position
    returned by the handler is relative to the run and is translated back
    to an index in *string*.

    Return the resulting string; if *string* contains no surrogate codes,
    return ``string[:]`` and do not look the handler up at all.

    Raise UnicodeTranslateError (with indices relative to *string*) if a
    surrogate code is outside of the range U+DC80-U+DCFF or if the handler
    raises UnicodeDecodeError.  Other exceptions raised by lookup_error()
    or by the handler propagate unchanged.
    """
    pattern = _get_surrogates_re()
    handler = None
    pos = 0
    res = []
    while True:
        m = pattern.search(string, pos)
        if m:
            if handler is None:
                # Resolved lazily: input without surrogates never needs it.
                handler = lookup_error(errors)
            start = m.start()
            res.append(string[pos:start])
            try:
                # Recover the undecodable bytes the surrogates stand for.
                baddata = string[start:m.end()].encode('ascii',
                                                       'surrogateescape')
            except UnicodeEncodeError as err:
                # err indices are relative to the run; shift them so that
                # they point into the whole string.
                raise UnicodeTranslateError(
                    string, err.start + start, err.end + start,
                    r'surrogates not in range \udc80-\udcff') from None
            try:
                repl, pos = handler(UnicodeDecodeError(
                    'unicode', baddata, 0, len(baddata), 'lone surrogates'))
            except UnicodeDecodeError as err:
                raise UnicodeTranslateError(
                    string, err.start + start, err.end + start,
                    err.reason) from None
            # The handler's position indexes baddata; make it index string.
            pos += start
            res.append(repl)
        elif pos:
            # At least one run was handled: emit the remaining tail.
            res.append(string[pos:])
            return ''.join(res)
        else:
            return string[:]
|
||||||
|
|
||||||
|
# Lazily compiled pattern shared by handle_astrals() and
# decompose_astrals(); None until one of them is first called.
_astral_re = None


def _get_astral_re():
    """Return the cached pattern matching runs of characters above U+FFFF.

    The pattern is compiled (and the re module imported) on first use only.
    """
    global _astral_re
    if not _astral_re:
        import re
        _astral_re = re.compile(r'[^\u0000-\uffff]+')
    return _astral_re


def handle_astrals(string, errors):
    """Apply the error handler *errors* to astral characters in *string*.

    Every maximal run of characters outside of the range U+0000-U+FFFF is
    reported to the handler registered under the name *errors* as a
    UnicodeTranslateError with the reason 'astral characters'.  The run is
    replaced with the string returned by the handler and scanning resumes
    at the position returned by the handler.

    Return the resulting string; if *string* contains no such characters,
    return ``string[:]`` and do not look the handler up at all.

    Exceptions raised by lookup_error() or by the handler propagate to the
    caller.
    """
    pattern = _get_astral_re()
    found = pattern.search(string, 0)
    if not found:
        return string[:]

    callback = lookup_error(errors)
    pieces = []
    cursor = 0
    while found:
        begin, stop = found.span()
        pieces.append(string[cursor:begin])
        replacement, cursor = callback(
            UnicodeTranslateError(string, begin, stop, 'astral characters'))
        pieces.append(replacement)
        found = pattern.search(string, cursor)

    if cursor:
        pieces.append(string[cursor:])
        return ''.join(pieces)
    return string[:]


def _decompose_astral(match):
    """Return the text of *match* with each character split into a pair.

    Used as the replacement callback of decompose_astrals(): every matched
    character is turned into a high surrogate (0xD800 plus the upper bits
    of its offset from 0x10000) followed by a low surrogate (0xDC00 plus
    the lower 10 bits).
    """
    return ''.join(
        '%c%c' % (0xd800 + high, 0xdc00 + low)
        for high, low in (divmod(ord(char) - 0x10000, 0x400)
                          for char in match.group())
    )


def decompose_astrals(string):
    """Return *string* with characters above U+FFFF replaced by pairs.

    Each such character becomes a high surrogate followed by a low
    surrogate; all other characters are left as they are.
    """
    return _get_astral_re().sub(_decompose_astral, string)
|
||||||
|
|
||||||
|
# Lazily compiled pattern used by compose_surrogate_pairs(); None until
# that function is first called.
_surrogate_pair_re = None


def _compose_surrogate_pair(match):
    """Return the single character encoded by the matched surrogate pair.

    Used as the replacement callback of compose_surrogate_pairs(); the
    matched text is a high surrogate followed by a low surrogate.
    """
    high, low = map(ord, match.group())
    return chr(0x10000 + ((high - 0xd800) << 10) + (low - 0xdc00))


def compose_surrogate_pairs(string):
    """Return *string* with surrogate pairs merged into single characters.

    Every high surrogate (U+D800-U+DBFF) immediately followed by a low
    surrogate (U+DC00-U+DFFF) is replaced with the character above U+FFFF
    that the pair encodes.  Unpaired surrogates and all other characters
    are left as they are.
    """
    global _surrogate_pair_re
    pattern = _surrogate_pair_re
    if not pattern:
        # Compile on first use only and cache the result for later calls.
        import re
        pattern = re.compile(r'[\ud800-\udbff][\udc00-\udfff]')
        _surrogate_pair_re = pattern
    return pattern.sub(_compose_surrogate_pair, string)
|
||||||
|
|
||||||
### error handlers
|
### error handlers
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
|
||||||
|
|
@ -1945,6 +1945,143 @@ def test_pickle(self):
|
||||||
self.assertFalse(unpickled_codec_info._is_text_encoding)
|
self.assertFalse(unpickled_codec_info._is_text_encoding)
|
||||||
|
|
||||||
|
|
||||||
|
def test_rehandle_surrogatepass(self):
    # Check codecs.rehandle_surrogatepass(): strings without surrogates
    # pass through unchanged, 'strict' reports the whole surrogate run,
    # and the replacing handlers substitute every surrogate.
    rehandle = codecs.rehandle_surrogatepass
    self.assertRaises(TypeError, rehandle)

    unchanged = ('', 'abc', '\xe0\xdf\xe7', '\u03b1\u03b2\u03b3',
                 '\U00010480\U0001d4ff')
    for s in unchanged:
        with self.subTest(str=s):
            self.assertEqual(rehandle(s, 'strict'), s)

    paired = 'a\ud801\udc80b'
    with self.assertRaises(UnicodeTranslateError) as cm:
        rehandle(paired, 'strict')
    exc = cm.exception
    self.assertEqual(exc.encoding, None)
    self.assertEqual(exc.object, paired)
    self.assertEqual(exc.start, 1)
    self.assertEqual(exc.end, 3)

    # Handler name -> replacements expected for '\ud801' and '\udc80'.
    # NOTE: 'namereplace', 'xmlcharrefreplace' and 'surrogatepass' are
    # not covered by this table.
    tests = [
        ('ignore', ('', '')),
        ('replace', ('\ufffd', '\ufffd')),
        ('backslashreplace', ('\\ud801', '\\udc80')),
    ]
    templates = ('a{}b', 'a{}b{}', 'a{}{}c', 'a{}b{}c')
    for error, args in tests:
        for tmpl in templates:
            data = tmpl.format('\ud801', '\udc80')
            expected = tmpl.format(*args)
            with self.subTest(error=error, data=data):
                self.assertEqual(rehandle(data, error), expected)
|
||||||
|
|
||||||
|
def test_rehandle_surrogateescape(self):
    # Check codecs.rehandle_surrogateescape(): strings without surrogates
    # pass through unchanged, escaped bytes are handed to decoding error
    # handlers, and surrogates outside of \udc80-\udcff are rejected.
    rehandle = codecs.rehandle_surrogateescape
    self.assertRaises(TypeError, rehandle)

    unchanged = ('', 'abc', '\xe0\xdf\xe7', '\u03b1\u03b2\u03b3',
                 '\U00010480\U0001d4ff')
    for s in unchanged:
        with self.subTest(str=s):
            self.assertEqual(rehandle(s, 'strict'), s)

    escaped = 'a\udc80\udcffb'
    with self.assertRaises(UnicodeTranslateError) as cm:
        rehandle(escaped, 'strict')
    exc = cm.exception
    self.assertEqual(exc.encoding, None)
    self.assertEqual(exc.object, escaped)
    self.assertEqual(exc.start, 1)
    self.assertEqual(exc.end, 3)

    # Handlers that cannot process the escaped bytes.
    rejected = (
        ('namereplace', TypeError),
        ('xmlcharrefreplace', TypeError),
        ('surrogatepass', UnicodeTranslateError),
    )
    for name, exc_type in rejected:
        with self.assertRaises(exc_type):
            rehandle('a\udc80b', name)

    # Handler name -> replacements expected for '\udc80' and '\udcff'.
    tests = [
        ('ignore', ('', '')),
        ('replace', ('\ufffd', '\ufffd')),
        ('backslashreplace', ('\\x80', '\\xff')),
        ('surrogateescape', ('\udc80', '\udcff')),
    ]
    templates = ('a{}b', 'a{}b{}', 'a{}{}c', 'a{}b{}c')
    for error, args in tests:
        for tmpl in templates:
            data = tmpl.format('\udc80', '\udcff')
            expected = tmpl.format(*args)
            if error == 'replace':
                # Adjacent surrogates are expected to collapse into a
                # single replacement character.
                expected = expected.replace('\ufffd\ufffd', '\ufffd')
            with self.subTest(error=error, data=data):
                self.assertEqual(rehandle(data, error), expected)

    # Surrogates just outside of \udc80-\udcff fail with every handler.
    all_handlers = ('strict', 'ignore', 'replace',
                    'backslashreplace', 'namereplace', 'xmlcharrefreplace',
                    'surrogatepass', 'surrogateescape')
    for error in all_handlers:
        for out_of_range in ('\udc7f', '\udd00'):
            with self.assertRaises(UnicodeTranslateError):
                rehandle(out_of_range, error)
|
||||||
|
|
||||||
|
def test_handle_astrals(self):
    # Check codecs.handle_astrals(): strings without characters above
    # U+FFFF (surrogates included) pass through unchanged, 'strict'
    # reports the whole run, and the replacing handlers substitute every
    # astral character.
    handle = codecs.handle_astrals
    self.assertRaises(TypeError, handle)

    unchanged = ('', 'abc', '\xe0\xdf\xe7', '\u03b1\u03b2\u03b3',
                 '\ud801\udc80', '\udc80')
    for s in unchanged:
        with self.subTest(str=s):
            self.assertEqual(handle(s, 'strict'), s)

    astral = 'a\U00010480\U0001d4ffb'
    with self.assertRaises(UnicodeTranslateError) as cm:
        handle(astral, 'strict')
    exc = cm.exception
    self.assertEqual(exc.encoding, None)
    self.assertEqual(exc.object, astral)
    self.assertEqual(exc.start, 1)
    self.assertEqual(exc.end, 3)

    # NOTE: the 'surrogatepass' handler is not checked here.
    with self.assertRaises(TypeError):
        handle('a\U00010480b', 'surrogateescape')

    # Handler name -> replacements expected for U+10480 and U+1D4FF.
    # NOTE: 'namereplace' and 'xmlcharrefreplace' are not covered by
    # this table.
    tests = [
        ('ignore', ('', '')),
        ('replace', ('\ufffd', '\ufffd')),
        ('backslashreplace', ('\\U00010480', '\\U0001d4ff')),
    ]
    templates = ('a{}b', 'a{}b{}', 'a{}{}c', 'a{}b{}c')
    for error, args in tests:
        for tmpl in templates:
            data = tmpl.format('\U00010480', '\U0001d4ff')
            expected = tmpl.format(*args)
            with self.subTest(error=error, data=data):
                self.assertEqual(handle(data, error), expected)
|
||||||
|
|
||||||
|
def test_decompose_astrals(self):
    # Check codecs.decompose_astrals(): characters above U+FFFF become
    # surrogate pairs, everything else (existing surrogates included) is
    # left untouched.
    decompose = codecs.decompose_astrals
    self.assertRaises(TypeError, decompose)

    # (input, expected result)
    cases = (
        ('abc', 'abc'),
        ('\xe0\xdf\xe7', '\xe0\xdf\xe7'),
        ('\u03b1\u03b2\u03b3', '\u03b1\u03b2\u03b3'),
        ('a\U00010480b', 'a\ud801\udc80b'),
        ('a\U00010480b\U0001d4ff', 'a\ud801\udc80b\ud835\udcff'),
        ('a\U00010480\U0001d4ffc', 'a\ud801\udc80\ud835\udcffc'),
        ('a\U00010480b\U0001d4ffc', 'a\ud801\udc80b\ud835\udcffc'),
        ('a\ud801\udc80b', 'a\ud801\udc80b'),
        ('a\udc80b', 'a\udc80b'),
    )
    for source, expected in cases:
        with self.subTest(str=source):
            self.assertEqual(decompose(source), expected)
|
||||||
|
|
||||||
|
def test_compose_surrogate_pairs(self):
    # Check codecs.compose_surrogate_pairs(): a high surrogate followed
    # by a low surrogate becomes one astral character, while unpaired
    # surrogates and other characters are left untouched.
    compose = codecs.compose_surrogate_pairs
    self.assertRaises(TypeError, compose)

    # (input, expected result)
    cases = (
        ('abc', 'abc'),
        ('\xe0\xdf\xe7', '\xe0\xdf\xe7'),
        ('\u03b1\u03b2\u03b3', '\u03b1\u03b2\u03b3'),
        ('a\ud801\udc80b', 'a\U00010480b'),
        ('a\ud801\udc80b\ud835\udcff', 'a\U00010480b\U0001d4ff'),
        ('a\ud801\udc80\ud835\udcffc', 'a\U00010480\U0001d4ffc'),
        ('a\ud801\udc80b\ud835\udcffc', 'a\U00010480b\U0001d4ffc'),
        ('a\udc80\ud801\ud801\udc80b', 'a\udc80\ud801\U00010480b'),
        ('a\ud801\udc80\udc80\ud801b', 'a\U00010480\udc80\ud801b'),
        ('a\udc80b', 'a\udc80b'),
    )
    for source, expected in cases:
        with self.subTest(str=source):
            self.assertEqual(compose(source), expected)
|
||||||
|
|
||||||
|
|
||||||
class StreamReaderTest(unittest.TestCase):
|
class StreamReaderTest(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue