[3.12] gh-130736: Fix asyncio test_shutdown_default_executor_timeout() (GH-130800) (#130826)

gh-130736: Fix asyncio test_shutdown_default_executor_timeout() (GH-130800)

Replace time.sleep() with threading.Event.
(cherry picked from commit 6c48ed7d62)

Co-authored-by: Victor Stinner <vstinner@python.org>
This commit is contained in:
Miss Islington (bot) 2025-03-04 13:11:00 +01:00 committed by GitHub
parent 7ce5f15981
commit a0ac4dbe88
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -232,20 +232,25 @@ def test_set_default_executor_error(self):
self.assertIsNone(self.loop._default_executor)
def test_shutdown_default_executor_timeout(self):
event = threading.Event()
class DummyExecutor(concurrent.futures.ThreadPoolExecutor):
def shutdown(self, wait=True, *, cancel_futures=False):
if wait:
time.sleep(0.1)
event.wait()
self.loop._process_events = mock.Mock()
self.loop._write_to_self = mock.Mock()
executor = DummyExecutor()
self.loop.set_default_executor(executor)
with self.assertWarnsRegex(RuntimeWarning,
"The executor did not finishing joining"):
self.loop.run_until_complete(
self.loop.shutdown_default_executor(timeout=0.01))
try:
with self.assertWarnsRegex(RuntimeWarning,
"The executor did not finishing joining"):
self.loop.run_until_complete(
self.loop.shutdown_default_executor(timeout=0.01))
finally:
event.set()
def test_call_soon(self):
def cb():