diff --git a/Doc/library/subprocess.rst b/Doc/library/subprocess.rst index da1d4047851..95014e2b66d 100644 --- a/Doc/library/subprocess.rst +++ b/Doc/library/subprocess.rst @@ -317,9 +317,6 @@ underlying :class:`Popen` interface can be used directly. in the returned :class:`PipelineResult`'s :attr:`~PipelineResult.stdout` attribute. Other keyword arguments are passed to each :class:`Popen` call. - Unlike :func:`run`, this function does not accept *universal_newlines*. - Use ``text=True`` instead. - Examples:: >>> import subprocess diff --git a/Lib/subprocess.py b/Lib/subprocess.py index 088a8ebb936..702918b8c38 100644 --- a/Lib/subprocess.py +++ b/Lib/subprocess.py @@ -354,6 +354,12 @@ def _make_input_view(input_data): return memoryview(input_data) +def _translate_newlines(data, encoding, errors): + """Decode bytes to str and translate newlines to \n.""" + data = data.decode(encoding, errors) + return data.replace("\r\n", "\n").replace("\r", "\n") + + def _communicate_io_posix(selector, stdin, input_view, input_offset, output_buffers, endtime): """ @@ -984,13 +990,6 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, if len(commands) < 2: raise ValueError('run_pipeline requires at least 2 commands') - # Reject universal_newlines - use text= instead - if kwargs.get('universal_newlines') is not None: - raise TypeError( - "run_pipeline() does not support 'universal_newlines'. " - "Use 'text=True' instead." - ) - # Validate no conflicting arguments if input is not None: if kwargs.get('stdin') is not None: @@ -1071,8 +1070,9 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, else: endtime = None - # Determine if we're in text mode - text_mode = kwargs.get('text') or kwargs.get('encoding') or kwargs.get('errors') + # Determine if we're in text mode (text= or universal_newlines=) + text_mode = (kwargs.get('text') or kwargs.get('universal_newlines') + or kwargs.get('encoding') or kwargs.get('errors')) encoding = kwargs.get('encoding') errors_param = kwargs.get('errors', 'strict') if text_mode and encoding is None: @@ -1115,13 +1115,11 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None, stdout = results.get(last_proc.stdout) stderr = results.get(stderr_reader) - # Decode stdout if in text mode (Popen text mode only applies to - # streams it creates, but we read via _communicate_streams which - # always returns bytes) + # Translate newlines if in text mode (decode and convert \r\n to \n) if text_mode and stdout is not None: - stdout = stdout.decode(encoding, errors_param) + stdout = _translate_newlines(stdout, encoding, errors_param) if text_mode and stderr is not None: - stderr = stderr.decode(encoding, errors_param) + stderr = _translate_newlines(stderr, encoding, errors_param) # Wait for all processes to complete (use remaining time from deadline) returncodes = [] @@ -1686,8 +1684,7 @@ def universal_newlines(self, universal_newlines): self.text_mode = bool(universal_newlines) def _translate_newlines(self, data, encoding, errors): - data = data.decode(encoding, errors) - return data.replace("\r\n", "\n").replace("\r", "\n") + return _translate_newlines(data, encoding, errors) def __enter__(self): return self diff --git a/Lib/test/test_subprocess.py b/Lib/test/test_subprocess.py index f91d1f3522f..4e43ea7bbfd 100644 --- a/Lib/test/test_subprocess.py +++ b/Lib/test/test_subprocess.py @@ -2123,16 +2123,16 @@ def test_pipeline_capture_output_conflict(self): ) self.assertIn('capture_output', str(cm.exception)) - def test_pipeline_rejects_universal_newlines(self): - """Test that universal_newlines is not supported""" - with self.assertRaises(TypeError) as cm: - subprocess.run_pipeline( - [sys.executable, '-c', 'pass'], - [sys.executable, '-c', 'pass'], - universal_newlines=True - ) - self.assertIn('universal_newlines', str(cm.exception)) - self.assertIn('text=True', str(cm.exception)) + def test_pipeline_universal_newlines(self): + """Test that universal_newlines=True works like text=True""" + result = subprocess.run_pipeline( + [sys.executable, '-c', 'print("hello")'], + [sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'], + capture_output=True, universal_newlines=True + ) + self.assertIsInstance(result.stdout, str) + self.assertIn('HELLO', result.stdout) + self.assertEqual(result.returncodes, [0, 0]) def test_pipeline_result_repr(self): """Test PipelineResult string representation"""