mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 03:04:41 +00:00 
			
		
		
		
	bpo-32574: Fix leaks in asyncio.Queue.put() and .get() (#5208)
This commit is contained in:
		
							parent
							
								
									c9070d03f5
								
							
						
					
					
						commit
						c47dacb690
					
				
					 3 changed files with 63 additions and 2 deletions
				
			
		|  | @ -121,6 +121,13 @@ async def put(self, item): | ||||||
|                 await putter |                 await putter | ||||||
|             except: |             except: | ||||||
|                 putter.cancel()  # Just in case putter is not done yet. |                 putter.cancel()  # Just in case putter is not done yet. | ||||||
|  |                 try: | ||||||
|  |                     # Clean self._putters from canceled putters. | ||||||
|  |                     self._putters.remove(putter) | ||||||
|  |                 except ValueError: | ||||||
|  |                     # The putter could be removed from self._putters by a | ||||||
|  |                     # previous get_nowait call. | ||||||
|  |                     pass | ||||||
|                 if not self.full() and not putter.cancelled(): |                 if not self.full() and not putter.cancelled(): | ||||||
|                     # We were woken up by get_nowait(), but can't take |                     # We were woken up by get_nowait(), but can't take | ||||||
|                     # the call.  Wake up the next in line. |                     # the call.  Wake up the next in line. | ||||||
|  | @ -152,12 +159,13 @@ async def get(self): | ||||||
|                 await getter |                 await getter | ||||||
|             except: |             except: | ||||||
|                 getter.cancel()  # Just in case getter is not done yet. |                 getter.cancel()  # Just in case getter is not done yet. | ||||||
| 
 |  | ||||||
|                 try: |                 try: | ||||||
|  |                     # Clean self._getters from canceled getters. | ||||||
|                     self._getters.remove(getter) |                     self._getters.remove(getter) | ||||||
|                 except ValueError: |                 except ValueError: | ||||||
|  |                     # The getter could be removed from self._getters by a | ||||||
|  |                     # previous put_nowait call. | ||||||
|                     pass |                     pass | ||||||
| 
 |  | ||||||
|                 if not self.empty() and not getter.cancelled(): |                 if not self.empty() and not getter.cancelled(): | ||||||
|                     # We were woken up by put_nowait(), but can't take |                     # We were woken up by put_nowait(), but can't take | ||||||
|                     # the call.  Wake up the next in line. |                     # the call.  Wake up the next in line. | ||||||
|  |  | ||||||
|  | @ -520,6 +520,56 @@ async def getter(): | ||||||
|         self.loop.run_until_complete( |         self.loop.run_until_complete( | ||||||
|             asyncio.gather(getter(), t0, t1, t2, t3, loop=self.loop)) |             asyncio.gather(getter(), t0, t1, t2, t3, loop=self.loop)) | ||||||
| 
 | 
 | ||||||
|  |     def test_cancelled_puts_not_being_held_in_self_putters(self): | ||||||
|  |         def a_generator(): | ||||||
|  |             yield 0.01 | ||||||
|  |             yield 0.1 | ||||||
|  | 
 | ||||||
|  |         loop = self.new_test_loop(a_generator) | ||||||
|  | 
 | ||||||
|  |         # Full queue. | ||||||
|  |         queue = asyncio.Queue(loop=loop, maxsize=1) | ||||||
|  |         queue.put_nowait(1) | ||||||
|  | 
 | ||||||
|  |         # Task waiting for space to put an item in the queue. | ||||||
|  |         put_task = loop.create_task(queue.put(1)) | ||||||
|  |         loop.run_until_complete(asyncio.sleep(0.01, loop=loop)) | ||||||
|  | 
 | ||||||
|  |         # Check that the putter is correctly removed from queue._putters when | ||||||
|  |         # the task is canceled. | ||||||
|  |         self.assertEqual(len(queue._putters), 1) | ||||||
|  |         put_task.cancel() | ||||||
|  |         with self.assertRaises(asyncio.CancelledError): | ||||||
|  |             loop.run_until_complete(put_task) | ||||||
|  |         self.assertEqual(len(queue._putters), 0) | ||||||
|  | 
 | ||||||
|  |     def test_cancelled_put_silence_value_error_exception(self): | ||||||
|  |         def gen(): | ||||||
|  |             yield 0.01 | ||||||
|  |             yield 0.1 | ||||||
|  | 
 | ||||||
|  |         loop = self.new_test_loop(gen) | ||||||
|  | 
 | ||||||
|  |         # Full Queue. | ||||||
|  |         queue = asyncio.Queue(1, loop=loop) | ||||||
|  |         queue.put_nowait(1) | ||||||
|  | 
 | ||||||
|  |         # Task waiting for space to put a item in the queue. | ||||||
|  |         put_task = loop.create_task(queue.put(1)) | ||||||
|  |         loop.run_until_complete(asyncio.sleep(0.01, loop=loop)) | ||||||
|  | 
 | ||||||
|  |         # get_nowait() remove the future of put_task from queue._putters. | ||||||
|  |         queue.get_nowait() | ||||||
|  |         # When canceled, queue.put is going to remove its future from | ||||||
|  |         # self._putters but it was removed previously by queue.get_nowait(). | ||||||
|  |         put_task.cancel() | ||||||
|  | 
 | ||||||
|  |         # The ValueError exception triggered by queue._putters.remove(putter) | ||||||
|  |         # inside queue.put should be silenced. | ||||||
|  |         # If the ValueError is silenced we should catch a CancelledError. | ||||||
|  |         with self.assertRaises(asyncio.CancelledError): | ||||||
|  |             loop.run_until_complete(put_task) | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| class LifoQueueTests(_QueueTestBase): | class LifoQueueTests(_QueueTestBase): | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -0,0 +1,3 @@ | ||||||
|  | Fix memory leak in asyncio.Queue, when the queue has limited size and it is | ||||||
|  | full, the cancelation of queue.put() can cause a memory leak. Patch by: José | ||||||
|  | Melero. | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 José Melero Fernández
						José Melero Fernández