[3.14] gh-131788: make resource_tracker re-entrant safe (GH-131787) (#137737)

gh-131788: make resource_tracker re-entrant safe (GH-131787)

* make resource_tracker re-entrant safe
* Update Lib/multiprocessing/resource_tracker.py
* trim trailing whitespace
* use f-string and args = [x, *y, z]
* raise self._reentrant_call_error

---------
(cherry picked from commit f24a012350)

Co-authored-by: Thomas Grainger <tagrain@gmail.com>
Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Co-authored-by: Gregory P. Smith <greg@krypto.org>
Co-authored-by: Gregory P. Smith <68491+gpshead@users.noreply.github.com>
This commit is contained in:
Miss Islington (bot) 2025-10-17 06:25:14 +02:00 committed by GitHub
parent 32e60fa220
commit 5513f6a99d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 95 additions and 71 deletions

View file

@ -20,6 +20,7 @@
import sys
import threading
import warnings
from collections import deque
from . import spawn
from . import util
@ -62,6 +63,7 @@ def __init__(self):
self._fd = None
self._pid = None
self._exitcode = None
self._reentrant_messages = deque()
def _reentrant_call_error(self):
# gh-109629: this happens if an explicit call to the ResourceTracker
@ -98,7 +100,7 @@ def _stop_locked(
# This shouldn't happen (it might when called by a finalizer)
# so we check for it anyway.
if self._lock._recursion_count() > 1:
return self._reentrant_call_error()
raise self._reentrant_call_error()
if self._fd is None:
# not running
return
@ -128,16 +130,9 @@ def ensure_running(self):
This can be run from any process. Usually a child process will use
the resource created by its parent.'''
with self._lock:
if self._lock._recursion_count() > 1:
# The code below is certainly not reentrant-safe, so bail out
return self._reentrant_call_error()
if self._fd is not None:
# resource tracker was launched before, is it still running?
if self._check_alive():
# => still alive
return
# => dead, launch it again
return self._ensure_running_and_write()
def _teardown_dead_process(self):
os.close(self._fd)
# Clean-up to avoid dangling processes.
@ -156,19 +151,23 @@ def ensure_running(self):
warnings.warn('resource_tracker: process died unexpectedly, '
'relaunching. Some resources might leak.')
def _launch(self):
fds_to_pass = []
try:
fds_to_pass.append(sys.stderr.fileno())
except Exception:
pass
cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
r, w = os.pipe()
try:
fds_to_pass.append(r)
# process will out live us, so no need to wait on pid
exe = spawn.get_executable()
args = [exe] + util._args_from_interpreter_flags()
args += ['-c', cmd % r]
args = [
exe,
*util._args_from_interpreter_flags(),
'-c',
f'from multiprocessing.resource_tracker import main;main({r})',
]
# bpo-33613: Register a signal mask that will block the signals.
# This signal mask will be inherited by the child that is going
# to be spawned and will protect the child from a race condition
@ -192,6 +191,39 @@ def ensure_running(self):
finally:
os.close(r)
def _ensure_running_and_write(self, msg=None):
with self._lock:
if self._lock._recursion_count() > 1:
# The code below is certainly not reentrant-safe, so bail out
if msg is None:
raise self._reentrant_call_error()
return self._reentrant_messages.append(msg)
if self._fd is not None:
# resource tracker was launched before, is it still running?
if msg is None:
to_send = b'PROBE:0:noop\n'
else:
to_send = msg
try:
self._write(to_send)
except OSError:
self._teardown_dead_process()
self._launch()
msg = None # message was sent in probe
else:
self._launch()
while True:
try:
reentrant_msg = self._reentrant_messages.popleft()
except IndexError:
break
self._write(reentrant_msg)
if msg is not None:
self._write(msg)
def _check_alive(self):
'''Check that the pipe has not been closed by sending a probe.'''
try:
@ -211,27 +243,18 @@ def unregister(self, name, rtype):
'''Unregister name of resource with resource tracker.'''
self._send('UNREGISTER', name, rtype)
def _write(self, msg):
nbytes = os.write(self._fd, msg)
assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
def _send(self, cmd, name, rtype):
try:
self.ensure_running()
except ReentrantCallError:
# The code below might or might not work, depending on whether
# the resource tracker was already running and still alive.
# Better warn the user.
# (XXX is warnings.warn itself reentrant-safe? :-)
warnings.warn(
f"ResourceTracker called reentrantly for resource cleanup, "
f"which is unsupported. "
f"The {rtype} object {name!r} might leak.")
msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
if len(msg) > 512:
# posix guarantees that writes to a pipe of less than PIPE_BUF
# bytes are atomic, and that PIPE_BUF >= 512
raise ValueError('msg too long')
nbytes = os.write(self._fd, msg)
assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
nbytes, len(msg))
self._ensure_running_and_write(msg)
_resource_tracker = ResourceTracker()
ensure_running = _resource_tracker.ensure_running

View file

@ -0,0 +1 @@
Make ``ResourceTracker.send`` from :mod:`multiprocessing` re-entrant safe