From d420f29e2bc04b97f82c20e3822cd3b6e68cf4f4 Mon Sep 17 00:00:00 2001
From: "Gregory P. Smith using claude.ai/code"
Date: Sat, 29 Nov 2025 08:41:25 +0000
Subject: [PATCH] Fix _communicate_streams_windows to avoid blocking with large input

Move stdin writing to a background thread in _communicate_streams_windows
to avoid blocking indefinitely when writing large input to a pipeline
where the subprocess doesn't consume stdin quickly.

This mirrors the fix made to Popen._communicate() for Windows in
commit 5b1862b (gh-87512).

Add test_pipeline_timeout_large_input to verify that TimeoutExpired is
raised promptly when run_pipeline() is called with large input and a
timeout, even when the first process is slow to consume stdin.

Co-authored-by: Claude Opus 4.5
---
Reviewer notes (not part of the commit message):
* NOTE(review): the line breaks and indentation below were reconstructed
  from a whitespace-collapsed copy of this patch, and the author e-mail
  addresses are absent from that copy; verify against commit d420f29
  before applying.
* _writer_thread_func ignores BrokenPipeError and OSError with
  errno EINVAL; any other OSError is appended to `result`, and
  _communicate_streams_windows re-raises the first such error after the
  writer thread has been joined.
* If the writer thread is still alive after the timed join,
  TimeoutExpired is raised with whatever the buffer for read_streams[0]
  already holds (b'' if nothing yet, None if there are no read streams).

 Lib/subprocess.py           | 71 +++++++++++++++++++++++++++----------
 Lib/test/test_subprocess.py | 33 +++++++++++++++++
 2 files changed, 86 insertions(+), 18 deletions(-)

diff --git a/Lib/subprocess.py b/Lib/subprocess.py
index 50b437e3ee5..d360af52323 100644
--- a/Lib/subprocess.py
+++ b/Lib/subprocess.py
@@ -443,11 +443,48 @@ def _reader_thread_func(fh, buffer):
         except OSError:
             buffer.append(b'')
 
+    def _writer_thread_func(fh, data, result):
+        """Thread function to write data to a file handle and close it."""
+        try:
+            if data:
+                fh.write(data)
+        except BrokenPipeError:
+            pass
+        except OSError as exc:
+            if exc.errno != errno.EINVAL:
+                result.append(exc)
+        try:
+            fh.close()
+        except BrokenPipeError:
+            pass
+        except OSError as exc:
+            if exc.errno != errno.EINVAL and not result:
+                result.append(exc)
+
     def _communicate_streams_windows(stdin, input_data, read_streams,
                                      endtime, orig_timeout, cmd_for_timeout):
         """Windows implementation using threads."""
         threads = []
         buffers = {}
+        writer_thread = None
+        writer_result = []
+
+        # Start writer thread to send input to stdin
+        if stdin and input_data:
+            writer_thread = threading.Thread(
+                target=_writer_thread_func,
+                args=(stdin, input_data, writer_result))
+            writer_thread.daemon = True
+            writer_thread.start()
+        elif stdin:
+            # No input data, just close stdin
+            try:
+                stdin.close()
+            except BrokenPipeError:
+                pass
+            except OSError as exc:
+                if exc.errno != errno.EINVAL:
+                    raise
 
         # Start reader threads for each stream
         for stream in read_streams:
@@ -458,25 +495,23 @@ def _communicate_streams_windows(stdin, input_data, read_streams,
             t.start()
             threads.append((stream, t))
 
-        # Write stdin
-        if stdin and input_data:
-            try:
-                stdin.write(input_data)
-            except BrokenPipeError:
-                pass
-            except OSError as exc:
-                if exc.errno != errno.EINVAL:
-                    raise
-        if stdin:
-            try:
-                stdin.close()
-            except BrokenPipeError:
-                pass
-            except OSError as exc:
-                if exc.errno != errno.EINVAL:
-                    raise
+        # Join writer thread with timeout first
+        if writer_thread is not None:
+            remaining = _remaining_time_helper(endtime)
+            if remaining is not None and remaining < 0:
+                remaining = 0
+            writer_thread.join(remaining)
+            if writer_thread.is_alive():
+                # Timed out during write - collect partial results
+                results = {s: (b[0] if b else b'') for s, b in buffers.items()}
+                raise TimeoutExpired(
+                    cmd_for_timeout, orig_timeout,
+                    output=results.get(read_streams[0]) if read_streams else None)
+            # Check for write errors
+            if writer_result:
+                raise writer_result[0]
 
-        # Join threads with timeout
+        # Join reader threads with timeout
         for stream, t in threads:
             remaining = _remaining_time_helper(endtime)
             if remaining is not None and remaining < 0:
diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py
index 58d8c1385b2..b0bc13e3d11 100644
--- a/Lib/test/test_subprocess.py
+++ b/Lib/test/test_subprocess.py
@@ -2298,6 +2298,39 @@ def test_pipeline_large_data_with_stderr(self):
         self.assertGreater(len(result.stderr), stderr_size)
         self.assertEqual(result.returncodes, [0, 0])
 
+    def test_pipeline_timeout_large_input(self):
+        """Test that timeout is enforced with large input to a slow pipeline.
+
+        This verifies that run_pipeline() doesn't block indefinitely when
+        writing large input to a pipeline where the first process is slow
+        to consume stdin. The timeout should be enforced promptly.
+
+        This is particularly important on Windows where stdin writing could
+        block without proper threading.
+        """
+        # Input larger than typical pipe buffer (64KB)
+        input_data = 'x' * (128 * 1024)
+
+        start = time.monotonic()
+        with self.assertRaises(subprocess.TimeoutExpired):
+            subprocess.run_pipeline(
+                # First process sleeps before reading - simulates slow consumer
+                [sys.executable, '-c',
+                 'import sys, time; time.sleep(30); print(sys.stdin.read())'],
+                [sys.executable, '-c',
+                 'import sys; print(len(sys.stdin.read()))'],
+                input=input_data, capture_output=True, text=True, timeout=0.5
+            )
+        elapsed = time.monotonic() - start
+
+        # Timeout should occur close to the specified timeout value,
+        # not after waiting for the subprocess to finish sleeping.
+        # Allow generous margin for slow CI, but must be well under
+        # the subprocess sleep time.
+        self.assertLess(elapsed, 5.0,
+            f"TimeoutExpired raised after {elapsed:.2f}s; expected ~0.5s. "
+            "Input writing may have blocked without checking timeout.")
+
 
 def _get_test_grp_name():
     for name_group in ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody'):