gh-138385: Sample all interpreters in the tachyon profiler (#138398)

Pablo Galindo Salgado 2025-09-09 00:41:08 +01:00 committed by GitHub
parent 01895d233b
commit 03ee060ec8
7 changed files with 884 additions and 320 deletions


@@ -9,3 +9,11 @@ def collect(self, stack_frames):
@abstractmethod
def export(self, filename):
"""Export collected data to a file."""
def _iter_all_frames(self, stack_frames):
"""Iterate over all frame stacks from all interpreters and threads."""
for interpreter_info in stack_frames:
for thread_info in interpreter_info.threads:
frames = thread_info.frame_info
if frames:
yield frames
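
For orientation, here is a minimal sketch of the nested sample shape that _iter_all_frames flattens, using hypothetical namedtuple stand-ins for the InterpreterInfo and ThreadInfo struct sequences from _remote_debugging; the frame tuples follow the (filename, lineno, funcname) convention used by the tests in this commit.

from collections import namedtuple

# Hypothetical stand-ins for the real InterpreterInfo/ThreadInfo structseqs.
InterpreterInfo = namedtuple("InterpreterInfo", "interpreter_id threads")
ThreadInfo = namedtuple("ThreadInfo", "thread_id frame_info")

stack_frames = [
    InterpreterInfo(0, [ThreadInfo(1234, [("worker.py", 127, "process_data")])]),
    InterpreterInfo(1, [ThreadInfo(1236, [("sub.py", 15, "sub_worker")])]),
]

# Equivalent to what _iter_all_frames yields: each non-empty per-thread stack.
flattened = [t.frame_info for i in stack_frames for t in i.threads if t.frame_info]
# -> [[("worker.py", 127, "process_data")], [("sub.py", 15, "sub_worker")]]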


@@ -15,10 +15,10 @@ def __init__(self, sample_interval_usec):
lambda: collections.defaultdict(int)
)
def collect(self, stack_frames):
for thread_id, frames in stack_frames:
def _process_frames(self, frames):
"""Process a single thread's frame stack."""
if not frames:
continue
return
# Process each frame in the stack to track cumulative calls
for frame in frames:
@@ -26,13 +26,7 @@ def collect(self, stack_frames):
self.result[location]["cumulative_calls"] += 1
# The top frame gets counted as an inline call (directly executing)
top_frame = frames[0]
top_location = (
top_frame.filename,
top_frame.lineno,
top_frame.funcname,
)
top_location = (frames[0].filename, frames[0].lineno, frames[0].funcname)
self.result[top_location]["direct_calls"] += 1
# Track caller-callee relationships for call graph
@@ -40,19 +34,15 @@ def collect(self, stack_frames):
callee_frame = frames[i - 1]
caller_frame = frames[i]
callee = (
callee_frame.filename,
callee_frame.lineno,
callee_frame.funcname,
)
caller = (
caller_frame.filename,
caller_frame.lineno,
caller_frame.funcname,
)
callee = (callee_frame.filename, callee_frame.lineno, callee_frame.funcname)
caller = (caller_frame.filename, caller_frame.lineno, caller_frame.funcname)
self.callers[callee][caller] += 1
def collect(self, stack_frames):
for frames in self._iter_all_frames(stack_frames):
self._process_frames(frames)
def export(self, filename):
self.create_stats()
self._dump_stats(filename)
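
A brief usage sketch of the refactored collector, not part of the diff: the import path is an assumption, and the SimpleNamespace objects merely mimic the attribute layout of the real result types.

from types import SimpleNamespace
from profiling.sampling.pstats_collector import PstatsCollector  # assumed path

# One sample: one interpreter, one thread, a two-frame stack.
f1 = SimpleNamespace(filename="file.py", lineno=10, funcname="func1")
f2 = SimpleNamespace(filename="file.py", lineno=20, funcname="func2")
thread = SimpleNamespace(thread_id=1, frame_info=[f1, f2])
sample = [SimpleNamespace(interpreter_id=0, threads=[thread])]

collector = PstatsCollector(sample_interval_usec=1000)
collector.collect(sample)           # routed through _iter_all_frames/_process_frames
collector.export("profile.pstats")  # cProfile-compatible stats file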


@@ -9,9 +9,11 @@ def __init__(self):
self.call_trees = []
self.function_samples = collections.defaultdict(int)
def collect(self, stack_frames):
for thread_id, frames in stack_frames:
if frames:
def _process_frames(self, frames):
"""Process a single thread's frame stack."""
if not frames:
return
# Store the complete call stack (reverse order - root first)
call_tree = list(reversed(frames))
self.call_trees.append(call_tree)
@@ -20,6 +22,10 @@ def collect(self, stack_frames):
for frame in frames:
self.function_samples[frame] += 1
def collect(self, stack_frames):
for frames in self._iter_all_frames(stack_frames):
self._process_frames(frames)
class CollapsedStackCollector(StackTraceCollector):
def export(self, filename):


@@ -19,6 +19,11 @@
import subprocess
try:
from concurrent import interpreters
except ImportError:
interpreters = None
PROCESS_VM_READV_SUPPORTED = False
try:
@@ -47,6 +52,12 @@ def _make_test_script(script_dir, script_basename, source):
)
def requires_subinterpreters(meth):
"""Decorator to skip a test if subinterpreters are not supported."""
return unittest.skipIf(interpreters is None,
'subinterpreters required')(meth)
def get_stack_trace(pid):
unwinder = RemoteUnwinder(pid, all_threads=True, debug=True)
return unwinder.get_stack_trace()
@@ -140,15 +151,27 @@ def foo():
]
# It is possible that there are more threads, so we check that the
# expected stack traces are in the result (looking at you Windows!)
self.assertIn((ANY, thread_expected_stack_trace), stack_trace)
found_expected_stack = False
for interpreter_info in stack_trace:
for thread_info in interpreter_info.threads:
if thread_info.frame_info == thread_expected_stack_trace:
found_expected_stack = True
break
if found_expected_stack:
break
self.assertTrue(found_expected_stack, "Expected thread stack trace not found")
# Check that the main thread stack trace is in the result
frame = FrameInfo([script_name, 19, "<module>"])
for _, stack in stack_trace:
if frame in stack:
main_thread_found = False
for interpreter_info in stack_trace:
for thread_info in interpreter_info.threads:
if frame in thread_info.frame_info:
main_thread_found = True
break
else:
self.fail("Main thread stack trace not found in result")
if main_thread_found:
break
self.assertTrue(main_thread_found, "Main thread stack trace not found in result")
@skip_if_not_supported
@unittest.skipIf(
@@ -1086,13 +1109,17 @@ def test_self_trace(self):
# It is possible that there are more threads, so we check that the
# expected stack traces are in the result (looking at you Windows!)
this_tread_stack = None
for thread_id, stack in stack_trace:
if thread_id == threading.get_native_id():
this_tread_stack = stack
# New format: [InterpreterInfo(interpreter_id, [ThreadInfo(...)])]
for interpreter_info in stack_trace:
for thread_info in interpreter_info.threads:
if thread_info.thread_id == threading.get_native_id():
this_tread_stack = thread_info.frame_info
break
if this_tread_stack:
break
self.assertIsNotNone(this_tread_stack)
self.assertEqual(
stack[:2],
this_tread_stack[:2],
[
FrameInfo(
[
@@ -1111,6 +1138,360 @@ def test_self_trace(self):
],
)
@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
@requires_subinterpreters
def test_subinterpreter_stack_trace(self):
# Test that subinterpreters are correctly handled
port = find_unused_port()
# Calculate subinterpreter code separately and pickle it to avoid f-string issues
import pickle
subinterp_code = textwrap.dedent(f'''
import socket
import time
def sub_worker():
def nested_func():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', {port}))
sock.sendall(b"ready:sub\\n")
time.sleep(10_000)
nested_func()
sub_worker()
''').strip()
# Pickle the subinterpreter code
pickled_code = pickle.dumps(subinterp_code)
script = textwrap.dedent(
f"""
from concurrent import interpreters
import time
import sys
import socket
import threading
# Connect to the test process
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', {port}))
def main_worker():
# Function running in main interpreter
sock.sendall(b"ready:main\\n")
time.sleep(10_000)
def run_subinterp():
# Create and run subinterpreter
subinterp = interpreters.create()
import pickle
pickled_code = {pickled_code!r}
subinterp_code = pickle.loads(pickled_code)
subinterp.exec(subinterp_code)
# Start subinterpreter in thread
sub_thread = threading.Thread(target=run_subinterp)
sub_thread.start()
# Start main thread work
main_thread = threading.Thread(target=main_worker)
main_thread.start()
# Keep main thread alive
main_thread.join()
sub_thread.join()
"""
)
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
os.mkdir(script_dir)
# Create a socket server to communicate with the target process
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
script_name = _make_test_script(script_dir, "script", script)
client_sockets = []
try:
p = subprocess.Popen([sys.executable, script_name])
# Accept connections from both main and subinterpreter
responses = set()
while len(responses) < 2: # Wait for both "ready:main" and "ready:sub"
try:
client_socket, _ = server_socket.accept()
client_sockets.append(client_socket)
# Read the response from this connection
response = client_socket.recv(1024)
if b"ready:main" in response:
responses.add("main")
if b"ready:sub" in response:
responses.add("sub")
except socket.timeout:
break
server_socket.close()
stack_trace = get_stack_trace(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
finally:
for client_socket in client_sockets:
if client_socket is not None:
client_socket.close()
p.kill()
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
# Verify we have multiple interpreters
self.assertGreaterEqual(len(stack_trace), 1, "Should have at least one interpreter")
# Look for main interpreter (ID 0) and subinterpreter (ID > 0)
main_interp = None
sub_interp = None
for interpreter_info in stack_trace:
if interpreter_info.interpreter_id == 0:
main_interp = interpreter_info
elif interpreter_info.interpreter_id > 0:
sub_interp = interpreter_info
self.assertIsNotNone(main_interp, "Main interpreter should be present")
# Check main interpreter has expected stack trace
main_found = False
for thread_info in main_interp.threads:
for frame in thread_info.frame_info:
if frame.funcname == "main_worker":
main_found = True
break
if main_found:
break
self.assertTrue(main_found, "Main interpreter should have main_worker in stack")
# If subinterpreter is present, check its stack trace
if sub_interp:
sub_found = False
for thread_info in sub_interp.threads:
for frame in thread_info.frame_info:
if frame.funcname in ("sub_worker", "nested_func"):
sub_found = True
break
if sub_found:
break
self.assertTrue(sub_found, "Subinterpreter should have sub_worker or nested_func in stack")
@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
@requires_subinterpreters
def test_multiple_subinterpreters_with_threads(self):
# Test multiple subinterpreters, each with multiple threads
port = find_unused_port()
# Calculate subinterpreter codes separately and pickle them
import pickle
# Code for first subinterpreter with 2 threads
subinterp1_code = textwrap.dedent(f'''
import socket
import time
import threading
def worker1():
def nested_func():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', {port}))
sock.sendall(b"ready:sub1-t1\\n")
time.sleep(10_000)
nested_func()
def worker2():
def nested_func():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', {port}))
sock.sendall(b"ready:sub1-t2\\n")
time.sleep(10_000)
nested_func()
t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)
t1.start()
t2.start()
t1.join()
t2.join()
''').strip()
# Code for second subinterpreter with 2 threads
subinterp2_code = textwrap.dedent(f'''
import socket
import time
import threading
def worker1():
def nested_func():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', {port}))
sock.sendall(b"ready:sub2-t1\\n")
time.sleep(10_000)
nested_func()
def worker2():
def nested_func():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', {port}))
sock.sendall(b"ready:sub2-t2\\n")
time.sleep(10_000)
nested_func()
t1 = threading.Thread(target=worker1)
t2 = threading.Thread(target=worker2)
t1.start()
t2.start()
t1.join()
t2.join()
''').strip()
# Pickle the subinterpreter codes
pickled_code1 = pickle.dumps(subinterp1_code)
pickled_code2 = pickle.dumps(subinterp2_code)
script = textwrap.dedent(
f"""
from concurrent import interpreters
import time
import sys
import socket
import threading
# Connect to the test process
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', {port}))
def main_worker():
# Function running in main interpreter
sock.sendall(b"ready:main\\n")
time.sleep(10_000)
def run_subinterp1():
# Create and run first subinterpreter
subinterp = interpreters.create()
import pickle
pickled_code = {pickled_code1!r}
subinterp_code = pickle.loads(pickled_code)
subinterp.exec(subinterp_code)
def run_subinterp2():
# Create and run second subinterpreter
subinterp = interpreters.create()
import pickle
pickled_code = {pickled_code2!r}
subinterp_code = pickle.loads(pickled_code)
subinterp.exec(subinterp_code)
# Start subinterpreters in threads
sub1_thread = threading.Thread(target=run_subinterp1)
sub2_thread = threading.Thread(target=run_subinterp2)
sub1_thread.start()
sub2_thread.start()
# Start main thread work
main_thread = threading.Thread(target=main_worker)
main_thread.start()
# Keep main thread alive
main_thread.join()
sub1_thread.join()
sub2_thread.join()
"""
)
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
os.mkdir(script_dir)
# Create a socket server to communicate with the target process
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(5) # Allow multiple connections
script_name = _make_test_script(script_dir, "script", script)
client_sockets = []
try:
p = subprocess.Popen([sys.executable, script_name])
# Accept connections from main and all subinterpreter threads
expected_responses = {"ready:main", "ready:sub1-t1", "ready:sub1-t2", "ready:sub2-t1", "ready:sub2-t2"}
responses = set()
while len(responses) < 5: # Wait for all 5 ready signals
try:
client_socket, _ = server_socket.accept()
client_sockets.append(client_socket)
# Read the response from this connection
response = client_socket.recv(1024)
response_str = response.decode().strip()
if response_str in expected_responses:
responses.add(response_str)
except socket.timeout:
break
server_socket.close()
stack_trace = get_stack_trace(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace"
)
finally:
for client_socket in client_sockets:
if client_socket is not None:
client_socket.close()
p.kill()
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
# Verify we have multiple interpreters
self.assertGreaterEqual(len(stack_trace), 2, "Should have at least two interpreters")
# Count interpreters by ID
interpreter_ids = {interp.interpreter_id for interp in stack_trace}
self.assertIn(0, interpreter_ids, "Main interpreter should be present")
self.assertGreaterEqual(len(interpreter_ids), 3, "Should have main + at least 2 subinterpreters")
# Count total threads across all interpreters
total_threads = sum(len(interp.threads) for interp in stack_trace)
self.assertGreaterEqual(total_threads, 5, "Should have at least 5 threads total")
# Look for expected function names in stack traces
all_funcnames = set()
for interpreter_info in stack_trace:
for thread_info in interpreter_info.threads:
for frame in thread_info.frame_info:
all_funcnames.add(frame.funcname)
# Should find functions from different interpreters and threads
expected_funcs = {"main_worker", "worker1", "worker2", "nested_func"}
found_funcs = expected_funcs.intersection(all_funcnames)
self.assertGreater(len(found_funcs), 0, f"Should find some expected functions, got: {all_funcnames}")
@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
@@ -1203,15 +1584,20 @@ def main_work():
# Wait for the main thread to start its busy work
all_traces = unwinder_all.get_stack_trace()
found = False
for thread_id, stack in all_traces:
if not stack:
# New format: [InterpreterInfo(interpreter_id, [ThreadInfo(...)])]
for interpreter_info in all_traces:
for thread_info in interpreter_info.threads:
if not thread_info.frame_info:
continue
current_frame = stack[0]
current_frame = thread_info.frame_info[0]
if (
current_frame.funcname == "main_work"
and current_frame.lineno > 15
):
found = True
break
if found:
break
if found:
break
@@ -1237,19 +1623,31 @@ def main_work():
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
# Verify we got multiple threads in all_traces
# Count total threads across all interpreters in all_traces
total_threads = sum(len(interpreter_info.threads) for interpreter_info in all_traces)
self.assertGreater(
len(all_traces), 1, "Should have multiple threads"
total_threads, 1, "Should have multiple threads"
)
# Verify we got exactly one thread in gil_traces
# Count total threads across all interpreters in gil_traces
total_gil_threads = sum(len(interpreter_info.threads) for interpreter_info in gil_traces)
self.assertEqual(
len(gil_traces), 1, "Should have exactly one GIL holder"
total_gil_threads, 1, "Should have exactly one GIL holder"
)
# The GIL holder should be in the all_traces list
gil_thread_id = gil_traces[0][0]
all_thread_ids = [trace[0] for trace in all_traces]
# Get the GIL holder thread ID
gil_thread_id = None
for interpreter_info in gil_traces:
if interpreter_info.threads:
gil_thread_id = interpreter_info.threads[0].thread_id
break
# Get all thread IDs from all_traces
all_thread_ids = []
for interpreter_info in all_traces:
for thread_info in interpreter_info.threads:
all_thread_ids.append(thread_info.thread_id)
self.assertIn(
gil_thread_id,
all_thread_ids,


@@ -49,6 +49,28 @@ def __repr__(self):
return f"MockFrameInfo(filename='{self.filename}', lineno={self.lineno}, funcname='{self.funcname}')"
class MockThreadInfo:
"""Mock ThreadInfo for testing since the real one isn't accessible."""
def __init__(self, thread_id, frame_info):
self.thread_id = thread_id
self.frame_info = frame_info
def __repr__(self):
return f"MockThreadInfo(thread_id={self.thread_id}, frame_info={self.frame_info})"
class MockInterpreterInfo:
"""Mock InterpreterInfo for testing since the real one isn't accessible."""
def __init__(self, interpreter_id, threads):
self.interpreter_id = interpreter_id
self.threads = threads
def __repr__(self):
return f"MockInterpreterInfo(interpreter_id={self.interpreter_id}, threads={self.threads})"
skip_if_not_supported = unittest.skipIf(
(
sys.platform != "darwin"
@@ -152,19 +174,22 @@ def test_pstats_collector_with_extreme_intervals_and_empty_data(self):
self.assertEqual(len(collector.result), 0)
# Test collecting frames with None thread id
test_frames = [(None, [MockFrameInfo("file.py", 10, "func")])]
test_frames = [MockInterpreterInfo(0, [MockThreadInfo(None, [MockFrameInfo("file.py", 10, "func")])])]
collector.collect(test_frames)
# Should still process the frames
self.assertEqual(len(collector.result), 1)
# Test collecting duplicate frames in same sample
test_frames = [
(
MockInterpreterInfo(
0, # interpreter_id
[MockThreadInfo(
1,
[
MockFrameInfo("file.py", 10, "func1"),
MockFrameInfo("file.py", 10, "func1"), # Duplicate
],
)]
)
]
collector = PstatsCollector(sample_interval_usec=1000)
@@ -179,7 +204,7 @@ def test_pstats_collector_single_frame_stacks(self):
collector = PstatsCollector(sample_interval_usec=1000)
# Test with exactly one frame (should trigger the <= 1 condition)
single_frame = [(1, [MockFrameInfo("single.py", 10, "single_func")])]
single_frame = [MockInterpreterInfo(0, [MockThreadInfo(1, [MockFrameInfo("single.py", 10, "single_func")])])]
collector.collect(single_frame)
# Should record the single frame with inline call
@@ -190,7 +215,7 @@ def test_pstats_collector_single_frame_stacks(self):
self.assertEqual(collector.result[single_key]["cumulative_calls"], 1)
# Test with empty frames (should also trigger <= 1 condition)
empty_frames = [(1, [])]
empty_frames = [MockInterpreterInfo(0, [MockThreadInfo(1, [])])]
collector.collect(empty_frames)
# Should not add any new entries
@@ -200,11 +225,14 @@ def test_pstats_collector_single_frame_stacks(self):
# Test mixed single and multi-frame stacks
mixed_frames = [
(
MockInterpreterInfo(
0,
[
MockThreadInfo(
1,
[MockFrameInfo("single2.py", 20, "single_func2")],
), # Single frame
(
MockThreadInfo(
2,
[ # Multi-frame stack
MockFrameInfo("multi.py", 30, "multi_func1"),
@@ -212,6 +240,8 @@ def test_pstats_collector_single_frame_stacks(self):
],
),
]
),
]
collector.collect(mixed_frames)
# Should have recorded all functions
@@ -244,14 +274,14 @@ def test_collapsed_stack_collector_with_empty_and_deep_stacks(self):
self.assertEqual(len(collector.call_trees), 0)
# Test with single frame stack
test_frames = [(1, [("file.py", 10, "func")])]
test_frames = [MockInterpreterInfo(0, [MockThreadInfo(1, [("file.py", 10, "func")])])]
collector.collect(test_frames)
self.assertEqual(len(collector.call_trees), 1)
self.assertEqual(collector.call_trees[0], [("file.py", 10, "func")])
# Test with very deep stack
deep_stack = [(f"file{i}.py", i, f"func{i}") for i in range(100)]
test_frames = [(1, deep_stack)]
test_frames = [MockInterpreterInfo(0, [MockThreadInfo(1, deep_stack)])]
collector = CollapsedStackCollector()
collector.collect(test_frames)
self.assertEqual(len(collector.call_trees[0]), 100)
@@ -271,12 +301,15 @@ def test_pstats_collector_basic(self):
# Test collecting sample data
test_frames = [
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[
MockFrameInfo("file.py", 10, "func1"),
MockFrameInfo("file.py", 20, "func2"),
],
)]
)
]
collector.collect(test_frames)
@@ -309,12 +342,15 @@ def test_pstats_collector_create_stats(self):
) # 1 second intervals
test_frames = [
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[
MockFrameInfo("file.py", 10, "func1"),
MockFrameInfo("file.py", 20, "func2"),
],
)]
)
]
collector.collect(test_frames)
@@ -350,7 +386,7 @@ def test_collapsed_stack_collector_basic(self):
# Test collecting sample data
test_frames = [
(1, [("file.py", 10, "func1"), ("file.py", 20, "func2")])
MockInterpreterInfo(0, [MockThreadInfo(1, [("file.py", 10, "func1"), ("file.py", 20, "func2")])])
]
collector.collect(test_frames)
@@ -374,12 +410,12 @@ def test_collapsed_stack_collector_export(self):
collector = CollapsedStackCollector()
test_frames1 = [
(1, [("file.py", 10, "func1"), ("file.py", 20, "func2")])
MockInterpreterInfo(0, [MockThreadInfo(1, [("file.py", 10, "func1"), ("file.py", 20, "func2")])])
]
test_frames2 = [
(1, [("file.py", 10, "func1"), ("file.py", 20, "func2")])
MockInterpreterInfo(0, [MockThreadInfo(1, [("file.py", 10, "func1"), ("file.py", 20, "func2")])])
] # Same stack
test_frames3 = [(1, [("other.py", 5, "other_func")])]
test_frames3 = [MockInterpreterInfo(0, [MockThreadInfo(1, [("other.py", 5, "other_func")])])]
collector.collect(test_frames1)
collector.collect(test_frames2)
@@ -406,24 +442,30 @@ def test_pstats_collector_export(self):
) # 1 second intervals
test_frames1 = [
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[
MockFrameInfo("file.py", 10, "func1"),
MockFrameInfo("file.py", 20, "func2"),
],
)]
)
]
test_frames2 = [
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[
MockFrameInfo("file.py", 10, "func1"),
MockFrameInfo("file.py", 20, "func2"),
],
)]
)
] # Same stack
test_frames3 = [(1, [MockFrameInfo("other.py", 5, "other_func")])]
test_frames3 = [MockInterpreterInfo(0, [MockThreadInfo(1, [MockFrameInfo("other.py", 5, "other_func")])])]
collector.collect(test_frames1)
collector.collect(test_frames2)
@@ -1138,7 +1180,9 @@ def test_recursive_function_call_counting(self):
# Simulate a recursive call pattern: fibonacci(5) calling itself
recursive_frames = [
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[ # First sample: deep in recursion
MockFrameInfo("fib.py", 10, "fibonacci"),
@@ -1149,16 +1193,22 @@ def test_recursive_function_call_counting(self):
MockFrameInfo("fib.py", 10, "fibonacci"), # even deeper
MockFrameInfo("main.py", 5, "main"), # main caller
],
)]
),
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[ # Second sample: different recursion depth
MockFrameInfo("fib.py", 10, "fibonacci"),
MockFrameInfo("fib.py", 10, "fibonacci"), # recursive call
MockFrameInfo("main.py", 5, "main"), # main caller
],
)]
),
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[ # Third sample: back to deeper recursion
MockFrameInfo("fib.py", 10, "fibonacci"),
@@ -1166,6 +1216,7 @@ def test_recursive_function_call_counting(self):
MockFrameInfo("fib.py", 10, "fibonacci"),
MockFrameInfo("main.py", 5, "main"),
],
)]
),
]
@@ -1202,7 +1253,9 @@ def test_nested_function_hierarchy(self):
# Simulate a deep call hierarchy
deep_call_frames = [
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[
MockFrameInfo("level1.py", 10, "level1_func"),
@@ -1212,8 +1265,11 @@ def test_nested_function_hierarchy(self):
MockFrameInfo("level5.py", 50, "level5_func"),
MockFrameInfo("main.py", 5, "main"),
],
)]
),
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[ # Same hierarchy sampled again
MockFrameInfo("level1.py", 10, "level1_func"),
@@ -1223,6 +1279,7 @@ def test_nested_function_hierarchy(self):
MockFrameInfo("level5.py", 50, "level5_func"),
MockFrameInfo("main.py", 5, "main"),
],
)]
),
]
@@ -1260,40 +1317,52 @@ def test_alternating_call_patterns(self):
# Simulate alternating execution paths
pattern_frames = [
# Pattern A: path through func_a
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[
MockFrameInfo("module.py", 10, "func_a"),
MockFrameInfo("module.py", 30, "shared_func"),
MockFrameInfo("main.py", 5, "main"),
],
)]
),
# Pattern B: path through func_b
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[
MockFrameInfo("module.py", 20, "func_b"),
MockFrameInfo("module.py", 30, "shared_func"),
MockFrameInfo("main.py", 5, "main"),
],
)]
),
# Pattern A again
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[
MockFrameInfo("module.py", 10, "func_a"),
MockFrameInfo("module.py", 30, "shared_func"),
MockFrameInfo("main.py", 5, "main"),
],
)]
),
# Pattern B again
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[
MockFrameInfo("module.py", 20, "func_b"),
MockFrameInfo("module.py", 30, "shared_func"),
MockFrameInfo("main.py", 5, "main"),
],
)]
),
]
@@ -1328,7 +1397,9 @@ def test_collapsed_stack_with_recursion(self):
# Recursive call pattern
recursive_frames = [
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[
("factorial.py", 10, "factorial"),
@@ -1336,14 +1407,18 @@ def test_collapsed_stack_with_recursion(self):
("factorial.py", 10, "factorial"), # deeper
("main.py", 5, "main"),
],
)]
),
(
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[
("factorial.py", 10, "factorial"),
("factorial.py", 10, "factorial"), # different depth
("main.py", 5, "main"),
],
)]
),
]


@@ -164,6 +164,20 @@ static PyStructSequence_Desc ThreadInfo_desc = {
2
};
// InterpreterInfo structseq type - replaces 2-tuple (interpreter_id, thread_list)
static PyStructSequence_Field InterpreterInfo_fields[] = {
{"interpreter_id", "Interpreter ID"},
{"threads", "List of threads in this interpreter"},
{NULL}
};
static PyStructSequence_Desc InterpreterInfo_desc = {
"_remote_debugging.InterpreterInfo",
"Information about an interpreter",
InterpreterInfo_fields,
2
};
// AwaitedInfo structseq type - replaces 2-tuple (tid, awaited_by_list)
static PyStructSequence_Field AwaitedInfo_fields[] = {
{"thread_id", "Thread ID"},
@@ -193,6 +207,7 @@ typedef struct {
PyTypeObject *FrameInfo_Type;
PyTypeObject *CoroInfo_Type;
PyTypeObject *ThreadInfo_Type;
PyTypeObject *InterpreterInfo_Type;
PyTypeObject *AwaitedInfo_Type;
} RemoteDebuggingState;
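
On the Python side, the new InterpreterInfo type behaves like the module's other struct sequences: fields are readable by name or by index. A small illustrative sketch, assuming interp is one element of a get_stack_trace() result:

interp_id = interp.interpreter_id   # equivalent to interp[0]
threads = interp.threads            # equivalent to interp[1]
for thread_info in threads:
    print(thread_info.thread_id, len(thread_info.frame_info))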
@@ -2649,20 +2664,23 @@ _remote_debugging_RemoteUnwinder___init___impl(RemoteUnwinderObject *self,
@critical_section
_remote_debugging.RemoteUnwinder.get_stack_trace
Returns a list of stack traces for threads in the target process.
Returns stack traces for all interpreters and threads in the process.
Each element in the returned list is a tuple of (thread_id, frame_list), where:
Each element in the returned list is a tuple of (interpreter_id, thread_list), where:
- interpreter_id is the interpreter identifier
- thread_list is a list of tuples (thread_id, frame_list) for threads in that interpreter
- thread_id is the OS thread identifier
- frame_list is a list of tuples (function_name, filename, line_number) representing
the Python stack frames for that thread, ordered from most recent to oldest
The threads returned depend on the initialization parameters:
- If only_active_thread was True: returns only the thread holding the GIL
- If all_threads was True: returns all threads
- Otherwise: returns only the main thread
- If only_active_thread was True: returns only the thread holding the GIL across all interpreters
- If all_threads was True: returns all threads across all interpreters
- Otherwise: returns only the main thread of each interpreter
Example:
[
(0, [ # Main interpreter
(1234, [
('process_data', 'worker.py', 127),
('run_worker', 'worker.py', 45),
@@ -2672,6 +2690,12 @@ The threads returned depend on the initialization parameters:
('handle_request', 'server.py', 89),
('serve_forever', 'server.py', 52)
])
]),
(1, [ # Sub-interpreter
(1236, [
('sub_worker', 'sub.py', 15)
])
])
]
Raises:
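
A consumer-side sketch of the shape documented above; the PID is illustrative and the attribute names follow the struct sequences added in this commit.

from _remote_debugging import RemoteUnwinder

pid = 1234  # illustrative target process id
unwinder = RemoteUnwinder(pid, all_threads=True)  # all threads, all interpreters
for interpreter_info in unwinder.get_stack_trace():
    print(f"interpreter {interpreter_info.interpreter_id}")
    for thread_info in interpreter_info.threads:
        print(f"  thread {thread_info.thread_id}")
        for frame in thread_info.frame_info:
            print(f"    {frame.funcname} ({frame.filename}:{frame.lineno})")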
@@ -2684,20 +2708,32 @@ The threads returned depend on the initialization parameters:
static PyObject *
_remote_debugging_RemoteUnwinder_get_stack_trace_impl(RemoteUnwinderObject *self)
/*[clinic end generated code: output=666192b90c69d567 input=c527a4b858601408]*/
/*[clinic end generated code: output=666192b90c69d567 input=bcff01c73cccc1c0]*/
{
PyObject* result = NULL;
// Read interpreter state into opaque buffer
PyObject* result = PyList_New(0);
if (!result) {
set_exception_cause(self, PyExc_MemoryError, "Failed to create stack trace result list");
return NULL;
}
// Iterate over all interpreters
uintptr_t current_interpreter = self->interpreter_addr;
while (current_interpreter != 0) {
// Read interpreter state to get the interpreter ID
char interp_state_buffer[INTERP_STATE_BUFFER_SIZE];
if (_Py_RemoteDebug_PagedReadRemoteMemory(
&self->handle,
self->interpreter_addr,
current_interpreter,
INTERP_STATE_BUFFER_SIZE,
interp_state_buffer) < 0) {
set_exception_cause(self, PyExc_RuntimeError, "Failed to read interpreter state buffer");
Py_CLEAR(result);
goto exit;
}
int64_t interpreter_id = GET_MEMBER(int64_t, interp_state_buffer,
self->debug_offsets.interpreter_state.id);
// Get code object generation from buffer
uint64_t code_object_generation = GET_MEMBER(uint64_t, interp_state_buffer,
self->debug_offsets.interpreter_state.code_object_generation);
@@ -2707,28 +2743,6 @@ _remote_debugging_RemoteUnwinder_get_stack_trace_impl(RemoteUnwinderObject *self
_Py_hashtable_clear(self->code_object_cache);
}
// If only_active_thread is true, we need to determine which thread holds the GIL
PyThreadState* gil_holder = NULL;
if (self->only_active_thread) {
// The GIL state is already in interp_state_buffer, just read from there
// Check if GIL is locked
int gil_locked = GET_MEMBER(int, interp_state_buffer,
self->debug_offsets.interpreter_state.gil_runtime_state_locked);
if (gil_locked) {
// Get the last holder (current holder when GIL is locked)
gil_holder = GET_MEMBER(PyThreadState*, interp_state_buffer,
self->debug_offsets.interpreter_state.gil_runtime_state_holder);
} else {
// GIL is not locked, return empty list
result = PyList_New(0);
if (!result) {
set_exception_cause(self, PyExc_MemoryError, "Failed to create empty result list");
}
goto exit;
}
}
#ifdef Py_GIL_DISABLED
// Check TLBC generation and invalidate cache if needed
uint32_t current_tlbc_generation = GET_MEMBER(uint32_t, interp_state_buffer,
@@ -2739,47 +2753,101 @@ _remote_debugging_RemoteUnwinder_get_stack_trace_impl(RemoteUnwinderObject *self
}
#endif
// Create a list to hold threads for this interpreter
PyObject *interpreter_threads = PyList_New(0);
if (!interpreter_threads) {
set_exception_cause(self, PyExc_MemoryError, "Failed to create interpreter threads list");
Py_CLEAR(result);
goto exit;
}
uintptr_t current_tstate;
if (self->only_active_thread && gil_holder != NULL) {
// We have the GIL holder, process only that thread
current_tstate = (uintptr_t)gil_holder;
if (self->only_active_thread) {
// Find the GIL holder for THIS interpreter
int gil_locked = GET_MEMBER(int, interp_state_buffer,
self->debug_offsets.interpreter_state.gil_runtime_state_locked);
if (!gil_locked) {
// This interpreter's GIL is not locked, skip it
Py_DECREF(interpreter_threads);
goto next_interpreter;
}
// Get the GIL holder for this interpreter
current_tstate = (uintptr_t)GET_MEMBER(PyThreadState*, interp_state_buffer,
self->debug_offsets.interpreter_state.gil_runtime_state_holder);
} else if (self->tstate_addr == 0) {
// Get threads head from buffer
// Get all threads for this interpreter
current_tstate = GET_MEMBER(uintptr_t, interp_state_buffer,
self->debug_offsets.interpreter_state.threads_head);
} else {
// Target specific thread (only process first interpreter)
current_tstate = self->tstate_addr;
}
result = PyList_New(0);
if (!result) {
set_exception_cause(self, PyExc_MemoryError, "Failed to create stack trace result list");
goto exit;
}
while (current_tstate != 0) {
PyObject* frame_info = unwind_stack_for_thread(self, &current_tstate);
if (!frame_info) {
Py_CLEAR(result);
Py_DECREF(interpreter_threads);
set_exception_cause(self, PyExc_RuntimeError, "Failed to unwind stack for thread");
goto exit;
}
if (PyList_Append(result, frame_info) == -1) {
Py_DECREF(frame_info);
Py_CLEAR(result);
goto exit;
}
if (PyList_Append(interpreter_threads, frame_info) == -1) {
Py_DECREF(frame_info);
Py_DECREF(interpreter_threads);
set_exception_cause(self, PyExc_RuntimeError, "Failed to append thread frame info");
Py_CLEAR(result);
goto exit;
}
Py_DECREF(frame_info);
// We are targeting a single tstate, break here
if (self->tstate_addr) {
// If targeting specific thread or only active thread, process just one
if (self->tstate_addr || self->only_active_thread) {
break;
}
}
// If we're only processing the GIL holder, we're done after one iteration
if (self->only_active_thread && gil_holder != NULL) {
// Create the InterpreterInfo StructSequence
RemoteDebuggingState *state = RemoteDebugging_GetStateFromObject((PyObject*)self);
PyObject *interpreter_info = PyStructSequence_New(state->InterpreterInfo_Type);
if (!interpreter_info) {
Py_DECREF(interpreter_threads);
set_exception_cause(self, PyExc_MemoryError, "Failed to create InterpreterInfo");
Py_CLEAR(result);
goto exit;
}
PyObject *interp_id = PyLong_FromLongLong(interpreter_id);
if (!interp_id) {
Py_DECREF(interpreter_threads);
Py_DECREF(interpreter_info);
set_exception_cause(self, PyExc_MemoryError, "Failed to create interpreter ID");
Py_CLEAR(result);
goto exit;
}
PyStructSequence_SetItem(interpreter_info, 0, interp_id); // steals reference
PyStructSequence_SetItem(interpreter_info, 1, interpreter_threads); // steals reference
// Add this interpreter to the result list
if (PyList_Append(result, interpreter_info) == -1) {
Py_DECREF(interpreter_info);
set_exception_cause(self, PyExc_RuntimeError, "Failed to append interpreter info");
Py_CLEAR(result);
goto exit;
}
Py_DECREF(interpreter_info);
next_interpreter:
// Get the next interpreter address
current_interpreter = GET_MEMBER(uintptr_t, interp_state_buffer,
self->debug_offsets.interpreter_state.next);
// If we're targeting a specific thread, stop after first interpreter
if (self->tstate_addr != 0) {
break;
}
}
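
In outline, the loop above walks the runtime's linked list of interpreter states and applies the thread-selection mode per interpreter. A rough Python analogue of the control flow; every helper name here is a descriptive stand-in, not a real API:

def walk_interpreters(first_interp_addr, *, only_active_thread, tstate_addr,
                      read_interp_state, unwind):
    # read_interp_state/unwind stand in for the remote memory reads and the
    # per-thread unwinding done by the C code above.
    results = []
    addr = first_interp_addr
    while addr:
        interp = read_interp_state(addr)
        if only_active_thread:
            if not interp.gil_locked:
                addr = interp.next           # GIL idle: skip this interpreter
                continue
            tstates = [interp.gil_holder]    # only the GIL holder
        elif tstate_addr:
            tstates = [tstate_addr]          # one specific thread
        else:
            tstates = interp.thread_chain    # every thread in this interpreter
        results.append((interp.id, [unwind(t) for t in tstates]))
        if tstate_addr:
            break                            # specific thread: first interpreter only
        addr = interp.next
    return results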
@@ -3053,6 +3121,14 @@ _remote_debugging_exec(PyObject *m)
return -1;
}
st->InterpreterInfo_Type = PyStructSequence_NewType(&InterpreterInfo_desc);
if (st->InterpreterInfo_Type == NULL) {
return -1;
}
if (PyModule_AddType(m, st->InterpreterInfo_Type) < 0) {
return -1;
}
st->AwaitedInfo_Type = PyStructSequence_NewType(&AwaitedInfo_desc);
if (st->AwaitedInfo_Type == NULL) {
return -1;
@@ -3082,6 +3158,7 @@ remote_debugging_traverse(PyObject *mod, visitproc visit, void *arg)
Py_VISIT(state->FrameInfo_Type);
Py_VISIT(state->CoroInfo_Type);
Py_VISIT(state->ThreadInfo_Type);
Py_VISIT(state->InterpreterInfo_Type);
Py_VISIT(state->AwaitedInfo_Type);
return 0;
}
@@ -3095,6 +3172,7 @@ remote_debugging_clear(PyObject *mod)
Py_CLEAR(state->FrameInfo_Type);
Py_CLEAR(state->CoroInfo_Type);
Py_CLEAR(state->ThreadInfo_Type);
Py_CLEAR(state->InterpreterInfo_Type);
Py_CLEAR(state->AwaitedInfo_Type);
return 0;
}


@@ -125,20 +125,23 @@ PyDoc_STRVAR(_remote_debugging_RemoteUnwinder_get_stack_trace__doc__,
"get_stack_trace($self, /)\n"
"--\n"
"\n"
"Returns a list of stack traces for threads in the target process.\n"
"Returns stack traces for all interpreters and threads in process.\n"
"\n"
"Each element in the returned list is a tuple of (thread_id, frame_list), where:\n"
"Each element in the returned list is a tuple of (interpreter_id, thread_list), where:\n"
"- interpreter_id is the interpreter identifier\n"
"- thread_list is a list of tuples (thread_id, frame_list) for threads in that interpreter\n"
" - thread_id is the OS thread identifier\n"
" - frame_list is a list of tuples (function_name, filename, line_number) representing\n"
" the Python stack frames for that thread, ordered from most recent to oldest\n"
"\n"
"The threads returned depend on the initialization parameters:\n"
"- If only_active_thread was True: returns only the thread holding the GIL\n"
"- If all_threads was True: returns all threads\n"
"- Otherwise: returns only the main thread\n"
"- If only_active_thread was True: returns only the thread holding the GIL across all interpreters\n"
"- If all_threads was True: returns all threads across all interpreters\n"
"- Otherwise: returns only the main thread of each interpreter\n"
"\n"
"Example:\n"
" [\n"
" (0, [ # Main interpreter\n"
" (1234, [\n"
" (\'process_data\', \'worker.py\', 127),\n"
" (\'run_worker\', \'worker.py\', 45),\n"
@@ -148,6 +151,12 @@ PyDoc_STRVAR(_remote_debugging_RemoteUnwinder_get_stack_trace__doc__,
" (\'handle_request\', \'server.py\', 89),\n"
" (\'serve_forever\', \'server.py\', 52)\n"
" ])\n"
" ]),\n"
" (1, [ # Sub-interpreter\n"
" (1236, [\n"
" (\'sub_worker\', \'sub.py\', 15)\n"
" ])\n"
" ])\n"
" ]\n"
"\n"
"Raises:\n"
@@ -288,4 +297,4 @@ _remote_debugging_RemoteUnwinder_get_async_stack_trace(PyObject *self, PyObject
return return_value;
}
/*[clinic end generated code: output=0dd1e6e8bab2a8b1 input=a9049054013a1b77]*/
/*[clinic end generated code: output=2ba15411abf82c33 input=a9049054013a1b77]*/