Add tests for _ZipRepacker

This commit is contained in:
Danny Lin 2025-06-01 09:20:42 +08:00
parent 1d5ec61337
commit db9d0d6199

View file

@ -2404,6 +2404,405 @@ class LzmaRepackTests(AbstractRepackTests, unittest.TestCase):
class ZstdRepackTests(AbstractRepackTests, unittest.TestCase):
compression = zipfile.ZIP_ZSTANDARD
class ZipRepackerTests(unittest.TestCase):
def test_iter_scan_signature(self):
bytes_ = b'sig__sig__sig__sig'
ln = len(bytes_)
fp = io.BytesIO(bytes_)
repacker = zipfile._ZipRepacker()
# basic
self.assertEqual(
list(repacker._iter_scan_signature(fp, b'sig', 0, ln)),
[0, 5, 10, 15],
)
# start_offset
self.assertEqual(
list(repacker._iter_scan_signature(fp, b'sig', 1, ln)),
[5, 10, 15],
)
self.assertEqual(
list(repacker._iter_scan_signature(fp, b'sig', 6, ln)),
[10, 15],
)
self.assertEqual(
list(repacker._iter_scan_signature(fp, b'sig', 16, ln)),
[],
)
# end_offset
self.assertEqual(
list(repacker._iter_scan_signature(fp, b'sig', 0, ln - 1)),
[0, 5, 10],
)
self.assertEqual(
list(repacker._iter_scan_signature(fp, b'sig', 0, ln - 6)),
[0, 5],
)
# chunk_size
self.assertEqual(
list(repacker._iter_scan_signature(fp, b'sig', 0, ln, 3)),
[0, 5, 10, 15],
)
self.assertEqual(
list(repacker._iter_scan_signature(fp, b'sig', 0, ln, 1)),
[0, 5, 10, 15],
)
def test_scan_data_descriptor(self):
import zlib
repacker = zipfile._ZipRepacker()
# basic
bytes_ = b'dummyPK\x07\x08\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_), False),
(zlib.crc32(b'dummy'), 5, 5, 16),
)
# return None if no signature
bytes_ = b'dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_), False),
None,
)
# return None if not unpackable
bytes_ = b'PK\x07\x08'
self.assertEqual(
repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_), False),
None,
)
# return None if compressed size not match
bytes_ = b'dummPK\x07\x08\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_), False),
None,
)
# zip64
bytes_ = b'dummyPK\x07\x08\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_), True),
(zlib.crc32(b'dummy'), 5, 5, 24),
)
# offset
bytes_ = b'dummyPK\x07\x08\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor(io.BytesIO(bytes_), 1, len(bytes_), False),
None,
)
bytes_ = b'123dummyPK\x07\x08\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_), False),
None,
)
self.assertEqual(
repacker._scan_data_descriptor(io.BytesIO(bytes_), 3, len(bytes_), False),
(zlib.crc32(b'dummy'), 5, 5, 16),
)
# end_offset
bytes_ = b'dummyPK\x07\x08\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_) - 1, False),
None,
)
bytes_ = b'dummyPK\x07\x08\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00123'
self.assertEqual(
repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_) - 3, False),
(zlib.crc32(b'dummy'), 5, 5, 16),
)
def test_scan_data_descriptor_no_sig(self):
import zlib
repacker = zipfile._ZipRepacker()
# basic
bytes_ = b'dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_), False),
(zlib.crc32(b'dummy'), 5, 5, 12),
)
# return None if compressed size not match
bytes_ = b'dumm\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_), False),
None,
)
# zip64
bytes_ = b'dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_), True),
(zlib.crc32(b'dummy'), 5, 5, 20),
)
# offset
bytes_ = b'dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 1, len(bytes_), False),
None,
)
bytes_ = b'123dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_), False),
None,
)
self.assertEqual(
repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 3, len(bytes_), False),
(zlib.crc32(b'dummy'), 5, 5, 12),
)
# end_offset
bytes_ = b'dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_) - 1, False),
None,
)
bytes_ = b'dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00123'
self.assertEqual(
repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_) - 3, False),
(zlib.crc32(b'dummy'), 5, 5, 12),
)
# chunk_size
bytes_ = b'dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_), False, 12),
(zlib.crc32(b'dummy'), 5, 5, 12),
)
self.assertEqual(
repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_), False, 1),
(zlib.crc32(b'dummy'), 5, 5, 12),
)
def test_scan_data_descriptor_no_sig_by_decompression(self):
import zlib
import compression.zstd
repacker = zipfile._ZipRepacker()
for method in (
zipfile.ZIP_DEFLATED,
zipfile.ZIP_BZIP2,
zipfile.ZIP_ZSTANDARD,
):
compressor = zipfile._get_compressor(method)
with self.subTest(method=method, compressor=compressor):
comp_bytes = compressor.compress(b'dummy')
comp_bytes += compressor.flush()
comp_len = len(comp_bytes)
# basic
bytes_ = comp_bytes + b'\x3f\xf2\xf4\x4f' + struct.pack('<L', comp_len) + b'\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig_by_decompression(
io.BytesIO(bytes_), 0, len(bytes_), False, method),
(zlib.crc32(b'dummy'), comp_len, 5, 12),
)
# return None if insufficient data length
bytes_ = b'\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig_by_decompression(
io.BytesIO(bytes_), 0, len(bytes_) - 1, False, method),
None,
)
bytes_ = b'\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig_by_decompression(
io.BytesIO(bytes_), 0, len(bytes_) + 1, False, method),
None,
)
# return None if compressed size not match
bytes_ = comp_bytes + b'\x3f\xf2\xf4\x4f' + struct.pack('<L', comp_len - 1) + b'\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig_by_decompression(
io.BytesIO(bytes_), 0, len(bytes_), False, method),
None,
)
# zip64
bytes_ = comp_bytes + b'\x3f\xf2\xf4\x4f' + struct.pack('<Q', comp_len) + b'\x05\x00\x00\x00\x00\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig_by_decompression(
io.BytesIO(bytes_), 0, len(bytes_), True, method),
(zlib.crc32(b'dummy'), comp_len, 5, 20),
)
# offset
bytes_ = comp_bytes + b'\x3f\xf2\xf4\x4f' + struct.pack('<L', comp_len) + b'\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig_by_decompression(
io.BytesIO(bytes_), 1, len(bytes_), False, method),
None,
)
bytes_ = b'123' + comp_bytes + b'\x3f\xf2\xf4\x4f' + struct.pack('<L', comp_len) + b'\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig_by_decompression(
io.BytesIO(bytes_), 3, len(bytes_), False, method),
(zlib.crc32(b'dummy'), comp_len, 5, 12),
)
# end_offset
bytes_ = comp_bytes + b'\x3f\xf2\xf4\x4f' + struct.pack('<L', comp_len) + b'\x05\x00\x00\x00'
self.assertEqual(
repacker._scan_data_descriptor_no_sig_by_decompression(
io.BytesIO(bytes_), 0, len(bytes_) - 2, False, method),
None,
)
bytes_ = comp_bytes + b'\x3f\xf2\xf4\x4f' + struct.pack('<L', comp_len) + b'\x05\x00\x00\x00123'
self.assertEqual(
repacker._scan_data_descriptor_no_sig_by_decompression(
io.BytesIO(bytes_), 0, len(bytes_) - 2, False, method),
(zlib.crc32(b'dummy'), comp_len, 5, 12),
)
def test_scan_data_descriptor_no_sig_by_decompression_invalid(self):
repacker = zipfile._ZipRepacker()
for method in (
zipfile.ZIP_STORED,
zipfile.ZIP_LZMA,
1024, # simulate an unknown method
):
with self.subTest(method=method):
bytes_ = b'dummy'
self.assertEqual(
repacker._scan_data_descriptor_no_sig_by_decompression(
io.BytesIO(bytes_), 0, len(bytes_), False, method),
False,
)
def test_trace_compressed_block_end(self):
import zlib
import compression.zstd
repacker = zipfile._ZipRepacker()
for method, exc_cls in (
(zipfile.ZIP_DEFLATED, zlib.error),
(zipfile.ZIP_BZIP2, OSError),
(zipfile.ZIP_ZSTANDARD, compression.zstd.ZstdError),
):
compressor = zipfile._get_compressor(method)
with self.subTest(method=method, compressor=compressor):
comp_bytes = compressor.compress(b'dummy')
comp_bytes += compressor.flush()
comp_len = len(comp_bytes)
# basic
decompressor = zipfile._get_decompressor(method)
bytes_ = comp_bytes
self.assertEqual(
repacker._trace_compressed_block_end(io.BytesIO(bytes_), 0, len(bytes_), decompressor),
comp_len,
)
# offset
decompressor = zipfile._get_decompressor(method)
bytes_ = comp_bytes
with self.assertRaises(exc_cls):
repacker._trace_compressed_block_end(io.BytesIO(bytes_), 1, len(bytes_), decompressor)
decompressor = zipfile._get_decompressor(method)
bytes_ = b'123' + comp_bytes
with self.assertRaises(exc_cls):
repacker._trace_compressed_block_end(io.BytesIO(bytes_), 0, len(bytes_), decompressor)
decompressor = zipfile._get_decompressor(method)
bytes_ = b'123' + comp_bytes
self.assertEqual(
repacker._trace_compressed_block_end(io.BytesIO(bytes_), 3, len(bytes_), decompressor),
comp_len + 3,
)
# end_offset
decompressor = zipfile._get_decompressor(method)
bytes_ = comp_bytes
with self.assertRaises(EOFError):
repacker._trace_compressed_block_end(io.BytesIO(bytes_), 0, len(bytes_) - 1, decompressor)
decompressor = zipfile._get_decompressor(method)
bytes_ = comp_bytes + b'123'
self.assertEqual(
repacker._trace_compressed_block_end(io.BytesIO(bytes_), 0, len(bytes_) - 3, decompressor),
comp_len,
)
# chunk_size
decompressor = zipfile._get_decompressor(method)
bytes_ = comp_bytes
self.assertEqual(
repacker._trace_compressed_block_end(io.BytesIO(bytes_), 0, len(bytes_), decompressor, 16),
comp_len,
)
decompressor = zipfile._get_decompressor(method)
bytes_ = comp_bytes
self.assertEqual(
repacker._trace_compressed_block_end(io.BytesIO(bytes_), 0, len(bytes_), decompressor, 1),
comp_len,
)
def test_copy_bytes(self):
repacker = zipfile._ZipRepacker()
fp = io.BytesIO(b'abc123')
repacker._copy_bytes(fp, 0, 3, 3)
self.assertEqual(fp.getvalue(), b'abcabc')
fp = io.BytesIO(b'abc123')
repacker._copy_bytes(fp, 3, 0, 3)
self.assertEqual(fp.getvalue(), b'123123')
fp = io.BytesIO(b'abc123')
repacker._copy_bytes(fp, 0, 4, 3)
self.assertEqual(fp.getvalue(), b'abc1abc')
fp = io.BytesIO(b'abc123')
repacker._copy_bytes(fp, 0, 10, 3)
self.assertEqual(fp.getvalue(), b'abc123\x00\x00\x00\x00abc')
fp = io.BytesIO(b'abc123')
repacker._copy_bytes(fp, 0, 3, 2)
self.assertEqual(fp.getvalue(), b'abcab3')
# check chunk_size
repacker = zipfile._ZipRepacker(chunk_size=6)
fp = io.BytesIO(b'abcdef123456')
with mock.patch.object(fp, 'read', wraps=fp.read) as m_read:
repacker._copy_bytes(fp, 0, 12, 6)
self.assertEqual(fp.getvalue(), b'abcdef123456abcdef')
m_read.assert_called_once_with(6)
repacker = zipfile._ZipRepacker(chunk_size=3)
fp = io.BytesIO(b'abcdef123456')
with mock.patch.object(fp, 'read', wraps=fp.read) as m_read:
repacker._copy_bytes(fp, 0, 12, 6)
self.assertEqual(fp.getvalue(), b'abcdef123456abcdef')
self.assertEqual(m_read.mock_calls, [mock.call(3), mock.call(3)])
repacker = zipfile._ZipRepacker(chunk_size=1)
fp = io.BytesIO(b'abcdef123456')
with mock.patch.object(fp, 'read', wraps=fp.read) as m_read:
repacker._copy_bytes(fp, 0, 12, 6)
self.assertEqual(fp.getvalue(), b'abcdef123456abcdef')
self.assertEqual(m_read.mock_calls, [
mock.call(1), mock.call(1), mock.call(1), mock.call(1), mock.call(1), mock.call(1)])
class PyZipFileTests(unittest.TestCase):
def assertCompiledIn(self, name, namelist):