mirror of
https://github.com/python/cpython.git
synced 2025-12-08 06:10:17 +00:00
gh-142206: multiprocessing.resource_tracker: Decode messages using older protocol (GH-142215)
This commit is contained in:
parent
88cd5d9850
commit
4172644d78
3 changed files with 73 additions and 20 deletions
|
|
@ -68,6 +68,13 @@ def __init__(self):
|
||||||
self._exitcode = None
|
self._exitcode = None
|
||||||
self._reentrant_messages = deque()
|
self._reentrant_messages = deque()
|
||||||
|
|
||||||
|
# True to use colon-separated lines, rather than JSON lines,
|
||||||
|
# for internal communication. (Mainly for testing).
|
||||||
|
# Filenames not supported by the simple format will always be sent
|
||||||
|
# using JSON.
|
||||||
|
# The reader should understand all formats.
|
||||||
|
self._use_simple_format = False
|
||||||
|
|
||||||
def _reentrant_call_error(self):
|
def _reentrant_call_error(self):
|
||||||
# gh-109629: this happens if an explicit call to the ResourceTracker
|
# gh-109629: this happens if an explicit call to the ResourceTracker
|
||||||
# gets interrupted by a garbage collection, invoking a finalizer (*)
|
# gets interrupted by a garbage collection, invoking a finalizer (*)
|
||||||
|
|
@ -200,7 +207,9 @@ def _launch(self):
|
||||||
os.close(r)
|
os.close(r)
|
||||||
|
|
||||||
def _make_probe_message(self):
|
def _make_probe_message(self):
|
||||||
"""Return a JSON-encoded probe message."""
|
"""Return a probe message."""
|
||||||
|
if self._use_simple_format:
|
||||||
|
return b'PROBE:0:noop\n'
|
||||||
return (
|
return (
|
||||||
json.dumps(
|
json.dumps(
|
||||||
{"cmd": "PROBE", "rtype": "noop"},
|
{"cmd": "PROBE", "rtype": "noop"},
|
||||||
|
|
@ -267,6 +276,15 @@ def _write(self, msg):
|
||||||
assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
|
assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
|
||||||
|
|
||||||
def _send(self, cmd, name, rtype):
|
def _send(self, cmd, name, rtype):
|
||||||
|
if self._use_simple_format and '\n' not in name:
|
||||||
|
msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
|
||||||
|
if len(msg) > 512:
|
||||||
|
# posix guarantees that writes to a pipe of less than PIPE_BUF
|
||||||
|
# bytes are atomic, and that PIPE_BUF >= 512
|
||||||
|
raise ValueError('msg too long')
|
||||||
|
self._ensure_running_and_write(msg)
|
||||||
|
return
|
||||||
|
|
||||||
# POSIX guarantees that writes to a pipe of less than PIPE_BUF (512 on Linux)
|
# POSIX guarantees that writes to a pipe of less than PIPE_BUF (512 on Linux)
|
||||||
# bytes are atomic. Therefore, we want the message to be shorter than 512 bytes.
|
# bytes are atomic. Therefore, we want the message to be shorter than 512 bytes.
|
||||||
# POSIX shm_open() and sem_open() require the name, including its leading slash,
|
# POSIX shm_open() and sem_open() require the name, including its leading slash,
|
||||||
|
|
@ -286,6 +304,7 @@ def _send(self, cmd, name, rtype):
|
||||||
|
|
||||||
# The entire JSON message is guaranteed < PIPE_BUF (512 bytes) by construction.
|
# The entire JSON message is guaranteed < PIPE_BUF (512 bytes) by construction.
|
||||||
assert len(msg) <= 512, f"internal error: message too long ({len(msg)} bytes)"
|
assert len(msg) <= 512, f"internal error: message too long ({len(msg)} bytes)"
|
||||||
|
assert msg.startswith(b'{')
|
||||||
|
|
||||||
self._ensure_running_and_write(msg)
|
self._ensure_running_and_write(msg)
|
||||||
|
|
||||||
|
|
@ -296,6 +315,30 @@ def _send(self, cmd, name, rtype):
|
||||||
getfd = _resource_tracker.getfd
|
getfd = _resource_tracker.getfd
|
||||||
|
|
||||||
|
|
||||||
|
def _decode_message(line):
|
||||||
|
if line.startswith(b'{'):
|
||||||
|
try:
|
||||||
|
obj = json.loads(line.decode('ascii'))
|
||||||
|
except Exception as e:
|
||||||
|
raise ValueError("malformed resource_tracker message: %r" % (line,)) from e
|
||||||
|
|
||||||
|
cmd = obj["cmd"]
|
||||||
|
rtype = obj["rtype"]
|
||||||
|
b64 = obj.get("base64_name", "")
|
||||||
|
|
||||||
|
if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str):
|
||||||
|
raise ValueError("malformed resource_tracker fields: %r" % (obj,))
|
||||||
|
|
||||||
|
try:
|
||||||
|
name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape')
|
||||||
|
except ValueError as e:
|
||||||
|
raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e
|
||||||
|
else:
|
||||||
|
cmd, rest = line.strip().decode('ascii').split(':', maxsplit=1)
|
||||||
|
name, rtype = rest.rsplit(':', maxsplit=1)
|
||||||
|
return cmd, rtype, name
|
||||||
|
|
||||||
|
|
||||||
def main(fd):
|
def main(fd):
|
||||||
'''Run resource tracker.'''
|
'''Run resource tracker.'''
|
||||||
# protect the process from ^C and "killall python" etc
|
# protect the process from ^C and "killall python" etc
|
||||||
|
|
@ -318,23 +361,7 @@ def main(fd):
|
||||||
with open(fd, 'rb') as f:
|
with open(fd, 'rb') as f:
|
||||||
for line in f:
|
for line in f:
|
||||||
try:
|
try:
|
||||||
try:
|
cmd, rtype, name = _decode_message(line)
|
||||||
obj = json.loads(line.decode('ascii'))
|
|
||||||
except Exception as e:
|
|
||||||
raise ValueError("malformed resource_tracker message: %r" % (line,)) from e
|
|
||||||
|
|
||||||
cmd = obj["cmd"]
|
|
||||||
rtype = obj["rtype"]
|
|
||||||
b64 = obj.get("base64_name", "")
|
|
||||||
|
|
||||||
if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str):
|
|
||||||
raise ValueError("malformed resource_tracker fields: %r" % (obj,))
|
|
||||||
|
|
||||||
try:
|
|
||||||
name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape')
|
|
||||||
except ValueError as e:
|
|
||||||
raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e
|
|
||||||
|
|
||||||
cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
|
cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
|
||||||
if cleanup_func is None:
|
if cleanup_func is None:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
|
|
|
||||||
|
|
@ -39,6 +39,7 @@
|
||||||
from test.support import socket_helper
|
from test.support import socket_helper
|
||||||
from test.support import threading_helper
|
from test.support import threading_helper
|
||||||
from test.support import warnings_helper
|
from test.support import warnings_helper
|
||||||
|
from test.support import subTests
|
||||||
from test.support.script_helper import assert_python_failure, assert_python_ok
|
from test.support.script_helper import assert_python_failure, assert_python_ok
|
||||||
|
|
||||||
# Skip tests if _multiprocessing wasn't built.
|
# Skip tests if _multiprocessing wasn't built.
|
||||||
|
|
@ -4383,6 +4384,19 @@ def test_copy(self):
|
||||||
self.assertEqual(bar.z, 2 ** 33)
|
self.assertEqual(bar.z, 2 ** 33)
|
||||||
|
|
||||||
|
|
||||||
|
def resource_tracker_format_subtests(func):
|
||||||
|
"""Run given test using both resource tracker communication formats"""
|
||||||
|
def _inner(self, *args, **kwargs):
|
||||||
|
tracker = resource_tracker._resource_tracker
|
||||||
|
for use_simple_format in False, True:
|
||||||
|
with (
|
||||||
|
self.subTest(use_simple_format=use_simple_format),
|
||||||
|
unittest.mock.patch.object(
|
||||||
|
tracker, '_use_simple_format', use_simple_format)
|
||||||
|
):
|
||||||
|
func(self, *args, **kwargs)
|
||||||
|
return _inner
|
||||||
|
|
||||||
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
|
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
|
||||||
@hashlib_helper.requires_hashdigest('sha256')
|
@hashlib_helper.requires_hashdigest('sha256')
|
||||||
class _TestSharedMemory(BaseTestCase):
|
class _TestSharedMemory(BaseTestCase):
|
||||||
|
|
@ -4662,6 +4676,7 @@ def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
|
||||||
smm.shutdown()
|
smm.shutdown()
|
||||||
|
|
||||||
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
|
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
|
||||||
|
@resource_tracker_format_subtests
|
||||||
def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
|
def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
|
||||||
# bpo-36867: test that a SharedMemoryManager uses the
|
# bpo-36867: test that a SharedMemoryManager uses the
|
||||||
# same resource_tracker process as its parent.
|
# same resource_tracker process as its parent.
|
||||||
|
|
@ -4913,6 +4928,7 @@ def test_shared_memory_cleaned_after_process_termination(self):
|
||||||
"shared_memory objects to clean up at shutdown", err)
|
"shared_memory objects to clean up at shutdown", err)
|
||||||
|
|
||||||
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
|
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
|
||||||
|
@resource_tracker_format_subtests
|
||||||
def test_shared_memory_untracking(self):
|
def test_shared_memory_untracking(self):
|
||||||
# gh-82300: When a separate Python process accesses shared memory
|
# gh-82300: When a separate Python process accesses shared memory
|
||||||
# with track=False, it must not cause the memory to be deleted
|
# with track=False, it must not cause the memory to be deleted
|
||||||
|
|
@ -4940,6 +4956,7 @@ def test_shared_memory_untracking(self):
|
||||||
mem.close()
|
mem.close()
|
||||||
|
|
||||||
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
|
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
|
||||||
|
@resource_tracker_format_subtests
|
||||||
def test_shared_memory_tracking(self):
|
def test_shared_memory_tracking(self):
|
||||||
# gh-82300: When a separate Python process accesses shared memory
|
# gh-82300: When a separate Python process accesses shared memory
|
||||||
# with track=True, it must cause the memory to be deleted when
|
# with track=True, it must cause the memory to be deleted when
|
||||||
|
|
@ -7353,13 +7370,18 @@ def test_forkpty(self):
|
||||||
|
|
||||||
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
|
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
|
||||||
class TestSharedMemoryNames(unittest.TestCase):
|
class TestSharedMemoryNames(unittest.TestCase):
|
||||||
def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(self):
|
@subTests('use_simple_format', (True, False))
|
||||||
|
def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(
|
||||||
|
self, use_simple_format):
|
||||||
# Test script that creates and cleans up shared memory with colon in name
|
# Test script that creates and cleans up shared memory with colon in name
|
||||||
test_script = textwrap.dedent("""
|
test_script = textwrap.dedent("""
|
||||||
import sys
|
import sys
|
||||||
from multiprocessing import shared_memory
|
from multiprocessing import shared_memory
|
||||||
|
from multiprocessing import resource_tracker
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
resource_tracker._resource_tracker._use_simple_format = %s
|
||||||
|
|
||||||
# Test various patterns of colons in names
|
# Test various patterns of colons in names
|
||||||
test_names = [
|
test_names = [
|
||||||
"a:b",
|
"a:b",
|
||||||
|
|
@ -7387,7 +7409,7 @@ def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(self
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
print("SUCCESS")
|
print("SUCCESS")
|
||||||
""")
|
""" % use_simple_format)
|
||||||
|
|
||||||
rc, out, err = assert_python_ok("-c", test_script)
|
rc, out, err = assert_python_ok("-c", test_script)
|
||||||
self.assertIn(b"SUCCESS", out)
|
self.assertIn(b"SUCCESS", out)
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,4 @@
|
||||||
|
The resource tracker in the :mod:`multiprocessing` module can now understand
|
||||||
|
messages from older versions of itself. This avoids issues with upgrading
|
||||||
|
Python while it is running. (Note that such 'in-place' upgrades are not
|
||||||
|
tested.)
|
||||||
Loading…
Add table
Add a link
Reference in a new issue