Fix memoryview and closed stdin handling in _communicate_streams_posix

Apply the same fixes from Popen._communicate() to _communicate_streams_posix
for run_pipeline():

1. Handle non-byte memoryview input by casting to byte view (gh-134453):
   Non-byte memoryviews (e.g., int32 arrays) had incorrect length tracking
   because len() returns the element count, not the byte count. The input
   is now cast to a "b" (byte) view so that write progress is tracked in bytes.

2. Handle ValueError on stdin.flush() when stdin is closed (gh-74389):
   Ignore ValueError from flush() if stdin is already closed, matching
   the BrokenPipeError handling.

Add tests for memoryview input to run_pipeline:
- test_pipeline_memoryview_input: basic byte memoryview
- test_pipeline_memoryview_input_nonbyte: int32 array memoryview

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Gregory P. Smith using claude.ai/code 2025-11-29 08:46:22 +00:00
parent d420f29e2b
commit df8f082f59
No known key found for this signature in database
2 changed files with 54 additions and 2 deletions

View file

@ -540,6 +540,10 @@ def _communicate_streams_posix(stdin, input_data, read_streams,
stdin.flush()
except BrokenPipeError:
pass
except ValueError:
# ignore ValueError: I/O operation on closed file.
if not stdin.closed:
raise
if not input_data:
try:
stdin.close()
@ -547,8 +551,14 @@ def _communicate_streams_posix(stdin, input_data, read_streams,
pass
stdin = None # Don't register with selector
# Prepare input data
input_view = memoryview(input_data) if input_data else None
# Prepare input data - cast to bytes view for correct length tracking
if input_data:
if not isinstance(input_data, memoryview):
input_view = memoryview(input_data)
else:
input_view = input_data.cast("b") # byte view required
else:
input_view = None
with _PopenSelector() as selector:
if stdin and input_data:

View file

@ -2019,6 +2019,48 @@ def test_pipeline_with_input(self):
self.assertEqual(result.stdout.strip(), '5')
self.assertEqual(result.returncodes, [0, 0])
def test_pipeline_memoryview_input(self):
    """A byte-element memoryview given as input reaches the pipeline intact."""
    payload = b"Hello, memoryview pipeline!"
    # Stage 1 copies stdin to stdout unchanged; stage 2 upper-cases
    # everything it reads before writing it out.
    echo_cmd = [sys.executable, '-c',
                'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())']
    upper_cmd = [sys.executable, '-c',
                 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())']
    outcome = subprocess.run_pipeline(
        echo_cmd, upper_cmd,
        input=memoryview(payload), capture_output=True,
    )
    self.assertEqual(outcome.stdout, payload.upper())
    self.assertEqual(outcome.returncodes, [0, 0])
def test_pipeline_memoryview_input_nonbyte(self):
    """Test pipeline input given as a memoryview of multi-byte elements.

    Regression test for gh-134453: non-byte memoryviews (here an
    array of C ints) had incorrect length tracking on POSIX, which
    caused the data to be truncated.
    """
    import array
    fill_value = 0x41424344
    # The payload must exceed PIPE_BUF bytes so that it is written in
    # more than one chunk.  Elements of typecode 'i' are taken to be
    # 4 bytes each, so add a margin of 100 elements on top of
    # PIPE_BUF // 4.
    pipe_buf = getattr(select, 'PIPE_BUF', 512)
    element_count = pipe_buf // 4 + 100
    int_array = array.array('i', [fill_value] * element_count)
    expected_bytes = int_array.tobytes()
    # Both stages simply copy their stdin to stdout.
    first_stage = [sys.executable, '-c',
                   'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())']
    second_stage = [sys.executable, '-c',
                    'import sys; data = sys.stdin.buffer.read(); '
                    'sys.stdout.buffer.write(data)']
    result = subprocess.run_pipeline(
        first_stage, second_stage,
        input=memoryview(int_array), capture_output=True,
    )
    self.assertEqual(result.stdout, expected_bytes,
                     msg=f"{len(result.stdout)=} != {len(expected_bytes)=}")
    self.assertEqual(result.returncodes, [0, 0])
def test_pipeline_bytes_mode(self):
"""Test pipeline in binary mode"""
result = subprocess.run_pipeline(