From e3a2fbe6da1007b3989cdf2ff3789a67313d52a2 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith using claude.ai/code" Date: Thu, 27 Nov 2025 23:52:43 +0000 Subject: [PATCH] Add subprocess.run_pipeline() for command pipe chaining Add a new run_pipeline() function to the subprocess module that enables running multiple commands connected via pipes, similar to shell pipelines. New API: - run_pipeline(*commands, ...) - Run a pipeline of commands - PipelineResult - Return type with commands, returncodes, stdout, stderr - PipelineError - Raised when check=True and any command fails Features: - Supports arbitrary number of commands (minimum 2) - capture_output, input, timeout, and check parameters like run() - stdin= connects to first process, stdout= connects to last process - Text mode support via text=True, encoding, errors - All processes share a single stderr pipe for simplicity - "pipefail" semantics: check=True fails if any command fails Unlike run(), this function does not accept universal_newlines. Use text=True instead. Example: result = subprocess.run_pipeline( ['cat', 'file.txt'], ['grep', 'pattern'], ['wc', '-l'], capture_output=True, text=True ) Co-authored-by: Claude Opus 4.5 --- Lib/subprocess.py | 304 +++++++++++++++++++++++++++++++++++- Lib/test/test_subprocess.py | 211 +++++++++++++++++++++++++ 2 files changed, 514 insertions(+), 1 deletion(-) diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 4d5ab6fbff0..d89876fa4d1 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -62,7 +62,8 @@ __all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput", "getoutput", "check_output", "run", "CalledProcessError", "DEVNULL", - "SubprocessError", "TimeoutExpired", "CompletedProcess"] + "SubprocessError", "TimeoutExpired", "CompletedProcess", + "run_pipeline", "PipelineResult", "PipelineError"] # NOTE: We intentionally exclude list2cmdline as it is # considered an internal implementation detail. issue10838. 
@@ -194,6 +195,36 @@ def stdout(self, value): self.output = value +class PipelineError(SubprocessError): + """Raised when run_pipeline() is called with check=True and one or more + commands in the pipeline return a non-zero exit status. + + Attributes: + commands: List of commands in the pipeline (each a list of strings). + returncodes: List of return codes corresponding to each command. + stdout: Standard output from the final command (if captured). + stderr: Standard error output (if captured). + failed: List of (index, command, returncode) tuples for failed commands. + """ + def __init__(self, commands, returncodes, stdout=None, stderr=None): + self.commands = commands + self.returncodes = returncodes + self.stdout = stdout + self.stderr = stderr + self.failed = [ + (i, cmd, rc) + for i, (cmd, rc) in enumerate(zip(commands, returncodes)) + if rc != 0 + ] + + def __str__(self): + failed_info = ", ".join( + f"command {i} {cmd!r} returned {rc}" + for i, cmd, rc in self.failed + ) + return f"Pipeline failed: {failed_info}" + + if _mswindows: class STARTUPINFO: def __init__(self, *, dwFlags=0, hStdInput=None, hStdOutput=None, @@ -508,6 +539,47 @@ def check_returncode(self): self.stderr) +class PipelineResult: + """A pipeline of processes that have finished running. + + This is returned by run_pipeline(). + + Attributes: + commands: List of commands in the pipeline (each command is a list). + returncodes: List of return codes for each command in the pipeline. + returncode: The return code of the final command (for convenience). + stdout: The standard output of the final command (None if not captured). + stderr: The standard error output (None if not captured). 
+ """ + def __init__(self, commands, returncodes, stdout=None, stderr=None): + self.commands = list(commands) + self.returncodes = list(returncodes) + self.stdout = stdout + self.stderr = stderr + + @property + def returncode(self): + """Return the exit code of the final command in the pipeline.""" + return self.returncodes[-1] if self.returncodes else None + + def __repr__(self): + args = [f'commands={self.commands!r}', + f'returncodes={self.returncodes!r}'] + if self.stdout is not None: + args.append(f'stdout={self.stdout!r}') + if self.stderr is not None: + args.append(f'stderr={self.stderr!r}') + return f"{type(self).__name__}({', '.join(args)})" + + __class_getitem__ = classmethod(types.GenericAlias) + + def check_returncodes(self): + """Raise PipelineError if any command's exit code is non-zero.""" + if any(rc != 0 for rc in self.returncodes): + raise PipelineError(self.commands, self.returncodes, + self.stdout, self.stderr) + + def run(*popenargs, input=None, capture_output=False, timeout=None, check=False, **kwargs): """Run command with arguments and return a CompletedProcess instance. @@ -578,6 +650,236 @@ def run(*popenargs, return CompletedProcess(process.args, retcode, stdout, stderr) +def run_pipeline(*commands, input=None, capture_output=False, timeout=None, + check=False, **kwargs): + """Run a pipeline of commands connected via pipes. + + Each positional argument should be a command (list of strings or a string + if shell=True) to execute. The stdout of each command is connected to the + stdin of the next command in the pipeline, similar to shell pipelines. + + Returns a PipelineResult instance with attributes commands, returncodes, + stdout, and stderr. By default, stdout and stderr are not captured, and + those attributes will be None. Pass capture_output=True to capture both + the final command's stdout and stderr from all commands. + + If check is True and any command's exit code is non-zero, it raises a + PipelineError. 
This is similar to shell "pipefail" behavior. + + If timeout (seconds) is given and the pipeline takes too long, a + TimeoutExpired exception will be raised and all processes will be killed. + + The optional "input" argument allows passing bytes or a string to the + first command's stdin. If you use this argument, you may not also specify + stdin in kwargs. + + By default, all communication is in bytes. Use text=True, encoding, or + errors to enable text mode, which affects the input argument and stdout/ + stderr outputs. + + .. note:: + When using text=True with capture_output=True or stderr=PIPE, be aware + that stderr output from multiple processes may be interleaved in ways + that produce invalid character sequences when decoded. For reliable + text decoding, avoid text=True when capturing stderr from pipelines, + or handle decoding errors appropriately. + + Other keyword arguments are passed to each Popen call, except for stdin, + stdout, and stderr, which are managed by the pipeline. + + Example: + # Equivalent to: cat file.txt | grep pattern | wc -l + result = run_pipeline( + ['cat', 'file.txt'], + ['grep', 'pattern'], + ['wc', '-l'], + capture_output=True, text=True + ) + print(result.stdout) # "42\\n" + print(result.returncodes) # [0, 0, 0] + """ + if len(commands) < 2: + raise ValueError('run_pipeline requires at least 2 commands') + + # Reject universal_newlines - use text= instead + if kwargs.get('universal_newlines') is not None: + raise TypeError( + "run_pipeline() does not support 'universal_newlines'. " + "Use 'text=True' instead."
+ ) + + # Validate no conflicting arguments + if input is not None: + if kwargs.get('stdin') is not None: + raise ValueError('stdin and input arguments may not both be used.') + + if capture_output: + if kwargs.get('stdout') is not None or kwargs.get('stderr') is not None: + raise ValueError('stdout and stderr arguments may not be used ' + 'with capture_output.') + + # Determine stderr handling - all processes share the same stderr pipe + # When capturing, we create one pipe and all processes write to it + stderr_arg = kwargs.pop('stderr', None) + capture_stderr = capture_output or stderr_arg == PIPE + + # stdin is for the first process, stdout is for the last process + stdin_arg = kwargs.pop('stdin', None) + stdout_arg = kwargs.pop('stdout', None) + + processes = [] + stderr_read_fd = None # Read end of shared stderr pipe (for parent) + stderr_write_fd = None # Write end of shared stderr pipe (for children) + + try: + # Create a single stderr pipe that all processes will share + if capture_stderr: + stderr_read_fd, stderr_write_fd = os.pipe() + + for i, cmd in enumerate(commands): + is_first = (i == 0) + is_last = (i == len(commands) - 1) + + # Determine stdin for this process + if is_first: + if input is not None: + proc_stdin = PIPE + else: + proc_stdin = stdin_arg # Could be None, PIPE, fd, or file + else: + proc_stdin = processes[-1].stdout + + # Determine stdout for this process + if is_last: + if capture_output: + proc_stdout = PIPE + else: + proc_stdout = stdout_arg # Could be None, PIPE, fd, or file + else: + proc_stdout = PIPE + + # All processes share the same stderr pipe (write end) + if capture_stderr: + proc_stderr = stderr_write_fd + else: + proc_stderr = stderr_arg + + proc = Popen(cmd, stdin=proc_stdin, stdout=proc_stdout, + stderr=proc_stderr, **kwargs) + processes.append(proc) + + # Close the parent's copy of the previous process's stdout + # to allow the pipe to signal EOF when the previous process exits + if not is_first and 
processes[-2].stdout is not None: + processes[-2].stdout.close() + + # Close the write end of stderr pipe in parent - children have it + if stderr_write_fd is not None: + os.close(stderr_write_fd) + stderr_write_fd = None + + first_proc = processes[0] + last_proc = processes[-1] + + # Handle communication with timeout + start_time = _time() if timeout is not None else None + + # Write input to first process if provided + if input is not None and first_proc.stdin is not None: + try: + first_proc.stdin.write(input) + except BrokenPipeError: + pass # First process may have exited early + finally: + first_proc.stdin.close() + + # Determine if we're in text mode + text_mode = kwargs.get('text') or kwargs.get('encoding') or kwargs.get('errors') + + # Read output from the last process + stdout = None + stderr = None + + # Read stdout if we created a pipe for it (capture_output or stdout=PIPE) + if last_proc.stdout is not None: + stdout = last_proc.stdout.read() + + # Read stderr from the shared pipe + if stderr_read_fd is not None: + stderr = os.read(stderr_read_fd, 1024 * 1024 * 10) # Up to 10MB + # Keep reading until EOF + while True: + chunk = os.read(stderr_read_fd, 65536) + if not chunk: + break + stderr += chunk + + # Calculate remaining timeout + def remaining_timeout(): + if timeout is None: + return None + elapsed = _time() - start_time + remaining = timeout - elapsed + if remaining <= 0: + raise TimeoutExpired(commands, timeout, stdout, stderr) + return remaining + + # Wait for all processes to complete + returncodes = [] + for proc in processes: + try: + proc.wait(timeout=remaining_timeout()) + except TimeoutExpired: + # Kill all processes on timeout + for p in processes: + if p.poll() is None: + p.kill() + for p in processes: + p.wait() + raise TimeoutExpired(commands, timeout, stdout, stderr) + returncodes.append(proc.returncode) + + # Handle text mode conversion for stderr (stdout is already handled + # by Popen when text=True). 
stderr is always read as bytes since + # we use os.pipe() directly. + if text_mode and stderr is not None: + encoding = kwargs.get('encoding') + errors = kwargs.get('errors', 'strict') + if encoding is None: + encoding = locale.getencoding() + stderr = stderr.decode(encoding, errors) + + result = PipelineResult(commands, returncodes, stdout, stderr) + + if check and any(rc != 0 for rc in returncodes): + raise PipelineError(commands, returncodes, stdout, stderr) + + return result + + finally: + # Ensure all processes are cleaned up + for proc in processes: + if proc.poll() is None: + proc.kill() + proc.wait() + # Close any open file handles + if proc.stdin and not proc.stdin.closed: + proc.stdin.close() + if proc.stdout and not proc.stdout.closed: + proc.stdout.close() + # Close stderr pipe file descriptors + if stderr_read_fd is not None: + try: + os.close(stderr_read_fd) + except OSError: + pass + if stderr_write_fd is not None: + try: + os.close(stderr_write_fd) + except OSError: + pass + + def list2cmdline(seq): """ Translate a sequence of arguments into a command line diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index 806a1e3fa30..01aa6c02dc2 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -1984,6 +1984,217 @@ def test_encoding_warning(self): self.assertStartsWith(lines[1], b":3: EncodingWarning: ") +class PipelineTestCase(BaseTestCase): + """Tests for subprocess.run_pipeline()""" + + def test_pipeline_basic(self): + """Test basic two-command pipeline""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello world")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), 'HELLO WORLD') + self.assertEqual(result.returncodes, [0, 0]) + self.assertEqual(result.returncode, 0) + + def test_pipeline_three_commands(self): + """Test pipeline with three commands""" + result = 
subprocess.run_pipeline( + [sys.executable, '-c', 'print("one\\ntwo\\nthree")'], + [sys.executable, '-c', 'import sys; print("".join(sorted(sys.stdin.readlines())))'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().strip().upper())'], + capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), 'ONE\nTHREE\nTWO') + self.assertEqual(result.returncodes, [0, 0, 0]) + + def test_pipeline_with_input(self): + """Test pipeline with input data""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + [sys.executable, '-c', 'import sys; print(len(sys.stdin.read().strip()))'], + input='hello', capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), '5') + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_bytes_mode(self): + """Test pipeline in binary mode""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; sys.stdout.buffer.write(b"hello")'], + [sys.executable, '-c', 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())'], + capture_output=True + ) + self.assertEqual(result.stdout, b'HELLO') + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_error_check(self): + """Test that check=True raises PipelineError on failure""" + with self.assertRaises(subprocess.PipelineError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; sys.exit(1)'], + capture_output=True, check=True + ) + exc = cm.exception + self.assertEqual(len(exc.failed), 1) + self.assertEqual(exc.failed[0][0], 1) # Second command failed + self.assertEqual(exc.returncodes, [0, 1]) + + def test_pipeline_first_command_fails(self): + """Test pipeline where first command fails""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; sys.exit(42)'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + capture_output=True + ) + 
self.assertEqual(result.returncodes[0], 42) + + def test_pipeline_requires_two_commands(self): + """Test that pipeline requires at least 2 commands""" + with self.assertRaises(ValueError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + capture_output=True + ) + self.assertIn('at least 2 commands', str(cm.exception)) + + def test_pipeline_stdin_and_input_conflict(self): + """Test that stdin and input cannot both be specified""" + with self.assertRaises(ValueError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + input='data', stdin=subprocess.PIPE + ) + self.assertIn('stdin', str(cm.exception)) + self.assertIn('input', str(cm.exception)) + + def test_pipeline_capture_output_conflict(self): + """Test that capture_output conflicts with stdout/stderr""" + with self.assertRaises(ValueError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + capture_output=True, stdout=subprocess.PIPE + ) + self.assertIn('capture_output', str(cm.exception)) + + def test_pipeline_rejects_universal_newlines(self): + """Test that universal_newlines is not supported""" + with self.assertRaises(TypeError) as cm: + subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + universal_newlines=True + ) + self.assertIn('universal_newlines', str(cm.exception)) + self.assertIn('text=True', str(cm.exception)) + + def test_pipeline_result_repr(self): + """Test PipelineResult string representation""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("test")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + capture_output=True, text=True + ) + repr_str = repr(result) + self.assertIn('PipelineResult', repr_str) + self.assertIn('commands=', repr_str) + self.assertIn('returncodes=', repr_str) + + def test_pipeline_check_returncodes_method(self): + """Test PipelineResult.check_returncodes() 
method""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; sys.exit(5)'], + capture_output=True + ) + with self.assertRaises(subprocess.PipelineError) as cm: + result.check_returncodes() + self.assertEqual(cm.exception.returncodes[1], 5) + + def test_pipeline_no_capture(self): + """Test pipeline without capturing output""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'pass'], + [sys.executable, '-c', 'pass'], + ) + self.assertEqual(result.stdout, None) + self.assertEqual(result.stderr, None) + self.assertEqual(result.returncodes, [0, 0]) + + def test_pipeline_stderr_capture(self): + """Test that stderr is captured from all processes""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; print("err1", file=sys.stderr); print("out1")'], + [sys.executable, '-c', 'import sys; print("err2", file=sys.stderr); print(sys.stdin.read())'], + capture_output=True, text=True + ) + self.assertIn('err1', result.stderr) + self.assertIn('err2', result.stderr) + + @unittest.skipIf(mswindows, "POSIX specific test") + def test_pipeline_timeout(self): + """Test pipeline with timeout""" + with self.assertRaises(subprocess.TimeoutExpired): + subprocess.run_pipeline( + [sys.executable, '-c', 'import time; time.sleep(10); print("done")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + capture_output=True, timeout=0.1 + ) + + def test_pipeline_error_str(self): + """Test PipelineError string representation""" + try: + subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; sys.exit(1)'], + [sys.executable, '-c', 'import sys; sys.exit(2)'], + capture_output=True, check=True + ) + except subprocess.PipelineError as e: + error_str = str(e) + self.assertIn('Pipeline failed', error_str) + + def test_pipeline_explicit_stdout_pipe(self): + """Test pipeline with explicit stdout=PIPE""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + 
[sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + stdout=subprocess.PIPE + ) + self.assertEqual(result.stdout.strip(), b'HELLO') + self.assertIsNone(result.stderr) + + def test_pipeline_stdin_from_file(self): + """Test pipeline with stdin from file""" + with tempfile.NamedTemporaryFile(mode='w', delete=False) as f: + f.write('file content\n') + f.flush() + fname = f.name + try: + with open(fname, 'r') as f: + result = subprocess.run_pipeline( + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + [sys.executable, '-c', 'import sys; print(len(sys.stdin.read().strip()))'], + stdin=f, capture_output=True, text=True + ) + self.assertEqual(result.stdout.strip(), '12') # "FILE CONTENT" + finally: + os.unlink(fname) + + def test_pipeline_stdout_to_devnull(self): + """Test pipeline with stdout to DEVNULL""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read())'], + stdout=subprocess.DEVNULL + ) + self.assertIsNone(result.stdout) + self.assertEqual(result.returncodes, [0, 0]) + + def _get_test_grp_name(): for name_group in ('staff', 'nogroup', 'grp', 'nobody', 'nfsnobody'): if grp: