Fix _communicate_streams_windows to avoid blocking with large input

Move stdin writing to a background thread in _communicate_streams_windows
to avoid blocking indefinitely when writing large input to a pipeline
where the subprocess doesn't consume stdin quickly.
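
For illustration (not part of this change), the failure mode is an
ordinary blocking pipe write: once the OS pipe buffer (typically
around 64 KiB) fills, write() stalls the calling thread until the
child reads, and no timeout can be enforced. A minimal standalone
repro:

    import subprocess, sys

    # Child sleeps before draining stdin, so the pipe buffer fills up.
    child = subprocess.Popen(
        [sys.executable, '-c',
         'import sys, time; time.sleep(30); sys.stdin.read()'],
        stdin=subprocess.PIPE)

    # Writing more than the pipe buffer holds blocks the calling
    # thread here until the child starts reading.
    child.stdin.write(b'x' * (256 * 1024))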

This mirrors the fix made to Popen._communicate() for Windows in
commit 5b1862b (gh-87512).

Add test_pipeline_timeout_large_input to verify that TimeoutExpired
is raised promptly when run_pipeline() is called with large input
and a timeout, even when the first process is slow to consume stdin.
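
From the caller's perspective the guarantee is only that
TimeoutExpired arrives on time. A usage sketch, with the
run_pipeline() signature as exercised by the test below:

    import subprocess, sys

    try:
        subprocess.run_pipeline(
            [sys.executable, '-c',
             'import sys; sys.stdout.write(sys.stdin.read())'],
            [sys.executable, '-c',
             'import sys; print(len(sys.stdin.read()))'],
            input='x' * (1 << 20), capture_output=True, text=True,
            timeout=5)
    except subprocess.TimeoutExpired:
        # Raised promptly even if a stage never drains its stdin.
        pass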

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Gregory P. Smith using claude.ai/code  2025-11-29 08:41:25 +00:00
commit d420f29e2b (parent 9f53a8e883)
2 changed files with 86 additions and 18 deletions


@@ -443,11 +443,48 @@ def _reader_thread_func(fh, buffer):
     except OSError:
         buffer.append(b'')
 
+
+def _writer_thread_func(fh, data, result):
+    """Thread function to write data to a file handle and close it."""
+    try:
+        if data:
+            fh.write(data)
+    except BrokenPipeError:
+        pass
+    except OSError as exc:
+        if exc.errno != errno.EINVAL:
+            result.append(exc)
+    try:
+        fh.close()
+    except BrokenPipeError:
+        pass
+    except OSError as exc:
+        if exc.errno != errno.EINVAL and not result:
+            result.append(exc)
+
+
 def _communicate_streams_windows(stdin, input_data, read_streams,
                                  endtime, orig_timeout, cmd_for_timeout):
     """Windows implementation using threads."""
     threads = []
     buffers = {}
+    writer_thread = None
+    writer_result = []
+
+    # Start writer thread to send input to stdin
+    if stdin and input_data:
+        writer_thread = threading.Thread(
+            target=_writer_thread_func,
+            args=(stdin, input_data, writer_result))
+        writer_thread.daemon = True
+        writer_thread.start()
+    elif stdin:
+        # No input data, just close stdin
+        try:
+            stdin.close()
+        except BrokenPipeError:
+            pass
+        except OSError as exc:
+            if exc.errno != errno.EINVAL:
+                raise
+
     # Start reader threads for each stream
     for stream in read_streams:
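
A note on writer_result: an exception raised inside a thread dies
with that thread, so _writer_thread_func parks any OSError in a list
for the joining thread to re-raise. The pattern in miniature:

    import threading

    errors = []

    def worker():
        try:
            raise OSError('write failed')
        except OSError as exc:
            # Exceptions do not propagate across threads; hand them
            # to whoever joins us.
            errors.append(exc)

    t = threading.Thread(target=worker, daemon=True)
    t.start()
    t.join()
    if errors:
        raise errors[0]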
@@ -458,25 +495,23 @@ def _communicate_streams_windows(stdin, input_data, read_streams,
         t.start()
         threads.append((stream, t))
 
-    # Write stdin
-    if stdin and input_data:
-        try:
-            stdin.write(input_data)
-        except BrokenPipeError:
-            pass
-        except OSError as exc:
-            if exc.errno != errno.EINVAL:
-                raise
-    if stdin:
-        try:
-            stdin.close()
-        except BrokenPipeError:
-            pass
-        except OSError as exc:
-            if exc.errno != errno.EINVAL:
-                raise
+    # Join writer thread with timeout first
+    if writer_thread is not None:
+        remaining = _remaining_time_helper(endtime)
+        if remaining is not None and remaining < 0:
+            remaining = 0
+        writer_thread.join(remaining)
+        if writer_thread.is_alive():
+            # Timed out during write - collect partial results
+            results = {s: (b[0] if b else b'') for s, b in buffers.items()}
+            raise TimeoutExpired(
+                cmd_for_timeout, orig_timeout,
+                output=results.get(read_streams[0]) if read_streams else None)
+        # Check for write errors
+        if writer_result:
+            raise writer_result[0]
 
-    # Join threads with timeout
+    # Join reader threads with timeout
     for stream, t in threads:
         remaining = _remaining_time_helper(endtime)
         if remaining is not None and remaining < 0:

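Both join loops rely on _remaining_time_helper(), which this diff
does not show. A plausible sketch, assuming it mirrors Popen's
deadline arithmetic (None means no deadline, so join() blocks
indefinitely):

    import time

    def _remaining_time_helper(endtime):
        # Seconds left until the deadline, or None when no deadline
        # was set; callers clamp negative values to zero.
        if endtime is None:
            return None
        return endtime - time.monotonic()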

@@ -2298,6 +2298,39 @@ def test_pipeline_large_data_with_stderr(self):
         self.assertGreater(len(result.stderr), stderr_size)
         self.assertEqual(result.returncodes, [0, 0])
 
+    def test_pipeline_timeout_large_input(self):
+        """Test that timeout is enforced with large input to a slow pipeline.
+
+        This verifies that run_pipeline() doesn't block indefinitely when
+        writing large input to a pipeline where the first process is slow
+        to consume stdin.  The timeout should be enforced promptly.
+
+        This is particularly important on Windows, where stdin writing
+        could block without proper threading.
+        """
+        # Input larger than a typical pipe buffer (64 KiB)
+        input_data = 'x' * (128 * 1024)
+        start = time.monotonic()
+        with self.assertRaises(subprocess.TimeoutExpired):
+            subprocess.run_pipeline(
+                # First process sleeps before reading - simulates a slow
+                # consumer.
+                [sys.executable, '-c',
+                 'import sys, time; time.sleep(30); print(sys.stdin.read())'],
+                [sys.executable, '-c',
+                 'import sys; print(len(sys.stdin.read()))'],
+                input=input_data, capture_output=True, text=True,
+                timeout=0.5)
+        elapsed = time.monotonic() - start
+        # Timeout should occur close to the specified timeout value, not
+        # after waiting for the subprocess to finish sleeping.  Allow a
+        # generous margin for slow CI, but stay well under the subprocess
+        # sleep time.
+        self.assertLess(elapsed, 5.0,
+                        f"TimeoutExpired raised after {elapsed:.2f}s; "
+                        "expected ~0.5s. Input writing may have blocked "
+                        "without checking timeout.")
+
 
 def _get_test_grp_name():
     for name_group in ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody'):