diff --git a/Lib/test/test_zipfile/test_core.py b/Lib/test/test_zipfile/test_core.py
index 26081a86015..8a834f53f67 100644
--- a/Lib/test/test_zipfile/test_core.py
+++ b/Lib/test/test_zipfile/test_core.py
@@ -2008,6 +2008,9 @@ def test_repack_data_descriptor_no_sig(self):
 
     def test_repack_data_descriptor_no_sig_strict(self):
         """Should skip data descriptor without signature when `strict_descriptor` is set."""
+        if self.compression not in (zipfile.ZIP_STORED, zipfile.ZIP_LZMA):
+            self.skipTest('require unsupported decompression method')
+
         for ii in ([0], [0, 1]):
             with self.subTest(remove=ii):
                 # calculate the expected results
@@ -2046,6 +2049,47 @@ def test_repack_data_descriptor_no_sig_strict(self):
                 with zipfile.ZipFile(TESTFN) as zh:
                     self.assertIsNone(zh.testzip())
 
+    def test_repack_data_descriptor_no_sig_strict_by_decompression(self):
+        """Should correctly handle file entries using data descriptor without signature
+        through decompression."""
+        if self.compression in (zipfile.ZIP_STORED, zipfile.ZIP_LZMA):
+            self.skipTest('require supported decompression method')
+
+        for ii in ([0], [0, 1]):
+            with self.subTest(remove=ii):
+                # calculate the expected results
+                test_files = [data for j, data in enumerate(self.test_files) if j not in ii]
+                with open(TESTFN, 'wb') as fh:
+                    with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig):
+                        expected_zinfos = self._prepare_zip_from_test_files(Unseekable(fh), test_files)
+                expected_size = os.path.getsize(TESTFN)
+
+                # do the removal and check the result
+                with open(TESTFN, 'wb') as fh:
+                    with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig):
+                        zinfos = self._prepare_zip_from_test_files(Unseekable(fh), self.test_files)
+                with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh:
+                    # make sure data descriptor bit is really set (by making zipfile unseekable)
+                    for zi in zh.infolist():
+                        self.assertTrue(zi.flag_bits & 8, f'data descriptor flag not set: {zi.filename}')
+
+                    for i in ii:
+                        zh.remove(self.test_files[i][0])
+                    zh.repack(strict_descriptor=True)
+
+                    # check infolist
+                    self.assertEqual(
+                        [ComparableZipInfo(zi) for zi in zh.infolist()],
+                        expected_zinfos,
+                    )
+
+                # check file size
+                self.assertEqual(os.path.getsize(TESTFN), expected_size)
+
+                # make sure the zip file is still valid
+                with zipfile.ZipFile(TESTFN) as zh:
+                    self.assertIsNone(zh.testzip())
+
     def test_repack_data_descriptor_no_sig_and_zip64(self):
         """Should correctly handle file entries using data descriptor without signature and zip64."""
         for ii in ([0], [0, 1], [1], [2]):
diff --git a/Lib/zipfile/__init__.py b/Lib/zipfile/__init__.py
index 9c645ac731d..5098799a9ab 100644
--- a/Lib/zipfile/__init__.py
+++ b/Lib/zipfile/__init__.py
@@ -1628,8 +1628,14 @@ def _validate_local_file_entry(self, fp, offset, end_offset):
             zip64 = fheader[_FH_UNCOMPRESSED_SIZE] == 0xffffffff
 
             dd = self._scan_data_descriptor(fp, pos, end_offset, zip64)
-            if dd is None and not self.strict_descriptor:
-                dd = self._scan_data_descriptor_no_sig(fp, pos, end_offset, zip64)
+            if dd is None:
+                dd = self._scan_data_descriptor_no_sig_by_decompression(
+                    fp, pos, end_offset, zip64, fheader[_FH_COMPRESSION_METHOD])
+                if dd is False:
+                    if not self.strict_descriptor:
+                        dd = self._scan_data_descriptor_no_sig(fp, pos, end_offset, zip64)
+                    else:
+                        dd = None
             if dd is None:
                 return None
 
@@ -1705,6 +1711,79 @@ def _scan_data_descriptor_no_sig(self, fp, offset, end_offset, zip64, chunk_size
 
         return None
 
+    def _scan_data_descriptor_no_sig_by_decompression(self, fp, offset, end_offset, zip64, method):
+        """Locate an unsigned data descriptor by decompressing the entry data.
+
+        The data starting at `offset` is fed to the decompressor for `method`
+        until it reports end of stream; the data descriptor is then read
+        right after the compressed data.
+
+        Return `(crc, compress_size, file_size, dd_size)` on success, `None`
+        if decompression fails or the descriptor found does not match the
+        measured compressed size, or `False` if this approach is not
+        applicable (no room for a descriptor, or no usable decompressor for
+        `method`).
+        """
+        dd_fmt = '<LQQ' if zip64 else '<LLL'
+        dd_size = struct.calcsize(dd_fmt)
+
+        if offset + dd_size > end_offset:
+            return False
+
+        try:
+            decompressor = _get_decompressor(method)
+        except NotImplementedError:
+            return False
+
+        if decompressor is None:
+            return False
+
+        # Current LZMADecompressor is unreliable since its `.eof` is usually
+        # not set as expected.
+        if isinstance(decompressor, LZMADecompressor):
+            return False
+
+        try:
+            pos = self._find_compression_end_offset(fp, offset, end_offset - dd_size, decompressor)
+        except Exception:
+            # bad or truncated data; the error type depends on the decompressor
+            return None
+
+        fp.seek(pos)
+        dd = fp.read(dd_size)
+        if len(dd) != dd_size:
+            return None
+        crc, compress_size, file_size = struct.unpack(dd_fmt, dd)
+        if pos - offset != compress_size:
+            return None
+
+        return crc, compress_size, file_size, dd_size
+
+    def _find_compression_end_offset(self, fp, offset, end_offset, decompressor, chunk_size=4096):
+        """Return the offset right after the compressed stream at `offset`.
+
+        Data is read in chunks of at most `chunk_size` bytes, never past
+        `end_offset`, and fed to `decompressor` until it reports end of
+        stream.  Raise EOFError if the stream does not end before
+        `end_offset`; errors raised by `decompressor` are propagated.
+        """
+        fp.seek(offset)
+        pos = offset
+        while pos < end_offset:
+            chunk = fp.read(min(chunk_size, end_offset - pos))
+            if not chunk:
+                break
+
+            # may raise on error
+            decompressor.decompress(chunk)
+            pos += len(chunk)
+
+            if decompressor.eof:
+                # bytes read past the end of the stream are not part of it
+                return pos - len(decompressor.unused_data)
+
+        raise EOFError('Unexpected EOF while decompressing')
+
     def _calc_local_file_entry_size(self, fp, zinfo):
         fp.seek(zinfo.header_offset)
         fheader = self._read_local_file_header(fp)