mirror of
https://github.com/python/cpython.git
synced 2026-06-27 19:36:07 +00:00
[3.15] bpo-45509: Check gzip headers for corrupted fields (GH-29028) (GH-149769)
Check the header checksum if the HCRC field is present.
(cherry picked from commit dd94457893)
Co-authored-by: Ruben Vorderman <r.h.p.vorderman@lumc.nl>
This commit is contained in:
parent
a5f77a13fd
commit
37f3deb571
3 changed files with 67 additions and 14 deletions
51
Lib/gzip.py
51
Lib/gzip.py
|
|
@ -484,40 +484,63 @@ def _read_exact(fp, n):
|
|||
return data
|
||||
|
||||
|
||||
def _read_until_null(fp, append_to):
|
||||
'''Read until the first encountered null byte in fp.
|
||||
Append to given byte array object'''
|
||||
while True:
|
||||
s = fp.read(1)
|
||||
append_to += s
|
||||
if not s or s == b'\000':
|
||||
break
|
||||
|
||||
|
||||
def _read_gzip_header(fp):
|
||||
'''Read a gzip header from `fp` and progress to the end of the header.
|
||||
|
||||
Returns last mtime if header was present or None otherwise.
|
||||
'''
|
||||
magic = fp.read(2)
|
||||
if magic == b'':
|
||||
if not magic:
|
||||
return None
|
||||
|
||||
if magic != b'\037\213':
|
||||
raise BadGzipFile('Not a gzipped file (%r)' % magic)
|
||||
|
||||
(method, flag, last_mtime) = struct.unpack("<BBIxx", _read_exact(fp, 8))
|
||||
base_header = _read_exact(fp, 8)
|
||||
(method, flag, last_mtime) = struct.unpack("<BBIxx", base_header)
|
||||
if method != 8:
|
||||
raise BadGzipFile('Unknown compression method')
|
||||
|
||||
if flag & FEXTRA:
|
||||
# Read & discard the extra field, if present
|
||||
extra_len, = struct.unpack("<H", _read_exact(fp, 2))
|
||||
_read_exact(fp, extra_len)
|
||||
if flag & FNAME:
|
||||
# Most common cases are no flags (gzip.compress, zlib.compress) or only
|
||||
# FNAME set (GzipFile, gzip command line application). Exit early
|
||||
# in those cases.
|
||||
if not flag:
|
||||
return last_mtime
|
||||
if flag == FNAME:
|
||||
# Read and discard a null-terminated string containing the filename
|
||||
while True:
|
||||
s = fp.read(1)
|
||||
if not s or s==b'\000':
|
||||
break
|
||||
return last_mtime
|
||||
|
||||
# Processing for more complex flags. Save header parts for FHCRC checking.
|
||||
header = bytearray(magic + base_header)
|
||||
if flag & FEXTRA:
|
||||
extra_len_bytes = _read_exact(fp, 2)
|
||||
extra_len, = struct.unpack("<H", extra_len_bytes)
|
||||
header += extra_len_bytes
|
||||
header += _read_exact(fp, extra_len)
|
||||
if flag & FNAME:
|
||||
_read_until_null(fp, append_to=header)
|
||||
if flag & FCOMMENT:
|
||||
# Read and discard a null-terminated string containing a comment
|
||||
while True:
|
||||
s = fp.read(1)
|
||||
if not s or s==b'\000':
|
||||
break
|
||||
_read_until_null(fp, append_to=header)
|
||||
if flag & FHCRC:
|
||||
_read_exact(fp, 2) # Read & discard the 16-bit header CRC
|
||||
# Header CRC is the last 16 bits of a crc32.
|
||||
header_crc, = struct.unpack("<H", _read_exact(fp, 2))
|
||||
true_crc = zlib.crc32(header) & 0xFFFF
|
||||
if header_crc != true_crc:
|
||||
raise BadGzipFile(f"Corrupted gzip header. Checksums do not "
|
||||
f"match: {true_crc:04x} != {header_crc:04x}")
|
||||
return last_mtime
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -795,6 +795,35 @@ def test_decompress_missing_trailer(self):
|
|||
compressed_data = gzip.compress(data1)
|
||||
self.assertRaises(EOFError, gzip.decompress, compressed_data[:-8])
|
||||
|
||||
def test_truncated_header(self):
|
||||
truncated_headers = [
|
||||
b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x00", # Missing OS byte
|
||||
b"\x1f\x8b\x08\x02\x00\x00\x00\x00\x00\xff", # FHRC, but no checksum
|
||||
b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff", # FEXTRA, but no xlen
|
||||
b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\xaa\x00", # FEXTRA, xlen, but no data
|
||||
b"\x1f\x8b\x08\x08\x00\x00\x00\x00\x00\xff", # FNAME but no fname
|
||||
b"\x1f\x8b\x08\x10\x00\x00\x00\x00\x00\xff", # FCOMMENT, but no fcomment
|
||||
]
|
||||
for header in truncated_headers:
|
||||
with self.subTest(header=header):
|
||||
with self.assertRaises(EOFError):
|
||||
gzip.decompress(header)
|
||||
|
||||
def test_corrupted_gzip_header(self):
|
||||
header = (b"\x1f\x8b\x08\x1f\x00\x00\x00\x00\x00\xff" # All flags set
|
||||
b"\x05\x00" # Xlen = 5
|
||||
b"extra"
|
||||
b"name\x00"
|
||||
b"comment\x00")
|
||||
true_crc = zlib.crc32(header) & 0xFFFF
|
||||
corrupted_crc = true_crc ^ 0xFFFF
|
||||
corrupted_header = header + corrupted_crc.to_bytes(2, "little")
|
||||
with self.assertRaises(gzip.BadGzipFile) as err:
|
||||
gzip.decompress(corrupted_header)
|
||||
self.assertEqual(str(err.exception),
|
||||
f"Corrupted gzip header. Checksums do not "
|
||||
f"match: {true_crc:04x} != {corrupted_crc:04x}")
|
||||
|
||||
def test_read_truncated(self):
|
||||
data = data1*50
|
||||
# Drop the CRC (4 bytes) and file size (4 bytes).
|
||||
|
|
|
|||
|
|
@ -0,0 +1 @@
|
|||
Gzip headers are now checked for corrupted NAME, COMMENT and HCRC fields.
|
||||
Loading…
Add table
Add a link
Reference in a new issue