mirror of
https://github.com/python/cpython.git
synced 2026-04-13 15:20:52 +00:00
gh-66305: Fix a hang on Windows in the tempfile module (GH-144672)
It occurred when trying to create a temporary file or subdirectory in a non-writable directory.
This commit is contained in:
parent
b32c830d44
commit
ca66d3c40c
3 changed files with 46 additions and 24 deletions
|
|
@ -57,10 +57,11 @@
|
|||
if hasattr(_os, 'O_BINARY'):
|
||||
_bin_openflags |= _os.O_BINARY
|
||||
|
||||
if hasattr(_os, 'TMP_MAX'):
|
||||
TMP_MAX = _os.TMP_MAX
|
||||
else:
|
||||
TMP_MAX = 10000
|
||||
# This is more than enough.
|
||||
# Each name contains over 40 random bits. Even with a million temporary
|
||||
# files, the chance of a conflict is less than 1 in a million, and with
|
||||
# 20 attempts, it is less than 1e-120.
|
||||
TMP_MAX = 20
|
||||
|
||||
# This variable _was_ unused for legacy reasons, see issue 10354.
|
||||
# But as of 3.5 we actually use it at runtime so changing it would
|
||||
|
|
@ -196,8 +197,7 @@ def _get_default_tempdir(dirlist=None):
|
|||
for dir in dirlist:
|
||||
if dir != _os.curdir:
|
||||
dir = _os.path.abspath(dir)
|
||||
# Try only a few names per directory.
|
||||
for seq in range(100):
|
||||
for seq in range(TMP_MAX):
|
||||
name = next(namer)
|
||||
filename = _os.path.join(dir, name)
|
||||
try:
|
||||
|
|
@ -213,10 +213,8 @@ def _get_default_tempdir(dirlist=None):
|
|||
except FileExistsError:
|
||||
pass
|
||||
except PermissionError:
|
||||
# This exception is thrown when a directory with the chosen name
|
||||
# already exists on windows.
|
||||
if (_os.name == 'nt' and _os.path.isdir(dir) and
|
||||
_os.access(dir, _os.W_OK)):
|
||||
# See the comment in mkdtemp().
|
||||
if _os.name == 'nt' and _os.path.isdir(dir):
|
||||
continue
|
||||
break # no point trying more names in this directory
|
||||
except OSError:
|
||||
|
|
@ -258,10 +256,8 @@ def _mkstemp_inner(dir, pre, suf, flags, output_type):
|
|||
except FileExistsError:
|
||||
continue # try again
|
||||
except PermissionError:
|
||||
# This exception is thrown when a directory with the chosen name
|
||||
# already exists on windows.
|
||||
if (_os.name == 'nt' and _os.path.isdir(dir) and
|
||||
_os.access(dir, _os.W_OK)):
|
||||
# See the comment in mkdtemp().
|
||||
if _os.name == 'nt' and _os.path.isdir(dir) and seq < TMP_MAX - 1:
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
|
|
@ -386,10 +382,14 @@ def mkdtemp(suffix=None, prefix=None, dir=None):
|
|||
except FileExistsError:
|
||||
continue # try again
|
||||
except PermissionError:
|
||||
# This exception is thrown when a directory with the chosen name
|
||||
# already exists on windows.
|
||||
if (_os.name == 'nt' and _os.path.isdir(dir) and
|
||||
_os.access(dir, _os.W_OK)):
|
||||
# On Posix, this exception is raised when the user has no
|
||||
# write access to the parent directory.
|
||||
# On Windows, it is also raised when a directory with
|
||||
# the chosen name already exists, or if the parent directory
|
||||
# is not a directory.
|
||||
# We cannot distinguish between "directory-exists-error" and
|
||||
# "access-denied-error".
|
||||
if _os.name == 'nt' and _os.path.isdir(dir) and seq < TMP_MAX - 1:
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
|
|
|
|||
|
|
@ -330,17 +330,36 @@ def _mock_candidate_names(*names):
|
|||
class TestBadTempdir:
|
||||
def test_read_only_directory(self):
|
||||
with _inside_empty_temp_dir():
|
||||
oldmode = mode = os.stat(tempfile.tempdir).st_mode
|
||||
mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
|
||||
os.chmod(tempfile.tempdir, mode)
|
||||
probe = os.path.join(tempfile.tempdir, 'probe')
|
||||
if os.name == 'nt':
|
||||
cmd = ['icacls', tempfile.tempdir, '/deny', 'Everyone:(W)']
|
||||
stdout = None if support.verbose > 1 else subprocess.DEVNULL
|
||||
subprocess.run(cmd, check=True, stdout=stdout)
|
||||
else:
|
||||
oldmode = mode = os.stat(tempfile.tempdir).st_mode
|
||||
mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
|
||||
mode = stat.S_IREAD
|
||||
os.chmod(tempfile.tempdir, mode)
|
||||
try:
|
||||
if os.access(tempfile.tempdir, os.W_OK):
|
||||
# Check that the directory is read-only.
|
||||
try:
|
||||
os.mkdir(probe)
|
||||
except PermissionError:
|
||||
pass
|
||||
else:
|
||||
os.rmdir(probe)
|
||||
self.skipTest("can't set the directory read-only")
|
||||
# gh-66305: Now it takes a split second, but previously
|
||||
# it took about 10 days on Windows.
|
||||
with self.assertRaises(PermissionError):
|
||||
self.make_temp()
|
||||
self.assertEqual(os.listdir(tempfile.tempdir), [])
|
||||
finally:
|
||||
os.chmod(tempfile.tempdir, oldmode)
|
||||
if os.name == 'nt':
|
||||
cmd = ['icacls', tempfile.tempdir, '/grant:r', 'Everyone:(M)']
|
||||
subprocess.run(cmd, check=True, stdout=stdout)
|
||||
else:
|
||||
os.chmod(tempfile.tempdir, oldmode)
|
||||
self.assertEqual(os.listdir(tempfile.tempdir), [])
|
||||
|
||||
def test_nonexisting_directory(self):
|
||||
with _inside_empty_temp_dir():
|
||||
|
|
|
|||
|
|
@ -0,0 +1,3 @@
|
|||
Fixed a hang on Windows in the :mod:`tempfile` module when
|
||||
trying to create a temporary file or subdirectory in a non-writable
|
||||
directory.
|
||||
Loading…
Add table
Add a link
Reference in a new issue