mirror of
https://github.com/python/cpython.git
synced 2025-12-08 06:10:17 +00:00
gh-139322: Create test_os package (#139453)
* Move test_posix.py and test_os.py to Lib/test/test_os/. * Split Windows specific test cases to a new test_windows.py file.
This commit is contained in:
parent
f39dea3bae
commit
0158890005
7 changed files with 653 additions and 630 deletions
|
|
@ -25,10 +25,11 @@
|
|||
"test_gdb",
|
||||
"test_inspect",
|
||||
"test_io",
|
||||
"test_pydoc",
|
||||
"test_multiprocessing_fork",
|
||||
"test_multiprocessing_forkserver",
|
||||
"test_multiprocessing_spawn",
|
||||
"test_os",
|
||||
"test_pydoc",
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
6
Lib/test/test_os/__init__.py
Normal file
6
Lib/test/test_os/__init__.py
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
import os.path
|
||||
from test.support import load_package_tests
|
||||
|
||||
|
||||
def load_tests(*args):
|
||||
return load_package_tests(os.path.dirname(__file__), *args)
|
||||
|
|
@ -7,7 +7,6 @@
|
|||
import contextlib
|
||||
import decimal
|
||||
import errno
|
||||
import fnmatch
|
||||
import fractions
|
||||
import itertools
|
||||
import locale
|
||||
|
|
@ -31,12 +30,12 @@
|
|||
import uuid
|
||||
import warnings
|
||||
from test import support
|
||||
from test.support import import_helper
|
||||
from test.support import os_helper
|
||||
from test.support import socket_helper
|
||||
from test.support import infinite_recursion
|
||||
from test.support import warnings_helper
|
||||
from platform import win32_is_iot
|
||||
from .utils import create_file
|
||||
|
||||
try:
|
||||
import resource
|
||||
|
|
@ -46,10 +45,6 @@
|
|||
import fcntl
|
||||
except ImportError:
|
||||
fcntl = None
|
||||
try:
|
||||
import _winapi
|
||||
except ImportError:
|
||||
_winapi = None
|
||||
try:
|
||||
import pwd
|
||||
all_users = [u.pw_uid for u in pwd.getpwall()]
|
||||
|
|
@ -93,11 +88,6 @@ def requires_os_func(name):
|
|||
return unittest.skipUnless(hasattr(os, name), 'requires os.%s' % name)
|
||||
|
||||
|
||||
def create_file(filename, content=b'content'):
|
||||
with open(filename, "xb", 0) as fp:
|
||||
fp.write(content)
|
||||
|
||||
|
||||
# bpo-41625: On AIX, splice() only works with a socket, not with a pipe.
|
||||
requires_splice_pipe = unittest.skipIf(sys.platform.startswith("aix"),
|
||||
'on AIX, splice() only accepts sockets')
|
||||
|
|
@ -2466,42 +2456,6 @@ def test_execve_with_empty_path(self):
|
|||
self.fail('No OSError raised')
|
||||
|
||||
|
||||
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
|
||||
class Win32ErrorTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
try:
|
||||
os.stat(os_helper.TESTFN)
|
||||
except FileNotFoundError:
|
||||
exists = False
|
||||
except OSError as exc:
|
||||
exists = True
|
||||
self.fail("file %s must not exist; os.stat failed with %s"
|
||||
% (os_helper.TESTFN, exc))
|
||||
else:
|
||||
self.fail("file %s must not exist" % os_helper.TESTFN)
|
||||
|
||||
def test_rename(self):
|
||||
self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak")
|
||||
|
||||
def test_remove(self):
|
||||
self.assertRaises(OSError, os.remove, os_helper.TESTFN)
|
||||
|
||||
def test_chdir(self):
|
||||
self.assertRaises(OSError, os.chdir, os_helper.TESTFN)
|
||||
|
||||
def test_mkdir(self):
|
||||
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||
|
||||
with open(os_helper.TESTFN, "x") as f:
|
||||
self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)
|
||||
|
||||
def test_utime(self):
|
||||
self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)
|
||||
|
||||
def test_chmod(self):
|
||||
self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
|
||||
|
||||
|
||||
@unittest.skipIf(support.is_wasi, "Cannot create invalid FD on WASI.")
|
||||
class TestInvalidFD(unittest.TestCase):
|
||||
singles = ["fchdir", "dup", "fstat", "fstatvfs", "tcgetpgrp", "ttyname"]
|
||||
|
|
@ -2836,224 +2790,6 @@ def test_stat(self):
|
|||
for fn in self.unicodefn:
|
||||
os.stat(os.path.join(self.dir, fn))
|
||||
|
||||
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
|
||||
class Win32KillTests(unittest.TestCase):
|
||||
def _kill(self, sig):
|
||||
# Start sys.executable as a subprocess and communicate from the
|
||||
# subprocess to the parent that the interpreter is ready. When it
|
||||
# becomes ready, send *sig* via os.kill to the subprocess and check
|
||||
# that the return code is equal to *sig*.
|
||||
import ctypes
|
||||
from ctypes import wintypes
|
||||
import msvcrt
|
||||
|
||||
# Since we can't access the contents of the process' stdout until the
|
||||
# process has exited, use PeekNamedPipe to see what's inside stdout
|
||||
# without waiting. This is done so we can tell that the interpreter
|
||||
# is started and running at a point where it could handle a signal.
|
||||
PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
|
||||
PeekNamedPipe.restype = wintypes.BOOL
|
||||
PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
|
||||
ctypes.POINTER(ctypes.c_char), # stdout buf
|
||||
wintypes.DWORD, # Buffer size
|
||||
ctypes.POINTER(wintypes.DWORD), # bytes read
|
||||
ctypes.POINTER(wintypes.DWORD), # bytes avail
|
||||
ctypes.POINTER(wintypes.DWORD)) # bytes left
|
||||
msg = "running"
|
||||
proc = subprocess.Popen([sys.executable, "-c",
|
||||
"import sys;"
|
||||
"sys.stdout.write('{}');"
|
||||
"sys.stdout.flush();"
|
||||
"input()".format(msg)],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
stdin=subprocess.PIPE)
|
||||
self.addCleanup(proc.stdout.close)
|
||||
self.addCleanup(proc.stderr.close)
|
||||
self.addCleanup(proc.stdin.close)
|
||||
|
||||
count, max = 0, 100
|
||||
while count < max and proc.poll() is None:
|
||||
# Create a string buffer to store the result of stdout from the pipe
|
||||
buf = ctypes.create_string_buffer(len(msg))
|
||||
# Obtain the text currently in proc.stdout
|
||||
# Bytes read/avail/left are left as NULL and unused
|
||||
rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
|
||||
buf, ctypes.sizeof(buf), None, None, None)
|
||||
self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
|
||||
if buf.value:
|
||||
self.assertEqual(msg, buf.value.decode())
|
||||
break
|
||||
time.sleep(0.1)
|
||||
count += 1
|
||||
else:
|
||||
self.fail("Did not receive communication from the subprocess")
|
||||
|
||||
os.kill(proc.pid, sig)
|
||||
self.assertEqual(proc.wait(), sig)
|
||||
|
||||
def test_kill_sigterm(self):
|
||||
# SIGTERM doesn't mean anything special, but make sure it works
|
||||
self._kill(signal.SIGTERM)
|
||||
|
||||
def test_kill_int(self):
|
||||
# os.kill on Windows can take an int which gets set as the exit code
|
||||
self._kill(100)
|
||||
|
||||
@unittest.skipIf(mmap is None, "requires mmap")
|
||||
def _kill_with_event(self, event, name):
|
||||
tagname = "test_os_%s" % uuid.uuid1()
|
||||
m = mmap.mmap(-1, 1, tagname)
|
||||
m[0] = 0
|
||||
|
||||
# Run a script which has console control handling enabled.
|
||||
script = os.path.join(os.path.dirname(__file__),
|
||||
"win_console_handler.py")
|
||||
cmd = [sys.executable, script, tagname]
|
||||
proc = subprocess.Popen(cmd,
|
||||
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
|
||||
|
||||
with proc:
|
||||
# Let the interpreter startup before we send signals. See #3137.
|
||||
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||
if proc.poll() is None:
|
||||
break
|
||||
else:
|
||||
# Forcefully kill the process if we weren't able to signal it.
|
||||
proc.kill()
|
||||
self.fail("Subprocess didn't finish initialization")
|
||||
|
||||
os.kill(proc.pid, event)
|
||||
|
||||
try:
|
||||
# proc.send_signal(event) could also be done here.
|
||||
# Allow time for the signal to be passed and the process to exit.
|
||||
proc.wait(timeout=support.SHORT_TIMEOUT)
|
||||
except subprocess.TimeoutExpired:
|
||||
# Forcefully kill the process if we weren't able to signal it.
|
||||
proc.kill()
|
||||
self.fail("subprocess did not stop on {}".format(name))
|
||||
|
||||
@unittest.skip("subprocesses aren't inheriting Ctrl+C property")
|
||||
@support.requires_subprocess()
|
||||
def test_CTRL_C_EVENT(self):
|
||||
from ctypes import wintypes
|
||||
import ctypes
|
||||
|
||||
# Make a NULL value by creating a pointer with no argument.
|
||||
NULL = ctypes.POINTER(ctypes.c_int)()
|
||||
SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
|
||||
SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
|
||||
wintypes.BOOL)
|
||||
SetConsoleCtrlHandler.restype = wintypes.BOOL
|
||||
|
||||
# Calling this with NULL and FALSE causes the calling process to
|
||||
# handle Ctrl+C, rather than ignore it. This property is inherited
|
||||
# by subprocesses.
|
||||
SetConsoleCtrlHandler(NULL, 0)
|
||||
|
||||
self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
|
||||
|
||||
@support.requires_subprocess()
|
||||
def test_CTRL_BREAK_EVENT(self):
|
||||
self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
|
||||
|
||||
|
||||
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
|
||||
class Win32ListdirTests(unittest.TestCase):
|
||||
"""Test listdir on Windows."""
|
||||
|
||||
def setUp(self):
|
||||
self.created_paths = []
|
||||
for i in range(2):
|
||||
dir_name = 'SUB%d' % i
|
||||
dir_path = os.path.join(os_helper.TESTFN, dir_name)
|
||||
file_name = 'FILE%d' % i
|
||||
file_path = os.path.join(os_helper.TESTFN, file_name)
|
||||
os.makedirs(dir_path)
|
||||
with open(file_path, 'w', encoding='utf-8') as f:
|
||||
f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
|
||||
self.created_paths.extend([dir_name, file_name])
|
||||
self.created_paths.sort()
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(os_helper.TESTFN)
|
||||
|
||||
def test_listdir_no_extended_path(self):
|
||||
"""Test when the path is not an "extended" path."""
|
||||
# unicode
|
||||
self.assertEqual(
|
||||
sorted(os.listdir(os_helper.TESTFN)),
|
||||
self.created_paths)
|
||||
|
||||
# bytes
|
||||
self.assertEqual(
|
||||
sorted(os.listdir(os.fsencode(os_helper.TESTFN))),
|
||||
[os.fsencode(path) for path in self.created_paths])
|
||||
|
||||
def test_listdir_extended_path(self):
|
||||
"""Test when the path starts with '\\\\?\\'."""
|
||||
# See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
|
||||
# unicode
|
||||
path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN)
|
||||
self.assertEqual(
|
||||
sorted(os.listdir(path)),
|
||||
self.created_paths)
|
||||
|
||||
# bytes
|
||||
path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN))
|
||||
self.assertEqual(
|
||||
sorted(os.listdir(path)),
|
||||
[os.fsencode(path) for path in self.created_paths])
|
||||
|
||||
|
||||
@unittest.skipUnless(os.name == "nt", "NT specific tests")
|
||||
class Win32ListdriveTests(unittest.TestCase):
|
||||
"""Test listdrive, listmounts and listvolume on Windows."""
|
||||
|
||||
def setUp(self):
|
||||
# Get drives and volumes from fsutil
|
||||
out = subprocess.check_output(
|
||||
["fsutil.exe", "volume", "list"],
|
||||
cwd=os.path.join(os.getenv("SystemRoot", "\\Windows"), "System32"),
|
||||
encoding="mbcs",
|
||||
errors="ignore",
|
||||
)
|
||||
lines = out.splitlines()
|
||||
self.known_volumes = {l for l in lines if l.startswith('\\\\?\\')}
|
||||
self.known_drives = {l for l in lines if l[1:] == ':\\'}
|
||||
self.known_mounts = {l for l in lines if l[1:3] == ':\\'}
|
||||
|
||||
def test_listdrives(self):
|
||||
drives = os.listdrives()
|
||||
self.assertIsInstance(drives, list)
|
||||
self.assertSetEqual(
|
||||
self.known_drives,
|
||||
self.known_drives & set(drives),
|
||||
)
|
||||
|
||||
def test_listvolumes(self):
|
||||
volumes = os.listvolumes()
|
||||
self.assertIsInstance(volumes, list)
|
||||
self.assertSetEqual(
|
||||
self.known_volumes,
|
||||
self.known_volumes & set(volumes),
|
||||
)
|
||||
|
||||
def test_listmounts(self):
|
||||
for volume in os.listvolumes():
|
||||
try:
|
||||
mounts = os.listmounts(volume)
|
||||
except OSError as ex:
|
||||
if support.verbose:
|
||||
print("Skipping", volume, "because of", ex)
|
||||
else:
|
||||
self.assertIsInstance(mounts, list)
|
||||
self.assertSetEqual(
|
||||
set(mounts),
|
||||
self.known_mounts & set(mounts),
|
||||
)
|
||||
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'readlink'), 'needs os.readlink()')
|
||||
class ReadlinkTests(unittest.TestCase):
|
||||
|
|
@ -3116,370 +2852,6 @@ def test_bytes(self):
|
|||
self.assertIsInstance(path, bytes)
|
||||
|
||||
|
||||
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
|
||||
@os_helper.skip_unless_symlink
|
||||
class Win32SymlinkTests(unittest.TestCase):
|
||||
filelink = 'filelinktest'
|
||||
filelink_target = os.path.abspath(__file__)
|
||||
dirlink = 'dirlinktest'
|
||||
dirlink_target = os.path.dirname(filelink_target)
|
||||
missing_link = 'missing link'
|
||||
|
||||
def setUp(self):
|
||||
assert os.path.exists(self.dirlink_target)
|
||||
assert os.path.exists(self.filelink_target)
|
||||
assert not os.path.exists(self.dirlink)
|
||||
assert not os.path.exists(self.filelink)
|
||||
assert not os.path.exists(self.missing_link)
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.exists(self.filelink):
|
||||
os.remove(self.filelink)
|
||||
if os.path.exists(self.dirlink):
|
||||
os.rmdir(self.dirlink)
|
||||
if os.path.lexists(self.missing_link):
|
||||
os.remove(self.missing_link)
|
||||
|
||||
def test_directory_link(self):
|
||||
os.symlink(self.dirlink_target, self.dirlink)
|
||||
self.assertTrue(os.path.exists(self.dirlink))
|
||||
self.assertTrue(os.path.isdir(self.dirlink))
|
||||
self.assertTrue(os.path.islink(self.dirlink))
|
||||
self.check_stat(self.dirlink, self.dirlink_target)
|
||||
|
||||
def test_file_link(self):
|
||||
os.symlink(self.filelink_target, self.filelink)
|
||||
self.assertTrue(os.path.exists(self.filelink))
|
||||
self.assertTrue(os.path.isfile(self.filelink))
|
||||
self.assertTrue(os.path.islink(self.filelink))
|
||||
self.check_stat(self.filelink, self.filelink_target)
|
||||
|
||||
def _create_missing_dir_link(self):
|
||||
'Create a "directory" link to a non-existent target'
|
||||
linkname = self.missing_link
|
||||
if os.path.lexists(linkname):
|
||||
os.remove(linkname)
|
||||
target = r'c:\\target does not exist.29r3c740'
|
||||
assert not os.path.exists(target)
|
||||
target_is_dir = True
|
||||
os.symlink(target, linkname, target_is_dir)
|
||||
|
||||
def test_remove_directory_link_to_missing_target(self):
|
||||
self._create_missing_dir_link()
|
||||
# For compatibility with Unix, os.remove will check the
|
||||
# directory status and call RemoveDirectory if the symlink
|
||||
# was created with target_is_dir==True.
|
||||
os.remove(self.missing_link)
|
||||
|
||||
def test_isdir_on_directory_link_to_missing_target(self):
|
||||
self._create_missing_dir_link()
|
||||
self.assertFalse(os.path.isdir(self.missing_link))
|
||||
|
||||
def test_rmdir_on_directory_link_to_missing_target(self):
|
||||
self._create_missing_dir_link()
|
||||
os.rmdir(self.missing_link)
|
||||
|
||||
def check_stat(self, link, target):
|
||||
self.assertEqual(os.stat(link), os.stat(target))
|
||||
self.assertNotEqual(os.lstat(link), os.stat(link))
|
||||
|
||||
bytes_link = os.fsencode(link)
|
||||
self.assertEqual(os.stat(bytes_link), os.stat(target))
|
||||
self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
|
||||
|
||||
def test_12084(self):
|
||||
level1 = os.path.abspath(os_helper.TESTFN)
|
||||
level2 = os.path.join(level1, "level2")
|
||||
level3 = os.path.join(level2, "level3")
|
||||
self.addCleanup(os_helper.rmtree, level1)
|
||||
|
||||
os.mkdir(level1)
|
||||
os.mkdir(level2)
|
||||
os.mkdir(level3)
|
||||
|
||||
file1 = os.path.abspath(os.path.join(level1, "file1"))
|
||||
create_file(file1)
|
||||
|
||||
orig_dir = os.getcwd()
|
||||
try:
|
||||
os.chdir(level2)
|
||||
link = os.path.join(level2, "link")
|
||||
os.symlink(os.path.relpath(file1), "link")
|
||||
self.assertIn("link", os.listdir(os.getcwd()))
|
||||
|
||||
# Check os.stat calls from the same dir as the link
|
||||
self.assertEqual(os.stat(file1), os.stat("link"))
|
||||
|
||||
# Check os.stat calls from a dir below the link
|
||||
os.chdir(level1)
|
||||
self.assertEqual(os.stat(file1),
|
||||
os.stat(os.path.relpath(link)))
|
||||
|
||||
# Check os.stat calls from a dir above the link
|
||||
os.chdir(level3)
|
||||
self.assertEqual(os.stat(file1),
|
||||
os.stat(os.path.relpath(link)))
|
||||
finally:
|
||||
os.chdir(orig_dir)
|
||||
|
||||
@unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
|
||||
and os.path.exists(r'C:\ProgramData'),
|
||||
'Test directories not found')
|
||||
def test_29248(self):
|
||||
# os.symlink() calls CreateSymbolicLink, which creates
|
||||
# the reparse data buffer with the print name stored
|
||||
# first, so the offset is always 0. CreateSymbolicLink
|
||||
# stores the "PrintName" DOS path (e.g. "C:\") first,
|
||||
# with an offset of 0, followed by the "SubstituteName"
|
||||
# NT path (e.g. "\??\C:\"). The "All Users" link, on
|
||||
# the other hand, seems to have been created manually
|
||||
# with an inverted order.
|
||||
target = os.readlink(r'C:\Users\All Users')
|
||||
self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
|
||||
|
||||
def test_buffer_overflow(self):
|
||||
# Older versions would have a buffer overflow when detecting
|
||||
# whether a link source was a directory. This test ensures we
|
||||
# no longer crash, but does not otherwise validate the behavior
|
||||
segment = 'X' * 27
|
||||
path = os.path.join(*[segment] * 10)
|
||||
test_cases = [
|
||||
# overflow with absolute src
|
||||
('\\' + path, segment),
|
||||
# overflow dest with relative src
|
||||
(segment, path),
|
||||
# overflow when joining src
|
||||
(path[:180], path[:180]),
|
||||
]
|
||||
for src, dest in test_cases:
|
||||
try:
|
||||
os.symlink(src, dest)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
os.remove(dest)
|
||||
except OSError:
|
||||
pass
|
||||
# Also test with bytes, since that is a separate code path.
|
||||
try:
|
||||
os.symlink(os.fsencode(src), os.fsencode(dest))
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
os.remove(dest)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def test_appexeclink(self):
|
||||
root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
|
||||
if not os.path.isdir(root):
|
||||
self.skipTest("test requires a WindowsApps directory")
|
||||
|
||||
aliases = [os.path.join(root, a)
|
||||
for a in fnmatch.filter(os.listdir(root), '*.exe')]
|
||||
|
||||
for alias in aliases:
|
||||
if support.verbose:
|
||||
print()
|
||||
print("Testing with", alias)
|
||||
st = os.lstat(alias)
|
||||
self.assertEqual(st, os.stat(alias))
|
||||
self.assertFalse(stat.S_ISLNK(st.st_mode))
|
||||
self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
|
||||
self.assertTrue(os.path.isfile(alias))
|
||||
# testing the first one we see is sufficient
|
||||
break
|
||||
else:
|
||||
self.skipTest("test requires an app execution alias")
|
||||
|
||||
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
|
||||
class Win32JunctionTests(unittest.TestCase):
|
||||
junction = 'junctiontest'
|
||||
junction_target = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
def setUp(self):
|
||||
assert os.path.exists(self.junction_target)
|
||||
assert not os.path.lexists(self.junction)
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.lexists(self.junction):
|
||||
os.unlink(self.junction)
|
||||
|
||||
def test_create_junction(self):
|
||||
_winapi.CreateJunction(self.junction_target, self.junction)
|
||||
self.assertTrue(os.path.lexists(self.junction))
|
||||
self.assertTrue(os.path.exists(self.junction))
|
||||
self.assertTrue(os.path.isdir(self.junction))
|
||||
self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
|
||||
self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
|
||||
|
||||
# bpo-37834: Junctions are not recognized as links.
|
||||
self.assertFalse(os.path.islink(self.junction))
|
||||
self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
|
||||
os.path.normcase(os.readlink(self.junction)))
|
||||
|
||||
def test_unlink_removes_junction(self):
|
||||
_winapi.CreateJunction(self.junction_target, self.junction)
|
||||
self.assertTrue(os.path.exists(self.junction))
|
||||
self.assertTrue(os.path.lexists(self.junction))
|
||||
|
||||
os.unlink(self.junction)
|
||||
self.assertFalse(os.path.exists(self.junction))
|
||||
|
||||
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
|
||||
class Win32NtTests(unittest.TestCase):
|
||||
def test_getfinalpathname_handles(self):
|
||||
nt = import_helper.import_module('nt')
|
||||
ctypes = import_helper.import_module('ctypes')
|
||||
# Ruff false positive -- it thinks we're redefining `ctypes` here
|
||||
import ctypes.wintypes # noqa: F811
|
||||
|
||||
kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
|
||||
kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
|
||||
|
||||
kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
|
||||
kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
|
||||
ctypes.wintypes.LPDWORD)
|
||||
|
||||
# This is a pseudo-handle that doesn't need to be closed
|
||||
hproc = kernel.GetCurrentProcess()
|
||||
|
||||
handle_count = ctypes.wintypes.DWORD()
|
||||
ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
|
||||
self.assertEqual(1, ok)
|
||||
|
||||
before_count = handle_count.value
|
||||
|
||||
# The first two test the error path, __file__ tests the success path
|
||||
filenames = [
|
||||
r'\\?\C:',
|
||||
r'\\?\NUL',
|
||||
r'\\?\CONIN',
|
||||
__file__,
|
||||
]
|
||||
|
||||
for _ in range(10):
|
||||
for name in filenames:
|
||||
try:
|
||||
nt._getfinalpathname(name)
|
||||
except Exception:
|
||||
# Failure is expected
|
||||
pass
|
||||
try:
|
||||
os.stat(name)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
|
||||
self.assertEqual(1, ok)
|
||||
|
||||
handle_delta = handle_count.value - before_count
|
||||
|
||||
self.assertEqual(0, handle_delta)
|
||||
|
||||
@support.requires_subprocess()
|
||||
def test_stat_unlink_race(self):
|
||||
# bpo-46785: the implementation of os.stat() falls back to reading
|
||||
# the parent directory if CreateFileW() fails with a permission
|
||||
# error. If reading the parent directory fails because the file or
|
||||
# directory are subsequently unlinked, or because the volume or
|
||||
# share are no longer available, then the original permission error
|
||||
# should not be restored.
|
||||
filename = os_helper.TESTFN
|
||||
self.addCleanup(os_helper.unlink, filename)
|
||||
deadline = time.time() + 5
|
||||
command = textwrap.dedent("""\
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
filename = sys.argv[1]
|
||||
deadline = float(sys.argv[2])
|
||||
|
||||
while time.time() < deadline:
|
||||
try:
|
||||
with open(filename, "w") as f:
|
||||
pass
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
os.remove(filename)
|
||||
except OSError:
|
||||
pass
|
||||
""")
|
||||
|
||||
with subprocess.Popen([sys.executable, '-c', command, filename, str(deadline)]) as proc:
|
||||
while time.time() < deadline:
|
||||
try:
|
||||
os.stat(filename)
|
||||
except FileNotFoundError as e:
|
||||
assert e.winerror == 2 # ERROR_FILE_NOT_FOUND
|
||||
try:
|
||||
proc.wait(1)
|
||||
except subprocess.TimeoutExpired:
|
||||
proc.terminate()
|
||||
|
||||
@support.requires_subprocess()
|
||||
def test_stat_inaccessible_file(self):
|
||||
filename = os_helper.TESTFN
|
||||
ICACLS = os.path.expandvars(r"%SystemRoot%\System32\icacls.exe")
|
||||
|
||||
with open(filename, "wb") as f:
|
||||
f.write(b'Test data')
|
||||
|
||||
stat1 = os.stat(filename)
|
||||
|
||||
try:
|
||||
# Remove all permissions from the file
|
||||
subprocess.check_output([ICACLS, filename, "/inheritance:r"],
|
||||
stderr=subprocess.STDOUT)
|
||||
except subprocess.CalledProcessError as ex:
|
||||
if support.verbose:
|
||||
print(ICACLS, filename, "/inheritance:r", "failed.")
|
||||
print(ex.stdout.decode("oem", "replace").rstrip())
|
||||
try:
|
||||
os.unlink(filename)
|
||||
except OSError:
|
||||
pass
|
||||
self.skipTest("Unable to create inaccessible file")
|
||||
|
||||
def cleanup():
|
||||
# Give delete permission to the owner (us)
|
||||
subprocess.check_output([ICACLS, filename, "/grant", "*WD:(D)"],
|
||||
stderr=subprocess.STDOUT)
|
||||
os.unlink(filename)
|
||||
|
||||
self.addCleanup(cleanup)
|
||||
|
||||
if support.verbose:
|
||||
print("File:", filename)
|
||||
print("stat with access:", stat1)
|
||||
|
||||
# First test - we shouldn't raise here, because we still have access to
|
||||
# the directory and can extract enough information from its metadata.
|
||||
stat2 = os.stat(filename)
|
||||
|
||||
if support.verbose:
|
||||
print(" without access:", stat2)
|
||||
|
||||
# We may not get st_dev/st_ino, so ensure those are 0 or match
|
||||
self.assertIn(stat2.st_dev, (0, stat1.st_dev))
|
||||
self.assertIn(stat2.st_ino, (0, stat1.st_ino))
|
||||
|
||||
# st_mode and st_size should match (for a normal file, at least)
|
||||
self.assertEqual(stat1.st_mode, stat2.st_mode)
|
||||
self.assertEqual(stat1.st_size, stat2.st_size)
|
||||
|
||||
# st_ctime and st_mtime should be the same
|
||||
self.assertEqual(stat1.st_ctime, stat2.st_ctime)
|
||||
self.assertEqual(stat1.st_mtime, stat2.st_mtime)
|
||||
|
||||
# st_atime should be the same or later
|
||||
self.assertGreaterEqual(stat1.st_atime, stat2.st_atime)
|
||||
|
||||
|
||||
@os_helper.skip_unless_symlink
|
||||
class NonLocalSymlinkTests(unittest.TestCase):
|
||||
|
||||
640
Lib/test/test_os/test_windows.py
Normal file
640
Lib/test/test_os/test_windows.py
Normal file
|
|
@ -0,0 +1,640 @@
|
|||
import sys
|
||||
import unittest
|
||||
|
||||
if sys.platform != "win32":
|
||||
raise unittest.SkipTest("Win32 specific tests")
|
||||
|
||||
import _winapi
|
||||
import fnmatch
|
||||
import mmap
|
||||
import os
|
||||
import shutil
|
||||
import signal
|
||||
import stat
|
||||
import subprocess
|
||||
import textwrap
|
||||
import time
|
||||
import uuid
|
||||
from test import support
|
||||
from test.support import import_helper
|
||||
from test.support import os_helper
|
||||
from .utils import create_file
|
||||
|
||||
|
||||
class Win32ErrorTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
try:
|
||||
os.stat(os_helper.TESTFN)
|
||||
except FileNotFoundError:
|
||||
exists = False
|
||||
except OSError as exc:
|
||||
exists = True
|
||||
self.fail("file %s must not exist; os.stat failed with %s"
|
||||
% (os_helper.TESTFN, exc))
|
||||
else:
|
||||
self.fail("file %s must not exist" % os_helper.TESTFN)
|
||||
|
||||
def test_rename(self):
|
||||
self.assertRaises(OSError, os.rename, os_helper.TESTFN, os_helper.TESTFN+".bak")
|
||||
|
||||
def test_remove(self):
|
||||
self.assertRaises(OSError, os.remove, os_helper.TESTFN)
|
||||
|
||||
def test_chdir(self):
|
||||
self.assertRaises(OSError, os.chdir, os_helper.TESTFN)
|
||||
|
||||
def test_mkdir(self):
|
||||
self.addCleanup(os_helper.unlink, os_helper.TESTFN)
|
||||
|
||||
with open(os_helper.TESTFN, "x") as f:
|
||||
self.assertRaises(OSError, os.mkdir, os_helper.TESTFN)
|
||||
|
||||
def test_utime(self):
|
||||
self.assertRaises(OSError, os.utime, os_helper.TESTFN, None)
|
||||
|
||||
def test_chmod(self):
|
||||
self.assertRaises(OSError, os.chmod, os_helper.TESTFN, 0)
|
||||
|
||||
|
||||
class Win32KillTests(unittest.TestCase):
|
||||
def _kill(self, sig):
|
||||
# Start sys.executable as a subprocess and communicate from the
|
||||
# subprocess to the parent that the interpreter is ready. When it
|
||||
# becomes ready, send *sig* via os.kill to the subprocess and check
|
||||
# that the return code is equal to *sig*.
|
||||
import ctypes
|
||||
from ctypes import wintypes
|
||||
import msvcrt
|
||||
|
||||
# Since we can't access the contents of the process' stdout until the
|
||||
# process has exited, use PeekNamedPipe to see what's inside stdout
|
||||
# without waiting. This is done so we can tell that the interpreter
|
||||
# is started and running at a point where it could handle a signal.
|
||||
PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
|
||||
PeekNamedPipe.restype = wintypes.BOOL
|
||||
PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
|
||||
ctypes.POINTER(ctypes.c_char), # stdout buf
|
||||
wintypes.DWORD, # Buffer size
|
||||
ctypes.POINTER(wintypes.DWORD), # bytes read
|
||||
ctypes.POINTER(wintypes.DWORD), # bytes avail
|
||||
ctypes.POINTER(wintypes.DWORD)) # bytes left
|
||||
msg = "running"
|
||||
proc = subprocess.Popen([sys.executable, "-c",
|
||||
"import sys;"
|
||||
"sys.stdout.write('{}');"
|
||||
"sys.stdout.flush();"
|
||||
"input()".format(msg)],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
stdin=subprocess.PIPE)
|
||||
self.addCleanup(proc.stdout.close)
|
||||
self.addCleanup(proc.stderr.close)
|
||||
self.addCleanup(proc.stdin.close)
|
||||
|
||||
count, max = 0, 100
|
||||
while count < max and proc.poll() is None:
|
||||
# Create a string buffer to store the result of stdout from the pipe
|
||||
buf = ctypes.create_string_buffer(len(msg))
|
||||
# Obtain the text currently in proc.stdout
|
||||
# Bytes read/avail/left are left as NULL and unused
|
||||
rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
|
||||
buf, ctypes.sizeof(buf), None, None, None)
|
||||
self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
|
||||
if buf.value:
|
||||
self.assertEqual(msg, buf.value.decode())
|
||||
break
|
||||
time.sleep(0.1)
|
||||
count += 1
|
||||
else:
|
||||
self.fail("Did not receive communication from the subprocess")
|
||||
|
||||
os.kill(proc.pid, sig)
|
||||
self.assertEqual(proc.wait(), sig)
|
||||
|
||||
def test_kill_sigterm(self):
|
||||
# SIGTERM doesn't mean anything special, but make sure it works
|
||||
self._kill(signal.SIGTERM)
|
||||
|
||||
def test_kill_int(self):
|
||||
# os.kill on Windows can take an int which gets set as the exit code
|
||||
self._kill(100)
|
||||
|
||||
@unittest.skipIf(mmap is None, "requires mmap")
|
||||
def _kill_with_event(self, event, name):
|
||||
tagname = "test_os_%s" % uuid.uuid1()
|
||||
m = mmap.mmap(-1, 1, tagname)
|
||||
m[0] = 0
|
||||
|
||||
# Run a script which has console control handling enabled.
|
||||
script = os.path.join(os.path.dirname(__file__),
|
||||
"win_console_handler.py")
|
||||
cmd = [sys.executable, script, tagname]
|
||||
proc = subprocess.Popen(cmd,
|
||||
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
|
||||
|
||||
with proc:
|
||||
# Let the interpreter startup before we send signals. See #3137.
|
||||
for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
|
||||
if proc.poll() is None:
|
||||
break
|
||||
else:
|
||||
# Forcefully kill the process if we weren't able to signal it.
|
||||
proc.kill()
|
||||
self.fail("Subprocess didn't finish initialization")
|
||||
|
||||
os.kill(proc.pid, event)
|
||||
|
||||
try:
|
||||
# proc.send_signal(event) could also be done here.
|
||||
# Allow time for the signal to be passed and the process to exit.
|
||||
proc.wait(timeout=support.SHORT_TIMEOUT)
|
||||
except subprocess.TimeoutExpired:
|
||||
# Forcefully kill the process if we weren't able to signal it.
|
||||
proc.kill()
|
||||
self.fail("subprocess did not stop on {}".format(name))
|
||||
|
||||
@unittest.skip("subprocesses aren't inheriting Ctrl+C property")
|
||||
@support.requires_subprocess()
|
||||
def test_CTRL_C_EVENT(self):
|
||||
from ctypes import wintypes
|
||||
import ctypes
|
||||
|
||||
# Make a NULL value by creating a pointer with no argument.
|
||||
NULL = ctypes.POINTER(ctypes.c_int)()
|
||||
SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
|
||||
SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
|
||||
wintypes.BOOL)
|
||||
SetConsoleCtrlHandler.restype = wintypes.BOOL
|
||||
|
||||
# Calling this with NULL and FALSE causes the calling process to
|
||||
# handle Ctrl+C, rather than ignore it. This property is inherited
|
||||
# by subprocesses.
|
||||
SetConsoleCtrlHandler(NULL, 0)
|
||||
|
||||
self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")
|
||||
|
||||
@support.requires_subprocess()
|
||||
def test_CTRL_BREAK_EVENT(self):
|
||||
self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
|
||||
|
||||
|
||||
class Win32ListdirTests(unittest.TestCase):
|
||||
"""Test listdir on Windows."""
|
||||
|
||||
def setUp(self):
|
||||
self.created_paths = []
|
||||
for i in range(2):
|
||||
dir_name = 'SUB%d' % i
|
||||
dir_path = os.path.join(os_helper.TESTFN, dir_name)
|
||||
file_name = 'FILE%d' % i
|
||||
file_path = os.path.join(os_helper.TESTFN, file_name)
|
||||
os.makedirs(dir_path)
|
||||
with open(file_path, 'w', encoding='utf-8') as f:
|
||||
f.write("I'm %s and proud of it. Blame test_os.\n" % file_path)
|
||||
self.created_paths.extend([dir_name, file_name])
|
||||
self.created_paths.sort()
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(os_helper.TESTFN)
|
||||
|
||||
def test_listdir_no_extended_path(self):
|
||||
"""Test when the path is not an "extended" path."""
|
||||
# unicode
|
||||
self.assertEqual(
|
||||
sorted(os.listdir(os_helper.TESTFN)),
|
||||
self.created_paths)
|
||||
|
||||
# bytes
|
||||
self.assertEqual(
|
||||
sorted(os.listdir(os.fsencode(os_helper.TESTFN))),
|
||||
[os.fsencode(path) for path in self.created_paths])
|
||||
|
||||
def test_listdir_extended_path(self):
|
||||
"""Test when the path starts with '\\\\?\\'."""
|
||||
# See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath
|
||||
# unicode
|
||||
path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN)
|
||||
self.assertEqual(
|
||||
sorted(os.listdir(path)),
|
||||
self.created_paths)
|
||||
|
||||
# bytes
|
||||
path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN))
|
||||
self.assertEqual(
|
||||
sorted(os.listdir(path)),
|
||||
[os.fsencode(path) for path in self.created_paths])
|
||||
|
||||
|
||||
@unittest.skipUnless(os.name == "nt", "NT specific tests")
|
||||
class Win32ListdriveTests(unittest.TestCase):
|
||||
"""Test listdrive, listmounts and listvolume on Windows."""
|
||||
|
||||
def setUp(self):
|
||||
# Get drives and volumes from fsutil
|
||||
out = subprocess.check_output(
|
||||
["fsutil.exe", "volume", "list"],
|
||||
cwd=os.path.join(os.getenv("SystemRoot", "\\Windows"), "System32"),
|
||||
encoding="mbcs",
|
||||
errors="ignore",
|
||||
)
|
||||
lines = out.splitlines()
|
||||
self.known_volumes = {l for l in lines if l.startswith('\\\\?\\')}
|
||||
self.known_drives = {l for l in lines if l[1:] == ':\\'}
|
||||
self.known_mounts = {l for l in lines if l[1:3] == ':\\'}
|
||||
|
||||
def test_listdrives(self):
|
||||
drives = os.listdrives()
|
||||
self.assertIsInstance(drives, list)
|
||||
self.assertSetEqual(
|
||||
self.known_drives,
|
||||
self.known_drives & set(drives),
|
||||
)
|
||||
|
||||
def test_listvolumes(self):
|
||||
volumes = os.listvolumes()
|
||||
self.assertIsInstance(volumes, list)
|
||||
self.assertSetEqual(
|
||||
self.known_volumes,
|
||||
self.known_volumes & set(volumes),
|
||||
)
|
||||
|
||||
def test_listmounts(self):
|
||||
for volume in os.listvolumes():
|
||||
try:
|
||||
mounts = os.listmounts(volume)
|
||||
except OSError as ex:
|
||||
if support.verbose:
|
||||
print("Skipping", volume, "because of", ex)
|
||||
else:
|
||||
self.assertIsInstance(mounts, list)
|
||||
self.assertSetEqual(
|
||||
set(mounts),
|
||||
self.known_mounts & set(mounts),
|
||||
)
|
||||
|
||||
|
||||
@os_helper.skip_unless_symlink
|
||||
class Win32SymlinkTests(unittest.TestCase):
|
||||
filelink = 'filelinktest'
|
||||
filelink_target = os.path.abspath(__file__)
|
||||
dirlink = 'dirlinktest'
|
||||
dirlink_target = os.path.dirname(filelink_target)
|
||||
missing_link = 'missing link'
|
||||
|
||||
def setUp(self):
|
||||
assert os.path.exists(self.dirlink_target)
|
||||
assert os.path.exists(self.filelink_target)
|
||||
assert not os.path.exists(self.dirlink)
|
||||
assert not os.path.exists(self.filelink)
|
||||
assert not os.path.exists(self.missing_link)
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.exists(self.filelink):
|
||||
os.remove(self.filelink)
|
||||
if os.path.exists(self.dirlink):
|
||||
os.rmdir(self.dirlink)
|
||||
if os.path.lexists(self.missing_link):
|
||||
os.remove(self.missing_link)
|
||||
|
||||
def test_directory_link(self):
|
||||
os.symlink(self.dirlink_target, self.dirlink)
|
||||
self.assertTrue(os.path.exists(self.dirlink))
|
||||
self.assertTrue(os.path.isdir(self.dirlink))
|
||||
self.assertTrue(os.path.islink(self.dirlink))
|
||||
self.check_stat(self.dirlink, self.dirlink_target)
|
||||
|
||||
def test_file_link(self):
|
||||
os.symlink(self.filelink_target, self.filelink)
|
||||
self.assertTrue(os.path.exists(self.filelink))
|
||||
self.assertTrue(os.path.isfile(self.filelink))
|
||||
self.assertTrue(os.path.islink(self.filelink))
|
||||
self.check_stat(self.filelink, self.filelink_target)
|
||||
|
||||
def _create_missing_dir_link(self):
|
||||
'Create a "directory" link to a non-existent target'
|
||||
linkname = self.missing_link
|
||||
if os.path.lexists(linkname):
|
||||
os.remove(linkname)
|
||||
target = r'c:\\target does not exist.29r3c740'
|
||||
assert not os.path.exists(target)
|
||||
target_is_dir = True
|
||||
os.symlink(target, linkname, target_is_dir)
|
||||
|
||||
def test_remove_directory_link_to_missing_target(self):
|
||||
self._create_missing_dir_link()
|
||||
# For compatibility with Unix, os.remove will check the
|
||||
# directory status and call RemoveDirectory if the symlink
|
||||
# was created with target_is_dir==True.
|
||||
os.remove(self.missing_link)
|
||||
|
||||
def test_isdir_on_directory_link_to_missing_target(self):
|
||||
self._create_missing_dir_link()
|
||||
self.assertFalse(os.path.isdir(self.missing_link))
|
||||
|
||||
def test_rmdir_on_directory_link_to_missing_target(self):
|
||||
self._create_missing_dir_link()
|
||||
os.rmdir(self.missing_link)
|
||||
|
||||
def check_stat(self, link, target):
|
||||
self.assertEqual(os.stat(link), os.stat(target))
|
||||
self.assertNotEqual(os.lstat(link), os.stat(link))
|
||||
|
||||
bytes_link = os.fsencode(link)
|
||||
self.assertEqual(os.stat(bytes_link), os.stat(target))
|
||||
self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link))
|
||||
|
||||
def test_12084(self):
|
||||
level1 = os.path.abspath(os_helper.TESTFN)
|
||||
level2 = os.path.join(level1, "level2")
|
||||
level3 = os.path.join(level2, "level3")
|
||||
self.addCleanup(os_helper.rmtree, level1)
|
||||
|
||||
os.mkdir(level1)
|
||||
os.mkdir(level2)
|
||||
os.mkdir(level3)
|
||||
|
||||
file1 = os.path.abspath(os.path.join(level1, "file1"))
|
||||
create_file(file1)
|
||||
|
||||
orig_dir = os.getcwd()
|
||||
try:
|
||||
os.chdir(level2)
|
||||
link = os.path.join(level2, "link")
|
||||
os.symlink(os.path.relpath(file1), "link")
|
||||
self.assertIn("link", os.listdir(os.getcwd()))
|
||||
|
||||
# Check os.stat calls from the same dir as the link
|
||||
self.assertEqual(os.stat(file1), os.stat("link"))
|
||||
|
||||
# Check os.stat calls from a dir below the link
|
||||
os.chdir(level1)
|
||||
self.assertEqual(os.stat(file1),
|
||||
os.stat(os.path.relpath(link)))
|
||||
|
||||
# Check os.stat calls from a dir above the link
|
||||
os.chdir(level3)
|
||||
self.assertEqual(os.stat(file1),
|
||||
os.stat(os.path.relpath(link)))
|
||||
finally:
|
||||
os.chdir(orig_dir)
|
||||
|
||||
@unittest.skipUnless(os.path.lexists(r'C:\Users\All Users')
|
||||
and os.path.exists(r'C:\ProgramData'),
|
||||
'Test directories not found')
|
||||
def test_29248(self):
|
||||
# os.symlink() calls CreateSymbolicLink, which creates
|
||||
# the reparse data buffer with the print name stored
|
||||
# first, so the offset is always 0. CreateSymbolicLink
|
||||
# stores the "PrintName" DOS path (e.g. "C:\") first,
|
||||
# with an offset of 0, followed by the "SubstituteName"
|
||||
# NT path (e.g. "\??\C:\"). The "All Users" link, on
|
||||
# the other hand, seems to have been created manually
|
||||
# with an inverted order.
|
||||
target = os.readlink(r'C:\Users\All Users')
|
||||
self.assertTrue(os.path.samefile(target, r'C:\ProgramData'))
|
||||
|
||||
def test_buffer_overflow(self):
|
||||
# Older versions would have a buffer overflow when detecting
|
||||
# whether a link source was a directory. This test ensures we
|
||||
# no longer crash, but does not otherwise validate the behavior
|
||||
segment = 'X' * 27
|
||||
path = os.path.join(*[segment] * 10)
|
||||
test_cases = [
|
||||
# overflow with absolute src
|
||||
('\\' + path, segment),
|
||||
# overflow dest with relative src
|
||||
(segment, path),
|
||||
# overflow when joining src
|
||||
(path[:180], path[:180]),
|
||||
]
|
||||
for src, dest in test_cases:
|
||||
try:
|
||||
os.symlink(src, dest)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
os.remove(dest)
|
||||
except OSError:
|
||||
pass
|
||||
# Also test with bytes, since that is a separate code path.
|
||||
try:
|
||||
os.symlink(os.fsencode(src), os.fsencode(dest))
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
os.remove(dest)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def test_appexeclink(self):
|
||||
root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps')
|
||||
if not os.path.isdir(root):
|
||||
self.skipTest("test requires a WindowsApps directory")
|
||||
|
||||
aliases = [os.path.join(root, a)
|
||||
for a in fnmatch.filter(os.listdir(root), '*.exe')]
|
||||
|
||||
for alias in aliases:
|
||||
if support.verbose:
|
||||
print()
|
||||
print("Testing with", alias)
|
||||
st = os.lstat(alias)
|
||||
self.assertEqual(st, os.stat(alias))
|
||||
self.assertFalse(stat.S_ISLNK(st.st_mode))
|
||||
self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK)
|
||||
self.assertTrue(os.path.isfile(alias))
|
||||
# testing the first one we see is sufficient
|
||||
break
|
||||
else:
|
||||
self.skipTest("test requires an app execution alias")
|
||||
|
||||
|
||||
class Win32JunctionTests(unittest.TestCase):
|
||||
junction = 'junctiontest'
|
||||
junction_target = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
def setUp(self):
|
||||
assert os.path.exists(self.junction_target)
|
||||
assert not os.path.lexists(self.junction)
|
||||
|
||||
def tearDown(self):
|
||||
if os.path.lexists(self.junction):
|
||||
os.unlink(self.junction)
|
||||
|
||||
def test_create_junction(self):
|
||||
_winapi.CreateJunction(self.junction_target, self.junction)
|
||||
self.assertTrue(os.path.lexists(self.junction))
|
||||
self.assertTrue(os.path.exists(self.junction))
|
||||
self.assertTrue(os.path.isdir(self.junction))
|
||||
self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction))
|
||||
self.assertEqual(os.stat(self.junction), os.stat(self.junction_target))
|
||||
|
||||
# bpo-37834: Junctions are not recognized as links.
|
||||
self.assertFalse(os.path.islink(self.junction))
|
||||
self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target),
|
||||
os.path.normcase(os.readlink(self.junction)))
|
||||
|
||||
def test_unlink_removes_junction(self):
|
||||
_winapi.CreateJunction(self.junction_target, self.junction)
|
||||
self.assertTrue(os.path.exists(self.junction))
|
||||
self.assertTrue(os.path.lexists(self.junction))
|
||||
|
||||
os.unlink(self.junction)
|
||||
self.assertFalse(os.path.exists(self.junction))
|
||||
|
||||
|
||||
class Win32NtTests(unittest.TestCase):
|
||||
def test_getfinalpathname_handles(self):
|
||||
nt = import_helper.import_module('nt')
|
||||
ctypes = import_helper.import_module('ctypes')
|
||||
# Ruff false positive -- it thinks we're redefining `ctypes` here
|
||||
import ctypes.wintypes # noqa: F811
|
||||
|
||||
kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True)
|
||||
kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE
|
||||
|
||||
kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL
|
||||
kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE,
|
||||
ctypes.wintypes.LPDWORD)
|
||||
|
||||
# This is a pseudo-handle that doesn't need to be closed
|
||||
hproc = kernel.GetCurrentProcess()
|
||||
|
||||
handle_count = ctypes.wintypes.DWORD()
|
||||
ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
|
||||
self.assertEqual(1, ok)
|
||||
|
||||
before_count = handle_count.value
|
||||
|
||||
# The first two test the error path, __file__ tests the success path
|
||||
filenames = [
|
||||
r'\\?\C:',
|
||||
r'\\?\NUL',
|
||||
r'\\?\CONIN',
|
||||
__file__,
|
||||
]
|
||||
|
||||
for _ in range(10):
|
||||
for name in filenames:
|
||||
try:
|
||||
nt._getfinalpathname(name)
|
||||
except Exception:
|
||||
# Failure is expected
|
||||
pass
|
||||
try:
|
||||
os.stat(name)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count))
|
||||
self.assertEqual(1, ok)
|
||||
|
||||
handle_delta = handle_count.value - before_count
|
||||
|
||||
self.assertEqual(0, handle_delta)
|
||||
|
||||
@support.requires_subprocess()
|
||||
def test_stat_unlink_race(self):
|
||||
# bpo-46785: the implementation of os.stat() falls back to reading
|
||||
# the parent directory if CreateFileW() fails with a permission
|
||||
# error. If reading the parent directory fails because the file or
|
||||
# directory are subsequently unlinked, or because the volume or
|
||||
# share are no longer available, then the original permission error
|
||||
# should not be restored.
|
||||
filename = os_helper.TESTFN
|
||||
self.addCleanup(os_helper.unlink, filename)
|
||||
deadline = time.time() + 5
|
||||
command = textwrap.dedent("""\
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
filename = sys.argv[1]
|
||||
deadline = float(sys.argv[2])
|
||||
|
||||
while time.time() < deadline:
|
||||
try:
|
||||
with open(filename, "w") as f:
|
||||
pass
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
os.remove(filename)
|
||||
except OSError:
|
||||
pass
|
||||
""")
|
||||
|
||||
with subprocess.Popen([sys.executable, '-c', command, filename, str(deadline)]) as proc:
|
||||
while time.time() < deadline:
|
||||
try:
|
||||
os.stat(filename)
|
||||
except FileNotFoundError as e:
|
||||
assert e.winerror == 2 # ERROR_FILE_NOT_FOUND
|
||||
try:
|
||||
proc.wait(1)
|
||||
except subprocess.TimeoutExpired:
|
||||
proc.terminate()
|
||||
|
||||
@support.requires_subprocess()
|
||||
def test_stat_inaccessible_file(self):
|
||||
filename = os_helper.TESTFN
|
||||
ICACLS = os.path.expandvars(r"%SystemRoot%\System32\icacls.exe")
|
||||
|
||||
with open(filename, "wb") as f:
|
||||
f.write(b'Test data')
|
||||
|
||||
stat1 = os.stat(filename)
|
||||
|
||||
try:
|
||||
# Remove all permissions from the file
|
||||
subprocess.check_output([ICACLS, filename, "/inheritance:r"],
|
||||
stderr=subprocess.STDOUT)
|
||||
except subprocess.CalledProcessError as ex:
|
||||
if support.verbose:
|
||||
print(ICACLS, filename, "/inheritance:r", "failed.")
|
||||
print(ex.stdout.decode("oem", "replace").rstrip())
|
||||
try:
|
||||
os.unlink(filename)
|
||||
except OSError:
|
||||
pass
|
||||
self.skipTest("Unable to create inaccessible file")
|
||||
|
||||
def cleanup():
|
||||
# Give delete permission to the owner (us)
|
||||
subprocess.check_output([ICACLS, filename, "/grant", "*WD:(D)"],
|
||||
stderr=subprocess.STDOUT)
|
||||
os.unlink(filename)
|
||||
|
||||
self.addCleanup(cleanup)
|
||||
|
||||
if support.verbose:
|
||||
print("File:", filename)
|
||||
print("stat with access:", stat1)
|
||||
|
||||
# First test - we shouldn't raise here, because we still have access to
|
||||
# the directory and can extract enough information from its metadata.
|
||||
stat2 = os.stat(filename)
|
||||
|
||||
if support.verbose:
|
||||
print(" without access:", stat2)
|
||||
|
||||
# We may not get st_dev/st_ino, so ensure those are 0 or match
|
||||
self.assertIn(stat2.st_dev, (0, stat1.st_dev))
|
||||
self.assertIn(stat2.st_ino, (0, stat1.st_ino))
|
||||
|
||||
# st_mode and st_size should match (for a normal file, at least)
|
||||
self.assertEqual(stat1.st_mode, stat2.st_mode)
|
||||
self.assertEqual(stat1.st_size, stat2.st_size)
|
||||
|
||||
# st_ctime and st_mtime should be the same
|
||||
self.assertEqual(stat1.st_ctime, stat2.st_ctime)
|
||||
self.assertEqual(stat1.st_mtime, stat2.st_mtime)
|
||||
|
||||
# st_atime should be the same or later
|
||||
self.assertGreaterEqual(stat1.st_atime, stat2.st_atime)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
3
Lib/test/test_os/utils.py
Normal file
3
Lib/test/test_os/utils.py
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
def create_file(filename, content=b'content'):
|
||||
with open(filename, "xb", 0) as fp:
|
||||
fp.write(content)
|
||||
|
|
@ -2682,6 +2682,7 @@ TESTSUBDIRS= idlelib/idle_test \
|
|||
test/test_multiprocessing_fork \
|
||||
test/test_multiprocessing_forkserver \
|
||||
test/test_multiprocessing_spawn \
|
||||
test/test_os \
|
||||
test/test_pathlib \
|
||||
test/test_pathlib/support \
|
||||
test/test_peg_generator \
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue