# Patch summary (Lib/subprocess.py):
#   * Adds _communicate_io_posix(), a selector-driven read/write loop shared
#     by _communicate_streams_posix() and Popen._communicate().  The helper
#     reports a timeout through its return value; it neither raises
#     TimeoutExpired nor closes the output streams.
#   * Both callers now register file objects with the selector, delegate the
#     loop to the helper, and keep responsibility for raising TimeoutExpired
#     and for closing the output streams afterwards.
#   * Popen._communicate() passes self._input_offset in and stores the
#     returned offset back, so that a later call starts writing from there.
# NOTE(review): line breaks and indentation were restored from a
# whitespace-collapsed copy of this patch.  Blank context lines were placed
# using the hunk line counts; the indentation of added lines (docstring
# layout, and whether the "if not completed:" blocks sit inside the "with")
# is inferred -- confirm against the upstream commit before applying.
diff --git a/Lib/subprocess.py b/Lib/subprocess.py
index 4c92d17da93..50b437e3ee5 100644
--- a/Lib/subprocess.py
+++ b/Lib/subprocess.py
@@ -320,7 +320,7 @@ def _cleanup():
 DEVNULL = -3
 
 
-# Helper function for multiplexed I/O, used by run_pipeline()
+# Helper function for multiplexed I/O
 def _remaining_time_helper(endtime):
     """Calculate remaining time until deadline."""
     if endtime is None:
@@ -328,6 +328,76 @@ def _remaining_time_helper(endtime):
     return endtime - _time()
 
 
+def _communicate_io_posix(selector, stdin, input_view, input_offset,
+                          output_buffers, endtime):
+    """
+    Low-level POSIX I/O multiplexing loop.
+
+    This is the common core used by both _communicate_streams() and
+    Popen._communicate(). It handles the select loop for reading/writing
+    but does not manage stream lifecycle or raise timeout exceptions.
+
+    Args:
+        selector: A _PopenSelector with streams already registered
+        stdin: Writable file object for input, or None
+        input_view: memoryview of input bytes, or None
+        input_offset: Starting offset into input_view (for resume support)
+        output_buffers: Dict {file_object: list} to append read chunks to
+        endtime: Deadline timestamp, or None for no timeout
+
+    Returns:
+        (new_input_offset, completed)
+        - new_input_offset: How many bytes of input were written
+        - completed: True if all I/O finished, False if timed out
+
+    Note:
+        - Does NOT close any streams (caller decides)
+        - Does NOT raise TimeoutExpired (caller handles)
+        - Appends to output_buffers lists in place
+    """
+    stdin_fd = stdin.fileno() if stdin else None
+
+    while selector.get_map():
+        remaining = _remaining_time_helper(endtime)
+        if remaining is not None and remaining < 0:
+            return (input_offset, False)  # Timed out
+
+        ready = selector.select(remaining)
+
+        # Check timeout after select (may have woken spuriously)
+        if endtime is not None and _time() > endtime:
+            return (input_offset, False)  # Timed out
+
+        for key, events in ready:
+            if key.fd == stdin_fd:
+                # Write chunk to stdin
+                chunk = input_view[input_offset:input_offset + _PIPE_BUF]
+                try:
+                    input_offset += os.write(key.fd, chunk)
+                except BrokenPipeError:
+                    selector.unregister(key.fd)
+                    try:
+                        stdin.close()
+                    except BrokenPipeError:
+                        pass
+                else:
+                    if input_offset >= len(input_view):
+                        selector.unregister(key.fd)
+                        try:
+                            stdin.close()
+                        except BrokenPipeError:
+                            pass
+            elif key.fileobj in output_buffers:
+                # Read chunk from output stream
+                data = os.read(key.fd, 32768)
+                if not data:
+                    selector.unregister(key.fileobj)
+                else:
+                    output_buffers[key.fileobj].append(data)
+
+    return (input_offset, True)  # Completed
+
+
 def _communicate_streams(stdin=None, input_data=None, read_streams=None,
                          timeout=None, cmd_for_timeout=None):
     """
@@ -426,86 +496,46 @@ def _communicate_streams_windows(stdin, input_data, read_streams,
 def _communicate_streams_posix(stdin, input_data, read_streams,
                                endtime, orig_timeout, cmd_for_timeout):
     """POSIX implementation using selectors."""
-    # Build mapping of fd -> (file_object, chunks_list)
-    fd_info = {}
-    for stream in read_streams:
-        fd_info[stream.fileno()] = (stream, [])
+    # Build output buffers for each stream
+    output_buffers = {stream: [] for stream in read_streams}
 
     # Prepare stdin
-    stdin_fd = None
     if stdin:
         try:
             stdin.flush()
         except BrokenPipeError:
             pass
-        if input_data:
-            stdin_fd = stdin.fileno()
-        else:
+        if not input_data:
             try:
                 stdin.close()
             except BrokenPipeError:
                 pass
+            stdin = None  # Don't register with selector
 
     # Prepare input data
-    input_offset = 0
     input_view = memoryview(input_data) if input_data else None
 
     with _PopenSelector() as selector:
-        if stdin_fd is not None and input_data:
-            selector.register(stdin_fd, selectors.EVENT_WRITE)
-        for fd in fd_info:
-            selector.register(fd, selectors.EVENT_READ)
+        if stdin and input_data:
+            selector.register(stdin, selectors.EVENT_WRITE)
+        for stream in read_streams:
+            selector.register(stream, selectors.EVENT_READ)
 
-        while selector.get_map():
-            remaining = _remaining_time_helper(endtime)
-            if remaining is not None and remaining < 0:
-                # Timed out - collect partial results
-                results = {stream: b''.join(chunks)
-                           for fd, (stream, chunks) in fd_info.items()}
-                raise TimeoutExpired(
-                    cmd_for_timeout, orig_timeout,
-                    output=results.get(read_streams[0]) if read_streams else None)
+        # Run the common I/O loop
+        _, completed = _communicate_io_posix(
+            selector, stdin, input_view, 0, output_buffers, endtime)
 
-            ready = selector.select(remaining)
-
-            # Check timeout after select
-            if endtime is not None and _time() > endtime:
-                results = {stream: b''.join(chunks)
-                           for fd, (stream, chunks) in fd_info.items()}
-                raise TimeoutExpired(
-                    cmd_for_timeout, orig_timeout,
-                    output=results.get(read_streams[0]) if read_streams else None)
-
-            for key, events in ready:
-                if key.fd == stdin_fd:
-                    # Write chunk to stdin
-                    chunk = input_view[input_offset:input_offset + _PIPE_BUF]
-                    try:
-                        input_offset += os.write(key.fd, chunk)
-                    except BrokenPipeError:
-                        selector.unregister(key.fd)
-                        try:
-                            stdin.close()
-                        except BrokenPipeError:
-                            pass
-                    else:
-                        if input_offset >= len(input_data):
-                            selector.unregister(key.fd)
-                            try:
-                                stdin.close()
-                            except BrokenPipeError:
-                                pass
-                elif key.fd in fd_info:
-                    # Read chunk from output stream
-                    data = os.read(key.fd, 32768)
-                    if not data:
-                        selector.unregister(key.fd)
-                    else:
-                        fd_info[key.fd][1].append(data)
+        if not completed:
+            # Timed out - collect partial results
+            results = {stream: b''.join(chunks)
+                       for stream, chunks in output_buffers.items()}
+            raise TimeoutExpired(
+                cmd_for_timeout, orig_timeout,
+                output=results.get(read_streams[0]) if read_streams else None)
 
     # Build results and close all file objects
     results = {}
-    for fd, (stream, chunks) in fd_info.items():
+    for stream, chunks in output_buffers.items():
         results[stream] = b''.join(chunks)
         try:
             stream.close()
@@ -2633,6 +2663,10 @@ def _communicate(self, input, endtime, orig_timeout):
                     input_view = memoryview(self._input)
                 else:
                     input_view = self._input.cast("b")  # byte input required
+                input_offset = self._input_offset
+            else:
+                input_view = None
+                input_offset = 0
 
             with _PopenSelector() as selector:
                 if self.stdin and not self.stdin.closed and self._input:
@@ -2642,38 +2676,32 @@ def _communicate(self, input, endtime, orig_timeout):
                 if self.stderr and not self.stderr.closed:
                     selector.register(self.stderr, selectors.EVENT_READ)
 
-                while selector.get_map():
-                    timeout = self._remaining_time(endtime)
-                    if timeout is not None and timeout < 0:
-                        self._check_timeout(endtime, orig_timeout,
-                                            stdout, stderr,
-                                            skip_check_and_raise=True)
-                        raise RuntimeError(  # Impossible :)
-                            '_check_timeout(..., skip_check_and_raise=True) '
-                            'failed to raise TimeoutExpired.')
+                # Use the common I/O loop (supports resume via _input_offset)
+                stdin_to_write = (self.stdin if self.stdin and self._input
+                                  and not self.stdin.closed else None)
+                new_offset, completed = _communicate_io_posix(
+                    selector,
+                    stdin_to_write,
+                    input_view,
+                    input_offset,
+                    self._fileobj2output,
+                    endtime)
+                if self._input:
+                    self._input_offset = new_offset
 
-                    ready = selector.select(timeout)
-                    self._check_timeout(endtime, orig_timeout, stdout, stderr)
+                if not completed:
+                    self._check_timeout(endtime, orig_timeout, stdout, stderr,
+                                        skip_check_and_raise=True)
+                    raise RuntimeError(  # Impossible :)
+                        '_check_timeout(..., skip_check_and_raise=True) '
+                        'failed to raise TimeoutExpired.')
+
+                # Close streams now that we're done reading
+                if self.stdout:
+                    self.stdout.close()
+                if self.stderr:
+                    self.stderr.close()
 
-                    for key, events in ready:
-                        if key.fileobj is self.stdin:
-                            chunk = input_view[self._input_offset :
-                                               self._input_offset + _PIPE_BUF]
-                            try:
-                                self._input_offset += os.write(key.fd, chunk)
-                            except BrokenPipeError:
-                                selector.unregister(key.fileobj)
-                                key.fileobj.close()
-                            else:
-                                if self._input_offset >= len(input_view):
-                                    selector.unregister(key.fileobj)
-                                    key.fileobj.close()
-                        elif key.fileobj in (self.stdout, self.stderr):
-                            data = os.read(key.fd, 32768)
-                            if not data:
-                                selector.unregister(key.fileobj)
-                                key.fileobj.close()
-                            self._fileobj2output[key.fileobj].append(data)
             try:
                 self.wait(timeout=self._remaining_time(endtime))
             except TimeoutExpired as exc: