diff --git a/Lib/test/test_zipfile/test_core.py b/Lib/test/test_zipfile/test_core.py index 6cd0d0760bc..43e17337a7a 100644 --- a/Lib/test/test_zipfile/test_core.py +++ b/Lib/test/test_zipfile/test_core.py @@ -2404,6 +2404,405 @@ class LzmaRepackTests(AbstractRepackTests, unittest.TestCase): class ZstdRepackTests(AbstractRepackTests, unittest.TestCase): compression = zipfile.ZIP_ZSTANDARD +class ZipRepackerTests(unittest.TestCase): + def test_iter_scan_signature(self): + bytes_ = b'sig__sig__sig__sig' + ln = len(bytes_) + fp = io.BytesIO(bytes_) + repacker = zipfile._ZipRepacker() + + # basic: the offset of every occurrence of the signature is yielded + self.assertEqual( + list(repacker._iter_scan_signature(fp, b'sig', 0, ln)), + [0, 5, 10, 15], + ) + + # start_offset: occurrences that begin before start_offset are skipped + self.assertEqual( + list(repacker._iter_scan_signature(fp, b'sig', 1, ln)), + [5, 10, 15], + ) + self.assertEqual( + list(repacker._iter_scan_signature(fp, b'sig', 6, ln)), + [10, 15], + ) + self.assertEqual( + list(repacker._iter_scan_signature(fp, b'sig', 16, ln)), + [], + ) + + # end_offset: an occurrence that would extend past end_offset is not reported + self.assertEqual( + list(repacker._iter_scan_signature(fp, b'sig', 0, ln - 1)), + [0, 5, 10], + ) + self.assertEqual( + list(repacker._iter_scan_signature(fp, b'sig', 0, ln - 6)), + [0, 5], + ) + + # chunk_size: the offsets are the same for small read chunks, including one shorter than the signature + self.assertEqual( + list(repacker._iter_scan_signature(fp, b'sig', 0, ln, 3)), + [0, 5, 10, 15], + ) + self.assertEqual( + list(repacker._iter_scan_signature(fp, b'sig', 0, ln, 1)), + [0, 5, 10, 15], + ) + + def test_scan_data_descriptor(self): + import zlib + repacker = zipfile._ZipRepacker() + + # basic: 5 bytes of data followed by a 16-byte descriptor that starts with the PK\x07\x08 signature + bytes_ = b'dummyPK\x07\x08\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00' + self.assertEqual( + repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_), False), + (zlib.crc32(b'dummy'), 5, 5, 16), + ) + + # returns None if the PK\x07\x08 signature is missing + bytes_ = b'dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00' + self.assertEqual( + repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_), False), + None, + ) + + # returns None if the fields after the signature cannot be unpacked (here nothing follows the signature) 
+ bytes_ = b'PK\x07\x08' + self.assertEqual( + repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_), False), + None, + ) + + # returns None if the recorded size (5) does not match the 4 bytes of data before the signature + bytes_ = b'dummPK\x07\x08\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00' + self.assertEqual( + repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_), False), + None, + ) + + # zip64: the two size fields are 8 bytes each, so the last item of the result is 24 instead of 16 + bytes_ = b'dummyPK\x07\x08\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00' + self.assertEqual( + repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_), True), + (zlib.crc32(b'dummy'), 5, 5, 24), + ) + + # offset: the descriptor only matches when exactly 5 bytes lie between the start offset and the signature + bytes_ = b'dummyPK\x07\x08\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00' + self.assertEqual( + repacker._scan_data_descriptor(io.BytesIO(bytes_), 1, len(bytes_), False), + None, + ) + + bytes_ = b'123dummyPK\x07\x08\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00' + self.assertEqual( + repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_), False), + None, + ) + self.assertEqual( + repacker._scan_data_descriptor(io.BytesIO(bytes_), 3, len(bytes_), False), + (zlib.crc32(b'dummy'), 5, 5, 16), + ) + + # end_offset: a descriptor cut short by end_offset is rejected; trailing bytes beyond end_offset are ignored + bytes_ = b'dummyPK\x07\x08\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00' + self.assertEqual( + repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_) - 1, False), + None, + ) + + bytes_ = b'dummyPK\x07\x08\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00123' + self.assertEqual( + repacker._scan_data_descriptor(io.BytesIO(bytes_), 0, len(bytes_) - 3, False), + (zlib.crc32(b'dummy'), 5, 5, 16), + ) + + def test_scan_data_descriptor_no_sig(self): + import zlib + repacker = zipfile._ZipRepacker() + + # basic: 5 bytes of data followed directly by the 12 descriptor bytes, with no PK\x07\x08 signature + bytes_ = b'dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00' + self.assertEqual( + repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_), False), + (zlib.crc32(b'dummy'), 5, 5, 12), + ) + + # returns None if the recorded size (5) does not match the 4 bytes of data + bytes_ = 
b'dumm\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00' + self.assertEqual( + repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_), False), + None, + ) + + # zip64: the two size fields are 8 bytes each, so the last item of the result is 20 instead of 12 + bytes_ = b'dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00' + self.assertEqual( + repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_), True), + (zlib.crc32(b'dummy'), 5, 5, 20), + ) + + # offset: the descriptor only matches when exactly 5 bytes of data lie between the start offset and the descriptor + bytes_ = b'dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00' + self.assertEqual( + repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 1, len(bytes_), False), + None, + ) + + bytes_ = b'123dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00' + self.assertEqual( + repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_), False), + None, + ) + self.assertEqual( + repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 3, len(bytes_), False), + (zlib.crc32(b'dummy'), 5, 5, 12), + ) + + # end_offset: a descriptor cut short by end_offset is rejected; trailing bytes beyond end_offset are ignored + bytes_ = b'dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00' + self.assertEqual( + repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_) - 1, False), + None, + ) + + bytes_ = b'dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00123' + self.assertEqual( + repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_) - 3, False), + (zlib.crc32(b'dummy'), 5, 5, 12), + ) + + # chunk_size: the result is the same for chunk sizes of 12 and 1 + bytes_ = b'dummy\x3f\xf2\xf4\x4f\x05\x00\x00\x00\x05\x00\x00\x00' + self.assertEqual( + repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_), False, 12), + (zlib.crc32(b'dummy'), 5, 5, 12), + ) + self.assertEqual( + repacker._scan_data_descriptor_no_sig(io.BytesIO(bytes_), 0, len(bytes_), False, 1), + (zlib.crc32(b'dummy'), 5, 5, 12), + ) + + def test_scan_data_descriptor_no_sig_by_decompression(self): + import zlib + import compression.zstd + repacker = zipfile._ZipRepacker() + + for method in ( + zipfile.ZIP_DEFLATED, + zipfile.ZIP_BZIP2, + zipfile.ZIP_ZSTANDARD, + ): 
+ compressor = zipfile._get_compressor(method) + with self.subTest(method=method, compressor=compressor): + comp_bytes = compressor.compress(b'dummy') + comp_bytes += compressor.flush() + comp_len = len(comp_bytes) + + # basic + bytes_ = comp_bytes + b'\x3f\xf2\xf4\x4f' + struct.pack('