Improve test_pipeline_large_data_with_stderr to use large stderr

Update the test to write 64KB to stderr from each process (128KB total)
instead of just small status messages. This better tests that the
multiplexed I/O handles concurrent large data on both stdout and stderr
without deadlocking.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Gregory P. Smith using claude.ai/code 2025-11-28 07:07:43 +00:00
parent e22d1da9bc
commit a3e98a73be
No known key found for this signature in database

View file

@ -2256,36 +2256,46 @@ def test_pipeline_large_data_three_stages(self):
self.assertGreater(output_len, len(large_data))
def test_pipeline_large_data_with_stderr(self):
"""Test large data with stderr output from multiple processes.
"""Test large data with large stderr output from multiple processes.
Ensures stderr collection doesn't interfere with the main data flow
and doesn't cause deadlocks when multiple processes write stderr.
and doesn't cause deadlocks when multiple processes write large
amounts to stderr concurrently with stdin/stdout data flow.
"""
# 64KB of data
# 64KB of data through the pipeline
data_size = 64 * 1024
large_data = 'z' * data_size
# Each process writes 64KB to stderr as well
stderr_size = 64 * 1024
result = subprocess.run_pipeline(
[sys.executable, '-c', '''
[sys.executable, '-c', f'''
import sys
sys.stderr.write("stage1 processing\\n")
# Write large stderr output
sys.stderr.write("E" * {stderr_size})
sys.stderr.write("\\nstage1 done\\n")
# Pass through stdin to stdout
data = sys.stdin.read()
sys.stderr.write(f"stage1 read {len(data)} bytes\\n")
print(data)
'''],
[sys.executable, '-c', '''
[sys.executable, '-c', f'''
import sys
sys.stderr.write("stage2 processing\\n")
# Write large stderr output
sys.stderr.write("F" * {stderr_size})
sys.stderr.write("\\nstage2 done\\n")
# Count input size
data = sys.stdin.read()
sys.stderr.write(f"stage2 read {len(data)} bytes\\n")
print(len(data.strip()))
'''],
input=large_data, capture_output=True, text=True, timeout=30
)
self.assertEqual(result.stdout.strip(), str(data_size))
self.assertIn('stage1 processing', result.stderr)
self.assertIn('stage2 processing', result.stderr)
# Verify both processes wrote to stderr
self.assertIn('stage1 done', result.stderr)
self.assertIn('stage2 done', result.stderr)
# Verify large stderr was captured (at least most of it)
self.assertGreater(len(result.stderr), stderr_size)
self.assertEqual(result.returncodes, [0, 0])