mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	* bpo-44771: Sync with importlib_resources 5.2.2, fixing refleak. * Include new 'resources' dir in the Makefile.
		
			
				
	
	
		
			118 lines
		
	
	
	
		
			3.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			118 lines
		
	
	
	
		
			3.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import os
 | 
						|
import pathlib
 | 
						|
import tempfile
 | 
						|
import functools
 | 
						|
import contextlib
 | 
						|
import types
 | 
						|
import importlib
 | 
						|
 | 
						|
from typing import Union, Any, Optional
 | 
						|
from .abc import ResourceReader, Traversable
 | 
						|
 | 
						|
from ._adapters import wrap_spec
 | 
						|
 | 
						|
Package = Union[types.ModuleType, str]
 | 
						|
Resource = Union[str, os.PathLike]
 | 
						|
 | 
						|
 | 
						|
def files(package):
 | 
						|
    # type: (Package) -> Traversable
 | 
						|
    """
 | 
						|
    Get a Traversable resource from a package
 | 
						|
    """
 | 
						|
    return from_package(get_package(package))
 | 
						|
 | 
						|
 | 
						|
def normalize_path(path):
 | 
						|
    # type: (Any) -> str
 | 
						|
    """Normalize a path by ensuring it is a string.
 | 
						|
 | 
						|
    If the resulting string contains path separators, an exception is raised.
 | 
						|
    """
 | 
						|
    str_path = str(path)
 | 
						|
    parent, file_name = os.path.split(str_path)
 | 
						|
    if parent:
 | 
						|
        raise ValueError(f'{path!r} must be only a file name')
 | 
						|
    return file_name
 | 
						|
 | 
						|
 | 
						|
def get_resource_reader(package):
 | 
						|
    # type: (types.ModuleType) -> Optional[ResourceReader]
 | 
						|
    """
 | 
						|
    Return the package's loader if it's a ResourceReader.
 | 
						|
    """
 | 
						|
    # We can't use
 | 
						|
    # a issubclass() check here because apparently abc.'s __subclasscheck__()
 | 
						|
    # hook wants to create a weak reference to the object, but
 | 
						|
    # zipimport.zipimporter does not support weak references, resulting in a
 | 
						|
    # TypeError.  That seems terrible.
 | 
						|
    spec = package.__spec__
 | 
						|
    reader = getattr(spec.loader, 'get_resource_reader', None)  # type: ignore
 | 
						|
    if reader is None:
 | 
						|
        return None
 | 
						|
    return reader(spec.name)  # type: ignore
 | 
						|
 | 
						|
 | 
						|
def resolve(cand):
 | 
						|
    # type: (Package) -> types.ModuleType
 | 
						|
    return cand if isinstance(cand, types.ModuleType) else importlib.import_module(cand)
 | 
						|
 | 
						|
 | 
						|
def get_package(package):
 | 
						|
    # type: (Package) -> types.ModuleType
 | 
						|
    """Take a package name or module object and return the module.
 | 
						|
 | 
						|
    Raise an exception if the resolved module is not a package.
 | 
						|
    """
 | 
						|
    resolved = resolve(package)
 | 
						|
    if wrap_spec(resolved).submodule_search_locations is None:
 | 
						|
        raise TypeError(f'{package!r} is not a package')
 | 
						|
    return resolved
 | 
						|
 | 
						|
 | 
						|
def from_package(package):
 | 
						|
    """
 | 
						|
    Return a Traversable object for the given package.
 | 
						|
 | 
						|
    """
 | 
						|
    spec = wrap_spec(package)
 | 
						|
    reader = spec.loader.get_resource_reader(spec.name)
 | 
						|
    return reader.files()
 | 
						|
 | 
						|
 | 
						|
@contextlib.contextmanager
 | 
						|
def _tempfile(reader, suffix=''):
 | 
						|
    # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
 | 
						|
    # blocks due to the need to close the temporary file to work on Windows
 | 
						|
    # properly.
 | 
						|
    fd, raw_path = tempfile.mkstemp(suffix=suffix)
 | 
						|
    try:
 | 
						|
        try:
 | 
						|
            os.write(fd, reader())
 | 
						|
        finally:
 | 
						|
            os.close(fd)
 | 
						|
        del reader
 | 
						|
        yield pathlib.Path(raw_path)
 | 
						|
    finally:
 | 
						|
        try:
 | 
						|
            os.remove(raw_path)
 | 
						|
        except FileNotFoundError:
 | 
						|
            pass
 | 
						|
 | 
						|
 | 
						|
@functools.singledispatch
 | 
						|
def as_file(path):
 | 
						|
    """
 | 
						|
    Given a Traversable object, return that object as a
 | 
						|
    path on the local file system in a context manager.
 | 
						|
    """
 | 
						|
    return _tempfile(path.read_bytes, suffix=path.name)
 | 
						|
 | 
						|
 | 
						|
@as_file.register(pathlib.Path)
 | 
						|
@contextlib.contextmanager
 | 
						|
def _(path):
 | 
						|
    """
 | 
						|
    Degenerate behavior for pathlib.Path objects.
 | 
						|
    """
 | 
						|
    yield path
 |