cpython/Lib/pathlib/_os.py
Barney Gale 9e6493849e
GH-139174: Prepare pathlib.Path.info for new methods (#139175)
Merge `_WindowsPathInfo` and `_PosixPathInfo` classes into a new
`_StatResultInfo` class. On Windows, this means relying on `os.stat()`
rather than `os.path.isfile()` and friends, which is a little slower. But
there's value in making the code easier to maintain, and we're going to
need the stat result for implementing `size()`, `mode()` etc.

Also move the classes from `pathlib._os` to `pathlib` proper.
2025-09-24 01:52:24 +00:00

303 lines
9.3 KiB
Python

"""
Low-level OS functionality wrappers used by pathlib.
"""
from errno import *
from io import TextIOWrapper, text_encoding
import os
import sys
try:
import fcntl
except ImportError:
fcntl = None
try:
import posix
except ImportError:
posix = None
try:
import _winapi
except ImportError:
_winapi = None
def _get_copy_blocksize(infd):
"""Determine blocksize for fastcopying on Linux.
Hopefully the whole file will be copied in a single call.
The copying itself should be performed in a loop 'till EOF is
reached (0 return) so a blocksize smaller or bigger than the actual
file size should not make any difference, also in case the file
content changes while being copied.
"""
try:
blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8 MiB
except OSError:
blocksize = 2 ** 27 # 128 MiB
# On 32-bit architectures truncate to 1 GiB to avoid OverflowError,
# see gh-82500.
if sys.maxsize < 2 ** 32:
blocksize = min(blocksize, 2 ** 30)
return blocksize
if fcntl and hasattr(fcntl, 'FICLONE'):
def _ficlone(source_fd, target_fd):
"""
Perform a lightweight copy of two files, where the data blocks are
copied only when modified. This is known as Copy on Write (CoW),
instantaneous copy or reflink.
"""
fcntl.ioctl(target_fd, fcntl.FICLONE, source_fd)
else:
_ficlone = None
if posix and hasattr(posix, '_fcopyfile'):
def _fcopyfile(source_fd, target_fd):
"""
Copy a regular file content using high-performance fcopyfile(3)
syscall (macOS).
"""
posix._fcopyfile(source_fd, target_fd, posix._COPYFILE_DATA)
else:
_fcopyfile = None
if hasattr(os, 'copy_file_range'):
def _copy_file_range(source_fd, target_fd):
"""
Copy data from one regular mmap-like fd to another by using a
high-performance copy_file_range(2) syscall that gives filesystems
an opportunity to implement the use of reflinks or server-side
copy.
This should work on Linux >= 4.5 only.
"""
blocksize = _get_copy_blocksize(source_fd)
offset = 0
while True:
sent = os.copy_file_range(source_fd, target_fd, blocksize,
offset_dst=offset)
if sent == 0:
break # EOF
offset += sent
else:
_copy_file_range = None
if hasattr(os, 'sendfile'):
def _sendfile(source_fd, target_fd):
"""Copy data from one regular mmap-like fd to another by using
high-performance sendfile(2) syscall.
This should work on Linux >= 2.6.33 only.
"""
blocksize = _get_copy_blocksize(source_fd)
offset = 0
while True:
sent = os.sendfile(target_fd, source_fd, offset, blocksize)
if sent == 0:
break # EOF
offset += sent
else:
_sendfile = None
# Stays None unless the _winapi module is importable and has CopyFile2.
copyfile2 = None
if _winapi is not None and hasattr(_winapi, 'CopyFile2'):
    def copyfile2(source, target):
        """
        Copy from one file to another using CopyFile2 (Windows only).
        """
        _winapi.CopyFile2(source, target, 0)
def copyfileobj(source_f, target_f):
    """
    Copy data from file-like object source_f to file-like object target_f.

    If both objects provide a descriptor through fileno(), the fast paths
    available on this platform are tried in this order: _ficlone,
    _fcopyfile, _copy_file_range, _sendfile. The first one that succeeds
    ends the copy. A fast path that fails with one of its tolerated errno
    values is skipped in favour of the next one; any other OSError is
    re-raised with its filename and filename2 attributes set to
    source_f.name and target_f.name.

    If fileno() fails, or no fast path completes the copy, the data is
    copied with source_f.read() and target_f.write() in 1 MiB chunks.
    """
    try:
        source_fd = source_f.fileno()
        target_fd = target_f.fileno()
    except Exception:
        # No usable descriptors: fall through to generic code.
        have_descriptors = False
    else:
        have_descriptors = True

    if have_descriptors:
        try:
            # Use OS copy-on-write where available.
            if _ficlone:
                try:
                    _ficlone(source_fd, target_fd)
                    return
                except OSError as exc:
                    if exc.errno not in (EBADF, EOPNOTSUPP, ETXTBSY, EXDEV):
                        raise

            # Use OS copy where available.
            if _fcopyfile:
                try:
                    _fcopyfile(source_fd, target_fd)
                    return
                except OSError as exc:
                    if exc.errno not in (EINVAL, ENOTSUP):
                        raise
            if _copy_file_range:
                try:
                    _copy_file_range(source_fd, target_fd)
                    return
                except OSError as exc:
                    if exc.errno not in (ETXTBSY, EXDEV):
                        raise
            if _sendfile:
                try:
                    _sendfile(source_fd, target_fd)
                    return
                except OSError as exc:
                    if exc.errno != ENOTSOCK:
                        raise
        except OSError as exc:
            # Produce more useful error messages.
            exc.filename = source_f.name
            exc.filename2 = target_f.name
            raise

    # Last resort: copy with fileobj read() and write().
    read_chunk = source_f.read
    write_chunk = target_f.write
    while True:
        chunk = read_chunk(1024 * 1024)
        if not chunk:
            break
        write_chunk(chunk)
def _open_reader(obj):
cls = type(obj)
try:
open_reader = cls.__open_reader__
except AttributeError:
cls_name = cls.__name__
raise TypeError(f"{cls_name} can't be opened for reading") from None
else:
return open_reader(obj)
def _open_writer(obj, mode):
cls = type(obj)
try:
open_writer = cls.__open_writer__
except AttributeError:
cls_name = cls.__name__
raise TypeError(f"{cls_name} can't be opened for writing") from None
else:
return open_writer(obj, mode)
def _open_updater(obj, mode):
cls = type(obj)
try:
open_updater = cls.__open_updater__
except AttributeError:
cls_name = cls.__name__
raise TypeError(f"{cls_name} can't be opened for updating") from None
else:
return open_updater(obj, mode)
def vfsopen(obj, mode='r', buffering=-1, encoding=None, errors=None,
newline=None):
"""
Open the file pointed to by this path and return a file object, as
the built-in open() function does.
Unlike the built-in open() function, this function additionally accepts
'openable' objects, which are objects with any of these special methods:
__open_reader__()
__open_writer__(mode)
__open_updater__(mode)
'__open_reader__' is called for 'r' mode; '__open_writer__' for 'a', 'w'
and 'x' modes; and '__open_updater__' for 'r+' and 'w+' modes. If text
mode is requested, the result is wrapped in an io.TextIOWrapper object.
"""
if buffering != -1:
raise ValueError("buffer size can't be customized")
text = 'b' not in mode
if text:
# Call io.text_encoding() here to ensure any warning is raised at an
# appropriate stack level.
encoding = text_encoding(encoding)
try:
return open(obj, mode, buffering, encoding, errors, newline)
except TypeError:
pass
if not text:
if encoding is not None:
raise ValueError("binary mode doesn't take an encoding argument")
if errors is not None:
raise ValueError("binary mode doesn't take an errors argument")
if newline is not None:
raise ValueError("binary mode doesn't take a newline argument")
mode = ''.join(sorted(c for c in mode if c not in 'bt'))
if mode == 'r':
stream = _open_reader(obj)
elif mode in ('a', 'w', 'x'):
stream = _open_writer(obj, mode)
elif mode in ('+r', '+w'):
stream = _open_updater(obj, mode[1])
else:
raise ValueError(f'invalid mode: {mode}')
if text:
stream = TextIOWrapper(stream, encoding, errors, newline)
return stream
def vfspath(obj):
    """
    Return the string representation of a virtual path object, obtained by
    calling the __vfspath__() method defined on its type.

    Raise TypeError if the object's type has no __vfspath__ attribute.
    """
    kind = type(obj)
    try:
        to_string = kind.__vfspath__
    except AttributeError:
        raise TypeError(
            f"expected JoinablePath object, not {kind.__name__}") from None
    return to_string(obj)
def ensure_distinct_paths(source, target):
    """
    Raise OSError(EINVAL) if target is equal to source, or if source is one
    of target's parents. Return None otherwise.

    The raised error has its filename and filename2 attributes set to the
    vfspath() of source and target respectively.
    """
    # Note: there is no straightforward, foolproof algorithm to determine
    # if one directory is within another (a particularly perverse example
    # would be a single network share mounted in one location via NFS, and
    # in another location via CIFS), so we simply check whether the target
    # path is lexically equal to, or within, the source path.
    if source == target:
        message = "Source and target are the same path"
    elif source in target.parents:
        message = "Source path is a parent of target path"
    else:
        return
    error = OSError(EINVAL, message)
    error.filename = vfspath(source)
    error.filename2 = vfspath(target)
    raise error
def ensure_different_files(source, target):
    """
    Raise OSError(EINVAL) if both paths refer to the same file.

    When both source.info and target.info provide a _file_id attribute,
    the results of calling them are compared; if either call raises
    OSError or ValueError the paths are treated as different files. When
    either attribute is missing, the paths themselves are compared instead.

    The raised error has its filename and filename2 attributes set to the
    vfspath() of source and target respectively.
    """
    try:
        id_getters = (source.info._file_id, target.info._file_id)
    except AttributeError:
        id_getters = None

    if id_getters is None:
        # No file identity available: fall back to comparing the paths.
        if source != target:
            return
    else:
        get_source_id, get_target_id = id_getters
        try:
            if get_source_id() != get_target_id():
                return
        except (OSError, ValueError):
            # The identity could not be determined: treat as different.
            return
    error = OSError(EINVAL, "Source and target are the same file")
    error.filename = vfspath(source)
    error.filename2 = vfspath(target)
    raise error