Update tests for location tuple and opcode field

Frame location is now a 4-tuple (lineno, end_lineno, col_offset,
end_col_offset). MockFrameInfo wraps locations in LocationInfo struct.
Updates assertions throughout and adds opcode_utils coverage.
This commit is contained in:
Pablo Galindo Salgado 2025-12-03 03:43:47 +00:00
parent 7ffe4cb39e
commit 8b423df632
6 changed files with 924 additions and 160 deletions

View file

@ -155,12 +155,12 @@ def foo():
p.wait(timeout=SHORT_TIMEOUT)
thread_expected_stack_trace = [
FrameInfo([script_name, 15, "foo"]),
FrameInfo([script_name, 12, "baz"]),
FrameInfo([script_name, 9, "bar"]),
FrameInfo([threading.__file__, ANY, "Thread.run"]),
FrameInfo([threading.__file__, ANY, "Thread._bootstrap_inner"]),
FrameInfo([threading.__file__, ANY, "Thread._bootstrap"]),
FrameInfo([script_name, (15, ANY, ANY, ANY), "foo", ANY]),
FrameInfo([script_name, (12, ANY, ANY, ANY), "baz", ANY]),
FrameInfo([script_name, (9, ANY, ANY, ANY), "bar", ANY]),
FrameInfo([threading.__file__, ANY, "Thread.run", ANY]),
FrameInfo([threading.__file__, ANY, "Thread._bootstrap_inner", ANY]),
FrameInfo([threading.__file__, ANY, "Thread._bootstrap", ANY]),
]
# Is possible that there are more threads, so we check that the
# expected stack traces are in the result (looking at you Windows!)
@ -175,7 +175,7 @@ def foo():
self.assertTrue(found_expected_stack, "Expected thread stack trace not found")
# Check that the main thread stack trace is in the result
frame = FrameInfo([script_name, 19, "<module>"])
frame = FrameInfo([script_name, (19, ANY, ANY, ANY), "<module>", ANY])
main_thread_found = False
for interpreter_info in stack_trace:
for thread_info in interpreter_info.threads:
@ -323,6 +323,7 @@ def new_eager_loop():
taskgroups.__file__,
ANY,
"TaskGroup._aexit",
None,
]
),
tuple(
@ -330,21 +331,22 @@ def new_eager_loop():
taskgroups.__file__,
ANY,
"TaskGroup.__aexit__",
None,
]
),
tuple([script_name, 26, "main"]),
tuple([script_name, (26, ANY, ANY, ANY), "main", None]),
)
],
"c2_root": [
(
tuple([script_name, 10, "c5"]),
tuple([script_name, 14, "c4"]),
tuple([script_name, 17, "c3"]),
tuple([script_name, 20, "c2"]),
tuple([script_name, (10, ANY, ANY, ANY), "c5", None]),
tuple([script_name, (14, ANY, ANY, ANY), "c4", None]),
tuple([script_name, (17, ANY, ANY, ANY), "c3", None]),
tuple([script_name, (20, ANY, ANY, ANY), "c2", None]),
)
],
"sub_main_1": [(tuple([script_name, 23, "c1"]),)],
"sub_main_2": [(tuple([script_name, 23, "c1"]),)],
"sub_main_1": [(tuple([script_name, (23, ANY, ANY, ANY), "c1", None]),)],
"sub_main_2": [(tuple([script_name, (23, ANY, ANY, ANY), "c1", None]),)],
},
)
@ -372,6 +374,7 @@ def new_eager_loop():
taskgroups.__file__,
ANY,
"TaskGroup._aexit",
None,
]
),
tuple(
@ -379,13 +382,14 @@ def new_eager_loop():
taskgroups.__file__,
ANY,
"TaskGroup.__aexit__",
None,
]
),
tuple([script_name, 26, "main"]),
tuple([script_name, (26, ANY, ANY, ANY), "main", None]),
),
),
("sub_main_1", (tuple([script_name, 23, "c1"]),)),
("sub_main_2", (tuple([script_name, 23, "c1"]),)),
("sub_main_1", (tuple([script_name, (23, ANY, ANY, ANY), "c1", None]),)),
("sub_main_2", (tuple([script_name, (23, ANY, ANY, ANY), "c1", None]),)),
],
"sub_main_1": [
(
@ -396,6 +400,7 @@ def new_eager_loop():
taskgroups.__file__,
ANY,
"TaskGroup._aexit",
None,
]
),
tuple(
@ -403,9 +408,10 @@ def new_eager_loop():
taskgroups.__file__,
ANY,
"TaskGroup.__aexit__",
None,
]
),
tuple([script_name, 26, "main"]),
tuple([script_name, (26, ANY, ANY, ANY), "main", None]),
),
)
],
@ -418,6 +424,7 @@ def new_eager_loop():
taskgroups.__file__,
ANY,
"TaskGroup._aexit",
None,
]
),
tuple(
@ -425,9 +432,10 @@ def new_eager_loop():
taskgroups.__file__,
ANY,
"TaskGroup.__aexit__",
None,
]
),
tuple([script_name, 26, "main"]),
tuple([script_name, (26, ANY, ANY, ANY), "main", None]),
),
)
],
@ -512,9 +520,9 @@ async def main():
coroutine_stack,
[
(
tuple([script_name, 10, "gen_nested_call"]),
tuple([script_name, 16, "gen"]),
tuple([script_name, 19, "main"]),
tuple([script_name, (10, ANY, ANY, ANY), "gen_nested_call", None]),
tuple([script_name, (16, ANY, ANY, ANY), "gen", None]),
tuple([script_name, (19, ANY, ANY, ANY), "main", None]),
)
],
)
@ -624,11 +632,11 @@ async def main():
self.assertEqual(
coroutine_stacks,
{
"Task-1": [(tuple([script_name, 21, "main"]),)],
"Task-1": [(tuple([script_name, (21, ANY, ANY, ANY), "main", None]),)],
"Task-2": [
(
tuple([script_name, 11, "deep"]),
tuple([script_name, 15, "c1"]),
tuple([script_name, (11, ANY, ANY, ANY), "deep", None]),
tuple([script_name, (15, ANY, ANY, ANY), "c1", None]),
)
],
},
@ -650,7 +658,7 @@ async def main():
{
"Task-1": [],
"Task-2": [
("Task-1", (tuple([script_name, 21, "main"]),))
("Task-1", (tuple([script_name, (21, ANY, ANY, ANY), "main", None]),))
],
},
)
@ -762,19 +770,20 @@ async def main():
{
"Task-1": [
(
tuple([staggered.__file__, ANY, "staggered_race"]),
tuple([script_name, 21, "main"]),
tuple([staggered.__file__, ANY, "staggered_race", None]),
tuple([script_name, (21, ANY, ANY, ANY), "main", None]),
)
],
"Task-2": [
(
tuple([script_name, 11, "deep"]),
tuple([script_name, 15, "c1"]),
tuple([script_name, (11, ANY, ANY, ANY), "deep", None]),
tuple([script_name, (15, ANY, ANY, ANY), "c1", None]),
tuple(
[
staggered.__file__,
ANY,
"staggered_race.<locals>.run_one_coro",
None,
]
),
)
@ -802,9 +811,9 @@ async def main():
"Task-1",
(
tuple(
[staggered.__file__, ANY, "staggered_race"]
[staggered.__file__, ANY, "staggered_race", None]
),
tuple([script_name, 21, "main"]),
tuple([script_name, (21, ANY, ANY, ANY), "main", None]),
),
)
],
@ -938,11 +947,11 @@ async def main():
self.assertGreaterEqual(len(entries), 1000)
# the first three tasks stem from the code structure
main_stack = [
FrameInfo([taskgroups.__file__, ANY, "TaskGroup._aexit"]),
FrameInfo([taskgroups.__file__, ANY, "TaskGroup._aexit", ANY]),
FrameInfo(
[taskgroups.__file__, ANY, "TaskGroup.__aexit__"]
[taskgroups.__file__, ANY, "TaskGroup.__aexit__", ANY]
),
FrameInfo([script_name, 60, "main"]),
FrameInfo([script_name, (60, ANY, ANY, ANY), "main", ANY]),
]
self.assertIn(
TaskInfo(
@ -964,6 +973,7 @@ async def main():
base_events.__file__,
ANY,
"Server.serve_forever",
ANY,
]
)
],
@ -980,6 +990,7 @@ async def main():
taskgroups.__file__,
ANY,
"TaskGroup._aexit",
ANY,
]
),
FrameInfo(
@ -987,11 +998,10 @@ async def main():
taskgroups.__file__,
ANY,
"TaskGroup.__aexit__",
ANY,
]
),
FrameInfo(
[script_name, ANY, "main"]
),
FrameInfo([script_name, ANY, "main", ANY]),
],
ANY,
]
@ -1010,14 +1020,13 @@ async def main():
CoroInfo(
[
[
FrameInfo(
[tasks.__file__, ANY, "sleep"]
),
FrameInfo([tasks.__file__, ANY, "sleep", ANY]),
FrameInfo(
[
script_name,
38,
(38, ANY, ANY, ANY),
"echo_client",
ANY,
]
),
],
@ -1034,6 +1043,7 @@ async def main():
taskgroups.__file__,
ANY,
"TaskGroup._aexit",
ANY,
]
),
FrameInfo(
@ -1041,13 +1051,15 @@ async def main():
taskgroups.__file__,
ANY,
"TaskGroup.__aexit__",
ANY,
]
),
FrameInfo(
[
script_name,
41,
(41, ANY, ANY, ANY),
"echo_client_spam",
ANY,
]
),
],
@ -1069,6 +1081,7 @@ async def main():
taskgroups.__file__,
ANY,
"TaskGroup._aexit",
ANY,
]
),
FrameInfo(
@ -1076,11 +1089,10 @@ async def main():
taskgroups.__file__,
ANY,
"TaskGroup.__aexit__",
ANY,
]
),
FrameInfo(
[script_name, 41, "echo_client_spam"]
),
FrameInfo([script_name, (41, ANY, ANY, ANY), "echo_client_spam", ANY]),
],
ANY,
]
@ -1137,20 +1149,223 @@ def test_self_trace(self):
FrameInfo(
[
__file__,
get_stack_trace.__code__.co_firstlineno + 2,
(get_stack_trace.__code__.co_firstlineno + 2, ANY, ANY, ANY),
"get_stack_trace",
ANY,
]
),
FrameInfo(
[
__file__,
self.test_self_trace.__code__.co_firstlineno + 6,
(self.test_self_trace.__code__.co_firstlineno + 6, ANY, ANY, ANY),
"TestGetStackTrace.test_self_trace",
ANY,
]
),
],
)
@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
def test_opcodes_collection(self):
"""Test that opcodes are collected when the opcodes flag is set."""
port = find_unused_port()
script = textwrap.dedent(
f"""\
import time
import sys
import socket
def compute():
# Do some work that involves bytecode execution
total = 0
for i in range(1000):
total += i
return total
def bar():
compute()
def foo():
bar()
# Signal that we're ready
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', {port}))
sock.sendall(b"ready")
sock.close()
# Keep computing in a loop
while True:
foo()
"""
)
with os.fdopen(os.dup(1), "w") as stdout:
with subprocess.Popen(
[sys.executable, "-c", script],
stdout=stdout,
stderr=stdout,
text=True,
) as p:
client_socket = None
try:
# Accept the ready signal
server_socket = socket.socket(
socket.AF_INET, socket.SOCK_STREAM
)
server_socket.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
)
server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
client_socket, _ = server_socket.accept()
client_socket.settimeout(SHORT_TIMEOUT)
response = client_socket.recv(1024)
self.assertEqual(response, b"ready")
server_socket.close()
# Get stack trace with opcodes=True
unwinder = RemoteUnwinder(p.pid, opcodes=True)
stack_trace = unwinder.get_stack_trace()
# Find the thread with our compute/bar/foo stack
found_opcodes = False
for interpreter_info in stack_trace:
for thread_info in interpreter_info.threads:
for frame in thread_info.frame_info:
# Check that frames have opcodes (not None)
# when opcodes=True is set
if frame.funcname in ("compute", "bar", "foo"):
# Opcode should be an integer, not None
self.assertIsInstance(
frame.opcode,
int,
f"Expected opcode to be int for {frame.funcname}, got {type(frame.opcode)}"
)
self.assertGreaterEqual(frame.opcode, 0)
found_opcodes = True
self.assertTrue(
found_opcodes,
"Did not find any frames with opcodes from compute/bar/foo"
)
finally:
if client_socket is not None:
client_socket.close()
p.kill()
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
def test_location_extraction(self):
"""Test that location tuples (lineno, end_lineno, col_offset, end_col_offset) are correctly extracted."""
port = find_unused_port()
# Script with predictable column positions
# Line 1: import time, sys, socket
# Line 2: (empty or comment)
# ...
# The key is foo() function where we can predict column offsets
script = textwrap.dedent(
f"""\
import time, sys, socket
def foo():
x = 1 + 2
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', {port}))
sock.sendall(b"ready")
time.sleep(10_000)
foo()
"""
)
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
os.mkdir(script_dir)
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(("localhost", port))
server_socket.settimeout(SHORT_TIMEOUT)
server_socket.listen(1)
script_name = _make_test_script(script_dir, "script", script)
client_socket = None
try:
p = subprocess.Popen([sys.executable, script_name])
client_socket, _ = server_socket.accept()
server_socket.close()
response = client_socket.recv(1024)
self.assertEqual(response, b"ready")
# Get stack trace with opcodes to ensure we get full location info
unwinder = RemoteUnwinder(p.pid, opcodes=True)
stack_trace = unwinder.get_stack_trace()
# Find the foo function frame
foo_frame = None
for interpreter_info in stack_trace:
for thread_info in interpreter_info.threads:
for frame in thread_info.frame_info:
if frame.funcname == "foo":
foo_frame = frame
break
if foo_frame:
break
if foo_frame:
break
self.assertIsNotNone(foo_frame, "Should find 'foo' function in stack trace")
# Verify location is a tuple with 4 elements
location = foo_frame.location
self.assertIsInstance(location, tuple, "location should be a tuple")
self.assertEqual(len(location), 4, "location should have 4 elements")
lineno, end_lineno, col_offset, end_col_offset = location
# Verify lineno is reasonable (should be line 8 where time.sleep is)
self.assertIsInstance(lineno, int, "lineno should be an integer")
self.assertEqual(lineno, 8, "lineno should be 8 (time.sleep line)")
# Verify end_lineno
self.assertIsInstance(end_lineno, int, "end_lineno should be an integer")
self.assertGreaterEqual(end_lineno, lineno, "end_lineno should be >= lineno")
# Verify col_offset and end_col_offset are integers
# They may be -1 if not available, or valid column offsets
self.assertIsInstance(col_offset, int, "col_offset should be an integer")
self.assertIsInstance(end_col_offset, int, "end_col_offset should be an integer")
# If column info is available (not -1), verify it's reasonable
if col_offset >= 0:
self.assertLess(col_offset, 100, "col_offset should be reasonable")
if end_col_offset >= 0:
self.assertLess(end_col_offset, 100, "end_col_offset should be reasonable")
if col_offset >= 0:
self.assertGreaterEqual(end_col_offset, col_offset,
"end_col_offset should be >= col_offset")
except PermissionError:
self.skipTest("Insufficient permissions to read the stack trace")
finally:
if client_socket is not None:
client_socket.close()
p.kill()
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)
@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
@ -1605,7 +1820,7 @@ def main_work():
current_frame = thread_info.frame_info[0]
if (
current_frame.funcname == "main_work"
and current_frame.lineno > 15
and current_frame.location[0] > 15
):
found = True
break

View file

@ -4,8 +4,12 @@
import shutil
import tempfile
import unittest
from collections import namedtuple
from pathlib import Path
# Matches the C structseq LocationInfo from _remote_debugging
LocationInfo = namedtuple('LocationInfo', ['lineno', 'end_lineno', 'col_offset', 'end_col_offset'])
from profiling.sampling.heatmap_collector import (
HeatmapCollector,
get_python_path_info,
@ -220,7 +224,7 @@ def test_process_frames_increments_total_samples(self):
collector = HeatmapCollector(sample_interval_usec=100)
initial_count = collector._total_samples
frames = [('file.py', 10, 'func')]
frames = [('file.py', (10, 10, -1, -1), 'func', None)]
collector.process_frames(frames, thread_id=1)
self.assertEqual(collector._total_samples, initial_count + 1)
@ -229,7 +233,7 @@ def test_process_frames_records_line_samples(self):
"""Test that process_frames records line samples."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [('test.py', 5, 'test_func')]
frames = [('test.py', (5, 5, -1, -1), 'test_func', None)]
collector.process_frames(frames, thread_id=1)
# Check that line was recorded
@ -241,9 +245,9 @@ def test_process_frames_records_multiple_lines_in_stack(self):
collector = HeatmapCollector(sample_interval_usec=100)
frames = [
('file1.py', 10, 'func1'),
('file2.py', 20, 'func2'),
('file3.py', 30, 'func3')
('file1.py', (10, 10, -1, -1), 'func1', None),
('file2.py', (20, 20, -1, -1), 'func2', None),
('file3.py', (30, 30, -1, -1), 'func3', None)
]
collector.process_frames(frames, thread_id=1)
@ -257,8 +261,8 @@ def test_process_frames_distinguishes_self_samples(self):
collector = HeatmapCollector(sample_interval_usec=100)
frames = [
('leaf.py', 5, 'leaf_func'), # This is the leaf (top of stack)
('caller.py', 10, 'caller_func')
('leaf.py', (5, 5, -1, -1), 'leaf_func', None), # This is the leaf (top of stack)
('caller.py', (10, 10, -1, -1), 'caller_func', None)
]
collector.process_frames(frames, thread_id=1)
@ -273,7 +277,7 @@ def test_process_frames_accumulates_samples(self):
"""Test that multiple calls accumulate samples."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [('file.py', 10, 'func')]
frames = [('file.py', (10, 10, -1, -1), 'func', None)]
collector.process_frames(frames, thread_id=1)
collector.process_frames(frames, thread_id=1)
@ -288,11 +292,11 @@ def test_process_frames_ignores_invalid_frames(self):
# These should be ignored
invalid_frames = [
('<string>', 1, 'test'),
('[eval]', 1, 'test'),
('', 1, 'test'),
(None, 1, 'test'),
('__init__', 0, 'test'), # Special invalid frame
('<string>', (1, 1, -1, -1), 'test', None),
('[eval]', (1, 1, -1, -1), 'test', None),
('', (1, 1, -1, -1), 'test', None),
(None, (1, 1, -1, -1), 'test', None),
('__init__', (0, 0, -1, -1), 'test', None), # Special invalid frame
]
for frame in invalid_frames:
@ -301,15 +305,15 @@ def test_process_frames_ignores_invalid_frames(self):
# Should not record these invalid frames
for frame in invalid_frames:
if frame[0]:
self.assertNotIn((frame[0], frame[1]), collector.line_samples)
self.assertNotIn((frame[0], frame[1][0]), collector.line_samples)
def test_process_frames_builds_call_graph(self):
"""Test that process_frames builds call graph relationships."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [
('callee.py', 5, 'callee_func'),
('caller.py', 10, 'caller_func')
('callee.py', (5, 5, -1, -1), 'callee_func', None),
('caller.py', (10, 10, -1, -1), 'caller_func', None)
]
collector.process_frames(frames, thread_id=1)
@ -325,7 +329,7 @@ def test_process_frames_records_function_definitions(self):
"""Test that process_frames records function definition locations."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [('module.py', 42, 'my_function')]
frames = [('module.py', (42, 42, -1, -1), 'my_function', None)]
collector.process_frames(frames, thread_id=1)
self.assertIn(('module.py', 'my_function'), collector.function_definitions)
@ -336,8 +340,8 @@ def test_process_frames_tracks_edge_samples(self):
collector = HeatmapCollector(sample_interval_usec=100)
frames = [
('callee.py', 5, 'callee'),
('caller.py', 10, 'caller')
('callee.py', (5, 5, -1, -1), 'callee', None),
('caller.py', (10, 10, -1, -1), 'caller', None)
]
# Process same call stack multiple times
@ -361,7 +365,7 @@ def test_process_frames_with_file_samples_dict(self):
"""Test that file_samples dict is properly populated."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [('test.py', 10, 'func')]
frames = [('test.py', (10, 10, -1, -1), 'func', None)]
collector.process_frames(frames, thread_id=1)
self.assertIn('test.py', collector.file_samples)
@ -382,7 +386,7 @@ def test_export_creates_output_directory(self):
collector = HeatmapCollector(sample_interval_usec=100)
# Add some data
frames = [('test.py', 10, 'func')]
frames = [('test.py', (10, 10, -1, -1), 'func', None)]
collector.process_frames(frames, thread_id=1)
output_path = os.path.join(self.test_dir, 'heatmap_output')
@ -397,7 +401,7 @@ def test_export_creates_index_html(self):
"""Test that export creates index.html."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [('test.py', 10, 'func')]
frames = [('test.py', (10, 10, -1, -1), 'func', None)]
collector.process_frames(frames, thread_id=1)
output_path = os.path.join(self.test_dir, 'heatmap_output')
@ -412,7 +416,7 @@ def test_export_creates_file_htmls(self):
"""Test that export creates individual file HTMLs."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [('test.py', 10, 'func')]
frames = [('test.py', (10, 10, -1, -1), 'func', None)]
collector.process_frames(frames, thread_id=1)
output_path = os.path.join(self.test_dir, 'heatmap_output')
@ -439,7 +443,7 @@ def test_export_handles_html_suffix(self):
"""Test that export handles .html suffix in output path."""
collector = HeatmapCollector(sample_interval_usec=100)
frames = [('test.py', 10, 'func')]
frames = [('test.py', (10, 10, -1, -1), 'func', None)]
collector.process_frames(frames, thread_id=1)
# Path with .html suffix should be stripped
@ -457,9 +461,9 @@ def test_export_with_multiple_files(self):
collector = HeatmapCollector(sample_interval_usec=100)
# Add samples for multiple files
collector.process_frames([('file1.py', 10, 'func1')], thread_id=1)
collector.process_frames([('file2.py', 20, 'func2')], thread_id=1)
collector.process_frames([('file3.py', 30, 'func3')], thread_id=1)
collector.process_frames([('file1.py', (10, 10, -1, -1), 'func1', None)], thread_id=1)
collector.process_frames([('file2.py', (20, 20, -1, -1), 'func2', None)], thread_id=1)
collector.process_frames([('file3.py', (30, 30, -1, -1), 'func3', None)], thread_id=1)
output_path = os.path.join(self.test_dir, 'multi_file')
@ -476,7 +480,7 @@ def test_export_index_contains_file_references(self):
collector = HeatmapCollector(sample_interval_usec=100)
collector.set_stats(sample_interval_usec=100, duration_sec=1.0, sample_rate=100.0)
frames = [('mytest.py', 10, 'my_func')]
frames = [('mytest.py', (10, 10, -1, -1), 'my_func', None)]
collector.process_frames(frames, thread_id=1)
output_path = os.path.join(self.test_dir, 'test_output')
@ -500,7 +504,7 @@ def test_export_file_html_has_line_numbers(self):
with open(temp_file, 'w') as f:
f.write('def test():\n pass\n')
frames = [(temp_file, 1, 'test')]
frames = [(temp_file, (1, 1, -1, -1), 'test', None)]
collector.process_frames(frames, thread_id=1)
output_path = os.path.join(self.test_dir, 'line_test')
@ -521,23 +525,39 @@ def test_export_file_html_has_line_numbers(self):
class MockFrameInfo:
"""Mock FrameInfo for testing since the real one isn't accessible."""
"""Mock FrameInfo for testing.
def __init__(self, filename, lineno, funcname):
Frame format: (filename, location, funcname, opcode) where:
- location is a tuple (lineno, end_lineno, col_offset, end_col_offset)
- opcode is an int or None
"""
def __init__(self, filename, lineno, funcname, opcode=None):
self.filename = filename
self.lineno = lineno
self.funcname = funcname
self.opcode = opcode
self.location = (lineno, lineno, -1, -1)
def __iter__(self):
return iter((self.filename, self.location, self.funcname, self.opcode))
def __getitem__(self, index):
return (self.filename, self.location, self.funcname, self.opcode)[index]
def __len__(self):
return 4
def __repr__(self):
return f"MockFrameInfo(filename='{self.filename}', lineno={self.lineno}, funcname='{self.funcname}')"
return f"MockFrameInfo('{self.filename}', {self.location}, '{self.funcname}', {self.opcode})"
class MockThreadInfo:
"""Mock ThreadInfo for testing since the real one isn't accessible."""
def __init__(self, thread_id, frame_info):
def __init__(self, thread_id, frame_info, status=0):
self.thread_id = thread_id
self.frame_info = frame_info
self.status = status # Thread status flags
def __repr__(self):
return f"MockThreadInfo(thread_id={self.thread_id}, frame_info={self.frame_info})"
@ -565,13 +585,13 @@ def test_heatmap_collector_basic(self):
self.assertEqual(len(collector.file_samples), 0)
self.assertEqual(len(collector.line_samples), 0)
# Test collecting sample data
# Test collecting sample data - frames are 4-tuples: (filename, location, funcname, opcode)
test_frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(
1,
[("file.py", 10, "func1"), ("file.py", 20, "func2")],
[MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")],
)]
)
]
@ -592,21 +612,21 @@ def test_heatmap_collector_export(self):
collector = HeatmapCollector(sample_interval_usec=100)
# Create test data with multiple files
# Create test data with multiple files using MockFrameInfo
test_frames1 = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [("file.py", 10, "func1"), ("file.py", 20, "func2")])],
[MockThreadInfo(1, [MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")])],
)
]
test_frames2 = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [("file.py", 10, "func1"), ("file.py", 20, "func2")])],
[MockThreadInfo(1, [MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")])],
)
] # Same stack
test_frames3 = [
MockInterpreterInfo(0, [MockThreadInfo(1, [("other.py", 5, "other_func")])])
MockInterpreterInfo(0, [MockThreadInfo(1, [MockFrameInfo("other.py", 5, "other_func")])])
]
collector.collect(test_frames1)
@ -649,5 +669,95 @@ def test_heatmap_collector_export(self):
self.assertIn("nav-btn", file_content)
class TestHeatmapCollectorLocation(unittest.TestCase):
"""Tests for HeatmapCollector location handling."""
def test_heatmap_with_full_location_info(self):
"""Test HeatmapCollector uses full location tuple."""
collector = HeatmapCollector(sample_interval_usec=1000)
# Frame with full location: (lineno, end_lineno, col_offset, end_col_offset)
frame = MockFrameInfo("test.py", 10, "func")
# Override with full location info
frame.location = LocationInfo(10, 15, 4, 20)
frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame])]
)
]
collector.collect(frames)
# Verify data was collected with location info
# HeatmapCollector uses file_samples dict with filename -> Counter of linenos
self.assertIn("test.py", collector.file_samples)
# Line 10 should have samples
self.assertIn(10, collector.file_samples["test.py"])
def test_heatmap_with_none_location(self):
"""Test HeatmapCollector handles None location gracefully."""
collector = HeatmapCollector(sample_interval_usec=1000)
# Synthetic frame with None location
frame = MockFrameInfo("~", 0, "<native>")
frame.location = None
frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame])]
)
]
# Should not raise
collector.collect(frames)
def test_heatmap_export_with_location_data(self):
"""Test HeatmapCollector export includes location info."""
tmp_dir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmp_dir)
collector = HeatmapCollector(sample_interval_usec=1000)
frame = MockFrameInfo("test.py", 10, "process")
frame.location = LocationInfo(10, 12, 0, 30)
frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame])]
)
]
collector.collect(frames)
# Export should work
with (captured_stdout(), captured_stderr()):
collector.export(tmp_dir)
self.assertTrue(os.path.exists(os.path.join(tmp_dir, "index.html")))
def test_heatmap_collector_frame_format(self):
"""Test HeatmapCollector with 4-element frame format."""
collector = HeatmapCollector(sample_interval_usec=1000)
frames = [
MockInterpreterInfo(
0,
[
MockThreadInfo(
1,
[
MockFrameInfo("app.py", 100, "main", opcode=90),
MockFrameInfo("utils.py", 50, "helper", opcode=100),
MockFrameInfo("lib.py", 25, "process", opcode=None),
],
)
],
)
]
collector.collect(frames)
# Should have recorded data for the files
self.assertIn("app.py", collector.file_samples)
self.assertIn("utils.py", collector.file_samples)
self.assertIn("lib.py", collector.file_samples)
if __name__ == "__main__":
unittest.main()

View file

@ -5,17 +5,7 @@
THREAD_STATUS_ON_CPU,
)
class MockFrameInfo:
"""Mock FrameInfo for testing."""
def __init__(self, filename, lineno, funcname):
self.filename = filename
self.lineno = lineno
self.funcname = funcname
def __repr__(self):
return f"MockFrameInfo(filename='{self.filename}', lineno={self.lineno}, funcname='{self.funcname}')"
from .mocks import LocationInfo, MockFrameInfo
class MockThreadInfo:

View file

@ -1,16 +1,36 @@
"""Mock classes for sampling profiler tests."""
from collections import namedtuple
# Matches the C structseq LocationInfo from _remote_debugging
LocationInfo = namedtuple('LocationInfo', ['lineno', 'end_lineno', 'col_offset', 'end_col_offset'])
class MockFrameInfo:
"""Mock FrameInfo for testing since the real one isn't accessible."""
"""Mock FrameInfo for testing.
def __init__(self, filename, lineno, funcname):
Frame format: (filename, location, funcname, opcode) where:
- location is a tuple (lineno, end_lineno, col_offset, end_col_offset)
- opcode is an int or None
"""
def __init__(self, filename, lineno, funcname, opcode=None):
self.filename = filename
self.lineno = lineno
self.funcname = funcname
self.opcode = opcode
self.location = LocationInfo(lineno, lineno, -1, -1)
def __iter__(self):
return iter((self.filename, self.location, self.funcname, self.opcode))
def __getitem__(self, index):
return (self.filename, self.location, self.funcname, self.opcode)[index]
def __len__(self):
return 4
def __repr__(self):
return f"MockFrameInfo(filename='{self.filename}', lineno={self.lineno}, funcname='{self.funcname}')"
return f"MockFrameInfo('{self.filename}', {self.location}, '{self.funcname}', {self.opcode})"
class MockThreadInfo:

View file

@ -14,9 +14,12 @@
FlamegraphCollector,
)
from profiling.sampling.gecko_collector import GeckoCollector
from profiling.sampling.collector import extract_lineno, normalize_location
from profiling.sampling.opcode_utils import get_opcode_info, format_opcode
from profiling.sampling.constants import (
PROFILING_MODE_WALL,
PROFILING_MODE_CPU,
DEFAULT_LOCATION,
)
from _remote_debugging import (
THREAD_STATUS_HAS_GIL,
@ -30,7 +33,7 @@
from test.support import captured_stdout, captured_stderr
from .mocks import MockFrameInfo, MockThreadInfo, MockInterpreterInfo
from .mocks import MockFrameInfo, MockThreadInfo, MockInterpreterInfo, LocationInfo
from .helpers import close_and_unlink
@ -42,9 +45,8 @@ def test_mock_frame_info_with_empty_and_unicode_values(self):
# Test with empty strings
frame = MockFrameInfo("", 0, "")
self.assertEqual(frame.filename, "")
self.assertEqual(frame.lineno, 0)
self.assertEqual(frame.location.lineno, 0)
self.assertEqual(frame.funcname, "")
self.assertIn("filename=''", repr(frame))
# Test with unicode characters
frame = MockFrameInfo("文件.py", 42, "函数名")
@ -56,7 +58,7 @@ def test_mock_frame_info_with_empty_and_unicode_values(self):
long_funcname = "func_" + "x" * 1000
frame = MockFrameInfo(long_filename, 999999, long_funcname)
self.assertEqual(frame.filename, long_filename)
self.assertEqual(frame.lineno, 999999)
self.assertEqual(frame.location.lineno, 999999)
self.assertEqual(frame.funcname, long_funcname)
def test_pstats_collector_with_extreme_intervals_and_empty_data(self):
@ -78,7 +80,7 @@ def test_pstats_collector_with_extreme_intervals_and_empty_data(self):
test_frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(None, [MockFrameInfo("file.py", 10, "func")])],
[MockThreadInfo(None, [MockFrameInfo("file.py", 10, "func", None)])],
)
]
collector.collect(test_frames)
@ -193,7 +195,7 @@ def test_collapsed_stack_collector_with_empty_and_deep_stacks(self):
# Test with single frame stack
test_frames = [
MockInterpreterInfo(
0, [MockThreadInfo(1, [("file.py", 10, "func")])]
0, [MockThreadInfo(1, [MockFrameInfo("file.py", 10, "func")])]
)
]
collector.collect(test_frames)
@ -204,7 +206,7 @@ def test_collapsed_stack_collector_with_empty_and_deep_stacks(self):
self.assertEqual(count, 1)
# Test with very deep stack
deep_stack = [(f"file{i}.py", i, f"func{i}") for i in range(100)]
deep_stack = [MockFrameInfo(f"file{i}.py", i, f"func{i}") for i in range(100)]
test_frames = [MockInterpreterInfo(0, [MockThreadInfo(1, deep_stack)])]
collector = CollapsedStackCollector(1000)
collector.collect(test_frames)
@ -317,7 +319,7 @@ def test_collapsed_stack_collector_basic(self):
0,
[
MockThreadInfo(
1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
1, [MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")]
)
],
)
@ -343,7 +345,7 @@ def test_collapsed_stack_collector_export(self):
0,
[
MockThreadInfo(
1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
1, [MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")]
)
],
)
@ -353,14 +355,14 @@ def test_collapsed_stack_collector_export(self):
0,
[
MockThreadInfo(
1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
1, [MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")]
)
],
)
] # Same stack
test_frames3 = [
MockInterpreterInfo(
0, [MockThreadInfo(1, [("other.py", 5, "other_func")])]
0, [MockThreadInfo(1, [MockFrameInfo("other.py", 5, "other_func")])]
)
]
@ -406,7 +408,7 @@ def test_flamegraph_collector_basic(self):
0,
[
MockThreadInfo(
1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
1, [MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")]
)
],
)
@ -454,7 +456,7 @@ def test_flamegraph_collector_export(self):
0,
[
MockThreadInfo(
1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
1, [MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")]
)
],
)
@ -464,14 +466,14 @@ def test_flamegraph_collector_export(self):
0,
[
MockThreadInfo(
1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
1, [MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")]
)
],
)
] # Same stack
test_frames3 = [
MockInterpreterInfo(
0, [MockThreadInfo(1, [("other.py", 5, "other_func")])]
0, [MockThreadInfo(1, [MockFrameInfo("other.py", 5, "other_func")])]
)
]
@ -518,7 +520,7 @@ def test_gecko_collector_basic(self):
[
MockThreadInfo(
1,
[("file.py", 10, "func1"), ("file.py", 20, "func2")],
[MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")],
)
],
)
@ -608,7 +610,7 @@ def test_gecko_collector_export(self):
0,
[
MockThreadInfo(
1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
1, [MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")]
)
],
)
@ -618,14 +620,14 @@ def test_gecko_collector_export(self):
0,
[
MockThreadInfo(
1, [("file.py", 10, "func1"), ("file.py", 20, "func2")]
1, [MockFrameInfo("file.py", 10, "func1"), MockFrameInfo("file.py", 20, "func2")]
)
],
)
] # Same stack
test_frames3 = [
MockInterpreterInfo(
0, [MockThreadInfo(1, [("other.py", 5, "other_func")])]
0, [MockThreadInfo(1, [MockFrameInfo("other.py", 5, "other_func")])]
)
]
@ -683,7 +685,7 @@ def test_gecko_collector_markers(self):
[
MockThreadInfo(
1,
[("test.py", 10, "python_func")],
[MockFrameInfo("test.py", 10, "python_func")],
status=HAS_GIL_ON_CPU,
)
],
@ -698,7 +700,7 @@ def test_gecko_collector_markers(self):
[
MockThreadInfo(
1,
[("test.py", 15, "wait_func")],
[MockFrameInfo("test.py", 15, "wait_func")],
status=WAITING_FOR_GIL,
)
],
@ -713,7 +715,7 @@ def test_gecko_collector_markers(self):
[
MockThreadInfo(
1,
[("test.py", 20, "python_func2")],
[MockFrameInfo("test.py", 20, "python_func2")],
status=HAS_GIL_ON_CPU,
)
],
@ -728,7 +730,7 @@ def test_gecko_collector_markers(self):
[
MockThreadInfo(
1,
[("native.c", 100, "native_func")],
[MockFrameInfo("native.c", 100, "native_func")],
status=NO_GIL_ON_CPU,
)
],
@ -902,8 +904,8 @@ def test_flamegraph_collector_stats_accumulation(self):
MockInterpreterInfo(
0,
[
MockThreadInfo(1, [("a.py", 1, "func_a")], status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(2, [("b.py", 2, "func_b")], status=THREAD_STATUS_ON_CPU),
MockThreadInfo(1, [MockFrameInfo("a.py", 1, "func_a")], status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(2, [MockFrameInfo("b.py", 2, "func_b")], status=THREAD_STATUS_ON_CPU),
],
)
]
@ -917,9 +919,9 @@ def test_flamegraph_collector_stats_accumulation(self):
MockInterpreterInfo(
0,
[
MockThreadInfo(1, [("a.py", 1, "func_a")], status=THREAD_STATUS_GIL_REQUESTED),
MockThreadInfo(2, [("b.py", 2, "func_b")], status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(3, [("c.py", 3, "func_c")], status=THREAD_STATUS_ON_CPU),
MockThreadInfo(1, [MockFrameInfo("a.py", 1, "func_a")], status=THREAD_STATUS_GIL_REQUESTED),
MockThreadInfo(2, [MockFrameInfo("b.py", 2, "func_b")], status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(3, [MockFrameInfo("c.py", 3, "func_c")], status=THREAD_STATUS_ON_CPU),
],
)
]
@ -936,7 +938,7 @@ def test_flamegraph_collector_stats_accumulation(self):
MockInterpreterInfo(
0,
[
MockThreadInfo(1, [("~", 0, "<GC>")], status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(1, [MockFrameInfo("~", 0, "<GC>")], status=THREAD_STATUS_HAS_GIL),
],
)
]
@ -960,9 +962,9 @@ def test_flamegraph_collector_per_thread_stats(self):
MockInterpreterInfo(
0,
[
MockThreadInfo(1, [("a.py", 1, "func_a")], status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(2, [("b.py", 2, "func_b")], status=THREAD_STATUS_ON_CPU),
MockThreadInfo(3, [("c.py", 3, "func_c")], status=THREAD_STATUS_GIL_REQUESTED),
MockThreadInfo(1, [MockFrameInfo("a.py", 1, "func_a")], status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(2, [MockFrameInfo("b.py", 2, "func_b")], status=THREAD_STATUS_ON_CPU),
MockThreadInfo(3, [MockFrameInfo("c.py", 3, "func_c")], status=THREAD_STATUS_GIL_REQUESTED),
],
)
]
@ -992,7 +994,7 @@ def test_flamegraph_collector_per_thread_stats(self):
MockInterpreterInfo(
0,
[
MockThreadInfo(1, [("a.py", 2, "func_b")], status=THREAD_STATUS_ON_CPU),
MockThreadInfo(1, [MockFrameInfo("a.py", 2, "func_b")], status=THREAD_STATUS_ON_CPU),
],
)
]
@ -1012,7 +1014,7 @@ def test_flamegraph_collector_percentage_calculations(self):
MockInterpreterInfo(
0,
[
MockThreadInfo(1, [("a.py", 1, "func")], status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(1, [MockFrameInfo("a.py", 1, "func")], status=THREAD_STATUS_HAS_GIL),
],
)
]
@ -1023,7 +1025,7 @@ def test_flamegraph_collector_percentage_calculations(self):
MockInterpreterInfo(
0,
[
MockThreadInfo(1, [("a.py", 1, "func")], status=THREAD_STATUS_ON_CPU),
MockThreadInfo(1, [MockFrameInfo("a.py", 1, "func")], status=THREAD_STATUS_ON_CPU),
],
)
]
@ -1046,7 +1048,7 @@ def test_flamegraph_collector_mode_handling(self):
MockInterpreterInfo(
0,
[
MockThreadInfo(1, [("a.py", 1, "func")], status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(1, [MockFrameInfo("a.py", 1, "func")], status=THREAD_STATUS_HAS_GIL),
],
)
]
@ -1085,8 +1087,8 @@ def test_flamegraph_collector_json_structure_includes_stats(self):
MockInterpreterInfo(
0,
[
MockThreadInfo(1, [("a.py", 1, "func_a")], status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(2, [("b.py", 2, "func_b")], status=THREAD_STATUS_ON_CPU),
MockThreadInfo(1, [MockFrameInfo("a.py", 1, "func_a")], status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(2, [MockFrameInfo("b.py", 2, "func_b")], status=THREAD_STATUS_ON_CPU),
],
)
]
@ -1142,13 +1144,13 @@ def test_flamegraph_collector_per_thread_gc_percentage(self):
# First 5 samples: both threads, thread 1 has GC in 2
for i in range(5):
has_gc = i < 2 # First 2 samples have GC for thread 1
frames_1 = [("~", 0, "<GC>")] if has_gc else [("a.py", 1, "func_a")]
frames_1 = [MockFrameInfo("~", 0, "<GC>")] if has_gc else [MockFrameInfo("a.py", 1, "func_a")]
stack_frames = [
MockInterpreterInfo(
0,
[
MockThreadInfo(1, frames_1, status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(2, [("b.py", 2, "func_b")], status=THREAD_STATUS_ON_CPU),
MockThreadInfo(2, [MockFrameInfo("b.py", 2, "func_b")], status=THREAD_STATUS_ON_CPU),
],
)
]
@ -1162,8 +1164,8 @@ def test_flamegraph_collector_per_thread_gc_percentage(self):
MockInterpreterInfo(
0,
[
MockThreadInfo(1, [("a.py", 1, "func_a")], status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(2, [("~", 0, "<GC>")], status=THREAD_STATUS_ON_CPU),
MockThreadInfo(1, [MockFrameInfo("a.py", 1, "func_a")], status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(2, [MockFrameInfo("~", 0, "<GC>")], status=THREAD_STATUS_ON_CPU),
],
)
]
@ -1173,7 +1175,7 @@ def test_flamegraph_collector_per_thread_gc_percentage(self):
MockInterpreterInfo(
0,
[
MockThreadInfo(1, [("a.py", 1, "func_a")], status=THREAD_STATUS_HAS_GIL),
MockThreadInfo(1, [MockFrameInfo("a.py", 1, "func_a")], status=THREAD_STATUS_HAS_GIL),
],
)
]
@ -1201,3 +1203,434 @@ def test_flamegraph_collector_per_thread_gc_percentage(self):
self.assertEqual(collector.per_thread_stats[2]["gc_samples"], 1)
self.assertEqual(collector.per_thread_stats[2]["total"], 6)
self.assertAlmostEqual(per_thread_stats[2]["gc_pct"], 10.0, places=1)
class TestLocationHelpers(unittest.TestCase):
"""Tests for location handling helper functions."""
def test_extract_lineno_from_location_info(self):
"""Test extracting lineno from LocationInfo namedtuple."""
loc = LocationInfo(42, 45, 0, 10)
self.assertEqual(extract_lineno(loc), 42)
def test_extract_lineno_from_tuple(self):
"""Test extracting lineno from plain tuple."""
loc = (100, 105, 5, 20)
self.assertEqual(extract_lineno(loc), 100)
def test_extract_lineno_from_none(self):
"""Test extracting lineno from None (synthetic frames)."""
self.assertEqual(extract_lineno(None), 0)
def test_normalize_location_with_location_info(self):
"""Test normalize_location passes through LocationInfo."""
loc = LocationInfo(10, 15, 0, 5)
result = normalize_location(loc)
self.assertEqual(result, loc)
def test_normalize_location_with_tuple(self):
"""Test normalize_location passes through tuple."""
loc = (10, 15, 0, 5)
result = normalize_location(loc)
self.assertEqual(result, loc)
def test_normalize_location_with_none(self):
"""Test normalize_location returns DEFAULT_LOCATION for None."""
result = normalize_location(None)
self.assertEqual(result, DEFAULT_LOCATION)
self.assertEqual(result, (0, 0, -1, -1))
class TestOpcodeFormatting(unittest.TestCase):
"""Tests for opcode formatting utilities."""
def test_get_opcode_info_standard_opcode(self):
"""Test get_opcode_info for a standard opcode."""
import opcode
# LOAD_CONST is a standard opcode
load_const = opcode.opmap.get('LOAD_CONST')
if load_const is not None:
info = get_opcode_info(load_const)
self.assertEqual(info['opname'], 'LOAD_CONST')
self.assertEqual(info['base_opname'], 'LOAD_CONST')
self.assertFalse(info['is_specialized'])
def test_get_opcode_info_unknown_opcode(self):
"""Test get_opcode_info for an unknown opcode."""
info = get_opcode_info(999)
self.assertEqual(info['opname'], '<999>')
self.assertEqual(info['base_opname'], '<999>')
self.assertFalse(info['is_specialized'])
def test_format_opcode_standard(self):
"""Test format_opcode for a standard opcode."""
import opcode
load_const = opcode.opmap.get('LOAD_CONST')
if load_const is not None:
formatted = format_opcode(load_const)
self.assertEqual(formatted, 'LOAD_CONST')
def test_format_opcode_specialized(self):
"""Test format_opcode for a specialized opcode shows base in parens."""
import opcode
if not hasattr(opcode, '_specialized_opmap'):
self.skipTest("No specialized opcodes in this Python version")
if not hasattr(opcode, '_specializations'):
self.skipTest("No specialization info in this Python version")
# Find any specialized opcode to test
for base_name, variants in opcode._specializations.items():
if not variants:
continue
variant_name = variants[0]
variant_opcode = opcode._specialized_opmap.get(variant_name)
if variant_opcode is None:
continue
formatted = format_opcode(variant_opcode)
# Should show: VARIANT_NAME (BASE_NAME)
self.assertIn(variant_name, formatted)
self.assertIn(f'({base_name})', formatted)
return
self.skipTest("No specialized opcodes found")
def test_format_opcode_unknown(self):
"""Test format_opcode for an unknown opcode."""
formatted = format_opcode(999)
self.assertEqual(formatted, '<999>')
class TestLocationInCollectors(unittest.TestCase):
"""Tests for location tuple handling in each collector."""
def _make_frames_with_location(self, location, opcode=None):
"""Create test frames with a specific location."""
frame = MockFrameInfo("test.py", 0, "test_func", opcode)
# Override the location
frame.location = location
return [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame], status=THREAD_STATUS_HAS_GIL)]
)
]
def test_pstats_collector_with_location_info(self):
"""Test PstatsCollector handles LocationInfo properly."""
collector = PstatsCollector(sample_interval_usec=1000)
# Frame with LocationInfo
frame = MockFrameInfo("test.py", 42, "my_function")
frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame], status=THREAD_STATUS_HAS_GIL)]
)
]
collector.collect(frames)
# Should extract lineno from location
key = ("test.py", 42, "my_function")
self.assertIn(key, collector.result)
self.assertEqual(collector.result[key]["direct_calls"], 1)
def test_pstats_collector_with_none_location(self):
"""Test PstatsCollector handles None location (synthetic frames)."""
collector = PstatsCollector(sample_interval_usec=1000)
# Create frame with None location (like GC frame)
frame = MockFrameInfo("~", 0, "<GC>")
frame.location = None # Synthetic frame has no location
frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame], status=THREAD_STATUS_HAS_GIL)]
)
]
collector.collect(frames)
# Should use lineno=0 for None location
key = ("~", 0, "<GC>")
self.assertIn(key, collector.result)
def test_collapsed_stack_with_location_info(self):
"""Test CollapsedStackCollector handles LocationInfo properly."""
collector = CollapsedStackCollector(1000)
frame1 = MockFrameInfo("main.py", 10, "main")
frame2 = MockFrameInfo("utils.py", 25, "helper")
frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame1, frame2], status=THREAD_STATUS_HAS_GIL)]
)
]
collector.collect(frames)
# Check that linenos were extracted correctly
self.assertEqual(len(collector.stack_counter), 1)
(path, _), count = list(collector.stack_counter.items())[0]
# Reversed order: helper at top, main at bottom
self.assertEqual(path[0], ("utils.py", 25, "helper"))
self.assertEqual(path[1], ("main.py", 10, "main"))
def test_flamegraph_collector_with_location_info(self):
"""Test FlamegraphCollector handles LocationInfo properly."""
collector = FlamegraphCollector(sample_interval_usec=1000)
frame = MockFrameInfo("app.py", 100, "process_data")
frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame], status=THREAD_STATUS_HAS_GIL)]
)
]
collector.collect(frames)
data = collector._convert_to_flamegraph_format()
# Verify the function name includes lineno from location
strings = data.get("strings", [])
name_found = any("process_data" in s and "100" in s for s in strings if isinstance(s, str))
self.assertTrue(name_found, f"Expected to find 'process_data' with line 100 in {strings}")
def test_gecko_collector_with_location_info(self):
"""Test GeckoCollector handles LocationInfo properly."""
collector = GeckoCollector(sample_interval_usec=1000)
frame = MockFrameInfo("server.py", 50, "handle_request")
frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame], status=THREAD_STATUS_HAS_GIL)]
)
]
collector.collect(frames)
profile = collector._build_profile()
# Check that the function was recorded
self.assertEqual(len(profile["threads"]), 1)
thread_data = profile["threads"][0]
string_array = profile["shared"]["stringArray"]
# Verify function name is in string table
self.assertIn("handle_request", string_array)
class TestOpcodeHandling(unittest.TestCase):
"""Tests for opcode field handling in collectors."""
def test_frame_with_opcode(self):
"""Test MockFrameInfo properly stores opcode."""
frame = MockFrameInfo("test.py", 10, "my_func", opcode=90)
self.assertEqual(frame.opcode, 90)
# Verify tuple representation includes opcode
self.assertEqual(frame[3], 90)
self.assertEqual(len(frame), 4)
def test_frame_without_opcode(self):
"""Test MockFrameInfo with no opcode defaults to None."""
frame = MockFrameInfo("test.py", 10, "my_func")
self.assertIsNone(frame.opcode)
self.assertIsNone(frame[3])
def test_collectors_ignore_opcode_for_key_generation(self):
"""Test that collectors use (filename, lineno, funcname) as key, not opcode."""
collector = PstatsCollector(sample_interval_usec=1000)
# Same function, different opcodes
frame1 = MockFrameInfo("test.py", 10, "func", opcode=90)
frame2 = MockFrameInfo("test.py", 10, "func", opcode=100)
frames1 = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame1], status=THREAD_STATUS_HAS_GIL)]
)
]
frames2 = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame2], status=THREAD_STATUS_HAS_GIL)]
)
]
collector.collect(frames1)
collector.collect(frames2)
# Should be counted as same function (opcode not in key)
key = ("test.py", 10, "func")
self.assertIn(key, collector.result)
self.assertEqual(collector.result[key]["direct_calls"], 2)
class TestGeckoOpcodeMarkers(unittest.TestCase):
"""Tests for GeckoCollector opcode interval markers."""
def test_gecko_collector_opcodes_disabled_by_default(self):
"""Test that opcode tracking is disabled by default."""
collector = GeckoCollector(sample_interval_usec=1000)
self.assertFalse(collector.opcodes_enabled)
def test_gecko_collector_opcodes_enabled(self):
"""Test that opcode tracking can be enabled."""
collector = GeckoCollector(sample_interval_usec=1000, opcodes=True)
self.assertTrue(collector.opcodes_enabled)
def test_gecko_opcode_state_tracking(self):
"""Test that GeckoCollector tracks opcode state changes."""
collector = GeckoCollector(sample_interval_usec=1000, opcodes=True)
# First sample with opcode 90 (RAISE_VARARGS)
frame1 = MockFrameInfo("test.py", 10, "func", opcode=90)
frames1 = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame1], status=THREAD_STATUS_HAS_GIL)]
)
]
collector.collect(frames1)
# Should start tracking this opcode state
self.assertIn(1, collector.opcode_state)
state = collector.opcode_state[1]
self.assertEqual(state[0], 90) # opcode
self.assertEqual(state[1], 10) # lineno
self.assertEqual(state[3], "func") # funcname
def test_gecko_opcode_state_change_emits_marker(self):
"""Test that opcode state change emits an interval marker."""
collector = GeckoCollector(sample_interval_usec=1000, opcodes=True)
# First sample: opcode 90
frame1 = MockFrameInfo("test.py", 10, "func", opcode=90)
frames1 = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame1], status=THREAD_STATUS_HAS_GIL)]
)
]
collector.collect(frames1)
# Second sample: different opcode 100
frame2 = MockFrameInfo("test.py", 10, "func", opcode=100)
frames2 = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame2], status=THREAD_STATUS_HAS_GIL)]
)
]
collector.collect(frames2)
# Should have emitted a marker for the first opcode
thread_data = collector.threads[1]
markers = thread_data["markers"]
# At least one marker should have been added
self.assertGreater(len(markers["name"]), 0)
def test_gecko_opcode_markers_not_emitted_when_disabled(self):
"""Test that no opcode markers when opcodes=False."""
collector = GeckoCollector(sample_interval_usec=1000, opcodes=False)
frame1 = MockFrameInfo("test.py", 10, "func", opcode=90)
frames1 = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame1], status=THREAD_STATUS_HAS_GIL)]
)
]
collector.collect(frames1)
frame2 = MockFrameInfo("test.py", 10, "func", opcode=100)
frames2 = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame2], status=THREAD_STATUS_HAS_GIL)]
)
]
collector.collect(frames2)
# opcode_state should not be tracked
self.assertEqual(len(collector.opcode_state), 0)
def test_gecko_opcode_with_none_opcode(self):
"""Test that None opcode doesn't cause issues."""
collector = GeckoCollector(sample_interval_usec=1000, opcodes=True)
# Frame with no opcode (None)
frame = MockFrameInfo("test.py", 10, "func", opcode=None)
frames = [
MockInterpreterInfo(
0,
[MockThreadInfo(1, [frame], status=THREAD_STATUS_HAS_GIL)]
)
]
collector.collect(frames)
# Should track the state but opcode is None
self.assertIn(1, collector.opcode_state)
self.assertIsNone(collector.opcode_state[1][0])
class TestCollectorFrameFormat(unittest.TestCase):
"""Tests verifying all collectors handle the 4-element frame format."""
def _make_sample_frames(self):
"""Create sample frames with full format: (filename, location, funcname, opcode)."""
return [
MockInterpreterInfo(
0,
[
MockThreadInfo(
1,
[
MockFrameInfo("app.py", 100, "main", opcode=90),
MockFrameInfo("utils.py", 50, "helper", opcode=100),
MockFrameInfo("lib.py", 25, "process", opcode=None),
],
status=THREAD_STATUS_HAS_GIL,
)
],
)
]
def test_pstats_collector_frame_format(self):
"""Test PstatsCollector with 4-element frame format."""
collector = PstatsCollector(sample_interval_usec=1000)
collector.collect(self._make_sample_frames())
# All three functions should be recorded
self.assertEqual(len(collector.result), 3)
self.assertIn(("app.py", 100, "main"), collector.result)
self.assertIn(("utils.py", 50, "helper"), collector.result)
self.assertIn(("lib.py", 25, "process"), collector.result)
def test_collapsed_stack_frame_format(self):
"""Test CollapsedStackCollector with 4-element frame format."""
collector = CollapsedStackCollector(sample_interval_usec=1000)
collector.collect(self._make_sample_frames())
self.assertEqual(len(collector.stack_counter), 1)
(path, _), _ = list(collector.stack_counter.items())[0]
# 3 frames in the path (reversed order)
self.assertEqual(len(path), 3)
def test_flamegraph_collector_frame_format(self):
"""Test FlamegraphCollector with 4-element frame format."""
collector = FlamegraphCollector(sample_interval_usec=1000)
collector.collect(self._make_sample_frames())
data = collector._convert_to_flamegraph_format()
# Should have processed the frames
self.assertIn("children", data)
def test_gecko_collector_frame_format(self):
"""Test GeckoCollector with 4-element frame format."""
collector = GeckoCollector(sample_interval_usec=1000)
collector.collect(self._make_sample_frames())
profile = collector._build_profile()
# Should have one thread with the frames
self.assertEqual(len(profile["threads"]), 1)
thread = profile["threads"][0]
# Should have recorded 3 functions
self.assertEqual(thread["funcTable"]["length"], 3)

View file

@ -34,7 +34,7 @@
skip_if_not_supported,
PROCESS_VM_READV_SUPPORTED,
)
from .mocks import MockFrameInfo, MockThreadInfo, MockInterpreterInfo
from .mocks import MockFrameInfo, MockThreadInfo, MockInterpreterInfo, LocationInfo
# Duration for profiling tests - long enough for process to complete naturally
PROFILING_TIMEOUT = str(int(SHORT_TIMEOUT))
@ -301,10 +301,10 @@ def test_collapsed_stack_with_recursion(self):
MockThreadInfo(
1,
[
("factorial.py", 10, "factorial"),
("factorial.py", 10, "factorial"), # recursive
("factorial.py", 10, "factorial"), # deeper
("main.py", 5, "main"),
MockFrameInfo("factorial.py", 10, "factorial"),
MockFrameInfo("factorial.py", 10, "factorial"), # recursive
MockFrameInfo("factorial.py", 10, "factorial"), # deeper
MockFrameInfo("main.py", 5, "main"),
],
)
],
@ -315,13 +315,9 @@ def test_collapsed_stack_with_recursion(self):
MockThreadInfo(
1,
[
("factorial.py", 10, "factorial"),
(
"factorial.py",
10,
"factorial",
), # different depth
("main.py", 5, "main"),
MockFrameInfo("factorial.py", 10, "factorial"),
MockFrameInfo("factorial.py", 10, "factorial"), # different depth
MockFrameInfo("main.py", 5, "main"),
],
)
],
@ -385,7 +381,7 @@ def cpu_intensive_work():
def main_loop():
"""Main test loop."""
max_iterations = 200
max_iterations = 1000
for iteration in range(max_iterations):
if iteration % 2 == 0: