From e22d1da9bccb0c1cb40ec6e0d21b6ce16316cd1c Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith using claude.ai/code" Date: Fri, 28 Nov 2025 07:04:28 +0000 Subject: [PATCH] Simplify _communicate_streams() to only accept file objects Remove support for raw file descriptors in _communicate_streams(), requiring all streams to be file objects. This simplifies both the Windows and POSIX implementations by removing isinstance() checks and fd-wrapping logic. The run_pipeline() function now wraps the stderr pipe's read end with os.fdopen() immediately after creation. This change makes _communicate_streams() more compatible with Popen.communicate() which already uses file objects, enabling potential future refactoring to share the multiplexed I/O logic. Co-authored-by: Claude Opus 4.5 --- Lib/subprocess.py | 80 ++++++++++++++++++----------------------------- 1 file changed, 31 insertions(+), 49 deletions(-) diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 5a6a7086db1..7d497cc102a 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -333,18 +333,19 @@ def _communicate_streams(stdin=None, input_data=None, read_streams=None, """ Multiplex I/O: write input_data to stdin, read from read_streams. - Works with both file objects and raw file descriptors. + All streams must be file objects (not raw file descriptors). All I/O is done in binary mode; caller handles text encoding. Args: - stdin: Writable file object for input, or None + stdin: Writable binary file object for input, or None input_data: Bytes to write to stdin, or None - read_streams: List of readable file objects or raw fds to read from + read_streams: List of readable binary file objects to read from timeout: Timeout in seconds, or None for no timeout cmd_for_timeout: Value to use for TimeoutExpired.cmd Returns: - Dict mapping each item in read_streams to its bytes data + Dict mapping each file object in read_streams to its bytes data. + All file objects in read_streams will be closed. Raises: TimeoutExpired: If timeout expires (with partial data) @@ -377,22 +378,15 @@ def _communicate_streams_windows(stdin, input_data, read_streams, """Windows implementation using threads.""" threads = [] buffers = {} - fds_to_close = [] - # Start reader threads + # Start reader threads for each stream for stream in read_streams: buf = [] buffers[stream] = buf - # Wrap raw fds in file objects - if isinstance(stream, int): - fobj = os.fdopen(os.dup(stream), 'rb') - fds_to_close.append(stream) - else: - fobj = stream - t = threading.Thread(target=_reader_thread_func, args=(fobj, buf)) + t = threading.Thread(target=_reader_thread_func, args=(stream, buf)) t.daemon = True t.start() - threads.append((stream, t, fobj)) + threads.append((stream, t)) # Write stdin if stdin and input_data: @@ -413,7 +407,7 @@ def _communicate_streams_windows(stdin, input_data, read_streams, raise # Join threads with timeout - for stream, t, fobj in threads: + for stream, t in threads: remaining = _remaining_time_helper(endtime) if remaining is not None and remaining < 0: remaining = 0 @@ -425,13 +419,6 @@ def _communicate_streams_windows(stdin, input_data, read_streams, cmd_for_timeout, orig_timeout, output=results.get(read_streams[0]) if read_streams else None) - # Close any raw fds we duped - for fd in fds_to_close: - try: - os.close(fd) - except OSError: - pass - # Collect results return {stream: (buf[0] if buf else b'') for stream, buf in buffers.items()} @@ -439,14 +426,10 @@ def _communicate_streams_windows(stdin, input_data, read_streams, def _communicate_streams_posix(stdin, input_data, read_streams, endtime, orig_timeout, cmd_for_timeout): """POSIX implementation using selectors.""" - # Normalize read_streams: build mapping of fd -> (original_key, chunks) - fd_info = {} # fd -> (original_stream, chunks_list) + # Build mapping of fd -> (file_object, chunks_list) + fd_info = {} for stream in read_streams: - if isinstance(stream, int): - fd = stream - else: - fd = stream.fileno() - fd_info[fd] = (stream, []) + fd_info[stream.fileno()] = (stream, []) # Prepare stdin stdin_fd = None @@ -477,8 +460,8 @@ def _communicate_streams_posix(stdin, input_data, read_streams, remaining = _remaining_time_helper(endtime) if remaining is not None and remaining < 0: # Timed out - collect partial results - results = {orig: b''.join(chunks) - for fd, (orig, chunks) in fd_info.items()} + results = {stream: b''.join(chunks) + for fd, (stream, chunks) in fd_info.items()} raise TimeoutExpired( cmd_for_timeout, orig_timeout, output=results.get(read_streams[0]) if read_streams else None) @@ -487,8 +470,8 @@ def _communicate_streams_posix(stdin, input_data, read_streams, # Check timeout after select if endtime is not None and _time() > endtime: - results = {orig: b''.join(chunks) - for fd, (orig, chunks) in fd_info.items()} + results = {stream: b''.join(chunks) + for fd, (stream, chunks) in fd_info.items()} raise TimeoutExpired( cmd_for_timeout, orig_timeout, output=results.get(read_streams[0]) if read_streams else None) @@ -520,16 +503,14 @@ def _communicate_streams_posix(stdin, input_data, read_streams, else: fd_info[key.fd][1].append(data) - # Build results: map original stream keys to joined data + # Build results and close all file objects results = {} - for fd, (orig_stream, chunks) in fd_info.items(): - results[orig_stream] = b''.join(chunks) - # Close file objects (but not raw fds - caller manages those) - if not isinstance(orig_stream, int): - try: - orig_stream.close() - except OSError: - pass + for fd, (stream, chunks) in fd_info.items(): + results[stream] = b''.join(chunks) + try: + stream.close() + except OSError: + pass return results @@ -942,13 +923,14 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, stdout_arg = kwargs.pop('stdout', None) processes = [] - stderr_read_fd = None # Read end of shared stderr pipe (for parent) + stderr_reader = None # File object for reading shared stderr (for parent) stderr_write_fd = None # Write end of shared stderr pipe (for children) try: # Create a single stderr pipe that all processes will share if capture_stderr: stderr_read_fd, stderr_write_fd = os.pipe() + stderr_reader = os.fdopen(stderr_read_fd, 'rb') for i, cmd in enumerate(commands): is_first = (i == 0) @@ -1017,8 +999,8 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, read_streams = [] if last_proc.stdout is not None: read_streams.append(last_proc.stdout) - if stderr_read_fd is not None: - read_streams.append(stderr_read_fd) + if stderr_reader is not None: + read_streams.append(stderr_reader) # Use multiplexed I/O to handle stdin/stdout/stderr concurrently # This avoids deadlocks from pipe buffer limits @@ -1043,7 +1025,7 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, # Extract results stdout = results.get(last_proc.stdout) - stderr = results.get(stderr_read_fd) + stderr = results.get(stderr_reader) # Decode stdout if in text mode (Popen text mode only applies to # streams it creates, but we read via _communicate_streams which @@ -1087,10 +1069,10 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, proc.stdin.close() if proc.stdout and not proc.stdout.closed: proc.stdout.close() - # Close stderr pipe file descriptor - if stderr_read_fd is not None: + # Close stderr pipe (reader is a file object, writer is a raw fd) + if stderr_reader is not None and not stderr_reader.closed: try: - os.close(stderr_read_fd) + stderr_reader.close() except OSError: pass if stderr_write_fd is not None: