mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			458 lines
		
	
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			458 lines
		
	
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import fnmatch
 | 
						|
import glob
 | 
						|
import os
 | 
						|
import os.path
 | 
						|
import shutil
 | 
						|
import stat
 | 
						|
 | 
						|
from .iterutil import iter_many
 | 
						|
 | 
						|
 | 
						|
USE_CWD = object()
 | 
						|
 | 
						|
 | 
						|
C_SOURCE_SUFFIXES = ('.c', '.h')
 | 
						|
 | 
						|
 | 
						|
def create_backup(old, backup=None):
 | 
						|
    if isinstance(old, str):
 | 
						|
        filename = old
 | 
						|
    else:
 | 
						|
        filename = getattr(old, 'name', None)
 | 
						|
    if not filename:
 | 
						|
        return None
 | 
						|
    if not backup or backup is True:
 | 
						|
        backup = f'{filename}.bak'
 | 
						|
    try:
 | 
						|
        shutil.copyfile(filename, backup)
 | 
						|
    except FileNotFoundError as exc:
 | 
						|
        if exc.filename != filename:
 | 
						|
            raise   # re-raise
 | 
						|
        backup = None
 | 
						|
    return backup
 | 
						|
 | 
						|
 | 
						|
##################################
 | 
						|
# filenames
 | 
						|
 | 
						|
def fix_filename(filename, relroot=USE_CWD, *,
 | 
						|
                 fixroot=True,
 | 
						|
                 _badprefix=f'..{os.path.sep}',
 | 
						|
                 ):
 | 
						|
    """Return a normalized, absolute-path copy of the given filename."""
 | 
						|
    if not relroot or relroot is USE_CWD:
 | 
						|
        return os.path.abspath(filename)
 | 
						|
    if fixroot:
 | 
						|
        relroot = os.path.abspath(relroot)
 | 
						|
    return _fix_filename(filename, relroot)
 | 
						|
 | 
						|
 | 
						|
def _fix_filename(filename, relroot, *,
 | 
						|
                  _badprefix=f'..{os.path.sep}',
 | 
						|
                  ):
 | 
						|
    orig = filename
 | 
						|
 | 
						|
    # First we normalize.
 | 
						|
    filename = os.path.normpath(filename)
 | 
						|
    if filename.startswith(_badprefix):
 | 
						|
        raise ValueError(f'bad filename {orig!r} (resolves beyond relative root')
 | 
						|
 | 
						|
    # Now make sure it is absolute (relative to relroot).
 | 
						|
    if not os.path.isabs(filename):
 | 
						|
        filename = os.path.join(relroot, filename)
 | 
						|
    else:
 | 
						|
        relpath = os.path.relpath(filename, relroot)
 | 
						|
        if os.path.join(relroot, relpath) != filename:
 | 
						|
            raise ValueError(f'expected {relroot!r} as lroot, got {orig!r}')
 | 
						|
 | 
						|
    return filename
 | 
						|
 | 
						|
 | 
						|
def fix_filenames(filenames, relroot=USE_CWD):
 | 
						|
    if not relroot or relroot is USE_CWD:
 | 
						|
        filenames = (os.path.abspath(v) for v in filenames)
 | 
						|
    else:
 | 
						|
        relroot = os.path.abspath(relroot)
 | 
						|
        filenames = (_fix_filename(v, relroot) for v in filenames)
 | 
						|
    return filenames, relroot
 | 
						|
 | 
						|
 | 
						|
def format_filename(filename, relroot=USE_CWD, *,
 | 
						|
                    fixroot=True,
 | 
						|
                    normalize=True,
 | 
						|
                    _badprefix=f'..{os.path.sep}',
 | 
						|
                    ):
 | 
						|
    """Return a consistent relative-path representation of the filename."""
 | 
						|
    orig = filename
 | 
						|
    if normalize:
 | 
						|
        filename = os.path.normpath(filename)
 | 
						|
    if relroot is None:
 | 
						|
        # Otherwise leave it as-is.
 | 
						|
        return filename
 | 
						|
    elif relroot is USE_CWD:
 | 
						|
        # Make it relative to CWD.
 | 
						|
        filename = os.path.relpath(filename)
 | 
						|
    else:
 | 
						|
        # Make it relative to "relroot".
 | 
						|
        if fixroot:
 | 
						|
            relroot = os.path.abspath(relroot)
 | 
						|
        elif not relroot:
 | 
						|
            raise ValueError('missing relroot')
 | 
						|
        filename = os.path.relpath(filename, relroot)
 | 
						|
    if filename.startswith(_badprefix):
 | 
						|
        raise ValueError(f'bad filename {orig!r} (resolves beyond relative root')
 | 
						|
    return filename
 | 
						|
 | 
						|
 | 
						|
##################################
 | 
						|
# find files
 | 
						|
 | 
						|
def match_glob(filename, pattern):
 | 
						|
    if fnmatch.fnmatch(filename, pattern):
 | 
						|
        return True
 | 
						|
 | 
						|
    # fnmatch doesn't handle ** quite right.  It will not match the
 | 
						|
    # following:
 | 
						|
    #
 | 
						|
    #  ('x/spam.py', 'x/**/*.py')
 | 
						|
    #  ('spam.py', '**/*.py')
 | 
						|
    #
 | 
						|
    # though it *will* match the following:
 | 
						|
    #
 | 
						|
    #  ('x/y/spam.py', 'x/**/*.py')
 | 
						|
    #  ('x/spam.py', '**/*.py')
 | 
						|
 | 
						|
    if '**/' not in pattern:
 | 
						|
        return False
 | 
						|
 | 
						|
    # We only accommodate the single-"**" case.
 | 
						|
    return fnmatch.fnmatch(filename, pattern.replace('**/', '', 1))
 | 
						|
 | 
						|
 | 
						|
def process_filenames(filenames, *,
 | 
						|
                      start=None,
 | 
						|
                      include=None,
 | 
						|
                      exclude=None,
 | 
						|
                      relroot=USE_CWD,
 | 
						|
                      ):
 | 
						|
    if relroot and relroot is not USE_CWD:
 | 
						|
        relroot = os.path.abspath(relroot)
 | 
						|
    if start:
 | 
						|
        start = fix_filename(start, relroot, fixroot=False)
 | 
						|
    if include:
 | 
						|
        include = set(fix_filename(v, relroot, fixroot=False)
 | 
						|
                      for v in include)
 | 
						|
    if exclude:
 | 
						|
        exclude = set(fix_filename(v, relroot, fixroot=False)
 | 
						|
                      for v in exclude)
 | 
						|
 | 
						|
    onempty = Exception('no filenames provided')
 | 
						|
    for filename, solo in iter_many(filenames, onempty):
 | 
						|
        filename = fix_filename(filename, relroot, fixroot=False)
 | 
						|
        relfile = format_filename(filename, relroot, fixroot=False, normalize=False)
 | 
						|
        check, start = _get_check(filename, start, include, exclude)
 | 
						|
        yield filename, relfile, check, solo
 | 
						|
 | 
						|
 | 
						|
def expand_filenames(filenames):
 | 
						|
    for filename in filenames:
 | 
						|
        # XXX Do we need to use glob.escape (a la commit 9355868458, GH-20994)?
 | 
						|
        if '**/' in filename:
 | 
						|
            yield from glob.glob(filename.replace('**/', ''))
 | 
						|
        yield from glob.glob(filename)
 | 
						|
 | 
						|
 | 
						|
def _get_check(filename, start, include, exclude):
 | 
						|
    if start and filename != start:
 | 
						|
        return (lambda: '<skipped>'), start
 | 
						|
    else:
 | 
						|
        def check():
 | 
						|
            if _is_excluded(filename, exclude, include):
 | 
						|
                return '<excluded>'
 | 
						|
            return None
 | 
						|
        return check, None
 | 
						|
 | 
						|
 | 
						|
def _is_excluded(filename, exclude, include):
 | 
						|
    if include:
 | 
						|
        for included in include:
 | 
						|
            if match_glob(filename, included):
 | 
						|
                return False
 | 
						|
        return True
 | 
						|
    elif exclude:
 | 
						|
        for excluded in exclude:
 | 
						|
            if match_glob(filename, excluded):
 | 
						|
                return True
 | 
						|
        return False
 | 
						|
    else:
 | 
						|
        return False
 | 
						|
 | 
						|
 | 
						|
def _walk_tree(root, *,
 | 
						|
               _walk=os.walk,
 | 
						|
               ):
 | 
						|
    # A wrapper around os.walk that resolves the filenames.
 | 
						|
    for parent, _, names in _walk(root):
 | 
						|
        for name in names:
 | 
						|
            yield os.path.join(parent, name)
 | 
						|
 | 
						|
 | 
						|
def walk_tree(root, *,
 | 
						|
              suffix=None,
 | 
						|
              walk=_walk_tree,
 | 
						|
              ):
 | 
						|
    """Yield each file in the tree under the given directory name.
 | 
						|
 | 
						|
    If "suffix" is provided then only files with that suffix will
 | 
						|
    be included.
 | 
						|
    """
 | 
						|
    if suffix and not isinstance(suffix, str):
 | 
						|
        raise ValueError('suffix must be a string')
 | 
						|
 | 
						|
    for filename in walk(root):
 | 
						|
        if suffix and not filename.endswith(suffix):
 | 
						|
            continue
 | 
						|
        yield filename
 | 
						|
 | 
						|
 | 
						|
def glob_tree(root, *,
 | 
						|
              suffix=None,
 | 
						|
              _glob=glob.iglob,
 | 
						|
              ):
 | 
						|
    """Yield each file in the tree under the given directory name.
 | 
						|
 | 
						|
    If "suffix" is provided then only files with that suffix will
 | 
						|
    be included.
 | 
						|
    """
 | 
						|
    suffix = suffix or ''
 | 
						|
    if not isinstance(suffix, str):
 | 
						|
        raise ValueError('suffix must be a string')
 | 
						|
 | 
						|
    for filename in _glob(f'{root}/*{suffix}'):
 | 
						|
        yield filename
 | 
						|
    for filename in _glob(f'{root}/**/*{suffix}'):
 | 
						|
        yield filename
 | 
						|
 | 
						|
 | 
						|
def iter_files(root, suffix=None, relparent=None, *,
 | 
						|
               get_files=os.walk,
 | 
						|
               _glob=glob_tree,
 | 
						|
               _walk=walk_tree,
 | 
						|
               ):
 | 
						|
    """Yield each file in the tree under the given directory name.
 | 
						|
 | 
						|
    If "root" is a non-string iterable then do the same for each of
 | 
						|
    those trees.
 | 
						|
 | 
						|
    If "suffix" is provided then only files with that suffix will
 | 
						|
    be included.
 | 
						|
 | 
						|
    if "relparent" is provided then it is used to resolve each
 | 
						|
    filename as a relative path.
 | 
						|
    """
 | 
						|
    if not isinstance(root, str):
 | 
						|
        roots = root
 | 
						|
        for root in roots:
 | 
						|
            yield from iter_files(root, suffix, relparent,
 | 
						|
                                  get_files=get_files,
 | 
						|
                                  _glob=_glob, _walk=_walk)
 | 
						|
        return
 | 
						|
 | 
						|
    # Use the right "walk" function.
 | 
						|
    if get_files in (glob.glob, glob.iglob, glob_tree):
 | 
						|
        get_files = _glob
 | 
						|
    else:
 | 
						|
        _files = _walk_tree if get_files in (os.walk, walk_tree) else get_files
 | 
						|
        get_files = (lambda *a, **k: _walk(*a, walk=_files, **k))
 | 
						|
 | 
						|
    # Handle a single suffix.
 | 
						|
    if suffix and not isinstance(suffix, str):
 | 
						|
        filenames = get_files(root)
 | 
						|
        suffix = tuple(suffix)
 | 
						|
    else:
 | 
						|
        filenames = get_files(root, suffix=suffix)
 | 
						|
        suffix = None
 | 
						|
 | 
						|
    for filename in filenames:
 | 
						|
        if suffix and not isinstance(suffix, str):  # multiple suffixes
 | 
						|
            if not filename.endswith(suffix):
 | 
						|
                continue
 | 
						|
        if relparent:
 | 
						|
            filename = os.path.relpath(filename, relparent)
 | 
						|
        yield filename
 | 
						|
 | 
						|
 | 
						|
def iter_files_by_suffix(root, suffixes, relparent=None, *,
 | 
						|
                         walk=walk_tree,
 | 
						|
                         _iter_files=iter_files,
 | 
						|
                         ):
 | 
						|
    """Yield each file in the tree that has the given suffixes.
 | 
						|
 | 
						|
    Unlike iter_files(), the results are in the original suffix order.
 | 
						|
    """
 | 
						|
    if isinstance(suffixes, str):
 | 
						|
        suffixes = [suffixes]
 | 
						|
    # XXX Ignore repeated suffixes?
 | 
						|
    for suffix in suffixes:
 | 
						|
        yield from _iter_files(root, suffix, relparent)
 | 
						|
 | 
						|
 | 
						|
##################################
 | 
						|
# file info
 | 
						|
 | 
						|
# XXX posix-only?
 | 
						|
 | 
						|
S_IRANY = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
 | 
						|
S_IWANY = stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH
 | 
						|
S_IXANY = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
 | 
						|
 | 
						|
 | 
						|
def is_readable(file, *, user=None, check=False):
 | 
						|
    filename, st, mode = _get_file_info(file)
 | 
						|
    if check:
 | 
						|
        try:
 | 
						|
            okay = _check_file(filename, S_IRANY)
 | 
						|
        except NotImplementedError:
 | 
						|
            okay = NotImplemented
 | 
						|
        if okay is not NotImplemented:
 | 
						|
            return okay
 | 
						|
        # Fall back to checking the mode.
 | 
						|
    return _check_mode(st, mode, S_IRANY, user)
 | 
						|
 | 
						|
 | 
						|
def is_writable(file, *, user=None, check=False):
 | 
						|
    filename, st, mode = _get_file_info(file)
 | 
						|
    if check:
 | 
						|
        try:
 | 
						|
            okay = _check_file(filename, S_IWANY)
 | 
						|
        except NotImplementedError:
 | 
						|
            okay = NotImplemented
 | 
						|
        if okay is not NotImplemented:
 | 
						|
            return okay
 | 
						|
        # Fall back to checking the mode.
 | 
						|
    return _check_mode(st, mode, S_IWANY, user)
 | 
						|
 | 
						|
 | 
						|
def is_executable(file, *, user=None, check=False):
 | 
						|
    filename, st, mode = _get_file_info(file)
 | 
						|
    if check:
 | 
						|
        try:
 | 
						|
            okay = _check_file(filename, S_IXANY)
 | 
						|
        except NotImplementedError:
 | 
						|
            okay = NotImplemented
 | 
						|
        if okay is not NotImplemented:
 | 
						|
            return okay
 | 
						|
        # Fall back to checking the mode.
 | 
						|
    return _check_mode(st, mode, S_IXANY, user)
 | 
						|
 | 
						|
 | 
						|
def _get_file_info(file):
 | 
						|
    filename = st = mode = None
 | 
						|
    if isinstance(file, int):
 | 
						|
        mode = file
 | 
						|
    elif isinstance(file, os.stat_result):
 | 
						|
        st = file
 | 
						|
    else:
 | 
						|
        if isinstance(file, str):
 | 
						|
            filename = file
 | 
						|
        elif hasattr(file, 'name') and os.path.exists(file.name):
 | 
						|
            filename = file.name
 | 
						|
        else:
 | 
						|
            raise NotImplementedError(file)
 | 
						|
        st = os.stat(filename)
 | 
						|
    return filename, st, mode or st.st_mode
 | 
						|
 | 
						|
 | 
						|
def _check_file(filename, check):
 | 
						|
    if not isinstance(filename, str):
 | 
						|
        raise Exception(f'filename required to check file, got {filename}')
 | 
						|
    if check & S_IRANY:
 | 
						|
        flags = os.O_RDONLY
 | 
						|
    elif check & S_IWANY:
 | 
						|
        flags = os.O_WRONLY
 | 
						|
    elif check & S_IXANY:
 | 
						|
        # We can worry about S_IXANY later
 | 
						|
        return NotImplemented
 | 
						|
    else:
 | 
						|
        raise NotImplementedError(check)
 | 
						|
 | 
						|
    try:
 | 
						|
        fd = os.open(filename, flags)
 | 
						|
    except PermissionError:
 | 
						|
        return False
 | 
						|
    # We do not ignore other exceptions.
 | 
						|
    else:
 | 
						|
        os.close(fd)
 | 
						|
        return True
 | 
						|
 | 
						|
 | 
						|
def _get_user_info(user):
 | 
						|
    import pwd
 | 
						|
    username = uid = gid = groups = None
 | 
						|
    if user is None:
 | 
						|
        uid = os.geteuid()
 | 
						|
        #username = os.getlogin()
 | 
						|
        username = pwd.getpwuid(uid)[0]
 | 
						|
        gid = os.getgid()
 | 
						|
        groups = os.getgroups()
 | 
						|
    else:
 | 
						|
        if isinstance(user, int):
 | 
						|
            uid = user
 | 
						|
            entry = pwd.getpwuid(uid)
 | 
						|
            username = entry.pw_name
 | 
						|
        elif isinstance(user, str):
 | 
						|
            username = user
 | 
						|
            entry = pwd.getpwnam(username)
 | 
						|
            uid = entry.pw_uid
 | 
						|
        else:
 | 
						|
            raise NotImplementedError(user)
 | 
						|
        gid = entry.pw_gid
 | 
						|
        os.getgrouplist(username, gid)
 | 
						|
    return username, uid, gid, groups
 | 
						|
 | 
						|
 | 
						|
def _check_mode(st, mode, check, user):
 | 
						|
    orig = check
 | 
						|
    _, uid, gid, groups = _get_user_info(user)
 | 
						|
    if check & S_IRANY:
 | 
						|
        check -= S_IRANY
 | 
						|
        matched = False
 | 
						|
        if mode & stat.S_IRUSR:
 | 
						|
            if st.st_uid == uid:
 | 
						|
                matched = True
 | 
						|
        if mode & stat.S_IRGRP:
 | 
						|
            if st.st_uid == gid or st.st_uid in groups:
 | 
						|
                matched = True
 | 
						|
        if mode & stat.S_IROTH:
 | 
						|
            matched = True
 | 
						|
        if not matched:
 | 
						|
            return False
 | 
						|
    if check & S_IWANY:
 | 
						|
        check -= S_IWANY
 | 
						|
        matched = False
 | 
						|
        if mode & stat.S_IWUSR:
 | 
						|
            if st.st_uid == uid:
 | 
						|
                matched = True
 | 
						|
        if mode & stat.S_IWGRP:
 | 
						|
            if st.st_uid == gid or st.st_uid in groups:
 | 
						|
                matched = True
 | 
						|
        if mode & stat.S_IWOTH:
 | 
						|
            matched = True
 | 
						|
        if not matched:
 | 
						|
            return False
 | 
						|
    if check & S_IXANY:
 | 
						|
        check -= S_IXANY
 | 
						|
        matched = False
 | 
						|
        if mode & stat.S_IXUSR:
 | 
						|
            if st.st_uid == uid:
 | 
						|
                matched = True
 | 
						|
        if mode & stat.S_IXGRP:
 | 
						|
            if st.st_uid == gid or st.st_uid in groups:
 | 
						|
                matched = True
 | 
						|
        if mode & stat.S_IXOTH:
 | 
						|
            matched = True
 | 
						|
        if not matched:
 | 
						|
            return False
 | 
						|
    if check:
 | 
						|
        raise NotImplementedError((orig, check))
 | 
						|
    return True
 |