Refactor POSIX communicate I/O into shared _communicate_io_posix()

Extract the core selector-based I/O loop into a new _communicate_io_posix()
function that is shared by both _communicate_streams_posix() (used by
run_pipeline) and Popen._communicate() (used by Popen.communicate).

The new function:
- Takes a pre-configured selector and output buffers
- Supports resume via input_offset parameter (for Popen timeout retry)
- Returns (new_offset, completed) instead of raising TimeoutExpired
- Does not close output streams (caller decides based on use case); stdin is
  still closed once all input has been written or the pipe breaks

This reduces code duplication and ensures both APIs use the same
well-tested I/O multiplexing logic.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Gregory P. Smith using claude.ai/code 2025-11-29 00:54:27 +00:00
parent 3c28ed6e93
commit 9f53a8e883
No known key found for this signature in database

View file

@ -320,7 +320,7 @@ def _cleanup():
DEVNULL = -3
# Helper function for multiplexed I/O, used by run_pipeline()
# Helper function for multiplexed I/O
def _remaining_time_helper(endtime):
"""Calculate remaining time until deadline."""
if endtime is None:
@ -328,6 +328,76 @@ def _remaining_time_helper(endtime):
return endtime - _time()
def _communicate_io_posix(selector, stdin, input_view, input_offset,
                          output_buffers, endtime):
    """
    Low-level POSIX I/O multiplexing loop.

    This is the common core used by both _communicate_streams_posix() and
    Popen._communicate(). It runs the select loop for reading/writing but
    does not raise timeout exceptions and does not close output streams.

    Args:
        selector: A _PopenSelector with streams already registered
        stdin: Writable file object for input, or None
        input_view: memoryview of input bytes, or None
        input_offset: Starting offset into input_view (for resume support)
        output_buffers: Dict {file_object: list} to append read chunks to
        endtime: Deadline timestamp, or None for no timeout

    Returns:
        (new_input_offset, completed)
        - new_input_offset: Offset into input_view after the last
          successful write (equal to input_offset if nothing was written)
        - completed: True if the selector has no registered streams left,
          False if the deadline passed first

    Note:
        - Unregisters and closes stdin once all of input_view has been
          written, or when a write raises BrokenPipeError
        - Unregisters an output stream at EOF but does NOT close it
          (caller decides)
        - Does NOT raise TimeoutExpired (caller handles)
        - Appends to output_buffers lists in place
    """
    stdin_fd = stdin.fileno() if stdin else None

    while selector.get_map():
        remaining = _remaining_time_helper(endtime)
        if remaining is not None and remaining < 0:
            return (input_offset, False)  # Timed out before selecting

        ready = selector.select(remaining)

        # Re-check the deadline after select() returns; ready events are
        # left unprocessed when the deadline has already passed.
        if endtime is not None and _time() > endtime:
            return (input_offset, False)  # Timed out

        for key, _events in ready:
            if key.fd == stdin_fd:
                # Write at most _PIPE_BUF bytes of the remaining input.
                chunk = input_view[input_offset:input_offset + _PIPE_BUF]
                try:
                    input_offset += os.write(key.fd, chunk)
                except BrokenPipeError:
                    # Nothing more can be written; stop feeding stdin.
                    stdin_done = True
                else:
                    stdin_done = input_offset >= len(input_view)

                if stdin_done:
                    selector.unregister(key.fileobj)
                    try:
                        stdin.close()
                    except BrokenPipeError:
                        pass
            elif key.fileobj in output_buffers:
                # Read a chunk from an output stream; empty data means EOF.
                data = os.read(key.fd, 32768)
                if not data:
                    selector.unregister(key.fileobj)
                else:
                    output_buffers[key.fileobj].append(data)

    return (input_offset, True)  # Completed
def _communicate_streams(stdin=None, input_data=None, read_streams=None,
timeout=None, cmd_for_timeout=None):
"""
@ -426,86 +496,46 @@ def _communicate_streams_windows(stdin, input_data, read_streams,
def _communicate_streams_posix(stdin, input_data, read_streams,
endtime, orig_timeout, cmd_for_timeout):
"""POSIX implementation using selectors."""
# Build mapping of fd -> (file_object, chunks_list)
fd_info = {}
for stream in read_streams:
fd_info[stream.fileno()] = (stream, [])
# Build output buffers for each stream
output_buffers = {stream: [] for stream in read_streams}
# Prepare stdin
stdin_fd = None
if stdin:
try:
stdin.flush()
except BrokenPipeError:
pass
if input_data:
stdin_fd = stdin.fileno()
else:
if not input_data:
try:
stdin.close()
except BrokenPipeError:
pass
stdin = None # Don't register with selector
# Prepare input data
input_offset = 0
input_view = memoryview(input_data) if input_data else None
with _PopenSelector() as selector:
if stdin_fd is not None and input_data:
selector.register(stdin_fd, selectors.EVENT_WRITE)
for fd in fd_info:
selector.register(fd, selectors.EVENT_READ)
if stdin and input_data:
selector.register(stdin, selectors.EVENT_WRITE)
for stream in read_streams:
selector.register(stream, selectors.EVENT_READ)
while selector.get_map():
remaining = _remaining_time_helper(endtime)
if remaining is not None and remaining < 0:
# Timed out - collect partial results
results = {stream: b''.join(chunks)
for fd, (stream, chunks) in fd_info.items()}
raise TimeoutExpired(
cmd_for_timeout, orig_timeout,
output=results.get(read_streams[0]) if read_streams else None)
# Run the common I/O loop
_, completed = _communicate_io_posix(
selector, stdin, input_view, 0, output_buffers, endtime)
ready = selector.select(remaining)
# Check timeout after select
if endtime is not None and _time() > endtime:
results = {stream: b''.join(chunks)
for fd, (stream, chunks) in fd_info.items()}
raise TimeoutExpired(
cmd_for_timeout, orig_timeout,
output=results.get(read_streams[0]) if read_streams else None)
for key, events in ready:
if key.fd == stdin_fd:
# Write chunk to stdin
chunk = input_view[input_offset:input_offset + _PIPE_BUF]
try:
input_offset += os.write(key.fd, chunk)
except BrokenPipeError:
selector.unregister(key.fd)
try:
stdin.close()
except BrokenPipeError:
pass
else:
if input_offset >= len(input_data):
selector.unregister(key.fd)
try:
stdin.close()
except BrokenPipeError:
pass
elif key.fd in fd_info:
# Read chunk from output stream
data = os.read(key.fd, 32768)
if not data:
selector.unregister(key.fd)
else:
fd_info[key.fd][1].append(data)
if not completed:
# Timed out - collect partial results
results = {stream: b''.join(chunks)
for stream, chunks in output_buffers.items()}
raise TimeoutExpired(
cmd_for_timeout, orig_timeout,
output=results.get(read_streams[0]) if read_streams else None)
# Build results and close all file objects
results = {}
for fd, (stream, chunks) in fd_info.items():
for stream, chunks in output_buffers.items():
results[stream] = b''.join(chunks)
try:
stream.close()
@ -2633,6 +2663,10 @@ def _communicate(self, input, endtime, orig_timeout):
input_view = memoryview(self._input)
else:
input_view = self._input.cast("b") # byte input required
input_offset = self._input_offset
else:
input_view = None
input_offset = 0
with _PopenSelector() as selector:
if self.stdin and not self.stdin.closed and self._input:
@ -2642,38 +2676,32 @@ def _communicate(self, input, endtime, orig_timeout):
if self.stderr and not self.stderr.closed:
selector.register(self.stderr, selectors.EVENT_READ)
while selector.get_map():
timeout = self._remaining_time(endtime)
if timeout is not None and timeout < 0:
self._check_timeout(endtime, orig_timeout,
stdout, stderr,
skip_check_and_raise=True)
raise RuntimeError( # Impossible :)
'_check_timeout(..., skip_check_and_raise=True) '
'failed to raise TimeoutExpired.')
# Use the common I/O loop (supports resume via _input_offset)
stdin_to_write = (self.stdin if self.stdin and self._input
and not self.stdin.closed else None)
new_offset, completed = _communicate_io_posix(
selector,
stdin_to_write,
input_view,
input_offset,
self._fileobj2output,
endtime)
if self._input:
self._input_offset = new_offset
ready = selector.select(timeout)
self._check_timeout(endtime, orig_timeout, stdout, stderr)
if not completed:
self._check_timeout(endtime, orig_timeout, stdout, stderr,
skip_check_and_raise=True)
raise RuntimeError( # Impossible :)
'_check_timeout(..., skip_check_and_raise=True) '
'failed to raise TimeoutExpired.')
# Close streams now that we're done reading
if self.stdout:
self.stdout.close()
if self.stderr:
self.stderr.close()
for key, events in ready:
if key.fileobj is self.stdin:
chunk = input_view[self._input_offset :
self._input_offset + _PIPE_BUF]
try:
self._input_offset += os.write(key.fd, chunk)
except BrokenPipeError:
selector.unregister(key.fileobj)
key.fileobj.close()
else:
if self._input_offset >= len(input_view):
selector.unregister(key.fileobj)
key.fileobj.close()
elif key.fileobj in (self.stdout, self.stderr):
data = os.read(key.fd, 32768)
if not data:
selector.unregister(key.fileobj)
key.fileobj.close()
self._fileobj2output[key.fileobj].append(data)
try:
self.wait(timeout=self._remaining_time(endtime))
except TimeoutExpired as exc: