mirror of
https://github.com/python/cpython.git
synced 2025-12-08 06:10:17 +00:00
Support universal_newlines and use _translate_newlines in run_pipeline
- Factor out _translate_newlines() as a module-level function, have Popen's method delegate to it for code sharing - Remove rejection of universal_newlines kwarg in run_pipeline(), treat it the same as text=True (consistent with Popen behavior) - Use _translate_newlines() for text mode decoding in run_pipeline() to properly handle \r\n and \r newline sequences - Update documentation to remove mention of universal_newlines rejection - Update test to verify universal_newlines=True works like text=True Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
parent
978cd76cd8
commit
15f8a93bcb
3 changed files with 23 additions and 29 deletions
|
|
@ -317,9 +317,6 @@ underlying :class:`Popen` interface can be used directly.
|
|||
in the returned :class:`PipelineResult`'s :attr:`~PipelineResult.stdout`
|
||||
attribute. Other keyword arguments are passed to each :class:`Popen` call.
|
||||
|
||||
Unlike :func:`run`, this function does not accept *universal_newlines*.
|
||||
Use ``text=True`` instead.
|
||||
|
||||
Examples::
|
||||
|
||||
>>> import subprocess
|
||||
|
|
|
|||
|
|
@ -354,6 +354,12 @@ def _make_input_view(input_data):
|
|||
return memoryview(input_data)
|
||||
|
||||
|
||||
def _translate_newlines(data, encoding, errors):
|
||||
"""Decode bytes to str and translate newlines to \n."""
|
||||
data = data.decode(encoding, errors)
|
||||
return data.replace("\r\n", "\n").replace("\r", "\n")
|
||||
|
||||
|
||||
def _communicate_io_posix(selector, stdin, input_view, input_offset,
|
||||
output_buffers, endtime):
|
||||
"""
|
||||
|
|
@ -984,13 +990,6 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
|
|||
if len(commands) < 2:
|
||||
raise ValueError('run_pipeline requires at least 2 commands')
|
||||
|
||||
# Reject universal_newlines - use text= instead
|
||||
if kwargs.get('universal_newlines') is not None:
|
||||
raise TypeError(
|
||||
"run_pipeline() does not support 'universal_newlines'. "
|
||||
"Use 'text=True' instead."
|
||||
)
|
||||
|
||||
# Validate no conflicting arguments
|
||||
if input is not None:
|
||||
if kwargs.get('stdin') is not None:
|
||||
|
|
@ -1071,8 +1070,9 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
|
|||
else:
|
||||
endtime = None
|
||||
|
||||
# Determine if we're in text mode
|
||||
text_mode = kwargs.get('text') or kwargs.get('encoding') or kwargs.get('errors')
|
||||
# Determine if we're in text mode (text= or universal_newlines=)
|
||||
text_mode = (kwargs.get('text') or kwargs.get('universal_newlines')
|
||||
or kwargs.get('encoding') or kwargs.get('errors'))
|
||||
encoding = kwargs.get('encoding')
|
||||
errors_param = kwargs.get('errors', 'strict')
|
||||
if text_mode and encoding is None:
|
||||
|
|
@ -1115,13 +1115,11 @@ def run_pipeline(*commands, input=None, capture_output=False, timeout=None,
|
|||
stdout = results.get(last_proc.stdout)
|
||||
stderr = results.get(stderr_reader)
|
||||
|
||||
# Decode stdout if in text mode (Popen text mode only applies to
|
||||
# streams it creates, but we read via _communicate_streams which
|
||||
# always returns bytes)
|
||||
# Translate newlines if in text mode (decode and convert \r\n to \n)
|
||||
if text_mode and stdout is not None:
|
||||
stdout = stdout.decode(encoding, errors_param)
|
||||
stdout = _translate_newlines(stdout, encoding, errors_param)
|
||||
if text_mode and stderr is not None:
|
||||
stderr = stderr.decode(encoding, errors_param)
|
||||
stderr = _translate_newlines(stderr, encoding, errors_param)
|
||||
|
||||
# Wait for all processes to complete (use remaining time from deadline)
|
||||
returncodes = []
|
||||
|
|
@ -1686,8 +1684,7 @@ def universal_newlines(self, universal_newlines):
|
|||
self.text_mode = bool(universal_newlines)
|
||||
|
||||
def _translate_newlines(self, data, encoding, errors):
|
||||
data = data.decode(encoding, errors)
|
||||
return data.replace("\r\n", "\n").replace("\r", "\n")
|
||||
return _translate_newlines(data, encoding, errors)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
|
|
|||
|
|
@ -2123,16 +2123,16 @@ def test_pipeline_capture_output_conflict(self):
|
|||
)
|
||||
self.assertIn('capture_output', str(cm.exception))
|
||||
|
||||
def test_pipeline_rejects_universal_newlines(self):
|
||||
"""Test that universal_newlines is not supported"""
|
||||
with self.assertRaises(TypeError) as cm:
|
||||
subprocess.run_pipeline(
|
||||
[sys.executable, '-c', 'pass'],
|
||||
[sys.executable, '-c', 'pass'],
|
||||
universal_newlines=True
|
||||
)
|
||||
self.assertIn('universal_newlines', str(cm.exception))
|
||||
self.assertIn('text=True', str(cm.exception))
|
||||
def test_pipeline_universal_newlines(self):
|
||||
"""Test that universal_newlines=True works like text=True"""
|
||||
result = subprocess.run_pipeline(
|
||||
[sys.executable, '-c', 'print("hello")'],
|
||||
[sys.executable, '-c', 'import sys; print(sys.stdin.read().upper())'],
|
||||
capture_output=True, universal_newlines=True
|
||||
)
|
||||
self.assertIsInstance(result.stdout, str)
|
||||
self.assertIn('HELLO', result.stdout)
|
||||
self.assertEqual(result.returncodes, [0, 0])
|
||||
|
||||
def test_pipeline_result_repr(self):
|
||||
"""Test PipelineResult string representation"""
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue