commit 4a406e11f8
Gregory P. Smith, 2025-12-08 06:12:03 +02:00, committed by GitHub
3 changed files with 1184 additions and 48 deletions

Doc/library/subprocess.rst

@@ -264,6 +264,182 @@ underlying :class:`Popen` interface can be used directly.
*stdout* and *stderr* attributes added
.. function:: run_pipeline(*commands, stdin=None, input=None, \
stdout=None, stderr=None, capture_output=False, \
timeout=None, check=False, encoding=None, \
errors=None, text=None, env=None, \
**other_popen_kwargs)
Run a pipeline of commands connected via pipes, similar to shell pipelines.
Wait for all commands to complete, then return a :class:`PipelineResult`
instance.
Each positional argument should be a command (a list of strings, or a string
if ``shell=True``) to execute. The standard output of each command is
connected to the standard input of the next command in the pipeline.
This function requires at least two commands. For a single command, use
:func:`run` instead.
If *capture_output* is true, the standard output of the final command and
the standard error of all commands will be captured. All processes in the
pipeline share a single stderr pipe, so their error output will be
interleaved. The *stdout* and *stderr* arguments may not be supplied at
the same time as *capture_output*.
A *timeout* may be specified in seconds. If the timeout expires, all
child processes will be killed and waited for, and then a
:exc:`TimeoutExpired` exception will be raised.
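   For example, a pipeline that cannot finish before the deadline raises
   :exc:`TimeoutExpired` (an illustrative sketch; the commands are
   placeholders)::

      try:
          subprocess.run_pipeline(
              ['cat', '/dev/zero'],
              ['sleep', '60'],
              timeout=5,
          )
      except subprocess.TimeoutExpired as exc:
          print(f'pipeline timed out after {exc.timeout} seconds')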
The *input* argument is passed to the first command's stdin. If used, it
must be a byte sequence, or a string if *encoding* or *errors* is specified
or *text* is true.
If *check* is true, and any process in the pipeline exits with a non-zero
exit code, a :exc:`PipelineError` exception will be raised. This behavior
is similar to the shell's ``pipefail`` option.
If *encoding* or *errors* are specified, or *text* is true, file objects
are opened in text mode using the specified encoding and errors.
.. note::
When using ``text=True`` with ``capture_output=True`` or ``stderr=PIPE``,
be aware that stderr output from multiple processes may be interleaved
in ways that produce incomplete multi-byte character sequences. For
reliable text decoding of stderr, consider capturing in binary mode
and decoding manually with appropriate error handling, or use
``errors='replace'`` or ``errors='backslashreplace'``.
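   For example, one such pattern is to capture in binary mode and decode
   afterwards (an illustrative sketch; the commands are placeholders)::

      result = subprocess.run_pipeline(
          ['produce-data'],
          ['consume-data'],
          capture_output=True,  # binary mode: stdout/stderr stay bytes
      )
      stderr_text = result.stderr.decode('utf-8', errors='replace')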
If *stdin* is specified, it is connected to the first command's standard
input. If *stdout* is specified, it is connected to the last command's
standard output. When *stdout* is :data:`PIPE`, the output is available
in the returned :class:`PipelineResult`'s :attr:`~PipelineResult.stdout`
attribute. Other keyword arguments are passed to each :class:`Popen` call.
Examples::
>>> import subprocess
>>> # Equivalent to: echo "hello world" | tr a-z A-Z
>>> result = subprocess.run_pipeline(
... ['echo', 'hello world'],
... ['tr', 'a-z', 'A-Z'],
... capture_output=True, text=True
... )
>>> result.stdout
'HELLO WORLD\n'
>>> result.returncodes
[0, 0]
>>> # Pipeline with three commands
>>> result = subprocess.run_pipeline(
... ['echo', 'one\ntwo\nthree'],
... ['sort'],
... ['head', '-n', '2'],
... capture_output=True, text=True
... )
>>> result.stdout
'one\nthree\n'
>>> # Using input parameter
>>> result = subprocess.run_pipeline(
... ['cat'],
... ['wc', '-l'],
... input='line1\nline2\nline3\n',
... capture_output=True, text=True
... )
>>> result.stdout.strip()
'3'
>>> # Error handling with check=True
>>> subprocess.run_pipeline(
... ['echo', 'hello'],
... ['false'], # exits with status 1
... check=True
... )
Traceback (most recent call last):
...
subprocess.PipelineError: Pipeline failed: command 1 ['false'] returned 1
.. versionadded:: next
.. class:: PipelineResult
The return value from :func:`run_pipeline`, representing a pipeline of
processes that have finished.
.. attribute:: commands
The list of commands used to launch the pipeline. Each command is a list
of strings (or a string if ``shell=True`` was used).
.. attribute:: returncodes
List of exit status codes for each command in the pipeline. Typically,
an exit status of 0 indicates that the command ran successfully.
      A negative value ``-N`` indicates that the command was terminated by
      signal ``N``; for example, ``-13`` means the command was killed by
      ``SIGPIPE`` on Linux (POSIX only).
.. attribute:: returncode
Exit status of the final command in the pipeline. This is a convenience
property equivalent to ``returncodes[-1]``.
.. attribute:: stdout
Captured stdout from the final command in the pipeline. A bytes sequence,
or a string if :func:`run_pipeline` was called with an encoding, errors,
or ``text=True``. ``None`` if stdout was not captured.
.. attribute:: stderr
Captured stderr from all commands in the pipeline, combined. A bytes
sequence, or a string if :func:`run_pipeline` was called with an
encoding, errors, or ``text=True``. ``None`` if stderr was not captured.
.. method:: check_returncodes()
If any command's :attr:`returncode` is non-zero, raise a
:exc:`PipelineError`.
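      For example, this allows inspecting the results of an unchecked run
      before deciding to raise (an illustrative sketch; ``cmd-a`` and
      ``cmd-b`` are placeholders)::

         result = subprocess.run_pipeline(['cmd-a'], ['cmd-b'],
                                          capture_output=True)
         if result.returncodes != [0, 0]:
             print(result.stderr)
         result.check_returncodes()  # raises PipelineError on any failure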
.. versionadded:: next
.. exception:: PipelineError
Subclass of :exc:`SubprocessError`, raised when a pipeline run by
:func:`run_pipeline` (with ``check=True``) contains one or more commands
that returned a non-zero exit status. This is similar to the shell's
``pipefail`` behavior.
.. attribute:: commands
List of commands that were used in the pipeline.
.. attribute:: returncodes
List of exit status codes for each command in the pipeline.
.. attribute:: stdout
Output of the final command if it was captured. Otherwise, ``None``.
.. attribute:: stderr
Combined stderr output of all commands if it was captured.
Otherwise, ``None``.
.. attribute:: failed
List of ``(index, command, returncode)`` tuples for each command
that returned a non-zero exit status. The *index* is the position
of the command in the pipeline (0-based).
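      For example, *failed* makes it easy to report exactly which stages
      broke (an illustrative sketch; the commands are placeholders)::

         try:
             subprocess.run_pipeline(['cmd-a'], ['cmd-b'], check=True)
         except subprocess.PipelineError as exc:
             for index, command, returncode in exc.failed:
                 print(f'stage {index} ({command!r}) exited with {returncode}')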
.. versionadded:: next
.. _frequently-used-arguments:
Frequently Used Arguments

Lib/subprocess.py

@@ -62,7 +62,8 @@
 __all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput",
            "getoutput", "check_output", "run", "CalledProcessError", "DEVNULL",
-           "SubprocessError", "TimeoutExpired", "CompletedProcess"]
+           "SubprocessError", "TimeoutExpired", "CompletedProcess",
+           "run_pipeline", "PipelineResult", "PipelineError"]

 # NOTE: We intentionally exclude list2cmdline as it is
 # considered an internal implementation detail.  issue10838.
@@ -194,6 +195,36 @@ def stdout(self, value):
self.output = value
class PipelineError(SubprocessError):
"""Raised when run_pipeline() is called with check=True and one or more
commands in the pipeline return a non-zero exit status.
Attributes:
commands: List of commands in the pipeline (each a list of strings).
returncodes: List of return codes corresponding to each command.
stdout: Standard output from the final command (if captured).
stderr: Standard error output (if captured).
failed: List of (index, command, returncode) tuples for failed commands.
"""
def __init__(self, commands, returncodes, stdout=None, stderr=None):
self.commands = commands
self.returncodes = returncodes
self.stdout = stdout
self.stderr = stderr
self.failed = [
(i, cmd, rc)
for i, (cmd, rc) in enumerate(zip(commands, returncodes))
if rc != 0
]
def __str__(self):
failed_info = ", ".join(
f"command {i} {cmd!r} returned {rc}"
for i, cmd, rc in self.failed
)
return f"Pipeline failed: {failed_info}"
if _mswindows:
class STARTUPINFO:
def __init__(self, *, dwFlags=0, hStdInput=None, hStdOutput=None,
@@ -289,6 +320,295 @@ def _cleanup():
DEVNULL = -3
# Helper functions for multiplexed I/O
def _remaining_time_helper(endtime):
"""Calculate remaining time until deadline."""
if endtime is None:
return None
return endtime - _time()
def _flush_stdin(stdin):
"""Flush stdin, ignoring BrokenPipeError and closed file ValueError."""
try:
stdin.flush()
except BrokenPipeError:
pass
except ValueError:
# Ignore ValueError: I/O operation on closed file.
if not stdin.closed:
raise
def _make_input_view(input_data):
"""Convert input data to a byte memoryview for writing.
Handles the case where input_data is already a memoryview with
non-byte elements (e.g., int32 array) by casting to a byte view.
This ensures len(view) returns the byte count, not element count.
"""
if not input_data:
return None
if isinstance(input_data, memoryview):
return input_data.cast("b") # ensure byte view for correct len()
return memoryview(input_data)
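
# Illustrative only (not executed by the module): a memoryview over an array
# of 4-byte ints reports len() in elements, so without the cast a writer
# would see fewer "bytes" than are really there:
#   mv = memoryview(array.array('i', [1, 2, 3]))  # len(mv) == 3 elements
#   _make_input_view(mv)  # byte view; len() == 12 on a 4-byte-int platform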
def _translate_newlines(data, encoding, errors):
"""Decode bytes to str and translate newlines to \n."""
data = data.decode(encoding, errors)
return data.replace("\r\n", "\n").replace("\r", "\n")
def _communicate_io_posix(selector, stdin, input_view, input_offset,
output_buffers, endtime):
"""
Low-level POSIX I/O multiplexing loop.
This is the common core used by both _communicate_streams() and
Popen._communicate(). It handles the select loop for reading/writing
but does not manage stream lifecycle or raise timeout exceptions.
Args:
selector: A _PopenSelector with streams already registered
stdin: Writable file object for input, or None
input_view: memoryview of input bytes, or None
input_offset: Starting offset into input_view (for resume support)
output_buffers: Dict {file_object: list} to append read chunks to
endtime: Deadline timestamp, or None for no timeout
Returns:
(new_input_offset, completed)
- new_input_offset: How many bytes of input were written
- completed: True if all I/O finished, False if timed out
    Note:
    - Closes stdin once all input has been written or the pipe breaks;
      does NOT close the output streams (caller decides)
    - Does NOT raise TimeoutExpired (caller handles)
    - Appends to output_buffers lists in place
"""
stdin_fd = stdin.fileno() if stdin else None
while selector.get_map():
remaining = _remaining_time_helper(endtime)
if remaining is not None and remaining < 0:
return (input_offset, False) # Timed out
ready = selector.select(remaining)
# Check timeout after select (may have woken spuriously)
if endtime is not None and _time() > endtime:
return (input_offset, False) # Timed out
for key, events in ready:
if key.fd == stdin_fd:
# Write chunk to stdin
chunk = input_view[input_offset:input_offset + _PIPE_BUF]
try:
input_offset += os.write(key.fd, chunk)
except BrokenPipeError:
selector.unregister(key.fd)
try:
stdin.close()
except BrokenPipeError:
pass
else:
if input_offset >= len(input_view):
selector.unregister(key.fd)
try:
stdin.close()
except BrokenPipeError:
pass
elif key.fileobj in output_buffers:
# Read chunk from output stream
data = os.read(key.fd, 32768)
if not data:
selector.unregister(key.fileobj)
else:
output_buffers[key.fileobj].append(data)
return (input_offset, True) # Completed
def _communicate_streams(stdin=None, input_data=None, read_streams=None,
timeout=None, cmd_for_timeout=None):
"""
Multiplex I/O: write input_data to stdin, read from read_streams.
All streams must be file objects (not raw file descriptors).
All I/O is done in binary mode; caller handles text encoding.
Args:
stdin: Writable binary file object for input, or None
input_data: Bytes to write to stdin, or None
read_streams: List of readable binary file objects to read from
timeout: Timeout in seconds, or None for no timeout
cmd_for_timeout: Value to use for TimeoutExpired.cmd
Returns:
Dict mapping each file object in read_streams to its bytes data.
All file objects in read_streams will be closed.
Raises:
TimeoutExpired: If timeout expires (with partial data)
"""
if timeout is not None:
endtime = _time() + timeout
else:
endtime = None
read_streams = read_streams or []
if _mswindows:
return _communicate_streams_windows(
stdin, input_data, read_streams, endtime, timeout, cmd_for_timeout)
else:
return _communicate_streams_posix(
stdin, input_data, read_streams, endtime, timeout, cmd_for_timeout)
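
# Illustrative use of _communicate_streams() (a sketch of the contract, not
# how the module itself calls it): feed bytes to a process while draining
# both of its output pipes, without deadlocking on full pipe buffers:
#
#   proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
#   data = _communicate_streams(stdin=proc.stdin, input_data=b'payload',
#                               read_streams=[proc.stdout, proc.stderr],
#                               timeout=10, cmd_for_timeout=cmd)
#   out, err = data[proc.stdout], data[proc.stderr]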
if _mswindows:
def _reader_thread_func(fh, buffer):
"""Thread function to read from a file handle into a buffer list."""
try:
buffer.append(fh.read())
except OSError:
buffer.append(b'')
def _writer_thread_func(fh, data, result):
"""Thread function to write data to a file handle and close it."""
try:
if data:
fh.write(data)
except BrokenPipeError:
pass
except OSError as exc:
if exc.errno != errno.EINVAL:
result.append(exc)
try:
fh.close()
except BrokenPipeError:
pass
except OSError as exc:
if exc.errno != errno.EINVAL and not result:
result.append(exc)
def _communicate_streams_windows(stdin, input_data, read_streams,
endtime, orig_timeout, cmd_for_timeout):
"""Windows implementation using threads."""
threads = []
buffers = {}
writer_thread = None
writer_result = []
# Start writer thread to send input to stdin
if stdin and input_data:
writer_thread = threading.Thread(
target=_writer_thread_func,
args=(stdin, input_data, writer_result))
writer_thread.daemon = True
writer_thread.start()
elif stdin:
# No input data, just close stdin
try:
stdin.close()
except BrokenPipeError:
pass
except OSError as exc:
if exc.errno != errno.EINVAL:
raise
# Start reader threads for each stream
for stream in read_streams:
buf = []
buffers[stream] = buf
t = threading.Thread(target=_reader_thread_func, args=(stream, buf))
t.daemon = True
t.start()
threads.append((stream, t))
# Join writer thread with timeout first
if writer_thread is not None:
remaining = _remaining_time_helper(endtime)
if remaining is not None and remaining < 0:
remaining = 0
writer_thread.join(remaining)
if writer_thread.is_alive():
# Timed out during write - collect partial results
results = {s: (b[0] if b else b'') for s, b in buffers.items()}
raise TimeoutExpired(
cmd_for_timeout, orig_timeout,
output=results.get(read_streams[0]) if read_streams else None)
# Check for write errors
if writer_result:
raise writer_result[0]
# Join reader threads with timeout
for stream, t in threads:
remaining = _remaining_time_helper(endtime)
if remaining is not None and remaining < 0:
remaining = 0
t.join(remaining)
if t.is_alive():
# Collect partial results
results = {s: (b[0] if b else b'') for s, b in buffers.items()}
raise TimeoutExpired(
cmd_for_timeout, orig_timeout,
output=results.get(read_streams[0]) if read_streams else None)
        # Collect results and close the streams, as _communicate_streams()
        # promises; EOF has been reached on all of them at this point.
        results = {}
        for stream, buf in buffers.items():
            results[stream] = buf[0] if buf else b''
            try:
                stream.close()
            except OSError:
                pass
        return results
else:
def _communicate_streams_posix(stdin, input_data, read_streams,
endtime, orig_timeout, cmd_for_timeout):
"""POSIX implementation using selectors."""
# Build output buffers for each stream
output_buffers = {stream: [] for stream in read_streams}
# Prepare stdin
if stdin:
_flush_stdin(stdin)
if not input_data:
try:
stdin.close()
except BrokenPipeError:
pass
stdin = None # Don't register with selector
# Prepare input data
input_view = _make_input_view(input_data)
with _PopenSelector() as selector:
if stdin and input_data:
selector.register(stdin, selectors.EVENT_WRITE)
for stream in read_streams:
selector.register(stream, selectors.EVENT_READ)
# Run the common I/O loop
_, completed = _communicate_io_posix(
selector, stdin, input_view, 0, output_buffers, endtime)
if not completed:
# Timed out - collect partial results
results = {stream: b''.join(chunks)
for stream, chunks in output_buffers.items()}
raise TimeoutExpired(
cmd_for_timeout, orig_timeout,
output=results.get(read_streams[0]) if read_streams else None)
# Build results and close all file objects
results = {}
for stream, chunks in output_buffers.items():
results[stream] = b''.join(chunks)
try:
stream.close()
except OSError:
pass
return results
# XXX This function is only used by multiprocessing and the test suite,
# but it's here so that it can be imported when Python is compiled without
# threads.
@@ -508,6 +828,47 @@ def check_returncode(self):
self.stderr)
class PipelineResult:
"""A pipeline of processes that have finished running.
This is returned by run_pipeline().
Attributes:
commands: List of commands in the pipeline (each command is a list).
returncodes: List of return codes for each command in the pipeline.
returncode: The return code of the final command (for convenience).
stdout: The standard output of the final command (None if not captured).
stderr: The standard error output (None if not captured).
"""
def __init__(self, commands, returncodes, stdout=None, stderr=None):
self.commands = list(commands)
self.returncodes = list(returncodes)
self.stdout = stdout
self.stderr = stderr
@property
def returncode(self):
"""Return the exit code of the final command in the pipeline."""
return self.returncodes[-1] if self.returncodes else None
def __repr__(self):
args = [f'commands={self.commands!r}',
f'returncodes={self.returncodes!r}']
if self.stdout is not None:
args.append(f'stdout={self.stdout!r}')
if self.stderr is not None:
args.append(f'stderr={self.stderr!r}')
return f"{type(self).__name__}({', '.join(args)})"
__class_getitem__ = classmethod(types.GenericAlias)
def check_returncodes(self):
"""Raise PipelineError if any command's exit code is non-zero."""
if any(rc != 0 for rc in self.returncodes):
raise PipelineError(self.commands, self.returncodes,
self.stdout, self.stderr)
def run(*popenargs,
input=None, capture_output=False, timeout=None, check=False, **kwargs):
"""Run command with arguments and return a CompletedProcess instance.
@@ -578,6 +939,235 @@ def run(*popenargs,
return CompletedProcess(process.args, retcode, stdout, stderr)
def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
check=False, **kwargs):
"""Run a pipeline of commands connected via pipes.
Each positional argument should be a command (list of strings or a string
if shell=True) to execute. The stdout of each command is connected to the
stdin of the next command in the pipeline, similar to shell pipelines.
Returns a PipelineResult instance with attributes commands, returncodes,
stdout, and stderr. By default, stdout and stderr are not captured, and
those attributes will be None. Pass capture_output=True to capture both
the final command's stdout and stderr from all commands.
If check is True and any command's exit code is non-zero, it raises a
PipelineError. This is similar to shell "pipefail" behavior.
If timeout (seconds) is given and the pipeline takes too long, a
TimeoutExpired exception will be raised and all processes will be killed.
The optional "input" argument allows passing bytes or a string to the
first command's stdin. If you use this argument, you may not also specify
stdin in kwargs.
By default, all communication is in bytes. Use text=True, encoding, or
errors to enable text mode, which affects the input argument and stdout/
stderr outputs.
.. note::
When using text=True with capture_output=True or stderr=PIPE, be aware
that stderr output from multiple processes may be interleaved in ways
that produce invalid character sequences when decoded. For reliable
text decoding, avoid text=True when capturing stderr from pipelines,
or handle decoding errors appropriately.
    Other keyword arguments are passed to each Popen call, except for stdin
    and stdout, which are managed by the pipeline.
Example:
# Equivalent to: cat file.txt | grep pattern | wc -l
result = run_pipeline(
['cat', 'file.txt'],
['grep', 'pattern'],
['wc', '-l'],
capture_output=True, text=True
)
print(result.stdout) # "42\\n"
print(result.returncodes) # [0, 0, 0]
"""
if len(commands) < 2:
raise ValueError('run_pipeline requires at least 2 commands')
# Validate no conflicting arguments
if input is not None:
if kwargs.get('stdin') is not None:
raise ValueError('stdin and input arguments may not both be used.')
if capture_output:
if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None:
raise ValueError('stdout and stderr arguments may not be used '
'with capture_output.')
    # Determine stderr handling - all processes share the same stderr pipe.
    # When capturing, we create one pipe and all processes write to it.
    stderr_arg = kwargs.pop('stderr', None)
    capture_stderr = capture_output or stderr_arg == PIPE
    # stdin is for the first process, stdout is for the last process
    stdin_arg = kwargs.pop('stdin', None)
    stdout_arg = kwargs.pop('stdout', None)
    # Determine text mode up front and pop the text-related arguments so
    # that every Popen pipe stays binary; encoding the input and decoding
    # the captured output is handled centrally around the multiplexed I/O
    # loop below.  (Leaving them in kwargs would make Popen wrap the pipes
    # in TextIOWrapper objects, which the binary _communicate_streams()
    # machinery does not expect.)
    text_mode = bool(kwargs.pop('text', None)
                     or kwargs.pop('universal_newlines', None)
                     or kwargs.get('encoding') or kwargs.get('errors'))
    encoding = kwargs.pop('encoding', None)
    errors_param = kwargs.pop('errors', None) or 'strict'
    if text_mode and encoding is None:
        encoding = locale.getencoding()
processes = []
stderr_reader = None # File object for reading shared stderr (for parent)
stderr_write_fd = None # Write end of shared stderr pipe (for children)
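    # Pipe topology for run_pipeline(['a'], ['b'], ['c']) (illustrative):
    #
    #   input/stdin_arg --> a.stdin
    #   a.stdout --> b.stdin,  b.stdout --> c.stdin
    #   c.stdout --> stdout_arg (or PIPE when capturing)
    #   a/b/c stderr --> one shared pipe (when capture_stderr)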
try:
# Create a single stderr pipe that all processes will share
if capture_stderr:
stderr_read_fd, stderr_write_fd = os.pipe()
stderr_reader = os.fdopen(stderr_read_fd, 'rb')
for i, cmd in enumerate(commands):
is_first = (i == 0)
is_last = (i == len(commands) - 1)
# Determine stdin for this process
if is_first:
if input is not None:
proc_stdin = PIPE
else:
proc_stdin = stdin_arg # Could be None, PIPE, fd, or file
else:
proc_stdin = processes[-1].stdout
# Determine stdout for this process
if is_last:
if capture_output:
proc_stdout = PIPE
else:
proc_stdout = stdout_arg # Could be None, PIPE, fd, or file
else:
proc_stdout = PIPE
# All processes share the same stderr pipe (write end)
if capture_stderr:
proc_stderr = stderr_write_fd
else:
proc_stderr = stderr_arg
proc = Popen(cmd, stdin=proc_stdin, stdout=proc_stdout,
stderr=proc_stderr, **kwargs)
processes.append(proc)
# Close the parent's copy of the previous process's stdout
# to allow the pipe to signal EOF when the previous process exits
if not is_first and processes[-2].stdout is not None:
processes[-2].stdout.close()
# Close the write end of stderr pipe in parent - children have it
if stderr_write_fd is not None:
os.close(stderr_write_fd)
stderr_write_fd = None
first_proc = processes[0]
last_proc = processes[-1]
# Calculate deadline for timeout (used throughout)
if timeout is not None:
endtime = _time() + timeout
else:
endtime = None
# Encode input if in text mode
input_data = input
if input_data is not None and text_mode:
input_data = input_data.encode(encoding, errors_param)
# Build list of streams to read from
read_streams = []
if last_proc.stdout is not None:
read_streams.append(last_proc.stdout)
if stderr_reader is not None:
read_streams.append(stderr_reader)
# Use multiplexed I/O to handle stdin/stdout/stderr concurrently
# This avoids deadlocks from pipe buffer limits
stdin_stream = first_proc.stdin if input is not None else None
try:
results = _communicate_streams(
stdin=stdin_stream,
input_data=input_data,
read_streams=read_streams,
timeout=_remaining_time_helper(endtime),
cmd_for_timeout=commands,
)
except TimeoutExpired:
# Kill all processes on timeout
for p in processes:
if p.poll() is None:
p.kill()
for p in processes:
p.wait()
raise
# Extract results
stdout = results.get(last_proc.stdout)
stderr = results.get(stderr_reader)
# Translate newlines if in text mode (decode and convert \r\n to \n)
if text_mode and stdout is not None:
stdout = _translate_newlines(stdout, encoding, errors_param)
if text_mode and stderr is not None:
stderr = _translate_newlines(stderr, encoding, errors_param)
# Wait for all processes to complete (use remaining time from deadline)
returncodes = []
for proc in processes:
try:
remaining = _remaining_time_helper(endtime)
proc.wait(timeout=remaining)
except TimeoutExpired:
# Kill all processes on timeout
for p in processes:
if p.poll() is None:
p.kill()
for p in processes:
p.wait()
raise TimeoutExpired(commands, timeout, stdout, stderr)
returncodes.append(proc.returncode)
result = PipelineResult(commands, returncodes, stdout, stderr)
if check and any(rc != 0 for rc in returncodes):
raise PipelineError(commands, returncodes, stdout, stderr)
return result
finally:
# Ensure all processes are cleaned up
for proc in processes:
if proc.poll() is None:
proc.kill()
proc.wait()
# Close any open file handles
if proc.stdin and not proc.stdin.closed:
proc.stdin.close()
if proc.stdout and not proc.stdout.closed:
proc.stdout.close()
# Close stderr pipe (reader is a file object, writer is a raw fd)
if stderr_reader is not None and not stderr_reader.closed:
try:
stderr_reader.close()
except OSError:
pass
if stderr_write_fd is not None:
try:
os.close(stderr_write_fd)
except OSError:
pass
def list2cmdline(seq):
"""
Translate a sequence of arguments into a command line
@@ -1094,8 +1684,7 @@ def universal_newlines(self, universal_newlines):
         self.text_mode = bool(universal_newlines)

     def _translate_newlines(self, data, encoding, errors):
-        data = data.decode(encoding, errors)
-        return data.replace("\r\n", "\n").replace("\r", "\n")
+        return _translate_newlines(data, encoding, errors)

     def __enter__(self):
         return self
@@ -2092,14 +2681,7 @@ def _communicate(self, input, endtime, orig_timeout):
         if self.stdin and not self._communication_started:
             # Flush stdio buffer. This might block, if the user has
             # been writing to .stdin in an uncontrolled fashion.
-            try:
-                self.stdin.flush()
-            except BrokenPipeError:
-                pass # communicate() must ignore BrokenPipeError.
-            except ValueError:
-                # ignore ValueError: I/O operation on closed file.
-                if not self.stdin.closed:
-                    raise
+            _flush_stdin(self.stdin)
             if not input:
                 try:
                     self.stdin.close()
@@ -2124,11 +2706,8 @@ def _communicate(self, input, endtime, orig_timeout):
         self._save_input(input)

-        if self._input:
-            if not isinstance(self._input, memoryview):
-                input_view = memoryview(self._input)
-            else:
-                input_view = self._input.cast("b") # byte input required
+        input_view = _make_input_view(self._input)
+        input_offset = self._input_offset if self._input else 0

         with _PopenSelector() as selector:
             if self.stdin and not self.stdin.closed and self._input:
@@ -2138,41 +2717,32 @@ def _communicate(self, input, endtime, orig_timeout):
             if self.stderr and not self.stderr.closed:
                 selector.register(self.stderr, selectors.EVENT_READ)

-            while selector.get_map():
-                timeout = self._remaining_time(endtime)
-                if timeout is not None and timeout < 0:
-                    self._check_timeout(endtime, orig_timeout,
-                                        stdout, stderr,
-                                        skip_check_and_raise=True)
-                    raise RuntimeError(  # Impossible :)
-                        '_check_timeout(..., skip_check_and_raise=True) '
-                        'failed to raise TimeoutExpired.')
-
-                ready = selector.select(timeout)
-                self._check_timeout(endtime, orig_timeout, stdout, stderr)
-
-                # XXX Rewrite these to use non-blocking I/O on the file
-                # objects; they are no longer using C stdio!
-
-                for key, events in ready:
-                    if key.fileobj is self.stdin:
-                        chunk = input_view[self._input_offset :
-                                           self._input_offset + _PIPE_BUF]
-                        try:
-                            self._input_offset += os.write(key.fd, chunk)
-                        except BrokenPipeError:
-                            selector.unregister(key.fileobj)
-                            key.fileobj.close()
-                        else:
-                            if self._input_offset >= len(input_view):
-                                selector.unregister(key.fileobj)
-                                key.fileobj.close()
-                    elif key.fileobj in (self.stdout, self.stderr):
-                        data = os.read(key.fd, 32768)
-                        if not data:
-                            selector.unregister(key.fileobj)
-                            key.fileobj.close()
-                        self._fileobj2output[key.fileobj].append(data)
+            # Use the common I/O loop (supports resume via _input_offset)
+            stdin_to_write = (self.stdin if self.stdin and self._input
+                              and not self.stdin.closed else None)
+            new_offset, completed = _communicate_io_posix(
+                selector,
+                stdin_to_write,
+                input_view,
+                input_offset,
+                self._fileobj2output,
+                endtime)
+            if self._input:
+                self._input_offset = new_offset
+
+            if not completed:
+                self._check_timeout(endtime, orig_timeout, stdout, stderr,
+                                    skip_check_and_raise=True)
+                raise RuntimeError(  # Impossible :)
+                    '_check_timeout(..., skip_check_and_raise=True) '
+                    'failed to raise TimeoutExpired.')
+
+            # Close streams now that we're done reading
+            if self.stdout:
+                self.stdout.close()
+            if self.stderr:
+                self.stderr.close()

         try:
             self.wait(timeout=self._remaining_time(endtime))
         except TimeoutExpired as exc:

Lib/test/test_subprocess.py

@@ -1984,6 +1984,396 @@ def test_encoding_warning(self):
self.assertStartsWith(lines[1], b"<string>:3: EncodingWarning: ")
class PipelineTestCase(BaseTestCase):
"""Tests for subprocess.run_pipeline()"""
def test_pipeline_basic(self):
"""Test basic two-command pipeline"""
result = subprocess.run_pipeline(
[sys.executable, '-c', 'print("hello world")'],
[sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'],
capture_output=True, text=True
)
self.assertEqual(result.stdout.strip(), 'HELLO WORLD')
self.assertEqual(result.returncodes, [0, 0])
self.assertEqual(result.returncode, 0)
def test_pipeline_three_commands(self):
"""Test pipeline with three commands"""
result = subprocess.run_pipeline(
[sys.executable, '-c', 'print("one\\ntwo\\nthree")'],
[sys.executable, '-c', 'import sys; print("".join(sorted(sys.stdin.readlines())))'],
[sys.executable, '-c', 'import sys; print(sys.stdin.read().strip().upper())'],
capture_output=True, text=True
)
self.assertEqual(result.stdout.strip(), 'ONE\nTHREE\nTWO')
self.assertEqual(result.returncodes, [0, 0, 0])
def test_pipeline_with_input(self):
"""Test pipeline with input data"""
result = subprocess.run_pipeline(
[sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'],
[sys.executable, '-c', 'import sys; print(len(sys.stdin.read().strip()))'],
input='hello', capture_output=True, text=True
)
self.assertEqual(result.stdout.strip(), '5')
self.assertEqual(result.returncodes, [0, 0])
def test_pipeline_memoryview_input(self):
"""Test pipeline with memoryview input (byte elements)"""
test_data = b"Hello, memoryview pipeline!"
mv = memoryview(test_data)
result = subprocess.run_pipeline(
[sys.executable, '-c',
'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())'],
[sys.executable, '-c',
'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())'],
input=mv, capture_output=True
)
self.assertEqual(result.stdout, test_data.upper())
self.assertEqual(result.returncodes, [0, 0])
def test_pipeline_memoryview_input_nonbyte(self):
"""Test pipeline with non-byte memoryview input (e.g., int32).
This tests the fix for gh-134453 where non-byte memoryviews
had incorrect length tracking on POSIX, causing data truncation.
"""
        import array
        import select
# Create an array of 32-bit integers large enough to trigger
# chunked writing behavior (> PIPE_BUF)
pipe_buf = getattr(select, 'PIPE_BUF', 512)
# Each 'i' element is 4 bytes, need more than pipe_buf bytes total
num_elements = (pipe_buf // 4) + 100
test_array = array.array('i', [0x41424344 for _ in range(num_elements)])
expected_bytes = test_array.tobytes()
mv = memoryview(test_array)
result = subprocess.run_pipeline(
[sys.executable, '-c',
'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())'],
[sys.executable, '-c',
'import sys; data = sys.stdin.buffer.read(); '
'sys.stdout.buffer.write(data)'],
input=mv, capture_output=True
)
self.assertEqual(result.stdout, expected_bytes,
msg=f"{len(result.stdout)=} != {len(expected_bytes)=}")
self.assertEqual(result.returncodes, [0, 0])
def test_pipeline_bytes_mode(self):
"""Test pipeline in binary mode"""
result = subprocess.run_pipeline(
[sys.executable, '-c', 'import sys; sys.stdout.buffer.write(b"hello")'],
[sys.executable, '-c', 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())'],
capture_output=True
)
self.assertEqual(result.stdout, b'HELLO')
self.assertEqual(result.returncodes, [0, 0])
def test_pipeline_error_check(self):
"""Test that check=True raises PipelineError on failure"""
with self.assertRaises(subprocess.PipelineError) as cm:
subprocess.run_pipeline(
[sys.executable, '-c', 'print("hello")'],
[sys.executable, '-c', 'import sys; sys.exit(1)'],
capture_output=True, check=True
)
exc = cm.exception
self.assertEqual(len(exc.failed), 1)
self.assertEqual(exc.failed[0][0], 1) # Second command failed
self.assertEqual(exc.returncodes, [0, 1])
def test_pipeline_first_command_fails(self):
"""Test pipeline where first command fails"""
result = subprocess.run_pipeline(
[sys.executable, '-c', 'import sys; sys.exit(42)'],
[sys.executable, '-c', 'import sys; print(sys.stdin.read())'],
capture_output=True
)
self.assertEqual(result.returncodes[0], 42)
def test_pipeline_requires_two_commands(self):
"""Test that pipeline requires at least 2 commands"""
with self.assertRaises(ValueError) as cm:
subprocess.run_pipeline(
[sys.executable, '-c', 'print("hello")'],
capture_output=True
)
self.assertIn('at least 2 commands', str(cm.exception))
def test_pipeline_stdin_and_input_conflict(self):
"""Test that stdin and input cannot both be specified"""
with self.assertRaises(ValueError) as cm:
subprocess.run_pipeline(
[sys.executable, '-c', 'pass'],
[sys.executable, '-c', 'pass'],
input='data', stdin=subprocess.PIPE
)
self.assertIn('stdin', str(cm.exception))
self.assertIn('input', str(cm.exception))
def test_pipeline_capture_output_conflict(self):
"""Test that capture_output conflicts with stdout/stderr"""
with self.assertRaises(ValueError) as cm:
subprocess.run_pipeline(
[sys.executable, '-c', 'pass'],
[sys.executable, '-c', 'pass'],
capture_output=True, stdout=subprocess.PIPE
)
self.assertIn('capture_output', str(cm.exception))
def test_pipeline_universal_newlines(self):
"""Test that universal_newlines=True works like text=True"""
result = subprocess.run_pipeline(
[sys.executable, '-c', 'print("hello")'],
[sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'],
capture_output=True, universal_newlines=True
)
self.assertIsInstance(result.stdout, str)
self.assertIn('HELLO', result.stdout)
self.assertEqual(result.returncodes, [0, 0])
def test_pipeline_result_repr(self):
"""Test PipelineResult string representation"""
result = subprocess.run_pipeline(
[sys.executable, '-c', 'print("test")'],
[sys.executable, '-c', 'import sys; print(sys.stdin.read())'],
capture_output=True, text=True
)
repr_str = repr(result)
self.assertIn('PipelineResult', repr_str)
self.assertIn('commands=', repr_str)
self.assertIn('returncodes=', repr_str)
def test_pipeline_check_returncodes_method(self):
"""Test PipelineResult.check_returncodes() method"""
result = subprocess.run_pipeline(
[sys.executable, '-c', 'print("hello")'],
[sys.executable, '-c', 'import sys; sys.exit(5)'],
capture_output=True
)
with self.assertRaises(subprocess.PipelineError) as cm:
result.check_returncodes()
self.assertEqual(cm.exception.returncodes[1], 5)
def test_pipeline_no_capture(self):
"""Test pipeline without capturing output"""
result = subprocess.run_pipeline(
[sys.executable, '-c', 'pass'],
[sys.executable, '-c', 'pass'],
)
self.assertEqual(result.stdout, None)
self.assertEqual(result.stderr, None)
self.assertEqual(result.returncodes, [0, 0])
def test_pipeline_stderr_capture(self):
"""Test that stderr is captured from all processes"""
result = subprocess.run_pipeline(
[sys.executable, '-c', 'import sys; print("err1", file=sys.stderr); print("out1")'],
[sys.executable, '-c', 'import sys; print("err2", file=sys.stderr); print(sys.stdin.read())'],
capture_output=True, text=True
)
self.assertIn('err1', result.stderr)
self.assertIn('err2', result.stderr)
@unittest.skipIf(mswindows, "POSIX specific test")
def test_pipeline_timeout(self):
"""Test pipeline with timeout"""
with self.assertRaises(subprocess.TimeoutExpired):
subprocess.run_pipeline(
[sys.executable, '-c', 'import time; time.sleep(10); print("done")'],
[sys.executable, '-c', 'import sys; print(sys.stdin.read())'],
capture_output=True, timeout=0.1
)
    def test_pipeline_error_str(self):
        """Test PipelineError string representation"""
        with self.assertRaises(subprocess.PipelineError) as cm:
            subprocess.run_pipeline(
                [sys.executable, '-c', 'import sys; sys.exit(1)'],
                [sys.executable, '-c', 'import sys; sys.exit(2)'],
                capture_output=True, check=True
            )
        error_str = str(cm.exception)
        self.assertIn('Pipeline failed', error_str)
def test_pipeline_explicit_stdout_pipe(self):
"""Test pipeline with explicit stdout=PIPE"""
result = subprocess.run_pipeline(
[sys.executable, '-c', 'print("hello")'],
[sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'],
stdout=subprocess.PIPE
)
self.assertEqual(result.stdout.strip(), b'HELLO')
self.assertIsNone(result.stderr)
def test_pipeline_stdin_from_file(self):
"""Test pipeline with stdin from file"""
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write('file content\n')
f.flush()
fname = f.name
try:
with open(fname, 'r') as f:
result = subprocess.run_pipeline(
[sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'],
[sys.executable, '-c', 'import sys; print(len(sys.stdin.read().strip()))'],
stdin=f, capture_output=True, text=True
)
self.assertEqual(result.stdout.strip(), '12') # "FILE CONTENT"
finally:
os.unlink(fname)
def test_pipeline_stdout_to_devnull(self):
"""Test pipeline with stdout to DEVNULL"""
result = subprocess.run_pipeline(
[sys.executable, '-c', 'print("hello")'],
[sys.executable, '-c', 'import sys; print(sys.stdin.read())'],
stdout=subprocess.DEVNULL
)
self.assertIsNone(result.stdout)
self.assertEqual(result.returncodes, [0, 0])
def test_pipeline_large_data_no_deadlock(self):
"""Test that large data doesn't cause pipe buffer deadlock.
This test verifies that the multiplexed I/O implementation properly
handles cases where pipe buffers would fill up. Without proper
multiplexing, this would deadlock because:
1. First process outputs large data filling stdout pipe buffer
2. Middle process reads some, processes, writes to its stdout
3. If stdout pipe buffer fills, middle process blocks on write
4. But first process is blocked waiting for middle to read more
5. Classic deadlock
The test uses data larger than typical pipe buffer size (64KB on Linux)
to ensure the multiplexed I/O is working correctly.
"""
# Generate data larger than typical pipe buffer (64KB)
# Use 256KB to ensure we exceed buffer on most systems
large_data = 'x' * (256 * 1024)
# Pipeline: input -> double the data -> count chars
# The middle process outputs twice as much, increasing buffer pressure
result = subprocess.run_pipeline(
[sys.executable, '-c',
'import sys; data = sys.stdin.read(); print(data + data)'],
[sys.executable, '-c',
'import sys; print(len(sys.stdin.read().strip()))'],
input=large_data, capture_output=True, text=True, timeout=30
)
# Original data doubled = 512KB = 524288 chars
# Second process strips whitespace (removes trailing newline) then counts
expected_len = 256 * 1024 * 2 # doubled data, newline stripped
self.assertEqual(result.stdout.strip(), str(expected_len))
self.assertEqual(result.returncodes, [0, 0])
def test_pipeline_large_data_three_stages(self):
"""Test large data through a three-stage pipeline.
This is a more complex deadlock scenario with three processes,
where buffer pressure can occur at multiple points.
"""
# Use 128KB of data
large_data = 'y' * (128 * 1024)
# Pipeline: input -> uppercase -> add prefix to each line -> count
# We use line-based processing to create more buffer churn
result = subprocess.run_pipeline(
[sys.executable, '-c',
'import sys; print(sys.stdin.read().upper())'],
[sys.executable, '-c',
'import sys; print("".join("PREFIX:" + line for line in sys.stdin))'],
[sys.executable, '-c',
'import sys; print(len(sys.stdin.read()))'],
input=large_data, capture_output=True, text=True, timeout=30
)
self.assertEqual(result.returncodes, [0, 0, 0])
# Just verify we got a reasonable numeric output without deadlock
output_len = int(result.stdout.strip())
self.assertGreater(output_len, len(large_data))
def test_pipeline_large_data_with_stderr(self):
"""Test large data with large stderr output from multiple processes.
Ensures stderr collection doesn't interfere with the main data flow
and doesn't cause deadlocks when multiple processes write large
amounts to stderr concurrently with stdin/stdout data flow.
"""
# 64KB of data through the pipeline
data_size = 64 * 1024
large_data = 'z' * data_size
# Each process writes 64KB to stderr as well
stderr_size = 64 * 1024
result = subprocess.run_pipeline(
[sys.executable, '-c', f'''
import sys
# Write large stderr output
sys.stderr.write("E" * {stderr_size})
sys.stderr.write("\\nstage1 done\\n")
# Pass through stdin to stdout
data = sys.stdin.read()
print(data)
'''],
[sys.executable, '-c', f'''
import sys
# Write large stderr output
sys.stderr.write("F" * {stderr_size})
sys.stderr.write("\\nstage2 done\\n")
# Count input size
data = sys.stdin.read()
print(len(data.strip()))
'''],
input=large_data, capture_output=True, text=True, timeout=30
)
self.assertEqual(result.stdout.strip(), str(data_size))
# Verify both processes wrote to stderr
self.assertIn('stage1 done', result.stderr)
self.assertIn('stage2 done', result.stderr)
# Verify large stderr was captured (at least most of it)
self.assertGreater(len(result.stderr), stderr_size)
self.assertEqual(result.returncodes, [0, 0])
def test_pipeline_timeout_large_input(self):
"""Test that timeout is enforced with large input to a slow pipeline.
This verifies that run_pipeline() doesn't block indefinitely when
writing large input to a pipeline where the first process is slow
to consume stdin. The timeout should be enforced promptly.
This is particularly important on Windows where stdin writing could
block without proper threading.
"""
# Input larger than typical pipe buffer (64KB)
input_data = 'x' * (128 * 1024)
start = time.monotonic()
with self.assertRaises(subprocess.TimeoutExpired):
subprocess.run_pipeline(
# First process sleeps before reading - simulates slow consumer
[sys.executable, '-c',
'import sys, time; time.sleep(30); print(sys.stdin.read())'],
[sys.executable, '-c',
'import sys; print(len(sys.stdin.read()))'],
input=input_data, capture_output=True, text=True, timeout=0.5
)
elapsed = time.monotonic() - start
# Timeout should occur close to the specified timeout value,
# not after waiting for the subprocess to finish sleeping.
# Allow generous margin for slow CI, but must be well under
# the subprocess sleep time.
self.assertLess(elapsed, 5.0,
f"TimeoutExpired raised after {elapsed:.2f}s; expected ~0.5s. "
"Input writing may have blocked without checking timeout.")
def _get_test_grp_name():
for name_group in ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody'):
if grp: