mirror of
				https://github.com/python/cpython.git
				synced 2025-10-25 18:54:53 +00:00 
			
		
		
		
	
		
			
	
	
		
			304 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			304 lines
		
	
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | import os | ||
|  | import tempfile | ||
|  | 
 | ||
|  | from . import abc as resources_abc | ||
|  | from builtins import open as builtins_open | ||
|  | from contextlib import contextmanager, suppress | ||
|  | from importlib import import_module | ||
|  | from importlib.abc import ResourceLoader | ||
|  | from io import BytesIO, TextIOWrapper | ||
|  | from pathlib import Path | ||
|  | from types import ModuleType | ||
|  | from typing import Iterator, Optional, Set, Union   # noqa: F401 | ||
|  | from typing import cast | ||
|  | from typing.io import BinaryIO, TextIO | ||
|  | from zipfile import ZipFile | ||
|  | 
 | ||
|  | 
 | ||
|  | Package = Union[str, ModuleType] | ||
|  | Resource = Union[str, os.PathLike] | ||
|  | 
 | ||
|  | 
 | ||
|  | def _get_package(package) -> ModuleType: | ||
|  |     """Take a package name or module object and return the module.
 | ||
|  | 
 | ||
|  |     If a name, the module is imported.  If the passed or imported module | ||
|  |     object is not a package, raise an exception. | ||
|  |     """
 | ||
|  |     if hasattr(package, '__spec__'): | ||
|  |         if package.__spec__.submodule_search_locations is None: | ||
|  |             raise TypeError('{!r} is not a package'.format( | ||
|  |                 package.__spec__.name)) | ||
|  |         else: | ||
|  |             return package | ||
|  |     else: | ||
|  |         module = import_module(package) | ||
|  |         if module.__spec__.submodule_search_locations is None: | ||
|  |             raise TypeError('{!r} is not a package'.format(package)) | ||
|  |         else: | ||
|  |             return module | ||
|  | 
 | ||
|  | 
 | ||
|  | def _normalize_path(path) -> str: | ||
|  |     """Normalize a path by ensuring it is a string.
 | ||
|  | 
 | ||
|  |     If the resulting string contains path separators, an exception is raised. | ||
|  |     """
 | ||
|  |     str_path = str(path) | ||
|  |     parent, file_name = os.path.split(str_path) | ||
|  |     if parent: | ||
|  |         raise ValueError('{!r} must be only a file name'.format(path)) | ||
|  |     else: | ||
|  |         return file_name | ||
|  | 
 | ||
|  | 
 | ||
|  | def _get_resource_reader( | ||
|  |         package: ModuleType) -> Optional[resources_abc.ResourceReader]: | ||
|  |     # Return the package's loader if it's a ResourceReader.  We can't use | ||
|  |     # a issubclass() check here because apparently abc.'s __subclasscheck__() | ||
|  |     # hook wants to create a weak reference to the object, but | ||
|  |     # zipimport.zipimporter does not support weak references, resulting in a | ||
|  |     # TypeError.  That seems terrible. | ||
|  |     if hasattr(package.__spec__.loader, 'open_resource'): | ||
|  |         return cast(resources_abc.ResourceReader, package.__spec__.loader) | ||
|  |     return None | ||
|  | 
 | ||
|  | 
 | ||
|  | def open_binary(package: Package, resource: Resource) -> BinaryIO: | ||
|  |     """Return a file-like object opened for binary reading of the resource.""" | ||
|  |     resource = _normalize_path(resource) | ||
|  |     package = _get_package(package) | ||
|  |     reader = _get_resource_reader(package) | ||
|  |     if reader is not None: | ||
|  |         return reader.open_resource(resource) | ||
|  |     absolute_package_path = os.path.abspath(package.__spec__.origin) | ||
|  |     package_path = os.path.dirname(absolute_package_path) | ||
|  |     full_path = os.path.join(package_path, resource) | ||
|  |     try: | ||
|  |         return builtins_open(full_path, mode='rb') | ||
|  |     except OSError: | ||
|  |         # Just assume the loader is a resource loader; all the relevant | ||
|  |         # importlib.machinery loaders are and an AttributeError for | ||
|  |         # get_data() will make it clear what is needed from the loader. | ||
|  |         loader = cast(ResourceLoader, package.__spec__.loader) | ||
|  |         data = None | ||
|  |         if hasattr(package.__spec__.loader, 'get_data'): | ||
|  |             with suppress(OSError): | ||
|  |                 data = loader.get_data(full_path) | ||
|  |         if data is None: | ||
|  |             package_name = package.__spec__.name | ||
|  |             message = '{!r} resource not found in {!r}'.format( | ||
|  |                 resource, package_name) | ||
|  |             raise FileNotFoundError(message) | ||
|  |         else: | ||
|  |             return BytesIO(data) | ||
|  | 
 | ||
|  | 
 | ||
|  | def open_text(package: Package, | ||
|  |               resource: Resource, | ||
|  |               encoding: str = 'utf-8', | ||
|  |               errors: str = 'strict') -> TextIO: | ||
|  |     """Return a file-like object opened for text reading of the resource.""" | ||
|  |     resource = _normalize_path(resource) | ||
|  |     package = _get_package(package) | ||
|  |     reader = _get_resource_reader(package) | ||
|  |     if reader is not None: | ||
|  |         return TextIOWrapper(reader.open_resource(resource), encoding, errors) | ||
|  |     absolute_package_path = os.path.abspath(package.__spec__.origin) | ||
|  |     package_path = os.path.dirname(absolute_package_path) | ||
|  |     full_path = os.path.join(package_path, resource) | ||
|  |     try: | ||
|  |         return builtins_open( | ||
|  |             full_path, mode='r', encoding=encoding, errors=errors) | ||
|  |     except OSError: | ||
|  |         # Just assume the loader is a resource loader; all the relevant | ||
|  |         # importlib.machinery loaders are and an AttributeError for | ||
|  |         # get_data() will make it clear what is needed from the loader. | ||
|  |         loader = cast(ResourceLoader, package.__spec__.loader) | ||
|  |         data = None | ||
|  |         if hasattr(package.__spec__.loader, 'get_data'): | ||
|  |             with suppress(OSError): | ||
|  |                 data = loader.get_data(full_path) | ||
|  |         if data is None: | ||
|  |             package_name = package.__spec__.name | ||
|  |             message = '{!r} resource not found in {!r}'.format( | ||
|  |                 resource, package_name) | ||
|  |             raise FileNotFoundError(message) | ||
|  |         else: | ||
|  |             return TextIOWrapper(BytesIO(data), encoding, errors) | ||
|  | 
 | ||
|  | 
 | ||
|  | def read_binary(package: Package, resource: Resource) -> bytes: | ||
|  |     """Return the binary contents of the resource.""" | ||
|  |     resource = _normalize_path(resource) | ||
|  |     package = _get_package(package) | ||
|  |     with open_binary(package, resource) as fp: | ||
|  |         return fp.read() | ||
|  | 
 | ||
|  | 
 | ||
|  | def read_text(package: Package, | ||
|  |               resource: Resource, | ||
|  |               encoding: str = 'utf-8', | ||
|  |               errors: str = 'strict') -> str: | ||
|  |     """Return the decoded string of the resource.
 | ||
|  | 
 | ||
|  |     The decoding-related arguments have the same semantics as those of | ||
|  |     bytes.decode(). | ||
|  |     """
 | ||
|  |     resource = _normalize_path(resource) | ||
|  |     package = _get_package(package) | ||
|  |     with open_text(package, resource, encoding, errors) as fp: | ||
|  |         return fp.read() | ||
|  | 
 | ||
|  | 
 | ||
|  | @contextmanager | ||
|  | def path(package: Package, resource: Resource) -> Iterator[Path]: | ||
|  |     """A context manager providing a file path object to the resource.
 | ||
|  | 
 | ||
|  |     If the resource does not already exist on its own on the file system, | ||
|  |     a temporary file will be created. If the file was created, the file | ||
|  |     will be deleted upon exiting the context manager (no exception is | ||
|  |     raised if the file was deleted prior to the context manager | ||
|  |     exiting). | ||
|  |     """
 | ||
|  |     resource = _normalize_path(resource) | ||
|  |     package = _get_package(package) | ||
|  |     reader = _get_resource_reader(package) | ||
|  |     if reader is not None: | ||
|  |         try: | ||
|  |             yield Path(reader.resource_path(resource)) | ||
|  |             return | ||
|  |         except FileNotFoundError: | ||
|  |             pass | ||
|  |     # Fall-through for both the lack of resource_path() *and* if | ||
|  |     # resource_path() raises FileNotFoundError. | ||
|  |     package_directory = Path(package.__spec__.origin).parent | ||
|  |     file_path = package_directory / resource | ||
|  |     if file_path.exists(): | ||
|  |         yield file_path | ||
|  |     else: | ||
|  |         with open_binary(package, resource) as fp: | ||
|  |             data = fp.read() | ||
|  |         # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try' | ||
|  |         # blocks due to the need to close the temporary file to work on | ||
|  |         # Windows properly. | ||
|  |         fd, raw_path = tempfile.mkstemp() | ||
|  |         try: | ||
|  |             os.write(fd, data) | ||
|  |             os.close(fd) | ||
|  |             yield Path(raw_path) | ||
|  |         finally: | ||
|  |             try: | ||
|  |                 os.remove(raw_path) | ||
|  |             except FileNotFoundError: | ||
|  |                 pass | ||
|  | 
 | ||
|  | 
 | ||
|  | def is_resource(package: Package, name: str) -> bool: | ||
|  |     """True if 'name' is a resource inside 'package'.
 | ||
|  | 
 | ||
|  |     Directories are *not* resources. | ||
|  |     """
 | ||
|  |     package = _get_package(package) | ||
|  |     _normalize_path(name) | ||
|  |     reader = _get_resource_reader(package) | ||
|  |     if reader is not None: | ||
|  |         return reader.is_resource(name) | ||
|  |     try: | ||
|  |         package_contents = set(contents(package)) | ||
|  |     except (NotADirectoryError, FileNotFoundError): | ||
|  |         return False | ||
|  |     if name not in package_contents: | ||
|  |         return False | ||
|  |     # Just because the given file_name lives as an entry in the package's | ||
|  |     # contents doesn't necessarily mean it's a resource.  Directories are not | ||
|  |     # resources, so let's try to find out if it's a directory or not. | ||
|  |     path = Path(package.__spec__.origin).parent / name | ||
|  |     if path.is_file(): | ||
|  |         return True | ||
|  |     if path.is_dir(): | ||
|  |         return False | ||
|  |     # If it's not a file and it's not a directory, what is it?  Well, this | ||
|  |     # means the file doesn't exist on the file system, so it probably lives | ||
|  |     # inside a zip file.  We have to crack open the zip, look at its table of | ||
|  |     # contents, and make sure that this entry doesn't have sub-entries. | ||
|  |     archive_path = package.__spec__.loader.archive   # type: ignore | ||
|  |     package_directory = Path(package.__spec__.origin).parent | ||
|  |     with ZipFile(archive_path) as zf: | ||
|  |         toc = zf.namelist() | ||
|  |     relpath = package_directory.relative_to(archive_path) | ||
|  |     candidate_path = relpath / name | ||
|  |     for entry in toc: | ||
|  |         try: | ||
|  |             relative_to_candidate = Path(entry).relative_to(candidate_path) | ||
|  |         except ValueError: | ||
|  |             # The two paths aren't relative to each other so we can ignore it. | ||
|  |             continue | ||
|  |         # Since directories aren't explicitly listed in the zip file, we must | ||
|  |         # infer their 'directory-ness' by looking at the number of path | ||
|  |         # components in the path relative to the package resource we're | ||
|  |         # looking up.  If there are zero additional parts, it's a file, i.e. a | ||
|  |         # resource.  If there are more than zero it's a directory, i.e. not a | ||
|  |         # resource.  It has to be one of these two cases. | ||
|  |         return len(relative_to_candidate.parts) == 0 | ||
|  |     # I think it's impossible to get here.  It would mean that we are looking | ||
|  |     # for a resource in a zip file, there's an entry matching it in the return | ||
|  |     # value of contents(), but we never actually found it in the zip's table of | ||
|  |     # contents. | ||
|  |     raise AssertionError('Impossible situation') | ||
|  | 
 | ||
|  | 
 | ||
|  | def contents(package: Package) -> Iterator[str]: | ||
|  |     """Return the list of entries in 'package'.
 | ||
|  | 
 | ||
|  |     Note that not all entries are resources.  Specifically, directories are | ||
|  |     not considered resources.  Use `is_resource()` on each entry returned here | ||
|  |     to check if it is a resource or not. | ||
|  |     """
 | ||
|  |     package = _get_package(package) | ||
|  |     reader = _get_resource_reader(package) | ||
|  |     if reader is not None: | ||
|  |         yield from reader.contents() | ||
|  |         return | ||
|  |     # Is the package a namespace package?  By definition, namespace packages | ||
|  |     # cannot have resources. | ||
|  |     if (package.__spec__.origin == 'namespace' and | ||
|  |             not package.__spec__.has_location): | ||
|  |         return [] | ||
|  |     package_directory = Path(package.__spec__.origin).parent | ||
|  |     try: | ||
|  |         yield from os.listdir(str(package_directory)) | ||
|  |     except (NotADirectoryError, FileNotFoundError): | ||
|  |         # The package is probably in a zip file. | ||
|  |         archive_path = getattr(package.__spec__.loader, 'archive', None) | ||
|  |         if archive_path is None: | ||
|  |             raise | ||
|  |         relpath = package_directory.relative_to(archive_path) | ||
|  |         with ZipFile(archive_path) as zf: | ||
|  |             toc = zf.namelist() | ||
|  |         subdirs_seen = set()                        # type: Set | ||
|  |         for filename in toc: | ||
|  |             path = Path(filename) | ||
|  |             # Strip off any path component parts that are in common with the | ||
|  |             # package directory, relative to the zip archive's file system | ||
|  |             # path.  This gives us all the parts that live under the named | ||
|  |             # package inside the zip file.  If the length of these subparts is | ||
|  |             # exactly 1, then it is situated inside the package.  The resulting | ||
|  |             # length will be 0 if it's above the package, and it will be | ||
|  |             # greater than 1 if it lives in a subdirectory of the package | ||
|  |             # directory. | ||
|  |             # | ||
|  |             # However, since directories themselves don't appear in the zip | ||
|  |             # archive as a separate entry, we need to return the first path | ||
|  |             # component for any case that has > 1 subparts -- but only once! | ||
|  |             if path.parts[:len(relpath.parts)] != relpath.parts: | ||
|  |                 continue | ||
|  |             subparts = path.parts[len(relpath.parts):] | ||
|  |             if len(subparts) == 1: | ||
|  |                 yield subparts[0] | ||
|  |             elif len(subparts) > 1: | ||
|  |                 subdir = subparts[0] | ||
|  |                 if subdir not in subdirs_seen: | ||
|  |                     subdirs_seen.add(subdir) | ||
|  |                     yield subdir |