mirror of
https://github.com/python/cpython.git
synced 2025-10-23 18:03:48 +00:00

In the private pathlib ABCs, replace `_WritablePath._write_info()` with `_WritablePath._copy_from()`. This provides the target path object with more control over the copying process, including support for querying and setting metadata *before* the path is created. Adjust `_ReadablePath.copy()` so that it forwards its keyword arguments to `_WritablePath._copy_from()` of the target path object. This allows us to remove the unimplemented *preserve_metadata* argument in the ABC method, making it a `Path` exclusive.
520 lines
17 KiB
Python
520 lines
17 KiB
Python
"""
|
|
Low-level OS functionality wrappers used by pathlib.
|
|
"""
|
|
|
|
from errno import *
from stat import S_ISDIR, S_ISREG, S_ISLNK, S_IMODE
import io
import os
import sys

# Optional, platform-specific modules.  Each name is bound to None when the
# module cannot be imported so that the feature probes further down can test
# the name directly instead of repeating the import dance.
try:
    import fcntl
except ImportError:
    fcntl = None
try:
    import posix
except ImportError:
    posix = None
try:
    import _winapi
except ImportError:
    _winapi = None


def _get_copy_blocksize(infd):
    """Determine blocksize for fastcopying on Linux.

    Hopefully the whole file will be copied in a single call.
    The copying itself should be performed in a loop 'till EOF is
    reached (0 return) so a blocksize smaller or bigger than the actual
    file size should not make any difference, also in case the file
    content changes while being copied.

    *infd* is the file descriptor of the source file.  The result is the
    file size reported by os.fstat(), but never less than 8 MiB; if
    os.fstat() fails with OSError, 128 MiB is used instead.
    """
    try:
        blocksize = max(os.fstat(infd).st_size, 2 ** 23)  # min 8 MiB
    except OSError:
        blocksize = 2 ** 27  # 128 MiB
    # On 32-bit architectures truncate to 1 GiB to avoid OverflowError,
    # see gh-82500.
    if sys.maxsize < 2 ** 32:
        blocksize = min(blocksize, 2 ** 30)
    return blocksize


# _ficlone is only defined when the fcntl module exposes FICLONE; otherwise
# the name is None so callers can feature-test it with a plain truth check.
if fcntl and hasattr(fcntl, 'FICLONE'):
    def _ficlone(source_fd, target_fd):
        """
        Perform a lightweight copy of two files, where the data blocks are
        copied only when modified. This is known as Copy on Write (CoW),
        instantaneous copy or reflink.

        Both arguments are file descriptors.  Errors from the underlying
        ioctl() call propagate to the caller as OSError.
        """
        fcntl.ioctl(target_fd, fcntl.FICLONE, source_fd)
else:
    _ficlone = None


# _fcopyfile is only defined when the posix module provides the private
# _fcopyfile() helper; otherwise the name is None.
if posix and hasattr(posix, '_fcopyfile'):
    def _fcopyfile(source_fd, target_fd):
        """
        Copy a regular file content using high-performance fcopyfile(3)
        syscall (macOS).

        Both arguments are file descriptors; only file data is requested
        (posix._COPYFILE_DATA).
        """
        posix._fcopyfile(source_fd, target_fd, posix._COPYFILE_DATA)
else:
    _fcopyfile = None


# _copy_file_range is only defined when os.copy_file_range() exists;
# otherwise the name is None.
if hasattr(os, 'copy_file_range'):
    def _copy_file_range(source_fd, target_fd):
        """
        Copy data from one regular mmap-like fd to another by using a
        high-performance copy_file_range(2) syscall that gives filesystems
        an opportunity to implement the use of reflinks or server-side
        copy.
        This should work on Linux >= 4.5 only.

        Both arguments are file descriptors.  Data is requested in chunks
        of _get_copy_blocksize(source_fd) bytes until the call reports that
        0 bytes were copied.
        """
        blocksize = _get_copy_blocksize(source_fd)
        # Only the destination offset is tracked explicitly; no source
        # offset is passed to os.copy_file_range().
        offset = 0
        while True:
            sent = os.copy_file_range(source_fd, target_fd, blocksize,
                                      offset_dst=offset)
            if sent == 0:
                break  # EOF
            offset += sent
else:
    _copy_file_range = None


# _sendfile is only defined when os.sendfile() exists; otherwise the name is
# None.
if hasattr(os, 'sendfile'):
    def _sendfile(source_fd, target_fd):
        """Copy data from one regular mmap-like fd to another by using
        high-performance sendfile(2) syscall.
        This should work on Linux >= 2.6.33 only.

        Both arguments are file descriptors.  Data is sent in chunks of
        _get_copy_blocksize(source_fd) bytes, starting at source offset 0,
        until the call reports that 0 bytes were sent.
        """
        blocksize = _get_copy_blocksize(source_fd)
        offset = 0
        while True:
            sent = os.sendfile(target_fd, source_fd, offset, blocksize)
            if sent == 0:
                break  # EOF
            offset += sent
else:
    _sendfile = None


|
|
# copyfile2 is only defined when the _winapi module provides CopyFile2;
# otherwise the name is None.
if _winapi and hasattr(_winapi, 'CopyFile2'):
    def copyfile2(source, target):
        """
        Copy from one file to another using CopyFile2 (Windows only).

        *source* and *target* are passed straight to _winapi.CopyFile2()
        with a flags value of 0.
        """
        _winapi.CopyFile2(source, target, 0)
else:
    copyfile2 = None


def copyfileobj(source_f, target_f):
    """
    Copy data from file-like object source_f to file-like object target_f.

    When both objects expose a file descriptor through fileno(), the
    OS-level helpers available on this platform are tried in order
    (_ficlone, _fcopyfile, _copy_file_range, _sendfile).  A helper that
    fails with one of its tolerated errno values is skipped and the next
    one is tried; any other OSError is re-raised with its filename and
    filename2 attributes set to the names of the two file objects.

    If no descriptor is available, or no helper completed the copy, the
    data is copied with plain read()/write() calls in 1 MiB chunks.
    """
    try:
        source_fd = source_f.fileno()
        target_fd = target_f.fileno()
    except Exception:
        pass  # Fall through to generic code.
    else:
        try:
            # Use OS copy-on-write where available.
            if _ficlone:
                try:
                    _ficlone(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno not in (EBADF, EOPNOTSUPP, ETXTBSY, EXDEV):
                        raise

            # Use OS copy where available.
            if _fcopyfile:
                try:
                    _fcopyfile(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno not in (EINVAL, ENOTSUP):
                        raise
            if _copy_file_range:
                try:
                    _copy_file_range(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno not in (ETXTBSY, EXDEV):
                        raise
            if _sendfile:
                try:
                    _sendfile(source_fd, target_fd)
                    return
                except OSError as err:
                    if err.errno != ENOTSOCK:
                        raise
        except OSError as err:
            # Produce more useful error messages.
            err.filename = source_f.name
            err.filename2 = target_f.name
            raise

    # Last resort: copy with fileobj read() and write().
    read_source = source_f.read
    write_target = target_f.write
    while buf := read_source(1024 * 1024):
        write_target(buf)


def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None,
               newline=None):
    """
    Open the file pointed to by this path and return a file object, as
    the built-in open() function does.

    io.open() is tried first.  If it raises TypeError, the type of *path*
    is searched for an opener method instead.  The lookup name is built
    from *mode* with any 'b'/'t' characters removed and the remaining
    characters sorted:

    * in text mode, ``__open_<mode>__(path, buffering, encoding, errors,
      newline)`` is used when present;
    * otherwise ``__open_<mode>b__(path, buffering)`` is used, and in text
      mode its result is wrapped in an io.TextIOWrapper.

    Raises TypeError if neither method exists on the type.
    """
    try:
        return io.open(path, mode, buffering, encoding, errors, newline)
    except TypeError:
        pass
    cls = type(path)
    text = 'b' not in mode
    mode = ''.join(sorted(c for c in mode if c not in 'bt'))
    if text:
        try:
            attr = getattr(cls, f'__open_{mode}__')
        except AttributeError:
            pass
        else:
            return attr(path, buffering, encoding, errors, newline)

    try:
        attr = getattr(cls, f'__open_{mode}b__')
    except AttributeError:
        pass
    else:
        stream = attr(path, buffering)
        if text:
            stream = io.TextIOWrapper(stream, encoding, errors, newline)
        return stream

    raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}")


def ensure_distinct_paths(source, target):
    """
    Raise OSError(EINVAL) if *target* is equal to, or within, *source*.

    The raised error has its filename and filename2 attributes set to
    str(source) and str(target).  Returns None when the paths are distinct.
    """
    # Note: there is no straightforward, foolproof algorithm to determine
    # if one directory is within another (a particularly perverse example
    # would be a single network share mounted in one location via NFS, and
    # in another location via CIFS), so we simply check whether the
    # target path is lexically equal to, or within, the source path.
    if source == target:
        err = OSError(EINVAL, "Source and target are the same path")
    elif source in target.parents:
        err = OSError(EINVAL, "Source path is a parent of target path")
    else:
        return
    err.filename = str(source)
    err.filename2 = str(target)
    raise err


def ensure_different_files(source, target):
    """
    Raise OSError(EINVAL) if both paths refer to the same file.

    When both paths provide ``info._file_id``, the two identifiers are
    compared; if obtaining them raises OSError or ValueError the paths are
    treated as different.  When either path lacks that attribute, the path
    objects themselves are compared for equality instead.

    The raised error has its filename and filename2 attributes set to
    str(source) and str(target).
    """
    try:
        source_file_id = source.info._file_id
        target_file_id = target.info._file_id
    except AttributeError:
        if source != target:
            return
    else:
        try:
            if source_file_id() != target_file_id():
                return
        except (OSError, ValueError):
            return
    err = OSError(EINVAL, "Source and target are the same file")
    err.filename = str(source)
    err.filename2 = str(target)
    raise err


def copy_info(info, target, follow_symlinks=True):
    """Copy metadata from the given PathInfo to the given local path.

    Each kind of metadata is copied only if *info* provides the matching
    private accessor and, when *follow_symlinks* is false, the relevant
    os function is listed in os.supports_follow_symlinks:

    * access/modification times (_access_time_ns, _mod_time_ns)
    * extended attributes (_xattrs), if os.setxattr exists
    * POSIX permission bits (_posix_permissions)
    * BSD file flags (_bsd_flags), if os.chflags exists
    """
    copy_times_ns = (
        hasattr(info, '_access_time_ns') and
        hasattr(info, '_mod_time_ns') and
        (follow_symlinks or os.utime in os.supports_follow_symlinks))
    if copy_times_ns:
        t0 = info._access_time_ns(follow_symlinks=follow_symlinks)
        t1 = info._mod_time_ns(follow_symlinks=follow_symlinks)
        os.utime(target, ns=(t0, t1), follow_symlinks=follow_symlinks)

    # We must copy extended attributes before the file is (potentially)
    # chmod()'ed read-only, otherwise setxattr() will error with -EACCES.
    copy_xattrs = (
        hasattr(info, '_xattrs') and
        hasattr(os, 'setxattr') and
        (follow_symlinks or os.setxattr in os.supports_follow_symlinks))
    if copy_xattrs:
        xattrs = info._xattrs(follow_symlinks=follow_symlinks)
        for attr, value in xattrs:
            try:
                os.setxattr(target, attr, value, follow_symlinks=follow_symlinks)
            except OSError as e:
                # Failing to set an individual attribute for one of these
                # reasons is tolerated; anything else is a real error.
                if e.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
                    raise

    copy_posix_permissions = (
        hasattr(info, '_posix_permissions') and
        (follow_symlinks or os.chmod in os.supports_follow_symlinks))
    if copy_posix_permissions:
        posix_permissions = info._posix_permissions(follow_symlinks=follow_symlinks)
        try:
            os.chmod(target, posix_permissions, follow_symlinks=follow_symlinks)
        except NotImplementedError:
            # if we got a NotImplementedError, it's because
            #   * follow_symlinks=False,
            #   * lchmod() is unavailable, and
            #   * either
            #       * fchmodat() is unavailable or
            #       * fchmodat() doesn't implement AT_SYMLINK_NOFOLLOW.
            #         (it returned ENOTSUP.)
            # therefore we're out of options--we simply cannot chmod the
            # symlink.  give up, suppress the error.
            # (which is what shutil always did in this circumstance.)
            pass

    copy_bsd_flags = (
        hasattr(info, '_bsd_flags') and
        hasattr(os, 'chflags') and
        (follow_symlinks or os.chflags in os.supports_follow_symlinks))
    if copy_bsd_flags:
        bsd_flags = info._bsd_flags(follow_symlinks=follow_symlinks)
        try:
            os.chflags(target, bsd_flags, follow_symlinks=follow_symlinks)
        except OSError as why:
            if why.errno not in (EOPNOTSUPP, ENOTSUP):
                raise


class _PathInfoBase:
    """Common base of the path-info classes in this module.

    Stores the path as a string and provides a cached _stat() helper plus
    accessors for the metadata derived from it.
    """
    __slots__ = ('_path', '_stat_result', '_lstat_result')

    def __init__(self, path):
        self._path = str(path)

    def __repr__(self):
        path_type = "WindowsPath" if os.name == "nt" else "PosixPath"
        return f"<{path_type}.info>"

    def _stat(self, *, follow_symlinks=True, ignore_errors=False):
        """Return the status as an os.stat_result, or None if stat() fails and
        ignore_errors is true."""
        if follow_symlinks:
            try:
                result = self._stat_result
            except AttributeError:
                # Slot not populated yet: nothing cached.
                pass
            else:
                # A cached failure (None) is only reused when the caller
                # tolerates errors; otherwise stat again so that the
                # exception can be raised.
                if ignore_errors or result is not None:
                    return result
            try:
                self._stat_result = os.stat(self._path)
            except (OSError, ValueError):
                self._stat_result = None
                if not ignore_errors:
                    raise
            return self._stat_result
        else:
            try:
                result = self._lstat_result
            except AttributeError:
                pass
            else:
                if ignore_errors or result is not None:
                    return result
            try:
                self._lstat_result = os.lstat(self._path)
            except (OSError, ValueError):
                self._lstat_result = None
                if not ignore_errors:
                    raise
            return self._lstat_result

    def _posix_permissions(self, *, follow_symlinks=True):
        """Return the POSIX file permissions."""
        return S_IMODE(self._stat(follow_symlinks=follow_symlinks).st_mode)

    def _file_id(self, *, follow_symlinks=True):
        """Returns the identifier of the file."""
        st = self._stat(follow_symlinks=follow_symlinks)
        return st.st_dev, st.st_ino

    def _access_time_ns(self, *, follow_symlinks=True):
        """Return the access time in nanoseconds."""
        return self._stat(follow_symlinks=follow_symlinks).st_atime_ns

    def _mod_time_ns(self, *, follow_symlinks=True):
        """Return the modify time in nanoseconds."""
        return self._stat(follow_symlinks=follow_symlinks).st_mtime_ns

    # Only defined where stat results carry st_flags.
    if hasattr(os.stat_result, 'st_flags'):
        def _bsd_flags(self, *, follow_symlinks=True):
            """Return the flags."""
            return self._stat(follow_symlinks=follow_symlinks).st_flags

    # Only defined where the os module supports extended attributes.
    if hasattr(os, 'listxattr'):
        def _xattrs(self, *, follow_symlinks=True):
            """Return the xattrs as a list of (attr, value) pairs, or an empty
            list if extended attributes aren't supported."""
            try:
                return [
                    (attr, os.getxattr(self._path, attr, follow_symlinks=follow_symlinks))
                    for attr in os.listxattr(self._path, follow_symlinks=follow_symlinks)]
            except OSError as err:
                if err.errno not in (EPERM, ENOTSUP, ENODATA, EINVAL, EACCES):
                    raise
                return []


class _WindowsPathInfo(_PathInfoBase):
    """Implementation of pathlib.types.PathInfo that provides status
    information for Windows paths. Don't try to construct it yourself."""
    # Each slot caches the answer of the corresponding os.path query; an
    # unset slot (AttributeError on access) means "not computed yet".
    __slots__ = ('_exists', '_is_dir', '_is_file', '_is_symlink')

    def exists(self, *, follow_symlinks=True):
        """Whether this path exists."""
        if not follow_symlinks and self.is_symlink():
            return True
        try:
            return self._exists
        except AttributeError:
            if os.path.exists(self._path):
                self._exists = True
                return True
            else:
                # A path that does not exist is neither a directory nor a
                # file, so those answers are cached at the same time.
                self._exists = self._is_dir = self._is_file = False
                return False

    def is_dir(self, *, follow_symlinks=True):
        """Whether this path is a directory."""
        if not follow_symlinks and self.is_symlink():
            return False
        try:
            return self._is_dir
        except AttributeError:
            if os.path.isdir(self._path):
                self._is_dir = self._exists = True
                return True
            else:
                self._is_dir = False
                return False

    def is_file(self, *, follow_symlinks=True):
        """Whether this path is a regular file."""
        if not follow_symlinks and self.is_symlink():
            return False
        try:
            return self._is_file
        except AttributeError:
            if os.path.isfile(self._path):
                self._is_file = self._exists = True
                return True
            else:
                self._is_file = False
                return False

    def is_symlink(self):
        """Whether this path is a symbolic link."""
        try:
            return self._is_symlink
        except AttributeError:
            self._is_symlink = os.path.islink(self._path)
            return self._is_symlink


class _PosixPathInfo(_PathInfoBase):
    """Implementation of pathlib.types.PathInfo that provides status
    information for POSIX paths. Don't try to construct it yourself."""
    __slots__ = ()

    # Every query goes through the inherited _stat() with
    # ignore_errors=True, so a failed stat yields False rather than an
    # exception.

    def exists(self, *, follow_symlinks=True):
        """Whether this path exists."""
        st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True)
        if st is None:
            return False
        return True

    def is_dir(self, *, follow_symlinks=True):
        """Whether this path is a directory."""
        st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True)
        if st is None:
            return False
        return S_ISDIR(st.st_mode)

    def is_file(self, *, follow_symlinks=True):
        """Whether this path is a regular file."""
        st = self._stat(follow_symlinks=follow_symlinks, ignore_errors=True)
        if st is None:
            return False
        return S_ISREG(st.st_mode)

    def is_symlink(self):
        """Whether this path is a symbolic link."""
        st = self._stat(follow_symlinks=False, ignore_errors=True)
        if st is None:
            return False
        return S_ISLNK(st.st_mode)


# Select the implementation matching the platform this module runs on.
PathInfo = _WindowsPathInfo if os.name == 'nt' else _PosixPathInfo


class DirEntryInfo(_PathInfoBase):
    """Implementation of pathlib.types.PathInfo that provides status
    information by querying a wrapped os.DirEntry object. Don't try to
    construct it yourself."""
    __slots__ = ('_entry',)

    def __init__(self, entry):
        super().__init__(entry.path)
        self._entry = entry

    def _stat(self, *, follow_symlinks=True, ignore_errors=False):
        """Return the wrapped entry's stat result, or None if its stat()
        call raises OSError and ignore_errors is true."""
        try:
            return self._entry.stat(follow_symlinks=follow_symlinks)
        except OSError:
            if not ignore_errors:
                raise
            return None

    def exists(self, *, follow_symlinks=True):
        """Whether this path exists."""
        # Without following symlinks the answer is always True; no stat
        # call is made in that case.
        if not follow_symlinks:
            return True
        return self._stat(ignore_errors=True) is not None

    def is_dir(self, *, follow_symlinks=True):
        """Whether this path is a directory."""
        try:
            return self._entry.is_dir(follow_symlinks=follow_symlinks)
        except OSError:
            return False

    def is_file(self, *, follow_symlinks=True):
        """Whether this path is a regular file."""
        try:
            return self._entry.is_file(follow_symlinks=follow_symlinks)
        except OSError:
            return False

    def is_symlink(self):
        """Whether this path is a symbolic link."""
        try:
            return self._entry.is_symlink()
        except OSError:
            return False