mirror of
https://github.com/python/cpython.git
synced 2026-06-23 01:21:05 +00:00
[3.15] gh-149504: Fix re-entrancy bug when .pth/.start file invokes site.add sitedir() (#149659) (#149799)
* gh-149504: Fix re-entrancy bug when .pth/.start file invokes site.addsitedir() (#149659)
* Add re-entrant tests for gh-149504
* Add end-to-end integration test coverage
This ensures that future whitebox internal test changes do not regress the
public surface semantics.
* Implement a state class to process .pth and .start files
By using this state class and managing implicit and explicit batching, we make it structurally
impossible to get bitten by re-entrant site startup processing.
Fixes #149504
(cherry picked from commit b162307d7f)
* Add myself back to CODEOWNERS
This commit is contained in:
parent
b6503057b2
commit
45fc9acb8c
4 changed files with 746 additions and 416 deletions
6
.github/CODEOWNERS
vendored
6
.github/CODEOWNERS
vendored
|
|
@ -573,9 +573,9 @@ Lib/shutil.py @giampaolo
|
|||
Lib/test/test_shutil.py @giampaolo
|
||||
|
||||
# Site
|
||||
Lib/site.py @FFY00
|
||||
Lib/test/test_site.py @FFY00
|
||||
Doc/library/site.rst @FFY00
|
||||
Lib/site.py @FFY00 @warsaw
|
||||
Lib/test/test_site.py @FFY00 @warsaw
|
||||
Doc/library/site.rst @FFY00 @warsaw
|
||||
|
||||
# string.templatelib
|
||||
Doc/library/string.templatelib.rst @lysnikolaou @AA-Turner
|
||||
|
|
|
|||
486
Lib/site.py
486
Lib/site.py
|
|
@ -154,13 +154,37 @@ def _init_pathinfo():
|
|||
return d
|
||||
|
||||
|
||||
# Accumulated entry points from .start files across all site-packages
|
||||
# directories. Execution is deferred until all paths in .pth files have been
|
||||
# appended to sys.path. Map the .pth/.start file the data is found in to the
|
||||
# data.
|
||||
_pending_entrypoints = {}
|
||||
_pending_syspaths = {}
|
||||
_pending_importexecs = {}
|
||||
# PEP 829 implementation notes.
|
||||
#
|
||||
# Startup information (.pth and .start file information) can be processed in
|
||||
# implicit or explicit batches. Implicit batches are handled by the site.py
|
||||
# machinery automatically, while explicit batches are driven by user code and
|
||||
# processed on boundaries defined by that code.
|
||||
#
|
||||
# addsitedir() calls which use the default defer_processing_start_files=False
|
||||
# are self-contained: they create a per-call _StartupState, populate it from
|
||||
# the site directory's .pth/.start files, run process() on it, and then throw
|
||||
# the state away. This is implicit batching and in that case the
|
||||
# _startup_state global variable stays None.
|
||||
#
|
||||
# main() needs different semantics: it accumulates state across multiple
|
||||
# addsitedir() calls (user-site plus all global site-packages) so that
|
||||
# every sys.path extension is visible *before* any startup code (.pth
|
||||
# import lines and .start entry points) runs. Callers opt into this by
|
||||
# passing defer_processing_start_files=True, which preserves the _StartupState
|
||||
# into the global _startup_state. Subsequent addsitedir() calls (with
|
||||
# or without defer_processing_start_files=True) then write into that
|
||||
# same shared state, and a later process_startup_files() call flushes
|
||||
# all the state and resets the global to None.
|
||||
#
|
||||
# Here's the CRITICAL reentrancy invariant: process_startup_files() must clear
|
||||
# the global _startup_state *before* calling state.process(), so that any
|
||||
# reentrant site.addsitedir() calls reached from an exec'd .pth import line or
|
||||
# a .start entry point falls into the per-call branch and gets its own fresh
|
||||
# state. Otherwise the recursive addsitedir() would mutate the very dicts
|
||||
# that the outer state.process() is iterating. This is the bug reported in
|
||||
# gh-149504.
|
||||
_startup_state = None
|
||||
|
||||
|
||||
def _read_pthstart_file(sitedir, name, suffix):
|
||||
|
|
@ -194,13 +218,13 @@ def _read_pthstart_file(sitedir, name, suffix):
|
|||
return None, filename
|
||||
|
||||
try:
|
||||
# Accept BOM markers in .start and .pth files as we do in source files (Windows PowerShell
|
||||
# 5.1 makes it hard to emit UTF-8 files without a BOM).
|
||||
# Accept BOM markers in .start and .pth files as we do in source files
|
||||
# (Windows PowerShell 5.1 makes it hard to emit UTF-8 files without a BOM).
|
||||
content = raw_content.decode("utf-8-sig")
|
||||
except UnicodeDecodeError:
|
||||
_trace(f"Cannot read {filename!r} as UTF-8.")
|
||||
# For .pth files only, and then only until Python 3.20, fallback to locale encoding for
|
||||
# backward compatibility.
|
||||
# For .pth files only, and then only until Python 3.20, fall back to
|
||||
# locale encoding for backward compatibility.
|
||||
_warn_future_us(
|
||||
".pth files decoded to locale encoding as a fallback",
|
||||
remove=(3, 20)
|
||||
|
|
@ -214,153 +238,221 @@ def _read_pthstart_file(sitedir, name, suffix):
|
|||
return content.splitlines(), filename
|
||||
|
||||
|
||||
def _read_pth_file(sitedir, name, known_paths):
|
||||
"""Parse a .pth file, accumulating sys.path extensions and import lines.
|
||||
class _StartupState:
|
||||
"""Per-batch accumulator for .pth and .start file processing.
|
||||
|
||||
Errors on individual lines do not abort processing of the rest of the
|
||||
file (PEP 829).
|
||||
A _StartupState collects sys.path extensions, deprecated .pth import
|
||||
lines, and .start entry points read from one or more site-packages
|
||||
directories. Calling process() applies them in PEP 829 order: paths
|
||||
are added to sys.path first, then import lines from .pth files (skipping
|
||||
any with a matching .start), then entry points from .start files.
|
||||
|
||||
State lives entirely on the instance; there is no module-level pending
|
||||
state. This is what makes the module reentrancy-safe: a site.addsitedir()
|
||||
call reached recursively from an exec'd import line or a .start entry
|
||||
point operates on a different _StartupState than the one being processed
|
||||
by the outer call.
|
||||
|
||||
The internal data is intentionally private; the public methods
|
||||
(read_pth_file, read_start_file, process) are the only supported write
|
||||
APIs.
|
||||
"""
|
||||
lines, filename = _read_pthstart_file(sitedir, name, ".pth")
|
||||
if lines is None:
|
||||
return
|
||||
__slots__ = ('_syspaths', '_importexecs', '_entrypoints')
|
||||
|
||||
for n, line in enumerate(lines, 1):
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
def __init__(self):
|
||||
# All three dicts map "<full path to .pth or .start file>" -> list
|
||||
# of items collected from that file. Mapping by filename lets us
|
||||
# cross-reference a .pth and its matching .start (PEP 829 import
|
||||
# suppression rule) and lets _print_error report the source file
|
||||
# when an entry fails.
|
||||
self._syspaths = {}
|
||||
self._importexecs = {}
|
||||
self._entrypoints = {}
|
||||
|
||||
# In Python 3.18 and 3.19, `import` lines are silently ignored. In
|
||||
# Python 3.20 and beyond, issue a warning when `import` lines in .pth
|
||||
# files are detected.
|
||||
if line.startswith(("import ", "import\t")):
|
||||
_warn_future_us(
|
||||
"import lines in .pth files are silently ignored",
|
||||
remove=(3, 18)
|
||||
)
|
||||
_warn_future_us(
|
||||
"import lines in .pth files are noisily ignored",
|
||||
remove=(3, 20)
|
||||
)
|
||||
_pending_importexecs.setdefault(filename, []).append(line)
|
||||
continue
|
||||
def read_pth_file(self, sitedir, name, known_paths):
|
||||
"""Parse a .pth file, accumulating sys.path extensions and import lines.
|
||||
|
||||
try:
|
||||
dir_, dircase = makepath(sitedir, line)
|
||||
except Exception as exc:
|
||||
_trace(f"Error in {filename!r}, line {n:d}: {line!r}", exc)
|
||||
continue
|
||||
Errors on individual lines do not abort processing of the rest of
|
||||
the file (PEP 829). ``known_paths`` is the per-batch dedup
|
||||
ledger: any path already in it is skipped, and newly accepted
|
||||
paths are added to it so that subsequent .pth files in the same
|
||||
batch don't add them more than once.
|
||||
"""
|
||||
lines, filename = _read_pthstart_file(sitedir, name, ".pth")
|
||||
if lines is None:
|
||||
return
|
||||
|
||||
if dircase in known_paths:
|
||||
_trace(f"In {filename!r}, line {n:d}: "
|
||||
f"skipping duplicate sys.path entry: {dir_}")
|
||||
else:
|
||||
_pending_syspaths.setdefault(filename, []).append(dir_)
|
||||
known_paths.add(dircase)
|
||||
for n, line in enumerate(lines, 1):
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
|
||||
# In Python 3.18 and 3.19, `import` lines are silently
|
||||
# ignored. In Python 3.20 and beyond, issue a warning when
|
||||
# `import` lines in .pth files are detected.
|
||||
if line.startswith(("import ", "import\t")):
|
||||
_warn_future_us(
|
||||
"import lines in .pth files are silently ignored",
|
||||
remove=(3, 18),
|
||||
)
|
||||
_warn_future_us(
|
||||
"import lines in .pth files are noisily ignored",
|
||||
remove=(3, 20),
|
||||
)
|
||||
self._importexecs.setdefault(filename, []).append(line)
|
||||
continue
|
||||
|
||||
def _read_start_file(sitedir, name):
|
||||
"""Parse a .start file for a list of entry point strings."""
|
||||
lines, filename = _read_pthstart_file(sitedir, name, ".start")
|
||||
if lines is None:
|
||||
return
|
||||
try:
|
||||
dir_, dircase = makepath(sitedir, line)
|
||||
except Exception as exc:
|
||||
_trace(f"Error in {filename!r}, line {n:d}: {line!r}", exc)
|
||||
continue
|
||||
|
||||
# PEP 829: the *presence* of a matching .start file disables `import`
|
||||
# line processing in the matched .pth file, regardless of whether the
|
||||
# .start file produced any entry points. Register the filename as a
|
||||
# key now so an empty (or comment-only) .start file still suppresses.
|
||||
entrypoints = _pending_entrypoints.setdefault(filename, [])
|
||||
|
||||
for n, line in enumerate(lines, 1):
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
# Syntax validation is deferred to entry-point execution time,
|
||||
# where pkgutil.resolve_name(strict=True) enforces the
|
||||
# pkg.mod:callable form.
|
||||
entrypoints.append(line)
|
||||
|
||||
|
||||
def _extend_syspath():
|
||||
# We've already filtered out duplicates, either in the existing sys.path
|
||||
# or in all the .pth files we've seen. We've also abspath/normpath'd all
|
||||
# the entries, so all that's left to do is to ensure that the path exists.
|
||||
for filename, dirs in _pending_syspaths.items():
|
||||
for dir_ in dirs:
|
||||
if os.path.exists(dir_):
|
||||
_trace(f"Extending sys.path with {dir_} from {filename}")
|
||||
sys.path.append(dir_)
|
||||
# PEP 829 dedup: skip paths already seen in this batch. See
|
||||
# _startup_state docstring above for batch lifetimes.
|
||||
if dircase in known_paths:
|
||||
_trace(
|
||||
f"In {filename!r}, line {n:d}: "
|
||||
f"skipping duplicate sys.path entry: {dir_}"
|
||||
)
|
||||
else:
|
||||
_print_error(
|
||||
f"In {filename}: {dir_} does not exist; "
|
||||
f"skipping sys.path append")
|
||||
self._syspaths.setdefault(filename, []).append(dir_)
|
||||
known_paths.add(dircase)
|
||||
|
||||
def read_start_file(self, sitedir, name):
|
||||
"""Parse a .start file for a list of entry point strings."""
|
||||
lines, filename = _read_pthstart_file(sitedir, name, ".start")
|
||||
if lines is None:
|
||||
return
|
||||
|
||||
def _exec_imports():
|
||||
# For all the `import` lines we've seen in .pth files, exec() them in
|
||||
# order. However, if they come from a file with a matching .start, then
|
||||
# we ignore these import lines. For the ones we do process, print a
|
||||
# warning but only when -v was given.
|
||||
for filename, imports in _pending_importexecs.items():
|
||||
name, dot, pth = filename.rpartition(".")
|
||||
assert dot == "." and pth == "pth", f"Bad startup filename: {filename}"
|
||||
# PEP 829: the *presence* of a matching .start file disables `import`
|
||||
# line processing in the matched .pth file, regardless of whether this
|
||||
# .start file contains any entry points. Register the filename as a
|
||||
# key now so an empty (or comment-only) .start file still suppresses.
|
||||
entrypoints = self._entrypoints.setdefault(filename, [])
|
||||
|
||||
if f"{name}.start" in _pending_entrypoints:
|
||||
# Skip import lines in favor of entry points.
|
||||
continue
|
||||
|
||||
_trace(
|
||||
f"import lines in {filename} are deprecated, "
|
||||
f"use entry points in a {name}.start file instead."
|
||||
)
|
||||
|
||||
for line in imports:
|
||||
try:
|
||||
_trace(f"Exec'ing from {filename}: {line}")
|
||||
exec(line)
|
||||
except Exception as exc:
|
||||
_print_error(
|
||||
f"Error in import line from {filename}: {line}", exc)
|
||||
|
||||
|
||||
def _execute_start_entrypoints():
|
||||
"""Execute all accumulated .start file entry points.
|
||||
|
||||
Called after all site-packages directories have been processed so that
|
||||
sys.path is fully populated before any entry point code runs. Uses
|
||||
pkgutil.resolve_name(strict=True) which both validates the strict
|
||||
pkg.mod:callable form and resolves the entry point in one step.
|
||||
"""
|
||||
for filename, entrypoints in _pending_entrypoints.items():
|
||||
for entrypoint in entrypoints:
|
||||
try:
|
||||
_trace(f"Executing entry point: {entrypoint} from {filename}")
|
||||
callable_ = pkgutil.resolve_name(entrypoint, strict=True)
|
||||
except ValueError as exc:
|
||||
_print_error(
|
||||
f"Invalid entry point syntax in {filename}: "
|
||||
f"{entrypoint!r}", exc)
|
||||
for n, line in enumerate(lines, 1):
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
except Exception as exc:
|
||||
_print_error(
|
||||
f"Error resolving entry point {entrypoint} "
|
||||
f"from {filename}", exc)
|
||||
# Syntax validation is deferred to entry point execution
|
||||
# time, where pkgutil.resolve_name(strict=True) enforces the
|
||||
# pkg.mod:callable form.
|
||||
entrypoints.append(line)
|
||||
|
||||
def process(self):
|
||||
"""Apply accumulated state in PEP 829 order.
|
||||
|
||||
Phase order matters: all .pth path extensions are applied to
|
||||
sys.path *before* any import line or .start entry point runs, so
|
||||
that an entry point may live in a module reachable only via a
|
||||
.pth-extended path.
|
||||
"""
|
||||
self._extend_syspath()
|
||||
self._exec_imports()
|
||||
self._execute_start_entrypoints()
|
||||
|
||||
def _extend_syspath(self):
|
||||
# Duplicates have already been filtered (in existing sys.path or
|
||||
# across .pth files via known_paths), and entries are already
|
||||
# abspath/normpath'd, so all that remains is to confirm the path
|
||||
# exists.
|
||||
for filename, dirs in self._syspaths.items():
|
||||
for dir_ in dirs:
|
||||
if os.path.exists(dir_):
|
||||
_trace(f"Extending sys.path with {dir_} from {filename}")
|
||||
sys.path.append(dir_)
|
||||
else:
|
||||
_print_error(
|
||||
f"In {filename}: {dir_} does not exist; "
|
||||
f"skipping sys.path append"
|
||||
)
|
||||
|
||||
def _exec_imports(self):
|
||||
# For each `import` line we've seen in a .pth file, exec() it in
|
||||
# order, unless the .pth has a matching .start file in this same
|
||||
# batch. In that case, PEP 829 says the import lines are
|
||||
# suppressed in favor of the .start's entry points.
|
||||
for filename, imports in self._importexecs.items():
|
||||
# Given "/path/to/foo.pth", check whether "/path/to/foo.start" was
|
||||
# registered in this same batch.
|
||||
name, dot, pth = filename.rpartition(".")
|
||||
assert dot == "." and pth == "pth", (
|
||||
f"Bad startup filename: {filename}"
|
||||
)
|
||||
if f"{name}.start" in self._entrypoints:
|
||||
_trace(
|
||||
f"import lines in {filename} are suppressed "
|
||||
f"due to matching {name}.start file."
|
||||
)
|
||||
continue
|
||||
try:
|
||||
callable_()
|
||||
except Exception as exc:
|
||||
_print_error(
|
||||
f"Error in entry point {entrypoint} from {filename}",
|
||||
exc)
|
||||
|
||||
_trace(
|
||||
f"import lines in {filename} are deprecated, "
|
||||
f"use entry points in a {name}.start file instead."
|
||||
)
|
||||
for line in imports:
|
||||
try:
|
||||
_trace(f"Exec'ing from {filename}: {line}")
|
||||
exec(line)
|
||||
except Exception as exc:
|
||||
_print_error(
|
||||
f"Error in import line from {filename}: {line}",
|
||||
exc,
|
||||
)
|
||||
|
||||
def _execute_start_entrypoints(self):
|
||||
# Resolve each entry point string to a callable via
|
||||
# pkgutil.resolve_name(strict=True), which both validates the
|
||||
# required pkg.mod:callable form and performs the import in one
|
||||
# step, then call it with no arguments.
|
||||
for filename, entrypoints in self._entrypoints.items():
|
||||
for entrypoint in entrypoints:
|
||||
try:
|
||||
_trace(
|
||||
f"Executing entry point: {entrypoint} from {filename}"
|
||||
)
|
||||
callable_ = pkgutil.resolve_name(entrypoint, strict=True)
|
||||
except ValueError as exc:
|
||||
_print_error(
|
||||
f"Invalid entry point syntax in {filename}: "
|
||||
f"{entrypoint!r}",
|
||||
exc,
|
||||
)
|
||||
except Exception as exc:
|
||||
_print_error(
|
||||
f"Error resolving entry point {entrypoint} "
|
||||
f"from {filename}",
|
||||
exc,
|
||||
)
|
||||
else:
|
||||
try:
|
||||
callable_()
|
||||
except Exception as exc:
|
||||
_print_error(
|
||||
f"Error in entry point {entrypoint} from {filename}",
|
||||
exc,
|
||||
)
|
||||
|
||||
|
||||
def process_startup_files():
|
||||
"""Flush all pending sys.path and entry points."""
|
||||
_extend_syspath()
|
||||
_exec_imports()
|
||||
_execute_start_entrypoints()
|
||||
_pending_syspaths.clear()
|
||||
_pending_importexecs.clear()
|
||||
_pending_entrypoints.clear()
|
||||
"""Flush any pending startup-file state accumulated during a batch.
|
||||
|
||||
Used by main() (and any external caller that drove addsitedir() with
|
||||
defer_processing_start_files=True) to apply the accumulated paths
|
||||
and run the deferred import lines / entry points.
|
||||
|
||||
Reentrancy: the active batch state is detached from _startup_state
|
||||
*before* state.process() runs. This way, if an exec'd import line
|
||||
or .start entry point itself calls site.addsitedir(), that call
|
||||
creates its own per-call _StartupState rather than mutating the dicts
|
||||
being iterated here. See gh-149504.
|
||||
"""
|
||||
global _startup_state
|
||||
if _startup_state is None:
|
||||
return
|
||||
state, _startup_state = _startup_state, None
|
||||
state.process()
|
||||
|
||||
|
||||
def addpackage(sitedir, name, known_paths):
|
||||
|
|
@ -370,16 +462,26 @@ def addpackage(sitedir, name, known_paths):
|
|||
reset = True
|
||||
else:
|
||||
reset = False
|
||||
_read_pth_file(sitedir, name, known_paths)
|
||||
process_startup_files()
|
||||
if reset:
|
||||
known_paths = None
|
||||
return known_paths
|
||||
|
||||
# If a batch is already in progress (for example, main() is still
|
||||
# accumulating sitedirs), participate in the batch by writing into the
|
||||
# shared _startup_state and letting the eventual process_startup_files()
|
||||
# flush it. Otherwise this is a standalone call, so create a unique
|
||||
# per-call state, populate it, and process it before returning.
|
||||
if _startup_state is None:
|
||||
state = _StartupState()
|
||||
state.read_pth_file(sitedir, name, known_paths)
|
||||
state.process()
|
||||
else:
|
||||
_startup_state.read_pth_file(sitedir, name, known_paths)
|
||||
|
||||
return None if reset else known_paths
|
||||
|
||||
|
||||
def addsitedir(sitedir, known_paths=None, *, defer_processing_start_files=False):
|
||||
"""Add 'sitedir' argument to sys.path if missing and handle startup
|
||||
files."""
|
||||
global _startup_state
|
||||
_trace(f"Adding directory: {sitedir!r}")
|
||||
if known_paths is None:
|
||||
known_paths = _init_pathinfo()
|
||||
|
|
@ -387,44 +489,74 @@ def addsitedir(sitedir, known_paths=None, *, defer_processing_start_files=False)
|
|||
else:
|
||||
reset = False
|
||||
sitedir, sitedircase = makepath(sitedir)
|
||||
if not sitedircase in known_paths:
|
||||
sys.path.append(sitedir) # Add path component
|
||||
|
||||
# If the normcase'd new sitedir isn't already known, append it to
|
||||
# sys.path, keep a record of it, and process all .pth and .start files
|
||||
# found in that directory. If the new sitedir is known, be sure not
|
||||
# to process all of those more than once! gh-75723
|
||||
if sitedircase not in known_paths:
|
||||
sys.path.append(sitedir)
|
||||
known_paths.add(sitedircase)
|
||||
try:
|
||||
names = os.listdir(sitedir)
|
||||
except OSError:
|
||||
return
|
||||
|
||||
# The following phases are defined by PEP 829.
|
||||
# Phases 1-3: Read .pth files, accumulating paths and import lines.
|
||||
pth_names = sorted(
|
||||
name for name in names
|
||||
if name.endswith(".pth") and not name.startswith(".")
|
||||
)
|
||||
for name in pth_names:
|
||||
_read_pth_file(sitedir, name, known_paths)
|
||||
try:
|
||||
names = os.listdir(sitedir)
|
||||
except OSError:
|
||||
return None if reset else known_paths
|
||||
|
||||
# Phases 6-7: Discover .start files and accumulate their entry points.
|
||||
# Import lines from .pth files with a matching .start file are discarded
|
||||
# at flush time by _exec_imports().
|
||||
start_names = sorted(
|
||||
name for name in names
|
||||
if name.endswith(".start") and not name.startswith(".")
|
||||
)
|
||||
for name in start_names:
|
||||
_read_start_file(sitedir, name)
|
||||
# Pick the _StartupState we'll write into. There are three cases:
|
||||
#
|
||||
# 1. A batch is already active (_startup_state is set, e.g. because
|
||||
# main() previously called us with
|
||||
# defer_processing_start_files=True). Participate in this batch by
|
||||
# sharing the same state. Don't flush the state since the batch's
|
||||
# eventual process_startup_files() will do that.
|
||||
#
|
||||
# 2. There is no active batch but the caller passed
|
||||
# defer_processing_start_files=True. Preserve a fresh
|
||||
# _StartupState into the global _startup_state so that subsequent
|
||||
# addsitedir() calls participate in this batch, and so that the
|
||||
# caller's later process_startup_files() finds it.
|
||||
#
|
||||
# 3. This is a standalone call (there is no active batch and
|
||||
# defer_processing_start_files=False). Create a unique per-call
|
||||
# state, populate it, process it, and then clear it. Per-call
|
||||
# state is what makes reentrant addsitedir() safe; a recursive call
|
||||
# from inside process() lands here too and gets its own independent
|
||||
# state.
|
||||
|
||||
# Generally, when addsitedir() is called explicitly, we'll want to process
|
||||
# all the startup file data immediately. However, when called through
|
||||
# main(), we'll want to batch up all the startup file processing. main()
|
||||
# will set this flag to True to defer processing.
|
||||
if not defer_processing_start_files:
|
||||
process_startup_files()
|
||||
if _startup_state is not None:
|
||||
state = _startup_state
|
||||
flush_now = False
|
||||
elif defer_processing_start_files:
|
||||
state = _startup_state = _StartupState()
|
||||
flush_now = False
|
||||
else:
|
||||
state = _StartupState()
|
||||
flush_now = True
|
||||
|
||||
if reset:
|
||||
known_paths = None
|
||||
# The following phases are defined by PEP 829.
|
||||
# Phases 1-3: Read .pth files, accumulating paths and import lines.
|
||||
pth_names = sorted(
|
||||
name for name in names
|
||||
if name.endswith(".pth") and not name.startswith(".")
|
||||
)
|
||||
for name in pth_names:
|
||||
state.read_pth_file(sitedir, name, known_paths)
|
||||
|
||||
return known_paths
|
||||
# Phases 6-7: Discover .start files and accumulate their entry points.
|
||||
# Import lines from .pth files with a matching .start file are
|
||||
# discarded at flush time by _StartupState._exec_imports().
|
||||
start_names = sorted(
|
||||
name for name in names
|
||||
if name.endswith(".start") and not name.startswith(".")
|
||||
)
|
||||
for name in start_names:
|
||||
state.read_start_file(sitedir, name)
|
||||
|
||||
if flush_now:
|
||||
state.process()
|
||||
|
||||
return None if reset else known_paths
|
||||
|
||||
|
||||
def check_enableusersite():
|
||||
|
|
|
|||
|
|
@ -196,8 +196,9 @@ def test_addsitedir_explicit_flush(self):
|
|||
pth_file.cleanup(prep=True)
|
||||
with pth_file.create():
|
||||
# Pass defer_processing_start_files=True to prevent flushing.
|
||||
site.addsitedir(pth_file.base_dir, set(),
|
||||
defer_processing_start_files=True)
|
||||
site.addsitedir(
|
||||
pth_file.base_dir, set(),
|
||||
defer_processing_start_files=True)
|
||||
self.assertNotIn(pth_file.imported, sys.modules)
|
||||
site.process_startup_files()
|
||||
self.pth_file_tests(pth_file)
|
||||
|
|
@ -423,15 +424,14 @@ def create(self):
|
|||
|
||||
Used as a context manager: self.cleanup() is called on exit.
|
||||
"""
|
||||
FILE = open(self.file_path, 'w')
|
||||
try:
|
||||
print("#import @bad module name", file=FILE)
|
||||
print("\n", file=FILE)
|
||||
print("import %s" % self.imported, file=FILE)
|
||||
print(self.good_dirname, file=FILE)
|
||||
print(self.bad_dirname, file=FILE)
|
||||
finally:
|
||||
FILE.close()
|
||||
with open(self.file_path, 'w') as fp:
|
||||
print(f"""\
|
||||
#import @bad module name
|
||||
import {self.imported}
|
||||
{self.good_dirname}
|
||||
{self.bad_dirname}
|
||||
""", file=fp)
|
||||
|
||||
os.mkdir(self.good_dir_path)
|
||||
try:
|
||||
yield self
|
||||
|
|
@ -915,18 +915,16 @@ class StartFileTests(unittest.TestCase):
|
|||
def setUp(self):
|
||||
self.enterContext(import_helper.DirsOnSysPath())
|
||||
self.tmpdir = self.sitedir = self.enterContext(os_helper.temp_dir())
|
||||
# Save and clear all pending dicts.
|
||||
self.saved_entrypoints = site._pending_entrypoints.copy()
|
||||
self.saved_syspaths = site._pending_syspaths.copy()
|
||||
self.saved_importexecs = site._pending_importexecs.copy()
|
||||
site._pending_entrypoints.clear()
|
||||
site._pending_syspaths.clear()
|
||||
site._pending_importexecs.clear()
|
||||
# Each test gets its own _StartupState to drive the parser and
|
||||
# processor methods directly. Defensively clear any _startup_state
|
||||
# that a prior test may have left set via defer_processing_start_files
|
||||
# without a corresponding process_startup_files() flush.
|
||||
self.state = site._StartupState()
|
||||
site._startup_state = None
|
||||
self.addCleanup(self._reset_startup_state)
|
||||
|
||||
def tearDown(self):
|
||||
site._pending_entrypoints = self.saved_entrypoints.copy()
|
||||
site._pending_syspaths = self.saved_syspaths.copy()
|
||||
site._pending_importexecs = self.saved_importexecs.copy()
|
||||
def _reset_startup_state(self):
|
||||
site._startup_state = None
|
||||
|
||||
def _make_start(self, content, name='testpkg'):
|
||||
"""Write a <name>.start file and return its basename."""
|
||||
|
|
@ -944,10 +942,32 @@ def _make_pth(self, content, name='testpkg'):
|
|||
f.write(content)
|
||||
return basename
|
||||
|
||||
def _make_mod(self, contents, name='mod', *, package=False, on_path=False):
|
||||
"""Write an importable module (or package), returning its parent dir."""
|
||||
extdir = os.path.join(self.sitedir, 'extdir')
|
||||
os.makedirs(extdir, exist_ok=True)
|
||||
|
||||
# Put the code in a package's dunder-init or flat module.
|
||||
if package:
|
||||
pkgdir = os.path.join(extdir, name)
|
||||
os.mkdir(pkgdir)
|
||||
modpath = os.path.join(pkgdir, '__init__.py')
|
||||
else:
|
||||
modpath = os.path.join(extdir, f'{name}.py')
|
||||
|
||||
with open(modpath, 'w') as fp:
|
||||
fp.write(contents)
|
||||
|
||||
self.addCleanup(sys.modules.pop, name, None)
|
||||
if on_path:
|
||||
# Don't worry, DirsOnSysPath() in setUp() will clean this up.
|
||||
sys.path.insert(0, extdir)
|
||||
return extdir
|
||||
|
||||
def _all_entrypoints(self):
|
||||
"""Flatten _pending_entrypoints dict into a list of (filename, entry) tuples."""
|
||||
"""Flatten state._entrypoints into a list of (filename, entry) tuples."""
|
||||
result = []
|
||||
for filename, entries in site._pending_entrypoints.items():
|
||||
for filename, entries in self.state._entrypoints.items():
|
||||
for entry in entries:
|
||||
result.append((filename, entry))
|
||||
return result
|
||||
|
|
@ -955,28 +975,42 @@ def _all_entrypoints(self):
|
|||
def _just_entrypoints(self):
|
||||
return [entry for filename, entry in self._all_entrypoints()]
|
||||
|
||||
# --- _read_start_file tests ---
|
||||
# There are two classes of tests here. Tests that start with `test_impl_`
|
||||
# know details about the implementation and they access non-public methods
|
||||
# and data structures to perform focused functional tests.
|
||||
#
|
||||
# Tests that start with `test_addsitedir_` are end-to-end tests that ensure
|
||||
# integration semantics and functionality as a caller of the public
|
||||
# surfaces would see.
|
||||
|
||||
def test_read_start_file_basic(self):
|
||||
# --- _StartupState.read_start_file tests ---
|
||||
|
||||
def test_impl_read_start_file_basic(self):
|
||||
self._make_start("os.path:join\n", name='foo')
|
||||
site._read_start_file(self.sitedir, 'foo.start')
|
||||
self.state.read_start_file(self.sitedir, 'foo.start')
|
||||
fullname = os.path.join(self.sitedir, 'foo.start')
|
||||
self.assertEqual(site._pending_entrypoints[fullname], ['os.path:join'])
|
||||
self.assertEqual(
|
||||
self.state._entrypoints[fullname], ['os.path:join']
|
||||
)
|
||||
|
||||
def test_read_start_file_multiple_entries(self):
|
||||
def test_impl_read_start_file_multiple_entries(self):
|
||||
self._make_start("os.path:join\nos.path:exists\n", name='foo')
|
||||
site._read_start_file(self.sitedir, 'foo.start')
|
||||
self.state.read_start_file(self.sitedir, 'foo.start')
|
||||
fullname = os.path.join(self.sitedir, 'foo.start')
|
||||
self.assertEqual(site._pending_entrypoints[fullname],
|
||||
['os.path:join', 'os.path:exists'])
|
||||
self.assertEqual(
|
||||
self.state._entrypoints[fullname],
|
||||
['os.path:join', 'os.path:exists'],
|
||||
)
|
||||
|
||||
def test_read_start_file_comments_and_blanks(self):
|
||||
def test_impl_read_start_file_comments_and_blanks(self):
|
||||
self._make_start("# a comment\n\nos.path:join\n \n", name='foo')
|
||||
site._read_start_file(self.sitedir, 'foo.start')
|
||||
self.state.read_start_file(self.sitedir, 'foo.start')
|
||||
fullname = os.path.join(self.sitedir, 'foo.start')
|
||||
self.assertEqual(site._pending_entrypoints[fullname], ['os.path:join'])
|
||||
self.assertEqual(
|
||||
self.state._entrypoints[fullname], ['os.path:join']
|
||||
)
|
||||
|
||||
def test_read_start_file_accepts_all_non_blank_lines(self):
|
||||
def test_impl_read_start_file_accepts_all_non_blank_lines(self):
|
||||
# Syntax validation is deferred to entry-point execution time
|
||||
# (where pkgutil.resolve_name(strict=True) enforces the strict
|
||||
# pkg.mod:callable form), so parsing accepts every non-blank,
|
||||
|
|
@ -989,9 +1023,9 @@ def test_read_start_file_accepts_all_non_blank_lines(self):
|
|||
"os.path:join\n" # valid
|
||||
)
|
||||
self._make_start(content, name='foo')
|
||||
site._read_start_file(self.sitedir, 'foo.start')
|
||||
self.state.read_start_file(self.sitedir, 'foo.start')
|
||||
fullname = os.path.join(self.sitedir, 'foo.start')
|
||||
self.assertEqual(site._pending_entrypoints[fullname], [
|
||||
self.assertEqual(self.state._entrypoints[fullname], [
|
||||
'os.path',
|
||||
'pkg.mod:',
|
||||
':callable',
|
||||
|
|
@ -999,155 +1033,169 @@ def test_read_start_file_accepts_all_non_blank_lines(self):
|
|||
'os.path:join',
|
||||
])
|
||||
|
||||
def test_read_start_file_empty(self):
|
||||
def test_impl_read_start_file_empty(self):
|
||||
# PEP 829: an empty .start file is still registered as present
|
||||
# (with an empty entry-point list) so that it suppresses `import`
|
||||
# (with an empty entry point list) so that it suppresses `import`
|
||||
# lines in any matching .pth file.
|
||||
self._make_start("", name='foo')
|
||||
site._read_start_file(self.sitedir, 'foo.start')
|
||||
self.state.read_start_file(self.sitedir, 'foo.start')
|
||||
fullname = os.path.join(self.sitedir, 'foo.start')
|
||||
self.assertEqual(site._pending_entrypoints, {fullname: []})
|
||||
self.assertEqual(self.state._entrypoints, {fullname: []})
|
||||
|
||||
def test_read_start_file_comments_only(self):
|
||||
def test_impl_read_start_file_comments_only(self):
|
||||
# As with an empty file, a comments-only .start file is registered
|
||||
# as present so it can suppress matching .pth `import` lines.
|
||||
self._make_start("# just a comment\n# another\n", name='foo')
|
||||
site._read_start_file(self.sitedir, 'foo.start')
|
||||
self.state.read_start_file(self.sitedir, 'foo.start')
|
||||
fullname = os.path.join(self.sitedir, 'foo.start')
|
||||
self.assertEqual(site._pending_entrypoints, {fullname: []})
|
||||
self.assertEqual(self.state._entrypoints, {fullname: []})
|
||||
|
||||
def test_read_start_file_nonexistent(self):
|
||||
def test_impl_read_start_file_nonexistent(self):
|
||||
with captured_stderr():
|
||||
site._read_start_file(self.tmpdir, 'nonexistent.start')
|
||||
self.assertEqual(site._pending_entrypoints, {})
|
||||
self.state.read_start_file(self.tmpdir, 'nonexistent.start')
|
||||
self.assertEqual(self.state._entrypoints, {})
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'chflags'), 'test needs os.chflags()')
|
||||
def test_read_start_file_hidden_flags(self):
|
||||
def test_impl_read_start_file_hidden_flags(self):
|
||||
self._make_start("os.path:join\n", name='foo')
|
||||
filepath = os.path.join(self.tmpdir, 'foo.start')
|
||||
st = os.stat(filepath)
|
||||
os.chflags(filepath, st.st_flags | stat.UF_HIDDEN)
|
||||
site._read_start_file(self.sitedir, 'foo.start')
|
||||
self.assertEqual(site._pending_entrypoints, {})
|
||||
self.state.read_start_file(self.sitedir, 'foo.start')
|
||||
self.assertEqual(self.state._entrypoints, {})
|
||||
|
||||
def test_read_start_file_duplicates_not_deduplicated(self):
|
||||
def test_impl_one_start_file_with_duplicates_not_deduplicated(self):
|
||||
# PEP 829: duplicate entry points are NOT deduplicated.
|
||||
self._make_start("os.path:join\nos.path:join\n", name='foo')
|
||||
site._read_start_file(self.sitedir, 'foo.start')
|
||||
self.state.read_start_file(self.sitedir, 'foo.start')
|
||||
fullname = os.path.join(self.sitedir, 'foo.start')
|
||||
self.assertEqual(site._pending_entrypoints[fullname],
|
||||
['os.path:join', 'os.path:join'])
|
||||
self.assertEqual(
|
||||
self.state._entrypoints[fullname],
|
||||
['os.path:join', 'os.path:join'],
|
||||
)
|
||||
|
||||
def test_read_start_file_accepts_utf8_bom(self):
|
||||
def test_impl_two_start_files_with_duplicates_not_deduplicated(self):
|
||||
self._make_start("os.path:join", name="foo")
|
||||
self._make_start("os.path:join", name="bar")
|
||||
self.state.read_start_file(self.sitedir, 'foo.start')
|
||||
self.state.read_start_file(self.sitedir, 'bar.start')
|
||||
self.assertEqual(
|
||||
self._just_entrypoints(),
|
||||
['os.path:join', 'os.path:join'],
|
||||
)
|
||||
|
||||
def test_impl_read_start_file_accepts_utf8_bom(self):
|
||||
# PEP 829: .start files MUST be utf-8-sig (UTF-8 with optional BOM).
|
||||
filepath = os.path.join(self.tmpdir, 'foo.start')
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(b'\xef\xbb\xbf' + b'os.path:join\n')
|
||||
site._read_start_file(self.sitedir, 'foo.start')
|
||||
self.state.read_start_file(self.sitedir, 'foo.start')
|
||||
fullname = os.path.join(self.sitedir, 'foo.start')
|
||||
self.assertEqual(
|
||||
site._pending_entrypoints[fullname], ['os.path:join'])
|
||||
self.state._entrypoints[fullname], ['os.path:join']
|
||||
)
|
||||
|
||||
def test_read_start_file_invalid_utf8_silently_skipped(self):
|
||||
# PEP 829: .start files MUST be utf-8-sig. Unlike .pth, there is
|
||||
# no locale-encoding fallback -- a .start file that is not valid
|
||||
def test_impl_read_start_file_invalid_utf8_silently_skipped(self):
|
||||
# PEP 829: .start files MUST be utf-8-sig. Unlike .pth files, there
|
||||
# is no locale-encoding fallback. A .start file that is not valid
|
||||
# UTF-8 is silently skipped, with no key registered in
|
||||
# _pending_entrypoints and no output to stderr (parsing errors
|
||||
# are reported only under -v).
|
||||
# state._entrypoints and no output to stderr (parsing errors are
|
||||
# reported only under -v).
|
||||
filepath = os.path.join(self.tmpdir, 'foo.start')
|
||||
with open(filepath, 'wb') as f:
|
||||
# Bare continuation byte -- invalid as a UTF-8 start byte.
|
||||
f.write(b'\x80\x80\x80\n')
|
||||
with captured_stderr() as err:
|
||||
site._read_start_file(self.sitedir, 'foo.start')
|
||||
self.assertEqual(site._pending_entrypoints, {})
|
||||
self.state.read_start_file(self.sitedir, 'foo.start')
|
||||
self.assertEqual(self.state._entrypoints, {})
|
||||
self.assertEqual(err.getvalue(), "")
|
||||
|
||||
def test_two_start_files_with_duplicates_not_deduplicated(self):
|
||||
self._make_start("os.path:join", name="foo")
|
||||
self._make_start("os.path:join", name="bar")
|
||||
site._read_start_file(self.sitedir, 'foo.start')
|
||||
site._read_start_file(self.sitedir, 'bar.start')
|
||||
self.assertEqual(self._just_entrypoints(),
|
||||
['os.path:join', 'os.path:join'])
|
||||
# --- _StartupState.read_pth_file tests ---
|
||||
|
||||
# --- _read_pth_file tests ---
|
||||
|
||||
def test_read_pth_file_paths(self):
|
||||
def test_impl_read_pth_file_paths(self):
|
||||
subdir = os.path.join(self.sitedir, 'mylib')
|
||||
os.mkdir(subdir)
|
||||
self._make_pth("mylib\n", name='foo')
|
||||
site._read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
self.state.read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
fullname = os.path.join(self.sitedir, 'foo.pth')
|
||||
self.assertIn(subdir, site._pending_syspaths[fullname])
|
||||
self.assertIn(subdir, self.state._syspaths[fullname])
|
||||
|
||||
def test_read_pth_file_imports_collected(self):
|
||||
def test_impl_read_pth_file_imports_collected(self):
|
||||
self._make_pth("import sys\n", name='foo')
|
||||
site._read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
self.state.read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
fullname = os.path.join(self.sitedir, 'foo.pth')
|
||||
self.assertEqual(site._pending_importexecs[fullname], ['import sys'])
|
||||
self.assertEqual(
|
||||
self.state._importexecs[fullname], ['import sys']
|
||||
)
|
||||
|
||||
def test_read_pth_file_comments_and_blanks(self):
|
||||
def test_impl_read_pth_file_comments_and_blanks(self):
|
||||
self._make_pth("# comment\n\n \n", name='foo')
|
||||
site._read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
self.assertEqual(site._pending_syspaths, {})
|
||||
self.assertEqual(site._pending_importexecs, {})
|
||||
self.state.read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
self.assertEqual(self.state._syspaths, {})
|
||||
self.assertEqual(self.state._importexecs, {})
|
||||
|
||||
def test_read_pth_file_deduplication(self):
|
||||
def test_impl_read_pth_file_deduplication(self):
|
||||
subdir = os.path.join(self.sitedir, 'mylib')
|
||||
os.mkdir(subdir)
|
||||
# An accumulator acts as a deduplication ledger.
|
||||
known_paths = set()
|
||||
self._make_pth("mylib\n", name='a')
|
||||
self._make_pth("mylib\n", name='b')
|
||||
site._read_pth_file(self.sitedir, 'a.pth', known_paths)
|
||||
site._read_pth_file(self.sitedir, 'b.pth', known_paths)
|
||||
# Only one entry across both files.
|
||||
self.state.read_pth_file(self.sitedir, 'a.pth', known_paths)
|
||||
self.state.read_pth_file(self.sitedir, 'b.pth', known_paths)
|
||||
# There is only one entry across both files.
|
||||
all_dirs = []
|
||||
for dirs in site._pending_syspaths.values():
|
||||
for dirs in self.state._syspaths.values():
|
||||
all_dirs.extend(dirs)
|
||||
self.assertEqual(all_dirs, [subdir])
|
||||
|
||||
def test_read_pth_file_bad_line_continues(self):
|
||||
# PEP 829: errors on individual lines don't abort the file.
|
||||
def test_impl_read_pth_file_bad_line_continues(self):
|
||||
# PEP 829: errors on individual lines don't abort processing the file.
|
||||
subdir = os.path.join(self.sitedir, 'goodpath')
|
||||
os.mkdir(subdir)
|
||||
self._make_pth("abc\x00def\ngoodpath\n", name='foo')
|
||||
with captured_stderr():
|
||||
site._read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
self.state.read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
fullname = os.path.join(self.sitedir, 'foo.pth')
|
||||
self.assertIn(subdir, site._pending_syspaths.get(fullname, []))
|
||||
self.assertIn(subdir, self.state._syspaths.get(fullname, []))
|
||||
|
||||
def _flags_with_verbose(self, verbose):
|
||||
# Build a sys.flags clone with verbose overridden but every
|
||||
# other field preserved, so unrelated reads like
|
||||
# sys.flags.optimize during io.open_code() continue to work.
|
||||
attrs = {name: getattr(sys.flags, name)
|
||||
for name in sys.flags.__match_args__}
|
||||
attrs = {
|
||||
name: getattr(sys.flags, name)
|
||||
for name in sys.flags.__match_args__
|
||||
}
|
||||
attrs['verbose'] = verbose
|
||||
return SimpleNamespace(**attrs)
|
||||
|
||||
def test_read_pth_file_parse_error_silent_by_default(self):
|
||||
def test_impl_read_pth_file_parse_error_silent_by_default(self):
|
||||
# PEP 829: parse-time errors are silent unless -v is given.
|
||||
# Force the error path by making makepath() raise.
|
||||
# Force the error path by making makepath() raise an exception.
|
||||
self._make_pth("badline\n", name='foo')
|
||||
with mock.patch('site.makepath', side_effect=ValueError("boom")), \
|
||||
mock.patch('sys.flags', self._flags_with_verbose(False)), \
|
||||
captured_stderr() as err:
|
||||
site._read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
with (
|
||||
mock.patch('site.makepath', side_effect=ValueError("boom")),
|
||||
mock.patch('sys.flags', self._flags_with_verbose(False)),
|
||||
captured_stderr() as err,
|
||||
):
|
||||
self.state.read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
self.assertEqual(err.getvalue(), "")
|
||||
|
||||
def test_read_pth_file_parse_error_reported_under_verbose(self):
|
||||
def test_impl_read_pth_file_parse_error_reported_under_verbose(self):
|
||||
# PEP 829: parse-time errors are reported when -v is given.
|
||||
self._make_pth("badline\n", name='foo')
|
||||
with mock.patch('site.makepath', side_effect=ValueError("boom")), \
|
||||
mock.patch('sys.flags', self._flags_with_verbose(True)), \
|
||||
captured_stderr() as err:
|
||||
site._read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
with (
|
||||
mock.patch('site.makepath', side_effect=ValueError("boom")),
|
||||
mock.patch('sys.flags', self._flags_with_verbose(True)),
|
||||
captured_stderr() as err,
|
||||
):
|
||||
self.state.read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
out = err.getvalue()
|
||||
self.assertIn('Error in', out)
|
||||
self.assertIn('foo.pth', out)
|
||||
|
||||
def test_read_pth_file_locale_fallback(self):
|
||||
def test_impl_read_pth_file_locale_fallback(self):
|
||||
# PEP 829: .pth files that fail UTF-8 decoding fall back to the
|
||||
# locale encoding for backward compatibility (deprecated in
|
||||
# 3.15, to be removed in 3.20). Mock locale.getencoding() so
|
||||
|
|
@ -1158,186 +1206,236 @@ def test_read_pth_file_locale_fallback(self):
|
|||
# \xe9 is invalid UTF-8 but valid in latin-1.
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(b'# caf\xe9 comment\nmylib\n')
|
||||
with mock.patch('locale.getencoding', return_value='latin-1'), \
|
||||
captured_stderr():
|
||||
site._read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
with (
|
||||
mock.patch('locale.getencoding', return_value='latin-1'),
|
||||
captured_stderr(),
|
||||
):
|
||||
self.state.read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
fullname = os.path.join(self.sitedir, 'foo.pth')
|
||||
self.assertIn(subdir, site._pending_syspaths.get(fullname, []))
|
||||
self.assertIn(subdir, self.state._syspaths.get(fullname, []))
|
||||
|
||||
# --- _execute_start_entrypoints tests ---
|
||||
# --- _StartupState._execute_start_entrypoints tests ---
|
||||
|
||||
def test_execute_entrypoints_with_callable(self):
|
||||
# Entrypoint with callable is invoked.
|
||||
mod_dir = os.path.join(self.sitedir, 'epmod')
|
||||
os.mkdir(mod_dir)
|
||||
init_file = os.path.join(mod_dir, '__init__.py')
|
||||
with open(init_file, 'w') as f:
|
||||
f.write("""\
|
||||
def test_impl_execute_entrypoints_with_callable(self):
|
||||
# An entry point with a callable.
|
||||
self._make_mod("""\
|
||||
called = False
|
||||
def startup():
|
||||
global called
|
||||
called = True
|
||||
""")
|
||||
sys.path.insert(0, self.sitedir)
|
||||
self.addCleanup(sys.modules.pop, 'epmod', None)
|
||||
""", name='epmod', package=True, on_path=True)
|
||||
fullname = os.path.join(self.sitedir, 'epmod.start')
|
||||
site._pending_entrypoints[fullname] = ['epmod:startup']
|
||||
site._execute_start_entrypoints()
|
||||
self.state._entrypoints[fullname] = ['epmod:startup']
|
||||
self.state._execute_start_entrypoints()
|
||||
import epmod
|
||||
self.assertTrue(epmod.called)
|
||||
|
||||
def test_execute_entrypoints_import_error(self):
|
||||
# Import error prints traceback but continues.
|
||||
def test_impl_execute_entrypoints_import_error(self):
|
||||
# Import errors print a traceback and continue.
|
||||
fullname = os.path.join(self.sitedir, 'bad.start')
|
||||
site._pending_entrypoints[fullname] = [
|
||||
'nosuchmodule_xyz:func', 'os.path:join']
|
||||
self.state._entrypoints[fullname] = [
|
||||
'nosuchmodule_xyz:func', 'os.path:join',
|
||||
]
|
||||
with captured_stderr() as err:
|
||||
site._execute_start_entrypoints()
|
||||
self.state._execute_start_entrypoints()
|
||||
self.assertIn('nosuchmodule_xyz', err.getvalue())
|
||||
# os.path:join should still have been called (no exception for it)
|
||||
|
||||
def test_execute_entrypoints_strict_syntax_rejection(self):
|
||||
# PEP 829: only the strict pkg.mod:callable form is valid.
|
||||
# At entry-point execution, pkgutil.resolve_name(strict=True)
|
||||
# raises ValueError for invalid syntax; the invalid entry is
|
||||
# reported and execution continues with the next one.
|
||||
def test_impl_execute_entrypoints_strict_syntax_rejection(self):
|
||||
# PEP 829: only the strict pkg.mod:callable form is valid. At entry
|
||||
# point execution time, pkgutil.resolve_name(strict=True) raises a
|
||||
# ValueError for the invalid syntax. The invalid entry is reported
|
||||
# and execution continues with the next one.
|
||||
fullname = os.path.join(self.sitedir, 'bad.start')
|
||||
site._pending_entrypoints[fullname] = [
|
||||
self.state._entrypoints[fullname] = [
|
||||
'os.path', # no colon
|
||||
'pkg.mod:', # empty callable
|
||||
':callable', # empty module
|
||||
'pkg.mod:callable:extra', # multiple colons
|
||||
]
|
||||
with captured_stderr() as err:
|
||||
site._execute_start_entrypoints()
|
||||
self.state._execute_start_entrypoints()
|
||||
out = err.getvalue()
|
||||
self.assertIn('Invalid entry point syntax', out)
|
||||
for bad in ('os.path', 'pkg.mod:', ':callable',
|
||||
'pkg.mod:callable:extra'):
|
||||
for bad in (
|
||||
'os.path',
|
||||
'pkg.mod:',
|
||||
':callable',
|
||||
'pkg.mod:callable:extra',
|
||||
):
|
||||
self.assertIn(bad, out)
|
||||
|
||||
def test_execute_entrypoints_callable_error(self):
|
||||
# Callable that raises prints traceback but continues.
|
||||
mod_dir = os.path.join(self.sitedir, 'badmod')
|
||||
os.mkdir(mod_dir)
|
||||
init_file = os.path.join(mod_dir, '__init__.py')
|
||||
with open(init_file, 'w') as f:
|
||||
f.write("""\
|
||||
def test_impl_execute_entrypoints_callable_error(self):
|
||||
# A callable that errors prints a traceback but continues.
|
||||
self._make_mod("""\
|
||||
def fail():
|
||||
raise RuntimeError("boom")
|
||||
""")
|
||||
sys.path.insert(0, self.sitedir)
|
||||
self.addCleanup(sys.modules.pop, 'badmod', None)
|
||||
""", name='badmod', package=True, on_path=True)
|
||||
fullname = os.path.join(self.sitedir, 'badmod.start')
|
||||
site._pending_entrypoints[fullname] = ['badmod:fail']
|
||||
self.state._entrypoints[fullname] = ['badmod:fail']
|
||||
with captured_stderr() as err:
|
||||
site._execute_start_entrypoints()
|
||||
self.state._execute_start_entrypoints()
|
||||
self.assertIn('RuntimeError', err.getvalue())
|
||||
self.assertIn('boom', err.getvalue())
|
||||
|
||||
def test_execute_entrypoints_duplicates_called_twice(self):
|
||||
def test_impl_execute_entrypoints_duplicates_called_twice(self):
|
||||
# PEP 829: duplicate entry points execute multiple times.
|
||||
mod_dir = os.path.join(self.sitedir, 'countmod')
|
||||
os.mkdir(mod_dir)
|
||||
init_file = os.path.join(mod_dir, '__init__.py')
|
||||
with open(init_file, 'w') as f:
|
||||
f.write("""\
|
||||
self._make_mod("""\
|
||||
call_count = 0
|
||||
def bump():
|
||||
global call_count
|
||||
call_count += 1
|
||||
""")
|
||||
sys.path.insert(0, self.sitedir)
|
||||
self.addCleanup(sys.modules.pop, 'countmod', None)
|
||||
""", name='countmod', package=False, on_path=True)
|
||||
fullname = os.path.join(self.sitedir, 'countmod.start')
|
||||
site._pending_entrypoints[fullname] = [
|
||||
'countmod:bump', 'countmod:bump']
|
||||
site._execute_start_entrypoints()
|
||||
self.state._entrypoints[fullname] = [
|
||||
'countmod:bump', 'countmod:bump',
|
||||
]
|
||||
self.state._execute_start_entrypoints()
|
||||
import countmod
|
||||
self.assertEqual(countmod.call_count, 2)
|
||||
|
||||
# --- _exec_imports tests ---
|
||||
# --- _StartupState._exec_imports tests ---
|
||||
|
||||
def test_exec_imports_suppressed_by_matching_start(self):
|
||||
def test_impl_exec_imports_suppressed_by_matching_start(self):
|
||||
# Import lines from foo.pth are suppressed when foo.start exists.
|
||||
self._make_mod("""\
|
||||
call_count = 0
|
||||
def bump():
|
||||
global call_count
|
||||
call_count += 1
|
||||
""", name='countmod', package=False, on_path=True)
|
||||
pth_fullname = os.path.join(self.sitedir, 'foo.pth')
|
||||
start_fullname = os.path.join(self.sitedir, 'foo.start')
|
||||
site._pending_importexecs[pth_fullname] = ['import sys']
|
||||
site._pending_entrypoints[start_fullname] = ['os.path:join']
|
||||
# Should not exec the import line; no error expected.
|
||||
site._exec_imports()
|
||||
self.state._importexecs[pth_fullname] = ['import countmod; countmod.bump()']
|
||||
self.state._entrypoints[start_fullname] = ['os.path:join']
|
||||
self.state._exec_imports()
|
||||
import countmod
|
||||
self.assertEqual(countmod.call_count, 0)
|
||||
|
||||
def test_exec_imports_not_suppressed_by_different_start(self):
|
||||
def test_impl_exec_imports_not_suppressed_by_different_start(self):
|
||||
# Import lines from foo.pth are NOT suppressed by bar.start.
|
||||
self._make_mod("""\
|
||||
call_count = 0
|
||||
def bump():
|
||||
global call_count
|
||||
call_count += 1
|
||||
""", name='countmod', package=False, on_path=True)
|
||||
pth_fullname = os.path.join(self.sitedir, 'foo.pth')
|
||||
start_fullname = os.path.join(self.sitedir, 'bar.start')
|
||||
site._pending_importexecs[pth_fullname] = ['import sys']
|
||||
site._pending_entrypoints[start_fullname] = ['os.path:join']
|
||||
# Should execute the import line without error.
|
||||
site._exec_imports()
|
||||
self.state._importexecs[pth_fullname] = ['import countmod; countmod.bump()']
|
||||
self.state._entrypoints[start_fullname] = ['os.path:join']
|
||||
self.state._exec_imports()
|
||||
import countmod
|
||||
self.assertEqual(countmod.call_count, 1)
|
||||
|
||||
def test_exec_imports_suppressed_by_empty_matching_start(self):
|
||||
def test_impl_exec_imports_suppressed_by_empty_matching_start(self):
|
||||
self._make_start("", name='foo')
|
||||
self._make_pth("import epmod; epmod.startup()", name='foo')
|
||||
mod_dir = os.path.join(self.sitedir, 'epmod')
|
||||
os.mkdir(mod_dir)
|
||||
init_file = os.path.join(mod_dir, '__init__.py')
|
||||
with open(init_file, 'w') as f:
|
||||
f.write("""\
|
||||
self._make_mod("""\
|
||||
called = False
|
||||
def startup():
|
||||
global called
|
||||
called = True
|
||||
""")
|
||||
sys.path.insert(0, self.sitedir)
|
||||
self.addCleanup(sys.modules.pop, 'epmod', None)
|
||||
site._read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
site._read_start_file(self.sitedir, 'foo.start')
|
||||
site._exec_imports()
|
||||
""", name='epmod', package=True, on_path=True)
|
||||
self.state.read_pth_file(self.sitedir, 'foo.pth', set())
|
||||
self.state.read_start_file(self.sitedir, 'foo.start')
|
||||
self.state._exec_imports()
|
||||
import epmod
|
||||
self.assertFalse(epmod.called)
|
||||
|
||||
# --- _extend_syspath tests ---
|
||||
# --- _StartupState._extend_syspath tests ---
|
||||
|
||||
def test_extend_syspath_existing_dir(self):
|
||||
def test_impl_extend_syspath_existing_dir(self):
|
||||
subdir = os.path.join(self.sitedir, 'extlib')
|
||||
os.mkdir(subdir)
|
||||
site._pending_syspaths['test.pth'] = [subdir]
|
||||
site._extend_syspath()
|
||||
self.state._syspaths['test.pth'] = [subdir]
|
||||
self.state._extend_syspath()
|
||||
self.assertIn(subdir, sys.path)
|
||||
|
||||
def test_extend_syspath_nonexistent_dir(self):
|
||||
nosuch = os.path.join(self.sitedir, 'nosuchdir')
|
||||
site._pending_syspaths['test.pth'] = [nosuch]
|
||||
def test_impl_extend_syspath_nonexistent_dir(self):
|
||||
nonesuch = os.path.join(self.sitedir, 'nosuchdir')
|
||||
self.state._syspaths['test.pth'] = [nonesuch]
|
||||
with captured_stderr() as err:
|
||||
site._extend_syspath()
|
||||
self.assertNotIn(nosuch, sys.path)
|
||||
self.state._extend_syspath()
|
||||
self.assertNotIn(nonesuch, sys.path)
|
||||
self.assertIn('does not exist', err.getvalue())
|
||||
|
||||
# --- addsitedir integration tests ---
|
||||
|
||||
def test_addsitedir_pth_import_skipped_when_matching_start_exists(self):
|
||||
# PEP 829: an empty .start file disables the matching .pth's import
|
||||
# lines, even when the .start has no entry points of its own.
|
||||
self._make_mod("flag = False\n", name='suppressed', on_path=True)
|
||||
self._make_start("", name='foo')
|
||||
self._make_pth(
|
||||
"import suppressed; suppressed.flag = True\n",
|
||||
name='foo')
|
||||
site.addsitedir(self.sitedir, set())
|
||||
import suppressed
|
||||
self.assertFalse(
|
||||
suppressed.flag,
|
||||
"import line in foo.pth should be suppressed by foo.start")
|
||||
|
||||
def test_addsitedir_dotfile_start_entrypoint_not_executed(self):
|
||||
# .start files starting with '.' are skipped, so their entry
|
||||
# points must not run.
|
||||
self._make_mod("""\
|
||||
called = False
|
||||
def hook():
|
||||
global called
|
||||
called = True
|
||||
""",
|
||||
name='dotted', on_path=True)
|
||||
self._make_start("dotted:hook\n", name='.hidden')
|
||||
site.addsitedir(self.sitedir, set())
|
||||
import dotted
|
||||
self.assertFalse(dotted.called)
|
||||
|
||||
def test_addsitedir_dedups_paths_across_pth_files(self):
|
||||
# PEP 829: when multiple .pth files reference the same path within
|
||||
# a single addsitedir() invocation, the path is appended to
|
||||
# sys.path exactly once.
|
||||
subdir = os.path.join(self.sitedir, 'shared')
|
||||
os.mkdir(subdir)
|
||||
self._make_pth("shared\n", name='a')
|
||||
self._make_pth("shared\n", name='b')
|
||||
before = sys.path.count(subdir)
|
||||
site.addsitedir(self.sitedir, set())
|
||||
self.assertEqual(sys.path.count(subdir), before + 1)
|
||||
|
||||
def test_addsitedir_discovers_start_files(self):
|
||||
# addsitedir() should discover .start files and accumulate entries.
|
||||
# With defer_processing_start_files=True the preserved state lives on
|
||||
# site._startup_state and isn't flushed until the caller invokes
|
||||
# process_startup_files().
|
||||
self._make_start("os.path:join\n", name='foo')
|
||||
site.addsitedir(self.sitedir, set(),
|
||||
defer_processing_start_files=True)
|
||||
site.addsitedir(
|
||||
self.sitedir, set(),
|
||||
defer_processing_start_files=True,
|
||||
)
|
||||
fullname = os.path.join(self.sitedir, 'foo.start')
|
||||
self.assertIn('os.path:join', site._pending_entrypoints[fullname])
|
||||
self.assertIn(
|
||||
'os.path:join', site._startup_state._entrypoints[fullname]
|
||||
)
|
||||
|
||||
def test_addsitedir_start_suppresses_pth_imports(self):
|
||||
def test_impl_exec_imports_skips_when_matching_start(self):
|
||||
# When foo.start exists, import lines in foo.pth are skipped
|
||||
# at flush time by _exec_imports().
|
||||
# at flush time by _StartupState._exec_imports().
|
||||
self._make_start("os.path:join\n", name='foo')
|
||||
self._make_pth("import sys\n", name='foo')
|
||||
site.addsitedir(self.sitedir, set(),
|
||||
defer_processing_start_files=True)
|
||||
site.addsitedir(
|
||||
self.sitedir, set(),
|
||||
defer_processing_start_files=True,
|
||||
)
|
||||
pth_fullname = os.path.join(self.sitedir, 'foo.pth')
|
||||
start_fullname = os.path.join(self.sitedir, 'foo.start')
|
||||
# Import line was collected...
|
||||
self.assertIn('import sys',
|
||||
site._pending_importexecs.get(pth_fullname, []))
|
||||
self.assertIn(
|
||||
'import sys',
|
||||
site._startup_state._importexecs.get(pth_fullname, []),
|
||||
)
|
||||
# ...but _exec_imports() will skip it because foo.start exists.
|
||||
site._exec_imports()
|
||||
site._startup_state._exec_imports()
|
||||
|
||||
def test_addsitedir_pth_paths_still_work_with_start(self):
|
||||
# Path lines in .pth files still work even when a .start file exists.
|
||||
|
|
@ -1345,17 +1443,26 @@ def test_addsitedir_pth_paths_still_work_with_start(self):
|
|||
os.mkdir(subdir)
|
||||
self._make_start("os.path:join\n", name='foo')
|
||||
self._make_pth("mylib\n", name='foo')
|
||||
site.addsitedir(self.sitedir, set(),
|
||||
defer_processing_start_files=True)
|
||||
site.addsitedir(
|
||||
self.sitedir, set(),
|
||||
defer_processing_start_files=True,
|
||||
)
|
||||
fullname = os.path.join(self.sitedir, 'foo.pth')
|
||||
self.assertIn(subdir, site._pending_syspaths.get(fullname, []))
|
||||
self.assertIn(
|
||||
subdir, site._startup_state._syspaths.get(fullname, [])
|
||||
)
|
||||
|
||||
def test_addsitedir_start_alphabetical_order(self):
|
||||
# Multiple .start files are discovered alphabetically.
|
||||
# _all_entrypoints() reads from self.state, so swap in the
|
||||
# preserved batch state for the duration of the assertion.
|
||||
self._make_start("os.path:join\n", name='zzz')
|
||||
self._make_start("os.path:exists\n", name='aaa')
|
||||
site.addsitedir(self.sitedir, set(),
|
||||
defer_processing_start_files=True)
|
||||
site.addsitedir(
|
||||
self.sitedir, set(),
|
||||
defer_processing_start_files=True,
|
||||
)
|
||||
self.state = site._startup_state
|
||||
all_entries = self._all_entrypoints()
|
||||
entries = [entry for _, entry in all_entries]
|
||||
idx_a = entries.index('os.path:exists')
|
||||
|
|
@ -1370,49 +1477,65 @@ def test_addsitedir_pth_before_start(self):
|
|||
os.mkdir(subdir)
|
||||
self._make_pth("mylib\n", name='foo')
|
||||
self._make_start("os.path:join\n", name='foo')
|
||||
site.addsitedir(self.sitedir, set(),
|
||||
defer_processing_start_files=True)
|
||||
site.addsitedir(
|
||||
self.sitedir, set(),
|
||||
defer_processing_start_files=True,
|
||||
)
|
||||
# Both should be collected.
|
||||
pth_fullname = os.path.join(self.sitedir, 'foo.pth')
|
||||
start_fullname = os.path.join(self.sitedir, 'foo.start')
|
||||
self.assertIn(subdir, site._pending_syspaths.get(pth_fullname, []))
|
||||
self.assertIn('os.path:join',
|
||||
site._pending_entrypoints.get(start_fullname, []))
|
||||
self.assertIn(
|
||||
subdir, site._startup_state._syspaths.get(pth_fullname, [])
|
||||
)
|
||||
self.assertIn(
|
||||
'os.path:join',
|
||||
site._startup_state._entrypoints.get(start_fullname, []),
|
||||
)
|
||||
|
||||
def test_addsitedir_dotfile_start_ignored(self):
|
||||
def test_impl_addsitedir_skips_dotfile_start(self):
|
||||
# .start files starting with '.' are skipped. Defer flushing so
|
||||
# the assertion against _pending_entrypoints is meaningful;
|
||||
# otherwise process_startup_files() would clear the dict
|
||||
# regardless of whether the dotfile was picked up.
|
||||
# the preserved batch state stays inspectable on
|
||||
# site._startup_state; otherwise process_startup_files() would
|
||||
# detach and consume it regardless of whether the dotfile was
|
||||
# picked up.
|
||||
self._make_start("os.path:join\n", name='.hidden')
|
||||
site.addsitedir(self.sitedir, set(),
|
||||
defer_processing_start_files=True)
|
||||
self.assertEqual(site._pending_entrypoints, {})
|
||||
site.addsitedir(
|
||||
self.sitedir, set(),
|
||||
defer_processing_start_files=True,
|
||||
)
|
||||
self.assertEqual(site._startup_state._entrypoints, {})
|
||||
|
||||
def test_addsitedir_standalone_flushes(self):
|
||||
# When called with known_paths=None (standalone), addsitedir
|
||||
# flushes immediately so the caller sees the effect.
|
||||
# When called with defer_processing_start_files=False (the
|
||||
# default), addsitedir creates a per-call _StartupState and
|
||||
# processes it before returning, so the caller sees the effect
|
||||
# immediately. No batch state is left behind on
|
||||
# site._startup_state.
|
||||
subdir = os.path.join(self.sitedir, 'flushlib')
|
||||
os.mkdir(subdir)
|
||||
self._make_pth("flushlib\n", name='foo')
|
||||
site.addsitedir(self.sitedir) # known_paths=None
|
||||
self.assertIn(subdir, sys.path)
|
||||
# Pending dicts should be cleared after flush.
|
||||
self.assertEqual(site._pending_syspaths, {})
|
||||
self.assertIsNone(site._startup_state)
|
||||
|
||||
def test_addsitedir_defer_does_not_flush(self):
|
||||
# With defer_processing_start_files=True, addsitedir accumulates
|
||||
# pending state but does not flush; sys.path is updated only when
|
||||
# process_startup_files() is called explicitly.
|
||||
# process_startup_files() is called explicitly. The accumulated
|
||||
# state lives on the lazily-promoted site._startup_state.
|
||||
subdir = os.path.join(self.sitedir, 'acclib')
|
||||
os.mkdir(subdir)
|
||||
self._make_pth("acclib\n", name='foo')
|
||||
site.addsitedir(self.sitedir, set(),
|
||||
defer_processing_start_files=True)
|
||||
site.addsitedir(
|
||||
self.sitedir, set(),
|
||||
defer_processing_start_files=True,
|
||||
)
|
||||
# Path is pending, not yet on sys.path.
|
||||
self.assertNotIn(subdir, sys.path)
|
||||
fullname = os.path.join(self.sitedir, 'foo.pth')
|
||||
self.assertIn(subdir, site._pending_syspaths.get(fullname, []))
|
||||
self.assertIn(
|
||||
subdir, site._startup_state._syspaths.get(fullname, [])
|
||||
)
|
||||
|
||||
def test_pth_path_is_available_to_start_entrypoint(self):
|
||||
# Core PEP 829 invariant: all .pth path extensions are applied to
|
||||
|
|
@ -1420,18 +1543,12 @@ def test_pth_path_is_available_to_start_entrypoint(self):
|
|||
# point may live in a module reachable only via a .pth-extended
|
||||
# path. If the flush phases were inverted, resolving the entry
|
||||
# point would fail with ModuleNotFoundError.
|
||||
extdir = os.path.join(self.sitedir, 'extdir')
|
||||
os.mkdir(extdir)
|
||||
modpath = os.path.join(extdir, 'mod.py')
|
||||
with open(modpath, 'w') as f:
|
||||
f.write("""\
|
||||
extdir = self._make_mod("""\
|
||||
called = False
|
||||
def hook():
|
||||
global called
|
||||
called = True
|
||||
""")
|
||||
self.addCleanup(sys.modules.pop, 'mod', None)
|
||||
|
||||
# extdir is not on sys.path; only the .pth file makes it so.
|
||||
self.assertNotIn(extdir, sys.path)
|
||||
self._make_pth("extdir\n", name='extlib')
|
||||
|
|
@ -1447,6 +1564,82 @@ def hook():
|
|||
"entry point did not run; .pth path was likely not applied "
|
||||
"before .start entry-point execution")
|
||||
|
||||
# --- bugs ---
|
||||
|
||||
# gh-75723
|
||||
def test_addsitdir_idempotent_pth(self):
|
||||
# Adding the same sitedir twice with a known_paths, should not
|
||||
# process .pth files twice.
|
||||
extdir = self._make_mod("""\
|
||||
_pth_count = 0
|
||||
""")
|
||||
self._make_pth(f"""\
|
||||
{extdir}
|
||||
import mod; mod._pth_count += 1
|
||||
""")
|
||||
dirs = set()
|
||||
dirs = site.addsitedir(self.sitedir, dirs)
|
||||
dirs = site.addsitedir(self.sitedir, dirs)
|
||||
import mod
|
||||
self.assertEqual(mod._pth_count, 1)
|
||||
|
||||
def test_addsitdir_idempotent_start(self):
|
||||
# Adding the same sitedir twice with a known_paths, should not
|
||||
# process .pth files twice.
|
||||
extdir = self._make_mod("""\
|
||||
_pth_count = 0
|
||||
def increment():
|
||||
global _pth_count
|
||||
_pth_count += 1
|
||||
""")
|
||||
self._make_pth(f"""\
|
||||
{extdir}
|
||||
""")
|
||||
self._make_start("""\
|
||||
mod:increment
|
||||
""")
|
||||
dirs = set()
|
||||
dirs = site.addsitedir(self.sitedir, dirs)
|
||||
dirs = site.addsitedir(self.sitedir, dirs)
|
||||
import mod
|
||||
self.assertEqual(mod._pth_count, 1)
|
||||
|
||||
# gh-149504
|
||||
def test_reentrant_addsitedir_pth(self):
|
||||
# An import line in a .pth file that calls site.addsitedir()
|
||||
# must not crash or re-execute outer entries while the outer
|
||||
# call is still processing its pending startup state.
|
||||
overlay = self.enterContext(os_helper.temp_dir())
|
||||
overlay_pth = os.path.join(overlay, 'overlay.pth')
|
||||
pkgdir = self.enterContext(os_helper.temp_dir())
|
||||
with open(overlay_pth, 'w', encoding='utf-8') as fp:
|
||||
print(pkgdir, file=fp)
|
||||
self._make_pth(f"import site; site.addsitedir({overlay!r})\n")
|
||||
site.addsitedir(self.sitedir, set())
|
||||
self.assertIn(overlay, sys.path)
|
||||
self.assertIn(pkgdir, sys.path)
|
||||
|
||||
# gh-149504
|
||||
def test_reentrant_addsitedir_start(self):
|
||||
# As above, but the re-entry happens from a .start entry point
|
||||
# instead of a .pth import line. The entry point execution
|
||||
# phase is vulnerable to the same class of bug.
|
||||
overlay = self.enterContext(os_helper.temp_dir())
|
||||
overlay_pth = os.path.join(overlay, 'overlay.pth')
|
||||
pkgdir = self.enterContext(os_helper.temp_dir())
|
||||
with open(overlay_pth, 'w', encoding='utf-8') as fp:
|
||||
print(pkgdir, file=fp)
|
||||
self._make_mod(f"""\
|
||||
import site
|
||||
def bootstrap():
|
||||
site.addsitedir({overlay!r})
|
||||
""",
|
||||
name='reenter_helper', on_path=True)
|
||||
self._make_start("reenter_helper:bootstrap\n")
|
||||
site.addsitedir(self.sitedir, set())
|
||||
self.assertIn(overlay, sys.path)
|
||||
self.assertIn(pkgdir, sys.path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
|||
|
|
@ -0,0 +1,5 @@
|
|||
Fix :func:`site.addsitedir` to allow re-entrant calls from within startup
|
||||
files. Previously, a ``.pth`` file containing an ``import`` line that
|
||||
called :func:`site.addsitedir` (or a ``.start`` entry point doing the same)
|
||||
could crash with ``RuntimeError: dictionary changed size during iteration``
|
||||
during site initialization, breaking tools such as ``uv run --with``.
|
||||
Loading…
Add table
Add a link
Reference in a new issue