"""Tests for 'site'. Tests assume the initial paths in sys.path once the interpreter has begun executing have not been removed. """ import unittest import test.support from test import support from test.support.script_helper import assert_python_ok from test.support import import_helper from test.support import os_helper from test.support import socket_helper from test.support import captured_stderr from test.support.os_helper import TESTFN, EnvironmentVarGuard from test.support.script_helper import spawn_python, kill_python import ast import builtins import contextlib import glob import io import os import re import shutil import stat import subprocess import sys import sysconfig import tempfile from textwrap import dedent from types import SimpleNamespace import urllib.error import urllib.request from unittest import mock from copy import copy # These tests are not particularly useful if Python was invoked with -S. # If you add tests that are useful under -S, this skip should be moved # to the class level. if sys.flags.no_site: raise unittest.SkipTest("Python was invoked with -S") import site HAS_USER_SITE = (site.USER_SITE is not None) OLD_SYS_PATH = None def setUpModule(): global OLD_SYS_PATH OLD_SYS_PATH = sys.path[:] if site.ENABLE_USER_SITE and not os.path.isdir(site.USER_SITE): # need to add user site directory for tests try: os.makedirs(site.USER_SITE) # modify sys.path: will be restored by tearDownModule() site.addsitedir(site.USER_SITE) except PermissionError as exc: raise unittest.SkipTest('unable to create user site directory (%r): %s' % (site.USER_SITE, exc)) def tearDownModule(): sys.path[:] = OLD_SYS_PATH class HelperFunctionsTests(unittest.TestCase): """Tests for helper functions. """ def setUp(self): """Save a copy of sys.path""" self.sys_path = sys.path[:] self.old_base = site.USER_BASE self.old_site = site.USER_SITE self.old_prefixes = site.PREFIXES self.original_vars = sysconfig._CONFIG_VARS self.old_vars = copy(sysconfig._CONFIG_VARS) def tearDown(self): """Restore sys.path""" sys.path[:] = self.sys_path site.USER_BASE = self.old_base site.USER_SITE = self.old_site site.PREFIXES = self.old_prefixes sysconfig._CONFIG_VARS = self.original_vars # _CONFIG_VARS is None before get_config_vars() is called if sysconfig._CONFIG_VARS is not None: sysconfig._CONFIG_VARS.clear() sysconfig._CONFIG_VARS.update(self.old_vars) def test_makepath(self): # Test makepath() have an absolute path for its first return value # and a case-normalized version of the absolute path for its # second value. path_parts = ("Beginning", "End") original_dir = os.path.join(*path_parts) abs_dir, norm_dir = site.makepath(*path_parts) self.assertEqual(os.path.abspath(original_dir), abs_dir) if original_dir == os.path.normcase(original_dir): self.assertEqual(abs_dir, norm_dir) else: self.assertEqual(os.path.normcase(abs_dir), norm_dir) def test_init_pathinfo(self): dir_set = site._init_pathinfo() for entry in [site.makepath(path)[1] for path in sys.path if path and os.path.exists(path)]: self.assertIn(entry, dir_set, "%s from sys.path not found in set returned " "by _init_pathinfo(): %s" % (entry, dir_set)) def pth_file_tests(self, pth_file): """Contain common code for testing results of reading a .pth file""" self.assertIn(pth_file.imported, sys.modules, "%s not in sys.modules" % pth_file.imported) self.assertIn(site.makepath(pth_file.good_dir_path)[0], sys.path) self.assertFalse(os.path.exists(pth_file.bad_dir_path)) def test_addpackage(self): # Make sure addpackage() imports if the line starts with 'import', # adds directories to sys.path for any line in the file that is not a # comment or import that is a valid directory name for where the .pth # file resides; invalid directories are not added pth_file = PthFile() # Ensure we have a clean slate. pth_file.cleanup(prep=True) with pth_file.create(): site.addpackage(pth_file.base_dir, pth_file.filename, set()) self.pth_file_tests(pth_file) def make_pth(self, contents, pth_dir='.', pth_name=TESTFN): # Create a .pth file and return its (abspath, basename). pth_dir = os.path.abspath(pth_dir) pth_basename = pth_name + '.pth' pth_fn = os.path.join(pth_dir, pth_basename) with open(pth_fn, 'w', encoding='utf-8') as pth_file: self.addCleanup(lambda: os.remove(pth_fn)) pth_file.write(contents) return pth_dir, pth_basename def test_addpackage_import_bad_syntax(self): # Issue 10642 pth_dir, pth_fn = self.make_pth("import bad-syntax\n") with captured_stderr() as err_out: site.addpackage(pth_dir, pth_fn, set()) self.assertRegex(err_out.getvalue(), "line 1") self.assertRegex(err_out.getvalue(), re.escape(os.path.join(pth_dir, pth_fn))) self.assertRegex(err_out.getvalue(), 'Traceback') self.assertRegex(err_out.getvalue(), r'import bad-syntax') self.assertRegex(err_out.getvalue(), 'SyntaxError') def test_addpackage_import_bad_exec(self): # Issue 10642 pth_dir, pth_fn = self.make_pth("randompath\nimport nosuchmodule\n") with captured_stderr() as err_out: site.addpackage(pth_dir, pth_fn, set()) self.assertRegex(err_out.getvalue(), re.escape(os.path.join(pth_dir, pth_fn))) self.assertRegex(err_out.getvalue(), 'Traceback') self.assertRegex(err_out.getvalue(), 'ModuleNotFoundError') def test_addpackage_empty_lines(self): # Issue 33689 pth_dir, pth_fn = self.make_pth("\n\n \n\n") known_paths = site.addpackage(pth_dir, pth_fn, set()) self.assertEqual(known_paths, set()) def test_addpackage_import_bad_pth_file(self): # Issue 5258 pth_dir, pth_fn = self.make_pth("abc\x00def\n") for path in sys.path: if isinstance(path, str): self.assertNotIn("abc\x00def", path) def test_addsitedir(self): # addsitedir() reads .pth files and, when called standalone # (known_paths=None), flushes paths and import lines immediately. pth_file = PthFile() # Ensure we have a clean slate. pth_file.cleanup(prep=True) with pth_file.create(): site.addsitedir(pth_file.base_dir) self.pth_file_tests(pth_file) def test_addsitedir_explicit_flush(self): # addsitedir() reads .pth files and, with # defer_processing_start_files=True, accumulates pending state # without flushing. A subsequent process_startup_files() call # then applies the paths and runs the import lines. pth_file = PthFile() # Ensure we have a clean slate. pth_file.cleanup(prep=True) with pth_file.create(): # Pass defer_processing_start_files=True to prevent flushing. site.addsitedir( pth_file.base_dir, set(), defer_processing_start_files=True) self.assertNotIn(pth_file.imported, sys.modules) site.process_startup_files() self.pth_file_tests(pth_file) def test_addsitedir_dotfile(self): pth_file = PthFile('.dotfile') # Ensure we have a clean slate. pth_file.cleanup(prep=True) with pth_file.create(): site.addsitedir(pth_file.base_dir) self.assertNotIn(site.makepath(pth_file.good_dir_path)[0], sys.path) self.assertIn(pth_file.base_dir, sys.path) @unittest.skipUnless(hasattr(os, 'chflags'), 'test needs os.chflags()') def test_addsitedir_hidden_flags(self): pth_file = PthFile() # Ensure we have a clean slate. pth_file.cleanup(prep=True) with pth_file.create(): st = os.stat(pth_file.file_path) os.chflags(pth_file.file_path, st.st_flags | stat.UF_HIDDEN) site.addsitedir(pth_file.base_dir) self.assertNotIn(site.makepath(pth_file.good_dir_path)[0], sys.path) self.assertIn(pth_file.base_dir, sys.path) @unittest.skipUnless(sys.platform == 'win32', 'test needs Windows') @support.requires_subprocess() def test_addsitedir_hidden_file_attribute(self): pth_file = PthFile() # Ensure we have a clean slate. pth_file.cleanup(prep=True) with pth_file.create(): subprocess.check_call(['attrib', '+H', pth_file.file_path]) site.addsitedir(pth_file.base_dir) self.assertNotIn(site.makepath(pth_file.good_dir_path)[0], sys.path) self.assertIn(pth_file.base_dir, sys.path) # This tests _getuserbase, hence the double underline # to distinguish from a test for getuserbase def test__getuserbase(self): self.assertEqual(site._getuserbase(), sysconfig._getuserbase()) @unittest.skipUnless(HAS_USER_SITE, 'need user site') def test_get_path(self): if sys.platform == 'darwin' and sys._framework: scheme = 'osx_framework_user' else: scheme = os.name + '_user' self.assertEqual(os.path.normpath(site._get_path(site._getuserbase())), sysconfig.get_path('purelib', scheme)) @unittest.skipUnless(site.ENABLE_USER_SITE, "requires access to PEP 370 " "user-site (site.ENABLE_USER_SITE)") @support.requires_subprocess() def test_s_option(self): # (ncoghlan) Change this to use script_helper... usersite = os.path.normpath(site.USER_SITE) self.assertIn(usersite, sys.path) env = os.environ.copy() rc = subprocess.call([sys.executable, '-c', 'import sys; sys.exit(%r in sys.path)' % usersite], env=env) self.assertEqual(rc, 1) env = os.environ.copy() rc = subprocess.call([sys.executable, '-s', '-c', 'import sys; sys.exit(%r in sys.path)' % usersite], env=env) if usersite == site.getsitepackages()[0]: self.assertEqual(rc, 1) else: self.assertEqual(rc, 0, "User site still added to path with -s") env = os.environ.copy() env["PYTHONNOUSERSITE"] = "1" rc = subprocess.call([sys.executable, '-c', 'import sys; sys.exit(%r in sys.path)' % usersite], env=env) if usersite == site.getsitepackages()[0]: self.assertEqual(rc, 1) else: self.assertEqual(rc, 0, "User site still added to path with PYTHONNOUSERSITE") env = os.environ.copy() env["PYTHONUSERBASE"] = "/tmp" rc = subprocess.call([sys.executable, '-c', 'import sys, site; sys.exit(site.USER_BASE.startswith("/tmp"))'], env=env) self.assertEqual(rc, 1, "User base not set by PYTHONUSERBASE") @unittest.skipUnless(HAS_USER_SITE, 'need user site') def test_getuserbase(self): site.USER_BASE = None user_base = site.getuserbase() # the call sets site.USER_BASE self.assertEqual(site.USER_BASE, user_base) # let's set PYTHONUSERBASE and see if it uses it site.USER_BASE = None import sysconfig sysconfig._CONFIG_VARS = None with EnvironmentVarGuard() as environ: environ['PYTHONUSERBASE'] = 'xoxo' self.assertStartsWith(site.getuserbase(), 'xoxo') @unittest.skipUnless(HAS_USER_SITE, 'need user site') def test_getusersitepackages(self): site.USER_SITE = None site.USER_BASE = None user_site = site.getusersitepackages() # the call sets USER_BASE *and* USER_SITE self.assertEqual(site.USER_SITE, user_site) self.assertStartsWith(user_site, site.USER_BASE) self.assertEqual(site.USER_BASE, site.getuserbase()) def test_getsitepackages(self): site.PREFIXES = ['xoxo'] dirs = site.getsitepackages() if os.sep == '/': # OS X, Linux, FreeBSD, etc if sys.platlibdir != "lib": self.assertEqual(len(dirs), 2) wanted = os.path.join('xoxo', sys.platlibdir, f'python{sysconfig._get_python_version_abi()}', 'site-packages') self.assertEqual(dirs[0], wanted) else: self.assertEqual(len(dirs), 1) wanted = os.path.join('xoxo', 'lib', f'python{sysconfig._get_python_version_abi()}', 'site-packages') self.assertEqual(dirs[-1], wanted) else: # other platforms self.assertEqual(len(dirs), 2) self.assertEqual(dirs[0], 'xoxo') wanted = os.path.join('xoxo', 'lib', 'site-packages') self.assertEqual(os.path.normcase(dirs[1]), os.path.normcase(wanted)) @unittest.skipUnless(HAS_USER_SITE, 'need user site') def test_no_home_directory(self): # bpo-10496: getuserbase() and getusersitepackages() must not fail if # the current user has no home directory (if expanduser() returns the # path unchanged). site.USER_SITE = None site.USER_BASE = None with EnvironmentVarGuard() as environ, \ mock.patch('os.path.expanduser', lambda path: path): environ.unset('PYTHONUSERBASE', 'APPDATA') user_base = site.getuserbase() self.assertStartsWith(user_base, '~' + os.sep) user_site = site.getusersitepackages() self.assertStartsWith(user_site, user_base) with mock.patch('os.path.isdir', return_value=False) as mock_isdir, \ mock.patch.object(site, 'addsitedir') as mock_addsitedir, \ support.swap_attr(site, 'ENABLE_USER_SITE', True): # addusersitepackages() must not add user_site to sys.path # if it is not an existing directory known_paths = set() site.addusersitepackages(known_paths) mock_isdir.assert_called_once_with(user_site) mock_addsitedir.assert_not_called() self.assertFalse(known_paths) def test_gethistoryfile(self): filename = 'file' rc, out, err = assert_python_ok('-c', f'import site; assert site.gethistoryfile() == "{filename}"', PYTHON_HISTORY=filename) self.assertEqual(rc, 0) # Check that PYTHON_HISTORY is ignored in isolated mode. rc, out, err = assert_python_ok('-I', '-c', f'import site; assert site.gethistoryfile() != "{filename}"', PYTHON_HISTORY=filename) self.assertEqual(rc, 0) def test_trace(self): message = "bla-bla-bla" for verbose, out in (True, message + "\n"), (False, ""): with mock.patch('sys.flags', mock.Mock(verbose=verbose)), \ mock.patch('sys.stderr', io.StringIO()): site._trace(message) self.assertEqual(sys.stderr.getvalue(), out) class PthFile: """Helper class for handling testing of .pth files""" def __init__(self, filename_base=TESTFN, imported="time", good_dirname="__testdir__", bad_dirname="__bad"): """Initialize instance variables""" self.filename = filename_base + ".pth" self.base_dir = os.path.abspath('') self.file_path = os.path.join(self.base_dir, self.filename) self.imported = imported self.good_dirname = good_dirname self.bad_dirname = bad_dirname self.good_dir_path = os.path.join(self.base_dir, self.good_dirname) self.bad_dir_path = os.path.join(self.base_dir, self.bad_dirname) @contextlib.contextmanager def create(self): """Create a .pth file with a comment, blank lines, an ``import ``, a line with self.good_dirname, and a line with self.bad_dirname. Creation of the directory for self.good_dir_path (based off of self.good_dirname) is also performed. Used as a context manager: self.cleanup() is called on exit. """ with open(self.file_path, 'w') as fp: print(f"""\ #import @bad module name import {self.imported} {self.good_dirname} {self.bad_dirname} """, file=fp) os.mkdir(self.good_dir_path) try: yield self finally: self.cleanup() def cleanup(self, prep=False): """Make sure that the .pth file is deleted, self.imported is not in sys.modules, and that both self.good_dirname and self.bad_dirname are not existing directories.""" if os.path.exists(self.file_path): os.remove(self.file_path) if prep: self.imported_module = sys.modules.get(self.imported) if self.imported_module: del sys.modules[self.imported] else: if self.imported_module: sys.modules[self.imported] = self.imported_module if os.path.exists(self.good_dir_path): os.rmdir(self.good_dir_path) if os.path.exists(self.bad_dir_path): os.rmdir(self.bad_dir_path) class ImportSideEffectTests(unittest.TestCase): """Test side-effects from importing 'site'.""" def setUp(self): """Make a copy of sys.path""" self.sys_path = sys.path[:] def tearDown(self): """Restore sys.path""" sys.path[:] = self.sys_path def test_no_duplicate_paths(self): # No duplicate paths should exist in sys.path # Handled by removeduppaths() site.removeduppaths() seen_paths = set() for path in sys.path: self.assertNotIn(path, seen_paths) seen_paths.add(path) @unittest.skip('test not implemented') def test_add_build_dir(self): # Test that the build directory's Modules directory is used when it # should be. # XXX: implement pass def test_setting_quit(self): # 'quit' and 'exit' should be injected into builtins self.assertHasAttr(builtins, "quit") self.assertHasAttr(builtins, "exit") def test_setting_copyright(self): # 'copyright', 'credits', and 'license' should be in builtins self.assertHasAttr(builtins, "copyright") self.assertHasAttr(builtins, "credits") self.assertHasAttr(builtins, "license") def test_setting_help(self): # 'help' should be set in builtins self.assertHasAttr(builtins, "help") def test_sitecustomize_executed(self): # If sitecustomize is available, it should have been imported. if "sitecustomize" not in sys.modules: try: import sitecustomize # noqa: F401 except ImportError: pass else: self.fail("sitecustomize not imported automatically") @support.requires_subprocess() def test_customization_modules_on_startup(self): mod_names = [ 'sitecustomize' ] if site.ENABLE_USER_SITE: mod_names.append('usercustomize') temp_dir = tempfile.mkdtemp() self.addCleanup(os_helper.rmtree, temp_dir) with EnvironmentVarGuard() as environ: environ['PYTHONPATH'] = temp_dir for module_name in mod_names: os_helper.rmtree(temp_dir) os.mkdir(temp_dir) customize_path = os.path.join(temp_dir, f'{module_name}.py') eyecatcher = f'EXECUTED_{module_name}' with open(customize_path, 'w') as f: f.write(f'print("{eyecatcher}")') output = subprocess.check_output([sys.executable, '-c', '""']) self.assertIn(eyecatcher, output.decode('utf-8')) # -S blocks any site-packages output = subprocess.check_output([sys.executable, '-S', '-c', '""']) self.assertNotIn(eyecatcher, output.decode('utf-8')) # -s blocks user site-packages if 'usercustomize' == module_name: output = subprocess.check_output([sys.executable, '-s', '-c', '""']) self.assertNotIn(eyecatcher, output.decode('utf-8')) @unittest.skipUnless(hasattr(urllib.request, "HTTPSHandler"), 'need SSL support to download license') @test.support.requires_resource('network') @test.support.system_must_validate_cert def test_license_exists_at_url(self): # This test is a bit fragile since it depends on the format of the # string displayed by license in the absence of a LICENSE file. url = license._Printer__data.split()[1] req = urllib.request.Request(url, method='HEAD') # Reset global urllib.request._opener self.addCleanup(urllib.request.urlcleanup) try: with socket_helper.transient_internet(url): with urllib.request.urlopen(req) as data: code = data.getcode() except urllib.error.HTTPError as e: code = e.code self.assertEqual(code, 200, msg="Can't find " + url) @support.cpython_only def test_lazy_imports(self): import_helper.ensure_lazy_imports("site", [ "io", "locale", "traceback", "atexit", "warnings", "textwrap", ]) class StartupImportTests(unittest.TestCase): @support.requires_subprocess() def test_startup_imports(self): # Get sys.path in isolated mode (python3 -I) popen = subprocess.Popen([sys.executable, '-X', 'utf8', '-I', '-c', 'import sys; print(repr(sys.path))'], stdout=subprocess.PIPE, encoding='utf-8', errors='surrogateescape') stdout = popen.communicate()[0] self.assertEqual(popen.returncode, 0, repr(stdout)) isolated_paths = ast.literal_eval(stdout) # bpo-27807: Even with -I, the site module executes all .pth files # found in sys.path (see site.addpackage()). Skip the test if at least # one .pth file is found. for path in isolated_paths: pth_files = glob.glob(os.path.join(glob.escape(path), "*.pth")) if pth_files: self.skipTest(f"found {len(pth_files)} .pth files in: {path}") # This tests checks which modules are loaded by Python when it # initially starts upon startup. popen = subprocess.Popen([sys.executable, '-X', 'utf8', '-I', '-v', '-c', 'import sys; print(set(sys.modules))'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8', errors='surrogateescape') stdout, stderr = popen.communicate() self.assertEqual(popen.returncode, 0, (stdout, stderr)) modules = ast.literal_eval(stdout) self.assertIn('site', modules) # http://bugs.python.org/issue19205 re_mods = {'re', '_sre', 're._compiler', 're._constants', 're._parser'} self.assertFalse(modules.intersection(re_mods), stderr) # http://bugs.python.org/issue9548 self.assertNotIn('locale', modules, stderr) # http://bugs.python.org/issue19209 self.assertNotIn('copyreg', modules, stderr) # http://bugs.python.org/issue19218 collection_mods = {'_collections', 'collections', 'functools', 'heapq', 'itertools', 'keyword', 'operator', 'reprlib', 'types', 'weakref' }.difference(sys.builtin_module_names) self.assertFalse(modules.intersection(collection_mods), stderr) @support.requires_subprocess() def test_startup_interactivehook(self): r = subprocess.Popen([sys.executable, '-c', 'import sys; sys.exit(hasattr(sys, "__interactivehook__"))']).wait() self.assertTrue(r, "'__interactivehook__' not added by site") @support.requires_subprocess() def test_startup_interactivehook_isolated(self): # issue28192 readline is not automatically enabled in isolated mode r = subprocess.Popen([sys.executable, '-I', '-c', 'import sys; sys.exit(hasattr(sys, "__interactivehook__"))']).wait() self.assertFalse(r, "'__interactivehook__' added in isolated mode") @support.requires_subprocess() def test_startup_interactivehook_isolated_explicit(self): # issue28192 readline can be explicitly enabled in isolated mode r = subprocess.Popen([sys.executable, '-I', '-c', 'import site, sys; site.enablerlcompleter(); sys.exit(hasattr(sys, "__interactivehook__"))']).wait() self.assertTrue(r, "'__interactivehook__' not added by enablerlcompleter()") class _pthFileTests(unittest.TestCase): if sys.platform == 'win32': def _create_underpth_exe(self, lines, exe_pth=True): import _winapi temp_dir = tempfile.mkdtemp() self.addCleanup(os_helper.rmtree, temp_dir) exe_file = os.path.join(temp_dir, os.path.split(sys.executable)[1]) dll_src_file = _winapi.GetModuleFileName(sys.dllhandle) dll_file = os.path.join(temp_dir, os.path.split(dll_src_file)[1]) shutil.copy(sys.executable, exe_file) shutil.copy(dll_src_file, dll_file) for fn in glob.glob(os.path.join(os.path.split(dll_src_file)[0], "vcruntime*.dll")): shutil.copy(fn, os.path.join(temp_dir, os.path.split(fn)[1])) if exe_pth: _pth_file = os.path.splitext(exe_file)[0] + '._pth' else: _pth_file = os.path.splitext(dll_file)[0] + '._pth' with open(_pth_file, 'w', encoding='utf8') as f: for line in lines: print(line, file=f) return exe_file else: def _create_underpth_exe(self, lines, exe_pth=True): if not exe_pth: raise unittest.SkipTest("library ._pth file not supported on this platform") temp_dir = tempfile.mkdtemp() self.addCleanup(os_helper.rmtree, temp_dir) exe_file = os.path.join(temp_dir, os.path.split(sys.executable)[1]) os.symlink(sys.executable, exe_file) _pth_file = exe_file + '._pth' with open(_pth_file, 'w') as f: for line in lines: print(line, file=f) return exe_file def _calc_sys_path_for_underpth_nosite(self, sys_prefix, lines): sys_path = [] for line in lines: if not line or line[0] == '#': continue abs_path = os.path.abspath(os.path.join(sys_prefix, line)) sys_path.append(abs_path) return sys_path def _get_pth_lines(self, libpath: str, *, import_site: bool): pth_lines = ['fake-path-name'] # include 200 lines of `libpath` in _pth lines (or fewer # if the `libpath` is long enough to get close to 32KB # see https://github.com/python/cpython/issues/113628) encoded_libpath_length = len(libpath.encode("utf-8")) repetitions = min(200, 30000 // encoded_libpath_length) if repetitions <= 2: self.skipTest( f"Python stdlib path is too long ({encoded_libpath_length:,} bytes)") pth_lines.extend(libpath for _ in range(repetitions)) pth_lines.extend(['', '# comment']) if import_site: pth_lines.append('import site') return pth_lines @support.requires_subprocess() def test_underpth_basic(self): pth_lines = ['#.', '# ..', *sys.path, '.', '..'] exe_file = self._create_underpth_exe(pth_lines) sys_path = self._calc_sys_path_for_underpth_nosite( os.path.dirname(exe_file), pth_lines) output = subprocess.check_output([exe_file, '-X', 'utf8', '-c', 'import sys; print("\\n".join(sys.path) if sys.flags.no_site else "")' ], encoding='utf-8', errors='surrogateescape') actual_sys_path = output.rstrip().split('\n') self.assertTrue(actual_sys_path, "sys.flags.no_site was False") self.assertEqual( actual_sys_path, sys_path, "sys.path is incorrect" ) @support.requires_subprocess() def test_underpth_nosite_file(self): libpath = test.support.STDLIB_DIR exe_prefix = os.path.dirname(sys.executable) pth_lines = self._get_pth_lines(libpath, import_site=False) exe_file = self._create_underpth_exe(pth_lines) sys_path = self._calc_sys_path_for_underpth_nosite( os.path.dirname(exe_file), pth_lines) env = os.environ.copy() env['PYTHONPATH'] = 'from-env' env['PATH'] = '{}{}{}'.format(exe_prefix, os.pathsep, os.getenv('PATH')) output = subprocess.check_output([exe_file, '-c', 'import sys; print("\\n".join(sys.path) if sys.flags.no_site else "")' ], env=env, encoding='utf-8', errors='surrogateescape') actual_sys_path = output.rstrip().split('\n') self.assertTrue(actual_sys_path, "sys.flags.no_site was False") self.assertEqual( actual_sys_path, sys_path, "sys.path is incorrect" ) @support.requires_subprocess() def test_underpth_file(self): libpath = test.support.STDLIB_DIR exe_prefix = os.path.dirname(sys.executable) exe_file = self._create_underpth_exe( self._get_pth_lines(libpath, import_site=True)) sys_prefix = os.path.dirname(exe_file) env = os.environ.copy() env['PYTHONPATH'] = 'from-env' env['PATH'] = '{};{}'.format(exe_prefix, os.getenv('PATH')) rc = subprocess.call([exe_file, '-c', 'import sys; sys.exit(not sys.flags.no_site and ' '%r in sys.path and %r in sys.path and %r not in sys.path and ' 'all("\\r" not in p and "\\n" not in p for p in sys.path))' % ( os.path.join(sys_prefix, 'fake-path-name'), libpath, os.path.join(sys_prefix, 'from-env'), )], env=env) self.assertTrue(rc, "sys.path is incorrect") @support.requires_subprocess() def test_underpth_dll_file(self): libpath = test.support.STDLIB_DIR exe_prefix = os.path.dirname(sys.executable) exe_file = self._create_underpth_exe( self._get_pth_lines(libpath, import_site=True), exe_pth=False) sys_prefix = os.path.dirname(exe_file) env = os.environ.copy() env['PYTHONPATH'] = 'from-env' env['PATH'] = '{};{}'.format(exe_prefix, os.getenv('PATH')) rc = subprocess.call([exe_file, '-c', 'import sys; sys.exit(not sys.flags.no_site and ' '%r in sys.path and %r in sys.path and %r not in sys.path and ' 'all("\\r" not in p and "\\n" not in p for p in sys.path))' % ( os.path.join(sys_prefix, 'fake-path-name'), libpath, os.path.join(sys_prefix, 'from-env'), )], env=env) self.assertTrue(rc, "sys.path is incorrect") @support.requires_subprocess() def test_underpth_no_user_site(self): pth_lines = [test.support.STDLIB_DIR, 'import site'] exe_file = self._create_underpth_exe(pth_lines) p = subprocess.run([exe_file, '-X', 'utf8', '-c', 'import sys; ' 'sys.exit(not sys.flags.no_user_site)']) self.assertEqual(p.returncode, 0, "sys.flags.no_user_site was 0") class CommandLineTests(unittest.TestCase): def exists(self, path): if path is not None and os.path.isdir(path): return "exists" else: return "doesn't exist" def get_excepted_output(self, *args): if len(args) == 0: user_base = site.getuserbase() user_site = site.getusersitepackages() output = io.StringIO() output.write("sys.path = [\n") for dir in sys.path: output.write(" %r,\n" % (dir,)) output.write("]\n") output.write(f"USER_BASE: {user_base} ({self.exists(user_base)})\n") output.write(f"USER_SITE: {user_site} ({self.exists(user_site)})\n") output.write(f"ENABLE_USER_SITE: {site.ENABLE_USER_SITE}\n") return 0, dedent(output.getvalue()).strip() buffer = [] if '--user-base' in args: buffer.append(site.getuserbase()) if '--user-site' in args: buffer.append(site.getusersitepackages()) if buffer: return_code = 3 if site.ENABLE_USER_SITE: return_code = 0 elif site.ENABLE_USER_SITE is False: return_code = 1 elif site.ENABLE_USER_SITE is None: return_code = 2 output = os.pathsep.join(buffer) return return_code, os.path.normpath(dedent(output).strip()) else: return 10, None def invoke_command_line(self, *args): cmd_args = [] if sys.flags.no_user_site: cmd_args.append("-s") cmd_args.extend(["-m", "site", *args]) with EnvironmentVarGuard() as env: env["PYTHONUTF8"] = "1" env["PYTHONIOENCODING"] = "utf-8" proc = spawn_python(*cmd_args, text=True, env=env, encoding='utf-8', errors='replace') output = kill_python(proc) return_code = proc.returncode return return_code, os.path.normpath(dedent(output).strip()) @support.requires_subprocess() def test_no_args(self): return_code, output = self.invoke_command_line() excepted_return_code, _ = self.get_excepted_output() self.assertEqual(return_code, excepted_return_code) lines = output.splitlines() self.assertEqual(lines[0], "sys.path = [") self.assertEqual(lines[-4], "]") excepted_base = f"USER_BASE: '{site.getuserbase()}'" +\ f" ({self.exists(site.getuserbase())})" self.assertEqual(lines[-3], excepted_base) excepted_site = f"USER_SITE: '{site.getusersitepackages()}'" +\ f" ({self.exists(site.getusersitepackages())})" self.assertEqual(lines[-2], excepted_site) self.assertEqual(lines[-1], f"ENABLE_USER_SITE: {site.ENABLE_USER_SITE}") @support.requires_subprocess() def test_unknown_args(self): return_code, output = self.invoke_command_line("--unknown-arg") excepted_return_code, _ = self.get_excepted_output("--unknown-arg") self.assertEqual(return_code, excepted_return_code) self.assertIn('[--user-base] [--user-site]', output) @support.requires_subprocess() def test_base_arg(self): return_code, output = self.invoke_command_line("--user-base") excepted = self.get_excepted_output("--user-base") excepted_return_code, excepted_output = excepted self.assertEqual(return_code, excepted_return_code) self.assertEqual(output, excepted_output) @support.requires_subprocess() def test_site_arg(self): return_code, output = self.invoke_command_line("--user-site") excepted = self.get_excepted_output("--user-site") excepted_return_code, excepted_output = excepted self.assertEqual(return_code, excepted_return_code) self.assertEqual(output, excepted_output) @support.requires_subprocess() def test_both_args(self): return_code, output = self.invoke_command_line("--user-base", "--user-site") excepted = self.get_excepted_output("--user-base", "--user-site") excepted_return_code, excepted_output = excepted self.assertEqual(return_code, excepted_return_code) self.assertEqual(output, excepted_output) class StartFileTests(unittest.TestCase): """Tests for .start file processing (PEP 829).""" def setUp(self): self.enterContext(import_helper.DirsOnSysPath()) self.tmpdir = self.sitedir = self.enterContext(os_helper.temp_dir()) # Each test gets its own _StartupState to drive the parser and # processor methods directly. Defensively clear any _startup_state # that a prior test may have left set via defer_processing_start_files # without a corresponding process_startup_files() flush. self.state = site._StartupState() site._startup_state = None self.addCleanup(self._reset_startup_state) def _reset_startup_state(self): site._startup_state = None def _make_start(self, content, name='testpkg', basedir=None): """Write a .start file and return its basename. ``basedir`` defaults to ``self.tmpdir``. Pass an explicit directory when the .start file needs to live somewhere other than the test's primary tmpdir (e.g. a nested user-site). """ basename = f"{name}.start" filepath = os.path.join(self.tmpdir if basedir is None else basedir, basename) with open(filepath, 'w', encoding='utf-8') as f: f.write(content) return basename def _make_pth(self, content, name='testpkg', basedir=None): """Write a .pth file and return its basename. ``basedir`` defaults to ``self.tmpdir``. Pass an explicit directory when the .pth file needs to live somewhere other than the test's primary tmpdir (e.g. a nested user-site). """ basename = f"{name}.pth" filepath = os.path.join(self.tmpdir if basedir is None else basedir, basename) with open(filepath, 'w', encoding='utf-8') as f: f.write(content) return basename def _make_mod(self, contents, name='mod', *, package=False, on_path=False): """Write an importable module (or package), returning its parent dir.""" extdir = os.path.join(self.sitedir, 'extdir') os.makedirs(extdir, exist_ok=True) # Put the code in a package's dunder-init or flat module. if package: pkgdir = os.path.join(extdir, name) os.mkdir(pkgdir) modpath = os.path.join(pkgdir, '__init__.py') else: modpath = os.path.join(extdir, f'{name}.py') with open(modpath, 'w') as fp: fp.write(contents) self.addCleanup(sys.modules.pop, name, None) if on_path: # Don't worry, DirsOnSysPath() in setUp() will clean this up. sys.path.insert(0, extdir) return extdir def _all_entrypoints(self): """Flatten state._entrypoints into a list of (filename, entry) tuples.""" result = [] for filename, entries in self.state._entrypoints.items(): for entry in entries: result.append((filename, entry)) return result def _just_entrypoints(self): return [entry for filename, entry in self._all_entrypoints()] # There are two classes of tests here. Tests that start with `test_impl_` # know details about the implementation and they access non-public methods # and data structures to perform focused functional tests. # # Tests that start with `test_addsitedir_` are end-to-end tests that ensure # integration semantics and functionality as a caller of the public # surfaces would see. # --- _StartupState.read_start_file tests --- def test_impl_read_start_file_basic(self): self._make_start("os.path:join\n", name='foo') self.state.read_start_file(self.sitedir, 'foo.start') fullname = os.path.join(self.sitedir, 'foo.start') self.assertEqual( self.state._entrypoints[fullname], ['os.path:join'] ) def test_impl_read_start_file_multiple_entries(self): self._make_start("os.path:join\nos.path:exists\n", name='foo') self.state.read_start_file(self.sitedir, 'foo.start') fullname = os.path.join(self.sitedir, 'foo.start') self.assertEqual( self.state._entrypoints[fullname], ['os.path:join', 'os.path:exists'], ) def test_impl_read_start_file_comments_and_blanks(self): self._make_start("# a comment\n\nos.path:join\n \n", name='foo') self.state.read_start_file(self.sitedir, 'foo.start') fullname = os.path.join(self.sitedir, 'foo.start') self.assertEqual( self.state._entrypoints[fullname], ['os.path:join'] ) def test_impl_read_start_file_accepts_all_non_blank_lines(self): # Syntax validation is deferred to entry-point execution time # (where pkgutil.resolve_name(strict=True) enforces the strict # pkg.mod:callable form), so parsing accepts every non-blank, # non-comment line, including syntactically invalid ones. content = ( "os.path\n" # no colon "pkg.mod:\n" # empty callable ":callable\n" # empty module "pkg.mod:callable:extra\n" # multiple colons "os.path:join\n" # valid ) self._make_start(content, name='foo') self.state.read_start_file(self.sitedir, 'foo.start') fullname = os.path.join(self.sitedir, 'foo.start') self.assertEqual(self.state._entrypoints[fullname], [ 'os.path', 'pkg.mod:', ':callable', 'pkg.mod:callable:extra', 'os.path:join', ]) def test_impl_read_start_file_empty(self): # PEP 829: an empty .start file is still registered as present # (with an empty entry point list) so that it suppresses `import` # lines in any matching .pth file. self._make_start("", name='foo') self.state.read_start_file(self.sitedir, 'foo.start') fullname = os.path.join(self.sitedir, 'foo.start') self.assertEqual(self.state._entrypoints, {fullname: []}) def test_impl_read_start_file_comments_only(self): # As with an empty file, a comments-only .start file is registered # as present so it can suppress matching .pth `import` lines. self._make_start("# just a comment\n# another\n", name='foo') self.state.read_start_file(self.sitedir, 'foo.start') fullname = os.path.join(self.sitedir, 'foo.start') self.assertEqual(self.state._entrypoints, {fullname: []}) def test_impl_read_start_file_nonexistent(self): with captured_stderr(): self.state.read_start_file(self.tmpdir, 'nonexistent.start') self.assertEqual(self.state._entrypoints, {}) @unittest.skipUnless(hasattr(os, 'chflags'), 'test needs os.chflags()') def test_impl_read_start_file_hidden_flags(self): self._make_start("os.path:join\n", name='foo') filepath = os.path.join(self.tmpdir, 'foo.start') st = os.stat(filepath) os.chflags(filepath, st.st_flags | stat.UF_HIDDEN) self.state.read_start_file(self.sitedir, 'foo.start') self.assertEqual(self.state._entrypoints, {}) def test_impl_one_start_file_with_duplicates_not_deduplicated(self): # PEP 829: duplicate entry points are NOT deduplicated. self._make_start("os.path:join\nos.path:join\n", name='foo') self.state.read_start_file(self.sitedir, 'foo.start') fullname = os.path.join(self.sitedir, 'foo.start') self.assertEqual( self.state._entrypoints[fullname], ['os.path:join', 'os.path:join'], ) def test_impl_two_start_files_with_duplicates_not_deduplicated(self): self._make_start("os.path:join", name="foo") self._make_start("os.path:join", name="bar") self.state.read_start_file(self.sitedir, 'foo.start') self.state.read_start_file(self.sitedir, 'bar.start') self.assertEqual( self._just_entrypoints(), ['os.path:join', 'os.path:join'], ) def test_impl_read_start_file_accepts_utf8_bom(self): # PEP 829: .start files MUST be utf-8-sig (UTF-8 with optional BOM). filepath = os.path.join(self.tmpdir, 'foo.start') with open(filepath, 'wb') as f: f.write(b'\xef\xbb\xbf' + b'os.path:join\n') self.state.read_start_file(self.sitedir, 'foo.start') fullname = os.path.join(self.sitedir, 'foo.start') self.assertEqual( self.state._entrypoints[fullname], ['os.path:join'] ) def test_impl_read_start_file_invalid_utf8_silently_skipped(self): # PEP 829: .start files MUST be utf-8-sig. Unlike .pth files, there # is no locale-encoding fallback. A .start file that is not valid # UTF-8 is silently skipped, with no key registered in # state._entrypoints and no output to stderr (parsing errors are # reported only under -v). filepath = os.path.join(self.tmpdir, 'foo.start') with open(filepath, 'wb') as f: # Bare continuation byte -- invalid as a UTF-8 start byte. f.write(b'\x80\x80\x80\n') with captured_stderr() as err: self.state.read_start_file(self.sitedir, 'foo.start') self.assertEqual(self.state._entrypoints, {}) self.assertEqual(err.getvalue(), "") # --- _StartupState.read_pth_file tests --- def test_impl_read_pth_file_paths(self): subdir = os.path.join(self.sitedir, 'mylib') os.mkdir(subdir) self._make_pth("mylib\n", name='foo') self.state.read_pth_file(self.sitedir, 'foo.pth', set()) fullname = os.path.join(self.sitedir, 'foo.pth') self.assertIn(subdir, self.state._syspaths[fullname]) def test_impl_read_pth_file_imports_collected(self): self._make_pth("import sys\n", name='foo') self.state.read_pth_file(self.sitedir, 'foo.pth', set()) fullname = os.path.join(self.sitedir, 'foo.pth') self.assertEqual( self.state._importexecs[fullname], ['import sys'] ) def test_impl_read_pth_file_comments_and_blanks(self): self._make_pth("# comment\n\n \n", name='foo') self.state.read_pth_file(self.sitedir, 'foo.pth', set()) self.assertEqual(self.state._syspaths, {}) self.assertEqual(self.state._importexecs, {}) def test_impl_read_pth_file_deduplication(self): subdir = os.path.join(self.sitedir, 'mylib') os.mkdir(subdir) # An accumulator acts as a deduplication ledger. known_paths = set() self._make_pth("mylib\n", name='a') self._make_pth("mylib\n", name='b') self.state.read_pth_file(self.sitedir, 'a.pth', known_paths) self.state.read_pth_file(self.sitedir, 'b.pth', known_paths) # There is only one entry across both files. all_dirs = [] for dirs in self.state._syspaths.values(): all_dirs.extend(dirs) self.assertEqual(all_dirs, [subdir]) def test_impl_read_pth_file_bad_line_continues(self): # PEP 829: errors on individual lines don't abort processing the file. subdir = os.path.join(self.sitedir, 'goodpath') os.mkdir(subdir) self._make_pth("abc\x00def\ngoodpath\n", name='foo') with captured_stderr(): self.state.read_pth_file(self.sitedir, 'foo.pth', set()) fullname = os.path.join(self.sitedir, 'foo.pth') self.assertIn(subdir, self.state._syspaths.get(fullname, [])) def _flags_with_verbose(self, verbose): # Build a sys.flags clone with verbose overridden but every # other field preserved, so unrelated reads like # sys.flags.optimize during io.open_code() continue to work. attrs = { name: getattr(sys.flags, name) for name in sys.flags.__match_args__ } attrs['verbose'] = verbose return SimpleNamespace(**attrs) def test_impl_read_pth_file_parse_error_silent_by_default(self): # PEP 829: parse-time errors are silent unless -v is given. # Force the error path by making makepath() raise an exception. self._make_pth("badline\n", name='foo') with ( mock.patch('site.makepath', side_effect=ValueError("boom")), mock.patch('sys.flags', self._flags_with_verbose(False)), captured_stderr() as err, ): self.state.read_pth_file(self.sitedir, 'foo.pth', set()) self.assertEqual(err.getvalue(), "") def test_impl_read_pth_file_parse_error_reported_under_verbose(self): # PEP 829: parse-time errors are reported when -v is given. self._make_pth("badline\n", name='foo') with ( mock.patch('site.makepath', side_effect=ValueError("boom")), mock.patch('sys.flags', self._flags_with_verbose(True)), captured_stderr() as err, ): self.state.read_pth_file(self.sitedir, 'foo.pth', set()) out = err.getvalue() self.assertIn('Error in', out) self.assertIn('foo.pth', out) def test_impl_read_pth_file_locale_fallback(self): # PEP 829: .pth files that fail UTF-8 decoding fall back to the # locale encoding for backward compatibility (deprecated in # 3.15, to be removed in 3.20). Mock locale.getencoding() so # the test does not depend on the host's actual locale. subdir = os.path.join(self.sitedir, 'mylib') os.mkdir(subdir) filepath = os.path.join(self.tmpdir, 'foo.pth') # \xe9 is invalid UTF-8 but valid in latin-1. with open(filepath, 'wb') as f: f.write(b'# caf\xe9 comment\nmylib\n') with ( mock.patch('locale.getencoding', return_value='latin-1'), captured_stderr(), ): self.state.read_pth_file(self.sitedir, 'foo.pth', set()) fullname = os.path.join(self.sitedir, 'foo.pth') self.assertIn(subdir, self.state._syspaths.get(fullname, [])) # --- _StartupState._execute_start_entrypoints tests --- def test_impl_execute_entrypoints_with_callable(self): # An entry point with a callable. self._make_mod("""\ called = False def startup(): global called called = True """, name='epmod', package=True, on_path=True) fullname = os.path.join(self.sitedir, 'epmod.start') self.state._entrypoints[fullname] = ['epmod:startup'] self.state._execute_start_entrypoints() import epmod self.assertTrue(epmod.called) def test_impl_execute_entrypoints_import_error(self): # Import errors print a traceback and continue. fullname = os.path.join(self.sitedir, 'bad.start') self.state._entrypoints[fullname] = [ 'nosuchmodule_xyz:func', 'os.path:join', ] with captured_stderr() as err: self.state._execute_start_entrypoints() self.assertIn('nosuchmodule_xyz', err.getvalue()) # os.path:join should still have been called (no exception for it) def test_impl_execute_entrypoints_strict_syntax_rejection(self): # PEP 829: only the strict pkg.mod:callable form is valid. At entry # point execution time, pkgutil.resolve_name(strict=True) raises a # ValueError for the invalid syntax. The invalid entry is reported # and execution continues with the next one. fullname = os.path.join(self.sitedir, 'bad.start') self.state._entrypoints[fullname] = [ 'os.path', # no colon 'pkg.mod:', # empty callable ':callable', # empty module 'pkg.mod:callable:extra', # multiple colons ] with captured_stderr() as err: self.state._execute_start_entrypoints() out = err.getvalue() self.assertIn('Invalid entry point syntax', out) for bad in ( 'os.path', 'pkg.mod:', ':callable', 'pkg.mod:callable:extra', ): self.assertIn(bad, out) def test_impl_execute_entrypoints_callable_error(self): # A callable that errors prints a traceback but continues. self._make_mod("""\ def fail(): raise RuntimeError("boom") """, name='badmod', package=True, on_path=True) fullname = os.path.join(self.sitedir, 'badmod.start') self.state._entrypoints[fullname] = ['badmod:fail'] with captured_stderr() as err: self.state._execute_start_entrypoints() self.assertIn('RuntimeError', err.getvalue()) self.assertIn('boom', err.getvalue()) def test_impl_execute_entrypoints_duplicates_called_twice(self): # PEP 829: duplicate entry points execute multiple times. self._make_mod("""\ call_count = 0 def bump(): global call_count call_count += 1 """, name='countmod', package=False, on_path=True) fullname = os.path.join(self.sitedir, 'countmod.start') self.state._entrypoints[fullname] = [ 'countmod:bump', 'countmod:bump', ] self.state._execute_start_entrypoints() import countmod self.assertEqual(countmod.call_count, 2) # --- _StartupState._exec_imports tests --- def test_impl_exec_imports_suppressed_by_matching_start(self): # Import lines from foo.pth are suppressed when foo.start exists. self._make_mod("""\ call_count = 0 def bump(): global call_count call_count += 1 """, name='countmod', package=False, on_path=True) pth_fullname = os.path.join(self.sitedir, 'foo.pth') start_fullname = os.path.join(self.sitedir, 'foo.start') self.state._importexecs[pth_fullname] = ['import countmod; countmod.bump()'] self.state._entrypoints[start_fullname] = ['os.path:join'] self.state._exec_imports() import countmod self.assertEqual(countmod.call_count, 0) def test_impl_exec_imports_not_suppressed_by_different_start(self): # Import lines from foo.pth are NOT suppressed by bar.start. self._make_mod("""\ call_count = 0 def bump(): global call_count call_count += 1 """, name='countmod', package=False, on_path=True) pth_fullname = os.path.join(self.sitedir, 'foo.pth') start_fullname = os.path.join(self.sitedir, 'bar.start') self.state._importexecs[pth_fullname] = ['import countmod; countmod.bump()'] self.state._entrypoints[start_fullname] = ['os.path:join'] self.state._exec_imports() import countmod self.assertEqual(countmod.call_count, 1) def test_impl_exec_imports_suppressed_by_empty_matching_start(self): self._make_start("", name='foo') self._make_pth("import epmod; epmod.startup()", name='foo') self._make_mod("""\ called = False def startup(): global called called = True """, name='epmod', package=True, on_path=True) self.state.read_pth_file(self.sitedir, 'foo.pth', set()) self.state.read_start_file(self.sitedir, 'foo.start') self.state._exec_imports() import epmod self.assertFalse(epmod.called) # --- _StartupState._extend_syspath tests --- def test_impl_extend_syspath_existing_dir(self): subdir = os.path.join(self.sitedir, 'extlib') os.mkdir(subdir) self.state._syspaths['test.pth'] = [subdir] self.state._extend_syspath() self.assertIn(subdir, sys.path) def test_impl_extend_syspath_nonexistent_dir(self): nonesuch = os.path.join(self.sitedir, 'nosuchdir') self.state._syspaths['test.pth'] = [nonesuch] with captured_stderr() as err: self.state._extend_syspath() self.assertNotIn(nonesuch, sys.path) self.assertIn('does not exist', err.getvalue()) # --- addsitedir integration tests --- def test_addsitedir_pth_import_skipped_when_matching_start_exists(self): # PEP 829: an empty .start file disables the matching .pth's import # lines, even when the .start has no entry points of its own. self._make_mod("flag = False\n", name='suppressed', on_path=True) self._make_start("", name='foo') self._make_pth( "import suppressed; suppressed.flag = True\n", name='foo') site.addsitedir(self.sitedir, set()) import suppressed self.assertFalse( suppressed.flag, "import line in foo.pth should be suppressed by foo.start") def test_addsitedir_dotfile_start_entrypoint_not_executed(self): # .start files starting with '.' are skipped, so their entry # points must not run. self._make_mod("""\ called = False def hook(): global called called = True """, name='dotted', on_path=True) self._make_start("dotted:hook\n", name='.hidden') site.addsitedir(self.sitedir, set()) import dotted self.assertFalse(dotted.called) def test_addsitedir_dedups_paths_across_pth_files(self): # PEP 829: when multiple .pth files reference the same path within # a single addsitedir() invocation, the path is appended to # sys.path exactly once. subdir = os.path.join(self.sitedir, 'shared') os.mkdir(subdir) self._make_pth("shared\n", name='a') self._make_pth("shared\n", name='b') before = sys.path.count(subdir) site.addsitedir(self.sitedir, set()) self.assertEqual(sys.path.count(subdir), before + 1) def test_addsitedir_discovers_start_files(self): # addsitedir() should discover .start files and accumulate entries. # With defer_processing_start_files=True the preserved state lives on # site._startup_state and isn't flushed until the caller invokes # process_startup_files(). self._make_start("os.path:join\n", name='foo') site.addsitedir( self.sitedir, set(), defer_processing_start_files=True, ) fullname = os.path.join(self.sitedir, 'foo.start') self.assertIn( 'os.path:join', site._startup_state._entrypoints[fullname] ) def test_impl_exec_imports_skips_when_matching_start(self): # When foo.start exists, import lines in foo.pth are skipped # at flush time by _StartupState._exec_imports(). self._make_start("os.path:join\n", name='foo') self._make_pth("import sys\n", name='foo') site.addsitedir( self.sitedir, set(), defer_processing_start_files=True, ) pth_fullname = os.path.join(self.sitedir, 'foo.pth') start_fullname = os.path.join(self.sitedir, 'foo.start') # Import line was collected... self.assertIn( 'import sys', site._startup_state._importexecs.get(pth_fullname, []), ) # ...but _exec_imports() will skip it because foo.start exists. site._startup_state._exec_imports() def test_addsitedir_pth_paths_still_work_with_start(self): # Path lines in .pth files still work even when a .start file exists. subdir = os.path.join(self.sitedir, 'mylib') os.mkdir(subdir) self._make_start("os.path:join\n", name='foo') self._make_pth("mylib\n", name='foo') site.addsitedir( self.sitedir, set(), defer_processing_start_files=True, ) fullname = os.path.join(self.sitedir, 'foo.pth') self.assertIn( subdir, site._startup_state._syspaths.get(fullname, []) ) def test_addsitedir_start_alphabetical_order(self): # Multiple .start files are discovered alphabetically. # _all_entrypoints() reads from self.state, so swap in the # preserved batch state for the duration of the assertion. self._make_start("os.path:join\n", name='zzz') self._make_start("os.path:exists\n", name='aaa') site.addsitedir( self.sitedir, set(), defer_processing_start_files=True, ) self.state = site._startup_state all_entries = self._all_entrypoints() entries = [entry for _, entry in all_entries] idx_a = entries.index('os.path:exists') idx_z = entries.index('os.path:join') self.assertLess(idx_a, idx_z) def test_addsitedir_pth_before_start(self): # PEP 829: .pth files are scanned before .start files. # Create a .pth and .start with the same basename; verify # the .pth data is collected before .start data. subdir = os.path.join(self.sitedir, 'mylib') os.mkdir(subdir) self._make_pth("mylib\n", name='foo') self._make_start("os.path:join\n", name='foo') site.addsitedir( self.sitedir, set(), defer_processing_start_files=True, ) # Both should be collected. pth_fullname = os.path.join(self.sitedir, 'foo.pth') start_fullname = os.path.join(self.sitedir, 'foo.start') self.assertIn( subdir, site._startup_state._syspaths.get(pth_fullname, []) ) self.assertIn( 'os.path:join', site._startup_state._entrypoints.get(start_fullname, []), ) def test_impl_addsitedir_skips_dotfile_start(self): # .start files starting with '.' are skipped. Defer flushing so # the preserved batch state stays inspectable on # site._startup_state; otherwise process_startup_files() would # detach and consume it regardless of whether the dotfile was # picked up. self._make_start("os.path:join\n", name='.hidden') site.addsitedir( self.sitedir, set(), defer_processing_start_files=True, ) self.assertEqual(site._startup_state._entrypoints, {}) def test_addsitedir_standalone_flushes(self): # When called with defer_processing_start_files=False (the # default), addsitedir creates a per-call _StartupState and # processes it before returning, so the caller sees the effect # immediately. No batch state is left behind on # site._startup_state. subdir = os.path.join(self.sitedir, 'flushlib') os.mkdir(subdir) self._make_pth("flushlib\n", name='foo') site.addsitedir(self.sitedir) # known_paths=None self.assertIn(subdir, sys.path) self.assertIsNone(site._startup_state) def test_addsitedir_defer_does_not_flush(self): # With defer_processing_start_files=True, addsitedir accumulates # pending state but does not flush; sys.path is updated only when # process_startup_files() is called explicitly. The accumulated # state lives on the lazily-promoted site._startup_state. subdir = os.path.join(self.sitedir, 'acclib') os.mkdir(subdir) self._make_pth("acclib\n", name='foo') site.addsitedir( self.sitedir, set(), defer_processing_start_files=True, ) # Path is pending, not yet on sys.path. self.assertNotIn(subdir, sys.path) fullname = os.path.join(self.sitedir, 'foo.pth') self.assertIn( subdir, site._startup_state._syspaths.get(fullname, []) ) def test_pth_path_is_available_to_start_entrypoint(self): # Core PEP 829 invariant: all .pth path extensions are applied to # sys.path *before* any .start entry point runs, so an entry # point may live in a module reachable only via a .pth-extended # path. If the flush phases were inverted, resolving the entry # point would fail with ModuleNotFoundError. extdir = self._make_mod("""\ called = False def hook(): global called called = True """) # extdir is not on sys.path; only the .pth file makes it so. self.assertNotIn(extdir, sys.path) self._make_pth("extdir\n", name='extlib') self._make_start("mod:hook\n", name='extlib') # Standalone addsitedir() triggers the full flush sequence. site.addsitedir(self.sitedir) self.assertIn(extdir, sys.path) import mod self.assertTrue( mod.called, "entry point did not run; .pth path was likely not applied " "before .start entry-point execution") # --- bugs --- # gh-75723 def test_addsitdir_idempotent_pth(self): # Adding the same sitedir twice with a known_paths, should not # process .pth files twice. extdir = self._make_mod("""\ _pth_count = 0 """) self._make_pth(f"""\ {extdir} import mod; mod._pth_count += 1 """) dirs = set() dirs = site.addsitedir(self.sitedir, dirs) dirs = site.addsitedir(self.sitedir, dirs) import mod self.assertEqual(mod._pth_count, 1) def test_addsitdir_idempotent_start(self): # Adding the same sitedir twice with a known_paths, should not # process .pth files twice. extdir = self._make_mod("""\ _pth_count = 0 def increment(): global _pth_count _pth_count += 1 """) self._make_pth(f"""\ {extdir} """) self._make_start("""\ mod:increment """) dirs = set() dirs = site.addsitedir(self.sitedir, dirs) dirs = site.addsitedir(self.sitedir, dirs) import mod self.assertEqual(mod._pth_count, 1) # gh-149504 def test_reentrant_addsitedir_pth(self): # An import line in a .pth file that calls site.addsitedir() # must not crash or re-execute outer entries while the outer # call is still processing its pending startup state. overlay = self.enterContext(os_helper.temp_dir()) overlay_pth = os.path.join(overlay, 'overlay.pth') pkgdir = self.enterContext(os_helper.temp_dir()) with open(overlay_pth, 'w', encoding='utf-8') as fp: print(pkgdir, file=fp) self._make_pth(f"import site; site.addsitedir({overlay!r})\n") site.addsitedir(self.sitedir, set()) self.assertIn(overlay, sys.path) self.assertIn(pkgdir, sys.path) # gh-149504 def test_reentrant_addsitedir_start(self): # As above, but the re-entry happens from a .start entry point # instead of a .pth import line. The entry point execution # phase is vulnerable to the same class of bug. overlay = self.enterContext(os_helper.temp_dir()) overlay_pth = os.path.join(overlay, 'overlay.pth') pkgdir = self.enterContext(os_helper.temp_dir()) with open(overlay_pth, 'w', encoding='utf-8') as fp: print(pkgdir, file=fp) self._make_mod(f"""\ import site def bootstrap(): site.addsitedir({overlay!r}) """, name='reenter_helper', on_path=True) self._make_start("reenter_helper:bootstrap\n") site.addsitedir(self.sitedir, set()) self.assertIn(overlay, sys.path) self.assertIn(pkgdir, sys.path) # gh-149819 @unittest.skipUnless(site.ENABLE_USER_SITE, "requires user-site") @support.requires_subprocess() def test_pth_processed_when_sitedir_already_on_path(self): # A .pth file in a site-packages directory must still be processed by # site.main() when that directory is already on sys.path at # interpreter start up, for example in a subprocess that inherits # PYTHONPATH from its parent. Before the fix, main() seeded # known_paths with all entries derived from removeduppaths(), and # addsitedir() then skipped .pth processing for any directory already # in known_paths. user_base = self.tmpdir user_site = site._get_path(user_base) os.makedirs(user_site) sentinel = "GH149819_PTH_RAN" # Writing some text to stderr is the simplest observable side effect. self._make_pth(f"""\ import sys; sys.stderr.write({sentinel!r}); sys.stderr.flush() """, name='gh149819', basedir=user_site) with EnvironmentVarGuard() as env: # PYTHONUSERBASE points USER_SITE at our temp directory so # site.main() will call addsitedir() on it, rather than on the # host interpreter's real user-site. env['PYTHONUSERBASE'] = user_base # PYTHONPATH puts that same directory on sys.path before # site.main() runs in the subprocess. This is what triggers the # bug: removeduppaths() records it in known_paths, and the unfixed # addsitedir() then skips .pth processing. env['PYTHONPATH'] = user_site result = subprocess.run( [sys.executable, '-c', ''], capture_output=True, check=True, ) self.assertIn(sentinel.encode(), result.stderr) @unittest.skipUnless(site.ENABLE_USER_SITE, "requires user-site") @support.requires_subprocess() def test_start_processed_when_sitedir_already_on_path(self): # Companion to test_pth_processed_when_sitedir_already_on_path: # the same dedup-guard skip in addsitedir() suppressed both .pth # and .start file processing, so verify .start entry points also # run for a site-packages directory inherited via PYTHONPATH. user_base = self.tmpdir user_site = site._get_path(user_base) os.makedirs(user_site) sentinel = "GH149819_START_RAN" # The .start entry point resolves to a callable, so we write a # tiny importable module that outputs the sentinel text. It lands in # /extdir. That path is added to PYTHONPATH below so # the subprocess can import it. extdir = self._make_mod(f"""\ import sys def run(): sys.stderr.write({sentinel!r}) sys.stderr.flush() """, name='gh149819mod') self._make_start( 'gh149819mod:run\n', name='gh149819', basedir=user_site ) with EnvironmentVarGuard() as env: # See above for details. env['PYTHONUSERBASE'] = user_base env['PYTHONPATH'] = os.pathsep.join([user_site, extdir]) result = subprocess.run( [sys.executable, '-c', ''], capture_output=True, check=True, ) self.assertIn(sentinel.encode(), result.stderr) if __name__ == "__main__": unittest.main()