import sys import os import marshal import glob import importlib import importlib.util import re import struct import time import unittest import unittest.mock from test import support from test.support import import_helper from test.support import os_helper from zipfile import ZipFile, ZipInfo, ZIP_STORED, ZIP_DEFLATED, ZIP_ZSTANDARD import zipimport import linecache import doctest import inspect import io from traceback import extract_tb, extract_stack, print_tb try: import zlib except ImportError: zlib = None test_src = """\ def get_name(): return __name__ def get_file(): return __file__ """ test_co = compile(test_src, "", "exec") raise_src = 'def do_raise(): raise TypeError\n' def make_pyc(co, mtime, size): data = marshal.dumps(co) pyc = (importlib.util.MAGIC_NUMBER + struct.pack("", "exec"), NOW, len(src)) files = {TESTMOD + pyc_ext: pyc, "some.data": "some data"} self.doTest(pyc_ext, files, TESTMOD, prefix='') def testDefaultOptimizationLevel(self): # zipimport should use the default optimization level (#28131) src = """if 1: # indent hack def test(val): assert(val) return val\n""" files = {TESTMOD + '.py': src} self.makeZip(files) sys.path.insert(0, TEMP_ZIP) mod = importlib.import_module(TESTMOD) self.assertEqual(mod.test(1), 1) if __debug__: self.assertRaises(AssertionError, mod.test, False) else: self.assertEqual(mod.test(0), 0) def testImport_WithStuff(self): # try importing from a zipfile which contains additional # stuff at the beginning of the file files = {TESTMOD + ".py": test_src} self.doTest(".py", files, TESTMOD, stuff=b"Some Stuff"*31) def assertModuleSource(self, module): self.assertEqual(inspect.getsource(module), test_src) def testGetSource(self): files = {TESTMOD + ".py": test_src} self.doTest(".py", files, TESTMOD, call=self.assertModuleSource) def testGetCompiledSource(self): pyc = make_pyc(compile(test_src, "", "exec"), NOW, len(test_src)) files = {TESTMOD + ".py": test_src, TESTMOD + pyc_ext: pyc} self.doTest(pyc_ext, files, TESTMOD, call=self.assertModuleSource) def runDoctest(self, callback): files = {TESTMOD + ".py": test_src, "xyz.txt": ">>> log.append(True)\n"} self.doTest(".py", files, TESTMOD, call=callback) def doDoctestFile(self, module): log = [] old_master, doctest.master = doctest.master, None try: doctest.testfile( 'xyz.txt', package=module, module_relative=True, globs=locals() ) finally: doctest.master = old_master self.assertEqual(log,[True]) def testDoctestFile(self): self.runDoctest(self.doDoctestFile) def doDoctestSuite(self, module): log = [] doctest.DocFileTest( 'xyz.txt', package=module, module_relative=True, globs=locals() ).run() self.assertEqual(log,[True]) def testDoctestSuite(self): self.runDoctest(self.doDoctestSuite) def doTraceback(self, module): try: module.do_raise() except Exception as e: tb = e.__traceback__.tb_next f,lno,n,line = extract_tb(tb, 1)[0] self.assertEqual(line, raise_src.strip()) f,lno,n,line = extract_stack(tb.tb_frame, 1)[0] self.assertEqual(line, raise_src.strip()) s = io.StringIO() print_tb(tb, 1, s) self.assertEndsWith(s.getvalue(), ' def do_raise(): raise TypeError\n' '' if support.has_no_debug_ranges() else ' ^^^^^^^^^^^^^^^\n' ) else: raise AssertionError("This ought to be impossible") def testTraceback(self): files = {TESTMOD + ".py": raise_src} self.doTest(None, files, TESTMOD, call=self.doTraceback) @unittest.skipIf(os_helper.TESTFN_UNENCODABLE is None, "need an unencodable filename") def testUnencodable(self): filename = os_helper.TESTFN_UNENCODABLE + ".zip" self.makeZip({TESTMOD + ".py": test_src}, filename) spec = zipimport.zipimporter(filename).find_spec(TESTMOD) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) def testBytesPath(self): filename = os_helper.TESTFN + ".zip" self.makeZip({TESTMOD + ".py": test_src}, filename) zipimport.zipimporter(filename) with self.assertRaises(TypeError): zipimport.zipimporter(os.fsencode(filename)) with self.assertRaises(TypeError): zipimport.zipimporter(bytearray(os.fsencode(filename))) with self.assertRaises(TypeError): zipimport.zipimporter(memoryview(os.fsencode(filename))) def testComment(self): files = {TESTMOD + ".py": test_src} self.doTest(".py", files, TESTMOD, comment=b"comment") def testBeginningCruftAndComment(self): files = {TESTMOD + ".py": test_src} self.doTest(".py", files, TESTMOD, stuff=b"cruft" * 64, comment=b"hi") def testLargestPossibleComment(self): files = {TESTMOD + ".py": test_src} self.doTest(".py", files, TESTMOD, comment=b"c" * ((1 << 16) - 1)) @support.requires_resource('cpu') def testZip64(self): files = self.getZip64Files() self.doTest(".py", files, "f6") @support.requires_resource('cpu') def testZip64CruftAndComment(self): files = self.getZip64Files() self.doTest(".py", files, "f65536", comment=b"c" * ((1 << 16) - 1)) def testZip64LargeFile(self): support.requires( "largefile", f"test generates files >{0xFFFFFFFF} bytes and takes a long time " "to run" ) # N.B.: We do a lot of gymnastics below in the ZIP_STORED case to save # and reconstruct a sparse zip on systems that support sparse files. # Instead of creating a ~8GB zip file mainly consisting of null bytes # for every run of the test, we create the zip once and save off the # non-null portions of the resulting file as data blobs with offsets # that allow re-creating the zip file sparsely. This drops disk space # usage to ~9KB for the ZIP_STORED case and drops that test time by ~2 # orders of magnitude. For the ZIP_DEFLATED case, however, we bite the # bullet. The resulting zip file is ~8MB of non-null data; so the sparse # trick doesn't work and would result in that full ~8MB zip data file # being checked in to source control. parts_glob = f"sparse-zip64-c{self.compression:d}-0x*.part" full_parts_glob = os.path.join(TEST_DATA_DIR, parts_glob) pre_built_zip_parts = glob.glob(full_parts_glob) self.addCleanup(os_helper.unlink, TEMP_ZIP) if not pre_built_zip_parts: if self.compression != ZIP_STORED: support.requires( "cpu", "test requires a lot of CPU for compression." ) self.addCleanup(os_helper.unlink, os_helper.TESTFN) with open(os_helper.TESTFN, "wb") as f: f.write(b"data") f.write(os.linesep.encode()) f.seek(0xffff_ffff, os.SEEK_CUR) f.write(os.linesep.encode()) os.utime(os_helper.TESTFN, (0.0, 0.0)) with ZipFile( TEMP_ZIP, "w", compression=self.compression, strict_timestamps=False ) as z: z.write(os_helper.TESTFN, "data1") z.writestr( ZipInfo("module.py", (1980, 1, 1, 0, 0, 0)), test_src ) z.write(os_helper.TESTFN, "data2") # This "works" but relies on the zip format having a non-empty # final page due to the trailing central directory to wind up with # the correct length file. def make_sparse_zip_parts(name): empty_page = b"\0" * 4096 with open(name, "rb") as f: part = None try: while True: offset = f.tell() data = f.read(len(empty_page)) if not data: break if data != empty_page: if not part: part_fullname = os.path.join( TEST_DATA_DIR, f"sparse-zip64-c{self.compression:d}-" f"{offset:#011x}.part", ) os.makedirs( os.path.dirname(part_fullname), exist_ok=True ) part = open(part_fullname, "wb") print("Created", part_fullname) part.write(data) else: if part: part.close() part = None finally: if part: part.close() if self.compression == ZIP_STORED: print(f"Creating sparse parts to check in into {TEST_DATA_DIR}:") make_sparse_zip_parts(TEMP_ZIP) else: def extract_offset(name): if m := re.search(r"-(0x[0-9a-f]{9})\.part$", name): return int(m.group(1), base=16) raise ValueError(f"{name=} does not fit expected pattern.") offset_parts = [(extract_offset(n), n) for n in pre_built_zip_parts] with open(TEMP_ZIP, "wb") as f: for offset, part_fn in sorted(offset_parts): with open(part_fn, "rb") as part: f.seek(offset, os.SEEK_SET) f.write(part.read()) # Confirm that the reconstructed zip file works and looks right. with ZipFile(TEMP_ZIP, "r") as z: self.assertEqual( z.getinfo("module.py").date_time, (1980, 1, 1, 0, 0, 0) ) self.assertEqual( z.read("module.py"), test_src.encode(), msg=f"Recreate {full_parts_glob}, unexpected contents." ) def assertDataEntry(name): zinfo = z.getinfo(name) self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0)) self.assertGreater(zinfo.file_size, 0xffff_ffff) assertDataEntry("data1") assertDataEntry("data2") self.doTestWithPreBuiltZip(".py", "module") @support.requires_zlib() class DeflateCompressedZipImportTestCase(UncompressedZipImportTestCase): compression = ZIP_DEFLATED @support.requires_zstd() class ZStdCompressedZipImportTestCase(UncompressedZipImportTestCase): compression = ZIP_ZSTANDARD class BadFileZipImportTestCase(unittest.TestCase): def assertZipFailure(self, filename): self.assertRaises(zipimport.ZipImportError, zipimport.zipimporter, filename) def testNoFile(self): self.assertZipFailure('AdfjdkFJKDFJjdklfjs') def testEmptyFilename(self): self.assertZipFailure('') def testBadArgs(self): self.assertRaises(TypeError, zipimport.zipimporter, None) self.assertRaises(TypeError, zipimport.zipimporter, TESTMOD, kwd=None) self.assertRaises(TypeError, zipimport.zipimporter, list(os.fsencode(TESTMOD))) def testFilenameTooLong(self): self.assertZipFailure('A' * 33000) def testEmptyFile(self): os_helper.unlink(TESTMOD) os_helper.create_empty_file(TESTMOD) self.assertZipFailure(TESTMOD) @unittest.skipIf(support.is_wasi, "mode 000 not supported.") def testFileUnreadable(self): os_helper.unlink(TESTMOD) fd = os.open(TESTMOD, os.O_CREAT, 000) try: os.close(fd) with self.assertRaises(zipimport.ZipImportError) as cm: zipimport.zipimporter(TESTMOD) finally: # If we leave "the read-only bit" set on Windows, nothing can # delete TESTMOD, and later tests suffer bogus failures. os.chmod(TESTMOD, 0o666) os_helper.unlink(TESTMOD) def testNotZipFile(self): os_helper.unlink(TESTMOD) fp = open(TESTMOD, 'w+') fp.write('a' * 22) fp.close() self.assertZipFailure(TESTMOD) # XXX: disabled until this works on Big-endian machines def _testBogusZipFile(self): os_helper.unlink(TESTMOD) fp = open(TESTMOD, 'w+') fp.write(struct.pack('=I', 0x06054B50)) fp.write('a' * 18) fp.close() z = zipimport.zipimporter(TESTMOD) try: self.assertRaises(TypeError, z.find_module, None) self.assertRaises(TypeError, z.find_spec, None) self.assertRaises(TypeError, z.exec_module, None) self.assertRaises(TypeError, z.is_package, None) self.assertRaises(TypeError, z.get_code, None) self.assertRaises(TypeError, z.get_data, None) self.assertRaises(TypeError, z.get_source, None) error = zipimport.ZipImportError self.assertIsNone(z.find_spec('abc')) self.assertRaises(error, z.get_code, 'abc') self.assertRaises(OSError, z.get_data, 'abc') self.assertRaises(error, z.get_source, 'abc') self.assertRaises(error, z.is_package, 'abc') finally: zipimport._zip_directory_cache.clear() def tearDownModule(): os_helper.unlink(TESTMOD) if __name__ == "__main__": unittest.main()