gh-138122: Add blocking mode for accurate stack traces in Tachyon (#142998)

This commit is contained in:
Pablo Galindo Salgado 2025-12-23 10:49:47 +00:00 committed by GitHub
parent f9704f1d84
commit 81c8eb85e1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 855 additions and 77 deletions

View file

@ -342,6 +342,8 @@ The default configuration works well for most use cases:
- Disabled
* - Default for ``--subprocesses``
- Disabled
* - Default for ``--blocking``
- Disabled (non-blocking sampling)
Sampling interval and duration
@ -392,6 +394,50 @@ This option is particularly useful when investigating concurrency issues or
when work is distributed across a thread pool.
.. _blocking-mode:
Blocking mode
-------------
By default, Tachyon reads the target process's memory without stopping it.
This non-blocking approach is ideal for most profiling scenarios because it
imposes virtually zero overhead on the target application: the profiled
program runs at full speed and is unaware it is being observed.
However, non-blocking sampling can occasionally produce incomplete or
inconsistent stack traces. This can happen in applications with many generators
or coroutines that rapidly switch between yield points, or in programs whose
call stacks change so quickly that functions enter and exit between the start
and end of a single stack read. The reconstructed stacks may then mix frames
from different execution states, or describe call chains that never actually
existed.
For these cases, the :option:`--blocking` option stops the target process during
each sample::
python -m profiling.sampling run --blocking script.py
python -m profiling.sampling attach --blocking 12345
When blocking mode is enabled, the profiler suspends the target process,
reads its stack, then resumes it. This guarantees that each captured stack
represents a real, consistent snapshot of what the process was doing at that
instant. The trade-off is that the target process runs slower because it is
repeatedly paused.
.. warning::
Do not use very high sample rates (low ``--interval`` values) with blocking
mode. Suspending and resuming a process takes time, and if the sampling
interval is too short, the target will spend more time stopped than running.
For blocking mode, intervals of 1000 microseconds (1 millisecond) or higher
are recommended. The default 100 microsecond interval may cause noticeable
slowdown in the target application.
Use blocking mode only when you observe inconsistent stacks in your profiles,
particularly with generator-heavy or coroutine-heavy code. For most
applications, the default non-blocking mode provides accurate results with
zero impact on the target process.
Special frames
--------------
@ -1383,6 +1429,13 @@ Sampling options
Also profile subprocesses. Each subprocess gets its own profiler
instance and output file. Incompatible with ``--live``.
.. option:: --blocking
Pause the target process during each sample. This ensures consistent
stack traces at the cost of slowing down the target. Use with longer
intervals (1000 µs or higher) to minimize impact. See :ref:`blocking-mode`
for details.
Mode options
------------

View file

@ -347,6 +347,13 @@ def _add_sampling_options(parser):
action="store_true",
help="Also profile subprocesses. Each subprocess gets its own profiler and output file.",
)
sampling_group.add_argument(
"--blocking",
action="store_true",
help="Stop all threads in target process before sampling to get consistent snapshots. "
"Uses thread_suspend on macOS and ptrace on Linux. Adds overhead but ensures memory "
"reads are from a frozen state.",
)
def _add_mode_options(parser):
@ -585,6 +592,15 @@ def _validate_args(args, parser):
if getattr(args, 'command', None) == "replay":
return
# Warn about blocking mode with aggressive sampling intervals
if args.blocking and args.interval < 100:
print(
f"Warning: --blocking with a {args.interval} µs interval will stop all threads "
f"{1_000_000 // args.interval} times per second. "
"Consider using --interval 1000 or higher to reduce overhead.",
file=sys.stderr
)
# Check if live mode is available
if hasattr(args, 'live') and args.live and LiveStatsCollector is None:
parser.error(
@ -861,6 +877,7 @@ def _handle_attach(args):
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
blocking=args.blocking,
)
_handle_output(collector, args, args.pid, mode)
@ -939,6 +956,7 @@ def _handle_run(args):
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
blocking=args.blocking,
)
_handle_output(collector, args, process.pid, mode)
finally:
@ -984,6 +1002,7 @@ def _handle_live_attach(args, pid):
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
blocking=args.blocking,
)
@ -1031,6 +1050,7 @@ def _handle_live_run(args):
native=args.native,
gc=args.gc,
opcodes=args.opcodes,
blocking=args.blocking,
)
finally:
# Clean up the subprocess

View file

@ -1,4 +1,5 @@
import _remote_debugging
import contextlib
import os
import statistics
import sys
@ -7,7 +8,26 @@
from collections import deque
from _colorize import ANSIColors
from .pstats_collector import PstatsCollector
from .stack_collector import CollapsedStackCollector, FlamegraphCollector
from .heatmap_collector import HeatmapCollector
from .gecko_collector import GeckoCollector
from .binary_collector import BinaryCollector
@contextlib.contextmanager
def _pause_threads(unwinder, blocking):
"""Context manager to pause/resume threads around sampling if blocking is True."""
if blocking:
unwinder.pause_threads()
try:
yield
finally:
unwinder.resume_threads()
else:
yield
from .constants import (
PROFILING_MODE_WALL,
PROFILING_MODE_CPU,
@ -25,12 +45,13 @@
class SampleProfiler:
def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MODE_WALL, native=False, gc=True, opcodes=False, skip_non_matching_threads=True, collect_stats=False):
def __init__(self, pid, sample_interval_usec, all_threads, *, mode=PROFILING_MODE_WALL, native=False, gc=True, opcodes=False, skip_non_matching_threads=True, collect_stats=False, blocking=False):
self.pid = pid
self.sample_interval_usec = sample_interval_usec
self.all_threads = all_threads
self.mode = mode # Store mode for later use
self.collect_stats = collect_stats
self.blocking = blocking
try:
self.unwinder = self._new_unwinder(native, gc, opcodes, skip_non_matching_threads)
except RuntimeError as err:
@ -60,12 +81,11 @@ def sample(self, collector, duration_sec=10, *, async_aware=False):
running_time = 0
num_samples = 0
errors = 0
interrupted = False
start_time = next_time = time.perf_counter()
last_sample_time = start_time
realtime_update_interval = 1.0 # Update every second
last_realtime_update = start_time
interrupted = False
try:
while running_time < duration_sec:
# Check if live collector wants to stop
@ -75,14 +95,15 @@ def sample(self, collector, duration_sec=10, *, async_aware=False):
current_time = time.perf_counter()
if next_time < current_time:
try:
if async_aware == "all":
stack_frames = self.unwinder.get_all_awaited_by()
elif async_aware == "running":
stack_frames = self.unwinder.get_async_stack_trace()
else:
stack_frames = self.unwinder.get_stack_trace()
collector.collect(stack_frames)
except ProcessLookupError:
with _pause_threads(self.unwinder, self.blocking):
if async_aware == "all":
stack_frames = self.unwinder.get_all_awaited_by()
elif async_aware == "running":
stack_frames = self.unwinder.get_async_stack_trace()
else:
stack_frames = self.unwinder.get_stack_trace()
collector.collect(stack_frames)
except ProcessLookupError as e:
duration_sec = current_time - start_time
break
except (RuntimeError, UnicodeDecodeError, MemoryError, OSError):
@ -350,6 +371,7 @@ def sample(
native=False,
gc=True,
opcodes=False,
blocking=False,
):
"""Sample a process using the provided collector.
@ -365,6 +387,7 @@ def sample(
native: Whether to include native frames
gc: Whether to include GC frames
opcodes: Whether to include opcode information
blocking: Whether to stop all threads before sampling for consistent snapshots
Returns:
The collector with collected samples
@ -390,6 +413,7 @@ def sample(
opcodes=opcodes,
skip_non_matching_threads=skip_non_matching_threads,
collect_stats=realtime_stats,
blocking=blocking,
)
profiler.realtime_stats = realtime_stats
@ -411,6 +435,7 @@ def sample_live(
native=False,
gc=True,
opcodes=False,
blocking=False,
):
"""Sample a process in live/interactive mode with curses TUI.
@ -426,6 +451,7 @@ def sample_live(
native: Whether to include native frames
gc: Whether to include GC frames
opcodes: Whether to include opcode information
blocking: Whether to stop all threads before sampling for consistent snapshots
Returns:
The collector with collected samples
@ -451,6 +477,7 @@ def sample_live(
opcodes=opcodes,
skip_non_matching_threads=skip_non_matching_threads,
collect_stats=realtime_stats,
blocking=blocking,
)
profiler.realtime_stats = realtime_stats

View file

@ -2931,24 +2931,24 @@ def top():
"Test only runs on Linux with process_vm_readv support",
)
def test_partial_stack_reuse(self):
"""Test that unchanged bottom frames are reused when top changes (A→B→C to A→B→D)."""
"""Test that unchanged parent frames are reused from cache when top frame moves."""
script_body = """\
def func_c():
sock.sendall(b"at_c")
def level4():
sock.sendall(b"sync1")
sock.recv(16)
sock.sendall(b"sync2")
sock.recv(16)
def func_d():
sock.sendall(b"at_d")
sock.recv(16)
def level3():
level4()
def func_b():
func_c()
func_d()
def level2():
level3()
def func_a():
func_b()
def level1():
level2()
func_a()
level1()
"""
with self._target_process(script_body) as (
@ -2958,55 +2958,51 @@ def func_a():
):
unwinder = make_unwinder(cache_frames=True)
# Sample at C: stack is A→B→C
frames_c = self._sample_frames(
# Sample 1: level4 at first sendall
frames1 = self._sample_frames(
client_socket,
unwinder,
b"at_c",
b"sync1",
b"ack",
{"func_a", "func_b", "func_c"},
{"level1", "level2", "level3", "level4"},
)
# Sample at D: stack is A→B→D (C returned, D called)
frames_d = self._sample_frames(
# Sample 2: level4 at second sendall (same stack, different line)
frames2 = self._sample_frames(
client_socket,
unwinder,
b"at_d",
b"sync2",
b"done",
{"func_a", "func_b", "func_d"},
{"level1", "level2", "level3", "level4"},
)
self.assertIsNotNone(frames_c)
self.assertIsNotNone(frames_d)
self.assertIsNotNone(frames1)
self.assertIsNotNone(frames2)
# Find func_a and func_b frames in both samples
def find_frame(frames, funcname):
for f in frames:
if f.funcname == funcname:
return f
return None
frame_a_in_c = find_frame(frames_c, "func_a")
frame_b_in_c = find_frame(frames_c, "func_b")
frame_a_in_d = find_frame(frames_d, "func_a")
frame_b_in_d = find_frame(frames_d, "func_b")
self.assertIsNotNone(frame_a_in_c)
self.assertIsNotNone(frame_b_in_c)
self.assertIsNotNone(frame_a_in_d)
self.assertIsNotNone(frame_b_in_d)
# The bottom frames (A, B) should be the SAME objects (cache reuse)
self.assertIs(
frame_a_in_c,
frame_a_in_d,
"func_a frame should be reused from cache",
)
self.assertIs(
frame_b_in_c,
frame_b_in_d,
"func_b frame should be reused from cache",
# level4 should have different line numbers (it moved)
l4_1 = find_frame(frames1, "level4")
l4_2 = find_frame(frames2, "level4")
self.assertIsNotNone(l4_1)
self.assertIsNotNone(l4_2)
self.assertNotEqual(
l4_1.location.lineno,
l4_2.location.lineno,
"level4 should be at different lines",
)
# Parent frames (level1, level2, level3) should be reused from cache
for name in ["level1", "level2", "level3"]:
f1 = find_frame(frames1, name)
f2 = find_frame(frames2, name)
self.assertIsNotNone(f1, f"{name} missing from sample 1")
self.assertIsNotNone(f2, f"{name} missing from sample 2")
self.assertIs(f1, f2, f"{name} should be reused from cache")
@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,

View file

@ -0,0 +1,141 @@
"""Tests for blocking mode sampling profiler."""
import io
import textwrap
import unittest
from unittest import mock
try:
import _remote_debugging # noqa: F401
import profiling.sampling
import profiling.sampling.sample
from profiling.sampling.stack_collector import CollapsedStackCollector
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
)
from test.support import requires_remote_subprocess_debugging
from .helpers import test_subprocess
# Duration for profiling in tests
PROFILING_DURATION_SEC = 1
@requires_remote_subprocess_debugging()
@requires_remote_subprocess_debugging()
class TestBlockingModeStackAccuracy(unittest.TestCase):
    """Test that blocking mode produces accurate stack traces.

    When using blocking mode, the target process is stopped during sampling.
    This ensures that we see accurate stack traces where functions appear
    in the correct caller/callee relationship.

    These tests verify that generator functions are correctly shown at the
    top of the stack when they are actively executing, and not incorrectly
    shown under their caller's code.
    """

    @classmethod
    def setUpClass(cls):
        # Test script that uses a generator consumed in a loop.
        # When consume_generator is on the arithmetic lines (temp1, temp2, etc.),
        # fibonacci_generator should NOT be in the stack at all.
        # Line numbers are important here - see ARITHMETIC_LINES below, so the
        # script literal deliberately contains no blank lines.
        cls.generator_script = textwrap.dedent('''
        def fibonacci_generator(n):
            a, b = 0, 1
            for _ in range(n):
                yield a
                a, b = b, a + b
        def consume_generator():
            gen = fibonacci_generator(10000)
            for value in gen:
                temp1 = value + 1
                temp2 = value * 2
                temp3 = value - 1
                result = temp1 + temp2 + temp3
        def main():
            while True:
                consume_generator()
                _test_sock.sendall(b"working")
        main()
        ''')
        # Line numbers of the arithmetic operations in consume_generator.
        # These are the lines where fibonacci_generator should NOT be in the stack.
        # The socket injection code adds 7 lines before our script.
        # temp1 = value + 1 -> line 17
        # temp2 = value * 2 -> line 18
        # temp3 = value - 1 -> line 19
        # result = ...      -> line 20
        cls.ARITHMETIC_LINES = {17, 18, 19, 20}

    def test_generator_not_under_consumer_arithmetic(self):
        """Test that fibonacci_generator doesn't appear when consume_generator does arithmetic.

        When consume_generator is executing arithmetic lines (temp1, temp2, etc.),
        fibonacci_generator should NOT be anywhere in the stack - it's not being
        called at that point.

        Valid stacks:
        - consume_generator at 'for value in gen:' line WITH fibonacci_generator
          at the top (generator is yielding)
        - consume_generator at arithmetic lines WITHOUT fibonacci_generator
          (we're just doing math, not calling the generator)

        Invalid stacks (indicate torn/inconsistent reads):
        - consume_generator at arithmetic lines WITH fibonacci_generator
          anywhere in the stack

        Note: call_tree is ordered from bottom (index 0) to top (index -1).
        """
        with test_subprocess(self.generator_script, wait_for_working=True) as subproc:
            collector = CollapsedStackCollector(sample_interval_usec=100, skip_idle=False)
            # Suppress the profiler's stdout chatter while sampling.
            with (
                io.StringIO() as captured_output,
                mock.patch("sys.stdout", captured_output),
            ):
                profiling.sampling.sample.sample(
                    subproc.process.pid,
                    collector,
                    duration_sec=PROFILING_DURATION_SEC,
                    blocking=True,
                )
        # Analyze collected stacks
        total_samples = 0
        invalid_stacks = 0
        arithmetic_samples = 0
        for (call_tree, _thread_id), count in collector.stack_counter.items():
            total_samples += count
            if not call_tree:
                continue
            # Find consume_generator in the stack and check its line number
            for i, (filename, lineno, funcname) in enumerate(call_tree):
                if funcname == "consume_generator" and lineno in self.ARITHMETIC_LINES:
                    arithmetic_samples += count
                    # Check if fibonacci_generator appears anywhere in this stack
                    func_names = [frame[2] for frame in call_tree]
                    if "fibonacci_generator" in func_names:
                        invalid_stacks += count
                    break
        self.assertGreater(total_samples, 10,
            f"Expected at least 10 samples, got {total_samples}")
        # We should have some samples on the arithmetic lines
        self.assertGreater(arithmetic_samples, 0,
            f"Expected some samples on arithmetic lines, got {arithmetic_samples}")
        self.assertEqual(invalid_stacks, 0,
            f"Found {invalid_stacks}/{arithmetic_samples} invalid stacks where "
            f"fibonacci_generator appears in the stack when consume_generator "
            f"is on an arithmetic line. This indicates torn/inconsistent stack "
            f"traces are being captured.")

View file

@ -372,6 +372,8 @@ def test_subprocesses_incompatible_with_live(self):
limit=None,
no_summary=False,
opcodes=False,
blocking=False,
interval=1000,
)
parser = argparse.ArgumentParser()

View file

@ -0,0 +1,2 @@
Add blocking mode to Tachyon for accurate stack traces in applications with
many generators or fast-changing call stacks. Patch by Pablo Galindo.

View file

@ -58,9 +58,49 @@ extern "C" {
# endif
#endif
// Platforms that support pausing/resuming threads for accurate stack sampling
#if defined(MS_WINDOWS) || defined(__linux__) || (defined(__APPLE__) && TARGET_OS_OSX)
# define Py_REMOTE_DEBUG_SUPPORTS_BLOCKING 1
#endif
#ifdef MS_WINDOWS
#include <windows.h>
#include <winternl.h>
#endif
#if defined(__APPLE__) && TARGET_OS_OSX
typedef struct {
mach_port_t task;
int suspended;
} _Py_RemoteDebug_ThreadsState;
#elif defined(__linux__)
typedef struct {
pid_t *tids; // Points to unwinder's reusable buffer
size_t count; // Number of threads currently seized
} _Py_RemoteDebug_ThreadsState;
#elif defined(MS_WINDOWS)
typedef NTSTATUS (NTAPI *NtSuspendProcessFunc)(HANDLE ProcessHandle);
typedef NTSTATUS (NTAPI *NtResumeProcessFunc)(HANDLE ProcessHandle);
typedef struct {
HANDLE hProcess;
int suspended;
} _Py_RemoteDebug_ThreadsState;
#else
typedef struct {
int dummy;
} _Py_RemoteDebug_ThreadsState;
#endif
#ifdef MS_WINDOWS
#define STATUS_SUCCESS ((NTSTATUS)0x00000000L)
#define STATUS_INFO_LENGTH_MISMATCH ((NTSTATUS)0xC0000004L)
typedef enum _WIN32_THREADSTATE {
@ -262,6 +302,15 @@ typedef struct {
#ifdef MS_WINDOWS
PVOID win_process_buffer;
ULONG win_process_buffer_size;
#endif
// Thread stopping state (only on platforms that support it)
#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING
_Py_RemoteDebug_ThreadsState threads_state;
int threads_stopped; // 1 if threads are currently stopped
#endif
#ifdef __linux__
pid_t *thread_tids; // Reusable buffer for thread IDs
size_t thread_tids_capacity; // Current capacity of thread_tids buffer
#endif
} RemoteUnwinderObject;
@ -295,6 +344,7 @@ typedef struct {
uintptr_t gc_frame; // GC frame address (0 if not tracking)
uintptr_t last_profiled_frame; // Last cached frame (0 if no cache)
StackChunkList *chunks; // Pre-copied stack chunks
int skip_first_frame; // Skip frame_addr itself (continue from its caller)
/* Outputs */
PyObject *frame_info; // List to append FrameInfo objects
@ -518,6 +568,11 @@ extern PyObject* unwind_stack_for_thread(
uintptr_t gc_frame
);
/* Thread stopping functions (for blocking mode) */
extern void _Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st);
extern int _Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st);
extern void _Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st);
/* ============================================================================
* ASYNCIO FUNCTION DECLARATIONS
* ============================================================================ */

View file

@ -435,6 +435,66 @@ _remote_debugging_RemoteUnwinder_get_stats(PyObject *self, PyObject *Py_UNUSED(i
return return_value;
}
/* NOTE(review): Argument Clinic generated code (see the matching
   [clinic input] block in the implementation file); regenerate with
   `make clinic` rather than editing by hand. */
PyDoc_STRVAR(_remote_debugging_RemoteUnwinder_pause_threads__doc__,
"pause_threads($self, /)\n"
"--\n"
"\n"
"Pause all threads in the target process.\n"
"\n"
"This stops all threads in the target process to allow for consistent\n"
"memory reads during sampling. Must be paired with a call to resume_threads().\n"
"\n"
"Returns True if threads were successfully paused, False if they were already paused.\n"
"\n"
"Raises:\n"
"    RuntimeError: If there is an error stopping the threads");

#define _REMOTE_DEBUGGING_REMOTEUNWINDER_PAUSE_THREADS_METHODDEF    \
    {"pause_threads", (PyCFunction)_remote_debugging_RemoteUnwinder_pause_threads, METH_NOARGS, _remote_debugging_RemoteUnwinder_pause_threads__doc__},

static PyObject *
_remote_debugging_RemoteUnwinder_pause_threads_impl(RemoteUnwinderObject *self);

static PyObject *
_remote_debugging_RemoteUnwinder_pause_threads(PyObject *self, PyObject *Py_UNUSED(ignored))
{
    PyObject *return_value = NULL;

    /* @critical_section: serialize with other unwinder methods. */
    Py_BEGIN_CRITICAL_SECTION(self);
    return_value = _remote_debugging_RemoteUnwinder_pause_threads_impl((RemoteUnwinderObject *)self);
    Py_END_CRITICAL_SECTION();

    return return_value;
}
/* NOTE(review): Argument Clinic generated code; regenerate with
   `make clinic` rather than editing by hand. */
PyDoc_STRVAR(_remote_debugging_RemoteUnwinder_resume_threads__doc__,
"resume_threads($self, /)\n"
"--\n"
"\n"
"Resume all threads in the target process.\n"
"\n"
"This resumes threads that were previously paused with pause_threads().\n"
"\n"
"Returns True if threads were successfully resumed, False if they were not paused.");

#define _REMOTE_DEBUGGING_REMOTEUNWINDER_RESUME_THREADS_METHODDEF    \
    {"resume_threads", (PyCFunction)_remote_debugging_RemoteUnwinder_resume_threads, METH_NOARGS, _remote_debugging_RemoteUnwinder_resume_threads__doc__},

static PyObject *
_remote_debugging_RemoteUnwinder_resume_threads_impl(RemoteUnwinderObject *self);

static PyObject *
_remote_debugging_RemoteUnwinder_resume_threads(PyObject *self, PyObject *Py_UNUSED(ignored))
{
    PyObject *return_value = NULL;

    /* @critical_section: serialize with other unwinder methods. */
    Py_BEGIN_CRITICAL_SECTION(self);
    return_value = _remote_debugging_RemoteUnwinder_resume_threads_impl((RemoteUnwinderObject *)self);
    Py_END_CRITICAL_SECTION();

    return return_value;
}
PyDoc_STRVAR(_remote_debugging_BinaryWriter___init____doc__,
"BinaryWriter(filename, sample_interval_us, start_time_us, *,\n"
" compression=0)\n"
@ -1236,4 +1296,4 @@ _remote_debugging_is_python_process(PyObject *module, PyObject *const *args, Py_
exit:
return return_value;
}
/*[clinic end generated code: output=036de0b06d0e34cc input=a9049054013a1b77]*/
/*[clinic end generated code: output=34f50b18f317b9b6 input=a9049054013a1b77]*/

View file

@ -174,8 +174,16 @@ frame_cache_lookup_and_extend(
Py_ssize_t num_frames = PyList_GET_SIZE(entry->frame_list);
// Extend frame_info with frames from start_idx onwards
PyObject *slice = PyList_GetSlice(entry->frame_list, start_idx, num_frames);
// Extend frame_info with frames ABOVE start_idx (not including it).
// The frame at start_idx (last_profiled_frame) was the executing frame
// in the previous sample and its line number may have changed.
// Only frames above it (its callers) are frozen at their call sites.
Py_ssize_t cache_start = start_idx + 1;
if (cache_start >= num_frames) {
return 0; // Nothing above last_profiled_frame to extend with
}
PyObject *slice = PyList_GetSlice(entry->frame_list, cache_start, num_frames);
if (!slice) {
return -1;
}
@ -188,9 +196,9 @@ frame_cache_lookup_and_extend(
return -1;
}
// Also extend frame_addrs with cached addresses if provided
// Also extend frame_addrs with cached addresses (above last_profiled_frame)
if (frame_addrs) {
for (Py_ssize_t i = start_idx; i < entry->num_addrs && *num_addrs < max_addrs; i++) {
for (Py_ssize_t i = cache_start; i < entry->num_addrs && *num_addrs < max_addrs; i++) {
frame_addrs[(*num_addrs)++] = entry->addrs[i];
}
}

View file

@ -281,16 +281,7 @@ process_frame_chain(
ctx->stopped_at_cached_frame = 0;
ctx->last_frame_visited = 0;
if (ctx->last_profiled_frame != 0 && ctx->frame_addr == ctx->last_profiled_frame) {
ctx->stopped_at_cached_frame = 1;
return 0;
}
while ((void*)frame_addr != NULL) {
if (ctx->last_profiled_frame != 0 && frame_addr == ctx->last_profiled_frame) {
ctx->stopped_at_cached_frame = 1;
break;
}
PyObject *frame = NULL;
uintptr_t next_frame_addr = 0;
uintptr_t stackpointer = 0;
@ -311,6 +302,14 @@ process_frame_chain(
return -1;
}
}
// Skip first frame if requested (used for cache miss continuation)
if (ctx->skip_first_frame && frame_count == 1) {
Py_XDECREF(frame);
frame_addr = next_frame_addr;
continue;
}
if (frame == NULL && PyList_GET_SIZE(ctx->frame_info) == 0) {
const char *e = "Failed to parse initial frame in chain";
PyErr_SetString(PyExc_RuntimeError, e);
@ -367,6 +366,11 @@ process_frame_chain(
Py_DECREF(frame);
}
if (ctx->last_profiled_frame != 0 && frame_addr == ctx->last_profiled_frame) {
ctx->stopped_at_cached_frame = 1;
break;
}
prev_frame_addr = next_frame_addr;
frame_addr = next_frame_addr;
}
@ -548,14 +552,15 @@ collect_frames_with_cache(
}
if (cache_result == 0) {
STATS_INC(unwinder, frame_cache_misses);
Py_ssize_t frames_before_walk = PyList_GET_SIZE(ctx->frame_info);
// Continue walking from last_profiled_frame, skipping it (already processed)
Py_ssize_t frames_before_walk = PyList_GET_SIZE(ctx->frame_info);
FrameWalkContext continue_ctx = {
.frame_addr = ctx->last_profiled_frame,
.base_frame_addr = ctx->base_frame_addr,
.gc_frame = ctx->gc_frame,
.last_profiled_frame = 0,
.chunks = ctx->chunks,
.skip_first_frame = 1,
.frame_info = ctx->frame_info,
.frame_addrs = ctx->frame_addrs,
.num_addrs = ctx->num_addrs,
@ -566,7 +571,6 @@ collect_frames_with_cache(
}
ctx->num_addrs = continue_ctx.num_addrs;
ctx->last_frame_visited = continue_ctx.last_frame_visited;
STATS_ADD(unwinder, frames_read_from_memory, PyList_GET_SIZE(ctx->frame_info) - frames_before_walk);
} else {
// Partial cache hit - cached stack was validated as complete when stored,

View file

@ -342,6 +342,9 @@ _remote_debugging_RemoteUnwinder___init___impl(RemoteUnwinderObject *self,
self->skip_non_matching_threads = skip_non_matching_threads;
self->cached_state = NULL;
self->frame_cache = NULL;
#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING
self->threads_stopped = 0;
#endif
// Initialize stats to zero
memset(&self->stats, 0, sizeof(self->stats));
if (_Py_RemoteDebug_InitProcHandle(&self->handle, pid) < 0) {
@ -423,6 +426,10 @@ _remote_debugging_RemoteUnwinder___init___impl(RemoteUnwinderObject *self,
self->win_process_buffer = NULL;
self->win_process_buffer_size = 0;
#endif
#ifdef __linux__
self->thread_tids = NULL;
self->thread_tids_capacity = 0;
#endif
if (cache_frames && frame_cache_init(self) < 0) {
return -1;
@ -930,11 +937,81 @@ _remote_debugging_RemoteUnwinder_get_stats_impl(RemoteUnwinderObject *self)
return result;
}
/*[clinic input]
@critical_section
_remote_debugging.RemoteUnwinder.pause_threads

Pause all threads in the target process.

This stops all threads in the target process to allow for consistent
memory reads during sampling. Must be paired with a call to resume_threads().

Returns True if threads were successfully paused, False if they were already paused.

Raises:
    RuntimeError: If there is an error stopping the threads
[clinic start generated code]*/

static PyObject *
_remote_debugging_RemoteUnwinder_pause_threads_impl(RemoteUnwinderObject *self)
/*[clinic end generated code: output=aaf2bdc0a725750c input=78601c60dbc245fe]*/
{
#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING
    // Idempotent: a second pause without an intervening resume is a no-op
    // that reports False instead of double-suspending the target.
    if (self->threads_stopped) {
        Py_RETURN_FALSE;
    }
    _Py_RemoteDebug_InitThreadsState(self, &self->threads_state);
    // On failure the platform implementation has already set a Python
    // exception and rolled back any partially stopped threads.
    if (_Py_RemoteDebug_StopAllThreads(self, &self->threads_state) < 0) {
        return NULL;
    }
    self->threads_stopped = 1;
    Py_RETURN_TRUE;
#else
    PyErr_SetString(PyExc_NotImplementedError,
                    "pause_threads is not supported on this platform");
    return NULL;
#endif
}
/*[clinic input]
@critical_section
_remote_debugging.RemoteUnwinder.resume_threads

Resume all threads in the target process.

This resumes threads that were previously paused with pause_threads().

Returns True if threads were successfully resumed, False if they were not paused.
[clinic start generated code]*/

static PyObject *
_remote_debugging_RemoteUnwinder_resume_threads_impl(RemoteUnwinderObject *self)
/*[clinic end generated code: output=8d6781ea37095536 input=67ca813bd804289e]*/
{
#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING
    // Resuming without a prior successful pause is a harmless no-op.
    if (!self->threads_stopped) {
        Py_RETURN_FALSE;
    }
    _Py_RemoteDebug_ResumeAllThreads(self, &self->threads_state);
    self->threads_stopped = 0;
    Py_RETURN_TRUE;
#else
    PyErr_SetString(PyExc_NotImplementedError,
                    "resume_threads is not supported on this platform");
    return NULL;
#endif
}
static PyMethodDef RemoteUnwinder_methods[] = {
_REMOTE_DEBUGGING_REMOTEUNWINDER_GET_STACK_TRACE_METHODDEF
_REMOTE_DEBUGGING_REMOTEUNWINDER_GET_ALL_AWAITED_BY_METHODDEF
_REMOTE_DEBUGGING_REMOTEUNWINDER_GET_ASYNC_STACK_TRACE_METHODDEF
_REMOTE_DEBUGGING_REMOTEUNWINDER_GET_STATS_METHODDEF
_REMOTE_DEBUGGING_REMOTEUNWINDER_PAUSE_THREADS_METHODDEF
_REMOTE_DEBUGGING_REMOTEUNWINDER_RESUME_THREADS_METHODDEF
{NULL, NULL}
};
@ -943,6 +1020,20 @@ RemoteUnwinder_dealloc(PyObject *op)
{
RemoteUnwinderObject *self = RemoteUnwinder_CAST(op);
PyTypeObject *tp = Py_TYPE(self);
#ifdef Py_REMOTE_DEBUG_SUPPORTS_BLOCKING
if (self->threads_stopped) {
_Py_RemoteDebug_ResumeAllThreads(self, &self->threads_state);
self->threads_stopped = 0;
}
#endif
#ifdef __linux__
if (self->thread_tids != NULL) {
PyMem_RawFree(self->thread_tids);
self->thread_tids = NULL;
}
#endif
if (self->code_object_cache) {
_Py_hashtable_destroy(self->code_object_cache);
}

View file

@ -11,6 +11,12 @@
#include <unistd.h>
#endif
#ifdef __linux__
#include <dirent.h>
#include <sys/ptrace.h>
#include <sys/wait.h>
#endif
/* ============================================================================
* THREAD ITERATION FUNCTIONS
* ============================================================================ */
@ -501,3 +507,286 @@ unwind_stack_for_thread(
cleanup_stack_chunks(&chunks);
return NULL;
}
/* ============================================================================
* PROCESS STOP FUNCTIONS
* ============================================================================ */
#if defined(__APPLE__) && TARGET_OS_OSX
// macOS: blocking mode is implemented with task_suspend()/task_resume(),
// so the per-sample state is just the Mach task port and a suspended flag.
void
_Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
{
    st->task = MACH_PORT_NULL;
    st->suspended = 0;
}
// Suspend the whole target task in one Mach call. Returns 0 on success,
// -1 with a Python exception set on failure. Distinguishes "process is
// gone" (ProcessLookupError) from other Mach failures (RuntimeError).
int
_Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
{
    kern_return_t kr = task_suspend(unwinder->handle.task);
    if (kr != KERN_SUCCESS) {
        if (kr == MACH_SEND_INVALID_DEST) {
            // The task port is dead: the target process has exited.
            PyErr_Format(PyExc_ProcessLookupError,
                         "Process %d has terminated", unwinder->handle.pid);
        } else {
            PyErr_Format(PyExc_RuntimeError,
                         "task_suspend failed for PID %d: kern_return_t %d",
                         unwinder->handle.pid, kr);
        }
        return -1;
    }
    st->task = unwinder->handle.task;
    st->suspended = 1;
    // Drop cached reads taken before the suspension so the upcoming sample
    // observes the frozen state of the target, not stale memory.
    _Py_RemoteDebug_ClearCache(&unwinder->handle);
    return 0;
}
// Undo a successful _Py_RemoteDebug_StopAllThreads(). Safe to call when
// nothing was suspended (no-op). task_resume()'s result is intentionally
// ignored: there is nothing useful to do if the target already died.
void
_Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
{
    if (!st->suspended || st->task == MACH_PORT_NULL) {
        return;
    }
    task_resume(st->task);
    st->task = MACH_PORT_NULL;
    st->suspended = 0;
}
#elif defined(__linux__)
// Linux: blocking mode seizes each thread individually with ptrace, so the
// state is the list of currently-seized TIDs (borrowed from the unwinder's
// reusable buffer) plus its length.
void
_Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
{
    st->tids = NULL;
    st->count = 0;
}
// Enumerate the target's thread IDs by scanning /proc/<pid>/task.
// TIDs are accumulated in the unwinder's reusable thread_tids buffer
// (grown geometrically, never shrunk) and st->tids is pointed at it.
// Returns 0 on success, -1 with a Python exception set on failure.
static int
read_thread_ids(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
{
    char task_path[64];
    snprintf(task_path, sizeof(task_path), "/proc/%d/task", unwinder->handle.pid);

    DIR *dir = opendir(task_path);
    if (dir == NULL) {
        st->tids = NULL;
        st->count = 0;
        if (errno == ENOENT || errno == ESRCH) {
            // The /proc entry vanished: the target process has exited.
            PyErr_Format(PyExc_ProcessLookupError,
                         "Process %d has terminated", unwinder->handle.pid);
        } else {
            PyErr_SetFromErrnoWithFilename(PyExc_OSError, task_path);
        }
        return -1;
    }

    st->count = 0;
    struct dirent *entry;
    while ((entry = readdir(dir)) != NULL) {
        // Skip "." / ".." and any non-TID entry; TIDs are positive
        // integers, so a valid name always starts with '1'..'9'.
        if (entry->d_name[0] < '1' || entry->d_name[0] > '9') {
            continue;
        }
        char *endptr;
        long tid = strtol(entry->d_name, &endptr, 10);
        if (*endptr != '\0' || tid <= 0) {
            continue;
        }
        if (st->count >= unwinder->thread_tids_capacity) {
            // Double the capacity (starting at 64) so repeated samples stop
            // reallocating once the thread-count high-water mark is reached.
            size_t new_cap = unwinder->thread_tids_capacity == 0 ? 64 : unwinder->thread_tids_capacity * 2;
            pid_t *new_tids = PyMem_RawRealloc(unwinder->thread_tids, new_cap * sizeof(pid_t));
            if (new_tids == NULL) {
                closedir(dir);
                st->tids = NULL;
                st->count = 0;
                PyErr_NoMemory();
                return -1;
            }
            unwinder->thread_tids = new_tids;
            unwinder->thread_tids_capacity = new_cap;
        }
        unwinder->thread_tids[st->count++] = (pid_t)tid;
    }
    st->tids = unwinder->thread_tids;
    closedir(dir);
    return 0;
}
// Detach (and thereby resume) the first `up_to` threads in st->tids.
// Failures are ignored on purpose: a thread that exited in the meantime
// makes ptrace fail with ESRCH and there is nothing left to undo.
static inline void
detach_threads(_Py_RemoteDebug_ThreadsState *st, size_t up_to)
{
    for (size_t j = 0; j < up_to; j++) {
        ptrace(PTRACE_DETACH, st->tids[j], NULL, NULL);
    }
}
/* Attach to one thread of the target process with ptrace.
 *
 * Returns 0 on success, 1 if the thread no longer exists (caller should
 * skip it), or -1 on a real failure with errno set by ptrace.
 */
static int
seize_thread(pid_t tid)
{
    /* PTRACE_SEIZE attaches without stopping the thread; the caller
     * stops it separately with PTRACE_INTERRUPT. */
    if (ptrace(PTRACE_SEIZE, tid, NULL, 0) == 0) {
        return 0;
    }
    if (errno == ESRCH) {
        return 1; // Thread gone, skip
    }
    if (errno == EINVAL || errno == EIO) {
        // Fallback for older kernels
        /* PTRACE_ATTACH (unlike SEIZE) stops the tracee immediately, so
         * reap the resulting stop notification with waitpid. */
        if (ptrace(PTRACE_ATTACH, tid, NULL, NULL) == 0) {
            int status;
            waitpid(tid, &status, __WALL);
            return 0;
        }
        if (errno == ESRCH) {
            return 1; // Thread gone
        }
    }
    return -1; // Real error
}
/* Stop every thread of the target process so its memory can be read as
 * a consistent snapshot.
 *
 * Each thread found under /proc/<pid>/task is seized with ptrace and
 * then stopped via PTRACE_INTERRUPT; threads that exit during the scan
 * are silently skipped.  On success returns 0 with the stopped TIDs
 * recorded in *st for _Py_RemoteDebug_ResumeAllThreads().  On failure
 * all threads attached so far are detached again and -1 is returned
 * with a RuntimeError set.
 */
int
_Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
{
    if (read_thread_ids(unwinder, st) < 0) {
        return -1;
    }
    for (size_t i = 0; i < st->count; i++) {
        pid_t tid = st->tids[i];
        int ret = seize_thread(tid);
        if (ret == 1) {
            continue; // Thread gone, skip
        }
        if (ret < 0) {
            /* Save errno before detaching: detach_threads() makes
             * further ptrace calls that can overwrite the original
             * failure code, which would make strerror() report the
             * wrong cause. */
            int saved_errno = errno;
            detach_threads(st, i);
            PyErr_Format(PyExc_RuntimeError, "Failed to seize thread %d: %s",
                         tid, strerror(saved_errno));
            st->tids = NULL;
            st->count = 0;
            return -1;
        }
        if (ptrace(PTRACE_INTERRUPT, tid, NULL, NULL) == -1 && errno != ESRCH) {
            int saved_errno = errno;
            /* i + 1: the current thread was already seized above. */
            detach_threads(st, i + 1);
            PyErr_Format(PyExc_RuntimeError, "Failed to interrupt thread %d: %s",
                         tid, strerror(saved_errno));
            st->tids = NULL;
            st->count = 0;
            return -1;
        }
        int status;
        if (waitpid(tid, &status, __WALL) == -1 && errno != ECHILD && errno != ESRCH) {
            int saved_errno = errno;
            detach_threads(st, i + 1);
            PyErr_Format(PyExc_RuntimeError, "waitpid failed for thread %d: %s",
                         tid, strerror(saved_errno));
            st->tids = NULL;
            st->count = 0;
            return -1;
        }
    }
    return 0;
}
void
_Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
{
    /* Detach from every thread stopped by _Py_RemoteDebug_StopAllThreads();
     * detaching resumes each tracee.  No-op if nothing was stopped. */
    (void)unwinder;
    if (st->count == 0 || st->tids == NULL) {
        return;
    }
    detach_threads(st, st->count);
    st->count = 0;
    st->tids = NULL;
}
#elif defined(MS_WINDOWS)
void
_Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
{
    /* Fresh per-sample state: target process not suspended yet. */
    (void)unwinder;
    st->suspended = 0;
    st->hProcess = NULL;
}
/* Suspend the entire target process via ntdll's NtSuspendProcess, which
 * stops every thread in one call.
 *
 * Returns 0 on success with the process handle recorded in *st for the
 * matching resume call, or -1 with a RuntimeError set.
 */
int
_Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
{
    /* Resolve NtSuspendProcess once and cache the pointer.
     * NOTE(review): the static flag is not thread-safe; presumably this
     * is only called with the GIL held -- confirm. */
    static NtSuspendProcessFunc pNtSuspendProcess = NULL;
    static int tried_load = 0;
    if (!tried_load) {
        HMODULE hNtdll = GetModuleHandleW(L"ntdll.dll");
        if (hNtdll) {
            pNtSuspendProcess = (NtSuspendProcessFunc)GetProcAddress(hNtdll, "NtSuspendProcess");
        }
        tried_load = 1;
    }
    if (pNtSuspendProcess == NULL) {
        PyErr_SetString(PyExc_RuntimeError, "NtSuspendProcess not available");
        return -1;
    }
    NTSTATUS status = pNtSuspendProcess(unwinder->handle.hProcess);
    /* A non-negative NTSTATUS indicates success. */
    if (status >= 0) {
        st->hProcess = unwinder->handle.hProcess;
        st->suspended = 1;
        /* Invalidate remote-memory caches populated before the suspension. */
        _Py_RemoteDebug_ClearCache(&unwinder->handle);
        return 0;
    }
    PyErr_Format(PyExc_RuntimeError, "NtSuspendProcess failed: 0x%lx", status);
    return -1;
}
/* Resume a process suspended by _Py_RemoteDebug_StopAllThreads() using
 * ntdll's NtResumeProcess.  No-op when nothing was suspended.  Resume
 * failures are ignored: this runs on cleanup paths where no better
 * recovery is possible. */
void
_Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
{
    if (!st->suspended || st->hProcess == NULL) {
        return;
    }
    /* Resolve NtResumeProcess once and cache the pointer (same lazy-load
     * pattern as the suspend side). */
    static NtResumeProcessFunc pNtResumeProcess = NULL;
    static int tried_load = 0;
    if (!tried_load) {
        HMODULE hNtdll = GetModuleHandleW(L"ntdll.dll");
        if (hNtdll) {
            pNtResumeProcess = (NtResumeProcessFunc)GetProcAddress(hNtdll, "NtResumeProcess");
        }
        tried_load = 1;
    }
    if (pNtResumeProcess != NULL) {
        pNtResumeProcess(st->hProcess);
    }
    st->hProcess = NULL;
    st->suspended = 0;
}
#else
/* Fallback for platforms without a thread-suspension implementation:
 * initialization is a no-op. */
void
_Py_RemoteDebug_InitThreadsState(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
{
    (void)unwinder;
    (void)st;
}
/* Fallback: report success without stopping anything, so sampling
 * proceeds non-blocking on unsupported platforms. */
int
_Py_RemoteDebug_StopAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
{
    (void)unwinder;
    (void)st;
    return 0;
}
/* Fallback: nothing was stopped, so resuming is a no-op. */
void
_Py_RemoteDebug_ResumeAllThreads(RemoteUnwinderObject *unwinder, _Py_RemoteDebug_ThreadsState *st)
{
    (void)unwinder;
    (void)st;
}
#endif

View file

@ -33,6 +33,9 @@ extern "C" {
#ifdef __linux__
# include <elf.h>
# include <sys/uio.h>
# include <sys/ptrace.h>
# include <sys/wait.h>
# include <dirent.h>
# if INTPTR_MAX == INT64_MAX
# define Elf_Ehdr Elf64_Ehdr
# define Elf_Shdr Elf64_Shdr
@ -43,6 +46,17 @@ extern "C" {
# define Elf_Phdr Elf32_Phdr
# endif
# include <sys/mman.h>
// PTRACE options - define if not available
# ifndef PTRACE_SEIZE
# define PTRACE_SEIZE 0x4206
# endif
# ifndef PTRACE_INTERRUPT
# define PTRACE_INTERRUPT 0x4207
# endif
# ifndef PTRACE_EVENT_STOP
# define PTRACE_EVENT_STOP 128
# endif
#endif
#if defined(__APPLE__) && defined(TARGET_OS_OSX) && TARGET_OS_OSX
@ -55,6 +69,7 @@ extern "C" {
# include <mach/mach_vm.h>
# include <mach/machine.h>
# include <mach/task_info.h>
# include <mach/thread_act.h>
# include <sys/mman.h>
# include <sys/proc.h>
# include <sys/sysctl.h>
@ -169,7 +184,7 @@ _Py_RemoteDebug_InitProcHandle(proc_handle_t *handle, pid_t pid) {
}
#elif defined(MS_WINDOWS)
handle->hProcess = OpenProcess(
PROCESS_VM_READ | PROCESS_VM_WRITE | PROCESS_VM_OPERATION | PROCESS_QUERY_INFORMATION,
PROCESS_VM_READ | PROCESS_VM_WRITE | PROCESS_VM_OPERATION | PROCESS_QUERY_INFORMATION | PROCESS_SUSPEND_RESUME,
FALSE, pid);
if (handle->hProcess == NULL) {
PyErr_SetFromWindowsErr(0);

View file

@ -167,7 +167,7 @@ def create_threads(n):
}
def benchmark(unwinder, duration_seconds=10):
def benchmark(unwinder, duration_seconds=10, blocking=False):
"""Benchmark mode - measure raw sampling speed for specified duration"""
sample_count = 0
fail_count = 0
@ -187,9 +187,15 @@ def benchmark(unwinder, duration_seconds=10):
total_attempts += 1
work_start = time.perf_counter()
try:
stack_trace = unwinder.get_stack_trace()
if stack_trace:
sample_count += 1
if blocking:
unwinder.pause_threads()
try:
stack_trace = unwinder.get_stack_trace()
if stack_trace:
sample_count += 1
finally:
if blocking:
unwinder.resume_threads()
except (OSError, RuntimeError, UnicodeDecodeError) as e:
fail_count += 1
@ -353,6 +359,12 @@ def parse_arguments():
help="Which threads to include in the benchmark (default: all)",
)
parser.add_argument(
"--blocking",
action="store_true",
help="Stop all threads before sampling for consistent snapshots",
)
return parser.parse_args()
@ -408,6 +420,9 @@ def main():
print(
f"{colors.CYAN}Benchmark Duration:{colors.RESET} {colors.YELLOW}{args.duration}{colors.RESET} seconds"
)
print(
f"{colors.CYAN}Blocking Mode:{colors.RESET} {colors.GREEN if args.blocking else colors.YELLOW}{'enabled' if args.blocking else 'disabled'}{colors.RESET}"
)
process = None
temp_file_path = None
@ -436,7 +451,7 @@ def main():
unwinder = _remote_debugging.RemoteUnwinder(
process.pid, cache_frames=True, **kwargs
)
results = benchmark(unwinder, duration_seconds=args.duration)
results = benchmark(unwinder, duration_seconds=args.duration, blocking=args.blocking)
finally:
cleanup_process(process, temp_file_path)