diff --git a/Lib/test/test_profiling/test_sampling_profiler/helpers.py b/Lib/test/test_profiling/test_sampling_profiler/helpers.py index f1c01afd0fa..0e32d8dd9ea 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/helpers.py +++ b/Lib/test/test_profiling/test_sampling_profiler/helpers.py @@ -38,12 +38,88 @@ SubprocessInfo = namedtuple("SubprocessInfo", ["process", "socket"]) +def _wait_for_signal(sock, expected_signals, timeout=SHORT_TIMEOUT): + """ + Wait for expected signal(s) from a socket with proper timeout and EOF handling. + + Args: + sock: Connected socket to read from + expected_signals: Single bytes object or list of bytes objects to wait for + timeout: Socket timeout in seconds + + Returns: + bytes: Complete accumulated response buffer + + Raises: + RuntimeError: If connection closed before signal received or timeout + """ + if isinstance(expected_signals, bytes): + expected_signals = [expected_signals] + + sock.settimeout(timeout) + buffer = b"" + + while True: + # Check if all expected signals are in buffer + if all(sig in buffer for sig in expected_signals): + return buffer + + try: + chunk = sock.recv(4096) + if not chunk: + raise RuntimeError( + f"Connection closed before receiving expected signals. " + f"Expected: {expected_signals}, Got: {buffer[-200:]!r}" + ) + buffer += chunk + except socket.timeout: + raise RuntimeError( + f"Timeout waiting for signals. " + f"Expected: {expected_signals}, Got: {buffer[-200:]!r}" + ) from None + except OSError as e: + raise RuntimeError( + f"Socket error while waiting for signals: {e}. " + f"Expected: {expected_signals}, Got: {buffer[-200:]!r}" + ) from None + + +def _cleanup_sockets(*sockets): + """Safely close multiple sockets, ignoring errors.""" + for sock in sockets: + if sock is not None: + try: + sock.close() + except OSError: + pass + + +def _cleanup_process(proc, timeout=SHORT_TIMEOUT): + """Terminate a process gracefully, escalating to kill if needed.""" + if proc.poll() is not None: + return + proc.terminate() + try: + proc.wait(timeout=timeout) + return + except subprocess.TimeoutExpired: + pass + proc.kill() + try: + proc.wait(timeout=timeout) + except subprocess.TimeoutExpired: + pass # Process refuses to die, nothing more we can do + + @contextlib.contextmanager -def test_subprocess(script): +def test_subprocess(script, wait_for_working=False): """Context manager to create a test subprocess with socket synchronization. Args: - script: Python code to execute in the subprocess + script: Python code to execute in the subprocess. If wait_for_working + is True, script should send b"working" after starting work. + wait_for_working: If True, wait for both "ready" and "working" signals. + Default False for backward compatibility. Yields: SubprocessInfo: Named tuple with process and socket objects @@ -80,19 +156,18 @@ def test_subprocess(script): # Wait for process to connect and send ready signal client_socket, _ = server_socket.accept() server_socket.close() - response = client_socket.recv(1024) - if response != b"ready": - raise RuntimeError( - f"Unexpected response from subprocess: {response!r}" - ) + server_socket = None + + # Wait for ready signal, and optionally working signal + if wait_for_working: + _wait_for_signal(client_socket, [b"ready", b"working"]) + else: + _wait_for_signal(client_socket, b"ready") yield SubprocessInfo(proc, client_socket) finally: - if client_socket is not None: - client_socket.close() - if proc.poll() is None: - proc.kill() - proc.wait() + _cleanup_sockets(client_socket, server_socket) + _cleanup_process(proc) def close_and_unlink(file): diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py b/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py index 94946d74aa4..843fb3b7416 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py @@ -39,32 +39,26 @@ def setUpClass(cls): import gc class ExpensiveGarbage: - """Class that triggers GC with expensive finalizer (callback).""" def __init__(self): self.cycle = self def __del__(self): - # CPU-intensive work in the finalizer callback result = 0 for i in range(100000): result += i * i if i % 1000 == 0: result = result % 1000000 -def main_loop(): - """Main loop that triggers GC with expensive callback.""" - while True: - ExpensiveGarbage() - gc.collect() - -if __name__ == "__main__": - main_loop() +_test_sock.sendall(b"working") +while True: + ExpensiveGarbage() + gc.collect() ''' def test_gc_frames_enabled(self): """Test that GC frames appear when gc tracking is enabled.""" with ( - test_subprocess(self.gc_test_script) as subproc, + test_subprocess(self.gc_test_script, wait_for_working=True) as subproc, io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), ): @@ -94,7 +88,7 @@ def test_gc_frames_enabled(self): def test_gc_frames_disabled(self): """Test that GC frames do not appear when gc tracking is disabled.""" with ( - test_subprocess(self.gc_test_script) as subproc, + test_subprocess(self.gc_test_script, wait_for_working=True) as subproc, io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), ): @@ -133,18 +127,13 @@ def setUpClass(cls): cls.native_test_script = """ import operator -def main_loop(): - while True: - # Native code in the middle of the stack: - operator.call(inner) - def inner(): - # Python code at the top of the stack: for _ in range(1_000_0000): pass -if __name__ == "__main__": - main_loop() +_test_sock.sendall(b"working") +while True: + operator.call(inner) """ def test_native_frames_enabled(self): @@ -154,10 +143,7 @@ def test_native_frames_enabled(self): ) self.addCleanup(close_and_unlink, collapsed_file) - with ( - test_subprocess(self.native_test_script) as subproc, - ): - # Suppress profiler output when testing file export + with test_subprocess(self.native_test_script, wait_for_working=True) as subproc: with ( io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), @@ -199,7 +185,7 @@ def test_native_frames_enabled(self): def test_native_frames_disabled(self): """Test that native frames do not appear when native tracking is disabled.""" with ( - test_subprocess(self.native_test_script) as subproc, + test_subprocess(self.native_test_script, wait_for_working=True) as subproc, io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), ): diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py index aae241a3335..e92b3f45fbc 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py @@ -39,6 +39,9 @@ # Duration for profiling tests - long enough for process to complete naturally PROFILING_TIMEOUT = str(int(SHORT_TIMEOUT)) +# Duration for profiling in tests - short enough to complete quickly +PROFILING_DURATION_SEC = 2 + @skip_if_not_supported @unittest.skipIf( @@ -359,23 +362,14 @@ def total_occurrences(func): self.assertEqual(total_occurrences(main_key), 2) -@requires_subprocess() -@skip_if_not_supported -class TestSampleProfilerIntegration(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.test_script = ''' -import time -import os - +# Shared workload functions for test scripts +_WORKLOAD_FUNCTIONS = ''' def slow_fibonacci(n): - """Recursive fibonacci - should show up prominently in profiler.""" if n <= 1: return n return slow_fibonacci(n-1) + slow_fibonacci(n-2) def cpu_intensive_work(): - """CPU intensive work that should show in profiler.""" result = 0 for i in range(10000): result += i * i @@ -383,33 +377,48 @@ def cpu_intensive_work(): result = result % 1000000 return result -def main_loop(): - """Main test loop.""" - max_iterations = 200 - - for iteration in range(max_iterations): +def do_work(): + iteration = 0 + while True: if iteration % 2 == 0: - result = slow_fibonacci(15) + slow_fibonacci(15) else: - result = cpu_intensive_work() + cpu_intensive_work() + iteration += 1 +''' -if __name__ == "__main__": - main_loop() + +@requires_subprocess() +@skip_if_not_supported +class TestSampleProfilerIntegration(unittest.TestCase): + @classmethod + def setUpClass(cls): + # Test script for use with test_subprocess() - signals when work starts + cls.test_script = _WORKLOAD_FUNCTIONS + ''' +_test_sock.sendall(b"working") +do_work() +''' + # CLI test script - runs for fixed duration (no socket sync) + cls.cli_test_script = ''' +import time +''' + _WORKLOAD_FUNCTIONS.replace( + 'while True:', 'end_time = time.time() + 30\n while time.time() < end_time:' +) + ''' +do_work() ''' def test_sampling_basic_functionality(self): with ( - test_subprocess(self.test_script) as subproc, + test_subprocess(self.test_script, wait_for_working=True) as subproc, io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), ): try: - # Sample for up to SHORT_TIMEOUT seconds, but process exits after fixed iterations collector = PstatsCollector(sample_interval_usec=1000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, collector, - duration_sec=SHORT_TIMEOUT, + duration_sec=PROFILING_DURATION_SEC, ) collector.print_stats(show_summary=False) except PermissionError: @@ -431,7 +440,7 @@ def test_sampling_with_pstats_export(self): ) self.addCleanup(close_and_unlink, pstats_out) - with test_subprocess(self.test_script) as subproc: + with test_subprocess(self.test_script, wait_for_working=True) as subproc: # Suppress profiler output when testing file export with ( io.StringIO() as captured_output, @@ -442,7 +451,7 @@ def test_sampling_with_pstats_export(self): profiling.sampling.sample.sample( subproc.process.pid, collector, - duration_sec=1, + duration_sec=PROFILING_DURATION_SEC, ) collector.export(pstats_out.name) except PermissionError: @@ -476,7 +485,7 @@ def test_sampling_with_collapsed_export(self): self.addCleanup(close_and_unlink, collapsed_file) with ( - test_subprocess(self.test_script) as subproc, + test_subprocess(self.test_script, wait_for_working=True) as subproc, ): # Suppress profiler output when testing file export with ( @@ -488,7 +497,7 @@ def test_sampling_with_collapsed_export(self): profiling.sampling.sample.sample( subproc.process.pid, collector, - duration_sec=1, + duration_sec=PROFILING_DURATION_SEC, ) collector.export(collapsed_file.name) except PermissionError: @@ -526,7 +535,7 @@ def test_sampling_with_collapsed_export(self): def test_sampling_all_threads(self): with ( - test_subprocess(self.test_script) as subproc, + test_subprocess(self.test_script, wait_for_working=True) as subproc, # Suppress profiler output io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), @@ -536,7 +545,7 @@ def test_sampling_all_threads(self): profiling.sampling.sample.sample( subproc.process.pid, collector, - duration_sec=1, + duration_sec=PROFILING_DURATION_SEC, all_threads=True, ) collector.print_stats(show_summary=False) @@ -548,12 +557,16 @@ def test_sampling_all_threads(self): def test_sample_target_script(self): script_file = tempfile.NamedTemporaryFile(delete=False) - script_file.write(self.test_script.encode("utf-8")) + script_file.write(self.cli_test_script.encode("utf-8")) script_file.flush() self.addCleanup(close_and_unlink, script_file) - # Sample for up to SHORT_TIMEOUT seconds, but process exits after fixed iterations - test_args = ["profiling.sampling.sample", "run", "-d", PROFILING_TIMEOUT, script_file.name] + # Sample for PROFILING_DURATION_SEC seconds + test_args = [ + "profiling.sampling.sample", "run", + "-d", str(PROFILING_DURATION_SEC), + script_file.name + ] with ( mock.patch("sys.argv", test_args), @@ -583,13 +596,13 @@ def test_sample_target_module(self): module_path = os.path.join(tempdir.name, "test_module.py") with open(module_path, "w") as f: - f.write(self.test_script) + f.write(self.cli_test_script) test_args = [ "profiling.sampling.cli", "run", "-d", - PROFILING_TIMEOUT, + str(PROFILING_DURATION_SEC), "-m", "test_module", ] @@ -630,8 +643,10 @@ def test_invalid_pid(self): profiling.sampling.sample.sample(-1, collector, duration_sec=1) def test_process_dies_during_sampling(self): + # Use wait_for_working=False since this simple script doesn't send "working" with test_subprocess( - "import time; time.sleep(0.5); exit()" + "import time; time.sleep(0.5); exit()", + wait_for_working=False ) as subproc: with ( io.StringIO() as captured_output, @@ -654,7 +669,11 @@ def test_process_dies_during_sampling(self): self.assertIn("Error rate", output) def test_is_process_running(self): - with test_subprocess("import time; time.sleep(1000)") as subproc: + # Use wait_for_working=False since this simple script doesn't send "working" + with test_subprocess( + "import time; time.sleep(1000)", + wait_for_working=False + ) as subproc: try: profiler = SampleProfiler( pid=subproc.process.pid, @@ -681,7 +700,11 @@ def test_is_process_running(self): @unittest.skipUnless(sys.platform == "linux", "Only valid on Linux") def test_esrch_signal_handling(self): - with test_subprocess("import time; time.sleep(1000)") as subproc: + # Use wait_for_working=False since this simple script doesn't send "working" + with test_subprocess( + "import time; time.sleep(1000)", + wait_for_working=False + ) as subproc: try: unwinder = _remote_debugging.RemoteUnwinder( subproc.process.pid @@ -793,38 +816,34 @@ class TestAsyncAwareProfilingIntegration(unittest.TestCase): @classmethod def setUpClass(cls): + # Async test script that runs indefinitely until killed. + # Sends "working" signal AFTER tasks are created and scheduled. cls.async_script = ''' import asyncio async def sleeping_leaf(): - """Leaf task that just sleeps - visible in 'all' mode.""" - for _ in range(50): + while True: await asyncio.sleep(0.02) async def cpu_leaf(): - """Leaf task that does CPU work - visible in both modes.""" total = 0 - for _ in range(200): + while True: for i in range(10000): total += i * i await asyncio.sleep(0) - return total async def supervisor(): - """Middle layer that spawns leaf tasks.""" tasks = [ asyncio.create_task(sleeping_leaf(), name="Sleeper-0"), asyncio.create_task(sleeping_leaf(), name="Sleeper-1"), asyncio.create_task(sleeping_leaf(), name="Sleeper-2"), asyncio.create_task(cpu_leaf(), name="Worker"), ] + await asyncio.sleep(0) # Let tasks get scheduled + _test_sock.sendall(b"working") await asyncio.gather(*tasks) -async def main(): - await supervisor() - -if __name__ == "__main__": - asyncio.run(main()) +asyncio.run(supervisor()) ''' def _collect_async_samples(self, async_aware_mode): @@ -832,13 +851,13 @@ def _collect_async_samples(self, async_aware_mode): Returns a dict mapping function names to their sample counts. """ - with test_subprocess(self.async_script) as subproc: + with test_subprocess(self.async_script, wait_for_working=True) as subproc: try: collector = CollapsedStackCollector(1000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, collector, - duration_sec=SHORT_TIMEOUT, + duration_sec=PROFILING_DURATION_SEC, async_aware=async_aware_mode, ) except PermissionError: diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_modes.py b/Lib/test/test_profiling/test_sampling_profiler/test_modes.py index 1b0e21a5fe4..c0457ee7eb8 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_modes.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_modes.py @@ -143,27 +143,16 @@ def cpu_active_worker(): while True: x += 1 -def main(): - # Start both threads - idle_thread = threading.Thread(target=idle_worker) - cpu_thread = threading.Thread(target=cpu_active_worker) - idle_thread.start() - cpu_thread.start() - - # Wait for CPU thread to be running, then signal test - cpu_ready.wait() - _test_sock.sendall(b"threads_ready") - - idle_thread.join() - cpu_thread.join() - -main() - +idle_thread = threading.Thread(target=idle_worker) +cpu_thread = threading.Thread(target=cpu_active_worker) +idle_thread.start() +cpu_thread.start() +cpu_ready.wait() +_test_sock.sendall(b"working") +idle_thread.join() +cpu_thread.join() """ - with test_subprocess(cpu_vs_idle_script) as subproc: - # Wait for signal that threads are running - response = subproc.socket.recv(1024) - self.assertEqual(response, b"threads_ready") + with test_subprocess(cpu_vs_idle_script, wait_for_working=True) as subproc: with ( io.StringIO() as captured_output, @@ -365,26 +354,16 @@ def gil_holding_work(): while True: x += 1 -def main(): - # Start both threads - idle_thread = threading.Thread(target=gil_releasing_work) - cpu_thread = threading.Thread(target=gil_holding_work) - idle_thread.start() - cpu_thread.start() - - # Wait for GIL-holding thread to be running, then signal test - gil_ready.wait() - _test_sock.sendall(b"threads_ready") - - idle_thread.join() - cpu_thread.join() - -main() +idle_thread = threading.Thread(target=gil_releasing_work) +cpu_thread = threading.Thread(target=gil_holding_work) +idle_thread.start() +cpu_thread.start() +gil_ready.wait() +_test_sock.sendall(b"working") +idle_thread.join() +cpu_thread.join() """ - with test_subprocess(gil_test_script) as subproc: - # Wait for signal that threads are running - response = subproc.socket.recv(1024) - self.assertEqual(response, b"threads_ready") + with test_subprocess(gil_test_script, wait_for_working=True) as subproc: with ( io.StringIO() as captured_output,