mirror of
https://github.com/python/cpython.git
synced 2025-12-08 06:10:17 +00:00
Introduce _scan_data_descriptor_no_sig_by_decompression
This commit is contained in:
parent
31c4c936c6
commit
f8fade17a0
2 changed files with 102 additions and 2 deletions
|
|
@ -2008,6 +2008,9 @@ def test_repack_data_descriptor_no_sig(self):
|
||||||
|
|
||||||
def test_repack_data_descriptor_no_sig_strict(self):
|
def test_repack_data_descriptor_no_sig_strict(self):
|
||||||
"""Should skip data descriptor without signature when `strict_descriptor` is set."""
|
"""Should skip data descriptor without signature when `strict_descriptor` is set."""
|
||||||
|
if self.compression not in (zipfile.ZIP_STORED, zipfile.ZIP_LZMA):
|
||||||
|
self.skipTest('require unsupported decompression method')
|
||||||
|
|
||||||
for ii in ([0], [0, 1]):
|
for ii in ([0], [0, 1]):
|
||||||
with self.subTest(remove=ii):
|
with self.subTest(remove=ii):
|
||||||
# calculate the expected results
|
# calculate the expected results
|
||||||
|
|
@ -2046,6 +2049,47 @@ def test_repack_data_descriptor_no_sig_strict(self):
|
||||||
with zipfile.ZipFile(TESTFN) as zh:
|
with zipfile.ZipFile(TESTFN) as zh:
|
||||||
self.assertIsNone(zh.testzip())
|
self.assertIsNone(zh.testzip())
|
||||||
|
|
||||||
|
def test_repack_data_descriptor_no_sig_strict_by_decompressoin(self):
|
||||||
|
"""Should correctly handle file entries using data descriptor without signature
|
||||||
|
through decompression."""
|
||||||
|
if self.compression in (zipfile.ZIP_STORED, zipfile.ZIP_LZMA):
|
||||||
|
self.skipTest('require supported decompression method')
|
||||||
|
|
||||||
|
for ii in ([0], [0, 1]):
|
||||||
|
with self.subTest(remove=ii):
|
||||||
|
# calculate the expected results
|
||||||
|
test_files = [data for j, data in enumerate(self.test_files) if j not in ii]
|
||||||
|
with open(TESTFN, 'wb') as fh:
|
||||||
|
with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig):
|
||||||
|
expected_zinfos = self._prepare_zip_from_test_files(Unseekable(fh), test_files)
|
||||||
|
expected_size = os.path.getsize(TESTFN)
|
||||||
|
|
||||||
|
# do the removal and check the result
|
||||||
|
with open(TESTFN, 'wb') as fh:
|
||||||
|
with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig):
|
||||||
|
zinfos = self._prepare_zip_from_test_files(Unseekable(fh), self.test_files)
|
||||||
|
with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh:
|
||||||
|
# make sure data descriptor bit is really set (by making zipfile unseekable)
|
||||||
|
for zi in zh.infolist():
|
||||||
|
self.assertTrue(zi.flag_bits & 8, f'data descriptor flag not set: {zi.filename}')
|
||||||
|
|
||||||
|
for i in ii:
|
||||||
|
zh.remove(self.test_files[i][0])
|
||||||
|
zh.repack(strict_descriptor=True)
|
||||||
|
|
||||||
|
# check infolist
|
||||||
|
self.assertEqual(
|
||||||
|
[ComparableZipInfo(zi) for zi in zh.infolist()],
|
||||||
|
expected_zinfos,
|
||||||
|
)
|
||||||
|
|
||||||
|
# check file size
|
||||||
|
self.assertEqual(os.path.getsize(TESTFN), expected_size)
|
||||||
|
|
||||||
|
# make sure the zip file is still valid
|
||||||
|
with zipfile.ZipFile(TESTFN) as zh:
|
||||||
|
self.assertIsNone(zh.testzip())
|
||||||
|
|
||||||
def test_repack_data_descriptor_no_sig_and_zip64(self):
|
def test_repack_data_descriptor_no_sig_and_zip64(self):
|
||||||
"""Should correctly handle file entries using data descriptor without signature and zip64."""
|
"""Should correctly handle file entries using data descriptor without signature and zip64."""
|
||||||
for ii in ([0], [0, 1], [1], [2]):
|
for ii in ([0], [0, 1], [1], [2]):
|
||||||
|
|
|
||||||
|
|
@ -1628,8 +1628,14 @@ def _validate_local_file_entry(self, fp, offset, end_offset):
|
||||||
zip64 = fheader[_FH_UNCOMPRESSED_SIZE] == 0xffffffff
|
zip64 = fheader[_FH_UNCOMPRESSED_SIZE] == 0xffffffff
|
||||||
|
|
||||||
dd = self._scan_data_descriptor(fp, pos, end_offset, zip64)
|
dd = self._scan_data_descriptor(fp, pos, end_offset, zip64)
|
||||||
if dd is None and not self.strict_descriptor:
|
if dd is None:
|
||||||
dd = self._scan_data_descriptor_no_sig(fp, pos, end_offset, zip64)
|
dd = self._scan_data_descriptor_no_sig_by_decompression(
|
||||||
|
fp, pos, end_offset, zip64, fheader[_FH_COMPRESSION_METHOD])
|
||||||
|
if dd is False:
|
||||||
|
if not self.strict_descriptor:
|
||||||
|
dd = self._scan_data_descriptor_no_sig(fp, pos, end_offset, zip64)
|
||||||
|
else:
|
||||||
|
dd = None
|
||||||
if dd is None:
|
if dd is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
@ -1705,6 +1711,56 @@ def _scan_data_descriptor_no_sig(self, fp, offset, end_offset, zip64, chunk_size
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def _scan_data_descriptor_no_sig_by_decompression(self, fp, offset, end_offset, zip64, method):
|
||||||
|
dd_fmt = '<LQQ' if zip64 else '<LLL'
|
||||||
|
dd_size = struct.calcsize(dd_fmt)
|
||||||
|
|
||||||
|
if offset + dd_size > end_offset:
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
decompressor = _get_decompressor(method)
|
||||||
|
except NotImplementedError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
if decompressor is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Current LZMADecompressor is unreliable since it's `.eof` is usually
|
||||||
|
# not set as expected.
|
||||||
|
if isinstance(decompressor, LZMADecompressor):
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
pos = self._find_compression_end_offset(fp, offset, end_offset - dd_size, decompressor)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
fp.seek(pos)
|
||||||
|
dd = fp.read(dd_size)
|
||||||
|
crc, compress_size, file_size = struct.unpack(dd_fmt, dd)
|
||||||
|
if pos - offset != compress_size:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return crc, compress_size, file_size, dd_size
|
||||||
|
|
||||||
|
def _find_compression_end_offset(self, fp, offset, end_offset, decompressor, chunk_size=4096):
|
||||||
|
fp.seek(offset)
|
||||||
|
read_size = 0
|
||||||
|
while True:
|
||||||
|
chunk = fp.read(min(chunk_size, end_offset - offset - read_size))
|
||||||
|
if not chunk:
|
||||||
|
raise EOFError('Unexpected EOF while decompressing')
|
||||||
|
|
||||||
|
# may raise on error
|
||||||
|
decompressor.decompress(chunk)
|
||||||
|
|
||||||
|
read_size += len(chunk)
|
||||||
|
|
||||||
|
if decompressor.eof:
|
||||||
|
unused_len = len(decompressor.unused_data)
|
||||||
|
return offset + read_size - unused_len
|
||||||
|
|
||||||
def _calc_local_file_entry_size(self, fp, zinfo):
|
def _calc_local_file_entry_size(self, fp, zinfo):
|
||||||
fp.seek(zinfo.header_offset)
|
fp.seek(zinfo.header_offset)
|
||||||
fheader = self._read_local_file_header(fp)
|
fheader = self._read_local_file_header(fp)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue