This commit is contained in:
Pablo Galindo Salgado 2025-12-08 00:10:56 +00:00
parent c10628acb7
commit 04563f0276

View file

@ -379,6 +379,31 @@ def _extract_coroutine_stacks(self, stack_trace):
for task in stack_trace[0].awaited_by
}
@staticmethod
def _frame_to_lineno_tuple(frame):
"""Convert frame to (filename, lineno, funcname, opcode) tuple.
This extracts just the line number from the location, ignoring column
offsets which can vary due to sampling timing (e.g., when two statements
are on the same line, the sample might catch either one).
"""
filename, location, funcname, opcode = frame
return (filename, location.lineno, funcname, opcode)
def _extract_coroutine_stacks_lineno_only(self, stack_trace):
"""Extract coroutine stacks with line numbers only (no column offsets).
Use this for tests where sampling timing can cause column offset
variations (e.g., 'expr1; expr2' on the same line).
"""
return {
task.task_name: sorted(
tuple(self._frame_to_lineno_tuple(frame) for frame in coro.call_stack)
for coro in task.coroutine_stack
)
for task in stack_trace[0].awaited_by
}
# ============================================================================
# Test classes
@ -582,8 +607,10 @@ def new_eager_loop():
},
)
# Check coroutine stacks
coroutine_stacks = self._extract_coroutine_stacks(
# Check coroutine stacks (using line numbers only to avoid
# flakiness from column offset variations when sampling
# catches different statements on the same line)
coroutine_stacks = self._extract_coroutine_stacks_lineno_only(
stack_trace
)
self.assertEqual(
@ -591,50 +618,36 @@ def new_eager_loop():
{
"Task-1": [
(
tuple(
[
taskgroups.__file__,
(121, 121, 16, 44),
"TaskGroup._aexit",
None,
]
),
tuple(
[
taskgroups.__file__,
(72, 72, 19, 45),
"TaskGroup.__aexit__",
None,
]
),
tuple([script_name, (26, 26, 15, 34), "main", None]),
(taskgroups.__file__, 121, "TaskGroup._aexit", None),
(taskgroups.__file__, 72, "TaskGroup.__aexit__", None),
(script_name, 26, "main", None),
)
],
"c2_root": [
(
tuple([script_name, (10, 10, 28, 46), "c5", None]),
tuple([script_name, (14, 14, 4, 8), "c4", None]),
tuple([script_name, (17, 17, 4, 14), "c3", None]),
tuple([script_name, (20, 20, 4, 14), "c2", None]),
(script_name, 10, "c5", None),
(script_name, 14, "c4", None),
(script_name, 17, "c3", None),
(script_name, 20, "c2", None),
)
],
"sub_main_1": [
(tuple([script_name, (23, 23, 4, 14), "c1", None]),)
((script_name, 23, "c1", None),)
],
"sub_main_2": [
(tuple([script_name, (23, 23, 4, 14), "c1", None]),)
((script_name, 23, "c1", None),)
],
},
)
# Check awaited_by coroutine stacks
# Check awaited_by coroutine stacks (line numbers only)
id_to_task = self._get_task_id_map(stack_trace)
awaited_by_coroutine_stacks = {
task.task_name: sorted(
(
id_to_task[coro.task_name].task_name,
tuple(
tuple(frame)
self._frame_to_lineno_tuple(frame)
for frame in coro.call_stack
),
)
@ -650,55 +663,27 @@ def new_eager_loop():
(
"Task-1",
(
tuple(
[
taskgroups.__file__,
(121, 121, 16, 44),
"TaskGroup._aexit",
None,
]
),
tuple(
[
taskgroups.__file__,
(72, 72, 19, 45),
"TaskGroup.__aexit__",
None,
]
),
tuple([script_name, (26, 26, 15, 34), "main", None]),
(taskgroups.__file__, 121, "TaskGroup._aexit", None),
(taskgroups.__file__, 72, "TaskGroup.__aexit__", None),
(script_name, 26, "main", None),
),
),
(
"sub_main_1",
(tuple([script_name, (23, 23, 4, 14), "c1", None]),),
((script_name, 23, "c1", None),),
),
(
"sub_main_2",
(tuple([script_name, (23, 23, 4, 14), "c1", None]),),
((script_name, 23, "c1", None),),
),
],
"sub_main_1": [
(
"Task-1",
(
tuple(
[
taskgroups.__file__,
(121, 121, 16, 44),
"TaskGroup._aexit",
None,
]
),
tuple(
[
taskgroups.__file__,
(72, 72, 19, 45),
"TaskGroup.__aexit__",
None,
]
),
tuple([script_name, (26, 26, 15, 34), "main", None]),
(taskgroups.__file__, 121, "TaskGroup._aexit", None),
(taskgroups.__file__, 72, "TaskGroup.__aexit__", None),
(script_name, 26, "main", None),
),
)
],
@ -706,23 +691,9 @@ def new_eager_loop():
(
"Task-1",
(
tuple(
[
taskgroups.__file__,
(121, 121, 16, 44),
"TaskGroup._aexit",
None,
]
),
tuple(
[
taskgroups.__file__,
(72, 72, 19, 45),
"TaskGroup.__aexit__",
None,
]
),
tuple([script_name, (26, 26, 15, 34), "main", None]),
(taskgroups.__file__, 121, "TaskGroup._aexit", None),
(taskgroups.__file__, 72, "TaskGroup.__aexit__", None),
(script_name, 26, "main", None),
),
)
],
@ -794,18 +765,20 @@ async def main():
task = stack_trace[0].awaited_by[0]
self.assertEqual(task.task_name, "Task-1")
# Check the coroutine stack
# Check the coroutine stack (using line numbers only to avoid
# flakiness from column offset variations when sampling
# catches different statements on the same line)
coroutine_stack = sorted(
tuple(tuple(frame) for frame in coro.call_stack)
tuple(self._frame_to_lineno_tuple(frame) for frame in coro.call_stack)
for coro in task.coroutine_stack
)
self.assertEqual(
coroutine_stack,
[
(
tuple([script_name, (10, 10, 28, 46), "gen_nested_call", None]),
tuple([script_name, (16, 16, 12, 35), "gen", None]),
tuple([script_name, (19, 20, 4, 12), "main", None]),
(script_name, 10, "gen_nested_call", None),
(script_name, 16, "gen", None),
(script_name, 19, "main", None),
)
],
)
@ -893,31 +866,33 @@ async def main():
},
)
# Check coroutine stacks
coroutine_stacks = self._extract_coroutine_stacks(
# Check coroutine stacks (using line numbers only to avoid
# flakiness from column offset variations when sampling
# catches different statements on the same line)
coroutine_stacks = self._extract_coroutine_stacks_lineno_only(
stack_trace
)
self.assertEqual(
coroutine_stacks,
{
"Task-1": [(tuple([script_name, (21, 21, 4, 36), "main", None]),)],
"Task-1": [((script_name, 21, "main", None),)],
"Task-2": [
(
tuple([script_name, (11, 11, 28, 46), "deep", None]),
tuple([script_name, (15, 15, 4, 16), "c1", None]),
(script_name, 11, "deep", None),
(script_name, 15, "c1", None),
)
],
},
)
# Check awaited_by coroutine stacks
# Check awaited_by coroutine stacks (line numbers only)
id_to_task = self._get_task_id_map(stack_trace)
awaited_by_coroutine_stacks = {
task.task_name: sorted(
(
id_to_task[coro.task_name].task_name,
tuple(
tuple(frame) for frame in coro.call_stack
self._frame_to_lineno_tuple(frame) for frame in coro.call_stack
),
)
for coro in task.awaited_by
@ -929,7 +904,7 @@ async def main():
{
"Task-1": [],
"Task-2": [
("Task-1", (tuple([script_name, (21, 21, 4, 36), "main", None]),))
("Task-1", ((script_name, 21, "main", None),))
],
},
)
@ -1017,8 +992,10 @@ async def main():
},
)
# Check coroutine stacks
coroutine_stacks = self._extract_coroutine_stacks(
# Check coroutine stacks (using line numbers only to avoid
# flakiness from column offset variations when sampling
# catches different statements on the same line)
coroutine_stacks = self._extract_coroutine_stacks_lineno_only(
stack_trace
)
self.assertEqual(
@ -1026,42 +1003,28 @@ async def main():
{
"Task-1": [
(
tuple(
[
staggered.__file__,
(164, 164, 16, 38),
"staggered_race",
None,
]
),
tuple([script_name, (21, 24, 4, 5), "main", None]),
(staggered.__file__, 164, "staggered_race", None),
(script_name, 21, "main", None),
)
],
"Task-2": [
(
tuple([script_name, (11, 11, 28, 46), "deep", None]),
tuple([script_name, (15, 15, 4, 16), "c1", None]),
tuple(
[
staggered.__file__,
(126, 126, 21, 36),
"staggered_race.<locals>.run_one_coro",
None,
]
),
(script_name, 11, "deep", None),
(script_name, 15, "c1", None),
(staggered.__file__, 126, "staggered_race.<locals>.run_one_coro", None),
)
],
},
)
# Check awaited_by coroutine stacks
# Check awaited_by coroutine stacks (line numbers only)
id_to_task = self._get_task_id_map(stack_trace)
awaited_by_coroutine_stacks = {
task.task_name: sorted(
(
id_to_task[coro.task_name].task_name,
tuple(
tuple(frame) for frame in coro.call_stack
self._frame_to_lineno_tuple(frame) for frame in coro.call_stack
),
)
for coro in task.awaited_by
@ -1076,15 +1039,8 @@ async def main():
(
"Task-1",
(
tuple(
[
staggered.__file__,
(164, 164, 16, 38),
"staggered_race",
None,
]
),
tuple([script_name, (21, 24, 4, 5), "main", None]),
(staggered.__file__, 164, "staggered_race", None),
(script_name, 21, "main", None),
),
)
],
@ -1931,6 +1887,50 @@ def get_trace_with_opcodes(pid):
self.assertIsInstance(end_col_offset, int)
self.assertGreaterEqual(end_col_offset, col_offset)
@skip_if_not_supported
@unittest.skipIf(
sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support",
)
def test_location_tuple_exact_values(self):
"""Test exact values of location tuple including column offsets."""
script = textwrap.dedent(
"""\
import time, sys, socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', {port}))
def foo():
sock.sendall(b"ready")
time.sleep(10_000)
foo()
"""
)
def get_trace_with_opcodes(pid):
return RemoteUnwinder(pid, opcodes=True).get_stack_trace()
stack_trace, _ = self._run_script_and_get_trace(
script, get_trace_with_opcodes, wait_for_signals=b"ready"
)
foo_frame = self._find_frame_in_trace(
stack_trace, lambda f: f.funcname == "foo"
)
self.assertIsNotNone(foo_frame, "Could not find foo frame")
# Can catch either sock.sendall (line 7) or time.sleep (line 8)
location = foo_frame.location
valid_locations = [
(7, 7, 4, 26), # sock.sendall(b"ready")
(8, 8, 4, 22), # time.sleep(10_000)
]
actual = (location.lineno, location.end_lineno,
location.col_offset, location.end_col_offset)
self.assertIn(actual, valid_locations)
class TestUnsupportedPlatformHandling(unittest.TestCase):
@unittest.skipIf(