mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 11:14:33 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			787 lines
		
	
	
	
		
			24 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			787 lines
		
	
	
	
		
			24 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import collections.abc
 | |
| import contextlib
 | |
| import errno
 | |
| import os
 | |
| import re
 | |
| import stat
 | |
| import string
 | |
| import sys
 | |
| import time
 | |
| import unittest
 | |
| import warnings
 | |
| 
 | |
| from test import support
 | |
| 
 | |
| 
 | |
# Base filename used by the tests.  The process id is embedded so that
# parallel test runs do not collide, while the name is meant to remain a
# valid module name.
TESTFN_ASCII = f"@test_{os.getpid()}_tmp"

# TESTFN_UNICODE is a non-ASCII filename built on top of TESTFN_ASCII.
TESTFN_UNICODE = f"{TESTFN_ASCII}-\xe0\xf2\u0258\u0141\u011f"
if support.is_apple:
    # On Apple's VFS API file names are, by definition, canonically
    # decomposed Unicode, encoded using UTF-8. See QA1173:
    # http://developer.apple.com/mac/library/qa/qa2001/qa1173.html
    import unicodedata
    TESTFN_UNICODE = unicodedata.normalize('NFD', TESTFN_UNICODE)
 | |
| 
 | |
# TESTFN_UNENCODABLE: a str filename that the filesystem encoding should
# reject in strict mode.  It stays None when no such name can be produced.
TESTFN_UNENCODABLE = None
if os.name == 'nt':
    # skip win32s (0) or Windows 9x/ME (1)
    if sys.getwindowsversion().platform >= 2:
        # Mix characters from several languages to minimize the probability
        # that the whole name is encodable to MBCS (issue #9819).
        TESTFN_UNENCODABLE = TESTFN_ASCII + "-\u5171\u0141\u2661\u0363\uDC80"
        try:
            TESTFN_UNENCODABLE.encode(sys.getfilesystemencoding())
        except UnicodeEncodeError:
            # Expected outcome: the name really cannot be encoded.
            pass
        else:
            print('WARNING: The filename %r CAN be encoded by the filesystem '
                  'encoding (%s). Unicode filename tests may not be effective'
                  % (TESTFN_UNENCODABLE, sys.getfilesystemencoding()))
            TESTFN_UNENCODABLE = None
# Apple and Emscripten deny unencodable filenames (invalid utf-8); WASI is
# excluded by the same check.
elif not (support.is_apple or sys.platform in {"emscripten", "wasi"}):
    try:
        # ascii and utf-8 cannot decode the byte 0xff
        b'\xff'.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        # 0xff will be encoded using the surrogate character u+DCFF
        TESTFN_UNENCODABLE = TESTFN_ASCII + b'-\xff'.decode(
            sys.getfilesystemencoding(), 'surrogateescape')
    # Otherwise the file system encoding (eg. ISO-8859-* encodings) can
    # decode the byte 0xff: leave TESTFN_UNENCODABLE as None so that some
    # unicode filename tests are skipped.
 | |
| 
 | |
# FS_NONASCII: the first candidate non-ASCII character that survives an
# os.fsencode()/os.fsdecode() round trip, or an empty string if none does.
FS_NONASCII = ''
for character in (
    # Printable and common characters come first so that the resulting
    # filename stays readable.  The encodings named for each character are
    # only examples of encodings able to encode it (not an exhaustive list).

    # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
    '\u00E6',
    # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
    '\u0130',
    # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
    '\u0141',
    # U+03C6 (Greek Small Letter Phi): cp1253
    '\u03C6',
    # U+041A (Cyrillic Capital Letter Ka): cp1251
    '\u041A',
    # U+05D0 (Hebrew Letter Alef): Encodable to cp424
    '\u05D0',
    # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
    '\u060C',
    # U+062A (Arabic Letter Teh): cp720
    '\u062A',
    # U+0E01 (Thai Character Ko Kai): cp874
    '\u0E01',

    # Then come more "special" characters: they may be interpreted or
    # displayed differently depending on the exact locale encoding and the
    # font.

    # U+00A0 (No-Break Space)
    '\u00A0',
    # U+20AC (Euro Sign)
    '\u20AC',
):
    try:
        # If Python is set up to use the legacy 'mbcs' in Windows,
        # 'replace' error mode is used, and encode() returns b'?'
        # for characters missing in the ANSI codepage, so a lossy round
        # trip disqualifies the character just like an encoding error.
        if os.fsdecode(os.fsencode(character)) != character:
            continue
    except UnicodeError:
        continue
    FS_NONASCII = character
    break
 | |
| 
 | |
# Save the initial cwd, i.e. the working directory at the moment this module
# is executed.
SAVEDCWD = os.getcwd()
 | |
| 
 | |
# TESTFN_UNDECODABLE is a filename (bytes type) that should *not* be able to be
# decoded from the filesystem encoding (in strict mode). It can be None if we
# cannot generate such filename (ex: the latin1 encoding can decode any byte
# sequence). On UNIX, TESTFN_UNDECODABLE can be decoded by os.fsdecode() thanks
# to the surrogateescape error handler (PEP 383), but not from the filesystem
# encoding in strict mode.
TESTFN_UNDECODABLE = None
# NOTE: every candidate must be followed by a comma.  Adjacent bytes literals
# are concatenated by the compiler, which would silently merge two candidates
# into a single, different byte sequence.
for name in (
    # b'\xff' is not decodable by os.fsdecode() with code page 932. Windows
    # accepts it to create a file or a directory, or don't accept to enter to
    # such directory (when the bytes name is used). So test b'\xe7' first:
    # it is not decodable from cp932.
    b'\xe7w\xf0',
    # undecodable from ASCII, UTF-8
    b'\xff',
    # undecodable from iso8859-3, iso8859-6, iso8859-7, cp424, iso8859-8, cp856
    # and cp857
    b'\xae\xd5',
    # undecodable from UTF-8 (UNIX and Mac OS X)
    b'\xed\xb2\x80', b'\xed\xb4\x80',
    # undecodable from shift_jis, cp869, cp874, cp932, cp1250, cp1251, cp1252,
    # cp1253, cp1254, cp1255, cp1257, cp1258
    b'\x81\x98',
):
    try:
        name.decode(sys.getfilesystemencoding())
    except UnicodeDecodeError:
        # Strict decoding failed, which is what we want.  The name must still
        # be decodable with the filesystem error handler, otherwise it cannot
        # be used as a filename at all: try the next candidate.
        try:
            name.decode(sys.getfilesystemencoding(),
                        sys.getfilesystemencodeerrors())
        except UnicodeDecodeError:
            continue
        TESTFN_UNDECODABLE = os.fsencode(TESTFN_ASCII) + name
        break
 | |
| 
 | |
# TESTFN_NONASCII only exists when a usable non-ASCII character was found;
# TESTFN prefers it and otherwise falls back to the plain ASCII name.
TESTFN_NONASCII = TESTFN_ASCII + FS_NONASCII if FS_NONASCII else None
TESTFN = TESTFN_NONASCII or TESTFN_ASCII
 | |
| 
 | |
| 
 | |
def make_bad_fd():
    """
    Return a file descriptor number that is no longer valid.

    TESTFN is opened for writing, its descriptor number is recorded, and the
    file is then closed and removed before the number is handed back.
    """
    handle = open(TESTFN, "wb")
    try:
        # Leaving the ``with`` block closes the file before it is unlinked.
        with handle:
            return handle.fileno()
    finally:
        unlink(TESTFN)
 | |
| 
 | |
| 
 | |
# Cached result of can_symlink(); None means "not probed yet".
_can_symlink = None


def can_symlink():
    """Return True if os.symlink() works for an absolute TESTFN path.

    The answer is computed once and cached in the module-level _can_symlink.
    """
    global _can_symlink
    if _can_symlink is not None:
        return _can_symlink
    # WASI / wasmtime prevents symlinks with absolute paths, see man
    # openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute
    # paths, so the probe deliberately uses an absolute path as well.
    target = os.path.abspath(TESTFN)
    link_name = target + "can_symlink"
    try:
        os.symlink(target, link_name)
    except (OSError, NotImplementedError, AttributeError):
        can = False
    else:
        can = True
        os.remove(link_name)
    _can_symlink = can
    return can
 | |
| 
 | |
| 
 | |
def skip_unless_symlink(test):
    """Skip decorator for tests that require functional symlink"""
    if can_symlink():
        return test
    return unittest.skip("Requires functional symlink implementation")(test)
 | |
| 
 | |
| 
 | |
# Cached result of can_hardlink(); None means "not probed yet".
_can_hardlink = None

def can_hardlink():
    """Return True if os.link exists and the platform is not Android.

    The answer is computed once and cached in the module-level _can_hardlink.
    """
    global _can_hardlink
    if _can_hardlink is not None:
        return _can_hardlink
    # Android blocks hard links using SELinux
    # (https://stackoverflow.com/q/32365690).
    _can_hardlink = hasattr(os, "link") and not support.is_android
    return _can_hardlink
 | |
| 
 | |
| 
 | |
def skip_unless_hardlink(test):
    """Skip decorator for tests that require hard link support"""
    if can_hardlink():
        return test
    return unittest.skip("requires hardlink support")(test)
 | |
| 
 | |
| 
 | |
# Cached result of can_xattr(); None means "not probed yet".
_can_xattr = None


def can_xattr():
    """Return True if extended attributes can be set on test files.

    Attributes are set both on a file created through tempfile and on
    TESTFN, since the two may live on different file systems.  Linux 2.6
    kernels older than 2.6.39 are reported as unsupported.  The answer is
    computed once and cached in the module-level _can_xattr.
    """
    import tempfile
    global _can_xattr
    if _can_xattr is not None:
        return _can_xattr
    if not hasattr(os, "setxattr"):
        can = False
    else:
        import platform
        tmp_dir = tempfile.mkdtemp()
        tmp_fp, tmp_name = tempfile.mkstemp(dir=tmp_dir)
        try:
            with open(TESTFN, "wb") as fp:
                try:
                    # TESTFN & tempfile may use different file systems with
                    # different capabilities
                    os.setxattr(tmp_fp, b"user.test", b"")
                    os.setxattr(tmp_name, b"trusted.foo", b"42")
                    os.setxattr(fp.fileno(), b"user.test", b"")
                    # Kernels < 2.6.39 don't respect setxattr flags.
                    kernel_version = platform.release()
                    # The dots are escaped so that only a literal "2.6."
                    # prefix is recognised.
                    m = re.match(r"2\.6\.(\d{1,2})", kernel_version)
                    can = m is None or int(m.group(1)) >= 39
                except OSError:
                    can = False
        finally:
            # mkstemp() hands back an OS-level descriptor that the caller
            # owns; close it so the probe does not leak a file descriptor.
            os.close(tmp_fp)
            unlink(TESTFN)
            unlink(tmp_name)
            rmdir(tmp_dir)
    _can_xattr = can
    return can
 | |
| 
 | |
| 
 | |
def skip_unless_xattr(test):
    """Skip decorator for tests that require functional extended attributes"""
    if can_xattr():
        return test
    return unittest.skip("no non-broken extended attribute support")(test)
 | |
| 
 | |
| 
 | |
# Cached result of can_chmod(); None means "not probed yet".
_can_chmod = None

def can_chmod():
    """Return True if os.chmod() actually changes the mode bits of TESTFN.

    The answer is computed once and cached in the module-level _can_chmod.
    """
    global _can_chmod
    if _can_chmod is not None:
        return _can_chmod
    if not hasattr(os, "chmod"):
        _can_chmod = False
        return False
    try:
        with open(TESTFN, "wb"):
            try:
                os.chmod(TESTFN, 0o555)
                restricted = os.stat(TESTFN).st_mode
                os.chmod(TESTFN, 0o777)
                permissive = os.stat(TESTFN).st_mode
            except OSError:
                can = False
            else:
                # chmod() "works" only if the two requests produced
                # different permission bits.
                can = stat.S_IMODE(restricted) != stat.S_IMODE(permissive)
    finally:
        unlink(TESTFN)
    _can_chmod = can
    return can
 | |
| 
 | |
| 
 | |
def skip_unless_working_chmod(test):
    """Skip tests that require working os.chmod()

    WASI SDK 15.0 cannot change file mode bits.
    """
    if can_chmod():
        return test
    return unittest.skip("requires working os.chmod()")(test)
 | |
| 
 | |
| 
 | |
# Check whether the current effective user has the capability to override
# DAC (discretionary access control). Typically user root is able to
# bypass file read, write, and execute permission checks. The capability
# is independent of the effective user. See capabilities(7).
_can_dac_override = None

def can_dac_override():
    """Return True if a file made read-only can still be opened for writing.

    Always False when can_chmod() is false.  Otherwise the answer is
    computed once and cached in the module-level _can_dac_override.
    """
    global _can_dac_override

    if not can_chmod():
        _can_dac_override = False
    if _can_dac_override is not None:
        return _can_dac_override

    try:
        with open(TESTFN, "wb"):
            os.chmod(TESTFN, 0o400)
            try:
                # Opening a read-only file for writing only succeeds when
                # permission checks are bypassed.
                with open(TESTFN, "wb"):
                    overridden = True
            except OSError:
                overridden = False
            _can_dac_override = overridden
    finally:
        # Restore write permission (best effort) before removing the file.
        with contextlib.suppress(OSError):
            os.chmod(TESTFN, 0o700)
        unlink(TESTFN)

    return _can_dac_override
 | |
| 
 | |
| 
 | |
def skip_if_dac_override(test):
    """Skip decorator for tests that cannot run when DAC can be overridden"""
    if can_dac_override():
        return unittest.skip("incompatible with CAP_DAC_OVERRIDE")(test)
    return test
 | |
| 
 | |
| 
 | |
def skip_unless_dac_override(test):
    """Skip decorator for tests that require the ability to override DAC"""
    if can_dac_override():
        return test
    return unittest.skip("requires CAP_DAC_OVERRIDE")(test)
 | |
| 
 | |
| 
 | |
def unlink(filename):
    """Remove *filename*, ignoring FileNotFoundError and NotADirectoryError."""
    with contextlib.suppress(FileNotFoundError, NotADirectoryError):
        _unlink(filename)
 | |
| 
 | |
| 
 | |
# Platform-specific implementations of the private helpers _unlink(),
# _rmdir(), _rmtree() and _longpath().  The Windows variants poll the
# directory listing after each deletion because a removed entry may remain
# visible for a while ("delete pending").
if sys.platform.startswith("win"):
    def _waitfor(func, pathname, waitall=False):
        """Call func(pathname), then wait for the deletion to become visible.

        With waitall false, the parent directory of *pathname* is listed
        until the final path component is no longer present.  With waitall
        true, *pathname* itself is listed until it is empty.  If the
        condition is still not met once the backoff is exhausted, a
        RuntimeWarning is issued.
        """
        # Perform the operation
        func(pathname)
        # Now setup the wait loop
        if waitall:
            dirname = pathname
        else:
            dirname, name = os.path.split(pathname)
            dirname = dirname or '.'
        # Check for `pathname` to be removed from the filesystem.
        # The exponential backoff of the timeout amounts to a total
        # of ~1 second after which the deletion is probably an error
        # anyway.
        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
        # required when contention occurs.
        timeout = 0.001
        while timeout < 1.0:
            # Note we are only testing for the existence of the file(s) in
            # the contents of the directory regardless of any security or
            # access rights.  If we have made it this far, we have sufficient
            # permissions to do that much using Python's equivalent of the
            # Windows API FindFirstFile.
            # Other Windows APIs can fail or give incorrect results when
            # dealing with files that are pending deletion.
            L = os.listdir(dirname)
            # `name` is only bound (and only read) when waitall is false.
            if not (L if waitall else name in L):
                return
            # Increase the timeout and try again
            time.sleep(timeout)
            timeout *= 2
        warnings.warn('tests may fail, delete still pending for ' + pathname,
                      RuntimeWarning, stacklevel=4)

    def _unlink(filename):
        """Remove a file and wait until it disappears from its directory."""
        _waitfor(os.unlink, filename)

    def _rmdir(dirname):
        """Remove a directory and wait until it disappears from its parent."""
        _waitfor(os.rmdir, dirname)

    def _rmtree(path):
        """Recursively delete *path*, waiting for each directory to empty."""
        # NOTE(review): _force_run is defined in test.support and is not
        # visible here; it is called as _force_run(path, func, *args).
        from test.support import _force_run

        def _rmtree_inner(path):
            # Delete every entry of *path*, but not *path* itself.
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError as exc:
                    print("support.rmtree(): os.lstat(%r) failed with %s"
                          % (fullname, exc),
                          file=sys.__stderr__)
                    # A mode of 0 is not a directory, so the entry is
                    # handled by the unlink branch below.
                    mode = 0
                if stat.S_ISDIR(mode):
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    _force_run(fullname, os.rmdir, fullname)
                else:
                    _force_run(fullname, os.unlink, fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)

    def _longpath(path):
        """Return *path* as expanded by GetLongPathNameW.

        *path* is returned unchanged when ctypes cannot be imported or when
        the API call returns 0.
        """
        try:
            import ctypes
        except ImportError:
            # No ctypes means we can't expand paths.
            pass
        else:
            buffer = ctypes.create_unicode_buffer(len(path) * 2)
            length = ctypes.windll.kernel32.GetLongPathNameW(path, buffer,
                                                             len(buffer))
            if length:
                return buffer[:length]
        return path
else:
    _unlink = os.unlink
    _rmdir = os.rmdir

    def _rmtree(path):
        """Recursively delete *path*.

        shutil.rmtree() is tried first; if it raises OSError the tree is
        walked manually through _force_run.
        """
        import shutil
        try:
            shutil.rmtree(path)
            return
        except OSError:
            pass

        def _rmtree_inner(path):
            # Delete every entry of *path*, but not *path* itself.
            # NOTE(review): _force_run receives the containing directory as
            # its first argument here, unlike the Windows variant which
            # passes the entry itself; its semantics are defined in
            # test.support, outside this file.
            from test.support import _force_run
            for name in _force_run(path, os.listdir, path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError:
                    # A mode of 0 is not a directory, so the entry is
                    # handled by the unlink branch below.
                    mode = 0
                if stat.S_ISDIR(mode):
                    _rmtree_inner(fullname)
                    _force_run(path, os.rmdir, fullname)
                else:
                    _force_run(path, os.unlink, fullname)
        _rmtree_inner(path)
        os.rmdir(path)

    def _longpath(path):
        """Return *path* unchanged (no expansion outside Windows)."""
        return path
 | |
| 
 | |
| 
 | |
def rmdir(dirname):
    """Remove the directory *dirname*, ignoring FileNotFoundError."""
    with contextlib.suppress(FileNotFoundError):
        _rmdir(dirname)
 | |
| 
 | |
| 
 | |
def rmtree(path):
    """Recursively remove *path*, ignoring FileNotFoundError."""
    with contextlib.suppress(FileNotFoundError):
        _rmtree(path)
 | |
| 
 | |
| 
 | |
@contextlib.contextmanager
def temp_dir(path=None, quiet=False):
    """Return a context manager that creates a temporary directory.

    Arguments:

      path: the directory to create temporarily.  If omitted or None,
        a directory is created with tempfile.mkdtemp and its real path
        is used.

      quiet: if False (the default), a failure to create *path* raises.
        Otherwise, if the path is specified and cannot be created, only
        a warning is issued and *path* is still yielded.

    The directory is removed on exit only if this context manager created
    it, and only in the process that created it.
    """
    import tempfile
    created = False
    if path is None:
        path = os.path.realpath(tempfile.mkdtemp())
        created = True
    else:
        try:
            os.mkdir(path)
        except OSError as exc:
            if not quiet:
                raise
            warnings.warn(f'tests may fail, unable to create '
                          f'temporary directory {path!r}: {exc}',
                          RuntimeWarning, stacklevel=3)
        else:
            created = True
    # Record which process owns the directory, if one was created.
    owner_pid = os.getpid() if created else None
    try:
        yield path
    finally:
        # In case the process forks, let only the parent remove the
        # directory. The child has a different process id. (bpo-30028)
        if created and owner_pid == os.getpid():
            rmtree(path)
 | |
| 
 | |
| 
 | |
@contextlib.contextmanager
def change_cwd(path, quiet=False):
    """Return a context manager that changes the current working directory.

    Arguments:

      path: the directory to use as the temporary current working directory
        (its real path is what is passed to os.chdir).

      quiet: if False (the default), a failure to change directory raises.
        Otherwise only a warning is issued and the current working
        directory is left as it was.

    The value yielded is os.getcwd() after the attempted change.  The
    original working directory is restored on exit.
    """
    original_cwd = os.getcwd()
    try:
        destination = os.path.realpath(path)
        os.chdir(destination)
    except OSError as exc:
        if not quiet:
            raise
        warnings.warn(f'tests may fail, unable to change the current working '
                      f'directory to {path!r}: {exc}',
                      RuntimeWarning, stacklevel=3)
    try:
        yield os.getcwd()
    finally:
        os.chdir(original_cwd)
 | |
| 
 | |
| 
 | |
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
    """
    Context manager that temporarily creates and changes the CWD.

    A temporary directory called *name* is created in the current
    directory and becomes the current working directory for the duration
    of the block.  If *name* is None, the temporary directory is created
    using tempfile.mkdtemp.

    If *quiet* is False (default) and it is not possible to
    create or change the CWD, an error is raised.  If *quiet* is True,
    only a warning is raised and the original CWD is used.

    """
    with temp_dir(path=name, quiet=quiet) as temp_path, \
            change_cwd(temp_path, quiet=quiet) as cwd_dir:
        yield cwd_dir
 | |
| 
 | |
| 
 | |
def create_empty_file(filename):
    """Create an empty file. If the file already exists, truncate it."""
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    os.close(os.open(filename, flags))
 | |
| 
 | |
| 
 | |
@contextlib.contextmanager
def open_dir_fd(path):
    """Open a file descriptor to a directory.

    The descriptor is yielded to the caller and closed on exit.
    """
    assert os.path.isdir(path)
    # O_DIRECTORY is added only on platforms where os defines it.
    flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0)
    dir_fd = os.open(path, flags)
    try:
        yield dir_fd
    finally:
        os.close(dir_fd)
 | |
| 
 | |
| 
 | |
def fs_is_case_insensitive(directory):
    """Detects if the file system for the specified directory
    is case-insensitive.

    A temporary file is created in *directory* and looked up again under a
    name whose case has been changed.
    """
    import tempfile
    with tempfile.NamedTemporaryFile(dir=directory) as probe:
        original = probe.name
        # Prefer the upper-cased name; fall back to lower case when the
        # name is already entirely upper case.
        recased = original.upper()
        if recased == original:
            recased = original.lower()
        try:
            return os.path.samefile(original, recased)
        except FileNotFoundError:
            return False
 | |
| 
 | |
| 
 | |
class FakePath:
    """Simple implementation of the path protocol.

    The wrapped value is returned by __fspath__(), unless it is an
    exception instance or exception class, in which case it is raised.
    """
    def __init__(self, path):
        self.path = path

    def __repr__(self):
        return f'<FakePath {self.path!r}>'

    def __fspath__(self):
        value = self.path
        if isinstance(value, BaseException):
            raise value
        if isinstance(value, type) and issubclass(value, BaseException):
            raise value
        return value
 | |
| 
 | |
| 
 | |
def fd_count():
    """Count the number of open file descriptors.

    Where a per-process descriptor directory is known for the platform, its
    entries are counted.  Otherwise (or if that directory is missing) every
    descriptor number below a platform limit is probed with os.dup().
    """
    platform_name = sys.platform
    if platform_name.startswith(('linux', 'android', 'freebsd', 'emscripten')):
        fd_dir = "/proc/self/fd"
    elif platform_name == "darwin":
        fd_dir = "/dev/fd"
    else:
        fd_dir = None

    if fd_dir is not None:
        try:
            entries = os.listdir(fd_dir)
        except FileNotFoundError:
            # Fall through to the dup() based probe below.
            pass
        else:
            # Subtract one because listdir() internally opens a file
            # descriptor to list the content of the directory.
            return len(entries) - 1

    max_fd = 256
    if hasattr(os, 'sysconf'):
        try:
            max_fd = os.sysconf("SC_OPEN_MAX")
        except (OSError, ValueError):
            # gh-118201: ValueError is raised intermittently on iOS
            pass

    saved_report_modes = None
    if sys.platform == 'win32':
        # bpo-25306, bpo-31009: Call CrtSetReportMode() to not kill the process
        # on invalid file descriptor if Python is compiled in debug mode
        try:
            import msvcrt
            msvcrt.CrtSetReportMode
        except (AttributeError, ImportError):
            # no msvcrt or a release build
            pass
        else:
            saved_report_modes = {}
            for report_type in (msvcrt.CRT_WARN,
                                msvcrt.CRT_ERROR,
                                msvcrt.CRT_ASSERT):
                saved_report_modes[report_type] = msvcrt.CrtSetReportMode(
                    report_type, 0)

    try:
        open_fds = 0
        for fd in range(max_fd):
            try:
                # Prefer dup() over fstat(). fstat() can require input/output
                # whereas dup() doesn't.
                duplicate = os.dup(fd)
            except OSError as exc:
                if exc.errno != errno.EBADF:
                    raise
                continue
            os.close(duplicate)
            open_fds += 1
    finally:
        # Put back the CRT report modes that were changed above.
        if saved_report_modes is not None:
            for report_type, mode in saved_report_modes.items():
                msvcrt.CrtSetReportMode(report_type, mode)

    return open_fds
 | |
| 
 | |
| 
 | |
if not hasattr(os, "umask"):
    @contextlib.contextmanager
    def temp_umask(umask):
        """no-op on platforms without umask()"""
        yield
else:
    @contextlib.contextmanager
    def temp_umask(umask):
        """Context manager that temporarily sets the process umask.

        The previous umask is restored on exit.
        """
        previous = os.umask(umask)
        try:
            yield
        finally:
            os.umask(previous)
 | |
| 
 | |
| 
 | |
class EnvironmentVarGuard(collections.abc.MutableMapping):

    """Mapping view of os.environ that undoes its own modifications.

    Every variable set or deleted through the guard has its initial value
    remembered.  When used as a context manager, those values are restored
    on exit."""

    def __init__(self):
        self._environ = os.environ
        # Maps each touched variable to its initial value (None if unset).
        self._changed = {}

    def _remember(self, envvar):
        # Remember the initial value on the first access
        if envvar not in self._changed:
            self._changed[envvar] = self._environ.get(envvar)

    def __getitem__(self, envvar):
        return self._environ[envvar]

    def __setitem__(self, envvar, value):
        self._remember(envvar)
        self._environ[envvar] = value

    def __delitem__(self, envvar):
        self._remember(envvar)
        # Deleting a variable that is not set is silently accepted.
        if envvar in self._environ:
            del self._environ[envvar]

    def keys(self):
        return self._environ.keys()

    def __iter__(self):
        return iter(self._environ)

    def __len__(self):
        return len(self._environ)

    def set(self, envvar, value):
        self[envvar] = value

    def unset(self, envvar):
        del self[envvar]

    def copy(self):
        # We do what os.environ.copy() does.
        return dict(self)

    def __enter__(self):
        return self

    def __exit__(self, *ignore_exc):
        for name, initial in self._changed.items():
            if initial is not None:
                self._environ[name] = initial
            elif name in self._environ:
                del self._environ[name]
        os.environ = self._environ
 | |
| 
 | |
| 
 | |
# subst_drive() needs ctypes and kernel32.  When support.MS_WINDOWS is false,
# or when loading them raises ImportError/AttributeError, a fallback is
# defined that raises unittest.SkipTest as soon as it is called.
try:
    if support.MS_WINDOWS:
        import ctypes
        kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)

        # Constants passed to, or compared against the results of, the
        # kernel32 calls below.
        ERROR_FILE_NOT_FOUND = 2
        DDD_REMOVE_DEFINITION = 2
        DDD_EXACT_MATCH_ON_REMOVE = 4
        DDD_NO_BROADCAST_SYSTEM = 8
    else:
        # Reuse the except branch below to install the fallback.
        raise AttributeError
except (ImportError, AttributeError):
    def subst_drive(path):
        """Fallback: always raise unittest.SkipTest."""
        raise unittest.SkipTest('ctypes or kernel32 is not available')
else:
    @contextlib.contextmanager
    def subst_drive(path):
        """Temporarily yield a substitute drive for a given path.

        Drive letters are tried from Z: down to A:.  The first one for which
        QueryDosDeviceW() fails with ERROR_FILE_NOT_FOUND is defined for
        *path* with DefineDosDeviceW(), yielded as a string such as 'Z:',
        and removed again on exit.  Raises unittest.SkipTest if no letter
        qualifies, and the error from ctypes.WinError() if defining or
        removing the device fails.
        """
        for c in reversed(string.ascii_uppercase):
            drive = f'{c}:'
            if (not kernel32.QueryDosDeviceW(drive, None, 0) and
                    ctypes.get_last_error() == ERROR_FILE_NOT_FOUND):
                break
        else:
            raise unittest.SkipTest('no available logical drive')
        if not kernel32.DefineDosDeviceW(
                DDD_NO_BROADCAST_SYSTEM, drive, path):
            raise ctypes.WinError(ctypes.get_last_error())
        try:
            yield drive
        finally:
            # Remove exactly the definition that was created above.
            if not kernel32.DefineDosDeviceW(
                    DDD_REMOVE_DEFINITION | DDD_EXACT_MATCH_ON_REMOVE,
                    drive, path):
                raise ctypes.WinError(ctypes.get_last_error())
 | 
