gh-138122: Add --subprocesses flag to profile child processes in tachyon (#142636)

This commit is contained in:
Pablo Galindo Salgado 2025-12-15 12:11:40 +00:00 committed by GitHub
parent 14e6052b43
commit 6658e2cb07
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 2702 additions and 562 deletions

View file

@ -295,21 +295,23 @@ The default configuration works well for most use cases:
:widths: 25 75
* - Option
- Default behavior
* - ``--interval`` / ``-i``
- Default
* - Default for ``--interval`` / ``-i``
- 100 µs between samples (~10,000 samples/sec)
* - ``--duration`` / ``-d``
- Profile for 10 seconds
* - ``--all-threads`` / ``-a``
- Sample main thread only
* - ``--native``
* - Default for ``--duration`` / ``-d``
- 10 seconds
* - Default for ``--all-threads`` / ``-a``
- Main thread only
* - Default for ``--native``
- No ``<native>`` frames (C code time attributed to caller)
* - ``--no-gc``
- Include ``<GC>`` frames when garbage collection is active
* - ``--mode``
* - Default for ``--no-gc``
- ``<GC>`` frames included when garbage collection is active
* - Default for ``--mode``
- Wall-clock mode (all samples recorded)
* - ``--realtime-stats``
- No live statistics display during profiling
* - Default for ``--realtime-stats``
- Disabled
* - Default for ``--subprocesses``
- Disabled
Sampling interval and duration
@ -442,6 +444,78 @@ working correctly and that sufficient samples are being collected. See
:ref:`sampling-efficiency` for details on interpreting these metrics.
Subprocess profiling
--------------------
The :option:`--subprocesses` option enables automatic profiling of subprocesses
spawned by the target::
python -m profiling.sampling run --subprocesses script.py
python -m profiling.sampling attach --subprocesses 12345
When enabled, the profiler monitors the target process for child process
creation. When a new Python child process is detected, a separate profiler
instance is automatically spawned to profile it. This is useful for
applications that use :mod:`multiprocessing`, :mod:`subprocess`,
:mod:`concurrent.futures` with :class:`~concurrent.futures.ProcessPoolExecutor`,
or other process spawning mechanisms.
.. code-block:: python
:caption: worker_pool.py
from concurrent.futures import ProcessPoolExecutor
import math
def compute_factorial(n):
total = 0
for i in range(50):
total += math.factorial(n)
return total
if __name__ == "__main__":
numbers = [5000 + i * 100 for i in range(50)]
with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(compute_factorial, numbers))
print(f"Computed {len(results)} factorials")
::
python -m profiling.sampling run --subprocesses --flamegraph worker_pool.py
This produces separate flame graphs for the main process and each worker
process: ``flamegraph_<main_pid>.html``, ``flamegraph_<worker1_pid>.html``,
and so on.
Each subprocess receives its own output file. The filename is derived from
the specified output path (or the default) with the subprocess's process ID
appended:
- If you specify ``-o profile.html``, subprocesses produce ``profile_12345.html``,
``profile_12346.html``, and so on
- With default output, subprocesses produce files like ``flamegraph_12345.html``
or directories like ``heatmap_12345``
- For pstats format (which defaults to stdout), subprocesses produce files like
``profile_12345.pstats``
The subprocess profilers inherit most sampling options from the parent (interval,
duration, thread selection, native frames, GC frames, async-aware mode, and
output format). All Python descendant processes are profiled recursively,
including grandchildren and further descendants.
Subprocess detection works by periodically scanning for new descendants of
the target process and checking whether each new process is a Python process
by probing the process memory for Python runtime structures. Non-Python
subprocesses (such as shell commands or external tools) are ignored.
There is a limit of 100 concurrent subprocess profilers to prevent resource
exhaustion in programs that spawn many processes. If this limit is reached,
additional subprocesses are not profiled and a warning is printed.
The :option:`--subprocesses` option is incompatible with :option:`--live` mode
because live mode uses an interactive terminal interface that cannot
accommodate multiple concurrent profiler displays.
.. _sampling-efficiency:
Sampling efficiency
@ -1217,6 +1291,11 @@ Sampling options
Compatible with ``--live``, ``--flamegraph``, ``--heatmap``, and ``--gecko``
formats only.
.. option:: --subprocesses
Also profile Python subprocesses spawned by the target. Each subprocess gets
its own profiler instance and output file. Incompatible with ``--live``.
Mode options
------------

View file

@ -1994,6 +1994,7 @@ _PyStaticObjects_CheckRefcnt(PyInterpreterState *interp) {
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(readline));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(readonly));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(real));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(recursive));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(reducer_override));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(registry));
_PyStaticObject_CheckRefcnt((PyObject *)&_Py_ID(rel_tol));

View file

@ -717,6 +717,7 @@ struct _Py_global_strings {
STRUCT_FOR_ID(readline)
STRUCT_FOR_ID(readonly)
STRUCT_FOR_ID(real)
STRUCT_FOR_ID(recursive)
STRUCT_FOR_ID(reducer_override)
STRUCT_FOR_ID(registry)
STRUCT_FOR_ID(rel_tol)

View file

@ -1992,6 +1992,7 @@ extern "C" {
INIT_ID(readline), \
INIT_ID(readonly), \
INIT_ID(real), \
INIT_ID(recursive), \
INIT_ID(reducer_override), \
INIT_ID(registry), \
INIT_ID(rel_tol), \

View file

@ -2648,6 +2648,10 @@ _PyUnicode_InitStaticStrings(PyInterpreterState *interp) {
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));
assert(PyUnicode_GET_LENGTH(string) != 1);
string = &_Py_ID(recursive);
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));
assert(PyUnicode_GET_LENGTH(string) != 1);
string = &_Py_ID(reducer_override);
_PyUnicode_InternStatic(interp, &string);
assert(_PyUnicode_CheckConsistency(string, 1));

View file

@ -0,0 +1,279 @@
"""
Child process monitoring for the sampling profiler.
This module monitors a target process for child process creation and spawns
separate profiler instances for each discovered child.
"""
import subprocess
import sys
import threading
import time
import _remote_debugging
# Polling interval for child process discovery
_CHILD_POLL_INTERVAL_SEC = 0.1
# Default timeout for waiting on child profilers
_DEFAULT_WAIT_TIMEOUT = 30.0
# Maximum number of child profilers to spawn (prevents resource exhaustion)
_MAX_CHILD_PROFILERS = 100
# Interval for cleaning up completed profilers (in polling cycles)
_CLEANUP_INTERVAL_CYCLES = 10
def get_child_pids(pid, recursive=True):
    """
    Return the process IDs of the children of ``pid``.

    Thin wrapper that delegates to ``_remote_debugging.get_child_pids``.

    Args:
        pid: Process ID of the parent process
        recursive: If True, include every descendant (children,
            grandchildren, and so on) rather than direct children only

    Returns:
        List of child PIDs
    """
    child_pids = _remote_debugging.get_child_pids(pid, recursive=recursive)
    return child_pids
def is_python_process(pid):
    """
    Tell whether the process identified by ``pid`` looks like a Python process.

    Thin wrapper that delegates to ``_remote_debugging.is_python_process``.

    Args:
        pid: Process ID to check

    Returns:
        bool: True if the process appears to be a Python process, False otherwise
    """
    looks_like_python = _remote_debugging.is_python_process(pid)
    return looks_like_python
class ChildProcessMonitor:
    """
    Monitors a target process for child processes and spawns profilers for them.

    A background daemon thread polls for descendants of the target process.
    For each newly seen descendant that is a Python process, a separate
    ``python -m profiling.sampling attach <pid>`` subprocess is started.

    Use as a context manager:

        with ChildProcessMonitor(pid, cli_args, output_pattern) as monitor:
            # monitoring runs here
            monitor.wait_for_profilers()  # optional: wait before cleanup
        # cleanup happens automatically
    """

    def __init__(self, pid, cli_args, output_pattern):
        """
        Initialize the child process monitor.

        Args:
            pid: Parent process ID to monitor
            cli_args: CLI arguments to pass to child profilers
            output_pattern: Pattern for output files (string containing the
                literal placeholder ``{pid}``)
        """
        self.parent_pid = pid
        self.cli_args = cli_args
        self.output_pattern = output_pattern

        # PIDs that were already examined; never probed a second time.
        self._known_children = set()
        # Popen objects of the profilers started by this monitor.
        self._spawned_profilers = []
        # Guards _known_children and _spawned_profilers.
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._monitor_thread = None
        self._poll_count = 0

    def __enter__(self):
        """Start the background polling thread and return the monitor."""
        self._monitor_thread = threading.Thread(
            target=self._monitor_loop,
            daemon=True,
            name=f"child-monitor-{self.parent_pid}",
        )
        self._monitor_thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Stop polling, wait for child profilers, then terminate stragglers.

        Returns False so that an exception raised inside the ``with`` body is
        never suppressed.
        """
        self._stop_event.set()
        if self._monitor_thread is not None:
            self._monitor_thread.join(timeout=2.0)
            if self._monitor_thread.is_alive():
                print(
                    "Warning: Monitor thread did not stop cleanly",
                    file=sys.stderr,
                )

        # Wait for child profilers to complete naturally
        self.wait_for_profilers()

        # Terminate any remaining profilers
        with self._lock:
            profilers_to_cleanup = list(self._spawned_profilers)
            self._spawned_profilers.clear()

        for proc in profilers_to_cleanup:
            self._cleanup_process(proc)
        return False

    def _cleanup_process(self, proc, terminate_timeout=2.0, kill_timeout=1.0):
        """
        Make sure ``proc`` is terminated and reaped.

        Escalates from terminate() to kill(); each step waits for the given
        timeout (seconds) before moving on.
        """
        if proc.poll() is not None:
            return  # Already terminated

        proc.terminate()
        try:
            proc.wait(timeout=terminate_timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            try:
                proc.wait(timeout=kill_timeout)
            except subprocess.TimeoutExpired:
                # Last resort: wait indefinitely to avoid zombie
                # SIGKILL should always work, but we must reap the process
                try:
                    proc.wait()
                except Exception:
                    pass

    @property
    def spawned_profilers(self):
        """Snapshot (copy) of the list of spawned profiler processes."""
        with self._lock:
            return list(self._spawned_profilers)

    def wait_for_profilers(self, timeout=_DEFAULT_WAIT_TIMEOUT):
        """
        Wait for all spawned child profilers to complete.

        Call this before exiting the context if you want profilers to finish
        their work naturally rather than being terminated.

        Args:
            timeout: Maximum time to wait in seconds (shared by all profilers)
        """
        profilers = self.spawned_profilers
        if not profilers:
            return

        print(
            f"Waiting for {len(profilers)} child profiler(s) to complete...",
            file=sys.stderr,
        )

        deadline = time.monotonic() + timeout
        for proc in profilers:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                proc.wait(timeout=max(0.1, remaining))
            except subprocess.TimeoutExpired:
                pass

    def _monitor_loop(self):
        """Thread body: poll for new descendants until asked to stop."""
        # Note: There is an inherent TOCTOU race between discovering a child
        # process and checking if it's Python. This is expected for process monitoring.
        while not self._stop_event.is_set():
            try:
                self._poll_count += 1

                # Periodically clean up completed profilers to avoid memory buildup
                if self._poll_count % _CLEANUP_INTERVAL_CYCLES == 0:
                    self._cleanup_completed_profilers()

                children = set(get_child_pids(self.parent_pid, recursive=True))

                with self._lock:
                    new_children = children - self._known_children
                    self._known_children.update(new_children)

                # The whole batch is already recorded as known, so each child
                # is handled independently: a failure on one must not cause
                # the remaining ones to be skipped forever.
                for child_pid in new_children:
                    self._probe_and_profile(child_pid)
            except ProcessLookupError:
                # Parent process exited, stop monitoring
                break
            except Exception as e:
                # Log error but continue monitoring
                print(
                    f"Warning: Error in child monitor loop: {e}",
                    file=sys.stderr,
                )
            self._stop_event.wait(timeout=_CHILD_POLL_INTERVAL_SEC)

    def _probe_and_profile(self, child_pid):
        """Spawn a profiler for ``child_pid`` if it is a Python process."""
        try:
            is_python = is_python_process(child_pid)
        except ProcessLookupError:
            # The child exited between discovery and probing; this is not the
            # parent going away, so monitoring must keep running.
            return
        except Exception as e:
            print(
                f"Warning: Failed to probe child process {child_pid}: {e}",
                file=sys.stderr,
            )
            return

        # Only spawn profiler if this is actually a Python process
        if is_python:
            self._spawn_profiler_for_child(child_pid)

    def _cleanup_completed_profilers(self):
        """Forget profilers that have already exited."""
        with self._lock:
            # Keep only profilers that are still running
            self._spawned_profilers = [
                p for p in self._spawned_profilers if p.poll() is None
            ]

    def _spawn_profiler_for_child(self, child_pid):
        """Start an ``attach`` profiler subprocess for ``child_pid``."""
        if self._stop_event.is_set():
            return

        # Check if we've reached the maximum number of child profilers
        with self._lock:
            limit_reached = len(self._spawned_profilers) >= _MAX_CHILD_PROFILERS
        if limit_reached:
            print(
                f"Warning: Max child profilers ({_MAX_CHILD_PROFILERS}) reached, "
                f"skipping PID {child_pid}",
                file=sys.stderr,
            )
            return

        cmd = [
            sys.executable,
            "-m",
            "profiling.sampling",
            "attach",
            str(child_pid),
        ]
        cmd.extend(self._build_child_cli_args(child_pid))

        proc = None
        try:
            proc = subprocess.Popen(
                cmd,
                stdin=subprocess.DEVNULL,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            # Decide under the lock, but do the (blocking) cleanup outside of
            # it so other threads are not stalled on self._lock meanwhile.
            with self._lock:
                stopping = self._stop_event.is_set()
                if not stopping:
                    self._spawned_profilers.append(proc)
            if stopping:
                self._cleanup_process(
                    proc, terminate_timeout=1.0, kill_timeout=1.0
                )
                return

            print(
                f"Started profiler for child process {child_pid}",
                file=sys.stderr,
            )
        except Exception as e:
            if proc is not None:
                self._cleanup_process(
                    proc, terminate_timeout=1.0, kill_timeout=1.0
                )
            print(
                f"Warning: Failed to start profiler for child {child_pid}: {e}",
                file=sys.stderr,
            )

    def _build_child_cli_args(self, child_pid):
        """
        Return a copy of ``cli_args`` with the output path set for ``child_pid``.

        When an output pattern is configured, an existing ``-o``/``--output``
        value is overwritten; otherwise ``-o <file>`` is appended.
        """
        args = list(self.cli_args)

        if self.output_pattern:
            # Use replace() instead of format() to handle user filenames with braces
            output_file = self.output_pattern.replace("{pid}", str(child_pid))
            found_output = False
            for i, arg in enumerate(args):
                if arg in ("-o", "--output") and i + 1 < len(args):
                    args[i + 1] = output_file
                    found_output = True
                    break
            if not found_output:
                args.extend(["-o", output_file])

        return args

View file

@ -8,6 +8,7 @@
import subprocess
import sys
import time
from contextlib import nullcontext
from .sample import sample, sample_live
from .pstats_collector import PstatsCollector
@ -83,6 +84,86 @@ class CustomFormatter(
"heatmap": HeatmapCollector,
}
def _setup_child_monitor(args, parent_pid):
    """Create the ChildProcessMonitor that profiles children of parent_pid.

    The monitor receives the CLI arguments to forward to each child profiler
    (built without --subprocesses, to avoid recursion) and the filename
    pattern used for the per-child output files.
    """
    from ._child_monitor import ChildProcessMonitor

    return ChildProcessMonitor(
        pid=parent_pid,
        cli_args=_build_child_profiler_args(args),
        output_pattern=_build_output_pattern(args),
    )
def _get_child_monitor_context(args, pid):
if getattr(args, 'subprocesses', False):
return _setup_child_monitor(args, pid)
return nullcontext()
def _build_child_profiler_args(args):
child_args = []
# Sampling options
child_args.extend(["-i", str(args.interval)])
child_args.extend(["-d", str(args.duration)])
if args.all_threads:
child_args.append("-a")
if args.realtime_stats:
child_args.append("--realtime-stats")
if args.native:
child_args.append("--native")
if not args.gc:
child_args.append("--no-gc")
if args.opcodes:
child_args.append("--opcodes")
if args.async_aware:
child_args.append("--async-aware")
async_mode = getattr(args, 'async_mode', 'running')
if async_mode != "running":
child_args.extend(["--async-mode", async_mode])
# Mode options
mode = getattr(args, 'mode', 'wall')
if mode != "wall":
child_args.extend(["--mode", mode])
# Format options (skip pstats as it's the default)
if args.format != "pstats":
child_args.append(f"--{args.format}")
return child_args
def _build_output_pattern(args):
"""Build output filename pattern for child profilers.
The pattern uses {pid} as a placeholder which will be replaced with the
actual child PID using str.replace(), so user filenames with braces are safe.
"""
if args.outfile:
# User specified output - add PID to filename
base, ext = os.path.splitext(args.outfile)
if ext:
return f"{base}_{{pid}}{ext}"
else:
return f"{args.outfile}_{{pid}}"
else:
# Use default pattern based on format (consistent _ separator)
extension = FORMAT_EXTENSIONS.get(args.format, "txt")
if args.format == "heatmap":
return "heatmap_{pid}"
if args.format == "pstats":
# pstats defaults to stdout, but for subprocesses we need files
return "profile_{pid}.pstats"
return f"{args.format}_{{pid}}.{extension}"
def _parse_mode(mode_string):
"""Convert mode string to mode constant."""
@ -255,6 +336,11 @@ def _add_sampling_options(parser):
action="store_true",
help="Enable async-aware profiling (uses task-based stack reconstruction)",
)
sampling_group.add_argument(
"--subprocesses",
action="store_true",
help="Also profile subprocesses. Each subprocess gets its own profiler and output file.",
)
def _add_mode_options(parser):
@ -413,7 +499,7 @@ def _generate_output_filename(format_type, pid):
# For heatmap, use cleaner directory name without extension
if format_type == "heatmap":
return f"heatmap_{pid}"
return f"{format_type}.{pid}.{extension}"
return f"{format_type}_{pid}.{extension}"
def _handle_output(collector, args, pid, mode):
@ -427,7 +513,12 @@ def _handle_output(collector, args, pid, mode):
"""
if args.format == "pstats":
if args.outfile:
collector.export(args.outfile)
# If outfile is a directory, generate filename inside it
if os.path.isdir(args.outfile):
filename = os.path.join(args.outfile, _generate_output_filename(args.format, pid))
collector.export(filename)
else:
collector.export(args.outfile)
else:
# Print to stdout with defaults applied
sort_choice = args.sort if args.sort is not None else "nsamples"
@ -438,7 +529,11 @@ def _handle_output(collector, args, pid, mode):
)
else:
# Export to file
filename = args.outfile or _generate_output_filename(args.format, pid)
if args.outfile and os.path.isdir(args.outfile):
# If outfile is a directory, generate filename inside it
filename = os.path.join(args.outfile, _generate_output_filename(args.format, pid))
else:
filename = args.outfile or _generate_output_filename(args.format, pid)
collector.export(filename)
@ -455,6 +550,11 @@ def _validate_args(args, parser):
"Live mode requires the curses module, which is not available."
)
# --subprocesses is incompatible with --live
if hasattr(args, 'subprocesses') and args.subprocesses:
if hasattr(args, 'live') and args.live:
parser.error("--subprocesses is incompatible with --live mode.")
# Async-aware mode is incompatible with --native, --no-gc, --mode, and --all-threads
if args.async_aware:
issues = []
@ -663,22 +763,20 @@ def _handle_attach(args):
# Create the appropriate collector
collector = _create_collector(args.format, args.interval, skip_idle, args.opcodes)
# Sample the process
collector = sample(
args.pid,
collector,
duration_sec=args.duration,
all_threads=args.all_threads,
realtime_stats=args.realtime_stats,
mode=mode,
async_aware=args.async_mode if args.async_aware else None,
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
)
# Handle output
_handle_output(collector, args, args.pid, mode)
with _get_child_monitor_context(args, args.pid):
collector = sample(
args.pid,
collector,
duration_sec=args.duration,
all_threads=args.all_threads,
realtime_stats=args.realtime_stats,
mode=mode,
async_aware=args.async_mode if args.async_aware else None,
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
)
_handle_output(collector, args, args.pid, mode)
def _handle_run(args):
@ -734,32 +832,31 @@ def _handle_run(args):
# Create the appropriate collector
collector = _create_collector(args.format, args.interval, skip_idle, args.opcodes)
# Profile the subprocess
try:
collector = sample(
process.pid,
collector,
duration_sec=args.duration,
all_threads=args.all_threads,
realtime_stats=args.realtime_stats,
mode=mode,
async_aware=args.async_mode if args.async_aware else None,
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
)
# Handle output
_handle_output(collector, args, process.pid, mode)
finally:
# Clean up the subprocess
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=_PROCESS_KILL_TIMEOUT)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
with _get_child_monitor_context(args, process.pid):
try:
collector = sample(
process.pid,
collector,
duration_sec=args.duration,
all_threads=args.all_threads,
realtime_stats=args.realtime_stats,
mode=mode,
async_aware=args.async_mode if args.async_aware else None,
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
)
_handle_output(collector, args, process.pid, mode)
finally:
# Terminate the main subprocess - child profilers finish when their
# target processes exit
if process.poll() is None:
process.terminate()
try:
process.wait(timeout=_PROCESS_KILL_TIMEOUT)
except subprocess.TimeoutExpired:
process.kill()
process.wait()
def _handle_live_attach(args, pid):

View file

@ -39,6 +39,7 @@
"has_fork_support", "requires_fork",
"has_subprocess_support", "requires_subprocess",
"has_socket_support", "requires_working_socket",
"has_remote_subprocess_debugging", "requires_remote_subprocess_debugging",
"anticipate_failure", "load_package_tests", "detect_api_mismatch",
"check__all__", "skip_if_buggy_ucrt_strfptime",
"check_disallow_instantiation", "check_sanitizer", "skip_if_sanitizer",
@ -643,6 +644,93 @@ def requires_working_socket(*, module=False):
else:
return unittest.skipUnless(has_socket_support, msg)
@functools.cache
def has_remote_subprocess_debugging():
    """Check if we have permissions to debug subprocesses remotely.

    Returns True if we have permissions, False if we don't.
    Checks for:
    - Platform support (Linux, macOS, Windows only)
    - On Linux: process_vm_readv support
    - _remote_debugging module availability
    - Actual subprocess debugging permissions (e.g., macOS entitlements)

    The last check spawns a short-lived child interpreter and probes it.
    Every socket and the child process are released on all exit paths.

    Result is cached.
    """
    # Check platform support
    if sys.platform not in ("linux", "darwin", "win32"):
        return False

    try:
        import _remote_debugging
    except ImportError:
        return False

    # On Linux, check for process_vm_readv support
    if sys.platform == "linux":
        if not getattr(_remote_debugging, "PROCESS_VM_READV_SUPPORTED", False):
            return False

    # First check if we can read our own process
    if not _remote_debugging.is_python_process(os.getpid()):
        return False

    # Check subprocess access - debugging child processes may require
    # additional permissions depending on platform security settings
    import socket
    import subprocess

    # Create a socket for child to signal readiness
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    proc = None
    conn = None
    try:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind(("127.0.0.1", 0))
        server.listen(1)
        port = server.getsockname()[1]

        # Child connects to signal it's ready, then waits for parent to close
        child_code = f"""
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("127.0.0.1", {port}))
s.recv(1) # Wait for parent to signal done
"""
        proc = subprocess.Popen(
            [sys.executable, "-c", child_code],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        try:
            server.settimeout(5.0)
            conn, _ = server.accept()
            # Child is ready, test if we can probe it
            result = _remote_debugging.is_python_process(proc.pid)
            # Check if subprocess is still alive after probing
            if proc.poll() is not None:
                return False
            return result
        except (socket.timeout, OSError):
            return False
    finally:
        # Closing the connection is also what signals the child to exit.
        # Doing it here covers the early-return and exception paths, which
        # would otherwise leave the accepted socket open.
        if conn is not None:
            conn.close()
        server.close()
        if proc is not None:
            proc.kill()
            proc.wait()
def requires_remote_subprocess_debugging():
    """Decorator factory: skip tests needing remote subprocess debugging.

    Subprocess support is checked first, so there is no need to combine this
    with @requires_subprocess(). The permission probe is only evaluated when
    subprocess support is available.
    """
    if has_subprocess_support:
        return unittest.skipUnless(
            has_remote_subprocess_debugging(),
            "requires remote subprocess debugging permissions",
        )
    return unittest.skip("requires subprocess support")
# Does strftime() support glibc extension like '%4Y'?
has_strftime_extensions = False
if sys.platform != "win32":

View file

@ -15,6 +15,7 @@
SHORT_TIMEOUT,
busy_retry,
requires_gil_enabled,
requires_remote_subprocess_debugging,
)
from test.support.script_helper import make_script
from test.support.socket_helper import find_unused_port
@ -303,12 +304,7 @@ def _run_script_and_get_trace(
if wait_for_signals:
_wait_for_signal(client_socket, wait_for_signals)
try:
trace = trace_func(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
trace = trace_func(p.pid)
return trace, script_name
finally:
_cleanup_sockets(client_socket, server_socket)
@ -412,6 +408,7 @@ def _extract_coroutine_stacks_lineno_only(self, stack_trace):
# ============================================================================
@requires_remote_subprocess_debugging()
class TestGetStackTrace(RemoteInspectionTestBase):
@skip_if_not_supported
@unittest.skipIf(
@ -462,12 +459,7 @@ def foo():
client_socket, [b"ready:main", b"ready:thread"]
)
try:
stack_trace = get_stack_trace(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
stack_trace = get_stack_trace(p.pid)
# Find expected thread stack by funcname
found_thread = self._find_thread_with_frame(
@ -572,12 +564,7 @@ def new_eager_loop():
response = _wait_for_signal(client_socket, b"ready")
self.assertIn(b"ready", response)
try:
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
stack_trace = get_async_stack_trace(p.pid)
# Check all tasks are present
tasks_names = [
@ -755,12 +742,7 @@ async def main():
response = _wait_for_signal(client_socket, b"ready")
self.assertIn(b"ready", response)
try:
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
stack_trace = get_async_stack_trace(p.pid)
# For this simple asyncgen test, we only expect one task
self.assertEqual(len(stack_trace[0].awaited_by), 1)
@ -842,12 +824,7 @@ async def main():
response = _wait_for_signal(client_socket, b"ready")
self.assertIn(b"ready", response)
try:
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
stack_trace = get_async_stack_trace(p.pid)
# Check all tasks are present
tasks_names = [
@ -968,12 +945,7 @@ async def main():
response = _wait_for_signal(client_socket, b"ready")
self.assertIn(b"ready", response)
try:
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
stack_trace = get_async_stack_trace(p.pid)
# Check all tasks are present
tasks_names = [
@ -1143,12 +1115,7 @@ async def main():
except RuntimeError as e:
self.fail(str(e))
try:
all_awaited_by = get_all_awaited_by(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
all_awaited_by = get_all_awaited_by(p.pid)
# Expected: a list of two elements: 1 thread, 1 interp
self.assertEqual(len(all_awaited_by), 2)
@ -1442,12 +1409,7 @@ def run_subinterp():
server_socket.close()
server_socket = None
try:
stack_trace = get_stack_trace(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
stack_trace = get_stack_trace(p.pid)
# Verify we have at least one interpreter
self.assertGreaterEqual(len(stack_trace), 1)
@ -1637,12 +1599,7 @@ def run_subinterp2():
server_socket.close()
server_socket = None
try:
stack_trace = get_stack_trace(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
stack_trace = get_stack_trace(p.pid)
# Verify we have multiple interpreters
self.assertGreaterEqual(len(stack_trace), 2)
@ -1745,33 +1702,28 @@ def main_work():
# Wait for ready and working signals
_wait_for_signal(client_socket, [b"ready", b"working"])
try:
# Get stack trace with all threads
unwinder_all = RemoteUnwinder(p.pid, all_threads=True)
for _ in range(MAX_TRIES):
all_traces = unwinder_all.get_stack_trace()
found = self._find_frame_in_trace(
all_traces,
lambda f: f.funcname == "main_work"
and f.location.lineno > 12,
)
if found:
break
time.sleep(0.1)
else:
self.fail(
"Main thread did not start its busy work on time"
)
# Get stack trace with all threads
unwinder_all = RemoteUnwinder(p.pid, all_threads=True)
for _ in range(MAX_TRIES):
all_traces = unwinder_all.get_stack_trace()
found = self._find_frame_in_trace(
all_traces,
lambda f: f.funcname == "main_work"
and f.location.lineno > 12,
)
if found:
break
time.sleep(0.1)
else:
self.fail(
"Main thread did not start its busy work on time"
)
# Get stack trace with only GIL holder
unwinder_gil = RemoteUnwinder(
p.pid, only_active_thread=True
)
gil_traces = unwinder_gil.get_stack_trace()
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
# Get stack trace with only GIL holder
unwinder_gil = RemoteUnwinder(
p.pid, only_active_thread=True
)
gil_traces = unwinder_gil.get_stack_trace()
# Count threads
total_threads = sum(
@ -1952,6 +1904,7 @@ def test_unsupported_platform_error(self):
)
@requires_remote_subprocess_debugging()
class TestDetectionOfThreadStatus(RemoteInspectionTestBase):
def _run_thread_status_test(self, mode, check_condition):
"""
@ -2039,26 +1992,21 @@ def busy():
# Sample until we see expected thread states
statuses = {}
try:
unwinder = RemoteUnwinder(
p.pid,
all_threads=True,
mode=mode,
skip_non_matching_threads=False,
)
for _ in range(MAX_TRIES):
traces = unwinder.get_stack_trace()
statuses = self._get_thread_statuses(traces)
unwinder = RemoteUnwinder(
p.pid,
all_threads=True,
mode=mode,
skip_non_matching_threads=False,
)
for _ in range(MAX_TRIES):
traces = unwinder.get_stack_trace()
statuses = self._get_thread_statuses(traces)
if check_condition(
statuses, sleeper_tid, busy_tid
):
break
time.sleep(0.5)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
if check_condition(
statuses, sleeper_tid, busy_tid
):
break
time.sleep(0.5)
return statuses, sleeper_tid, busy_tid
finally:
@ -2196,40 +2144,35 @@ def busy_thread():
server_socket = None
statuses = {}
try:
unwinder = RemoteUnwinder(
p.pid,
all_threads=True,
mode=PROFILING_MODE_ALL,
skip_non_matching_threads=False,
)
for _ in range(MAX_TRIES):
traces = unwinder.get_stack_trace()
statuses = self._get_thread_statuses(traces)
unwinder = RemoteUnwinder(
p.pid,
all_threads=True,
mode=PROFILING_MODE_ALL,
skip_non_matching_threads=False,
)
for _ in range(MAX_TRIES):
traces = unwinder.get_stack_trace()
statuses = self._get_thread_statuses(traces)
# Check ALL mode provides both GIL and CPU info
if (
sleeper_tid in statuses
and busy_tid in statuses
and not (
statuses[sleeper_tid]
& THREAD_STATUS_ON_CPU
)
and not (
statuses[sleeper_tid]
& THREAD_STATUS_HAS_GIL
)
and (statuses[busy_tid] & THREAD_STATUS_ON_CPU)
and (
statuses[busy_tid] & THREAD_STATUS_HAS_GIL
)
):
break
time.sleep(0.5)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
# Check ALL mode provides both GIL and CPU info
if (
sleeper_tid in statuses
and busy_tid in statuses
and not (
statuses[sleeper_tid]
& THREAD_STATUS_ON_CPU
)
and not (
statuses[sleeper_tid]
& THREAD_STATUS_HAS_GIL
)
and (statuses[busy_tid] & THREAD_STATUS_ON_CPU)
and (
statuses[busy_tid] & THREAD_STATUS_HAS_GIL
)
):
break
time.sleep(0.5)
self.assertIsNotNone(
sleeper_tid, "Sleeper thread id not received"
@ -2347,27 +2290,24 @@ def test_thread_status_exception_detection(self):
self.assertIsNotNone(normal_tid, "Normal thread id not received")
statuses = {}
try:
unwinder = RemoteUnwinder(
p.pid,
all_threads=True,
mode=PROFILING_MODE_ALL,
skip_non_matching_threads=False,
)
for _ in range(MAX_TRIES):
traces = unwinder.get_stack_trace()
statuses = self._get_thread_statuses(traces)
unwinder = RemoteUnwinder(
p.pid,
all_threads=True,
mode=PROFILING_MODE_ALL,
skip_non_matching_threads=False,
)
for _ in range(MAX_TRIES):
traces = unwinder.get_stack_trace()
statuses = self._get_thread_statuses(traces)
if (
exception_tid in statuses
and normal_tid in statuses
and (statuses[exception_tid] & THREAD_STATUS_HAS_EXCEPTION)
and not (statuses[normal_tid] & THREAD_STATUS_HAS_EXCEPTION)
):
break
time.sleep(0.5)
except PermissionError:
self.skipTest("Insufficient permissions to read the stack trace")
if (
exception_tid in statuses
and normal_tid in statuses
and (statuses[exception_tid] & THREAD_STATUS_HAS_EXCEPTION)
and not (statuses[normal_tid] & THREAD_STATUS_HAS_EXCEPTION)
):
break
time.sleep(0.5)
self.assertIn(exception_tid, statuses)
self.assertIn(normal_tid, statuses)
@ -2393,30 +2333,28 @@ def test_thread_status_exception_mode_filtering(self):
self.assertIsNotNone(exception_tid, "Exception thread id not received")
self.assertIsNotNone(normal_tid, "Normal thread id not received")
try:
unwinder = RemoteUnwinder(
p.pid,
all_threads=True,
mode=PROFILING_MODE_EXCEPTION,
skip_non_matching_threads=True,
)
for _ in range(MAX_TRIES):
traces = unwinder.get_stack_trace()
statuses = self._get_thread_statuses(traces)
unwinder = RemoteUnwinder(
p.pid,
all_threads=True,
mode=PROFILING_MODE_EXCEPTION,
skip_non_matching_threads=True,
)
for _ in range(MAX_TRIES):
traces = unwinder.get_stack_trace()
statuses = self._get_thread_statuses(traces)
if exception_tid in statuses:
self.assertNotIn(
normal_tid,
statuses,
"Normal thread should be filtered out in exception mode",
)
return
time.sleep(0.5)
except PermissionError:
self.skipTest("Insufficient permissions to read the stack trace")
if exception_tid in statuses:
self.assertNotIn(
normal_tid,
statuses,
"Normal thread should be filtered out in exception mode",
)
return
time.sleep(0.5)
self.fail("Never found exception thread in exception mode")
@requires_remote_subprocess_debugging()
class TestExceptionDetectionScenarios(RemoteInspectionTestBase):
"""Test exception detection across all scenarios.
@ -2557,47 +2495,43 @@ def _run_scenario_process(self, scenario):
def _check_exception_status(self, p, thread_tid, expect_exception):
"""Helper to check if thread has expected exception status."""
try:
unwinder = RemoteUnwinder(
p.pid,
all_threads=True,
mode=PROFILING_MODE_ALL,
skip_non_matching_threads=False,
unwinder = RemoteUnwinder(
p.pid,
all_threads=True,
mode=PROFILING_MODE_ALL,
skip_non_matching_threads=False,
)
# Collect multiple samples for reliability
results = []
for _ in range(MAX_TRIES):
traces = unwinder.get_stack_trace()
statuses = self._get_thread_statuses(traces)
if thread_tid in statuses:
has_exc = bool(statuses[thread_tid] & THREAD_STATUS_HAS_EXCEPTION)
results.append(has_exc)
if len(results) >= 3:
break
time.sleep(0.2)
# Check majority of samples match expected
if not results:
self.fail("Never found target thread in stack traces")
majority = sum(results) > len(results) // 2
if expect_exception:
self.assertTrue(
majority,
f"Thread should have HAS_EXCEPTION flag, got {results}"
)
else:
self.assertFalse(
majority,
f"Thread should NOT have HAS_EXCEPTION flag, got {results}"
)
# Collect multiple samples for reliability
results = []
for _ in range(MAX_TRIES):
traces = unwinder.get_stack_trace()
statuses = self._get_thread_statuses(traces)
if thread_tid in statuses:
has_exc = bool(statuses[thread_tid] & THREAD_STATUS_HAS_EXCEPTION)
results.append(has_exc)
if len(results) >= 3:
break
time.sleep(0.2)
# Check majority of samples match expected
if not results:
self.fail("Never found target thread in stack traces")
majority = sum(results) > len(results) // 2
if expect_exception:
self.assertTrue(
majority,
f"Thread should have HAS_EXCEPTION flag, got {results}"
)
else:
self.assertFalse(
majority,
f"Thread should NOT have HAS_EXCEPTION flag, got {results}"
)
except PermissionError:
self.skipTest("Insufficient permissions to read the stack trace")
@unittest.skipIf(
sys.platform not in ("linux", "darwin", "win32"),
@ -2669,6 +2603,7 @@ def test_finally_no_exception_no_flag(self):
self._check_exception_status(p, thread_tid, expect_exception=False)
@requires_remote_subprocess_debugging()
class TestFrameCaching(RemoteInspectionTestBase):
"""Test that frame caching produces correct results.
@ -2707,11 +2642,6 @@ def make_unwinder(cache_frames=True):
)
yield p, client_socket, make_unwinder
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
finally:
_cleanup_sockets(client_socket, server_socket)

View file

@ -20,15 +20,14 @@
SHORT_TIMEOUT,
SuppressCrashReport,
os_helper,
requires_subprocess,
requires_remote_subprocess_debugging,
script_helper,
)
from .helpers import close_and_unlink, skip_if_not_supported, test_subprocess
from .helpers import close_and_unlink, test_subprocess
@requires_subprocess()
@skip_if_not_supported
@requires_remote_subprocess_debugging()
class TestGCFrameTracking(unittest.TestCase):
"""Tests for GC frame tracking in the sampling profiler."""
@ -62,19 +61,16 @@ def test_gc_frames_enabled(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
from profiling.sampling.pstats_collector import PstatsCollector
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=1,
native=False,
gc=True,
)
collector.print_stats(show_summary=False)
except PermissionError:
self.skipTest("Insufficient permissions for remote profiling")
from profiling.sampling.pstats_collector import PstatsCollector
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=1,
native=False,
gc=True,
)
collector.print_stats(show_summary=False)
output = captured_output.getvalue()
@ -92,19 +88,16 @@ def test_gc_frames_disabled(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
from profiling.sampling.pstats_collector import PstatsCollector
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=1,
native=False,
gc=False,
)
collector.print_stats(show_summary=False)
except PermissionError:
self.skipTest("Insufficient permissions for remote profiling")
from profiling.sampling.pstats_collector import PstatsCollector
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=1,
native=False,
gc=False,
)
collector.print_stats(show_summary=False)
output = captured_output.getvalue()
@ -116,8 +109,7 @@ def test_gc_frames_disabled(self):
self.assertNotIn("<GC>", output)
@requires_subprocess()
@skip_if_not_supported
@requires_remote_subprocess_debugging()
class TestNativeFrameTracking(unittest.TestCase):
"""Tests for native frame tracking in the sampling profiler."""
@ -148,20 +140,15 @@ def test_native_frames_enabled(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
from profiling.sampling.stack_collector import CollapsedStackCollector
collector = CollapsedStackCollector(1000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=1,
native=True,
)
collector.export(collapsed_file.name)
except PermissionError:
self.skipTest(
"Insufficient permissions for remote profiling"
)
from profiling.sampling.stack_collector import CollapsedStackCollector
collector = CollapsedStackCollector(1000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=1,
native=True,
)
collector.export(collapsed_file.name)
# Verify file was created and contains valid data
self.assertTrue(os.path.exists(collapsed_file.name))
@ -189,24 +176,20 @@ def test_native_frames_disabled(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
from profiling.sampling.pstats_collector import PstatsCollector
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=1,
)
collector.print_stats(show_summary=False)
except PermissionError:
self.skipTest("Insufficient permissions for remote profiling")
from profiling.sampling.pstats_collector import PstatsCollector
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=1,
)
collector.print_stats(show_summary=False)
output = captured_output.getvalue()
# Native frames should NOT be present:
self.assertNotIn("<native>", output)
@requires_subprocess()
@skip_if_not_supported
@requires_remote_subprocess_debugging()
class TestProcessPoolExecutorSupport(unittest.TestCase):
"""
Test that ProcessPoolExecutor works correctly with profiling.sampling.
@ -251,8 +234,5 @@ def worker(x):
proc.kill()
stdout, stderr = proc.communicate()
if "Permission Error" in stderr:
self.skipTest("Insufficient permissions for remote profiling")
self.assertIn("Results: [2, 4, 6]", stdout)
self.assertNotIn("Can't pickle", stderr)

File diff suppressed because it is too large Load diff

View file

@ -13,7 +13,7 @@
"Test only runs when _remote_debugging is available"
)
from test.support import is_emscripten, requires_subprocess
from test.support import is_emscripten, requires_remote_subprocess_debugging
from profiling.sampling.cli import main
@ -64,7 +64,7 @@ def _verify_coordinator_command(self, mock_popen, expected_target_args):
self.assertEqual(coordinator_cmd[5:], expected_target_args)
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
@requires_subprocess()
@requires_remote_subprocess_debugging()
def test_cli_module_argument_parsing(self):
test_args = ["profiling.sampling.cli", "run", "-m", "mymodule"]
@ -84,7 +84,7 @@ def test_cli_module_argument_parsing(self):
mock_sample.assert_called_once()
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
@requires_subprocess()
@requires_remote_subprocess_debugging()
def test_cli_module_with_arguments(self):
test_args = [
"profiling.sampling.cli",
@ -226,7 +226,7 @@ def test_cli_no_target_specified(self):
self.assertIn("invalid choice", error_msg)
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
@requires_subprocess()
@requires_remote_subprocess_debugging()
def test_cli_module_with_profiler_options(self):
test_args = [
"profiling.sampling.cli",
@ -307,7 +307,7 @@ def test_cli_empty_module_name(self):
self.assertIn("required: target", error_msg) # argparse error for missing positional arg
@unittest.skipIf(is_emscripten, "socket.SO_REUSEADDR does not exist")
@requires_subprocess()
@requires_remote_subprocess_debugging()
def test_cli_long_module_option(self):
test_args = [
"profiling.sampling.cli",

View file

@ -24,15 +24,13 @@
)
from test.support import (
requires_subprocess,
requires_remote_subprocess_debugging,
SHORT_TIMEOUT,
)
from .helpers import (
test_subprocess,
close_and_unlink,
skip_if_not_supported,
PROCESS_VM_READV_SUPPORTED,
)
from .mocks import MockFrameInfo, MockThreadInfo, MockInterpreterInfo
@ -43,11 +41,7 @@
PROFILING_DURATION_SEC = 2
@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
@requires_remote_subprocess_debugging()
class TestRecursiveFunctionProfiling(unittest.TestCase):
"""Test profiling of recursive functions and complex call patterns."""
@ -385,8 +379,7 @@ def do_work():
'''
@requires_subprocess()
@skip_if_not_supported
@requires_remote_subprocess_debugging()
class TestSampleProfilerIntegration(unittest.TestCase):
@classmethod
def setUpClass(cls):
@ -410,16 +403,13 @@ def test_sampling_basic_functionality(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
collector = PstatsCollector(sample_interval_usec=1000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=PROFILING_DURATION_SEC,
)
collector.print_stats(show_summary=False)
except PermissionError:
self.skipTest("Insufficient permissions for remote profiling")
collector = PstatsCollector(sample_interval_usec=1000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=PROFILING_DURATION_SEC,
)
collector.print_stats(show_summary=False)
output = captured_output.getvalue()
@ -443,18 +433,13 @@ def test_sampling_with_pstats_export(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
collector = PstatsCollector(sample_interval_usec=10000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=PROFILING_DURATION_SEC,
)
collector.export(pstats_out.name)
except PermissionError:
self.skipTest(
"Insufficient permissions for remote profiling"
)
collector = PstatsCollector(sample_interval_usec=10000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=PROFILING_DURATION_SEC,
)
collector.export(pstats_out.name)
# Verify file was created and contains valid data
self.assertTrue(os.path.exists(pstats_out.name))
@ -489,18 +474,13 @@ def test_sampling_with_collapsed_export(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
collector = CollapsedStackCollector(1000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=PROFILING_DURATION_SEC,
)
collector.export(collapsed_file.name)
except PermissionError:
self.skipTest(
"Insufficient permissions for remote profiling"
)
collector = CollapsedStackCollector(1000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=PROFILING_DURATION_SEC,
)
collector.export(collapsed_file.name)
# Verify file was created and contains valid data
self.assertTrue(os.path.exists(collapsed_file.name))
@ -537,17 +517,14 @@ def test_sampling_all_threads(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
collector = PstatsCollector(sample_interval_usec=10000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=PROFILING_DURATION_SEC,
all_threads=True,
)
collector.print_stats(show_summary=False)
except PermissionError:
self.skipTest("Insufficient permissions for remote profiling")
collector = PstatsCollector(sample_interval_usec=10000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=PROFILING_DURATION_SEC,
all_threads=True,
)
collector.print_stats(show_summary=False)
# Just verify that sampling completed without error
# We're not testing output format here
@ -570,11 +547,8 @@ def test_sample_target_script(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
from profiling.sampling.cli import main
main()
except PermissionError:
self.skipTest("Insufficient permissions for remote profiling")
from profiling.sampling.cli import main
main()
output = captured_output.getvalue()
@ -611,11 +585,8 @@ def test_sample_target_module(self):
# Change to temp directory so subprocess can find the module
contextlib.chdir(tempdir.name),
):
try:
from profiling.sampling.cli import main
main()
except PermissionError:
self.skipTest("Insufficient permissions for remote profiling")
from profiling.sampling.cli import main
main()
output = captured_output.getvalue()
@ -628,11 +599,7 @@ def test_sample_target_module(self):
self.assertIn("slow_fibonacci", output)
@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
@requires_remote_subprocess_debugging()
class TestSampleProfilerErrorHandling(unittest.TestCase):
def test_invalid_pid(self):
with self.assertRaises((OSError, RuntimeError)):
@ -649,17 +616,12 @@ def test_process_dies_during_sampling(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
collector = PstatsCollector(sample_interval_usec=50000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=2, # Longer than process lifetime
)
except PermissionError:
self.skipTest(
"Insufficient permissions for remote profiling"
)
collector = PstatsCollector(sample_interval_usec=50000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=2, # Longer than process lifetime
)
output = captured_output.getvalue()
@ -671,16 +633,11 @@ def test_is_process_running(self):
"import time; time.sleep(1000)",
wait_for_working=False
) as subproc:
try:
profiler = SampleProfiler(
pid=subproc.process.pid,
sample_interval_usec=1000,
all_threads=False,
)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
profiler = SampleProfiler(
pid=subproc.process.pid,
sample_interval_usec=1000,
all_threads=False,
)
self.assertTrue(profiler._is_process_running())
self.assertIsNotNone(profiler.unwinder.get_stack_trace())
subproc.process.kill()
@ -702,14 +659,9 @@ def test_esrch_signal_handling(self):
"import time; time.sleep(1000)",
wait_for_working=False
) as subproc:
try:
unwinder = _remote_debugging.RemoteUnwinder(
subproc.process.pid
)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
unwinder = _remote_debugging.RemoteUnwinder(
subproc.process.pid
)
initial_trace = unwinder.get_stack_trace()
self.assertIsNotNone(initial_trace)
@ -744,8 +696,6 @@ def test_script_error_treatment(self):
)
output = result.stdout + result.stderr
if "PermissionError" in output:
self.skipTest("Insufficient permissions for remote profiling")
self.assertNotIn("Script file not found", output)
self.assertIn(
"No such file or directory: 'nonexistent_file.txt'", output
@ -802,12 +752,7 @@ def test_live_incompatible_with_pstats_default_values(self):
self.assertNotEqual(cm.exception.code, 0)
@requires_subprocess()
@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
@requires_remote_subprocess_debugging()
class TestAsyncAwareProfilingIntegration(unittest.TestCase):
"""Integration tests for async-aware profiling mode."""
@ -849,16 +794,13 @@ def _collect_async_samples(self, async_aware_mode):
Returns a dict mapping function names to their sample counts.
"""
with test_subprocess(self.async_script, wait_for_working=True) as subproc:
try:
collector = CollapsedStackCollector(1000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=PROFILING_DURATION_SEC,
async_aware=async_aware_mode,
)
except PermissionError:
self.skipTest("Insufficient permissions for remote profiling")
collector = CollapsedStackCollector(1000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=PROFILING_DURATION_SEC,
async_aware=async_aware_mode,
)
# Count samples per function from collapsed stacks
# stack_counter keys are (call_tree, thread_id) where call_tree

View file

@ -14,12 +14,13 @@
"Test only runs when _remote_debugging is available"
)
from test.support import requires_subprocess
from test.support import requires_remote_subprocess_debugging
from .helpers import test_subprocess
from .mocks import MockFrameInfo, MockInterpreterInfo
@requires_remote_subprocess_debugging()
class TestCpuModeFiltering(unittest.TestCase):
"""Test CPU mode filtering functionality (--mode=cpu)."""
@ -124,7 +125,6 @@ def __init__(self, thread_id, frame_info, status):
idle_key, collector_no_skip.result
) # Idle thread should be included
@requires_subprocess()
def test_cpu_mode_integration_filtering(self):
"""Integration test: CPU mode should only capture active threads, not idle ones."""
# Script with one mostly-idle thread and one CPU-active thread
@ -158,20 +158,15 @@ def cpu_active_worker():
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=2.0,
mode=1, # CPU mode
all_threads=True,
)
collector.print_stats(show_summary=False, mode=1)
except (PermissionError, RuntimeError) as e:
self.skipTest(
"Insufficient permissions for remote profiling"
)
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=2.0,
mode=1, # CPU mode
all_threads=True,
)
collector.print_stats(show_summary=False, mode=1)
cpu_mode_output = captured_output.getvalue()
@ -180,20 +175,15 @@ def cpu_active_worker():
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=2.0,
mode=0, # Wall-clock mode
all_threads=True,
)
collector.print_stats(show_summary=False)
except (PermissionError, RuntimeError) as e:
self.skipTest(
"Insufficient permissions for remote profiling"
)
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=2.0,
mode=0, # Wall-clock mode
all_threads=True,
)
collector.print_stats(show_summary=False)
wall_mode_output = captured_output.getvalue()
@ -244,6 +234,7 @@ def test_cpu_mode_with_no_samples(self):
self.assertIn("CPU mode", output)
@requires_remote_subprocess_debugging()
class TestGilModeFiltering(unittest.TestCase):
"""Test GIL mode filtering functionality (--mode=gil)."""
@ -335,7 +326,6 @@ def test_gil_mode_cli_argument_parsing(self):
self.assertEqual(call_args.kwargs.get("mode"), 2) # GIL mode
self.assertEqual(call_args.kwargs.get("duration_sec"), 5)
@requires_subprocess()
def test_gil_mode_integration_behavior(self):
"""Integration test: GIL mode should capture GIL-holding threads."""
# Create a test script with GIL-releasing operations
@ -369,20 +359,15 @@ def gil_holding_work():
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=2.0,
mode=2, # GIL mode
all_threads=True,
)
collector.print_stats(show_summary=False)
except (PermissionError, RuntimeError) as e:
self.skipTest(
"Insufficient permissions for remote profiling"
)
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=2.0,
mode=2, # GIL mode
all_threads=True,
)
collector.print_stats(show_summary=False)
gil_mode_output = captured_output.getvalue()
@ -391,20 +376,15 @@ def gil_holding_work():
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=0.5,
mode=0, # Wall-clock mode
all_threads=True,
)
collector.print_stats(show_summary=False)
except (PermissionError, RuntimeError) as e:
self.skipTest(
"Insufficient permissions for remote profiling"
)
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=0.5,
mode=0, # Wall-clock mode
all_threads=True,
)
collector.print_stats(show_summary=False)
wall_mode_output = captured_output.getvalue()
@ -434,6 +414,7 @@ def test_parse_mode_function(self):
_parse_mode("invalid")
@requires_remote_subprocess_debugging()
class TestExceptionModeFiltering(unittest.TestCase):
"""Test exception mode filtering functionality (--mode=exception)."""
@ -530,7 +511,6 @@ def test_exception_mode_constants_are_defined(self):
from profiling.sampling.constants import PROFILING_MODE_EXCEPTION
self.assertEqual(PROFILING_MODE_EXCEPTION, 4)
@requires_subprocess()
def test_exception_mode_integration_filtering(self):
"""Integration test: Exception mode should only capture threads with active exceptions."""
# Script with one thread handling an exception and one normal thread
@ -570,20 +550,15 @@ def exception_handling_worker():
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=2.0,
mode=4, # Exception mode
all_threads=True,
)
collector.print_stats(show_summary=False, mode=4)
except (PermissionError, RuntimeError) as e:
self.skipTest(
"Insufficient permissions for remote profiling"
)
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=True)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=2.0,
mode=4, # Exception mode
all_threads=True,
)
collector.print_stats(show_summary=False, mode=4)
exception_mode_output = captured_output.getvalue()
@ -592,20 +567,15 @@ def exception_handling_worker():
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=2.0,
mode=0, # Wall-clock mode
all_threads=True,
)
collector.print_stats(show_summary=False)
except (PermissionError, RuntimeError) as e:
self.skipTest(
"Insufficient permissions for remote profiling"
)
collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=2.0,
mode=0, # Wall-clock mode
all_threads=True,
)
collector.print_stats(show_summary=False)
wall_mode_output = captured_output.getvalue()

View file

@ -0,0 +1,6 @@
Add ``--subprocesses`` flag to :mod:`profiling.sampling` CLI to automatically
profile subprocesses spawned by the target. When enabled, the profiler
monitors for new Python subprocesses and profiles each one separately,
writing results to individual output files. This is useful for profiling
applications that use :mod:`multiprocessing`, :class:`~concurrent.futures.ProcessPoolExecutor`,
or other subprocess-based parallelism. Patch by Pablo Galindo.

View file

@ -41,7 +41,7 @@
@MODULE__PICKLE_TRUE@_pickle _pickle.c
@MODULE__QUEUE_TRUE@_queue _queuemodule.c
@MODULE__RANDOM_TRUE@_random _randommodule.c
@MODULE__REMOTE_DEBUGGING_TRUE@_remote_debugging _remote_debugging/module.c _remote_debugging/object_reading.c _remote_debugging/code_objects.c _remote_debugging/frames.c _remote_debugging/frame_cache.c _remote_debugging/threads.c _remote_debugging/asyncio.c
@MODULE__REMOTE_DEBUGGING_TRUE@_remote_debugging _remote_debugging/module.c _remote_debugging/object_reading.c _remote_debugging/code_objects.c _remote_debugging/frames.c _remote_debugging/frame_cache.c _remote_debugging/threads.c _remote_debugging/asyncio.c _remote_debugging/subprocess.c
@MODULE__STRUCT_TRUE@_struct _struct.c
# build supports subinterpreters

View file

@ -8,23 +8,24 @@
#ifndef Py_REMOTE_DEBUGGING_H
#define Py_REMOTE_DEBUGGING_H
/* _GNU_SOURCE must be defined before any system headers */
#define _GNU_SOURCE
#ifdef __cplusplus
extern "C" {
#endif
#define _GNU_SOURCE
#ifndef Py_BUILD_CORE_BUILTIN
# define Py_BUILD_CORE_MODULE 1
#endif
#include "Python.h"
#include <internal/pycore_debug_offsets.h> // _Py_DebugOffsets
#include <internal/pycore_frame.h> // FRAME_SUSPENDED_YIELD_FROM
#include <internal/pycore_interpframe.h> // FRAME_OWNED_BY_INTERPRETER
#include <internal/pycore_llist.h> // struct llist_node
#include <internal/pycore_long.h> // _PyLong_GetZero
#include <internal/pycore_stackref.h> // Py_TAG_BITS
#include "internal/pycore_debug_offsets.h" // _Py_DebugOffsets
#include "internal/pycore_frame.h" // FRAME_SUSPENDED_YIELD_FROM
#include "internal/pycore_interpframe.h" // FRAME_OWNED_BY_INTERPRETER
#include "internal/pycore_llist.h" // struct llist_node
#include "internal/pycore_long.h" // _PyLong_GetZero
#include "internal/pycore_stackref.h" // Py_TAG_BITS
#include "../../Python/remote_debug.h"
#include <assert.h>
@ -40,10 +41,17 @@ extern "C" {
# define HAVE_PROCESS_VM_READV 0
#endif
#if defined(__APPLE__) && TARGET_OS_OSX
#include <libproc.h>
#include <sys/types.h>
#define MAX_NATIVE_THREADS 4096
#if defined(__APPLE__)
#include <TargetConditionals.h>
# if !defined(TARGET_OS_OSX)
/* Older macOS SDKs do not define TARGET_OS_OSX */
# define TARGET_OS_OSX 1
# endif
# if TARGET_OS_OSX
# include <libproc.h>
# include <sys/types.h>
# define MAX_NATIVE_THREADS 4096
# endif
#endif
#ifdef MS_WINDOWS
@ -581,6 +589,16 @@ extern int process_thread_for_async_stack_trace(
void *context
);
/* ============================================================================
* SUBPROCESS ENUMERATION FUNCTION DECLARATIONS
* ============================================================================ */
/* Get all child PIDs of a process.
* Returns a new Python list of PIDs, or NULL on error with exception set.
* If recursive is true, includes all descendants (children, grandchildren, etc.)
*
* Parameters:
*   target_pid - PID of the parent process whose children are enumerated.
*   recursive  - non-zero to include every descendant; zero presumably limits
*                the result to direct children, as the get_child_pids()
*                docstring states (NOTE(review): confirm against the
*                implementation, which is not visible from this header).
*/
extern PyObject *enumerate_child_pids(pid_t target_pid, int recursive);
#ifdef __cplusplus
}
#endif

View file

@ -433,4 +433,153 @@ _remote_debugging_RemoteUnwinder_get_stats(PyObject *self, PyObject *Py_UNUSED(i
return return_value;
}
/*[clinic end generated code: output=1943fb7a56197e39 input=a9049054013a1b77]*/
PyDoc_STRVAR(_remote_debugging_get_child_pids__doc__,
"get_child_pids($module, /, pid, *, recursive=True)\n"
"--\n"
"\n"
"Get all child process IDs of the given process.\n"
"\n"
" pid\n"
" Process ID of the parent process\n"
" recursive\n"
" If True, return all descendants (children, grandchildren, etc.).\n"
" If False, return only direct children.\n"
"\n"
"Returns a list of child process IDs. Returns an empty list if no children\n"
"are found.\n"
"\n"
"This function provides a snapshot of child processes at a moment in time.\n"
"Child processes may exit or new ones may be created after the list is returned.\n"
"\n"
"Raises:\n"
" OSError: If unable to enumerate processes\n"
" NotImplementedError: If not supported on this platform");
#define _REMOTE_DEBUGGING_GET_CHILD_PIDS_METHODDEF \
{"get_child_pids", _PyCFunction_CAST(_remote_debugging_get_child_pids), METH_FASTCALL|METH_KEYWORDS, _remote_debugging_get_child_pids__doc__},
static PyObject *
_remote_debugging_get_child_pids_impl(PyObject *module, int pid,
int recursive);
static PyObject *
_remote_debugging_get_child_pids(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{
PyObject *return_value = NULL;
#if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
#define NUM_KEYWORDS 2
static struct {
PyGC_Head _this_is_not_used;
PyObject_VAR_HEAD
Py_hash_t ob_hash;
PyObject *ob_item[NUM_KEYWORDS];
} _kwtuple = {
.ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
.ob_hash = -1,
.ob_item = { &_Py_ID(pid), &_Py_ID(recursive), },
};
#undef NUM_KEYWORDS
#define KWTUPLE (&_kwtuple.ob_base.ob_base)
#else // !Py_BUILD_CORE
# define KWTUPLE NULL
#endif // !Py_BUILD_CORE
static const char * const _keywords[] = {"pid", "recursive", NULL};
static _PyArg_Parser _parser = {
.keywords = _keywords,
.fname = "get_child_pids",
.kwtuple = KWTUPLE,
};
#undef KWTUPLE
PyObject *argsbuf[2];
Py_ssize_t noptargs = nargs + (kwnames ? PyTuple_GET_SIZE(kwnames) : 0) - 1;
int pid;
int recursive = 1;
args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser,
/*minpos*/ 1, /*maxpos*/ 1, /*minkw*/ 0, /*varpos*/ 0, argsbuf);
if (!args) {
goto exit;
}
pid = PyLong_AsInt(args[0]);
if (pid == -1 && PyErr_Occurred()) {
goto exit;
}
if (!noptargs) {
goto skip_optional_kwonly;
}
recursive = PyObject_IsTrue(args[1]);
if (recursive < 0) {
goto exit;
}
skip_optional_kwonly:
return_value = _remote_debugging_get_child_pids_impl(module, pid, recursive);
exit:
return return_value;
}
PyDoc_STRVAR(_remote_debugging_is_python_process__doc__,
"is_python_process($module, /, pid)\n"
"--\n"
"\n"
"Check if a process is a Python process.");
#define _REMOTE_DEBUGGING_IS_PYTHON_PROCESS_METHODDEF \
{"is_python_process", _PyCFunction_CAST(_remote_debugging_is_python_process), METH_FASTCALL|METH_KEYWORDS, _remote_debugging_is_python_process__doc__},
static PyObject *
_remote_debugging_is_python_process_impl(PyObject *module, int pid);
static PyObject *
_remote_debugging_is_python_process(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
{
PyObject *return_value = NULL;
#if defined(Py_BUILD_CORE) && !defined(Py_BUILD_CORE_MODULE)
#define NUM_KEYWORDS 1
static struct {
PyGC_Head _this_is_not_used;
PyObject_VAR_HEAD
Py_hash_t ob_hash;
PyObject *ob_item[NUM_KEYWORDS];
} _kwtuple = {
.ob_base = PyVarObject_HEAD_INIT(&PyTuple_Type, NUM_KEYWORDS)
.ob_hash = -1,
.ob_item = { &_Py_ID(pid), },
};
#undef NUM_KEYWORDS
#define KWTUPLE (&_kwtuple.ob_base.ob_base)
#else // !Py_BUILD_CORE
# define KWTUPLE NULL
#endif // !Py_BUILD_CORE
static const char * const _keywords[] = {"pid", NULL};
static _PyArg_Parser _parser = {
.keywords = _keywords,
.fname = "is_python_process",
.kwtuple = KWTUPLE,
};
#undef KWTUPLE
PyObject *argsbuf[1];
int pid;
args = _PyArg_UnpackKeywords(args, nargs, NULL, kwnames, &_parser,
/*minpos*/ 1, /*maxpos*/ 1, /*minkw*/ 0, /*varpos*/ 0, argsbuf);
if (!args) {
goto exit;
}
pid = PyLong_AsInt(args[0]);
if (pid == -1 && PyErr_Occurred()) {
goto exit;
}
return_value = _remote_debugging_is_python_process_impl(module, pid);
exit:
return return_value;
}
/*[clinic end generated code: output=dc0550ad3d6a409c input=a9049054013a1b77]*/

View file

@ -350,7 +350,7 @@ _remote_debugging_RemoteUnwinder___init___impl(RemoteUnwinderObject *self,
}
// Validate that the debug offsets are valid
if(validate_debug_offsets(&self->debug_offsets) == -1) {
if (validate_debug_offsets(&self->debug_offsets) == -1) {
set_exception_cause(self, PyExc_RuntimeError, "Invalid debug offsets found");
return -1;
}
@ -933,7 +933,7 @@ RemoteUnwinder_dealloc(PyObject *op)
_Py_hashtable_destroy(self->code_object_cache);
}
#ifdef MS_WINDOWS
if(self->win_process_buffer != NULL) {
if (self->win_process_buffer != NULL) {
PyMem_Free(self->win_process_buffer);
}
#endif
@ -1122,7 +1122,74 @@ static PyModuleDef_Slot remote_debugging_slots[] = {
{0, NULL},
};
/* ============================================================================
* MODULE-LEVEL FUNCTIONS
* ============================================================================ */
/*[clinic input]
_remote_debugging.get_child_pids
pid: int
Process ID of the parent process
*
recursive: bool = True
If True, return all descendants (children, grandchildren, etc.).
If False, return only direct children.
Get all child process IDs of the given process.
Returns a list of child process IDs. Returns an empty list if no children
are found.
This function provides a snapshot of child processes at a moment in time.
Child processes may exit or new ones may be created after the list is returned.
Raises:
OSError: If unable to enumerate processes
NotImplementedError: If not supported on this platform
[clinic start generated code]*/
static PyObject *
_remote_debugging_get_child_pids_impl(PyObject *module, int pid,
                                      int recursive)
/*[clinic end generated code: output=1ae2289c6b953e4b input=3395cbe7f17066c9]*/
{
    /* Thin adapter: narrow the clinic-converted int to pid_t and let the
     * enumeration helper build the result. Its return value (a new object,
     * or NULL with an exception set) is passed through unchanged. */
    pid_t parent_pid = (pid_t)pid;
    PyObject *children = enumerate_child_pids(parent_pid, recursive);
    return children;
}
/*[clinic input]
_remote_debugging.is_python_process
pid: int
Check if a process is a Python process.
[clinic start generated code]*/
static PyObject *
_remote_debugging_is_python_process_impl(PyObject *module, int pid)
/*[clinic end generated code: output=22947dc8afcac362 input=13488e28c7295d84]*/
{
    /* Best-effort probe: the process counts as "Python" only when a handle
     * can be opened AND a non-zero PyRuntime address is found in it. Every
     * failure along the way is reported as False rather than raised. */
    proc_handle_t proc;
    uintptr_t runtime_addr = 0;

    if (_Py_RemoteDebug_InitProcHandle(&proc, pid) >= 0) {
        runtime_addr = _Py_RemoteDebug_GetPyRuntimeAddress(&proc);
        /* The handle is only needed for the lookup; release it right away. */
        _Py_RemoteDebug_CleanupProcHandle(&proc);
    }

    if (runtime_addr != 0) {
        Py_RETURN_TRUE;
    }

    /* Discard whatever exception the failed step left behind. */
    PyErr_Clear();
    Py_RETURN_FALSE;
}
/* Module-level function table. Each entry is supplied by an Argument Clinic
 * *_METHODDEF macro (which carries its own trailing comma); the all-NULL
 * entry is the terminating sentinel. */
static PyMethodDef remote_debugging_methods[] = {
_REMOTE_DEBUGGING_GET_CHILD_PIDS_METHODDEF
_REMOTE_DEBUGGING_IS_PYTHON_PROCESS_METHODDEF
{NULL, NULL, 0, NULL},
};

View file

@ -0,0 +1,459 @@
/******************************************************************************
* Remote Debugging Module - Subprocess Enumeration
*
* This file contains platform-specific functions for enumerating child
* processes of a given PID.
******************************************************************************/
#include "_remote_debugging.h"
#ifndef MS_WINDOWS
#include <unistd.h>
#include <dirent.h>
#endif
#ifdef MS_WINDOWS
#include <tlhelp32.h>
#endif
/* ============================================================================
* INTERNAL DATA STRUCTURES
* ============================================================================ */
/* Simple dynamic array for collecting PIDs.
 *
 * Set up with pid_array_init(), grown on demand by pid_array_append(), and
 * released with pid_array_cleanup(). */
typedef struct {
/* Heap buffer holding the PIDs (managed with the PyMem_* allocator). */
pid_t *pids;
/* Number of PIDs currently stored in `pids`. */
size_t count;
/* Number of PIDs `pids` has room for. */
size_t capacity;
} pid_array_t;
/* Put `arr` into the empty state with room for 64 PIDs.
 *
 * Returns 0 on success. On allocation failure raises MemoryError and returns
 * -1; arr->pids is NULL in that case, so pid_array_cleanup() remains safe. */
static int
pid_array_init(pid_array_t *arr)
{
    arr->count = 0;
    arr->capacity = 64;
    arr->pids = (pid_t *)PyMem_Malloc(arr->capacity * sizeof(pid_t));
    if (arr->pids != NULL) {
        return 0;
    }
    PyErr_NoMemory();
    return -1;
}
/* Release the storage owned by `arr` and reset it to the empty state.
 * Safe on a zero-initialised array and safe to call more than once. */
static void
pid_array_cleanup(pid_array_t *arr)
{
    /* PyMem_Free(NULL) is a no-op, so no NULL guard is required. */
    PyMem_Free(arr->pids);
    arr->pids = NULL;
    arr->capacity = 0;
    arr->count = 0;
}
/* Append `pid` to `arr`, growing the buffer when it is full.
 *
 * Growth doubles the capacity. An array whose capacity is 0 (zero-initialised
 * with `{0}`, or already passed to pid_array_cleanup()) starts at 64 slots,
 * the same as pid_array_init(); plain doubling would keep it at 0 and the
 * store below would write past a zero-length allocation.
 *
 * Returns 0 on success. On failure returns -1 with OverflowError (size
 * arithmetic would overflow) or MemoryError (allocation failed) set; the
 * array is left unchanged and still valid. */
static int
pid_array_append(pid_array_t *arr, pid_t pid)
{
    if (arr->count >= arr->capacity) {
        /* Check for overflow before multiplication */
        if (arr->capacity > SIZE_MAX / 2) {
            PyErr_SetString(PyExc_OverflowError, "PID array capacity overflow");
            return -1;
        }
        size_t new_capacity = (arr->capacity != 0) ? arr->capacity * 2 : 64;
        /* Check allocation size won't overflow */
        if (new_capacity > SIZE_MAX / sizeof(pid_t)) {
            PyErr_SetString(PyExc_OverflowError, "PID array size overflow");
            return -1;
        }
        /* Keep the old pointer until the reallocation is known to have
         * succeeded. PyMem_Realloc(NULL, n) behaves like PyMem_Malloc(n), so
         * the never-initialised case needs no special path. */
        pid_t *new_pids = (pid_t *)PyMem_Realloc(arr->pids, new_capacity * sizeof(pid_t));
        if (new_pids == NULL) {
            PyErr_NoMemory();
            return -1;
        }
        arr->pids = new_pids;
        arr->capacity = new_capacity;
    }
    arr->pids[arr->count++] = pid;
    return 0;
}
/* Linear membership test: returns 1 if `pid` is stored in `arr`, else 0. */
static int
pid_array_contains(pid_array_t *arr, pid_t pid)
{
    const pid_t *cursor = arr->pids;
    size_t remaining = arr->count;

    while (remaining > 0) {
        if (*cursor == pid) {
            return 1;
        }
        cursor++;
        remaining--;
    }
    return 0;
}
/* ============================================================================
* SHARED BFS HELPER
* ============================================================================ */
/* Find child PIDs using BFS traversal of the pid->ppid mapping.
 *
 * all_pids and ppids are parallel arrays of pid_count entries: ppids[i] is
 * the parent of all_pids[i]. Matching PIDs are appended to `result`, which
 * the caller must already have initialised; PIDs already present in `result`
 * are not added again.
 *
 * With recursive == 0 only direct children of target_pid are collected;
 * otherwise every descendant reachable through the mapping is collected.
 *
 * target_pid itself is never reported. The mapping can make it look like its
 * own child or descendant - a self-parented entry (pid 0 with ppid 0), or a
 * cycle created by PID reuse when a recorded parent PID has been recycled -
 * and callers expect "children of X" to exclude X.
 *
 * Returns 0 on success, -1 on error (with an exception set by the failing
 * pid_array_* call; `result` may then hold a partial list). */
static int
find_children_bfs(pid_t target_pid, int recursive,
                  pid_t *all_pids, pid_t *ppids, size_t pid_count,
                  pid_array_t *result)
{
    int retval = -1;
    /* Work queue of PIDs whose children still have to be looked up. */
    pid_array_t to_process = {0};

    if (pid_array_init(&to_process) < 0) {
        goto done;
    }
    if (pid_array_append(&to_process, target_pid) < 0) {
        goto done;
    }

    size_t process_idx = 0;
    while (process_idx < to_process.count) {
        pid_t current_pid = to_process.pids[process_idx++];

        for (size_t i = 0; i < pid_count; i++) {
            if (ppids[i] != current_pid) {
                continue;
            }
            pid_t child_pid = all_pids[i];
            /* A process is never its own child or descendant. */
            if (child_pid == target_pid) {
                continue;
            }
            /* Already recorded: skipping it also keeps cycles in the
             * mapping from looping forever. */
            if (pid_array_contains(result, child_pid)) {
                continue;
            }
            if (pid_array_append(result, child_pid) < 0) {
                goto done;
            }
            if (recursive && pid_array_append(&to_process, child_pid) < 0) {
                goto done;
            }
        }

        /* Non-recursive mode: only the target itself is expanded. */
        if (!recursive) {
            break;
        }
    }
    retval = 0;

done:
    pid_array_cleanup(&to_process);
    return retval;
}
/* ============================================================================
* LINUX IMPLEMENTATION
* ============================================================================ */
#if defined(__linux__)
/* Parse /proc/{pid}/stat to get parent PID.
 *
 * Returns the parent PID (which may be 0), or -1 when the stat file cannot be
 * opened or read, or does not have the expected "pid (comm) state ppid ..."
 * layout. No Python exception is set: callers treat -1 as "skip this
 * process", e.g. because it exited while /proc was being scanned. */
static pid_t
get_ppid_linux(pid_t pid)
{
    char stat_path[64];
    char buffer[2048];

    snprintf(stat_path, sizeof(stat_path), "/proc/%d/stat", (int)pid);

    /* Open close-on-exec where available so the descriptor cannot leak into
     * a child that another thread forks and execs while the file is open. */
    int flags = O_RDONLY;
#ifdef O_CLOEXEC
    flags |= O_CLOEXEC;
#endif
    int fd = open(stat_path, flags);
    if (fd == -1) {
        return -1;
    }
    ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
    close(fd);
    if (n <= 0) {
        return -1;
    }
    buffer[n] = '\0';

    /* Find closing paren of comm field - stat format: pid (comm) state ppid ...
     * The LAST ')' is used because comm itself may contain parentheses. */
    char *p = strrchr(buffer, ')');
    if (!p) {
        return -1;
    }

    /* Skip ") " with bounds checking */
    char *end = buffer + n;
    p += 2;
    if (p >= end) {
        return -1;
    }
    /* Tolerate one extra separating space before the state character. */
    if (*p == ' ') {
        p++;
        if (p >= end) {
            return -1;
        }
    }

    /* Parse: state ppid */
    char state;
    int ppid;
    if (sscanf(p, "%c %d", &state, &ppid) != 2) {
        return -1;
    }
    return (pid_t)ppid;
}
/* Linux: snapshot the pid->ppid table by scanning /proc, then let
 * find_children_bfs() pick the children of target_pid out of it.
 *
 * Processes whose stat file cannot be read or parsed are silently left out
 * of the snapshot. Returns 0 on success, -1 with an exception set on
 * failure. */
static int
get_child_pids_platform(pid_t target_pid, int recursive, pid_array_t *result)
{
    int status = -1;
    DIR *procfs = NULL;
    /* Parallel arrays: parents.pids[i] is the parent of pids.pids[i]. */
    pid_array_t pids = {0};
    pid_array_t parents = {0};

    if (pid_array_init(&pids) < 0) {
        goto cleanup;
    }
    if (pid_array_init(&parents) < 0) {
        goto cleanup;
    }

    procfs = opendir("/proc");
    if (procfs == NULL) {
        PyErr_SetFromErrnoWithFilename(PyExc_OSError, "/proc");
        goto cleanup;
    }

    /* One pass over /proc collecting each PID together with its parent. */
    for (struct dirent *ent = readdir(procfs);
         ent != NULL;
         ent = readdir(procfs))
    {
        const char *name = ent->d_name;

        /* Process directories are purely numeric and never start with '0';
         * this also rejects "." and "..". */
        if (name[0] < '1' || name[0] > '9') {
            continue;
        }
        char *tail;
        long value = strtol(name, &tail, 10);
        if (*tail != '\0' || value <= 0) {
            continue;
        }

        pid_t pid = (pid_t)value;
        pid_t parent = get_ppid_linux(pid);
        if (parent < 0) {
            continue;
        }

        if (pid_array_append(&pids, pid) < 0) {
            goto cleanup;
        }
        if (pid_array_append(&parents, parent) < 0) {
            goto cleanup;
        }
    }
    closedir(procfs);
    procfs = NULL;

    if (find_children_bfs(target_pid, recursive,
                          pids.pids, parents.pids, pids.count,
                          result) >= 0) {
        status = 0;
    }

cleanup:
    if (procfs != NULL) {
        closedir(procfs);
    }
    pid_array_cleanup(&pids);
    pid_array_cleanup(&parents);
    return status;
}
#endif /* __linux__ */
/* ============================================================================
* MACOS IMPLEMENTATION
* ============================================================================ */
#if defined(__APPLE__) && TARGET_OS_OSX
#include <sys/proc_info.h>
/* macOS: list every PID with proc_listallpids(), look up each one's parent
 * with proc_pidinfo(), then let find_children_bfs() pick the children of
 * target_pid out of that table.
 *
 * PIDs whose BSD info cannot be read are dropped from the table.
 * Returns 0 on success, -1 with an exception set on failure. */
static int
get_child_pids_platform(pid_t target_pid, int recursive, pid_array_t *result)
{
    int status = -1;
    pid_t *pids = NULL;
    pid_t *parents = NULL;

    /* First call only asks how many PIDs there are. */
    int estimate = proc_listallpids(NULL, 0);
    if (estimate <= 0) {
        PyErr_SetString(PyExc_OSError, "Failed to get process count");
        goto cleanup;
    }

    /* Leave some headroom for processes created between the two calls. */
    int slots = estimate + 64;
    pids = (pid_t *)PyMem_Malloc(slots * sizeof(pid_t));
    if (pids == NULL) {
        PyErr_NoMemory();
        goto cleanup;
    }

    /* Second call fills the buffer. */
    int listed = proc_listallpids(pids, slots * sizeof(pid_t));
    if (listed <= 0) {
        PyErr_SetString(PyExc_OSError, "Failed to list PIDs");
        goto cleanup;
    }

    parents = (pid_t *)PyMem_Malloc(listed * sizeof(pid_t));
    if (parents == NULL) {
        PyErr_NoMemory();
        goto cleanup;
    }

    /* Compact in place: keep only the PIDs whose parent could be read, so
     * pids[0..kept) and parents[0..kept) stay parallel. */
    int kept = 0;
    for (int src = 0; src < listed; src++) {
        struct proc_bsdinfo info;
        int got = proc_pidinfo(pids[src], PROC_PIDTBSDINFO, 0,
                               &info, sizeof(info));
        if (got == sizeof(info)) {
            pids[kept] = pids[src];
            parents[kept] = info.pbi_ppid;
            kept++;
        }
    }

    if (find_children_bfs(target_pid, recursive,
                          pids, parents, kept,
                          result) >= 0) {
        status = 0;
    }

cleanup:
    PyMem_Free(pids);
    PyMem_Free(parents);
    return status;
}
#endif /* __APPLE__ && TARGET_OS_OSX */
/* ============================================================================
* WINDOWS IMPLEMENTATION
* ============================================================================ */
#ifdef MS_WINDOWS
/* Windows: take a Toolhelp process snapshot, record every process ID with
 * its parent process ID, then let find_children_bfs() pick the children of
 * target_pid out of that table.
 *
 * Returns 0 on success, -1 with an exception set on failure. */
static int
get_child_pids_platform(pid_t target_pid, int recursive, pid_array_t *result)
{
    int status = -1;
    /* Parallel arrays: parents.pids[i] is the parent of pids.pids[i]. */
    pid_array_t pids = {0};
    pid_array_t parents = {0};

    HANDLE snap = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
    if (snap == INVALID_HANDLE_VALUE) {
        PyErr_SetFromWindowsErr(0);
        goto cleanup;
    }

    if (pid_array_init(&pids) < 0) {
        goto cleanup;
    }
    if (pid_array_init(&parents) < 0) {
        goto cleanup;
    }

    /* One pass over the snapshot collecting each PID with its parent. */
    PROCESSENTRY32 entry;
    entry.dwSize = sizeof(PROCESSENTRY32);
    for (BOOL more = Process32First(snap, &entry);
         more;
         more = Process32Next(snap, &entry))
    {
        if (pid_array_append(&pids, (pid_t)entry.th32ProcessID) < 0) {
            goto cleanup;
        }
        if (pid_array_append(&parents, (pid_t)entry.th32ParentProcessID) < 0) {
            goto cleanup;
        }
    }

    /* The snapshot is no longer needed once the table has been copied. */
    CloseHandle(snap);
    snap = INVALID_HANDLE_VALUE;

    if (find_children_bfs(target_pid, recursive,
                          pids.pids, parents.pids, pids.count,
                          result) >= 0) {
        status = 0;
    }

cleanup:
    if (snap != INVALID_HANDLE_VALUE) {
        CloseHandle(snap);
    }
    pid_array_cleanup(&pids);
    pid_array_cleanup(&parents);
    return status;
}
#endif /* MS_WINDOWS */
/* ============================================================================
* UNSUPPORTED PLATFORM STUB
* ============================================================================ */
#if !defined(__linux__) && !(defined(__APPLE__) && TARGET_OS_OSX) && !defined(MS_WINDOWS)
/* Fallback for platforms without an enumeration backend: always fails with
 * NotImplementedError and returns -1. The arguments are ignored. */
static int
get_child_pids_platform(pid_t target_pid, int recursive, pid_array_t *result)
{
    (void)target_pid;
    (void)recursive;
    (void)result;

    PyErr_SetString(PyExc_NotImplementedError,
                    "Subprocess enumeration not supported on this platform");
    return -1;
}
#endif
/* ============================================================================
* PUBLIC API
* ============================================================================ */
/* Return a new Python list holding the PIDs of target_pid's children
 * (recursive == 0) or of all its descendants (recursive != 0), as collected
 * by the platform backend.
 *
 * Returns NULL with an exception set on failure. */
PyObject *
enumerate_child_pids(pid_t target_pid, int recursive)
{
    PyObject *children = NULL;
    pid_array_t found;

    if (pid_array_init(&found) < 0) {
        return NULL;
    }

    if (get_child_pids_platform(target_pid, recursive, &found) < 0) {
        goto finish;
    }

    /* Convert the C array into a Python list of ints. */
    children = PyList_New(found.count);
    if (children == NULL) {
        goto finish;
    }
    for (size_t idx = 0; idx < found.count; idx++) {
        PyObject *item = PyLong_FromLong((long)found.pids[idx]);
        if (item == NULL) {
            /* Drop the partially filled list; NULL is returned below. */
            Py_CLEAR(children);
            goto finish;
        }
        /* PyList_SET_ITEM steals the reference to `item`. */
        PyList_SET_ITEM(children, idx, item);
    }

finish:
    pid_array_cleanup(&found);
    return children;
}

View file

@ -105,6 +105,7 @@
<ClCompile Include="..\Modules\_remote_debugging\frame_cache.c" />
<ClCompile Include="..\Modules\_remote_debugging\threads.c" />
<ClCompile Include="..\Modules\_remote_debugging\asyncio.c" />
<ClCompile Include="..\Modules\_remote_debugging\subprocess.c" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\Modules\_remote_debugging\_remote_debugging.h" />

View file

@ -33,6 +33,9 @@
<ClCompile Include="..\Modules\_remote_debugging\asyncio.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\Modules\_remote_debugging\subprocess.c">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\Modules\_remote_debugging\_remote_debugging.h">