mirror of
https://github.com/python/cpython.git
synced 2026-01-06 15:32:22 +00:00
gh-101566: Sync with zipp 3.14. (GH-102018)
This commit is contained in:
parent
84181c1404
commit
36854bbb24
6 changed files with 215 additions and 56 deletions
30
Lib/test/test_zipfile/_context.py
Normal file
30
Lib/test/test_zipfile/_context.py
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
import contextlib
|
||||
import time
|
||||
|
||||
|
||||
class DeadlineExceeded(Exception):
    """Raised when a timed context runs longer than its allowed duration."""
|
||||
|
||||
|
||||
class TimedContext(contextlib.ContextDecorator):
    """
    A context that will raise DeadlineExceeded if the
    max duration is reached during the execution.

    >>> TimedContext(1)(time.sleep)(.1)
    >>> TimedContext(0)(time.sleep)(.1)
    Traceback (most recent call last):
    ...
    tests._context.DeadlineExceeded: (..., 0)
    """

    def __init__(self, max_duration: int):
        # Wall-clock budget, in seconds, for the managed block.
        self.max_duration = max_duration

    def __enter__(self):
        # Use a monotonic clock so system time adjustments cannot
        # skew the measured duration.
        self.start = time.monotonic()

    def __exit__(self, *err):
        elapsed = time.monotonic() - self.start
        if elapsed <= self.max_duration:
            return
        # The deadline was only detected after the fact; report both
        # the observed duration and the configured limit.
        raise DeadlineExceeded(elapsed, self.max_duration)
|
||||
8
Lib/test/test_zipfile/_func_timeout_compat.py
Normal file
8
Lib/test/test_zipfile/_func_timeout_compat.py
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
__all__ = ['set_timeout']

try:
    from func_timeout import func_set_timeout as set_timeout
except ImportError:  # pragma: no cover
    # func_timeout is optional; without it, fall back to a decorator
    # that only checks the deadline after the call completes rather
    # than interrupting it.
    from ._context import TimedContext as set_timeout
|
||||
|
|
@ -1,3 +1,32 @@
|
|||
import itertools
|
||||
|
||||
|
||||
# from jaraco.itertools 6.3.0
class Counter:
    """
    Wrap an iterable in an object that stores the count of items
    that pass through it.

    >>> items = Counter(range(20))
    >>> items.count
    0
    >>> values = list(items)
    >>> items.count
    20
    """

    def __init__(self, i):
        self.count = 0
        # Pair every item with its running 1-based index so __next__
        # can refresh the count and yield the item in one step.
        self.iter = zip(itertools.count(1), i)

    def __iter__(self):
        return self

    def __next__(self):
        index, item = next(self.iter)
        self.count = index
        return item
|
||||
|
||||
|
||||
# from more_itertools v8.13.0
|
||||
def always_iterable(obj, base_type=(str, bytes)):
|
||||
if obj is None:
|
||||
|
|
|
|||
|
|
@ -4,36 +4,25 @@
|
|||
import pathlib
|
||||
import pickle
|
||||
import string
|
||||
from test.support.script_helper import assert_python_ok
|
||||
import sys
|
||||
import unittest
|
||||
import zipfile
|
||||
|
||||
from ._test_params import parameterize, Invoked
|
||||
from ._functools import compose
|
||||
from ._itertools import Counter
|
||||
|
||||
from ._test_params import parameterize, Invoked
|
||||
from ._func_timeout_compat import set_timeout
|
||||
|
||||
from test.support.os_helper import temp_dir
|
||||
|
||||
|
||||
# Poor man's technique to consume a (smallish) iterable.
|
||||
consume = tuple
|
||||
|
||||
|
||||
# from jaraco.itertools 5.0
|
||||
class jaraco:
|
||||
class itertools:
|
||||
class Counter:
|
||||
def __init__(self, i):
|
||||
self.count = 0
|
||||
self._orig_iter = iter(i)
|
||||
Counter = Counter
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def __next__(self):
|
||||
result = next(self._orig_iter)
|
||||
self.count += 1
|
||||
return result
|
||||
consume = tuple
|
||||
|
||||
|
||||
def add_dirs(zf):
|
||||
|
|
@ -161,10 +150,10 @@ def test_open_encoding_utf16(self):
|
|||
u16 = path.joinpath("16.txt")
|
||||
with u16.open('r', "utf-16") as strm:
|
||||
data = strm.read()
|
||||
self.assertEqual(data, "This was utf-16")
|
||||
assert data == "This was utf-16"
|
||||
with u16.open(encoding="utf-16") as strm:
|
||||
data = strm.read()
|
||||
self.assertEqual(data, "This was utf-16")
|
||||
assert data == "This was utf-16"
|
||||
|
||||
def test_open_encoding_errors(self):
|
||||
in_memory_file = io.BytesIO()
|
||||
|
|
@ -177,9 +166,9 @@ def test_open_encoding_errors(self):
|
|||
|
||||
# encoding= as a positional argument for gh-101144.
|
||||
data = u16.read_text("utf-8", errors="ignore")
|
||||
self.assertEqual(data, "invalid utf-8: .")
|
||||
assert data == "invalid utf-8: ."
|
||||
with u16.open("r", "utf-8", errors="surrogateescape") as f:
|
||||
self.assertEqual(f.read(), "invalid utf-8: \udcff\udcff.")
|
||||
assert f.read() == "invalid utf-8: \udcff\udcff."
|
||||
|
||||
# encoding= both positional and keyword is an error; gh-101144.
|
||||
with self.assertRaisesRegex(TypeError, "encoding"):
|
||||
|
|
@ -191,24 +180,21 @@ def test_open_encoding_errors(self):
|
|||
with self.assertRaises(UnicodeDecodeError):
|
||||
f.read()
|
||||
|
||||
def test_encoding_warnings(self):
|
||||
@unittest.skipIf(
|
||||
not getattr(sys.flags, 'warn_default_encoding', 0),
|
||||
"Requires warn_default_encoding",
|
||||
)
|
||||
@pass_alpharep
|
||||
def test_encoding_warnings(self, alpharep):
|
||||
"""EncodingWarning must blame the read_text and open calls."""
|
||||
code = '''\
|
||||
import io, zipfile
|
||||
with zipfile.ZipFile(io.BytesIO(), "w") as zf:
|
||||
zf.filename = '<test_encoding_warnings in memory zip file>'
|
||||
zf.writestr("path/file.txt", b"Spanish Inquisition")
|
||||
root = zipfile.Path(zf)
|
||||
(path,) = root.iterdir()
|
||||
file_path = path.joinpath("file.txt")
|
||||
unused = file_path.read_text() # should warn
|
||||
file_path.open("r").close() # should warn
|
||||
'''
|
||||
proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code)
|
||||
warnings = proc.err.splitlines()
|
||||
self.assertEqual(len(warnings), 2, proc.err)
|
||||
self.assertRegex(warnings[0], rb"^<string>:8: EncodingWarning:")
|
||||
self.assertRegex(warnings[1], rb"^<string>:9: EncodingWarning:")
|
||||
assert sys.flags.warn_default_encoding
|
||||
root = zipfile.Path(alpharep)
|
||||
with self.assertWarns(EncodingWarning) as wc:
|
||||
root.joinpath("a.txt").read_text()
|
||||
assert __file__ == wc.filename
|
||||
with self.assertWarns(EncodingWarning) as wc:
|
||||
root.joinpath("a.txt").open("r").close()
|
||||
assert __file__ == wc.filename
|
||||
|
||||
def test_open_write(self):
|
||||
"""
|
||||
|
|
@ -250,7 +236,8 @@ def test_read(self, alpharep):
|
|||
root = zipfile.Path(alpharep)
|
||||
a, b, g = root.iterdir()
|
||||
assert a.read_text(encoding="utf-8") == "content of a"
|
||||
a.read_text("utf-8") # No positional arg TypeError per gh-101144.
|
||||
# Also check positional encoding arg (gh-101144).
|
||||
assert a.read_text("utf-8") == "content of a"
|
||||
assert a.read_bytes() == b"content of a"
|
||||
|
||||
@pass_alpharep
|
||||
|
|
@ -275,19 +262,6 @@ def test_traverse_truediv(self, alpharep):
|
|||
e = root / "b" / "d" / "e.txt"
|
||||
assert e.read_text(encoding="utf-8") == "content of e"
|
||||
|
||||
@pass_alpharep
|
||||
def test_traverse_simplediv(self, alpharep):
|
||||
"""
|
||||
Disable the __future__.division when testing traversal.
|
||||
"""
|
||||
code = compile(
|
||||
source="zipfile.Path(alpharep) / 'a'",
|
||||
filename="(test)",
|
||||
mode="eval",
|
||||
dont_inherit=True,
|
||||
)
|
||||
eval(code)
|
||||
|
||||
@pass_alpharep
|
||||
def test_pathlike_construction(self, alpharep):
|
||||
"""
|
||||
|
|
@ -356,7 +330,7 @@ def test_joinpath_constant_time(self):
|
|||
# Check the file iterated all items
|
||||
assert entries.count == self.HUGE_ZIPFILE_NUM_ENTRIES
|
||||
|
||||
# @func_timeout.func_set_timeout(3)
|
||||
@set_timeout(3)
|
||||
def test_implied_dirs_performance(self):
|
||||
data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
|
||||
zipfile.CompleteDirs._implied_dirs(data)
|
||||
|
|
@ -472,6 +446,52 @@ def test_root_unnamed(self, alpharep):
|
|||
assert sub.name == "b"
|
||||
assert sub.parent
|
||||
|
||||
@pass_alpharep
|
||||
def test_match_and_glob(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert not root.match("*.txt")
|
||||
|
||||
assert list(root.glob("b/c.*")) == [zipfile.Path(alpharep, "b/c.txt")]
|
||||
|
||||
files = root.glob("**/*.txt")
|
||||
assert all(each.match("*.txt") for each in files)
|
||||
|
||||
assert list(root.glob("**/*.txt")) == list(root.rglob("*.txt"))
|
||||
|
||||
def test_glob_empty(self):
|
||||
root = zipfile.Path(zipfile.ZipFile(io.BytesIO(), 'w'))
|
||||
with self.assertRaises(ValueError):
|
||||
root.glob('')
|
||||
|
||||
@pass_alpharep
|
||||
def test_eq_hash(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
assert root == zipfile.Path(alpharep)
|
||||
|
||||
assert root != (root / "a.txt")
|
||||
assert (root / "a.txt") == (root / "a.txt")
|
||||
|
||||
root = zipfile.Path(alpharep)
|
||||
assert root in {root}
|
||||
|
||||
@pass_alpharep
|
||||
def test_is_symlink(self, alpharep):
|
||||
"""
|
||||
See python/cpython#82102 for symlink support beyond this object.
|
||||
"""
|
||||
|
||||
root = zipfile.Path(alpharep)
|
||||
assert not root.is_symlink()
|
||||
|
||||
@pass_alpharep
|
||||
def test_relative_to(self, alpharep):
|
||||
root = zipfile.Path(alpharep)
|
||||
relative = root.joinpath("b", "c.txt").relative_to(root / "b")
|
||||
assert str(relative) == "c.txt"
|
||||
|
||||
relative = root.joinpath("b", "d", "e.txt").relative_to(root / "b")
|
||||
assert str(relative) == "d/e.txt"
|
||||
|
||||
@pass_alpharep
|
||||
def test_inheritance(self, alpharep):
|
||||
cls = type('PathChild', (zipfile.Path,), {})
|
||||
|
|
@ -493,3 +513,14 @@ def test_pickle(self, alpharep, path_type, subpath):
|
|||
restored_1 = pickle.loads(saved_1)
|
||||
first, *rest = restored_1.iterdir()
|
||||
assert first.read_text().startswith('content of ')
|
||||
|
||||
@pass_alpharep
|
||||
def test_extract_orig_with_implied_dirs(self, alpharep):
|
||||
"""
|
||||
A zip file wrapped in a Path should extract even with implied dirs.
|
||||
"""
|
||||
source_path = self.zipfile_ondisk(alpharep)
|
||||
zf = zipfile.ZipFile(source_path)
|
||||
# wrap the zipfile for its side effect
|
||||
zipfile.Path(zf)
|
||||
zf.extractall(source_path.parent)
|
||||
|
|
|
|||
|
|
@ -4,6 +4,8 @@
|
|||
import itertools
|
||||
import contextlib
|
||||
import pathlib
|
||||
import re
|
||||
import fnmatch
|
||||
|
||||
|
||||
__all__ = ['Path']
|
||||
|
|
@ -93,7 +95,7 @@ def _implied_dirs(names):
|
|||
return _dedupe(_difference(as_dirs, names))
|
||||
|
||||
def namelist(self):
|
||||
names = super(CompleteDirs, self).namelist()
|
||||
names = super().namelist()
|
||||
return names + list(self._implied_dirs(names))
|
||||
|
||||
def _name_set(self):
|
||||
|
|
@ -109,6 +111,17 @@ def resolve_dir(self, name):
|
|||
dir_match = name not in names and dirname in names
|
||||
return dirname if dir_match else name
|
||||
|
||||
def getinfo(self, name):
    """
    Supplement getinfo for implied dirs.
    """
    try:
        return super().getinfo(name)
    except KeyError:
        # Implied directories have no real entry in the archive;
        # synthesize a ZipInfo when the name denotes one of them.
        if name.endswith('/') and name in self._name_set():
            return zipfile.ZipInfo(filename=name)
        raise
|
||||
|
||||
@classmethod
|
||||
def make(cls, source):
|
||||
"""
|
||||
|
|
@ -138,13 +151,13 @@ class FastLookup(CompleteDirs):
|
|||
def namelist(self):
|
||||
with contextlib.suppress(AttributeError):
|
||||
return self.__names
|
||||
self.__names = super(FastLookup, self).namelist()
|
||||
self.__names = super().namelist()
|
||||
return self.__names
|
||||
|
||||
def _name_set(self):
|
||||
with contextlib.suppress(AttributeError):
|
||||
return self.__lookup
|
||||
self.__lookup = super(FastLookup, self)._name_set()
|
||||
self.__lookup = super()._name_set()
|
||||
return self.__lookup
|
||||
|
||||
|
||||
|
|
@ -246,6 +259,18 @@ def __init__(self, root, at=""):
|
|||
self.root = FastLookup.make(root)
|
||||
self.at = at
|
||||
|
||||
def __eq__(self, other):
    """
    >>> Path(zipfile.ZipFile(io.BytesIO(), 'w')) == 'foo'
    False
    """
    # Only compare paths of the exact same class; NotImplemented lets
    # Python fall back to its default (identity-based) comparison.
    if self.__class__ is not other.__class__:
        return NotImplemented
    # Equal when wrapping the same archive object at the same position.
    return (self.root, self.at) == (other.root, other.at)
|
||||
|
||||
def __hash__(self):
    # Hash the same fields __eq__ compares, so equal paths hash equal.
    return hash((self.root, self.at))
|
||||
|
||||
def open(self, mode='r', *args, pwd=None, **kwargs):
|
||||
"""
|
||||
Open this entry as text or binary following the semantics
|
||||
|
|
@ -316,6 +341,38 @@ def iterdir(self):
|
|||
subs = map(self._next, self.root.namelist())
|
||||
return filter(self._is_child, subs)
|
||||
|
||||
def match(self, path_pattern):
    # Delegate glob-style matching of the archive-relative name
    # to pathlib's matching rules.
    return pathlib.Path(self.at).match(path_pattern)
|
||||
|
||||
def is_symlink(self):
    """
    Return whether this path is a symlink. Always false (python/cpython#82102).
    """
    return False
|
||||
|
||||
def _descendants(self):
    # Depth-first traversal of every entry below this path.
    for child in self.iterdir():
        yield child
        if child.is_dir():
            yield from child._descendants()
|
||||
|
||||
def glob(self, pattern):
    """
    Yield descendants whose path relative to this one matches *pattern*.
    """
    # Validate eagerly so callers get the ValueError at call time,
    # before the lazy iterator below is ever consumed.
    if not pattern:
        raise ValueError(f"Unacceptable pattern: {pattern!r}")

    # fnmatch.translate turns the glob into a regex; fullmatch requires
    # the entire relative path to conform to it.
    match = re.compile(fnmatch.translate(pattern)).fullmatch
    return filter(
        lambda descendant: match(str(descendant.relative_to(self))),
        self._descendants(),
    )
|
||||
|
||||
def rglob(self, pattern):
    # Recursive glob: match the pattern at any depth below this path.
    return self.glob(f'**/{pattern}')
|
||||
|
||||
def relative_to(self, other, *extra):
    # Compute this path relative to *other* (extended by *extra) with
    # POSIX semantics, since zip member names always use '/'.
    return posixpath.relpath(str(self), str(other.joinpath(*extra)))
|
||||
|
||||
def __str__(self):
    # Full textual form: the archive's filename joined with the
    # member name this Path points at.
    return posixpath.join(self.root.filename, self.at)
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,4 @@
|
|||
In zipfile, sync Path with `zipp 3.14
|
||||
<https://zipp.readthedocs.io/en/latest/history.html#v3-14-0>`_, including
|
||||
fix for extractall on the underlying zipfile after being wrapped in
|
||||
``Path``.
|
||||
Loading…
Add table
Add a link
Reference in a new issue