This commit is contained in:
Danny Lin 2025-12-08 06:11:59 +02:00 committed by GitHub
commit 7cfc20ff91
5 changed files with 2773 additions and 0 deletions


@@ -548,6 +548,69 @@ ZipFile objects
.. versionadded:: 3.11
.. method:: ZipFile.remove(zinfo_or_arcname)
Removes a member entry from the archive's central directory.
*zinfo_or_arcname* may be the full path of the member or a :class:`ZipInfo`
instance. If multiple members share the same full path and the path is
provided, only one of them is removed.
The archive must be opened with mode ``'w'``, ``'x'`` or ``'a'``.
Returns the removed :class:`ZipInfo` instance.
Calling :meth:`remove` on a closed ZipFile will raise a :exc:`ValueError`.
.. note::
This method only removes the member's entry from the central directory,
making it inaccessible to most tools. The member's local file entry,
including content and metadata, remains in the archive and is still
recoverable using forensic tools. Call :meth:`repack` afterwards to
completely remove the member and reclaim space.
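For example, a minimal sketch (the archive and member names are
illustrative)::

    import zipfile

    with zipfile.ZipFile('example.zip', 'a') as zh:
        zinfo = zh.remove('old.txt')  # returns the removed ZipInfo
        # The member's data still occupies space in the file until
        # repack() is called.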
.. versionadded:: next
.. method:: ZipFile.repack(removed=None, *, \
strict_descriptor=False[, chunk_size])
Rewrites the archive to remove unreferenced local file entries, shrinking
its file size. The archive must be opened with mode ``'a'``.
If *removed* is provided, it must be a sequence of :class:`ZipInfo` objects
representing the recently removed members, and only their corresponding
local file entries will be removed. Otherwise, the archive is scanned to
locate and remove local file entries that are no longer referenced in the
central directory.
When scanning, setting ``strict_descriptor=True`` disables detection of
entries that use an unsigned data descriptor (a format deprecated by the ZIP
specification since version 6.3.0, released on 2006-09-29, and used only by
some legacy tools). Detecting such entries is significantly slower, around
100 to 1000 times in the worst case; this option does not affect performance
on entries without this feature.
*chunk_size* may be specified to control the buffer size when moving
entry data (default is 1 MiB).
Calling :meth:`repack` on a closed ZipFile will raise a :exc:`ValueError`.
.. note::
The scanning algorithm is heuristic-based and assumes that the ZIP file
is normally structured—for example, with local file entries stored
consecutively, without overlap or interleaved binary data. Prepended
binary data, such as a self-extractor stub, is recognized and preserved
unless it happens to contain bytes that coincidentally resemble a valid
local file entry in multiple respects—an extremely rare case. Embedded
ZIP payloads are also handled correctly, as long as they follow normal
structure. However, the algorithm does not guarantee correctness or
safety on untrusted or intentionally crafted input. It is generally
recommended to provide the *removed* argument for better reliability and
performance.
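For example, a minimal sketch (file names are illustrative) that removes two
members and then reclaims their space::

    import zipfile

    with zipfile.ZipFile('example.zip', 'a') as zh:
        removed = [zh.remove(name) for name in ('a.txt', 'b.txt')]
        zh.repack(removed)  # strips only these entries' local data

Passing *removed* skips the heuristic scan, as recommended in the note above.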
.. versionadded:: next
The following data attributes are also available:
.. attribute:: ZipFile.filename

File diff suppressed because it is too large.


@@ -13,12 +13,16 @@
import zipfile, unittest
import time
import tracemalloc
import sys
import unittest.mock as mock
from tempfile import TemporaryFile
from test.support import os_helper
from test.support import requires_zlib
from test.test_zipfile.test_core import Unseekable
from test.test_zipfile.test_core import struct_pack_no_dd_sig
TESTFN = os_helper.TESTFN
TESTFN2 = TESTFN + "2"
@@ -87,6 +91,174 @@ def tearDown(self):
os_helper.unlink(TESTFN2)
class TestRepack(unittest.TestCase):
def setUp(self):
# Create test data.
line_gen = ("Test of zipfile line %d." % i for i in range(1000000))
self.data = '\n'.join(line_gen).encode('ascii')
# It will contain enough copies of self.data to reach about 8 GiB.
self.datacount = 8*1024**3 // len(self.data)
# memory usage should not exceed 10 MiB
self.allowed_memory = 10*1024**2
def _write_large_file(self, fh):
next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
for num in range(self.datacount):
fh.write(self.data)
# Print still working message since this test can be really slow
if next_time <= time.monotonic():
next_time = time.monotonic() + _PRINT_WORKING_MSG_INTERVAL
print((
' writing %d of %d, be patient...' %
(num, self.datacount)), file=sys.__stdout__)
sys.__stdout__.flush()
def test_strip_removed_large_file(self):
"""Should move the physical data of a file positioned after a large
removed file without causing a memory issue."""
# Try the temp file. If we do TESTFN2, then it hogs
# gigabytes of disk space for the duration of the test.
with TemporaryFile() as f:
tracemalloc.start()
self._test_strip_removed_large_file(f)
self.assertFalse(f.closed)
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
self.assertLess(peak, self.allowed_memory)
def _test_strip_removed_large_file(self, f):
file = 'file.txt'
file1 = 'largefile.txt'
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
with zipfile.ZipFile(f, 'w') as zh:
with zh.open(file1, 'w', force_zip64=True) as fh:
self._write_large_file(fh)
zh.writestr(file, data)
with zipfile.ZipFile(f, 'a') as zh:
zh.remove(file1)
zh.repack()
self.assertIsNone(zh.testzip())
def test_strip_removed_file_before_large_file(self):
"""Should move the physical data of a large file positioned after a
removed file without causing a memory issue."""
# Try the temp file. If we do TESTFN2, then it hogs
# gigabytes of disk space for the duration of the test.
with TemporaryFile() as f:
tracemalloc.start()
self._test_strip_removed_file_before_large_file(f)
self.assertFalse(f.closed)
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
self.assertLess(peak, self.allowed_memory)
def _test_strip_removed_file_before_large_file(self, f):
file = 'file.txt'
file1 = 'largefile.txt'
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
with zipfile.ZipFile(f, 'w') as zh:
zh.writestr(file, data)
with zh.open(file1, 'w', force_zip64=True) as fh:
self._write_large_file(fh)
with zipfile.ZipFile(f, 'a') as zh:
zh.remove(file)
zh.repack()
self.assertIsNone(zh.testzip())
def test_strip_removed_large_file_with_dd(self):
"""Should scan for the data descriptor of a removed large file without
causing a memory issue."""
# Try the temp file. If we do TESTFN2, then it hogs
# gigabytes of disk space for the duration of the test.
with TemporaryFile() as f:
tracemalloc.start()
self._test_strip_removed_large_file_with_dd(f)
self.assertFalse(f.closed)
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
self.assertLess(peak, self.allowed_memory)
def _test_strip_removed_large_file_with_dd(self, f):
file = 'file.txt'
file1 = 'largefile.txt'
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
with zipfile.ZipFile(Unseekable(f), 'w') as zh:
with zh.open(file1, 'w', force_zip64=True) as fh:
self._write_large_file(fh)
zh.writestr(file, data)
with zipfile.ZipFile(f, 'a') as zh:
zh.remove(file1)
zh.repack()
self.assertIsNone(zh.testzip())
def test_strip_removed_large_file_with_dd_no_sig(self):
"""Should scan for the data descriptor (without signature) of a removed
large file without causing a memory issue."""
# Reduce data scale for this test, as it's especially slow...
self.datacount = 30*1024**2 // len(self.data)
self.allowed_memory = 200*1024
# Try the temp file. If we do TESTFN2, then it hogs
# gigabytes of disk space for the duration of the test.
with TemporaryFile() as f:
tracemalloc.start()
self._test_strip_removed_large_file_with_dd_no_sig(f)
self.assertFalse(f.closed)
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
self.assertLess(peak, self.allowed_memory)
def _test_strip_removed_large_file_with_dd_no_sig(self, f):
file = 'file.txt'
file1 = 'largefile.txt'
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig):
with zipfile.ZipFile(Unseekable(f), 'w') as zh:
with zh.open(file1, 'w', force_zip64=True) as fh:
self._write_large_file(fh)
zh.writestr(file, data)
with zipfile.ZipFile(f, 'a') as zh:
zh.remove(file1)
zh.repack()
self.assertIsNone(zh.testzip())
@requires_zlib()
def test_strip_removed_large_file_with_dd_no_sig_by_decompression(self):
"""Should scan for the data descriptor (without signature) of a removed
large file without causing a memory issue."""
# Try the temp file. If we do TESTFN2, then it hogs
# gigabytes of disk space for the duration of the test.
with TemporaryFile() as f:
tracemalloc.start()
self._test_strip_removed_large_file_with_dd_no_sig_by_decompression(
f, zipfile.ZIP_DEFLATED)
self.assertFalse(f.closed)
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
self.assertLess(peak, self.allowed_memory)
def _test_strip_removed_large_file_with_dd_no_sig_by_decompression(self, f, method):
file = 'file.txt'
file1 = 'largefile.txt'
data = b'Sed ut perspiciatis unde omnis iste natus error sit voluptatem'
with mock.patch('zipfile.struct.pack', side_effect=struct_pack_no_dd_sig):
with zipfile.ZipFile(Unseekable(f), 'w', compression=method) as zh:
with zh.open(file1, 'w', force_zip64=True) as fh:
self._write_large_file(fh)
zh.writestr(file, data)
with zipfile.ZipFile(f, 'a') as zh:
zh.remove(file1)
zh.repack()
self.assertIsNone(zh.testzip())
class OtherTests(unittest.TestCase):
def testMoreThan64kFiles(self):
# This test checks that more than 64k files can be added to an archive,


@@ -778,6 +778,13 @@ def __init__(self):
self._unconsumed = b''
self.eof = False
@property
def unused_data(self):
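# Data left over past the end of the compressed stream; fall back to
# b'' for decompressors that do not expose this attribute.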
try:
return self._decomp.unused_data
except AttributeError:
return b''
def decompress(self, data):
if self._decomp is None:
self._unconsumed += data
@@ -1380,6 +1387,461 @@ def close(self):
self._zipfile._writing = False
class _ZipRepacker:
"""Class for ZipFile repacking."""
def __init__(self, *, strict_descriptor=False, chunk_size=2**20, debug=0):
self.debug = debug # Level of printing: 0 through 3
self.chunk_size = chunk_size
self.strict_descriptor = strict_descriptor
def _debug(self, level, *msg):
if self.debug >= level:
print(*msg)
def repack(self, zfile, removed=None):
"""
Repack the ZIP file, stripping unreferenced local file entries.
Assumes that local file entries (and the central directory, which is
mostly treated as the "last entry") are stored consecutively, with no
gaps or overlaps:
1. If any referenced entry overlaps with another, a `BadZipFile` error
is raised since safe repacking cannot be guaranteed.
2. Data before the first referenced entry is stripped only when it
appears to be a sequence of consecutive entries with no extra
following bytes; extra preceding bytes are preserved.
3. Data between referenced entries is stripped only when it appears to
be a sequence of consecutive entries with no extra preceding bytes;
extra following bytes are preserved.
This prevents unexpected data removal (a false positive), though
a false negative may happen in certain rare cases.
Examples:
Stripping before the first referenced entry:
[random bytes]
[unreferenced local file entry]
[random bytes]
<-- stripping start
[unreferenced local file entry]
[unreferenced local file entry]
<-- stripping end
[local file entry 1] (or central directory)
...
Stripping between referenced entries:
...
[local file entry]
<-- stripping start
[unreferenced local file entry]
[unreferenced local file entry]
<-- stripping end
[random bytes]
[unreferenced local file entry]
[random bytes]
[local file entry] (or central directory)
...
No stripping:
[unreferenced local file entry]
[random bytes]
[local file entry 1] (or central directory)
...
No stripping:
...
[local file entry]
[random bytes]
[unreferenced local file entry]
[local file entry] (or central directory)
...
Side effects:
- Modifies the ZIP file in place.
- Updates zfile.start_dir to account for removed data.
- Sets zfile._didModify to True.
- Updates header_offset and clears _end_offset of referenced
ZipInfo instances.
Parameters:
zfile: A ZipFile object representing the archive to repack.
removed: Optional. A sequence of ZipInfo instances representing
the previously removed entries. When provided, only their
corresponding local file entries are stripped.
"""
removed_zinfos = set(removed or ())
fp = zfile.fp
# get a sorted filelist by header offset, in case the dir order
# doesn't match the actual entry order
filelist = (*zfile.filelist, *removed_zinfos)
filelist = sorted(filelist, key=lambda x: x.header_offset)
# calculate each entry size and validate
entry_size_list = []
used_entry_size_list = []
for i, zinfo in enumerate(filelist):
try:
offset = filelist[i + 1].header_offset
except IndexError:
offset = zfile.start_dir
entry_size = offset - zinfo.header_offset
# may raise on an invalid local file header
used_entry_size = self._calc_local_file_entry_size(fp, zinfo)
self._debug(3, 'entry:', i, zinfo.orig_filename,
zinfo.header_offset, entry_size, used_entry_size)
if used_entry_size > entry_size:
raise BadZipFile(
f"Overlapped entries: {zinfo.orig_filename!r} ")
if removed is not None and zinfo not in removed_zinfos:
used_entry_size = entry_size
entry_size_list.append(entry_size)
used_entry_size_list.append(used_entry_size)
# calculate the starting entry offset (bytes to skip)
if removed is None:
try:
offset = filelist[0].header_offset
except IndexError:
offset = zfile.start_dir
entry_offset = self._calc_initial_entry_offset(fp, offset)
else:
entry_offset = 0
# move file entries
for i, zinfo in enumerate(filelist):
entry_size = entry_size_list[i]
used_entry_size = used_entry_size_list[i]
# update the header and move entry data to the new position
old_header_offset = zinfo.header_offset
zinfo.header_offset -= entry_offset
if zinfo in removed_zinfos:
self._copy_bytes(
fp,
old_header_offset + used_entry_size,
zinfo.header_offset,
entry_size - used_entry_size,
)
# update entry_offset for subsequent files to follow
entry_offset += used_entry_size
else:
if entry_offset > 0:
self._copy_bytes(
fp,
old_header_offset,
zinfo.header_offset,
used_entry_size,
)
stale_entry_size = self._validate_local_file_entry_sequence(
fp,
old_header_offset + used_entry_size,
old_header_offset + entry_size,
)
if stale_entry_size > 0:
self._copy_bytes(
fp,
old_header_offset + used_entry_size + stale_entry_size,
zinfo.header_offset + used_entry_size,
entry_size - used_entry_size - stale_entry_size,
)
# update entry_offset for subsequent files to follow
entry_offset += stale_entry_size
# update state
zfile.start_dir -= entry_offset
zfile._didModify = True
for zinfo in filelist:
zinfo._end_offset = None
def _calc_initial_entry_offset(self, fp, data_offset):
checked_offsets = {}
if data_offset > 0:
self._debug(3, 'scanning file signatures before:', data_offset)
for pos in self._iter_scan_signature(fp, stringFileHeader, 0, data_offset):
self._debug(3, 'checking file signature at:', pos)
entry_size = self._validate_local_file_entry_sequence(
fp, pos, data_offset, checked_offsets)
if entry_size == data_offset - pos:
return entry_size
return 0
def _iter_scan_signature(self, fp, signature, start_offset, end_offset,
chunk_size=io.DEFAULT_BUFFER_SIZE):
sig_len = len(signature)
remainder = b''
pos = start_offset
while pos < end_offset:
# re-seek on each iteration, since fp may be moved during the yield
fp.seek(pos)
chunk = remainder + fp.read(min(chunk_size, end_offset - pos))
delta = pos - len(remainder)
idx = 0
while True:
idx = chunk.find(signature, idx)
if idx == -1:
break
yield delta + idx
idx += 1
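# keep the last sig_len - 1 bytes so a signature spanning two
# chunks is found on the next iteration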
remainder = chunk[-(sig_len - 1):]
pos += chunk_size
def _validate_local_file_entry_sequence(self, fp, start_offset, end_offset, checked_offsets=None):
offset = start_offset
while offset < end_offset:
self._debug(3, 'checking local file entry at:', offset)
# Cache checked offsets to improve performance.
try:
entry_size = checked_offsets[offset]
except (KeyError, TypeError):
entry_size = self._validate_local_file_entry(fp, offset, end_offset)
if checked_offsets is not None:
checked_offsets[offset] = entry_size
else:
self._debug(3, 'read from checked cache:', offset)
if entry_size is None:
break
offset += entry_size
return offset - start_offset
def _validate_local_file_entry(self, fp, offset, end_offset):
fp.seek(offset)
try:
fheader = self._read_local_file_header(fp)
except BadZipFile:
return None
# Create a dummy ZipInfo to reuse the parsing logic.
# Fill in only the required information.
zinfo = ZipInfo()
zinfo.header_offset = offset
zinfo.flag_bits = fheader[_FH_GENERAL_PURPOSE_FLAG_BITS]
zinfo.compress_size = fheader[_FH_COMPRESSED_SIZE]
zinfo.file_size = fheader[_FH_UNCOMPRESSED_SIZE]
zinfo.CRC = fheader[_FH_CRC]
filename = fp.read(fheader[_FH_FILENAME_LENGTH])
zinfo.extra = fp.read(fheader[_FH_EXTRA_FIELD_LENGTH])
pos = fp.tell()
if pos > end_offset:
return None
# parse zip64
try:
zinfo._decodeExtra(crc32(filename))
except BadZipFile:
return None
dd_size = 0
if zinfo.flag_bits & _MASK_USE_DATA_DESCRIPTOR:
# According to the spec, these fields should be zero when a data
# descriptor is used. Otherwise, treat this as a false positive on
# random bytes and return early, since scanning for a data
# descriptor is rather expensive.
if not (zinfo.CRC == zinfo.compress_size == zinfo.file_size == 0):
return None
zip64 = fheader[_FH_UNCOMPRESSED_SIZE] == 0xffffffff
dd = self._scan_data_descriptor(fp, pos, end_offset, zip64)
if dd is None and not self.strict_descriptor:
if zinfo.flag_bits & _MASK_ENCRYPTED:
dd = False
else:
dd = self._scan_data_descriptor_no_sig_by_decompression(
fp, pos, end_offset, zip64, fheader[_FH_COMPRESSION_METHOD])
if dd is False:
dd = self._scan_data_descriptor_no_sig(fp, pos, end_offset, zip64)
if dd is None:
return None
zinfo.CRC, zinfo.compress_size, zinfo.file_size, dd_size = dd
return (
sizeFileHeader +
fheader[_FH_FILENAME_LENGTH] + fheader[_FH_EXTRA_FIELD_LENGTH] +
zinfo.compress_size +
dd_size
)
def _read_local_file_header(self, fp):
fheader = fp.read(sizeFileHeader)
if len(fheader) != sizeFileHeader:
raise BadZipFile("Truncated file header")
fheader = struct.unpack(structFileHeader, fheader)
if fheader[_FH_SIGNATURE] != stringFileHeader:
raise BadZipFile("Bad magic number for file header")
return fheader
def _scan_data_descriptor(self, fp, offset, end_offset, zip64):
dd_fmt = '<LLQQ' if zip64 else '<LLLL'
dd_size = struct.calcsize(dd_fmt)
# scan for signature and take the first valid descriptor
for pos in self._iter_scan_signature(
fp, struct.pack('<L', _DD_SIGNATURE), offset, end_offset
):
fp.seek(pos)
dd = fp.read(min(dd_size, end_offset - pos))
try:
_, crc, compress_size, file_size = struct.unpack(dd_fmt, dd)
except struct.error:
continue
# @TODO: also check CRC to better guard against a false positive?
if pos - offset != compress_size:
continue
return crc, compress_size, file_size, dd_size
return None
def _scan_data_descriptor_no_sig(self, fp, offset, end_offset, zip64, chunk_size=8192):
dd_fmt = '<LQQ' if zip64 else '<LLL'
dd_size = struct.calcsize(dd_fmt)
pos = offset
remainder = b''
fp.seek(offset)
while pos < end_offset:
chunk = remainder + fp.read(min(chunk_size, end_offset - pos))
delta = pos - len(remainder) - offset
mv = memoryview(chunk)
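# With no signature to anchor on, try unpacking a descriptor at every
# byte offset and accept the first whose compress_size matches the
# number of bytes scanned so far.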
for i in range(len(chunk) - dd_size + 1):
dd = mv[i:i + dd_size]
try:
crc, compress_size, file_size = struct.unpack(dd_fmt, dd)
except struct.error:
continue
if delta + i != compress_size:
continue
return crc, compress_size, file_size, dd_size
remainder = chunk[-(dd_size - 1):]
pos += chunk_size
return None
def _scan_data_descriptor_no_sig_by_decompression(self, fp, offset, end_offset, zip64, method):
try:
decompressor = _get_decompressor(method)
except RuntimeError:
return False
if decompressor is None:
return False
dd_fmt = '<LQQ' if zip64 else '<LLL'
dd_size = struct.calcsize(dd_fmt)
# return early and prevent a potential `fp.read(-1)`
if end_offset - dd_size < offset:
return None
try:
pos = self._trace_compressed_block_end(fp, offset, end_offset - dd_size, decompressor)
except Exception:
return None
fp.seek(pos)
dd = fp.read(dd_size)
try:
crc, compress_size, file_size = struct.unpack(dd_fmt, dd)
except struct.error:
return None
if pos - offset != compress_size:
return None
return crc, compress_size, file_size, dd_size
def _trace_compressed_block_end(self, fp, offset, end_offset, decompressor,
chunk_size=io.DEFAULT_BUFFER_SIZE):
fp.seek(offset)
read_size = 0
while True:
chunk = fp.read(min(chunk_size, end_offset - offset - read_size))
if not chunk:
raise EOFError('Unexpected EOF while decompressing')
# may raise on error
decompressor.decompress(chunk)
read_size += len(chunk)
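# decompressor.eof marks the end of the compressed block; unused_data
# is whatever was read past that end and must be subtracted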
if decompressor.eof:
unused_len = len(decompressor.unused_data)
return offset + read_size - unused_len
def _calc_local_file_entry_size(self, fp, zinfo):
fp.seek(zinfo.header_offset)
fheader = self._read_local_file_header(fp)
if zinfo.flag_bits & _MASK_USE_DATA_DESCRIPTOR:
zip64 = fheader[_FH_UNCOMPRESSED_SIZE] == 0xffffffff
dd_fmt = '<LLQQ' if zip64 else '<LLLL'
fp.seek(
fheader[_FH_FILENAME_LENGTH] + fheader[_FH_EXTRA_FIELD_LENGTH] +
zinfo.compress_size,
os.SEEK_CUR,
)
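# Peek at the next 4 bytes: if they are not the optional descriptor
# signature, the entry uses an unsigned data descriptor (3 fields
# instead of 4).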
if fp.read(struct.calcsize('<L')) != struct.pack('<L', _DD_SIGNATURE):
dd_fmt = '<LQQ' if zip64 else '<LLL'
dd_size = struct.calcsize(dd_fmt)
else:
dd_size = 0
return (
sizeFileHeader +
fheader[_FH_FILENAME_LENGTH] + fheader[_FH_EXTRA_FIELD_LENGTH] +
zinfo.compress_size +
dd_size
)
def _copy_bytes(self, fp, old_offset, new_offset, size):
read_size = 0
while read_size < size:
fp.seek(old_offset + read_size)
data = fp.read(min(size - read_size, self.chunk_size))
fp.seek(new_offset + read_size)
fp.write(data)
fp.flush()
read_size += len(data)
class ZipFile:
""" Class with methods to open, read, write, close, list zip files.
@@ -1866,6 +2328,72 @@ def extractall(self, path=None, members=None, pwd=None):
for zipinfo in members:
self._extract_member(zipinfo, path, pwd)
def remove(self, zinfo_or_arcname):
"""Remove a member from the archive."""
if self.mode not in ('w', 'x', 'a'):
raise ValueError("remove() requires mode 'w', 'x', or 'a'")
if not self.fp:
raise ValueError(
"Attempt to write to ZIP archive that was already closed")
if self._writing:
raise ValueError(
"Can't write to ZIP archive while an open writing handle exists."
)
with self._lock:
# get the zinfo
if isinstance(zinfo_or_arcname, ZipInfo):
zinfo = zinfo_or_arcname
else:
# raise KeyError if arcname does not exist
zinfo = self.getinfo(zinfo_or_arcname)
try:
self.filelist.remove(zinfo)
except ValueError:
raise KeyError('There is no item %r in the archive' % zinfo) from None
try:
del self.NameToInfo[zinfo.filename]
except KeyError:
pass
# Avoid missing entry if there is another entry having the same name,
# to prevent an error on `testzip()`.
# Reverse the order as NameToInfo normally stores the last added one.
for zi in reversed(self.filelist):
if zi.filename == zinfo.filename:
self.NameToInfo.setdefault(zi.filename, zi)
break
self._didModify = True
return zinfo
def repack(self, removed=None, **opts):
"""Repack a zip file, removing non-referenced file entries.
The archive must be opened with mode 'a', since modes 'w' and 'x' do
not truncate the file when closed. This cannot simply be changed, as
those modes may be used on an unseekable file buffer, which disallows
truncation."""
if self.mode != 'a':
raise ValueError("repack() requires mode 'a'")
if not self.fp:
raise ValueError(
"Attempt to write to ZIP archive that was already closed")
if self._writing:
raise ValueError(
"Can't write to ZIP archive while an open writing handle exists"
)
with self._lock:
self._writing = True
try:
_ZipRepacker(**opts).repack(self, removed)
finally:
self._writing = False
@classmethod
def _sanitize_windows_name(cls, arcname, pathsep):
"""Replace bad characters and remove trailing dots from parts."""


@@ -0,0 +1 @@
Add :meth:`~zipfile.ZipFile.remove` and :meth:`~zipfile.ZipFile.repack` to :class:`~zipfile.ZipFile`.