| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | .. currentmodule:: asyncio
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-14 14:57:39 -07:00
										 |  |  | .. _asyncio-queues:
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  | 
 | 
					
						
							|  |  |  | ======
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | Queues
 | 
					
						
							|  |  |  | ======
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-10-10 19:18:46 -04:00
										 |  |  | **Source code:** :source:`Lib/asyncio/queues.py`
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | ------------------------------------------------
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  | asyncio queues are designed to be similar to classes of the
 | 
					
						
							|  |  |  | :mod:`queue` module.  Although asyncio queues are not thread-safe,
 | 
					
						
							|  |  |  | they are designed to be used specifically in async/await code.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-17 19:16:44 -04:00
										 |  |  | Note that methods of asyncio queues don't have a *timeout* parameter;
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  | use the :func:`asyncio.wait_for` function to do queue operations with a
 | 
					
						
							|  |  |  | timeout.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  | See also the `Examples`_ section below.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | Queue
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  | =====
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | .. class:: Queue(maxsize=0, \*, loop=None)
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |    A first in, first out (FIFO) queue.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |    If *maxsize* is less than or equal to zero, the queue size is
 | 
					
						
							|  |  |  |    infinite.  If it is an integer greater than ``0``, then
 | 
					
						
							|  |  |  |    ``await put()`` blocks when the queue reaches *maxsize*
 | 
					
						
							|  |  |  |    until an item is removed by :meth:`get`.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |    Unlike the standard library threading :mod:`queue`, the size of
 | 
					
						
							|  |  |  |    the queue is always known and can be returned by calling the
 | 
					
						
							|  |  |  |    :meth:`qsize` method.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2019-09-10 08:46:12 -03:00
										 |  |  |    .. deprecated-removed:: 3.8 3.10
 | 
					
						
							|  |  |  |       The *loop* parameter.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2015-02-25 14:24:15 +01:00
										 |  |  |    This class is :ref:`not thread safe <asyncio-multithreading>`.
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |    .. attribute:: maxsize
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       Number of items allowed in the queue.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |    .. method:: empty()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       Return ``True`` if the queue is empty, ``False`` otherwise.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |    .. method:: full()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       Return ``True`` if there are :attr:`maxsize` items in the queue.
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |       If the queue was initialized with ``maxsize=0`` (the default),
 | 
					
						
							|  |  |  |       then :meth:`full` never returns ``True``.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |    .. coroutinemethod:: get()
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |       Remove and return an item from the queue. If the queue is empty,
 | 
					
						
							|  |  |  |       wait until an item is available.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |    .. method:: get_nowait()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       Return an item if one is immediately available, else raise
 | 
					
						
							|  |  |  |       :exc:`QueueEmpty`.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |    .. coroutinemethod:: join()
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-14 10:06:55 -07:00
										 |  |  |       Block until all items in the queue have been received and processed.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |       The count of unfinished tasks goes up whenever an item is added
 | 
					
						
							| 
									
										
										
										
											2019-01-17 13:52:17 +02:00
										 |  |  |       to the queue. The count goes down whenever a consumer coroutine calls
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |       :meth:`task_done` to indicate that the item was retrieved and all
 | 
					
						
							|  |  |  |       work on it is complete.  When the count of unfinished tasks drops
 | 
					
						
							|  |  |  |       to zero, :meth:`join` unblocks.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |    .. coroutinemethod:: put(item)
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |       Put an item into the queue. If the queue is full, wait until a
 | 
					
						
							| 
									
										
										
										
											2018-09-17 19:16:44 -04:00
										 |  |  |       free slot is available before adding the item.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |    .. method:: put_nowait(item)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       Put an item into the queue without blocking.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       If no free slot is immediately available, raise :exc:`QueueFull`.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |    .. method:: qsize()
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-17 19:16:44 -04:00
										 |  |  |       Return the number of items in the queue.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  |    .. method:: task_done()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       Indicate that a formerly enqueued task is complete.
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |       Used by queue consumers. For each :meth:`~Queue.get` used to
 | 
					
						
							|  |  |  |       fetch a task, a subsequent call to :meth:`task_done` tells the
 | 
					
						
							|  |  |  |       queue that the processing on the task is complete.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |       If a :meth:`join` is currently blocking, it will resume when all
 | 
					
						
							|  |  |  |       items have been processed (meaning that a :meth:`task_done`
 | 
					
						
							|  |  |  |       call was received for every item that had been :meth:`~Queue.put`
 | 
					
						
							|  |  |  |       into the queue).
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |       Raises :exc:`ValueError` if called more times than there were
 | 
					
						
							|  |  |  |       items placed in the queue.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  | Priority Queue
 | 
					
						
							|  |  |  | ==============
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | .. class:: PriorityQueue
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |    A variant of :class:`Queue`; retrieves entries in priority order
 | 
					
						
							|  |  |  |    (lowest first).
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |    Entries are typically tuples of the form
 | 
					
						
							|  |  |  |    ``(priority_number, data)``.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  | LIFO Queue
 | 
					
						
							|  |  |  | ==========
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | .. class:: LifoQueue
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |    A variant of :class:`Queue` that retrieves most recently added
 | 
					
						
							|  |  |  |    entries first (last in, first out).
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | Exceptions
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  | ==========
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | .. exception:: QueueEmpty
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |    This exception is raised when the :meth:`~Queue.get_nowait` method
 | 
					
						
							|  |  |  |    is called on an empty queue.
 | 
					
						
							| 
									
										
										
										
											2015-02-25 13:55:43 +01:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | .. exception:: QueueFull
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  |    Exception raised when the :meth:`~Queue.put_nowait` method is called
 | 
					
						
							|  |  |  |    on a queue that has reached its *maxsize*.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | Examples
 | 
					
						
							|  |  |  | ========
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-14 15:11:24 -07:00
										 |  |  | .. _asyncio_example_queue_dist:
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2018-09-11 09:54:40 -07:00
										 |  |  | Queues can be used to distribute workload between several
 | 
					
						
							|  |  |  | concurrent tasks::
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |    import asyncio
 | 
					
						
							|  |  |  |    import random
 | 
					
						
							|  |  |  |    import time
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |    async def worker(name, queue):
 | 
					
						
							|  |  |  |        while True:
 | 
					
						
							|  |  |  |            # Get a "work item" out of the queue.
 | 
					
						
							|  |  |  |            sleep_for = await queue.get()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |            # Sleep for "sleep_for" seconds.
 | 
					
						
							|  |  |  |            await asyncio.sleep(sleep_for)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |            # Notify the queue that the "work item" has been processed.
 | 
					
						
							|  |  |  |            queue.task_done()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |            print(f'{name} has slept for {sleep_for:.2f} seconds')
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |    async def main():
 | 
					
						
							|  |  |  |        # Create a queue that we will use to store our "workload".
 | 
					
						
							|  |  |  |        queue = asyncio.Queue()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |        # Generate random timings and put them into the queue.
 | 
					
						
							|  |  |  |        total_sleep_time = 0
 | 
					
						
							|  |  |  |        for _ in range(20):
 | 
					
						
							|  |  |  |            sleep_for = random.uniform(0.05, 1.0)
 | 
					
						
							|  |  |  |            total_sleep_time += sleep_for
 | 
					
						
							|  |  |  |            queue.put_nowait(sleep_for)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |        # Create three worker tasks to process the queue concurrently.
 | 
					
						
							|  |  |  |        tasks = []
 | 
					
						
							|  |  |  |        for i in range(3):
 | 
					
						
							|  |  |  |            task = asyncio.create_task(worker(f'worker-{i}', queue))
 | 
					
						
							|  |  |  |            tasks.append(task)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |        # Wait until the queue is fully processed.
 | 
					
						
							|  |  |  |        started_at = time.monotonic()
 | 
					
						
							|  |  |  |        await queue.join()
 | 
					
						
							|  |  |  |        total_slept_for = time.monotonic() - started_at
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |        # Cancel our worker tasks.
 | 
					
						
							|  |  |  |        for task in tasks:
 | 
					
						
							|  |  |  |            task.cancel()
 | 
					
						
							|  |  |  |        # Wait until all worker tasks are cancelled.
 | 
					
						
							|  |  |  |        await asyncio.gather(*tasks, return_exceptions=True)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |        print('====')
 | 
					
						
							|  |  |  |        print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
 | 
					
						
							|  |  |  |        print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |    asyncio.run(main())
 |