import sys import unittest if sys.platform != "win32": raise unittest.SkipTest("Win32 specific tests") import _winapi import fnmatch import mmap import os import shutil import signal import stat import subprocess import textwrap import time import uuid from test import support from test.support import import_helper from test.support import os_helper from .utils import create_file class Win32KillTests(unittest.TestCase): def _kill(self, sig): # Start sys.executable as a subprocess and communicate from the # subprocess to the parent that the interpreter is ready. When it # becomes ready, send *sig* via os.kill to the subprocess and check # that the return code is equal to *sig*. import ctypes from ctypes import wintypes import msvcrt # Since we can't access the contents of the process' stdout until the # process has exited, use PeekNamedPipe to see what's inside stdout # without waiting. This is done so we can tell that the interpreter # is started and running at a point where it could handle a signal. PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe PeekNamedPipe.restype = wintypes.BOOL PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle ctypes.POINTER(ctypes.c_char), # stdout buf wintypes.DWORD, # Buffer size ctypes.POINTER(wintypes.DWORD), # bytes read ctypes.POINTER(wintypes.DWORD), # bytes avail ctypes.POINTER(wintypes.DWORD)) # bytes left msg = "running" proc = subprocess.Popen([sys.executable, "-c", "import sys;" "sys.stdout.write('{}');" "sys.stdout.flush();" "input()".format(msg)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) self.addCleanup(proc.stdout.close) self.addCleanup(proc.stderr.close) self.addCleanup(proc.stdin.close) count, max = 0, 100 while count < max and proc.poll() is None: # Create a string buffer to store the result of stdout from the pipe buf = ctypes.create_string_buffer(len(msg)) # Obtain the text currently in proc.stdout # Bytes read/avail/left are left as NULL and unused rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()), buf, ctypes.sizeof(buf), None, None, None) self.assertNotEqual(rslt, 0, "PeekNamedPipe failed") if buf.value: self.assertEqual(msg, buf.value.decode()) break time.sleep(0.1) count += 1 else: self.fail("Did not receive communication from the subprocess") os.kill(proc.pid, sig) self.assertEqual(proc.wait(), sig) def test_kill_sigterm(self): # SIGTERM doesn't mean anything special, but make sure it works self._kill(signal.SIGTERM) def test_kill_int(self): # os.kill on Windows can take an int which gets set as the exit code self._kill(100) @unittest.skipIf(mmap is None, "requires mmap") def _kill_with_event(self, event, name): tagname = "test_os_%s" % uuid.uuid1() m = mmap.mmap(-1, 1, tagname) m[0] = 0 # Run a script which has console control handling enabled. script = os.path.join(os.path.dirname(__file__), "win_console_handler.py") cmd = [sys.executable, script, tagname] proc = subprocess.Popen(cmd, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) with proc: # Let the interpreter startup before we send signals. See #3137. for _ in support.sleeping_retry(support.SHORT_TIMEOUT): if proc.poll() is None: break else: # Forcefully kill the process if we weren't able to signal it. proc.kill() self.fail("Subprocess didn't finish initialization") os.kill(proc.pid, event) try: # proc.send_signal(event) could also be done here. # Allow time for the signal to be passed and the process to exit. proc.wait(timeout=support.SHORT_TIMEOUT) except subprocess.TimeoutExpired: # Forcefully kill the process if we weren't able to signal it. proc.kill() self.fail("subprocess did not stop on {}".format(name)) @unittest.skip("subprocesses aren't inheriting Ctrl+C property") @support.requires_subprocess() def test_CTRL_C_EVENT(self): from ctypes import wintypes import ctypes # Make a NULL value by creating a pointer with no argument. NULL = ctypes.POINTER(ctypes.c_int)() SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int), wintypes.BOOL) SetConsoleCtrlHandler.restype = wintypes.BOOL # Calling this with NULL and FALSE causes the calling process to # handle Ctrl+C, rather than ignore it. This property is inherited # by subprocesses. SetConsoleCtrlHandler(NULL, 0) self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT") @support.requires_subprocess() def test_CTRL_BREAK_EVENT(self): self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT") class Win32ListdirTests(unittest.TestCase): """Test listdir on Windows.""" def setUp(self): self.created_paths = [] for i in range(2): dir_name = 'SUB%d' % i dir_path = os.path.join(os_helper.TESTFN, dir_name) file_name = 'FILE%d' % i file_path = os.path.join(os_helper.TESTFN, file_name) os.makedirs(dir_path) with open(file_path, 'w', encoding='utf-8') as f: f.write("I'm %s and proud of it. Blame test_os.\n" % file_path) self.created_paths.extend([dir_name, file_name]) self.created_paths.sort() def tearDown(self): shutil.rmtree(os_helper.TESTFN) def test_listdir_no_extended_path(self): """Test when the path is not an "extended" path.""" # unicode self.assertEqual( sorted(os.listdir(os_helper.TESTFN)), self.created_paths) # bytes self.assertEqual( sorted(os.listdir(os.fsencode(os_helper.TESTFN))), [os.fsencode(path) for path in self.created_paths]) def test_listdir_extended_path(self): """Test when the path starts with '\\\\?\\'.""" # See: http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx#maxpath # unicode path = '\\\\?\\' + os.path.abspath(os_helper.TESTFN) self.assertEqual( sorted(os.listdir(path)), self.created_paths) # bytes path = b'\\\\?\\' + os.fsencode(os.path.abspath(os_helper.TESTFN)) self.assertEqual( sorted(os.listdir(path)), [os.fsencode(path) for path in self.created_paths]) @unittest.skipUnless(os.name == "nt", "NT specific tests") class Win32ListdriveTests(unittest.TestCase): """Test listdrive, listmounts and listvolume on Windows.""" def setUp(self): # Get drives and volumes from fsutil out = subprocess.check_output( ["fsutil.exe", "volume", "list"], cwd=os.path.join(os.getenv("SystemRoot", "\\Windows"), "System32"), encoding="mbcs", errors="ignore", ) lines = out.splitlines() self.known_volumes = {l for l in lines if l.startswith('\\\\?\\')} self.known_drives = {l for l in lines if l[1:] == ':\\'} self.known_mounts = {l for l in lines if l[1:3] == ':\\'} def test_listdrives(self): drives = os.listdrives() self.assertIsInstance(drives, list) self.assertSetEqual( self.known_drives, self.known_drives & set(drives), ) def test_listvolumes(self): volumes = os.listvolumes() self.assertIsInstance(volumes, list) self.assertSetEqual( self.known_volumes, self.known_volumes & set(volumes), ) def test_listmounts(self): for volume in os.listvolumes(): try: mounts = os.listmounts(volume) except OSError as ex: if support.verbose: print("Skipping", volume, "because of", ex) else: self.assertIsInstance(mounts, list) self.assertSetEqual( set(mounts), self.known_mounts & set(mounts), ) @os_helper.skip_unless_symlink class Win32SymlinkTests(unittest.TestCase): filelink = 'filelinktest' filelink_target = os.path.abspath(__file__) dirlink = 'dirlinktest' dirlink_target = os.path.dirname(filelink_target) missing_link = 'missing link' def setUp(self): assert os.path.exists(self.dirlink_target) assert os.path.exists(self.filelink_target) assert not os.path.exists(self.dirlink) assert not os.path.exists(self.filelink) assert not os.path.exists(self.missing_link) def tearDown(self): if os.path.exists(self.filelink): os.remove(self.filelink) if os.path.exists(self.dirlink): os.rmdir(self.dirlink) if os.path.lexists(self.missing_link): os.remove(self.missing_link) def test_directory_link(self): os.symlink(self.dirlink_target, self.dirlink) self.assertTrue(os.path.exists(self.dirlink)) self.assertTrue(os.path.isdir(self.dirlink)) self.assertTrue(os.path.islink(self.dirlink)) self.check_stat(self.dirlink, self.dirlink_target) def test_file_link(self): os.symlink(self.filelink_target, self.filelink) self.assertTrue(os.path.exists(self.filelink)) self.assertTrue(os.path.isfile(self.filelink)) self.assertTrue(os.path.islink(self.filelink)) self.check_stat(self.filelink, self.filelink_target) def _create_missing_dir_link(self): 'Create a "directory" link to a non-existent target' linkname = self.missing_link if os.path.lexists(linkname): os.remove(linkname) target = r'c:\\target does not exist.29r3c740' assert not os.path.exists(target) target_is_dir = True os.symlink(target, linkname, target_is_dir) def test_remove_directory_link_to_missing_target(self): self._create_missing_dir_link() # For compatibility with Unix, os.remove will check the # directory status and call RemoveDirectory if the symlink # was created with target_is_dir==True. os.remove(self.missing_link) def test_isdir_on_directory_link_to_missing_target(self): self._create_missing_dir_link() self.assertFalse(os.path.isdir(self.missing_link)) def test_rmdir_on_directory_link_to_missing_target(self): self._create_missing_dir_link() os.rmdir(self.missing_link) def check_stat(self, link, target): self.assertEqual(os.stat(link), os.stat(target)) self.assertNotEqual(os.lstat(link), os.stat(link)) bytes_link = os.fsencode(link) self.assertEqual(os.stat(bytes_link), os.stat(target)) self.assertNotEqual(os.lstat(bytes_link), os.stat(bytes_link)) def test_12084(self): level1 = os.path.abspath(os_helper.TESTFN) level2 = os.path.join(level1, "level2") level3 = os.path.join(level2, "level3") self.addCleanup(os_helper.rmtree, level1) os.mkdir(level1) os.mkdir(level2) os.mkdir(level3) file1 = os.path.abspath(os.path.join(level1, "file1")) create_file(file1) orig_dir = os.getcwd() try: os.chdir(level2) link = os.path.join(level2, "link") os.symlink(os.path.relpath(file1), "link") self.assertIn("link", os.listdir(os.getcwd())) # Check os.stat calls from the same dir as the link self.assertEqual(os.stat(file1), os.stat("link")) # Check os.stat calls from a dir below the link os.chdir(level1) self.assertEqual(os.stat(file1), os.stat(os.path.relpath(link))) # Check os.stat calls from a dir above the link os.chdir(level3) self.assertEqual(os.stat(file1), os.stat(os.path.relpath(link))) finally: os.chdir(orig_dir) @unittest.skipUnless(os.path.lexists(r'C:\Users\All Users') and os.path.exists(r'C:\ProgramData'), 'Test directories not found') def test_29248(self): # os.symlink() calls CreateSymbolicLink, which creates # the reparse data buffer with the print name stored # first, so the offset is always 0. CreateSymbolicLink # stores the "PrintName" DOS path (e.g. "C:\") first, # with an offset of 0, followed by the "SubstituteName" # NT path (e.g. "\??\C:\"). The "All Users" link, on # the other hand, seems to have been created manually # with an inverted order. target = os.readlink(r'C:\Users\All Users') self.assertTrue(os.path.samefile(target, r'C:\ProgramData')) def test_buffer_overflow(self): # Older versions would have a buffer overflow when detecting # whether a link source was a directory. This test ensures we # no longer crash, but does not otherwise validate the behavior segment = 'X' * 27 path = os.path.join(*[segment] * 10) test_cases = [ # overflow with absolute src ('\\' + path, segment), # overflow dest with relative src (segment, path), # overflow when joining src (path[:180], path[:180]), ] for src, dest in test_cases: try: os.symlink(src, dest) except FileNotFoundError: pass else: try: os.remove(dest) except OSError: pass # Also test with bytes, since that is a separate code path. try: os.symlink(os.fsencode(src), os.fsencode(dest)) except FileNotFoundError: pass else: try: os.remove(dest) except OSError: pass def test_appexeclink(self): root = os.path.expandvars(r'%LOCALAPPDATA%\Microsoft\WindowsApps') if not os.path.isdir(root): self.skipTest("test requires a WindowsApps directory") aliases = [os.path.join(root, a) for a in fnmatch.filter(os.listdir(root), '*.exe')] for alias in aliases: if support.verbose: print() print("Testing with", alias) st = os.lstat(alias) self.assertEqual(st, os.stat(alias)) self.assertFalse(stat.S_ISLNK(st.st_mode)) self.assertEqual(st.st_reparse_tag, stat.IO_REPARSE_TAG_APPEXECLINK) self.assertTrue(os.path.isfile(alias)) # testing the first one we see is sufficient break else: self.skipTest("test requires an app execution alias") class Win32JunctionTests(unittest.TestCase): junction = 'junctiontest' junction_target = os.path.dirname(os.path.abspath(__file__)) def setUp(self): assert os.path.exists(self.junction_target) assert not os.path.lexists(self.junction) def tearDown(self): if os.path.lexists(self.junction): os.unlink(self.junction) def test_create_junction(self): _winapi.CreateJunction(self.junction_target, self.junction) self.assertTrue(os.path.lexists(self.junction)) self.assertTrue(os.path.exists(self.junction)) self.assertTrue(os.path.isdir(self.junction)) self.assertNotEqual(os.stat(self.junction), os.lstat(self.junction)) self.assertEqual(os.stat(self.junction), os.stat(self.junction_target)) # bpo-37834: Junctions are not recognized as links. self.assertFalse(os.path.islink(self.junction)) self.assertEqual(os.path.normcase("\\\\?\\" + self.junction_target), os.path.normcase(os.readlink(self.junction))) def test_unlink_removes_junction(self): _winapi.CreateJunction(self.junction_target, self.junction) self.assertTrue(os.path.exists(self.junction)) self.assertTrue(os.path.lexists(self.junction)) os.unlink(self.junction) self.assertFalse(os.path.exists(self.junction)) class Win32NtTests(unittest.TestCase): def test_getfinalpathname_handles(self): nt = import_helper.import_module('nt') ctypes = import_helper.import_module('ctypes') # Ruff false positive -- it thinks we're redefining `ctypes` here import ctypes.wintypes # noqa: F811 kernel = ctypes.WinDLL('Kernel32.dll', use_last_error=True) kernel.GetCurrentProcess.restype = ctypes.wintypes.HANDLE kernel.GetProcessHandleCount.restype = ctypes.wintypes.BOOL kernel.GetProcessHandleCount.argtypes = (ctypes.wintypes.HANDLE, ctypes.wintypes.LPDWORD) # This is a pseudo-handle that doesn't need to be closed hproc = kernel.GetCurrentProcess() handle_count = ctypes.wintypes.DWORD() ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) self.assertEqual(1, ok) before_count = handle_count.value # The first two test the error path, __file__ tests the success path filenames = [ r'\\?\C:', r'\\?\NUL', r'\\?\CONIN', __file__, ] for _ in range(10): for name in filenames: try: nt._getfinalpathname(name) except Exception: # Failure is expected pass try: os.stat(name) except Exception: pass ok = kernel.GetProcessHandleCount(hproc, ctypes.byref(handle_count)) self.assertEqual(1, ok) handle_delta = handle_count.value - before_count self.assertEqual(0, handle_delta) @support.requires_subprocess() def test_stat_unlink_race(self): # bpo-46785: the implementation of os.stat() falls back to reading # the parent directory if CreateFileW() fails with a permission # error. If reading the parent directory fails because the file or # directory are subsequently unlinked, or because the volume or # share are no longer available, then the original permission error # should not be restored. filename = os_helper.TESTFN self.addCleanup(os_helper.unlink, filename) deadline = time.time() + 5 command = textwrap.dedent("""\ import os import sys import time filename = sys.argv[1] deadline = float(sys.argv[2]) while time.time() < deadline: try: with open(filename, "w") as f: pass except OSError: pass try: os.remove(filename) except OSError: pass """) with subprocess.Popen([sys.executable, '-c', command, filename, str(deadline)]) as proc: while time.time() < deadline: try: os.stat(filename) except FileNotFoundError as e: assert e.winerror == 2 # ERROR_FILE_NOT_FOUND try: proc.wait(1) except subprocess.TimeoutExpired: proc.terminate() @support.requires_subprocess() def test_stat_inaccessible_file(self): filename = os_helper.TESTFN ICACLS = os.path.expandvars(r"%SystemRoot%\System32\icacls.exe") with open(filename, "wb") as f: f.write(b'Test data') stat1 = os.stat(filename) try: # Remove all permissions from the file subprocess.check_output([ICACLS, filename, "/inheritance:r"], stderr=subprocess.STDOUT) except subprocess.CalledProcessError as ex: if support.verbose: print(ICACLS, filename, "/inheritance:r", "failed.") print(ex.stdout.decode("oem", "replace").rstrip()) try: os.unlink(filename) except OSError: pass self.skipTest("Unable to create inaccessible file") def cleanup(): # Give delete permission to the owner (us) subprocess.check_output([ICACLS, filename, "/grant", "*WD:(D)"], stderr=subprocess.STDOUT) os.unlink(filename) self.addCleanup(cleanup) if support.verbose: print("File:", filename) print("stat with access:", stat1) # First test - we shouldn't raise here, because we still have access to # the directory and can extract enough information from its metadata. stat2 = os.stat(filename) if support.verbose: print(" without access:", stat2) # We may not get st_dev/st_ino, so ensure those are 0 or match self.assertIn(stat2.st_dev, (0, stat1.st_dev)) self.assertIn(stat2.st_ino, (0, stat1.st_ino)) # st_mode and st_size should match (for a normal file, at least) self.assertEqual(stat1.st_mode, stat2.st_mode) self.assertEqual(stat1.st_size, stat2.st_size) # st_ctime and st_mtime should be the same self.assertEqual(stat1.st_ctime, stat2.st_ctime) self.assertEqual(stat1.st_mtime, stat2.st_mtime) # st_atime should be the same or later self.assertGreaterEqual(stat1.st_atime, stat2.st_atime) if __name__ == "__main__": unittest.main()