Merge remote-tracking branch 'upstream/main' into tachyon-opcodes

# Conflicts:
#	Lib/test/test_profiling/test_sampling_profiler/test_integration.py
This commit is contained in:
Pablo Galindo Salgado 2025-12-07 23:04:11 +00:00
commit dac78a525d
27 changed files with 619 additions and 389 deletions

View file

@ -39,6 +39,9 @@
# Duration for profiling tests - long enough for process to complete naturally
PROFILING_TIMEOUT = str(int(SHORT_TIMEOUT))
# Duration for profiling in tests - short enough to complete quickly
PROFILING_DURATION_SEC = 2
@skip_if_not_supported
@unittest.skipIf(
@ -355,23 +358,14 @@ def total_occurrences(func):
self.assertEqual(total_occurrences(main_key), 2)
@requires_subprocess()
@skip_if_not_supported
class TestSampleProfilerIntegration(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.test_script = '''
import time
import os
# Shared workload functions for test scripts
_WORKLOAD_FUNCTIONS = '''
def slow_fibonacci(n):
"""Recursive fibonacci - should show up prominently in profiler."""
if n <= 1:
return n
return slow_fibonacci(n-1) + slow_fibonacci(n-2)
def cpu_intensive_work():
"""CPU intensive work that should show in profiler."""
result = 0
for i in range(10000):
result += i * i
@ -379,33 +373,48 @@ def cpu_intensive_work():
result = result % 1000000
return result
def main_loop():
"""Main test loop."""
max_iterations = 1000
for iteration in range(max_iterations):
def do_work():
iteration = 0
while True:
if iteration % 2 == 0:
result = slow_fibonacci(15)
slow_fibonacci(15)
else:
result = cpu_intensive_work()
cpu_intensive_work()
iteration += 1
'''
if __name__ == "__main__":
main_loop()
@requires_subprocess()
@skip_if_not_supported
class TestSampleProfilerIntegration(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Test script for use with test_subprocess() - signals when work starts
cls.test_script = _WORKLOAD_FUNCTIONS + '''
_test_sock.sendall(b"working")
do_work()
'''
# CLI test script - runs for fixed duration (no socket sync)
cls.cli_test_script = '''
import time
''' + _WORKLOAD_FUNCTIONS.replace(
'while True:', 'end_time = time.time() + 30\n while time.time() < end_time:'
) + '''
do_work()
'''
def test_sampling_basic_functionality(self):
with (
test_subprocess(self.test_script) as subproc,
test_subprocess(self.test_script, wait_for_working=True) as subproc,
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
try:
# Sample for up to SHORT_TIMEOUT seconds, but process exits after fixed iterations
collector = PstatsCollector(sample_interval_usec=1000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=SHORT_TIMEOUT,
duration_sec=PROFILING_DURATION_SEC,
)
collector.print_stats(show_summary=False)
except PermissionError:
@ -427,7 +436,7 @@ def test_sampling_with_pstats_export(self):
)
self.addCleanup(close_and_unlink, pstats_out)
with test_subprocess(self.test_script) as subproc:
with test_subprocess(self.test_script, wait_for_working=True) as subproc:
# Suppress profiler output when testing file export
with (
io.StringIO() as captured_output,
@ -438,7 +447,7 @@ def test_sampling_with_pstats_export(self):
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=1,
duration_sec=PROFILING_DURATION_SEC,
)
collector.export(pstats_out.name)
except PermissionError:
@ -472,7 +481,7 @@ def test_sampling_with_collapsed_export(self):
self.addCleanup(close_and_unlink, collapsed_file)
with (
test_subprocess(self.test_script) as subproc,
test_subprocess(self.test_script, wait_for_working=True) as subproc,
):
# Suppress profiler output when testing file export
with (
@ -484,7 +493,7 @@ def test_sampling_with_collapsed_export(self):
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=1,
duration_sec=PROFILING_DURATION_SEC,
)
collector.export(collapsed_file.name)
except PermissionError:
@ -522,7 +531,7 @@ def test_sampling_with_collapsed_export(self):
def test_sampling_all_threads(self):
with (
test_subprocess(self.test_script) as subproc,
test_subprocess(self.test_script, wait_for_working=True) as subproc,
# Suppress profiler output
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
@ -532,7 +541,7 @@ def test_sampling_all_threads(self):
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=1,
duration_sec=PROFILING_DURATION_SEC,
all_threads=True,
)
collector.print_stats(show_summary=False)
@ -544,12 +553,16 @@ def test_sampling_all_threads(self):
def test_sample_target_script(self):
script_file = tempfile.NamedTemporaryFile(delete=False)
script_file.write(self.test_script.encode("utf-8"))
script_file.write(self.cli_test_script.encode("utf-8"))
script_file.flush()
self.addCleanup(close_and_unlink, script_file)
# Sample for up to SHORT_TIMEOUT seconds, but process exits after fixed iterations
test_args = ["profiling.sampling.sample", "run", "-d", PROFILING_TIMEOUT, script_file.name]
# Sample for PROFILING_DURATION_SEC seconds
test_args = [
"profiling.sampling.sample", "run",
"-d", str(PROFILING_DURATION_SEC),
script_file.name
]
with (
mock.patch("sys.argv", test_args),
@ -579,13 +592,13 @@ def test_sample_target_module(self):
module_path = os.path.join(tempdir.name, "test_module.py")
with open(module_path, "w") as f:
f.write(self.test_script)
f.write(self.cli_test_script)
test_args = [
"profiling.sampling.cli",
"run",
"-d",
PROFILING_TIMEOUT,
str(PROFILING_DURATION_SEC),
"-m",
"test_module",
]
@ -626,8 +639,10 @@ def test_invalid_pid(self):
profiling.sampling.sample.sample(-1, collector, duration_sec=1)
def test_process_dies_during_sampling(self):
# Use wait_for_working=False since this simple script doesn't send "working"
with test_subprocess(
"import time; time.sleep(0.5); exit()"
"import time; time.sleep(0.5); exit()",
wait_for_working=False
) as subproc:
with (
io.StringIO() as captured_output,
@ -650,7 +665,11 @@ def test_process_dies_during_sampling(self):
self.assertIn("Error rate", output)
def test_is_process_running(self):
with test_subprocess("import time; time.sleep(1000)") as subproc:
# Use wait_for_working=False since this simple script doesn't send "working"
with test_subprocess(
"import time; time.sleep(1000)",
wait_for_working=False
) as subproc:
try:
profiler = SampleProfiler(
pid=subproc.process.pid,
@ -677,7 +696,11 @@ def test_is_process_running(self):
@unittest.skipUnless(sys.platform == "linux", "Only valid on Linux")
def test_esrch_signal_handling(self):
with test_subprocess("import time; time.sleep(1000)") as subproc:
# Use wait_for_working=False since this simple script doesn't send "working"
with test_subprocess(
"import time; time.sleep(1000)",
wait_for_working=False
) as subproc:
try:
unwinder = _remote_debugging.RemoteUnwinder(
subproc.process.pid
@ -789,38 +812,34 @@ class TestAsyncAwareProfilingIntegration(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Async test script that runs indefinitely until killed.
# Sends "working" signal AFTER tasks are created and scheduled.
cls.async_script = '''
import asyncio
async def sleeping_leaf():
"""Leaf task that just sleeps - visible in 'all' mode."""
for _ in range(50):
while True:
await asyncio.sleep(0.02)
async def cpu_leaf():
"""Leaf task that does CPU work - visible in both modes."""
total = 0
for _ in range(200):
while True:
for i in range(10000):
total += i * i
await asyncio.sleep(0)
return total
async def supervisor():
"""Middle layer that spawns leaf tasks."""
tasks = [
asyncio.create_task(sleeping_leaf(), name="Sleeper-0"),
asyncio.create_task(sleeping_leaf(), name="Sleeper-1"),
asyncio.create_task(sleeping_leaf(), name="Sleeper-2"),
asyncio.create_task(cpu_leaf(), name="Worker"),
]
await asyncio.sleep(0) # Let tasks get scheduled
_test_sock.sendall(b"working")
await asyncio.gather(*tasks)
async def main():
await supervisor()
if __name__ == "__main__":
asyncio.run(main())
asyncio.run(supervisor())
'''
def _collect_async_samples(self, async_aware_mode):
@ -828,13 +847,13 @@ def _collect_async_samples(self, async_aware_mode):
Returns a dict mapping function names to their sample counts.
"""
with test_subprocess(self.async_script) as subproc:
with test_subprocess(self.async_script, wait_for_working=True) as subproc:
try:
collector = CollapsedStackCollector(1000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
collector,
duration_sec=SHORT_TIMEOUT,
duration_sec=PROFILING_DURATION_SEC,
async_aware=async_aware_mode,
)
except PermissionError: