Simplify _communicate_streams() to only accept file objects

Remove support for raw file descriptors in _communicate_streams(),
requiring all streams to be file objects. This simplifies both the
Windows and POSIX implementations by removing isinstance() checks
and fd-wrapping logic.

The run_pipeline() function now wraps the stderr pipe's read end
with os.fdopen() immediately after creation.

This change makes _communicate_streams() more compatible with
Popen.communicate() which already uses file objects, enabling
potential future refactoring to share the multiplexed I/O logic.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Gregory P. Smith using claude.ai/code 2025-11-28 07:04:28 +00:00
parent 2470e14a70
commit e22d1da9bc
No known key found for this signature in database

View file

@ -333,18 +333,19 @@ def _communicate_streams(stdin=None, input_data=None, read_streams=None,
"""
Multiplex I/O: write input_data to stdin, read from read_streams.
Works with both file objects and raw file descriptors.
All streams must be file objects (not raw file descriptors).
All I/O is done in binary mode; caller handles text encoding.
Args:
stdin: Writable file object for input, or None
stdin: Writable binary file object for input, or None
input_data: Bytes to write to stdin, or None
read_streams: List of readable file objects or raw fds to read from
read_streams: List of readable binary file objects to read from
timeout: Timeout in seconds, or None for no timeout
cmd_for_timeout: Value to use for TimeoutExpired.cmd
Returns:
Dict mapping each item in read_streams to its bytes data
Dict mapping each file object in read_streams to its bytes data.
All file objects in read_streams will be closed.
Raises:
TimeoutExpired: If timeout expires (with partial data)
@ -377,22 +378,15 @@ def _communicate_streams_windows(stdin, input_data, read_streams,
"""Windows implementation using threads."""
threads = []
buffers = {}
fds_to_close = []
# Start reader threads
# Start reader threads for each stream
for stream in read_streams:
buf = []
buffers[stream] = buf
# Wrap raw fds in file objects
if isinstance(stream, int):
fobj = os.fdopen(os.dup(stream), 'rb')
fds_to_close.append(stream)
else:
fobj = stream
t = threading.Thread(target=_reader_thread_func, args=(fobj, buf))
t = threading.Thread(target=_reader_thread_func, args=(stream, buf))
t.daemon = True
t.start()
threads.append((stream, t, fobj))
threads.append((stream, t))
# Write stdin
if stdin and input_data:
@ -413,7 +407,7 @@ def _communicate_streams_windows(stdin, input_data, read_streams,
raise
# Join threads with timeout
for stream, t, fobj in threads:
for stream, t in threads:
remaining = _remaining_time_helper(endtime)
if remaining is not None and remaining < 0:
remaining = 0
@ -425,13 +419,6 @@ def _communicate_streams_windows(stdin, input_data, read_streams,
cmd_for_timeout, orig_timeout,
output=results.get(read_streams[0]) if read_streams else None)
# Close any raw fds we duped
for fd in fds_to_close:
try:
os.close(fd)
except OSError:
pass
# Collect results
return {stream: (buf[0] if buf else b'') for stream, buf in buffers.items()}
@ -439,14 +426,10 @@ def _communicate_streams_windows(stdin, input_data, read_streams,
def _communicate_streams_posix(stdin, input_data, read_streams,
endtime, orig_timeout, cmd_for_timeout):
"""POSIX implementation using selectors."""
# Normalize read_streams: build mapping of fd -> (original_key, chunks)
fd_info = {} # fd -> (original_stream, chunks_list)
# Build mapping of fd -> (file_object, chunks_list)
fd_info = {}
for stream in read_streams:
if isinstance(stream, int):
fd = stream
else:
fd = stream.fileno()
fd_info[fd] = (stream, [])
fd_info[stream.fileno()] = (stream, [])
# Prepare stdin
stdin_fd = None
@ -477,8 +460,8 @@ def _communicate_streams_posix(stdin, input_data, read_streams,
remaining = _remaining_time_helper(endtime)
if remaining is not None and remaining < 0:
# Timed out - collect partial results
results = {orig: b''.join(chunks)
for fd, (orig, chunks) in fd_info.items()}
results = {stream: b''.join(chunks)
for fd, (stream, chunks) in fd_info.items()}
raise TimeoutExpired(
cmd_for_timeout, orig_timeout,
output=results.get(read_streams[0]) if read_streams else None)
@ -487,8 +470,8 @@ def _communicate_streams_posix(stdin, input_data, read_streams,
# Check timeout after select
if endtime is not None and _time() > endtime:
results = {orig: b''.join(chunks)
for fd, (orig, chunks) in fd_info.items()}
results = {stream: b''.join(chunks)
for fd, (stream, chunks) in fd_info.items()}
raise TimeoutExpired(
cmd_for_timeout, orig_timeout,
output=results.get(read_streams[0]) if read_streams else None)
@ -520,16 +503,14 @@ def _communicate_streams_posix(stdin, input_data, read_streams,
else:
fd_info[key.fd][1].append(data)
# Build results: map original stream keys to joined data
# Build results and close all file objects
results = {}
for fd, (orig_stream, chunks) in fd_info.items():
results[orig_stream] = b''.join(chunks)
# Close file objects (but not raw fds - caller manages those)
if not isinstance(orig_stream, int):
try:
orig_stream.close()
except OSError:
pass
for fd, (stream, chunks) in fd_info.items():
results[stream] = b''.join(chunks)
try:
stream.close()
except OSError:
pass
return results
@ -942,13 +923,14 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
stdout_arg = kwargs.pop('stdout', None)
processes = []
stderr_read_fd = None # Read end of shared stderr pipe (for parent)
stderr_reader = None # File object for reading shared stderr (for parent)
stderr_write_fd = None # Write end of shared stderr pipe (for children)
try:
# Create a single stderr pipe that all processes will share
if capture_stderr:
stderr_read_fd, stderr_write_fd = os.pipe()
stderr_reader = os.fdopen(stderr_read_fd, 'rb')
for i, cmd in enumerate(commands):
is_first = (i == 0)
@ -1017,8 +999,8 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
read_streams = []
if last_proc.stdout is not None:
read_streams.append(last_proc.stdout)
if stderr_read_fd is not None:
read_streams.append(stderr_read_fd)
if stderr_reader is not None:
read_streams.append(stderr_reader)
# Use multiplexed I/O to handle stdin/stdout/stderr concurrently
# This avoids deadlocks from pipe buffer limits
@ -1043,7 +1025,7 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
# Extract results
stdout = results.get(last_proc.stdout)
stderr = results.get(stderr_read_fd)
stderr = results.get(stderr_reader)
# Decode stdout if in text mode (Popen text mode only applies to
# streams it creates, but we read via _communicate_streams which
@ -1087,10 +1069,10 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
proc.stdin.close()
if proc.stdout and not proc.stdout.closed:
proc.stdout.close()
# Close stderr pipe file descriptor
if stderr_read_fd is not None:
# Close stderr pipe (reader is a file object, writer is a raw fd)
if stderr_reader is not None and not stderr_reader.closed:
try:
os.close(stderr_read_fd)
stderr_reader.close()
except OSError:
pass
if stderr_write_fd is not None: