mirror of
https://github.com/python/cpython.git
synced 2025-12-08 06:10:17 +00:00
[3.13] gh-139700: Check consistency of the zip64 end of central directory record (GH-139702) (GH-139708)
Support records with "zip64 extensible data" if there are no bytes
prepended to the ZIP file.
(cherry picked from commit 162997bb70)
This commit is contained in:
parent
527623e827
commit
333d4a6f49
3 changed files with 113 additions and 23 deletions
|
|
@ -884,6 +884,8 @@ def make_zip64_file(
|
||||||
self, file_size_64_set=False, file_size_extra=False,
|
self, file_size_64_set=False, file_size_extra=False,
|
||||||
compress_size_64_set=False, compress_size_extra=False,
|
compress_size_64_set=False, compress_size_extra=False,
|
||||||
header_offset_64_set=False, header_offset_extra=False,
|
header_offset_64_set=False, header_offset_extra=False,
|
||||||
|
extensible_data=b'',
|
||||||
|
end_of_central_dir_size=None, offset_to_end_of_central_dir=None,
|
||||||
):
|
):
|
||||||
"""Generate bytes sequence for a zip with (incomplete) zip64 data.
|
"""Generate bytes sequence for a zip with (incomplete) zip64 data.
|
||||||
|
|
||||||
|
|
@ -937,6 +939,12 @@ def make_zip64_file(
|
||||||
|
|
||||||
central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields))
|
central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields))
|
||||||
offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields))
|
offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields))
|
||||||
|
if end_of_central_dir_size is None:
|
||||||
|
end_of_central_dir_size = 44 + len(extensible_data)
|
||||||
|
if offset_to_end_of_central_dir is None:
|
||||||
|
offset_to_end_of_central_dir = (108
|
||||||
|
+ 8 * len(local_zip64_fields)
|
||||||
|
+ 8 * len(central_zip64_fields))
|
||||||
|
|
||||||
local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields))
|
local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields))
|
||||||
central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields))
|
central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields))
|
||||||
|
|
@ -965,14 +973,17 @@ def make_zip64_file(
|
||||||
+ filename
|
+ filename
|
||||||
+ central_extra
|
+ central_extra
|
||||||
# Zip64 end of central directory
|
# Zip64 end of central directory
|
||||||
+ b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-"
|
+ b"PK\x06\x06"
|
||||||
+ b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
|
+ struct.pack('<Q', end_of_central_dir_size)
|
||||||
|
+ b"-\x00-\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
|
||||||
+ b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00"
|
+ b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00"
|
||||||
+ central_dir_size
|
+ central_dir_size
|
||||||
+ offset_to_central_dir
|
+ offset_to_central_dir
|
||||||
|
+ extensible_data
|
||||||
# Zip64 end of central directory locator
|
# Zip64 end of central directory locator
|
||||||
+ b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01"
|
+ b"PK\x06\x07\x00\x00\x00\x00"
|
||||||
+ b"\x00\x00\x00"
|
+ struct.pack('<Q', offset_to_end_of_central_dir)
|
||||||
|
+ b"\x01\x00\x00\x00"
|
||||||
# end of central directory
|
# end of central directory
|
||||||
+ b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00"
|
+ b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00"
|
||||||
+ b"\x00\x00\x00\x00"
|
+ b"\x00\x00\x00\x00"
|
||||||
|
|
@ -1003,6 +1014,7 @@ def test_bad_zip64_extra(self):
|
||||||
with self.assertRaises(zipfile.BadZipFile) as e:
|
with self.assertRaises(zipfile.BadZipFile) as e:
|
||||||
zipfile.ZipFile(io.BytesIO(missing_file_size_extra))
|
zipfile.ZipFile(io.BytesIO(missing_file_size_extra))
|
||||||
self.assertIn('file size', str(e.exception).lower())
|
self.assertIn('file size', str(e.exception).lower())
|
||||||
|
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_file_size_extra)))
|
||||||
|
|
||||||
# zip64 file size present, zip64 compress size present, one field in
|
# zip64 file size present, zip64 compress size present, one field in
|
||||||
# extra, expecting two, equals missing compress size.
|
# extra, expecting two, equals missing compress size.
|
||||||
|
|
@ -1014,6 +1026,7 @@ def test_bad_zip64_extra(self):
|
||||||
with self.assertRaises(zipfile.BadZipFile) as e:
|
with self.assertRaises(zipfile.BadZipFile) as e:
|
||||||
zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
|
zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
|
||||||
self.assertIn('compress size', str(e.exception).lower())
|
self.assertIn('compress size', str(e.exception).lower())
|
||||||
|
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_compress_size_extra)))
|
||||||
|
|
||||||
# zip64 compress size present, no fields in extra, expecting one,
|
# zip64 compress size present, no fields in extra, expecting one,
|
||||||
# equals missing compress size.
|
# equals missing compress size.
|
||||||
|
|
@ -1023,6 +1036,7 @@ def test_bad_zip64_extra(self):
|
||||||
with self.assertRaises(zipfile.BadZipFile) as e:
|
with self.assertRaises(zipfile.BadZipFile) as e:
|
||||||
zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
|
zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
|
||||||
self.assertIn('compress size', str(e.exception).lower())
|
self.assertIn('compress size', str(e.exception).lower())
|
||||||
|
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_compress_size_extra)))
|
||||||
|
|
||||||
# zip64 file size present, zip64 compress size present, zip64 header
|
# zip64 file size present, zip64 compress size present, zip64 header
|
||||||
# offset present, two fields in extra, expecting three, equals missing
|
# offset present, two fields in extra, expecting three, equals missing
|
||||||
|
|
@ -1037,6 +1051,7 @@ def test_bad_zip64_extra(self):
|
||||||
with self.assertRaises(zipfile.BadZipFile) as e:
|
with self.assertRaises(zipfile.BadZipFile) as e:
|
||||||
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
|
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
|
||||||
self.assertIn('header offset', str(e.exception).lower())
|
self.assertIn('header offset', str(e.exception).lower())
|
||||||
|
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_header_offset_extra)))
|
||||||
|
|
||||||
# zip64 compress size present, zip64 header offset present, one field
|
# zip64 compress size present, zip64 header offset present, one field
|
||||||
# in extra, expecting two, equals missing header offset
|
# in extra, expecting two, equals missing header offset
|
||||||
|
|
@ -1049,6 +1064,7 @@ def test_bad_zip64_extra(self):
|
||||||
with self.assertRaises(zipfile.BadZipFile) as e:
|
with self.assertRaises(zipfile.BadZipFile) as e:
|
||||||
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
|
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
|
||||||
self.assertIn('header offset', str(e.exception).lower())
|
self.assertIn('header offset', str(e.exception).lower())
|
||||||
|
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_header_offset_extra)))
|
||||||
|
|
||||||
# zip64 file size present, zip64 header offset present, one field in
|
# zip64 file size present, zip64 header offset present, one field in
|
||||||
# extra, expecting two, equals missing header offset
|
# extra, expecting two, equals missing header offset
|
||||||
|
|
@ -1061,6 +1077,7 @@ def test_bad_zip64_extra(self):
|
||||||
with self.assertRaises(zipfile.BadZipFile) as e:
|
with self.assertRaises(zipfile.BadZipFile) as e:
|
||||||
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
|
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
|
||||||
self.assertIn('header offset', str(e.exception).lower())
|
self.assertIn('header offset', str(e.exception).lower())
|
||||||
|
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_header_offset_extra)))
|
||||||
|
|
||||||
# zip64 header offset present, no fields in extra, expecting one,
|
# zip64 header offset present, no fields in extra, expecting one,
|
||||||
# equals missing header offset
|
# equals missing header offset
|
||||||
|
|
@ -1072,6 +1089,63 @@ def test_bad_zip64_extra(self):
|
||||||
with self.assertRaises(zipfile.BadZipFile) as e:
|
with self.assertRaises(zipfile.BadZipFile) as e:
|
||||||
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
|
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
|
||||||
self.assertIn('header offset', str(e.exception).lower())
|
self.assertIn('header offset', str(e.exception).lower())
|
||||||
|
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_header_offset_extra)))
|
||||||
|
|
||||||
|
def test_bad_zip64_end_of_central_dir(self):
|
||||||
|
zipdata = self.make_zip64_file(end_of_central_dir_size=0)
|
||||||
|
with self.assertRaisesRegex(zipfile.BadZipFile, 'Corrupt.*record'):
|
||||||
|
zipfile.ZipFile(io.BytesIO(zipdata))
|
||||||
|
self.assertFalse(zipfile.is_zipfile(io.BytesIO(zipdata)))
|
||||||
|
|
||||||
|
zipdata = self.make_zip64_file(end_of_central_dir_size=100)
|
||||||
|
with self.assertRaisesRegex(zipfile.BadZipFile, 'Corrupt.*record'):
|
||||||
|
zipfile.ZipFile(io.BytesIO(zipdata))
|
||||||
|
self.assertFalse(zipfile.is_zipfile(io.BytesIO(zipdata)))
|
||||||
|
|
||||||
|
zipdata = self.make_zip64_file(offset_to_end_of_central_dir=0)
|
||||||
|
with self.assertRaisesRegex(zipfile.BadZipFile, 'Corrupt.*record'):
|
||||||
|
zipfile.ZipFile(io.BytesIO(zipdata))
|
||||||
|
self.assertFalse(zipfile.is_zipfile(io.BytesIO(zipdata)))
|
||||||
|
|
||||||
|
zipdata = self.make_zip64_file(offset_to_end_of_central_dir=1000)
|
||||||
|
with self.assertRaisesRegex(zipfile.BadZipFile, 'Corrupt.*locator'):
|
||||||
|
zipfile.ZipFile(io.BytesIO(zipdata))
|
||||||
|
self.assertFalse(zipfile.is_zipfile(io.BytesIO(zipdata)))
|
||||||
|
|
||||||
|
def test_zip64_end_of_central_dir_record_not_found(self):
|
||||||
|
zipdata = self.make_zip64_file()
|
||||||
|
zipdata = zipdata.replace(b"PK\x06\x06", b'\x00'*4)
|
||||||
|
with self.assertRaisesRegex(zipfile.BadZipFile, 'record not found'):
|
||||||
|
zipfile.ZipFile(io.BytesIO(zipdata))
|
||||||
|
self.assertFalse(zipfile.is_zipfile(io.BytesIO(zipdata)))
|
||||||
|
|
||||||
|
zipdata = self.make_zip64_file(
|
||||||
|
extensible_data=b'\xca\xfe\x04\x00\x00\x00data')
|
||||||
|
zipdata = zipdata.replace(b"PK\x06\x06", b'\x00'*4)
|
||||||
|
with self.assertRaisesRegex(zipfile.BadZipFile, 'record not found'):
|
||||||
|
zipfile.ZipFile(io.BytesIO(zipdata))
|
||||||
|
self.assertFalse(zipfile.is_zipfile(io.BytesIO(zipdata)))
|
||||||
|
|
||||||
|
def test_zip64_extensible_data(self):
|
||||||
|
# These values are the ones set by the make_zip64_file method.
|
||||||
|
expected_file_size = 8
|
||||||
|
expected_compress_size = 8
|
||||||
|
expected_header_offset = 0
|
||||||
|
expected_content = b"test1234"
|
||||||
|
|
||||||
|
zipdata = self.make_zip64_file(
|
||||||
|
extensible_data=b'\xca\xfe\x04\x00\x00\x00data')
|
||||||
|
with zipfile.ZipFile(io.BytesIO(zipdata)) as zf:
|
||||||
|
zinfo = zf.infolist()[0]
|
||||||
|
self.assertEqual(zinfo.file_size, expected_file_size)
|
||||||
|
self.assertEqual(zinfo.compress_size, expected_compress_size)
|
||||||
|
self.assertEqual(zinfo.header_offset, expected_header_offset)
|
||||||
|
self.assertEqual(zf.read(zinfo), expected_content)
|
||||||
|
self.assertTrue(zipfile.is_zipfile(io.BytesIO(zipdata)))
|
||||||
|
|
||||||
|
with self.assertRaisesRegex(zipfile.BadZipFile, 'record not found'):
|
||||||
|
zipfile.ZipFile(io.BytesIO(b'prepended' + zipdata))
|
||||||
|
self.assertFalse(zipfile.is_zipfile(io.BytesIO(b'prepended' + zipdata)))
|
||||||
|
|
||||||
def test_generated_valid_zip64_extra(self):
|
def test_generated_valid_zip64_extra(self):
|
||||||
# These values are what is set in the make_zip64_file method.
|
# These values are what is set in the make_zip64_file method.
|
||||||
|
|
|
||||||
|
|
@ -245,7 +245,7 @@ def is_zipfile(filename):
|
||||||
else:
|
else:
|
||||||
with open(filename, "rb") as fp:
|
with open(filename, "rb") as fp:
|
||||||
result = _check_zipfile(fp)
|
result = _check_zipfile(fp)
|
||||||
except OSError:
|
except (OSError, BadZipFile):
|
||||||
pass
|
pass
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
@ -253,16 +253,15 @@ def _EndRecData64(fpin, offset, endrec):
|
||||||
"""
|
"""
|
||||||
Read the ZIP64 end-of-archive records and use that to update endrec
|
Read the ZIP64 end-of-archive records and use that to update endrec
|
||||||
"""
|
"""
|
||||||
try:
|
offset -= sizeEndCentDir64Locator
|
||||||
fpin.seek(offset - sizeEndCentDir64Locator, 2)
|
if offset < 0:
|
||||||
except OSError:
|
# The file is not large enough to contain a ZIP64
|
||||||
# If the seek fails, the file is not large enough to contain a ZIP64
|
|
||||||
# end-of-archive record, so just return the end record we were given.
|
# end-of-archive record, so just return the end record we were given.
|
||||||
return endrec
|
return endrec
|
||||||
|
fpin.seek(offset)
|
||||||
data = fpin.read(sizeEndCentDir64Locator)
|
data = fpin.read(sizeEndCentDir64Locator)
|
||||||
if len(data) != sizeEndCentDir64Locator:
|
if len(data) != sizeEndCentDir64Locator:
|
||||||
return endrec
|
raise OSError("Unknown I/O error")
|
||||||
sig, diskno, reloff, disks = struct.unpack(structEndArchive64Locator, data)
|
sig, diskno, reloff, disks = struct.unpack(structEndArchive64Locator, data)
|
||||||
if sig != stringEndArchive64Locator:
|
if sig != stringEndArchive64Locator:
|
||||||
return endrec
|
return endrec
|
||||||
|
|
@ -270,16 +269,33 @@ def _EndRecData64(fpin, offset, endrec):
|
||||||
if diskno != 0 or disks > 1:
|
if diskno != 0 or disks > 1:
|
||||||
raise BadZipFile("zipfiles that span multiple disks are not supported")
|
raise BadZipFile("zipfiles that span multiple disks are not supported")
|
||||||
|
|
||||||
# Assume no 'zip64 extensible data'
|
offset -= sizeEndCentDir64
|
||||||
fpin.seek(offset - sizeEndCentDir64Locator - sizeEndCentDir64, 2)
|
if reloff > offset:
|
||||||
|
raise BadZipFile("Corrupt zip64 end of central directory locator")
|
||||||
|
# First, check the assumption that there is no prepended data.
|
||||||
|
fpin.seek(reloff)
|
||||||
|
extrasz = offset - reloff
|
||||||
data = fpin.read(sizeEndCentDir64)
|
data = fpin.read(sizeEndCentDir64)
|
||||||
if len(data) != sizeEndCentDir64:
|
if len(data) != sizeEndCentDir64:
|
||||||
return endrec
|
raise OSError("Unknown I/O error")
|
||||||
|
if not data.startswith(stringEndArchive64) and reloff != offset:
|
||||||
|
# Since we have already seen the Zip64 EOCD Locator, it's
|
||||||
|
# possible we got here because there is prepended data.
|
||||||
|
# Assume no 'zip64 extensible data'
|
||||||
|
fpin.seek(offset)
|
||||||
|
extrasz = 0
|
||||||
|
data = fpin.read(sizeEndCentDir64)
|
||||||
|
if len(data) != sizeEndCentDir64:
|
||||||
|
raise OSError("Unknown I/O error")
|
||||||
|
if not data.startswith(stringEndArchive64):
|
||||||
|
raise BadZipFile("Zip64 end of central directory record not found")
|
||||||
|
|
||||||
sig, sz, create_version, read_version, disk_num, disk_dir, \
|
sig, sz, create_version, read_version, disk_num, disk_dir, \
|
||||||
dircount, dircount2, dirsize, diroffset = \
|
dircount, dircount2, dirsize, diroffset = \
|
||||||
struct.unpack(structEndArchive64, data)
|
struct.unpack(structEndArchive64, data)
|
||||||
if sig != stringEndArchive64:
|
if (diroffset + dirsize != reloff or
|
||||||
return endrec
|
sz + 12 != sizeEndCentDir64 + extrasz):
|
||||||
|
raise BadZipFile("Corrupt zip64 end of central directory record")
|
||||||
|
|
||||||
# Update the original endrec using data from the ZIP64 record
|
# Update the original endrec using data from the ZIP64 record
|
||||||
endrec[_ECD_SIGNATURE] = sig
|
endrec[_ECD_SIGNATURE] = sig
|
||||||
|
|
@ -289,6 +305,7 @@ def _EndRecData64(fpin, offset, endrec):
|
||||||
endrec[_ECD_ENTRIES_TOTAL] = dircount2
|
endrec[_ECD_ENTRIES_TOTAL] = dircount2
|
||||||
endrec[_ECD_SIZE] = dirsize
|
endrec[_ECD_SIZE] = dirsize
|
||||||
endrec[_ECD_OFFSET] = diroffset
|
endrec[_ECD_OFFSET] = diroffset
|
||||||
|
endrec[_ECD_LOCATION] = offset - extrasz
|
||||||
return endrec
|
return endrec
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -322,7 +339,7 @@ def _EndRecData(fpin):
|
||||||
endrec.append(filesize - sizeEndCentDir)
|
endrec.append(filesize - sizeEndCentDir)
|
||||||
|
|
||||||
# Try to read the "Zip64 end of central directory" structure
|
# Try to read the "Zip64 end of central directory" structure
|
||||||
return _EndRecData64(fpin, -sizeEndCentDir, endrec)
|
return _EndRecData64(fpin, filesize - sizeEndCentDir, endrec)
|
||||||
|
|
||||||
# Either this is not a ZIP file, or it is a ZIP file with an archive
|
# Either this is not a ZIP file, or it is a ZIP file with an archive
|
||||||
# comment. Search the end of the file for the "end of central directory"
|
# comment. Search the end of the file for the "end of central directory"
|
||||||
|
|
@ -346,8 +363,7 @@ def _EndRecData(fpin):
|
||||||
endrec.append(maxCommentStart + start)
|
endrec.append(maxCommentStart + start)
|
||||||
|
|
||||||
# Try to read the "Zip64 end of central directory" structure
|
# Try to read the "Zip64 end of central directory" structure
|
||||||
return _EndRecData64(fpin, maxCommentStart + start - filesize,
|
return _EndRecData64(fpin, maxCommentStart + start, endrec)
|
||||||
endrec)
|
|
||||||
|
|
||||||
# Unable to find a valid end of central directory structure
|
# Unable to find a valid end of central directory structure
|
||||||
return None
|
return None
|
||||||
|
|
@ -1458,9 +1474,6 @@ def _RealGetContents(self):
|
||||||
|
|
||||||
# "concat" is zero, unless zip was concatenated to another file
|
# "concat" is zero, unless zip was concatenated to another file
|
||||||
concat = endrec[_ECD_LOCATION] - size_cd - offset_cd
|
concat = endrec[_ECD_LOCATION] - size_cd - offset_cd
|
||||||
if endrec[_ECD_SIGNATURE] == stringEndArchive64:
|
|
||||||
# If Zip64 extension structures are present, account for them
|
|
||||||
concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator)
|
|
||||||
|
|
||||||
if self.debug > 2:
|
if self.debug > 2:
|
||||||
inferred = concat + offset_cd
|
inferred = concat + offset_cd
|
||||||
|
|
@ -2082,7 +2095,7 @@ def _write_end_record(self):
|
||||||
" would require ZIP64 extensions")
|
" would require ZIP64 extensions")
|
||||||
zip64endrec = struct.pack(
|
zip64endrec = struct.pack(
|
||||||
structEndArchive64, stringEndArchive64,
|
structEndArchive64, stringEndArchive64,
|
||||||
44, 45, 45, 0, 0, centDirCount, centDirCount,
|
sizeEndCentDir64 - 12, 45, 45, 0, 0, centDirCount, centDirCount,
|
||||||
centDirSize, centDirOffset)
|
centDirSize, centDirOffset)
|
||||||
self.fp.write(zip64endrec)
|
self.fp.write(zip64endrec)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
Check consistency of the zip64 end of central directory record. Support
|
||||||
|
records with "zip64 extensible data" if there are no bytes prepended to the
|
||||||
|
ZIP file.
|
||||||
Loading…
Add table
Add a link
Reference in a new issue