This commit is contained in:
Danny Lin 2025-06-21 23:07:50 +08:00
parent 11c09378b2
commit 4b2176e890
3 changed files with 54 additions and 56 deletions

View file

@ -527,7 +527,7 @@ ZipFile Objects
a path is provided. a path is provided.
This does not physically remove the local file entry from the archive. This does not physically remove the local file entry from the archive.
Call :meth:`ZipFile.repack` afterwards to reclaim space. Call :meth:`repack` afterwards to reclaim space.
The archive must be opened with mode ``'w'``, ``'x'`` or ``'a'``. The archive must be opened with mode ``'w'``, ``'x'`` or ``'a'``.

View file

@ -1362,8 +1362,11 @@ class ZstdWriterTests(AbstractWriterTests, unittest.TestCase):
compression = zipfile.ZIP_ZSTANDARD compression = zipfile.ZIP_ZSTANDARD
def ComparableZipInfo(zinfo): class ComparableZipInfo:
return (zinfo.filename, zinfo.header_offset, zinfo.compress_size, zinfo.CRC) keys = [i for i in zipfile.ZipInfo.__slots__ if not i.startswith('_')]
def __new__(cls, zinfo):
return {i: getattr(zinfo, i) for i in cls.keys}
_struct_pack = struct.pack _struct_pack = struct.pack
@ -1379,6 +1382,8 @@ def struct_pack_no_dd_sig(fmt, *values):
class RepackHelperMixin: class RepackHelperMixin:
"""Common helpers for remove and repack.""" """Common helpers for remove and repack."""
maxDiff = 8192
@classmethod @classmethod
def _prepare_test_files(cls): def _prepare_test_files(cls):
return [ return [
@ -1389,14 +1394,11 @@ def _prepare_test_files(cls):
@classmethod @classmethod
def _prepare_zip_from_test_files(cls, zfname, test_files, force_zip64=False): def _prepare_zip_from_test_files(cls, zfname, test_files, force_zip64=False):
zinfos = []
with zipfile.ZipFile(zfname, 'w', cls.compression) as zh: with zipfile.ZipFile(zfname, 'w', cls.compression) as zh:
for file, data in test_files: for file, data in test_files:
with zh.open(file, 'w', force_zip64=force_zip64) as fh: with zh.open(file, 'w', force_zip64=force_zip64) as fh:
fh.write(data) fh.write(data)
zinfo = zh.getinfo(file) return list(zh.infolist())
zinfos.append(ComparableZipInfo(zinfo))
return zinfos
class AbstractRemoveTests(RepackHelperMixin): class AbstractRemoveTests(RepackHelperMixin):
@classmethod @classmethod
@ -1416,7 +1418,7 @@ def test_remove_by_name(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
[zi for j, zi in enumerate(zinfos) if j != i], [ComparableZipInfo(zi) for j, zi in enumerate(zinfos) if j != i],
) )
# check NameToInfo cache # check NameToInfo cache
@ -1437,7 +1439,7 @@ def test_remove_by_zinfo(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
[zi for j, zi in enumerate(zinfos) if j != i], [ComparableZipInfo(zi) for j, zi in enumerate(zinfos) if j != i],
) )
# check NameToInfo cache # check NameToInfo cache
@ -1478,13 +1480,13 @@ def test_remove_by_name_duplicated(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
[zinfos[0], zinfos[2]], [ComparableZipInfo(zi) for zi in [zinfos[0], zinfos[2]]],
) )
# check NameToInfo cache # check NameToInfo cache
self.assertEqual( self.assertEqual(
ComparableZipInfo(zh.getinfo('file.txt')), ComparableZipInfo(zh.getinfo('file.txt')),
zinfos[0], ComparableZipInfo(zinfos[0]),
) )
# make sure the zip file is still valid # make sure the zip file is still valid
@ -1499,7 +1501,7 @@ def test_remove_by_name_duplicated(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
[zinfos[2]], [ComparableZipInfo(zi) for zi in [zinfos[2]]],
) )
# check NameToInfo cache # check NameToInfo cache
@ -1528,13 +1530,13 @@ def test_remove_by_zinfo_duplicated(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
[zinfos[1], zinfos[2]], [ComparableZipInfo(zi) for zi in [zinfos[1], zinfos[2]]],
) )
# check NameToInfo cache # check NameToInfo cache
self.assertEqual( self.assertEqual(
ComparableZipInfo(zh.getinfo('file.txt')), ComparableZipInfo(zh.getinfo('file.txt')),
zinfos[1], ComparableZipInfo(zinfos[1]),
) )
# make sure the zip file is still valid # make sure the zip file is still valid
@ -1548,13 +1550,13 @@ def test_remove_by_zinfo_duplicated(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
[zinfos[0], zinfos[2]], [ComparableZipInfo(zi) for zi in [zinfos[0], zinfos[2]]],
) )
# check NameToInfo cache # check NameToInfo cache
self.assertEqual( self.assertEqual(
ComparableZipInfo(zh.getinfo('file.txt')), ComparableZipInfo(zh.getinfo('file.txt')),
zinfos[0], ComparableZipInfo(zinfos[0]),
) )
# make sure the zip file is still valid # make sure the zip file is still valid
@ -1570,7 +1572,7 @@ def test_remove_by_zinfo_duplicated(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
[zinfos[2]], [ComparableZipInfo(zi) for zi in [zinfos[2]]],
) )
# check NameToInfo cache # check NameToInfo cache
@ -1591,7 +1593,7 @@ def test_remove_zip64(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
[zi for j, zi in enumerate(zinfos) if j != i], [ComparableZipInfo(zi) for j, zi in enumerate(zinfos) if j != i],
) )
# check NameToInfo cache # check NameToInfo cache
@ -1626,14 +1628,14 @@ def test_remove_mode_w(self):
with zipfile.ZipFile(TESTFN, 'w') as zh: with zipfile.ZipFile(TESTFN, 'w') as zh:
for file, data in self.test_files: for file, data in self.test_files:
zh.writestr(file, data) zh.writestr(file, data)
zinfos = [ComparableZipInfo(zi) for zi in zh.infolist()] zinfos = list(zh.infolist())
zh.remove(self.test_files[0][0]) zh.remove(self.test_files[0][0])
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
[zinfos[1], zinfos[2]], [ComparableZipInfo(zi) for zi in [zinfos[1], zinfos[2]]],
) )
# check NameToInfo cache # check NameToInfo cache
@ -1648,14 +1650,14 @@ def test_remove_mode_x(self):
with zipfile.ZipFile(TESTFN, 'x') as zh: with zipfile.ZipFile(TESTFN, 'x') as zh:
for file, data in self.test_files: for file, data in self.test_files:
zh.writestr(file, data) zh.writestr(file, data)
zinfos = [ComparableZipInfo(zi) for zi in zh.infolist()] zinfos = list(zh.infolist())
zh.remove(self.test_files[0][0]) zh.remove(self.test_files[0][0])
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
[zinfos[1], zinfos[2]], [ComparableZipInfo(zi) for zi in [zinfos[1], zinfos[2]]],
) )
# check NameToInfo cache # check NameToInfo cache
@ -1714,7 +1716,7 @@ def test_repack_basic(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
expected_zinfos, [ComparableZipInfo(zi) for zi in expected_zinfos],
) )
# check file size # check file size
@ -1766,7 +1768,7 @@ def test_repack_bytes_before_first_file(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
expected_zinfos, [ComparableZipInfo(zi) for zi in expected_zinfos],
) )
# check file size # check file size
@ -1800,7 +1802,7 @@ def test_repack_magic_before_first_file(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
expected_zinfos, [ComparableZipInfo(zi) for zi in expected_zinfos],
) )
# check file size # check file size
@ -1846,7 +1848,7 @@ def test_repack_file_entry_before_first_file(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
expected_zinfos, [ComparableZipInfo(zi) for zi in expected_zinfos],
) )
# check file size # check file size
@ -1856,6 +1858,7 @@ def test_repack_file_entry_before_first_file(self):
with zipfile.ZipFile(TESTFN) as zh: with zipfile.ZipFile(TESTFN) as zh:
self.assertIsNone(zh.testzip()) self.assertIsNone(zh.testzip())
@mock.patch.object(time, 'time', new=lambda: 315504000) # fix time for ZipFile.writestr()
def test_repack_bytes_before_removed_files(self): def test_repack_bytes_before_removed_files(self):
"""Should preserve if there are bytes before stale local file entries.""" """Should preserve if there are bytes before stale local file entries."""
for ii in ([1], [1, 2], [2]): for ii in ([1], [1, 2], [2]):
@ -1870,7 +1873,7 @@ def test_repack_bytes_before_removed_files(self):
zh.writestr(file, data) zh.writestr(file, data)
for i in ii: for i in ii:
zh.remove(self.test_files[i][0]) zh.remove(self.test_files[i][0])
expected_zinfos = [ComparableZipInfo(zi) for zi in zh.infolist()] expected_zinfos = list(zh.infolist())
expected_size = os.path.getsize(TESTFN) expected_size = os.path.getsize(TESTFN)
# do the removal and check the result # do the removal and check the result
@ -1889,7 +1892,7 @@ def test_repack_bytes_before_removed_files(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
expected_zinfos, [ComparableZipInfo(zi) for zi in expected_zinfos],
) )
# check file size # check file size
@ -1899,6 +1902,7 @@ def test_repack_bytes_before_removed_files(self):
with zipfile.ZipFile(TESTFN) as zh: with zipfile.ZipFile(TESTFN) as zh:
self.assertIsNone(zh.testzip()) self.assertIsNone(zh.testzip())
@mock.patch.object(time, 'time', new=lambda: 315504000) # fix time for ZipFile.writestr()
def test_repack_bytes_after_removed_files(self): def test_repack_bytes_after_removed_files(self):
"""Should keep extra bytes if there are bytes after stale local file entries.""" """Should keep extra bytes if there are bytes after stale local file entries."""
for ii in ([1], [1, 2], [2]): for ii in ([1], [1, 2], [2]):
@ -1912,7 +1916,7 @@ def test_repack_bytes_after_removed_files(self):
if i == ii[-1]: if i == ii[-1]:
fh.write(b' dummy bytes ') fh.write(b' dummy bytes ')
zh.start_dir = fh.tell() zh.start_dir = fh.tell()
expected_zinfos = [ComparableZipInfo(zi) for zi in zh.infolist()] expected_zinfos = list(zh.infolist())
expected_size = os.path.getsize(TESTFN) expected_size = os.path.getsize(TESTFN)
# do the removal and check the result # do the removal and check the result
@ -1931,7 +1935,7 @@ def test_repack_bytes_after_removed_files(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
expected_zinfos, [ComparableZipInfo(zi) for zi in expected_zinfos],
) )
# check file size # check file size
@ -1941,6 +1945,7 @@ def test_repack_bytes_after_removed_files(self):
with zipfile.ZipFile(TESTFN) as zh: with zipfile.ZipFile(TESTFN) as zh:
self.assertIsNone(zh.testzip()) self.assertIsNone(zh.testzip())
@mock.patch.object(time, 'time', new=lambda: 315504000) # fix time for ZipFile.writestr()
def test_repack_bytes_between_removed_files(self): def test_repack_bytes_between_removed_files(self):
"""Should strip only local file entries before random bytes.""" """Should strip only local file entries before random bytes."""
# calculate the expected results # calculate the expected results
@ -1951,7 +1956,7 @@ def test_repack_bytes_between_removed_files(self):
zh.start_dir = fh.tell() zh.start_dir = fh.tell()
zh.writestr(*self.test_files[2]) zh.writestr(*self.test_files[2])
zh.remove(self.test_files[2][0]) zh.remove(self.test_files[2][0])
expected_zinfos = [ComparableZipInfo(zi) for zi in zh.infolist()] expected_zinfos = list(zh.infolist())
expected_size = os.path.getsize(TESTFN) expected_size = os.path.getsize(TESTFN)
# do the removal and check the result # do the removal and check the result
@ -1970,7 +1975,7 @@ def test_repack_bytes_between_removed_files(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
expected_zinfos, [ComparableZipInfo(zi) for zi in expected_zinfos],
) )
# check file size # check file size
@ -1992,7 +1997,7 @@ def test_repack_prepended_bytes(self):
fh.write(b'dummy ') fh.write(b'dummy ')
fh.write(fz.read()) fh.write(fz.read())
with zipfile.ZipFile(TESTFN) as zh: with zipfile.ZipFile(TESTFN) as zh:
expected_zinfos = [ComparableZipInfo(zi) for zi in zh.infolist()] expected_zinfos = list(zh.infolist())
expected_size = os.path.getsize(TESTFN) expected_size = os.path.getsize(TESTFN)
# do the removal and check the result # do the removal and check the result
@ -2010,7 +2015,7 @@ def test_repack_prepended_bytes(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
expected_zinfos, [ComparableZipInfo(zi) for zi in expected_zinfos],
) )
# check file size # check file size
@ -2055,7 +2060,7 @@ def test_repack_removed_basic(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
expected_zinfos, [ComparableZipInfo(zi) for zi in expected_zinfos],
) )
# check file size # check file size
@ -2098,20 +2103,20 @@ def test_repack_removed_partial(self):
with zipfile.ZipFile(TESTFN) as zh: with zipfile.ZipFile(TESTFN) as zh:
self.assertIsNone(zh.testzip()) self.assertIsNone(zh.testzip())
@mock.patch.object(time, 'time', new=lambda: 315504000) # fix time for ZipFile.writestr()
def test_repack_removed_bytes_between_files(self): def test_repack_removed_bytes_between_files(self):
"""Should not remove bytes between local file entries.""" """Should not remove bytes between local file entries."""
for ii in ([0], [1], [2]): for ii in ([0], [1], [2]):
with self.subTest(removed=ii): with self.subTest(removed=ii):
# calculate the expected results # calculate the expected results
expected_zinfos = []
with open(TESTFN, 'wb') as fh: with open(TESTFN, 'wb') as fh:
with zipfile.ZipFile(fh, 'w', self.compression) as zh: with zipfile.ZipFile(fh, 'w', self.compression) as zh:
for j, (file, data) in enumerate(self.test_files): for j, (file, data) in enumerate(self.test_files):
if j not in ii: if j not in ii:
zh.writestr(file, data) zh.writestr(file, data)
expected_zinfos.append(ComparableZipInfo(zh.getinfo(file)))
fh.write(b' dummy bytes ') fh.write(b' dummy bytes ')
zh.start_dir = fh.tell() zh.start_dir = fh.tell()
expected_zinfos = list(zh.infolist())
expected_size = os.path.getsize(TESTFN) expected_size = os.path.getsize(TESTFN)
# do the removal and check the result # do the removal and check the result
@ -2128,7 +2133,7 @@ def test_repack_removed_bytes_between_files(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
expected_zinfos, [ComparableZipInfo(zi) for zi in expected_zinfos],
) )
# check file size # check file size
@ -2184,7 +2189,7 @@ def test_repack_removed_prepended_bytes(self):
fh.write(b'dummy ') fh.write(b'dummy ')
fh.write(fz.read()) fh.write(fz.read())
with zipfile.ZipFile(TESTFN) as zh: with zipfile.ZipFile(TESTFN) as zh:
expected_zinfos = [ComparableZipInfo(zi) for zi in zh.infolist()] expected_zinfos = list(zh.infolist())
expected_size = os.path.getsize(TESTFN) expected_size = os.path.getsize(TESTFN)
# do the removal and check the result # do the removal and check the result
@ -2201,7 +2206,7 @@ def test_repack_removed_prepended_bytes(self):
# check infolist # check infolist
self.assertEqual( self.assertEqual(
[ComparableZipInfo(zi) for zi in zh.infolist()], [ComparableZipInfo(zi) for zi in zh.infolist()],
expected_zinfos, [ComparableZipInfo(zi) for zi in expected_zinfos],
) )
# check file size # check file size

View file

@ -1389,10 +1389,9 @@ def repack(self, zfile, removed=None):
""" """
Repack the ZIP file, stripping unreferenced local file entries. Repack the ZIP file, stripping unreferenced local file entries.
Assumes that local file entries are stored consecutively, with no gaps Assumes that local file entries (and the central directory, which is
or overlaps. mostly treated as the "last entry") are stored consecutively, with no
gaps or overlaps:
Behavior:
1. If any referenced entry overlaps with another, a `BadZipFile` error 1. If any referenced entry overlaps with another, a `BadZipFile` error
is raised since safe repacking cannot be guaranteed. is raised since safe repacking cannot be guaranteed.
@ -1405,8 +1404,8 @@ def repack(self, zfile, removed=None):
be a sequence of consecutive entries with no extra preceding bytes; be a sequence of consecutive entries with no extra preceding bytes;
extra following bytes are preserved. extra following bytes are preserved.
4. This is to prevent an unexpected data removal (false positive), This is to prevent an unexpected data removal (false positive), though
though a false negative may happen in certain rare cases. a false negative may happen in certain rare cases.
Examples: Examples:
@ -1456,8 +1455,8 @@ def repack(self, zfile, removed=None):
- Modifies the ZIP file in place. - Modifies the ZIP file in place.
- Updates zfile.start_dir to account for removed data. - Updates zfile.start_dir to account for removed data.
- Sets zfile._didModify to True. - Sets zfile._didModify to True.
- Updates header_offset and _end_offset of referenced ZipInfo - Updates header_offset and clears _end_offset of referenced
instances. ZipInfo instances.
Parameters: Parameters:
zfile: A ZipFile object representing the archive to repack. zfile: A ZipFile object representing the archive to repack.
@ -1559,14 +1558,8 @@ def repack(self, zfile, removed=None):
zfile.start_dir -= entry_offset zfile.start_dir -= entry_offset
zfile._didModify = True zfile._didModify = True
end_offset = zfile.start_dir for zinfo in filelist:
for zinfo in reversed(filelist): zinfo._end_offset = None
if zinfo in removed_zinfos:
zinfo._end_offset = None
else:
if zinfo._end_offset is not None:
zinfo._end_offset = end_offset
end_offset = zinfo.header_offset
def _calc_initial_entry_offset(self, fp, data_offset): def _calc_initial_entry_offset(self, fp, data_offset):
checked_offsets = {} checked_offsets = {}