cpython/Lib/test/test_profiling/test_sampling_profiler/helpers.py
2026-05-05 00:44:37 +00:00

202 lines
6.2 KiB
Python

"""Helper utilities for sampling profiler tests."""
import contextlib
import socket
import subprocess
import sys
import unittest
from collections import namedtuple
from test.support import SHORT_TIMEOUT
from test.support.socket_helper import find_unused_port
from test.support.os_helper import unlink
PROCESS_VM_READV_SUPPORTED = False
try:
from _remote_debugging import PROCESS_VM_READV_SUPPORTED # noqa: F401
import _remote_debugging # noqa: F401
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
)
else:
import profiling.sampling # noqa: F401
from profiling.sampling.sample import SampleProfiler # noqa: F401
skip_if_not_supported = unittest.skipIf(
(
sys.platform != "darwin"
and sys.platform != "linux"
and sys.platform != "win32"
),
"Test only runs on Linux, Windows and MacOS",
)
SubprocessInfo = namedtuple("SubprocessInfo", ["process", "socket"])
def _wait_for_signal(sock, expected_signals, timeout=SHORT_TIMEOUT):
"""
Wait for expected signal(s) from a socket with proper timeout and EOF handling.
Args:
sock: Connected socket to read from
expected_signals: Single bytes object or list of bytes objects to wait for
timeout: Socket timeout in seconds
Returns:
bytes: Complete accumulated response buffer
Raises:
RuntimeError: If connection closed before signal received or timeout
"""
if isinstance(expected_signals, bytes):
expected_signals = [expected_signals]
sock.settimeout(timeout)
buffer = b""
while True:
# Check if all expected signals are in buffer
if all(sig in buffer for sig in expected_signals):
return buffer
try:
chunk = sock.recv(4096)
if not chunk:
raise RuntimeError(
f"Connection closed before receiving expected signals. "
f"Expected: {expected_signals}, Got: {buffer[-200:]!r}"
)
buffer += chunk
except socket.timeout:
raise RuntimeError(
f"Timeout waiting for signals. "
f"Expected: {expected_signals}, Got: {buffer[-200:]!r}"
) from None
except OSError as e:
raise RuntimeError(
f"Socket error while waiting for signals: {e}. "
f"Expected: {expected_signals}, Got: {buffer[-200:]!r}"
) from None
def _cleanup_sockets(*sockets):
"""Safely close multiple sockets, ignoring errors."""
for sock in sockets:
if sock is not None:
try:
sock.close()
except OSError:
pass
def _cleanup_process(proc, timeout=SHORT_TIMEOUT):
"""Terminate a process gracefully, escalating to kill if needed."""
if proc.poll() is not None:
return
proc.terminate()
try:
proc.wait(timeout=timeout)
return
except subprocess.TimeoutExpired:
pass
proc.kill()
try:
proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
pass # Process refuses to die, nothing more we can do
@contextlib.contextmanager
def test_subprocess(script, wait_for_working=False):
"""Context manager to create a test subprocess with socket synchronization.
Args:
script: Python code to execute in the subprocess. If wait_for_working
is True, script should send b"working" after starting work.
wait_for_working: If True, wait for both "ready" and "working" signals.
Default False for backward compatibility.
Yields:
SubprocessInfo: Named tuple with process and socket objects
"""
# Find an unused port for socket communication
port = find_unused_port()
# Inject socket connection code at the beginning of the script
socket_code = f"""
import socket
_test_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
_test_sock.connect(('localhost', {port}))
_test_sock.sendall(b"ready")
"""
# Combine socket code with user script
full_script = socket_code + script
# Create server socket to wait for process to be ready
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
proc = subprocess.Popen(
[sys.executable, "-c", full_script],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
client_socket = None
try:
# Wait for process to connect and send ready signal
client_socket, _ = server_socket.accept()
server_socket.close()
server_socket = None
# Wait for ready signal, and optionally working signal
if wait_for_working:
_wait_for_signal(client_socket, [b"ready", b"working"])
else:
_wait_for_signal(client_socket, b"ready")
yield SubprocessInfo(proc, client_socket)
finally:
_cleanup_sockets(client_socket, server_socket)
_cleanup_process(proc)
def close_and_unlink(file):
"""Close a file and unlink it from the filesystem."""
file.close()
unlink(file.name)
def jsonl_tables(records):
"""Extract the canonical sections of a parsed JSONL profile.
Returns ``(meta, str_defs, frame_defs, agg, end)`` where ``str_defs`` is a
``{str_id: value}`` dict, ``frame_defs`` is a flat list of all frame
definitions across chunks, and ``agg`` is the first agg record (sufficient
for tests that only emit one chunk).
"""
meta = next(record for record in records if record["type"] == "meta")
end = next(record for record in records if record["type"] == "end")
agg = next(record for record in records if record["type"] == "agg")
str_defs = {
item["str_id"]: item["value"]
for record in records
if record["type"] == "string_table"
for item in record["strings"]
}
frame_defs = [
item
for record in records
if record["type"] == "frame_table"
for item in record["frames"]
]
return meta, str_defs, frame_defs, agg, end