| 
									
										
										
										
											2000-02-02 15:10:15 +00:00
										 |  |  | """A multi-producer, multi-consumer queue.""" | 
					
						
							| 
									
										
										
										
											1992-08-25 12:30:44 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2004-07-12 00:45:14 +00:00
										 |  |  | from time import time as _time | 
					
						
							| 
									
										
										
										
											2004-01-29 06:37:52 +00:00
										 |  |  | from collections import deque | 
					
						
							| 
									
										
										
										
											2002-10-15 15:11:13 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2003-07-01 05:34:27 +00:00
										 |  |  | __all__ = ['Empty', 'Full', 'Queue'] | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2001-01-15 22:53:46 +00:00
										 |  |  | class Empty(Exception): | 
					
						
							|  |  |  |     "Exception raised by Queue.get(block=0)/get_nowait()." | 
					
						
							|  |  |  |     pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class Full(Exception): | 
					
						
							|  |  |  |     "Exception raised by Queue.put(block=0)/put_nowait()." | 
					
						
							|  |  |  |     pass | 
					
						
							| 
									
										
										
										
											1992-08-25 12:30:44 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | class Queue: | 
					
						
							| 
									
										
										
										
											2006-06-10 14:09:11 +00:00
										 |  |  |     """Create a queue object with a given maximum size.
 | 
					
						
							| 
									
										
										
										
											1997-11-20 19:56:38 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2006-06-10 14:09:11 +00:00
										 |  |  |     If maxsize is <= 0, the queue size is infinite. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     def __init__(self, maxsize=0): | 
					
						
							| 
									
										
										
										
											2002-12-30 22:36:09 +00:00
										 |  |  |         try: | 
					
						
							| 
									
										
										
										
											2004-07-12 00:45:14 +00:00
										 |  |  |             import threading | 
					
						
							| 
									
										
										
										
											2002-12-30 22:36:09 +00:00
										 |  |  |         except ImportError: | 
					
						
							| 
									
										
										
										
											2004-07-12 00:45:14 +00:00
										 |  |  |             import dummy_threading as threading | 
					
						
							| 
									
										
										
										
											2008-01-15 20:52:42 +00:00
										 |  |  |         self.maxsize = maxsize | 
					
						
							| 
									
										
										
										
											1998-03-26 21:13:24 +00:00
										 |  |  |         self._init(maxsize) | 
					
						
							| 
									
										
										
										
											2004-07-12 00:45:14 +00:00
										 |  |  |         # mutex must be held whenever the queue is mutating.  All methods | 
					
						
							|  |  |  |         # that acquire mutex must release it before returning.  mutex | 
					
						
							| 
									
										
										
										
											2006-11-23 21:35:19 +00:00
										 |  |  |         # is shared between the three conditions, so acquiring and | 
					
						
							| 
									
										
										
										
											2004-07-12 00:45:14 +00:00
										 |  |  |         # releasing the conditions also acquires and releases mutex. | 
					
						
							|  |  |  |         self.mutex = threading.Lock() | 
					
						
							|  |  |  |         # Notify not_empty whenever an item is added to the queue; a | 
					
						
							|  |  |  |         # thread waiting to get is notified then. | 
					
						
							|  |  |  |         self.not_empty = threading.Condition(self.mutex) | 
					
						
							|  |  |  |         # Notify not_full whenever an item is removed from the queue; | 
					
						
							|  |  |  |         # a thread waiting to put is notified then. | 
					
						
							|  |  |  |         self.not_full = threading.Condition(self.mutex) | 
					
						
							| 
									
										
										
										
											2006-03-24 20:43:29 +00:00
										 |  |  |         # Notify all_tasks_done whenever the number of unfinished tasks | 
					
						
							|  |  |  |         # drops to zero; thread waiting to join() is notified to resume | 
					
						
							|  |  |  |         self.all_tasks_done = threading.Condition(self.mutex) | 
					
						
							|  |  |  |         self.unfinished_tasks = 0 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def task_done(self): | 
					
						
							|  |  |  |         """Indicate that a formerly enqueued task is complete.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Used by Queue consumer threads.  For each get() used to fetch a task, | 
					
						
							|  |  |  |         a subsequent call to task_done() tells the queue that the processing | 
					
						
							|  |  |  |         on the task is complete. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         If a join() is currently blocking, it will resume when all items | 
					
						
							|  |  |  |         have been processed (meaning that a task_done() call was received | 
					
						
							|  |  |  |         for every item that had been put() into the queue). | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Raises a ValueError if called more times than there were items | 
					
						
							|  |  |  |         placed in the queue. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         self.all_tasks_done.acquire() | 
					
						
							|  |  |  |         try: | 
					
						
							| 
									
										
										
										
											2006-03-25 12:15:04 +00:00
										 |  |  |             unfinished = self.unfinished_tasks - 1 | 
					
						
							| 
									
										
										
										
											2006-03-24 20:43:29 +00:00
										 |  |  |             if unfinished <= 0: | 
					
						
							|  |  |  |                 if unfinished < 0: | 
					
						
							| 
									
										
										
										
											2006-03-25 01:50:43 +00:00
										 |  |  |                     raise ValueError('task_done() called too many times') | 
					
						
							| 
									
										
										
										
											2006-03-24 20:43:29 +00:00
										 |  |  |                 self.all_tasks_done.notifyAll() | 
					
						
							| 
									
										
										
										
											2006-03-25 12:15:04 +00:00
										 |  |  |             self.unfinished_tasks = unfinished | 
					
						
							| 
									
										
										
										
											2006-03-24 20:43:29 +00:00
										 |  |  |         finally: | 
					
						
							|  |  |  |             self.all_tasks_done.release() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def join(self): | 
					
						
							|  |  |  |         """Blocks until all items in the Queue have been gotten and processed.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         The count of unfinished tasks goes up whenever an item is added to the | 
					
						
							|  |  |  |         queue. The count goes down whenever a consumer thread calls task_done() | 
					
						
							|  |  |  |         to indicate the item was retrieved and all work on it is complete. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         When the count of unfinished tasks drops to zero, join() unblocks. | 
					
						
							|  |  |  |         """
 | 
					
						
							|  |  |  |         self.all_tasks_done.acquire() | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             while self.unfinished_tasks: | 
					
						
							|  |  |  |                 self.all_tasks_done.wait() | 
					
						
							|  |  |  |         finally: | 
					
						
							|  |  |  |             self.all_tasks_done.release() | 
					
						
							| 
									
										
										
										
											1997-11-20 19:56:38 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def qsize(self): | 
					
						
							| 
									
										
										
										
											1999-02-08 18:34:01 +00:00
										 |  |  |         """Return the approximate size of the queue (not reliable!).""" | 
					
						
							| 
									
										
										
										
											1998-04-29 14:29:32 +00:00
										 |  |  |         self.mutex.acquire() | 
					
						
							| 
									
										
										
										
											1998-03-26 21:13:24 +00:00
										 |  |  |         n = self._qsize() | 
					
						
							| 
									
										
										
										
											1998-04-29 14:29:32 +00:00
										 |  |  |         self.mutex.release() | 
					
						
							| 
									
										
										
										
											1998-03-26 21:13:24 +00:00
										 |  |  |         return n | 
					
						
							| 
									
										
										
										
											1997-11-20 19:56:38 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def empty(self): | 
					
						
							| 
									
										
										
										
											2002-10-15 15:11:13 +00:00
										 |  |  |         """Return True if the queue is empty, False otherwise (not reliable!).""" | 
					
						
							| 
									
										
										
										
											1998-04-29 14:29:32 +00:00
										 |  |  |         self.mutex.acquire() | 
					
						
							| 
									
										
										
										
											2008-01-15 20:52:42 +00:00
										 |  |  |         n = not self._qsize() | 
					
						
							| 
									
										
										
										
											1998-04-29 14:29:32 +00:00
										 |  |  |         self.mutex.release() | 
					
						
							| 
									
										
										
										
											1998-03-26 21:13:24 +00:00
										 |  |  |         return n | 
					
						
							| 
									
										
										
										
											1997-11-20 19:56:38 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def full(self): | 
					
						
							| 
									
										
										
										
											2002-10-15 15:11:13 +00:00
										 |  |  |         """Return True if the queue is full, False otherwise (not reliable!).""" | 
					
						
							| 
									
										
										
										
											1998-04-29 14:29:32 +00:00
										 |  |  |         self.mutex.acquire() | 
					
						
							| 
									
										
										
										
											2008-01-15 20:52:42 +00:00
										 |  |  |         n = 0 < self.maxsize == self._qsize() | 
					
						
							| 
									
										
										
										
											1998-04-29 14:29:32 +00:00
										 |  |  |         self.mutex.release() | 
					
						
							| 
									
										
										
										
											1998-03-26 21:13:24 +00:00
										 |  |  |         return n | 
					
						
							| 
									
										
										
										
											1997-11-20 19:56:38 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2002-10-15 15:11:13 +00:00
										 |  |  |     def put(self, item, block=True, timeout=None): | 
					
						
							| 
									
										
										
										
											1998-04-09 14:25:32 +00:00
										 |  |  |         """Put an item into the queue.
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2002-10-15 15:11:13 +00:00
										 |  |  |         If optional args 'block' is true and 'timeout' is None (the default), | 
					
						
							|  |  |  |         block if necessary until a free slot is available. If 'timeout' is | 
					
						
							|  |  |  |         a positive number, it blocks at most 'timeout' seconds and raises | 
					
						
							|  |  |  |         the Full exception if no free slot was available within that time. | 
					
						
							|  |  |  |         Otherwise ('block' is false), put an item on the queue if a free slot | 
					
						
							|  |  |  |         is immediately available, else raise the Full exception ('timeout' | 
					
						
							|  |  |  |         is ignored in that case). | 
					
						
							| 
									
										
										
										
											1999-02-08 18:34:01 +00:00
										 |  |  |         """
 | 
					
						
							| 
									
										
										
										
											2004-07-12 00:45:14 +00:00
										 |  |  |         self.not_full.acquire() | 
					
						
							|  |  |  |         try: | 
					
						
							| 
									
										
										
										
											2008-01-15 20:52:42 +00:00
										 |  |  |             if self.maxsize > 0: | 
					
						
							|  |  |  |                 if not block: | 
					
						
							|  |  |  |                     if self._qsize() == self.maxsize: | 
					
						
							| 
									
										
										
										
											2002-10-15 15:11:13 +00:00
										 |  |  |                         raise Full | 
					
						
							| 
									
										
										
										
											2008-01-15 20:52:42 +00:00
										 |  |  |                 elif timeout is None: | 
					
						
							|  |  |  |                     while self._qsize() == self.maxsize: | 
					
						
							|  |  |  |                         self.not_full.wait() | 
					
						
							|  |  |  |                 elif timeout < 0: | 
					
						
							|  |  |  |                     raise ValueError("'timeout' must be a positive number") | 
					
						
							|  |  |  |                 else: | 
					
						
							|  |  |  |                     endtime = _time() + timeout | 
					
						
							|  |  |  |                     while self._qsize() == self.maxsize: | 
					
						
							|  |  |  |                         remaining = endtime - _time() | 
					
						
							|  |  |  |                         if remaining <= 0.0: | 
					
						
							|  |  |  |                             raise Full | 
					
						
							|  |  |  |                         self.not_full.wait(remaining) | 
					
						
							| 
									
										
										
										
											2002-04-19 00:11:32 +00:00
										 |  |  |             self._put(item) | 
					
						
							| 
									
										
										
										
											2006-03-24 20:43:29 +00:00
										 |  |  |             self.unfinished_tasks += 1 | 
					
						
							| 
									
										
										
										
											2004-07-12 00:45:14 +00:00
										 |  |  |             self.not_empty.notify() | 
					
						
							| 
									
										
										
										
											2002-04-19 00:11:32 +00:00
										 |  |  |         finally: | 
					
						
							| 
									
										
										
										
											2004-07-12 00:45:14 +00:00
										 |  |  |             self.not_full.release() | 
					
						
							| 
									
										
										
										
											1997-11-20 19:56:38 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											1999-02-08 18:34:01 +00:00
										 |  |  |     def put_nowait(self, item): | 
					
						
							|  |  |  |         """Put an item into the queue without blocking.
 | 
					
						
							| 
									
										
										
										
											1998-04-09 14:25:32 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											1999-02-08 18:34:01 +00:00
										 |  |  |         Only enqueue the item if a free slot is immediately available. | 
					
						
							|  |  |  |         Otherwise raise the Full exception. | 
					
						
							| 
									
										
										
										
											1998-03-26 21:13:24 +00:00
										 |  |  |         """
 | 
					
						
							| 
									
										
										
										
											2004-07-12 01:20:32 +00:00
										 |  |  |         return self.put(item, False) | 
					
						
							| 
									
										
										
										
											1997-11-20 19:56:38 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2002-10-15 15:11:13 +00:00
										 |  |  |     def get(self, block=True, timeout=None): | 
					
						
							| 
									
										
										
										
											1999-02-08 18:34:01 +00:00
										 |  |  |         """Remove and return an item from the queue.
 | 
					
						
							| 
									
										
										
										
											1998-04-09 14:25:32 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2002-10-15 15:11:13 +00:00
										 |  |  |         If optional args 'block' is true and 'timeout' is None (the default), | 
					
						
							|  |  |  |         block if necessary until an item is available. If 'timeout' is | 
					
						
							|  |  |  |         a positive number, it blocks at most 'timeout' seconds and raises | 
					
						
							|  |  |  |         the Empty exception if no item was available within that time. | 
					
						
							|  |  |  |         Otherwise ('block' is false), return an item if one is immediately | 
					
						
							|  |  |  |         available, else raise the Empty exception ('timeout' is ignored | 
					
						
							|  |  |  |         in that case). | 
					
						
							| 
									
										
										
										
											1998-03-26 21:13:24 +00:00
										 |  |  |         """
 | 
					
						
							| 
									
										
										
										
											2004-07-12 00:45:14 +00:00
										 |  |  |         self.not_empty.acquire() | 
					
						
							|  |  |  |         try: | 
					
						
							| 
									
										
										
										
											2004-07-12 01:20:32 +00:00
										 |  |  |             if not block: | 
					
						
							| 
									
										
										
										
											2008-01-15 20:52:42 +00:00
										 |  |  |                 if not self._qsize(): | 
					
						
							| 
									
										
										
										
											2004-07-12 01:20:32 +00:00
										 |  |  |                     raise Empty | 
					
						
							|  |  |  |             elif timeout is None: | 
					
						
							| 
									
										
										
										
											2008-01-15 20:52:42 +00:00
										 |  |  |                 while not self._qsize(): | 
					
						
							| 
									
										
										
										
											2004-07-12 00:45:14 +00:00
										 |  |  |                     self.not_empty.wait() | 
					
						
							| 
									
										
										
										
											2008-01-15 20:52:42 +00:00
										 |  |  |             elif timeout < 0: | 
					
						
							|  |  |  |                 raise ValueError("'timeout' must be a positive number") | 
					
						
							| 
									
										
										
										
											2004-07-12 00:45:14 +00:00
										 |  |  |             else: | 
					
						
							| 
									
										
										
										
											2002-10-15 15:11:13 +00:00
										 |  |  |                 endtime = _time() + timeout | 
					
						
							| 
									
										
										
										
											2008-01-15 20:52:42 +00:00
										 |  |  |                 while not self._qsize(): | 
					
						
							| 
									
										
										
										
											2002-10-15 15:11:13 +00:00
										 |  |  |                     remaining = endtime - _time() | 
					
						
							| 
									
										
										
										
											2004-07-12 01:20:32 +00:00
										 |  |  |                     if remaining <= 0.0: | 
					
						
							| 
									
										
										
										
											2002-10-15 15:11:13 +00:00
										 |  |  |                         raise Empty | 
					
						
							| 
									
										
										
										
											2004-07-12 00:45:14 +00:00
										 |  |  |                     self.not_empty.wait(remaining) | 
					
						
							| 
									
										
										
										
											2002-04-19 00:11:32 +00:00
										 |  |  |             item = self._get() | 
					
						
							| 
									
										
										
										
											2004-07-12 00:45:14 +00:00
										 |  |  |             self.not_full.notify() | 
					
						
							|  |  |  |             return item | 
					
						
							| 
									
										
										
										
											2002-04-19 00:11:32 +00:00
										 |  |  |         finally: | 
					
						
							| 
									
										
										
										
											2004-07-12 00:45:14 +00:00
										 |  |  |             self.not_empty.release() | 
					
						
							| 
									
										
										
										
											1997-11-20 19:56:38 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											1999-02-08 18:34:01 +00:00
										 |  |  |     def get_nowait(self): | 
					
						
							|  |  |  |         """Remove and return an item from the queue without blocking.
 | 
					
						
							| 
									
										
										
										
											1997-11-20 19:56:38 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2002-10-15 15:11:13 +00:00
										 |  |  |         Only get an item if one is immediately available. Otherwise | 
					
						
							| 
									
										
										
										
											1999-02-08 18:34:01 +00:00
										 |  |  |         raise the Empty exception. | 
					
						
							|  |  |  |         """
 | 
					
						
							| 
									
										
										
										
											2004-07-12 01:20:32 +00:00
										 |  |  |         return self.get(False) | 
					
						
							| 
									
										
										
										
											1997-11-20 19:56:38 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     # Override these methods to implement other queue organizations | 
					
						
							|  |  |  |     # (e.g. stack or priority queue). | 
					
						
							|  |  |  |     # These will only be called with appropriate locks held | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Initialize the queue representation | 
					
						
							|  |  |  |     def _init(self, maxsize): | 
					
						
							| 
									
										
										
										
											2004-01-29 06:37:52 +00:00
										 |  |  |         self.queue = deque() | 
					
						
							| 
									
										
										
										
											1997-11-20 19:56:38 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def _qsize(self): | 
					
						
							| 
									
										
										
										
											1998-03-26 21:13:24 +00:00
										 |  |  |         return len(self.queue) | 
					
						
							| 
									
										
										
										
											1997-11-20 19:56:38 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     # Put a new item in the queue | 
					
						
							|  |  |  |     def _put(self, item): | 
					
						
							| 
									
										
										
										
											1998-03-26 21:13:24 +00:00
										 |  |  |         self.queue.append(item) | 
					
						
							| 
									
										
										
										
											1997-11-20 19:56:38 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     # Get an item from the queue | 
					
						
							|  |  |  |     def _get(self): | 
					
						
							| 
									
										
										
										
											2004-01-29 06:37:52 +00:00
										 |  |  |         return self.queue.popleft() |