From 6aed859adec5134ab756520c0dc5f7feeae6cab8 Mon Sep 17 00:00:00 2001 From: Danny Lin Date: Sat, 24 May 2025 17:34:26 +0800 Subject: [PATCH] Add `remove()` and `repack()` to `ZipFile` --- Doc/library/zipfile.rst | 23 + Lib/test/test_zipfile/test_core.py | 725 +++++++++++++++++++++++++++++ Lib/test/test_zipfile64.py | 129 +++++ Lib/zipfile/__init__.py | 374 +++++++++++++++ 4 files changed, 1251 insertions(+) diff --git a/Doc/library/zipfile.rst b/Doc/library/zipfile.rst index 6a4fa67332e..047512bc88e 100644 --- a/Doc/library/zipfile.rst +++ b/Doc/library/zipfile.rst @@ -518,6 +518,29 @@ ZipFile Objects .. versionadded:: 3.11 +.. method:: ZipFile.remove(zinfo_or_arcname) + + Removes a member from the archive. *zinfo_or_arcname* is either the full + path of the member, or a :class:`ZipInfo` instance. + + The archive must be opened with mode ``'w'``, ``'x'`` or ``'a'``. + + Calling :meth:`remove` on a closed ZipFile will raise a :exc:`ValueError`. + + .. versionadded:: next + + +.. method:: ZipFile.repack() + + Repack a zip file and physically remove non-referenced file entries. + + The archive must be opened with mode ``'a'``. + + Calling :meth:`repack` on a closed ZipFile will raise a :exc:`ValueError`. + + .. versionadded:: next + + The following data attributes are also available: .. 
attribute:: ZipFile.filename diff --git a/Lib/test/test_zipfile/test_core.py b/Lib/test/test_zipfile/test_core.py index ada96813709..2ba2d296c44 100644 --- a/Lib/test/test_zipfile/test_core.py +++ b/Lib/test/test_zipfile/test_core.py @@ -13,6 +13,7 @@ import time import unittest import unittest.mock as mock +import warnings import zipfile @@ -1360,6 +1361,730 @@ class LzmaWriterTests(AbstractWriterTests, unittest.TestCase): class ZstdWriterTests(AbstractWriterTests, unittest.TestCase): compression = zipfile.ZIP_ZSTANDARD + +def ComparableZipInfo(zinfo): + return (zinfo.filename, zinfo.header_offset, zinfo.compress_size, zinfo.CRC) + +_struct_pack = struct.pack + +def struct_pack_no_dd_sig(fmt, *values): + """A mock side_effect for native `struct.pack` to not generate a + signature for data descriptors.""" + # suppress BytesWarning etc. + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + if values[0] == zipfile._DD_SIGNATURE: + return _struct_pack(fmt[0:1] + fmt[2:], *values[1:]) + return _struct_pack(fmt, *values) + +class RepackHelperMixin: + """Common helpers for remove and repack.""" + def _prepare_zip_from_test_files(self, zfname, test_files, force_zip64=False): + zinfos = [] + with zipfile.ZipFile(zfname, 'w', self.compression) as zh: + for file, data in test_files: + with zh.open(file, 'w', force_zip64=force_zip64) as fh: + fh.write(data) + zinfo = zh.getinfo(file) + zinfos.append(ComparableZipInfo(zinfo)) + return zinfos + +class AbstractRemoveTests(RepackHelperMixin): + def test_remove_by_name(self): + test_files = [ + ('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ] + + for i in range(0, 3): + with self.subTest(i=i, filename=test_files[i][0]): + zinfos = self._prepare_zip_from_test_files(TESTFN, test_files) + with zipfile.ZipFile(TESTFN, 'a', 
self.compression) as zh: + zh.remove(test_files[i][0]) + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + [zi for j, zi in enumerate(zinfos) if j != i], + ) + + # check NameToInfo cache + with self.assertRaises(KeyError): + zh.getinfo(test_files[i][0]) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + def test_remove_by_zinfo(self): + test_files = [ + ('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ] + + for i in range(0, 3): + with self.subTest(i=i, filename=test_files[i][0]): + zinfos = self._prepare_zip_from_test_files(TESTFN, test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + zh.remove(zh.infolist()[i]) + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + [zi for j, zi in enumerate(zinfos) if j != i], + ) + + # check NameToInfo cache + with self.assertRaises(KeyError): + zh.getinfo(test_files[i][0]) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + def test_remove_by_name_nonexist(self): + test_files = [ + ('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ] + + zinfos = self._prepare_zip_from_test_files(TESTFN, test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + with self.assertRaises(KeyError): + zh.remove('nonexist.txt') + + def test_remove_by_zinfo_nonexist(self): + test_files = [ + ('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ('file2.txt', b'Sed ut perspiciatis unde 
omnis iste natus error sit voluptatem'), + ] + + zinfos = self._prepare_zip_from_test_files(TESTFN, test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + with self.assertRaises(KeyError): + zh.remove(zipfile.ZipInfo('nonexist.txt')) + + def test_remove_by_name_duplicated(self): + test_files = [ + ('file.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('file.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ('file1.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ] + + # suppress duplicated name warning + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + + zinfos = self._prepare_zip_from_test_files(TESTFN, test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + zh.remove('file.txt') + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + [zinfos[0], zinfos[2]], + ) + + # check NameToInfo cache + self.assertEqual( + ComparableZipInfo(zh.getinfo('file.txt')), + zinfos[0], + ) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + zinfos = self._prepare_zip_from_test_files(TESTFN, test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + zh.remove('file.txt') + zh.remove('file.txt') + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + [zinfos[2]], + ) + + # check NameToInfo cache + with self.assertRaises(KeyError): + zh.getinfo('file.txt') + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + def test_remove_by_zinfo_duplicated(self): + test_files = [ + ('file.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('file.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ('file1.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ] + + # suppress duplicated name warning + with warnings.catch_warnings(): + 
warnings.simplefilter("ignore") + + zinfos = self._prepare_zip_from_test_files(TESTFN, test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + zh.remove(zh.infolist()[0]) + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + [zinfos[1], zinfos[2]], + ) + + # check NameToInfo cache + self.assertEqual( + ComparableZipInfo(zh.getinfo('file.txt')), + zinfos[1], + ) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + zinfos = self._prepare_zip_from_test_files(TESTFN, test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + zh.remove(zh.infolist()[1]) + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + [zinfos[0], zinfos[2]], + ) + + # check NameToInfo cache + self.assertEqual( + ComparableZipInfo(zh.getinfo('file.txt')), + zinfos[0], + ) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + zinfos = self._prepare_zip_from_test_files(TESTFN, test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + infolist = zh.infolist().copy() + zh.remove(infolist[0]) + zh.remove(infolist[1]) + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + [zinfos[2]], + ) + + # check NameToInfo cache + with self.assertRaises(KeyError): + zh.getinfo('file.txt') + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + def test_remove_zip64(self): + test_files = [ + ('pre.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('datafile', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ('post.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ] + + for i in range(0, 3): + with self.subTest(i=i, filename=test_files[i][0]): + zinfos = self._prepare_zip_from_test_files(TESTFN, test_files, force_zip64=True) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + 
zh.remove(zh.infolist()[i]) + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + [zi for j, zi in enumerate(zinfos) if j != i], + ) + + # check NameToInfo cache + with self.assertRaises(KeyError): + zh.getinfo(test_files[i][0]) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + def test_remove_validate(self): + file = 'datafile.txt' + data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' + + # closed: error out and do nothing + with zipfile.ZipFile(TESTFN, 'w') as zh: + zh.writestr(file, data) + with zipfile.ZipFile(TESTFN, 'a') as zh: + zh.close() + with self.assertRaises(ValueError): + zh.remove(file) + + # writing: error out and do nothing + with zipfile.ZipFile(TESTFN, 'w') as zh: + zh.writestr(file, data) + with zipfile.ZipFile(TESTFN, 'a') as zh: + with zh.open('newfile.txt', 'w') as fh: + with self.assertRaises(ValueError): + zh.remove(file) + + # mode 'r': error out and do nothing + with zipfile.ZipFile(TESTFN, 'w') as zh: + zh.writestr(file, data) + with zipfile.ZipFile(TESTFN, 'r') as zh: + with self.assertRaises(ValueError): + zh.remove(file) + +class StoredRemoveTests(AbstractRemoveTests, unittest.TestCase): + compression = zipfile.ZIP_STORED + +@requires_zlib() +class DeflateRemoveTests(AbstractRemoveTests, unittest.TestCase): + compression = zipfile.ZIP_DEFLATED + +@requires_bz2() +class Bzip2RemoveTests(AbstractRemoveTests, unittest.TestCase): + compression = zipfile.ZIP_BZIP2 + +@requires_lzma() +class LzmaRemoveTests(AbstractRemoveTests, unittest.TestCase): + compression = zipfile.ZIP_LZMA + +@requires_zstd() +class ZstdRemoveTests(AbstractRemoveTests, unittest.TestCase): + compression = zipfile.ZIP_ZSTANDARD + +class AbstractRepackTests(RepackHelperMixin): + def test_repack_basic(self): + """Should remove local file entries for deleted files.""" + test_files = [ + ('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('file1.txt', 
b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ] + + ln = len(test_files) + iii = (ii for n in range(1, ln + 1) for ii in itertools.combinations(range(ln), n)) + for ii in iii: + with self.subTest(remove=ii): + # calculate the expected results + _test_files = [data for j, data in enumerate(test_files) if j not in ii] + expected_zinfos = self._prepare_zip_from_test_files(TESTFN, _test_files) + expected_size = os.path.getsize(TESTFN) + + # do the removal and check the result + zinfos = self._prepare_zip_from_test_files(TESTFN, test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + for i in ii: + zh.remove(test_files[i][0]) + zh.repack() + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + expected_zinfos, + ) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + # check file size + self.assertEqual(os.path.getsize(TESTFN), expected_size) + + def test_repack_bytes_before_first_file(self): + """Should preserve random bytes before the first recorded local file entry.""" + test_files = [ + ('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ] + + for ii in ([], [0], [0, 1], [0, 1, 2]): + with self.subTest(remove=ii): + # calculate the expected results + _test_files = [data for j, data in enumerate(test_files) if j not in ii] + with open(TESTFN, 'wb') as fh: + fh.write(b'dummy') + with open(TESTFN, 'r+b') as fh: + fh.seek(0, 2) + expected_zinfos = self._prepare_zip_from_test_files(fh, _test_files) + expected_size = os.path.getsize(TESTFN) + + # do the removal and check the result + with open(TESTFN, 'wb') as fh: + fh.write(b'dummy') + with open(TESTFN, 'r+b') as fh: + fh.seek(0, 2) + zinfos 
= self._prepare_zip_from_test_files(fh, test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + for i in ii: + zh.remove(test_files[i][0]) + zh.repack() + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + expected_zinfos, + ) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + # check file size + self.assertEqual(os.path.getsize(TESTFN), expected_size) + + def test_repack_magic_before_first_file(self): + """Should preserve random signature bytes not forming a valid file entry + before the first recorded local file entry.""" + test_files = [ + ('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ] + + for ii in ([], [0], [0, 1], [0, 1, 2]): + with self.subTest(remove=ii): + # calculate the expected results + _test_files = [data for j, data in enumerate(test_files) if j not in ii] + with open(TESTFN, 'wb') as fh: + fh.write(b'PK\003\004 ') + with open(TESTFN, 'r+b') as fh: + fh.seek(0, 2) + expected_zinfos = self._prepare_zip_from_test_files(fh, _test_files) + expected_size = os.path.getsize(TESTFN) + + # do the removal and check the result + with open(TESTFN, 'wb') as fh: + fh.write(b'PK\003\004 ') + with open(TESTFN, 'r+b') as fh: + fh.seek(0, 2) + zinfos = self._prepare_zip_from_test_files(fh, test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + for i in ii: + zh.remove(test_files[i][0]) + zh.repack() + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + expected_zinfos, + ) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + # check file size + self.assertEqual(os.path.getsize(TESTFN), expected_size) + + def test_repack_file_entry_before_first_file(self): + """Should preserve seeming valid file 
entries not forming consecutive + valid file entries until the first recorded local file entry. + + This may happen when a self-extractor contains an uncompressed ZIP + library. (simulated by writing a ZIP file in this test) + """ + test_files = [ + ('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ] + + for ii in ([], [0], [0, 1], [0, 1, 2]): + with self.subTest(remove=ii): + # calculate the expected results + _test_files = [data for j, data in enumerate(test_files) if j not in ii] + with open(TESTFN, 'wb') as fh: + with zipfile.ZipFile(fh, 'w') as zh: + zh.writestr('file.txt', b'dummy') + fh.write(b' ') + with open(TESTFN, 'r+b') as fh: + fh.seek(0, 2) + expected_zinfos = self._prepare_zip_from_test_files(fh, _test_files) + expected_size = os.path.getsize(TESTFN) + + # do the removal and check the result + with open(TESTFN, 'wb') as fh: + with zipfile.ZipFile(fh, 'w') as zh: + zh.writestr('file.txt', b'dummy') + fh.write(b' ') + with open(TESTFN, 'r+b') as fh: + fh.seek(0, 2) + zinfos = self._prepare_zip_from_test_files(fh, test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + for i in ii: + zh.remove(test_files[i][0]) + zh.repack() + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + expected_zinfos, + ) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + # check file size + self.assertEqual(os.path.getsize(TESTFN), expected_size) + + def test_repack_zip64(self): + """Should correctly handle file entries with zip64.""" + test_files = [ + ('pre.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('datafile', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ('post.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ] + + 
for ii in ([0], [0, 1], [1], [2]): + with self.subTest(remove=ii): + # calculate the expected results + _test_files = [data for j, data in enumerate(test_files) if j not in ii] + expected_zinfos = self._prepare_zip_from_test_files(TESTFN, _test_files, force_zip64=True) + expected_size = os.path.getsize(TESTFN) + + # do the removal and check the result + zinfos = self._prepare_zip_from_test_files(TESTFN, test_files, force_zip64=True) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + for i in ii: + zh.remove(test_files[i][0]) + zh.repack() + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + expected_zinfos, + ) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + # check file size + self.assertEqual(os.path.getsize(TESTFN), expected_size) + + def test_repack_data_descriptor(self): + """Should correctly handle file entries using data descriptor.""" + test_files = [ + ('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ] + + for ii in ([0], [0, 1], [1], [2]): + with self.subTest(remove=ii): + # calculate the expected results + _test_files = [data for j, data in enumerate(test_files) if j not in ii] + with open(TESTFN, 'wb') as fh: + expected_zinfos = self._prepare_zip_from_test_files(Unseekable(fh), _test_files) + expected_size = os.path.getsize(TESTFN) + + # do the removal and check the result + with open(TESTFN, 'wb') as fh: + zinfos = self._prepare_zip_from_test_files(Unseekable(fh), test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + # make sure data descriptor bit is really set (by making zipfile unseekable) + for zi in zh.infolist(): + self.assertTrue(zi.flag_bits & 8, f'data descriptor not used: {zi.filename}') + + for i in ii: + zh.remove(test_files[i][0]) + 
zh.repack() + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + expected_zinfos, + ) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + # check file size + self.assertEqual(os.path.getsize(TESTFN), expected_size) + + def test_repack_data_descriptor_and_zip64(self): + """Should correctly handle file entries using data descriptor and zip64.""" + test_files = [ + ('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ] + + for ii in ([0], [0, 1], [1], [2]): + with self.subTest(remove=ii): + # calculate the expected results + _test_files = [data for j, data in enumerate(test_files) if j not in ii] + with open(TESTFN, 'wb') as fh: + expected_zinfos = self._prepare_zip_from_test_files(Unseekable(fh), _test_files, force_zip64=True) + expected_size = os.path.getsize(TESTFN) + + # do the removal and check the result + with open(TESTFN, 'wb') as fh: + zinfos = self._prepare_zip_from_test_files(Unseekable(fh), test_files, force_zip64=True) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + # make sure data descriptor bit is really set (by making zipfile unseekable) + for zi in zh.infolist(): + self.assertTrue(zi.flag_bits & 8, f'data descriptor not used: {zi.filename}') + + for i in ii: + zh.remove(test_files[i][0]) + zh.repack() + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + expected_zinfos, + ) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + # check file size + self.assertEqual(os.path.getsize(TESTFN), expected_size) + + def test_repack_data_descriptor_no_sig(self): + """Should correctly handle file entries using data descriptor without signature.""" + test_files = [ + ('file0.txt', b'Lorem ipsum dolor sit amet, consectetur 
adipiscing elit'), + ('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ] + + for ii in ([0], [0, 1], [1], [2]): + with self.subTest(remove=ii): + # calculate the expected results + _test_files = [data for j, data in enumerate(test_files) if j not in ii] + with open(TESTFN, 'wb') as fh: + with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig): + expected_zinfos = self._prepare_zip_from_test_files(Unseekable(fh), _test_files) + expected_size = os.path.getsize(TESTFN) + + # do the removal and check the result + with open(TESTFN, 'wb') as fh: + with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig): + zinfos = self._prepare_zip_from_test_files(Unseekable(fh), test_files) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + # make sure data descriptor bit is really set (by making zipfile unseekable) + for zi in zh.infolist(): + self.assertTrue(zi.flag_bits & 8, f'data descriptor flag not set: {zi.filename}') + + for i in ii: + zh.remove(test_files[i][0]) + zh.repack() + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + expected_zinfos, + ) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + # check file size + self.assertEqual(os.path.getsize(TESTFN), expected_size) + + def test_repack_data_descriptor_no_sig_and_zip64(self): + """Should correctly handle file entries using data descriptor without signature and zip64.""" + test_files = [ + ('file0.txt', b'Lorem ipsum dolor sit amet, consectetur adipiscing elit'), + ('file1.txt', b'Duis aute irure dolor in reprehenderit in voluptate velit esse'), + ('file2.txt', b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'), + ] + + for ii in ([0], [0, 1], [1], [2]): + with self.subTest(remove=ii): + # calculate the expected results + _test_files = [data for j, data in 
enumerate(test_files) if j not in ii] + with open(TESTFN, 'wb') as fh: + with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig): + expected_zinfos = self._prepare_zip_from_test_files(Unseekable(fh), _test_files, force_zip64=True) + expected_size = os.path.getsize(TESTFN) + + # do the removal and check the result + with open(TESTFN, 'wb') as fh: + with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig): + zinfos = self._prepare_zip_from_test_files(Unseekable(fh), test_files, force_zip64=True) + with zipfile.ZipFile(TESTFN, 'a', self.compression) as zh: + # make sure data descriptor bit is really set (by making zipfile unseekable) + for zi in zh.infolist(): + self.assertTrue(zi.flag_bits & 8, f'data descriptor flag not set: {zi.filename}') + + for i in ii: + zh.remove(test_files[i][0]) + zh.repack() + + # check infolist + self.assertEqual( + [ComparableZipInfo(zi) for zi in zh.infolist()], + expected_zinfos, + ) + + # make sure the zip file is still valid + self.assertIsNone(zh.testzip()) + + # check file size + self.assertEqual(os.path.getsize(TESTFN), expected_size) + + def test_repack_validate(self): + file = 'datafile.txt' + data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' + + # closed: error out and do nothing + with zipfile.ZipFile(TESTFN, 'w') as zh: + zh.writestr(file, data) + with zipfile.ZipFile(TESTFN, 'a') as zh: + zh.close() + with self.assertRaises(ValueError): + zh.repack() + + # writing: error out and do nothing + with zipfile.ZipFile(TESTFN, 'w') as zh: + zh.writestr(file, data) + with zipfile.ZipFile(TESTFN, 'a') as zh: + with zh.open('newfile.txt', 'w') as fh: + with self.assertRaises(ValueError): + zh.repack() + + # mode 'r': error out and do nothing + with zipfile.ZipFile(TESTFN, 'w') as zh: + zh.writestr(file, data) + with zipfile.ZipFile(TESTFN, 'r') as zh: + with self.assertRaises(ValueError): + zh.repack() + + # mode 'w': error out and do nothing + with zipfile.ZipFile(TESTFN, 'w') 
as zh: + zh.writestr(file, data) + with zipfile.ZipFile(TESTFN, 'w') as zh: + with self.assertRaises(ValueError): + zh.repack() + + # mode 'x': error out and do nothing + os.remove(TESTFN) + with zipfile.ZipFile(TESTFN, 'x') as zh: + with self.assertRaises(ValueError): + zh.repack() + +class StoredRepackTests(AbstractRepackTests, unittest.TestCase): + compression = zipfile.ZIP_STORED + +@requires_zlib() +class DeflateRepackTests(AbstractRepackTests, unittest.TestCase): + compression = zipfile.ZIP_DEFLATED + +@requires_bz2() +class Bzip2RepackTests(AbstractRepackTests, unittest.TestCase): + compression = zipfile.ZIP_BZIP2 + +@requires_lzma() +class LzmaRepackTests(AbstractRepackTests, unittest.TestCase): + compression = zipfile.ZIP_LZMA + +@requires_zstd() +class ZstdRepackTests(AbstractRepackTests, unittest.TestCase): + compression = zipfile.ZIP_ZSTANDARD + + class PyZipFileTests(unittest.TestCase): def assertCompiledIn(self, name, namelist): if name + 'o' not in namelist: diff --git a/Lib/test/test_zipfile64.py b/Lib/test/test_zipfile64.py index 2e1affe0252..ba943719fcc 100644 --- a/Lib/test/test_zipfile64.py +++ b/Lib/test/test_zipfile64.py @@ -14,11 +14,14 @@ import zipfile, unittest import time import sys +import unittest.mock as mock from tempfile import TemporaryFile from test.support import os_helper from test.support import requires_zlib +from test.test_zipfile.test_core import Unseekable +from test.test_zipfile.test_core import struct_pack_no_dd_sig TESTFN = os_helper.TESTFN TESTFN2 = TESTFN + "2" @@ -87,6 +90,132 @@ def tearDown(self): os_helper.unlink(TESTFN2) +class TestRepack(unittest.TestCase): + def setUp(self): + # Create test data. + line_gen = ("Test of zipfile line %d." % i for i in range(1000000)) + self.data = '\n'.join(line_gen).encode('ascii') + + # It will contain enough copies of self.data to reach about 8 GiB. 
+ self.datacount = 8*1024**3 // len(self.data) + + def _write_large_file(self, fh): + next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL + for num in range(self.datacount): + fh.write(self.data) + # Print still working message since this test can be really slow + if next_time <= time.monotonic(): + next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL + print(( + ' writing %d of %d, be patient...' % + (num, self.datacount)), file=sys.__stdout__) + sys.__stdout__.flush() + + def test_clean_removed_large_file(self): + """Should move the physical data of a file positioned after a large + removed file without causing a memory issue.""" + # Try the temp file. If we do TESTFN2, then it hogs + # gigabytes of disk space for the duration of the test. + with TemporaryFile() as f: + self._test_clean_removed_large_file(f) + self.assertFalse(f.closed) + + def _test_clean_removed_large_file(self, f): + file = 'file.txt' + file1 = 'largefile.txt' + data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' + with zipfile.ZipFile(f, 'w') as zh: + with zh.open(file1, 'w', force_zip64=True) as fh: + self._write_large_file(fh) + zh.writestr(file, data) + + with zipfile.ZipFile(f, 'a') as zh: + zh.remove(file1) + zh.repack() + self.assertIsNone(zh.testzip()) + + def test_clean_removed_file_before_large_file(self): + """Should move the physical data of a large file positioned after a + removed file without causing a memory issue.""" + # Try the temp file. If we do TESTFN2, then it hogs + # gigabytes of disk space for the duration of the test. 
+ with TemporaryFile() as f: + self._test_clean_removed_file_before_large_file(f) + self.assertFalse(f.closed) + + def _test_clean_removed_file_before_large_file(self, f): + file = 'file.txt' + file1 = 'largefile.txt' + data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' + with zipfile.ZipFile(f, 'w') as zh: + zh.writestr(file, data) + with zh.open(file1, 'w', force_zip64=True) as fh: + self._write_large_file(fh) + + with zipfile.ZipFile(f, 'a') as zh: + zh.remove(file) + zh.repack() + self.assertIsNone(zh.testzip()) + + def test_clean_removed_large_file_with_dd(self): + """Should scan for the data descriptor of a removed large file without + causing a memory issue.""" + # Try the temp file. If we do TESTFN2, then it hogs + # gigabytes of disk space for the duration of the test. + with TemporaryFile() as f: + self._test_clean_removed_large_file_with_dd(f) + self.assertFalse(f.closed) + + def _test_clean_removed_large_file_with_dd(self, f): + file = 'file.txt' + file1 = 'largefile.txt' + data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' + with zipfile.ZipFile(Unseekable(f), 'w') as zh: + with zh.open(file1, 'w', force_zip64=True) as fh: + self._write_large_file(fh) + zh.writestr(file, data) + + with zipfile.ZipFile(f, 'a') as zh: + # make sure data descriptor bit is really set (by making zip file unseekable) + for zi in zh.infolist(): + self.assertTrue(zi.flag_bits & 8, f'data descriptor flag not set: {zi.filename}') + + zh.remove(file1) + zh.repack() + self.assertIsNone(zh.testzip()) + + def test_clean_removed_large_file_with_dd_no_sig(self): + """Should scan for the data descriptor (without signature) of a removed + large file without causing a memory issue.""" + # Try the temp file. If we do TESTFN2, then it hogs + # gigabytes of disk space for the duration of the test. 
+ with TemporaryFile() as f: + self._test_clean_removed_large_file_with_dd_no_sig(f) + self.assertFalse(f.closed) + + def _test_clean_removed_large_file_with_dd_no_sig(self, f): + # Reduce data to 400 MiB for this test, as it's especially slow... + self.datacount = 400*1024**2 // len(self.data) + + file = 'file.txt' + file1 = 'largefile.txt' + data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem' + with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig): + with zipfile.ZipFile(Unseekable(f), 'w') as zh: + with zh.open(file1, 'w', force_zip64=True) as fh: + self._write_large_file(fh) + zh.writestr(file, data) + + with zipfile.ZipFile(f, 'a') as zh: + # make sure data descriptor bit is really set (by making zip file unseekable) + for zi in zh.infolist(): + self.assertTrue(zi.flag_bits & 8, f'data descriptor flag not set: {zi.filename}') + + zh.remove(file1) + zh.repack() + self.assertIsNone(zh.testzip()) + + class OtherTests(unittest.TestCase): def testMoreThan64kFiles(self): # This test checks that more than 64k files can be added to an archive, diff --git a/Lib/zipfile/__init__.py b/Lib/zipfile/__init__.py index 18caeb3e04a..54f3b1f3f67 100644 --- a/Lib/zipfile/__init__.py +++ b/Lib/zipfile/__init__.py @@ -1367,6 +1367,322 @@ def close(self): self._zipfile._writing = False +class _ZipRepacker: + """Class for ZipFile repacking.""" + def __init__(self, debug=0): + self.debug = debug # Level of printing: 0 through 3 + + def repack(self, zfile): + """ + Repack the ZIP file, removing unrecorded local file entries and random + bytes not listed in the central directory. + + Assumes that local file entries are written consecutively without gaps. + + Truncation is applied in two phases: + + 1. Before the first recorded file entry: + - If a sequence of valid local file entries (starting with + `PK\x03\x04`) is found leading up to the first recorded entry, + it is truncated. 
           - Otherwise, all leading bytes are preserved (e.g., in cases such
             as self-extracting code or embedded ZIP libraries).

        2. Between or after the recorded entries:
           - Any data between two recorded entries, or after the last recorded
             entry but before the central directory, is removed — regardless
             of whether it resembles a valid entry.

        ### Examples

        Truncation before the first recorded entry:

            [random bytes]
            [unrecorded local file entry 1]
            [unrecorded local file entry 2]
            [random bytes]
            <- truncation start
            [unrecorded local file entry 3]
            [unrecorded local file entry 4]
            <- truncation end
            [recorded local file entry 1]
            ...
            [central directory]

        Truncation between recorded entries:

            ...
            [recorded local file entry 5]
            <- truncation start
            [random bytes]
            [unrecorded local file entry]
            [random bytes]
            <- truncation end
            [recorded local file entry 6]
            ...
            [recorded local file entry n]
            <- truncation start
            [unrecorded local file entry]
            <- truncation end
            [central directory]

        No-truncation case (leading entries not contiguous with the first
        recorded entry are preserved):

            [unrecorded local file entry 1]
            [unrecorded local file entry 2]
            ...
            [unrecorded local file entry n]
            [random bytes]
            [recorded local file entry 1]
            ...
+ """ + with zfile._lock: + self._repack(zfile) + + def _repack(self, zfile, *, chunk_size=2**20): + fp = zfile.fp + + # get a sorted filelist by header offset, in case the dir order + # doesn't match the actual entry order + filelist = sorted(zfile.filelist, key=lambda x: x.header_offset) + + # calculate the starting entry offset (bytes to skip) + entry_offset = 0 + + try: + data_offset = filelist[0].header_offset + except IndexError: + data_offset = zfile.start_dir + + if data_offset > 0: + if self.debug > 2: + print('scanning file signatures before:', data_offset) + for pos in self._iter_scan_signature(fp, stringFileHeader, 0, data_offset): + if self._starts_consecutive_file_entries(fp, pos, data_offset): + entry_offset = data_offset - pos + break + + # move file entries + for i, info in enumerate(filelist): + # get the total size of the entry + try: + offset = filelist[i + 1].header_offset + except IndexError: + offset = zfile.start_dir + entry_size = offset - info.header_offset + + used_entry_size = self._calc_local_file_entry_size(fp, info) + + # update the header and move entry data to the new position + if entry_offset > 0: + old_header_offset = info.header_offset + info.header_offset -= entry_offset + read_size = 0 + while read_size < used_entry_size: + fp.seek(old_header_offset + read_size) + data = fp.read(min(used_entry_size - read_size, chunk_size)) + fp.seek(info.header_offset + read_size) + fp.write(data) + fp.flush() + read_size += len(data) + + if info._end_offset is not None: + info._end_offset = info.header_offset + used_entry_size + + # update entry_offset for subsequent files to follow + if used_entry_size < entry_size: + entry_offset += entry_size - used_entry_size + + # Avoid missing entry if entries have a duplicated name. + # Reverse the order as NameToInfo normally stores the last added one. 
        for info in reversed(zfile.filelist):
            zfile.NameToInfo.setdefault(info.filename, info)

        # update state
        zfile.start_dir -= entry_offset
        zfile._didModify = True

    def _iter_scan_signature(self, fp, signature, start_offset, end_offset, chunk_size=4096):
        """Yield each absolute offset in [start_offset, end_offset) at which
        *signature* occurs in *fp*, scanning in chunks of *chunk_size*."""
        sig_len = len(signature)
        remainder = b''
        pos = start_offset

        fp.seek(start_offset)
        while pos < end_offset:
            read_size = min(chunk_size, end_offset - pos)
            # prepend the last sig_len-1 bytes of the previous chunk so a
            # signature straddling the chunk boundary is still found
            chunk = remainder + fp.read(read_size)
            if not chunk:
                break

            idx = 0
            while True:
                idx = chunk.find(signature, idx)
                if idx == -1 or idx + sig_len > len(chunk):
                    break

                # translate chunk-relative index back to a file offset
                abs_pos = pos - len(remainder) + idx
                yield abs_pos
                idx += 1

            # NOTE(review): for sig_len == 1 this slice is chunk[0:], keeping
            # the entire chunk as carry-over and re-yielding its matches.
            # Harmless for the 4-byte PK\x03\x04 signature used by the caller,
            # but worth guarding if this helper is ever reused — confirm.
            remainder = chunk[-(sig_len - 1):]
            pos += read_size

    def _starts_consecutive_file_entries(self, fp, start_offset, end_offset):
        """Return True if the bytes from start_offset form an unbroken chain
        of local file entries ending exactly at end_offset."""
        offset = start_offset

        while offset < end_offset:
            if self.debug > 2:
                print('checking local file entry:', offset)

            fp.seek(offset)
            try:
                fheader = self._read_local_file_header(fp)
            except BadZipFile:
                # not a valid local file header: chain broken
                return False

            # Create a dummy ZipInfo to utilize parsing.
            # Flush only the required information.
            zinfo = ZipInfo()
            zinfo.header_offset = offset
            zinfo.flag_bits = fheader[_FH_GENERAL_PURPOSE_FLAG_BITS]
            zinfo.compress_size = fheader[_FH_COMPRESSED_SIZE]
            zinfo.file_size = fheader[_FH_UNCOMPRESSED_SIZE]
            zinfo.CRC = fheader[_FH_CRC]

            filename = fp.read(fheader[_FH_FILENAME_LENGTH])
            zinfo.extra = fp.read(fheader[_FH_EXTRA_FIELD_LENGTH])
            pos = fp.tell()

            # header claims more bytes than remain before end_offset
            if pos > end_offset:
                return False

            try:
                zinfo._decodeExtra(crc32(filename))  # parse zip64
            except BadZipFile:
                return False

            data_descriptor_size = 0

            if zinfo.flag_bits & _MASK_USE_DATA_DESCRIPTOR:
                # According to the spec, these fields should be zero when data
                # descriptor is used. Otherwise treat as a false positive on
                # random bytes to return early, as scanning for data descriptor
                # is rather intensive.
                if not (zinfo.CRC == zinfo.compress_size == zinfo.file_size == 0):
                    return False

                # 0xffffffff in either size field means the real sizes live in
                # the zip64 extra / a zip64 data descriptor
                zip64 = (
                    fheader[_FH_UNCOMPRESSED_SIZE] == 0xffffffff or
                    fheader[_FH_COMPRESSED_SIZE] == 0xffffffff
                )

                dd = self._scan_data_descriptor(fp, pos, end_offset, zip64)

                if dd is None:
                    # no plausible data descriptor found: chain broken
                    return False

                crc, compress_size, file_size, data_descriptor_size = dd
                zinfo.CRC = crc
                zinfo.compress_size = compress_size
                zinfo.file_size = file_size

            # advance to where the next local file entry must start
            offset += (
                sizeFileHeader +
                fheader[_FH_FILENAME_LENGTH] + fheader[_FH_EXTRA_FIELD_LENGTH] +
                zinfo.compress_size +
                data_descriptor_size
            )

            if self.debug > 2:
                print('next', offset)

        # consecutive only if the chain lands exactly on end_offset
        return offset == end_offset

    def _read_local_file_header(self, fp):
        """Read and unpack one local file header at the current position.

        Returns the unpacked header tuple; raises BadZipFile if the header is
        truncated or the PK\x03\x04 magic is missing.
        """
        fheader = fp.read(sizeFileHeader)
        if len(fheader) != sizeFileHeader:
            raise BadZipFile("Truncated file header")
        fheader = struct.unpack(structFileHeader, fheader)
        if fheader[_FH_SIGNATURE] != stringFileHeader:
            raise BadZipFile("Bad magic number for file header")
        return fheader

    def _scan_data_descriptor(self, fp, offset, end_offset, zip64):
        dd_fmt = '