gh-115952: Fix a potential virtual memory allocation denial of service in pickle (GH-119204)

Loading small data which does not even involve arbitrary code execution
could consume an arbitrarily large amount of memory. There were three
issues (a sketch illustrating these attack vectors follows the list):

* PUT and LONG_BINPUT with a large argument (the C implementation only).
  Since the memo is implemented in C as a contiguous dynamic array, a single
  opcode could cause it to be resized to an arbitrary size. Now the sparsity
  of memo indices is limited.
* BINBYTES, BINBYTES8 and BYTEARRAY8 with a large argument.  They allocated
  a bytes or bytearray object of the specified size before reading into
  it.  Now they read very large data in chunks.
* BINSTRING, BINUNICODE, LONG4, BINUNICODE8 and FRAME with a large
  argument.  They read the whole data by calling the read() method of
  the underlying file object, which usually allocates a bytes object of
  the specified size before reading into it.  Now they read very large
  data in chunks.
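
As an editor's illustration (not part of the commit; the payload bytes
mirror the new tests below), each of these tiny pickles declares a huge
memo index or data size up front:

    import pickle, struct

    idx = (1 << 31) - 1
    # LONG_BINPUT/LONG_BINGET with a huge, sparse memo index (14 bytes of
    # input).  Before the fix the C unpickler grew its memo array to cover
    # the index; now sparse indices fall back to a dict-based memo.
    sparse_memo = (b'(]r' + struct.pack('<I', idx) +
                   b'j' + struct.pack('<I', idx) + b't.')
    print(pickle.loads(sparse_memo))    # ([], []) with bounded memory use

    # BINBYTES8 header claiming ~8 GiB that the stream does not contain.
    # The unpickler now reads in chunks and fails once the data runs out
    # instead of preallocating the full 8 GiB first.
    truncated = b'\x8e' + struct.pack('<Q', 8 << 30) + b'.' * 5
    try:
        pickle.loads(truncated)
    except pickle.UnpicklingError as exc:
        print(exc)                      # e.g. "pickle data was truncated"

The chunked reading itself amounts to a loop of the following shape (a
sketch of the idea, not the actual _pickle.c code; the helper name and
chunk size are illustrative):

    def read_big(file, size, chunk=1 << 20):
        # Accumulate `size` bytes in bounded chunks instead of
        # preallocating all `size` bytes before the first read.
        parts = []
        while size > 0:
            block = file.read(min(size, chunk))
            if not block:
                raise ValueError('pickle data was truncated')
            parts.append(block)
            size -= len(block)
        return b''.join(parts)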

Also add a comprehensive benchmark suite to measure the performance and
memory impact of the chunked-reading optimization in PR #119204.

Features:
- Normal mode: benchmarks legitimate pickles (time/memory metrics)
- Antagonistic mode: tests malicious pickles (DoS protection)
- Baseline comparison: side-by-side comparison of two Python builds
- Support for truncated data and sparse memo attack vectors

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Co-authored-by: Gregory P. Smith <greg@krypto.org>

@@ -74,6 +74,15 @@ def count_opcode(code, pickle):
 def identity(x):
     return x
 
+def itersize(start, stop):
+    # Produce a geometrically increasing sequence from start to stop
+    # (inclusive) for tests.
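+    # Worked example (editor's note, not part of the commit):
+    #     list(itersize(1, 10)) == [1, 2, 4, 8, 10]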
+    size = start
+    while size < stop:
+        yield size
+        size <<= 1
+    yield stop
+
 class UnseekableIO(io.BytesIO):
     def peek(self, *args):
@@ -853,9 +862,8 @@ def assert_is_copy(self, obj, objcopy, msg=None):
                 self.assertEqual(getattr(obj, slot, None),
                                  getattr(objcopy, slot, None), msg=msg)
 
-    def check_unpickling_error(self, errors, data):
-        with self.subTest(data=data), \
-                self.assertRaises(errors):
+    def check_unpickling_error_strict(self, errors, data):
+        with self.assertRaises(errors):
             try:
                 self.loads(data)
             except BaseException as exc:
@@ -864,6 +872,10 @@ def check_unpickling_error(self, errors, data):
                           (data, exc.__class__.__name__, exc))
                 raise
 
+    def check_unpickling_error(self, errors, data):
+        with self.subTest(data=data):
+            self.check_unpickling_error_strict(errors, data)
+
     def test_load_from_data0(self):
         self.assert_is_copy(self._testdata, self.loads(DATA0))
@@ -1150,6 +1162,155 @@ def test_negative_32b_binput(self):
         dumped = b'\x80\x03X\x01\x00\x00\x00ar\xff\xff\xff\xff.'
         self.check_unpickling_error(ValueError, dumped)
 
+    def test_too_large_put(self):
+        # Test that PUT with large id does not cause allocation of
+        # too large memo table. The C implementation uses a dict-based memo
+        # for sparse indices (when idx > memo_len * 2) instead of allocating
+        # a massive array. This test verifies large sparse indices work without
+        # causing memory exhaustion.
+        #
+        # The following simple pickle creates an empty list, memoizes it
+        # using a large index, then loads it back on the stack, builds
+        # a tuple containing 2 identical empty lists and returns it.
+        data = lambda n: (b'((lp' + str(n).encode() + b'\n' +
+                          b'g' + str(n).encode() + b'\nt.')
+        # 0: ( MARK
+        # 1: ( MARK
+        # 2: l LIST (MARK at 1)
+        # 3: p PUT 1000000000000
+        # 18: g GET 1000000000000
+        # 33: t TUPLE (MARK at 0)
+        # 34: . STOP
+        for idx in [10**6, 10**9, 10**12]:
+            if idx > sys.maxsize:
+                continue
+            self.assertEqual(self.loads(data(idx)), ([],)*2)
+
+    def test_too_large_long_binput(self):
+        # Test that LONG_BINPUT with large id does not cause allocation of
+        # too large memo table. The C implementation uses a dict-based memo
+        # for sparse indices (when idx > memo_len * 2) instead of allocating
+        # a massive array. This test verifies large sparse indices work without
+        # causing memory exhaustion.
+        #
+        # The following simple pickle creates an empty list, memoizes it
+        # using a large index, then loads it back on the stack, builds
+        # a tuple containing 2 identical empty lists and returns it.
+        data = lambda n: (b'(]r' + struct.pack('<I', n) +
+                          b'j' + struct.pack('<I', n) + b't.')
+        # 0: ( MARK
+        # 1: ] EMPTY_LIST
+        # 2: r LONG_BINPUT 4294967295
+        # 7: j LONG_BINGET 4294967295
+        # 12: t TUPLE (MARK at 0)
+        # 13: . STOP
+        for idx in itersize(1 << 20, min(sys.maxsize, (1 << 32) - 1)):
+            self.assertEqual(self.loads(data(idx)), ([],)*2)
+
+    def _test_truncated_data(self, dumped, expected_error=None):
+        # Test that instructions to read large data without providing
+        # such amount of data do not cause large memory usage.
+        if expected_error is None:
+            expected_error = self.truncated_data_error
+        # BytesIO
+        with self.assertRaisesRegex(*expected_error):
+            self.loads(dumped)
+        if hasattr(self, 'unpickler'):
+            try:
+                with open(TESTFN, 'wb') as f:
+                    f.write(dumped)
+                # buffered file
+                with open(TESTFN, 'rb') as f:
+                    u = self.unpickler(f)
+                    with self.assertRaisesRegex(*expected_error):
+                        u.load()
+                # unbuffered file
+                with open(TESTFN, 'rb', buffering=0) as f:
+                    u = self.unpickler(f)
+                    with self.assertRaisesRegex(*expected_error):
+                        u.load()
+            finally:
+                os_helper.unlink(TESTFN)
+
+    def test_truncated_large_binstring(self):
+        data = lambda size: b'T' + struct.pack('<I', size) + b'.' * 5
+        # 0: T BINSTRING '....'
+        # 9: . STOP
+        self.assertEqual(self.loads(data(4)), '....')  # self-testing
+        for size in itersize(1 << 10, min(sys.maxsize - 5, (1 << 31) - 1)):
+            self._test_truncated_data(data(size))
+        self._test_truncated_data(data(1 << 31),
+            (pickle.UnpicklingError, 'truncated|exceeds|negative byte count'))
+
+    def test_truncated_large_binunicode(self):
+        data = lambda size: b'X' + struct.pack('<I', size) + b'.' * 5
+        # 0: X BINUNICODE '....'
+        # 9: . STOP
+        self.assertEqual(self.loads(data(4)), '....')  # self-testing
+        for size in itersize(1 << 10, min(sys.maxsize - 5, (1 << 32) - 1)):
+            self._test_truncated_data(data(size))
+
+    def test_truncated_large_binbytes(self):
+        data = lambda size: b'B' + struct.pack('<I', size) + b'.' * 5
+        # 0: B BINBYTES b'....'
+        # 9: . STOP
+        self.assertEqual(self.loads(data(4)), b'....')  # self-testing
+        for size in itersize(1 << 10, min(sys.maxsize, 1 << 31)):
+            self._test_truncated_data(data(size))
+
+    def test_truncated_large_long4(self):
+        data = lambda size: b'\x8b' + struct.pack('<I', size) + b'.' * 5
+        # 0: \x8b LONG4 0x2e2e2e2e
+        # 9: . STOP
+        self.assertEqual(self.loads(data(4)), 0x2e2e2e2e)  # self-testing
+        for size in itersize(1 << 10, min(sys.maxsize - 5, (1 << 31) - 1)):
+            self._test_truncated_data(data(size))
+        self._test_truncated_data(data(1 << 31),
+            (pickle.UnpicklingError, 'LONG pickle has negative byte count'))
+
+    def test_truncated_large_frame(self):
+        data = lambda size: b'\x95' + struct.pack('<Q', size) + b'N.'
+        # 0: \x95 FRAME 2
+        # 9: N NONE
+        # 10: . STOP
+        self.assertIsNone(self.loads(data(2)))  # self-testing
+        for size in itersize(1 << 10, sys.maxsize - 9):
+            self._test_truncated_data(data(size))
+        if sys.maxsize + 1 < 1 << 64:
+            self._test_truncated_data(data(sys.maxsize + 1),
+                ((OverflowError, ValueError),
+                 'FRAME length exceeds|frame size > sys.maxsize'))
+
+    def test_truncated_large_binunicode8(self):
+        data = lambda size: b'\x8d' + struct.pack('<Q', size) + b'.' * 5
+        # 0: \x8d BINUNICODE8 '....'
+        # 13: . STOP
+        self.assertEqual(self.loads(data(4)), '....')  # self-testing
+        for size in itersize(1 << 10, sys.maxsize - 9):
+            self._test_truncated_data(data(size))
+        if sys.maxsize + 1 < 1 << 64:
+            self._test_truncated_data(data(sys.maxsize + 1), self.size_overflow_error)
+
+    def test_truncated_large_binbytes8(self):
+        data = lambda size: b'\x8e' + struct.pack('<Q', size) + b'.' * 5
+        # 0: \x8e BINBYTES8 b'....'
+        # 13: . STOP
+        self.assertEqual(self.loads(data(4)), b'....')  # self-testing
+        for size in itersize(1 << 10, sys.maxsize):
+            self._test_truncated_data(data(size))
+        if sys.maxsize + 1 < 1 << 64:
+            self._test_truncated_data(data(sys.maxsize + 1), self.size_overflow_error)
+
+    def test_truncated_large_bytearray8(self):
+        data = lambda size: b'\x96' + struct.pack('<Q', size) + b'.' * 5
+        # 0: \x96 BYTEARRAY8 bytearray(b'....')
+        # 13: . STOP
+        self.assertEqual(self.loads(data(4)), bytearray(b'....'))  # self-testing
+        for size in itersize(1 << 10, sys.maxsize):
+            self._test_truncated_data(data(size))
+        if sys.maxsize + 1 < 1 << 64:
+            self._test_truncated_data(data(sys.maxsize + 1), self.size_overflow_error)
+
     def test_badly_escaped_string(self):
         self.check_unpickling_error(ValueError, b"S'\\'\n.")