gh-87512: Fix subprocess using timeout= on Windows blocking with a large input= (GH-142058)

On Windows, Popen._communicate() previously wrote to stdin synchronously, which could block indefinitely if the subprocess didn't consume input= quickly and the pipe buffer filled up. The timeout= parameter was only checked when joining the reader threads, not during the stdin write.

This change moves the Windows stdin writing to a background thread (similar to how stdout/stderr are read in threads), allowing the timeout to be properly enforced. If timeout expires, TimeoutExpired is raised promptly and the writer thread continues in the background. Subsequent calls to communicate() will join the existing writer thread.

Adds test_communicate_timeout_large_input to verify that TimeoutExpired is raised promptly when communicate() is called with large input and a timeout, even when the subprocess doesn't consume stdin quickly.

This test already passed on POSIX (where select() is used) but failed on Windows where the stdin write blocks without checking the timeout.

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Gregory P. Smith 2025-11-28 22:07:03 -08:00 committed by GitHub
parent 923056b2d4
commit 5b1862bdd8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 82 additions and 2 deletions

View file

@ -1613,6 +1613,10 @@ def _readerthread(self, fh, buffer):
fh.close()
def _writerthread(self, input):
self._stdin_write(input)
def _communicate(self, input, endtime, orig_timeout):
# Start reader threads feeding into a list hanging off of this
# object, unless they've already been started.
@ -1631,8 +1635,23 @@ def _communicate(self, input, endtime, orig_timeout):
self.stderr_thread.daemon = True
self.stderr_thread.start()
if self.stdin:
self._stdin_write(input)
# Start writer thread to send input to stdin, unless already
# started. The thread writes input and closes stdin when done,
# or continues in the background on timeout.
if self.stdin and not hasattr(self, "_stdin_thread"):
self._stdin_thread = \
threading.Thread(target=self._writerthread,
args=(input,))
self._stdin_thread.daemon = True
self._stdin_thread.start()
# Wait for the writer thread, or time out. If we time out, the
# thread remains writing and the fd left open in case the user
# calls communicate again.
if hasattr(self, "_stdin_thread"):
self._stdin_thread.join(self._remaining_time(endtime))
if self._stdin_thread.is_alive():
raise TimeoutExpired(self.args, orig_timeout)
# Wait for the reader threads, or time out. If we time out, the
# threads remain reading and the fds left open in case the user