mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 11:14:33 +00:00 
			
		
		
		
	 0412e3ae03
			
		
	
	
		0412e3ae03
		
			
		
	
	
	
	
		
			
			(cherry picked from commit 3dd9bfac04)
Co-authored-by: Andrew Svetlov <andrew.svetlov@gmail.com>
		
	
			
		
			
				
	
	
		
			294 lines
		
	
	
	
		
			8.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			294 lines
		
	
	
	
		
			8.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import asyncio
 | |
| import unittest
 | |
| import time
 | |
| 
 | |
| 
 | |
| def tearDownModule():
 | |
|     asyncio.set_event_loop_policy(None)
 | |
| 
 | |
| 
 | |
| # The following value can be used as a very small timeout:
 | |
| # it passes check "timeout > 0", but has almost
 | |
| # no effect on the test performance
 | |
| _EPSILON = 0.0001
 | |
| 
 | |
| 
 | |
| class SlowTask:
 | |
|     """ Task will run for this defined time, ignoring cancel requests """
 | |
|     TASK_TIMEOUT = 0.2
 | |
| 
 | |
|     def __init__(self):
 | |
|         self.exited = False
 | |
| 
 | |
|     async def run(self):
 | |
|         exitat = time.monotonic() + self.TASK_TIMEOUT
 | |
| 
 | |
|         while True:
 | |
|             tosleep = exitat - time.monotonic()
 | |
|             if tosleep <= 0:
 | |
|                 break
 | |
| 
 | |
|             try:
 | |
|                 await asyncio.sleep(tosleep)
 | |
|             except asyncio.CancelledError:
 | |
|                 pass
 | |
| 
 | |
|         self.exited = True
 | |
| 
 | |
| 
 | |
| class AsyncioWaitForTest(unittest.IsolatedAsyncioTestCase):
 | |
| 
 | |
|     async def test_asyncio_wait_for_cancelled(self):
 | |
|         t = SlowTask()
 | |
| 
 | |
|         waitfortask = asyncio.create_task(
 | |
|             asyncio.wait_for(t.run(), t.TASK_TIMEOUT * 2))
 | |
|         await asyncio.sleep(0)
 | |
|         waitfortask.cancel()
 | |
|         await asyncio.wait({waitfortask})
 | |
| 
 | |
|         self.assertTrue(t.exited)
 | |
| 
 | |
|     async def test_asyncio_wait_for_timeout(self):
 | |
|         t = SlowTask()
 | |
| 
 | |
|         try:
 | |
|             await asyncio.wait_for(t.run(), t.TASK_TIMEOUT / 2)
 | |
|         except asyncio.TimeoutError:
 | |
|             pass
 | |
| 
 | |
|         self.assertTrue(t.exited)
 | |
| 
 | |
|     async def test_wait_for_timeout_less_then_0_or_0_future_done(self):
 | |
|         loop = asyncio.get_running_loop()
 | |
| 
 | |
|         fut = loop.create_future()
 | |
|         fut.set_result('done')
 | |
| 
 | |
|         t0 = loop.time()
 | |
|         ret = await asyncio.wait_for(fut, 0)
 | |
|         t1 = loop.time()
 | |
| 
 | |
|         self.assertEqual(ret, 'done')
 | |
|         self.assertTrue(fut.done())
 | |
|         self.assertLess(t1 - t0, 0.1)
 | |
| 
 | |
|     async def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
 | |
|         loop = asyncio.get_running_loop()
 | |
| 
 | |
|         foo_started = False
 | |
| 
 | |
|         async def foo():
 | |
|             nonlocal foo_started
 | |
|             foo_started = True
 | |
| 
 | |
|         with self.assertRaises(asyncio.TimeoutError):
 | |
|             t0 = loop.time()
 | |
|             await asyncio.wait_for(foo(), 0)
 | |
|         t1 = loop.time()
 | |
| 
 | |
|         self.assertEqual(foo_started, False)
 | |
|         self.assertLess(t1 - t0, 0.1)
 | |
| 
 | |
|     async def test_wait_for_timeout_less_then_0_or_0(self):
 | |
|         loop = asyncio.get_running_loop()
 | |
| 
 | |
|         for timeout in [0, -1]:
 | |
|             with self.subTest(timeout=timeout):
 | |
|                 foo_running = None
 | |
|                 started = loop.create_future()
 | |
| 
 | |
|                 async def foo():
 | |
|                     nonlocal foo_running
 | |
|                     foo_running = True
 | |
|                     started.set_result(None)
 | |
|                     try:
 | |
|                         await asyncio.sleep(10)
 | |
|                     finally:
 | |
|                         foo_running = False
 | |
|                     return 'done'
 | |
| 
 | |
|                 fut = asyncio.create_task(foo())
 | |
|                 await started
 | |
| 
 | |
|                 with self.assertRaises(asyncio.TimeoutError):
 | |
|                     t0 = loop.time()
 | |
|                     await asyncio.wait_for(fut, timeout)
 | |
|                 t1 = loop.time()
 | |
| 
 | |
|                 self.assertTrue(fut.done())
 | |
|                 # it should have been cancelled due to the timeout
 | |
|                 self.assertTrue(fut.cancelled())
 | |
|                 self.assertEqual(foo_running, False)
 | |
|                 self.assertLess(t1 - t0, 0.1)
 | |
| 
 | |
|     async def test_wait_for(self):
 | |
|         loop = asyncio.get_running_loop()
 | |
|         foo_running = None
 | |
| 
 | |
|         async def foo():
 | |
|             nonlocal foo_running
 | |
|             foo_running = True
 | |
|             try:
 | |
|                 await asyncio.sleep(10)
 | |
|             finally:
 | |
|                 foo_running = False
 | |
|             return 'done'
 | |
| 
 | |
|         fut = asyncio.create_task(foo())
 | |
| 
 | |
|         with self.assertRaises(asyncio.TimeoutError):
 | |
|             t0 = loop.time()
 | |
|             await asyncio.wait_for(fut, 0.1)
 | |
|         t1 = loop.time()
 | |
|         self.assertTrue(fut.done())
 | |
|         # it should have been cancelled due to the timeout
 | |
|         self.assertTrue(fut.cancelled())
 | |
|         self.assertLess(t1 - t0, 0.2)
 | |
|         self.assertEqual(foo_running, False)
 | |
| 
 | |
|     async def test_wait_for_blocking(self):
 | |
|         async def coro():
 | |
|             return 'done'
 | |
| 
 | |
|         res = await asyncio.wait_for(coro(), timeout=None)
 | |
|         self.assertEqual(res, 'done')
 | |
| 
 | |
|     async def test_wait_for_race_condition(self):
 | |
|         loop = asyncio.get_running_loop()
 | |
| 
 | |
|         fut = loop.create_future()
 | |
|         task = asyncio.wait_for(fut, timeout=0.2)
 | |
|         loop.call_later(0.1, fut.set_result, "ok")
 | |
|         res = await task
 | |
|         self.assertEqual(res, "ok")
 | |
| 
 | |
|     async def test_wait_for_cancellation_race_condition(self):
 | |
|         async def inner():
 | |
|             with self.assertRaises(asyncio.CancelledError):
 | |
|                 await asyncio.sleep(1)
 | |
|             return 1
 | |
| 
 | |
|         result = await asyncio.wait_for(inner(), timeout=.01)
 | |
|         self.assertEqual(result, 1)
 | |
| 
 | |
|     async def test_wait_for_waits_for_task_cancellation(self):
 | |
|         task_done = False
 | |
| 
 | |
|         async def inner():
 | |
|             nonlocal task_done
 | |
|             try:
 | |
|                 await asyncio.sleep(10)
 | |
|             except asyncio.CancelledError:
 | |
|                 await asyncio.sleep(_EPSILON)
 | |
|                 raise
 | |
|             finally:
 | |
|                 task_done = True
 | |
| 
 | |
|         inner_task = asyncio.create_task(inner())
 | |
| 
 | |
|         with self.assertRaises(asyncio.TimeoutError) as cm:
 | |
|             await asyncio.wait_for(inner_task, timeout=_EPSILON)
 | |
| 
 | |
|         self.assertTrue(task_done)
 | |
|         chained = cm.exception.__context__
 | |
|         self.assertEqual(type(chained), asyncio.CancelledError)
 | |
| 
 | |
|     async def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):
 | |
|         task_done = False
 | |
| 
 | |
|         async def foo():
 | |
|             async def inner():
 | |
|                 nonlocal task_done
 | |
|                 try:
 | |
|                     await asyncio.sleep(10)
 | |
|                 except asyncio.CancelledError:
 | |
|                     await asyncio.sleep(_EPSILON)
 | |
|                     raise
 | |
|                 finally:
 | |
|                     task_done = True
 | |
| 
 | |
|             inner_task = asyncio.create_task(inner())
 | |
|             await asyncio.sleep(_EPSILON)
 | |
|             await asyncio.wait_for(inner_task, timeout=0)
 | |
| 
 | |
|         with self.assertRaises(asyncio.TimeoutError) as cm:
 | |
|             await foo()
 | |
| 
 | |
|         self.assertTrue(task_done)
 | |
|         chained = cm.exception.__context__
 | |
|         self.assertEqual(type(chained), asyncio.CancelledError)
 | |
| 
 | |
|     async def test_wait_for_reraises_exception_during_cancellation(self):
 | |
|         class FooException(Exception):
 | |
|             pass
 | |
| 
 | |
|         async def foo():
 | |
|             async def inner():
 | |
|                 try:
 | |
|                     await asyncio.sleep(0.2)
 | |
|                 finally:
 | |
|                     raise FooException
 | |
| 
 | |
|             inner_task = asyncio.create_task(inner())
 | |
| 
 | |
|             await asyncio.wait_for(inner_task, timeout=_EPSILON)
 | |
| 
 | |
|         with self.assertRaises(FooException):
 | |
|             await foo()
 | |
| 
 | |
|     async def test_wait_for_self_cancellation(self):
 | |
|         async def inner():
 | |
|             try:
 | |
|                 await asyncio.sleep(0.3)
 | |
|             except asyncio.CancelledError:
 | |
|                 try:
 | |
|                     await asyncio.sleep(0.3)
 | |
|                 except asyncio.CancelledError:
 | |
|                     await asyncio.sleep(0.3)
 | |
| 
 | |
|             return 42
 | |
| 
 | |
|         inner_task = asyncio.create_task(inner())
 | |
| 
 | |
|         wait = asyncio.wait_for(inner_task, timeout=0.1)
 | |
| 
 | |
|         # Test that wait_for itself is properly cancellable
 | |
|         # even when the initial task holds up the initial cancellation.
 | |
|         task = asyncio.create_task(wait)
 | |
|         await asyncio.sleep(0.2)
 | |
|         task.cancel()
 | |
| 
 | |
|         with self.assertRaises(asyncio.CancelledError):
 | |
|             await task
 | |
| 
 | |
|         self.assertEqual(await inner_task, 42)
 | |
| 
 | |
|     async def _test_cancel_wait_for(self, timeout):
 | |
|         loop = asyncio.get_running_loop()
 | |
| 
 | |
|         async def blocking_coroutine():
 | |
|             fut = loop.create_future()
 | |
|             # Block: fut result is never set
 | |
|             await fut
 | |
| 
 | |
|         task = asyncio.create_task(blocking_coroutine())
 | |
| 
 | |
|         wait = asyncio.create_task(asyncio.wait_for(task, timeout))
 | |
|         loop.call_soon(wait.cancel)
 | |
| 
 | |
|         with self.assertRaises(asyncio.CancelledError):
 | |
|             await wait
 | |
| 
 | |
|         # Python issue #23219: cancelling the wait must also cancel the task
 | |
|         self.assertTrue(task.cancelled())
 | |
| 
 | |
|     async def test_cancel_blocking_wait_for(self):
 | |
|         await self._test_cancel_wait_for(None)
 | |
| 
 | |
|     async def test_cancel_wait_for(self):
 | |
|         await self._test_cancel_wait_for(60.0)
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     unittest.main()
 |