| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | # Copyright 2009 Brian Quinlan. All Rights Reserved. | 
					
						
							|  |  |  | # Licensed to PSF under a Contributor Agreement. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | """Implements ProcessPoolExecutor.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | The follow diagram and text describe the data-flow through the system: | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | |======================= In-process =====================|== Out-of-process ==| | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | +----------+     +----------+       +--------+     +-----------+    +---------+ | 
					
						
							|  |  |  | |          |  => | Work Ids |    => |        |  => | Call Q    | => |         | | 
					
						
							|  |  |  | |          |     +----------+       |        |     +-----------+    |         | | 
					
						
							|  |  |  | |          |     | ...      |       |        |     | ...       |    |         | | 
					
						
							|  |  |  | |          |     | 6        |       |        |     | 5, call() |    |         | | 
					
						
							|  |  |  | |          |     | 7        |       |        |     | ...       |    |         | | 
					
						
							|  |  |  | | Process  |     | ...      |       | Local  |     +-----------+    | Process | | 
					
						
							|  |  |  | |  Pool    |     +----------+       | Worker |                      |  #1..n  | | 
					
						
							|  |  |  | | Executor |                        | Thread |                      |         | | 
					
						
							|  |  |  | |          |     +----------- +     |        |     +-----------+    |         | | 
					
						
							|  |  |  | |          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         | | 
					
						
							|  |  |  | |          |     +------------+     |        |     +-----------+    |         | | 
					
						
							|  |  |  | |          |     | 6: call()  |     |        |     | ...       |    |         | | 
					
						
							|  |  |  | |          |     |    future  |     |        |     | 4, result |    |         | | 
					
						
							|  |  |  | |          |     | ...        |     |        |     | 3, except |    |         | | 
					
						
							|  |  |  | +----------+     +------------+     +--------+     +-----------+    +---------+ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | Executor.submit() called: | 
					
						
							|  |  |  | - creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict | 
					
						
							|  |  |  | - adds the id of the _WorkItem to the "Work Ids" queue | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | Local worker thread: | 
					
						
							|  |  |  | - reads work ids from the "Work Ids" queue and looks up the corresponding | 
					
						
							|  |  |  |   WorkItem from the "Work Items" dict: if the work item has been cancelled then | 
					
						
							|  |  |  |   it is simply removed from the dict, otherwise it is repackaged as a | 
					
						
							|  |  |  |   _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" | 
					
						
							|  |  |  |   until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because | 
					
						
							|  |  |  |   calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). | 
					
						
							|  |  |  | - reads _ResultItems from "Result Q", updates the future stored in the | 
					
						
							|  |  |  |   "Work Items" dict and deletes the dict entry | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | Process #1..n: | 
					
						
							|  |  |  | - reads _CallItems from "Call Q", executes the calls, and puts the resulting | 
					
						
							|  |  |  |   _ResultItems in "Request Q" | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | __author__ = 'Brian Quinlan (brian@sweetapp.com)' | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import atexit | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  | import os | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | from concurrent.futures import _base | 
					
						
							|  |  |  | import queue | 
					
						
							|  |  |  | import multiprocessing | 
					
						
							| 
									
										
										
										
											2012-03-05 19:28:37 +01:00
										 |  |  | from multiprocessing.queues import SimpleQueue, Full | 
					
						
							|  |  |  | from multiprocessing.connection import wait | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | import threading | 
					
						
							|  |  |  | import weakref | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # Workers are created as daemon threads and processes. This is done to allow the | 
					
						
							|  |  |  | # interpreter to exit when there are still idle processes in a | 
					
						
							|  |  |  | # ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, | 
					
						
							|  |  |  | # allowing workers to die with the interpreter has two undesirable properties: | 
					
						
							|  |  |  | #   - The workers would still be running during interpretor shutdown, | 
					
						
							|  |  |  | #     meaning that they would fail in unpredictable ways. | 
					
						
							|  |  |  | #   - The workers could be killed while evaluating a work item, which could | 
					
						
							|  |  |  | #     be bad if the callable being evaluated has external side-effects e.g. | 
					
						
							|  |  |  | #     writing to a file. | 
					
						
							|  |  |  | # | 
					
						
							|  |  |  | # To work around this problem, an exit handler is installed which tells the | 
					
						
							|  |  |  | # workers to exit when their work queues are empty and then waits until the | 
					
						
							|  |  |  | # threads/processes finish. | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2011-03-26 19:29:44 +01:00
										 |  |  | _threads_queues = weakref.WeakKeyDictionary() | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | _shutdown = False | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def _python_exit(): | 
					
						
							|  |  |  |     global _shutdown | 
					
						
							|  |  |  |     _shutdown = True | 
					
						
							| 
									
										
										
										
											2011-03-26 19:29:44 +01:00
										 |  |  |     items = list(_threads_queues.items()) | 
					
						
							|  |  |  |     for t, q in items: | 
					
						
							|  |  |  |         q.put(None) | 
					
						
							|  |  |  |     for t, q in items: | 
					
						
							|  |  |  |         t.join() | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | # Controls how many more calls than processes will be queued in the call queue. | 
					
						
							|  |  |  | # A smaller number will mean that processes spend more time idle waiting for | 
					
						
							|  |  |  | # work while a larger number will make Future.cancel() succeed less frequently | 
					
						
							|  |  |  | # (Futures in the call queue cannot be cancelled). | 
					
						
							|  |  |  | EXTRA_QUEUED_CALLS = 1 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class _WorkItem(object): | 
					
						
							|  |  |  |     def __init__(self, future, fn, args, kwargs): | 
					
						
							|  |  |  |         self.future = future | 
					
						
							|  |  |  |         self.fn = fn | 
					
						
							|  |  |  |         self.args = args | 
					
						
							|  |  |  |         self.kwargs = kwargs | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class _ResultItem(object): | 
					
						
							|  |  |  |     def __init__(self, work_id, exception=None, result=None): | 
					
						
							|  |  |  |         self.work_id = work_id | 
					
						
							|  |  |  |         self.exception = exception | 
					
						
							|  |  |  |         self.result = result | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class _CallItem(object): | 
					
						
							|  |  |  |     def __init__(self, work_id, fn, args, kwargs): | 
					
						
							|  |  |  |         self.work_id = work_id | 
					
						
							|  |  |  |         self.fn = fn | 
					
						
							|  |  |  |         self.args = args | 
					
						
							|  |  |  |         self.kwargs = kwargs | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2011-04-12 17:48:46 +02:00
										 |  |  | def _process_worker(call_queue, result_queue): | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |     """Evaluates calls from call_queue and places the results in result_queue.
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-12-09 18:08:43 +00:00
										 |  |  |     This worker is run in a separate process. | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     Args: | 
					
						
							|  |  |  |         call_queue: A multiprocessing.Queue of _CallItems that will be read and | 
					
						
							|  |  |  |             evaluated by the worker. | 
					
						
							|  |  |  |         result_queue: A multiprocessing.Queue of _ResultItems that will written | 
					
						
							|  |  |  |             to by the worker. | 
					
						
							|  |  |  |         shutdown: A multiprocessing.Event that will be set as a signal to the | 
					
						
							|  |  |  |             worker that it should exit when call_queue is empty. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     while True: | 
					
						
							| 
									
										
										
										
											2011-04-12 17:48:46 +02:00
										 |  |  |         call_item = call_queue.get(block=True) | 
					
						
							|  |  |  |         if call_item is None: | 
					
						
							|  |  |  |             # Wake up queue management thread | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  |             result_queue.put(os.getpid()) | 
					
						
							| 
									
										
										
										
											2011-04-12 17:48:46 +02:00
										 |  |  |             return | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |         try: | 
					
						
							| 
									
										
										
										
											2011-04-12 17:48:46 +02:00
										 |  |  |             r = call_item.fn(*call_item.args, **call_item.kwargs) | 
					
						
							|  |  |  |         except BaseException as e: | 
					
						
							|  |  |  |             result_queue.put(_ResultItem(call_item.work_id, | 
					
						
							|  |  |  |                                          exception=e)) | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |         else: | 
					
						
							| 
									
										
										
										
											2011-04-12 17:48:46 +02:00
										 |  |  |             result_queue.put(_ResultItem(call_item.work_id, | 
					
						
							|  |  |  |                                          result=r)) | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  | def _add_call_item_to_queue(pending_work_items, | 
					
						
							|  |  |  |                             work_ids, | 
					
						
							|  |  |  |                             call_queue): | 
					
						
							|  |  |  |     """Fills call_queue with _WorkItems from pending_work_items.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     This function never blocks. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Args: | 
					
						
							|  |  |  |         pending_work_items: A dict mapping work ids to _WorkItems e.g. | 
					
						
							|  |  |  |             {5: <_WorkItem...>, 6: <_WorkItem...>, ...} | 
					
						
							|  |  |  |         work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids | 
					
						
							|  |  |  |             are consumed and the corresponding _WorkItems from | 
					
						
							|  |  |  |             pending_work_items are transformed into _CallItems and put in | 
					
						
							|  |  |  |             call_queue. | 
					
						
							|  |  |  |         call_queue: A multiprocessing.Queue that will be filled with _CallItems | 
					
						
							|  |  |  |             derived from _WorkItems. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     while True: | 
					
						
							|  |  |  |         if call_queue.full(): | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         try: | 
					
						
							|  |  |  |             work_id = work_ids.get(block=False) | 
					
						
							|  |  |  |         except queue.Empty: | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             work_item = pending_work_items[work_id] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             if work_item.future.set_running_or_notify_cancel(): | 
					
						
							|  |  |  |                 call_queue.put(_CallItem(work_id, | 
					
						
							|  |  |  |                                          work_item.fn, | 
					
						
							|  |  |  |                                          work_item.args, | 
					
						
							|  |  |  |                                          work_item.kwargs), | 
					
						
							|  |  |  |                                block=True) | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 del pending_work_items[work_id] | 
					
						
							|  |  |  |                 continue | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2011-05-03 16:34:42 +02:00
										 |  |  | def _queue_management_worker(executor_reference, | 
					
						
							|  |  |  |                              processes, | 
					
						
							|  |  |  |                              pending_work_items, | 
					
						
							|  |  |  |                              work_ids_queue, | 
					
						
							|  |  |  |                              call_queue, | 
					
						
							|  |  |  |                              result_queue): | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |     """Manages the communication between this process and the worker processes.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     This function is run in a local thread. | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     Args: | 
					
						
							|  |  |  |         executor_reference: A weakref.ref to the ProcessPoolExecutor that owns | 
					
						
							|  |  |  |             this thread. Used to determine if the ProcessPoolExecutor has been | 
					
						
							|  |  |  |             garbage collected and that this function can exit. | 
					
						
							|  |  |  |         process: A list of the multiprocessing.Process instances used as | 
					
						
							|  |  |  |             workers. | 
					
						
							|  |  |  |         pending_work_items: A dict mapping work ids to _WorkItems e.g. | 
					
						
							|  |  |  |             {5: <_WorkItem...>, 6: <_WorkItem...>, ...} | 
					
						
							|  |  |  |         work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). | 
					
						
							|  |  |  |         call_queue: A multiprocessing.Queue that will be filled with _CallItems | 
					
						
							|  |  |  |             derived from _WorkItems for processing by the process workers. | 
					
						
							|  |  |  |         result_queue: A multiprocessing.Queue of _ResultItems generated by the | 
					
						
							|  |  |  |             process workers. | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2011-07-02 21:20:25 +02:00
										 |  |  |     executor = None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def shutting_down(): | 
					
						
							|  |  |  |         return _shutdown or executor is None or executor._shutdown_thread | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def shutdown_worker(): | 
					
						
							|  |  |  |         # This is an upper bound | 
					
						
							|  |  |  |         nb_children_alive = sum(p.is_alive() for p in processes.values()) | 
					
						
							|  |  |  |         for i in range(0, nb_children_alive): | 
					
						
							| 
									
										
										
										
											2011-07-03 13:17:06 +02:00
										 |  |  |             call_queue.put_nowait(None) | 
					
						
							| 
									
										
										
										
											2011-07-16 01:51:58 +02:00
										 |  |  |         # Release the queue's resources as soon as possible. | 
					
						
							|  |  |  |         call_queue.close() | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  |         # If .join() is not called on the created processes then | 
					
						
							| 
									
										
										
										
											2011-07-02 21:20:25 +02:00
										 |  |  |         # some multiprocessing.Queue methods may deadlock on Mac OS X. | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  |         for p in processes.values(): | 
					
						
							|  |  |  |             p.join() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2012-03-05 19:28:37 +01:00
										 |  |  |     reader = result_queue._reader | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |     while True: | 
					
						
							|  |  |  |         _add_call_item_to_queue(pending_work_items, | 
					
						
							|  |  |  |                                 work_ids_queue, | 
					
						
							|  |  |  |                                 call_queue) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  |         sentinels = [p.sentinel for p in processes.values()] | 
					
						
							|  |  |  |         assert sentinels | 
					
						
							| 
									
										
										
										
											2012-03-05 19:28:37 +01:00
										 |  |  |         ready = wait([reader] + sentinels) | 
					
						
							|  |  |  |         if reader in ready: | 
					
						
							|  |  |  |             result_item = reader.recv() | 
					
						
							|  |  |  |         else: | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  |             # Mark the process pool broken so that submits fail right now. | 
					
						
							|  |  |  |             executor = executor_reference() | 
					
						
							|  |  |  |             if executor is not None: | 
					
						
							|  |  |  |                 executor._broken = True | 
					
						
							|  |  |  |                 executor._shutdown_thread = True | 
					
						
							| 
									
										
										
										
											2011-07-02 21:20:25 +02:00
										 |  |  |                 executor = None | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  |             # All futures in flight must be marked failed | 
					
						
							|  |  |  |             for work_id, work_item in pending_work_items.items(): | 
					
						
							|  |  |  |                 work_item.future.set_exception( | 
					
						
							|  |  |  |                     BrokenProcessPool( | 
					
						
							|  |  |  |                         "A process in the process pool was " | 
					
						
							|  |  |  |                         "terminated abruptly while the future was " | 
					
						
							|  |  |  |                         "running or pending." | 
					
						
							|  |  |  |                     )) | 
					
						
							|  |  |  |             pending_work_items.clear() | 
					
						
							|  |  |  |             # Terminate remaining workers forcibly: the queues or their | 
					
						
							|  |  |  |             # locks may be in a dirty state and block forever. | 
					
						
							|  |  |  |             for p in processes.values(): | 
					
						
							|  |  |  |                 p.terminate() | 
					
						
							| 
									
										
										
										
											2011-07-16 01:51:58 +02:00
										 |  |  |             shutdown_worker() | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  |             return | 
					
						
							|  |  |  |         if isinstance(result_item, int): | 
					
						
							|  |  |  |             # Clean shutdown of a worker using its PID | 
					
						
							|  |  |  |             # (avoids marking the executor broken) | 
					
						
							| 
									
										
										
										
											2011-07-02 21:20:25 +02:00
										 |  |  |             assert shutting_down() | 
					
						
							| 
									
										
										
										
											2011-07-16 01:13:34 +02:00
										 |  |  |             p = processes.pop(result_item) | 
					
						
							|  |  |  |             p.join() | 
					
						
							| 
									
										
										
										
											2011-07-02 21:20:25 +02:00
										 |  |  |             if not processes: | 
					
						
							|  |  |  |                 shutdown_worker() | 
					
						
							|  |  |  |                 return | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  |         elif result_item is not None: | 
					
						
							|  |  |  |             work_item = pending_work_items.pop(result_item.work_id, None) | 
					
						
							|  |  |  |             # work_item can be None if another process terminated (see above) | 
					
						
							|  |  |  |             if work_item is not None: | 
					
						
							|  |  |  |                 if result_item.exception: | 
					
						
							|  |  |  |                     work_item.future.set_exception(result_item.exception) | 
					
						
							|  |  |  |                 else: | 
					
						
							|  |  |  |                     work_item.future.set_result(result_item.result) | 
					
						
							|  |  |  |         # Check whether we should start shutting down. | 
					
						
							| 
									
										
										
										
											2011-03-26 19:29:44 +01:00
										 |  |  |         executor = executor_reference() | 
					
						
							|  |  |  |         # No more work items can be added if: | 
					
						
							|  |  |  |         #   - The interpreter is shutting down OR | 
					
						
							|  |  |  |         #   - The executor that owns this worker has been collected OR | 
					
						
							|  |  |  |         #   - The executor that owns this worker has been shutdown. | 
					
						
							| 
									
										
										
										
											2011-07-02 21:20:25 +02:00
										 |  |  |         if shutting_down(): | 
					
						
							|  |  |  |             try: | 
					
						
							| 
									
										
										
										
											2011-07-03 13:17:06 +02:00
										 |  |  |                 # Since no new work items can be added, it is safe to shutdown | 
					
						
							|  |  |  |                 # this thread if there are no pending work items. | 
					
						
							|  |  |  |                 if not pending_work_items: | 
					
						
							|  |  |  |                     shutdown_worker() | 
					
						
							|  |  |  |                     return | 
					
						
							| 
									
										
										
										
											2011-07-02 21:20:25 +02:00
										 |  |  |             except Full: | 
					
						
							|  |  |  |                 # This is not a problem: we will eventually be woken up (in | 
					
						
							| 
									
										
										
										
											2011-07-03 13:17:06 +02:00
										 |  |  |                 # result_queue.get()) and be able to send a sentinel again. | 
					
						
							| 
									
										
										
										
											2011-07-02 21:20:25 +02:00
										 |  |  |                 pass | 
					
						
							|  |  |  |         executor = None | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2011-01-03 00:07:01 +00:00
										 |  |  | _system_limits_checked = False | 
					
						
							|  |  |  | _system_limited = None | 
					
						
							|  |  |  | def _check_system_limits(): | 
					
						
							|  |  |  |     global _system_limits_checked, _system_limited | 
					
						
							|  |  |  |     if _system_limits_checked: | 
					
						
							|  |  |  |         if _system_limited: | 
					
						
							|  |  |  |             raise NotImplementedError(_system_limited) | 
					
						
							|  |  |  |     _system_limits_checked = True | 
					
						
							|  |  |  |     try: | 
					
						
							|  |  |  |         nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") | 
					
						
							|  |  |  |     except (AttributeError, ValueError): | 
					
						
							|  |  |  |         # sysconf not available or setting not available | 
					
						
							|  |  |  |         return | 
					
						
							|  |  |  |     if nsems_max == -1: | 
					
						
							|  |  |  |         # indetermine limit, assume that limit is determined | 
					
						
							|  |  |  |         # by available memory only | 
					
						
							|  |  |  |         return | 
					
						
							|  |  |  |     if nsems_max >= 256: | 
					
						
							|  |  |  |         # minimum number of semaphores available | 
					
						
							|  |  |  |         # according to POSIX | 
					
						
							|  |  |  |         return | 
					
						
							|  |  |  |     _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max | 
					
						
							|  |  |  |     raise NotImplementedError(_system_limited) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | class BrokenProcessPool(RuntimeError): | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Raised when a process in a ProcessPoolExecutor terminated abruptly | 
					
						
							|  |  |  |     while a future was in the running state. | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | class ProcessPoolExecutor(_base.Executor): | 
					
						
							|  |  |  |     def __init__(self, max_workers=None): | 
					
						
							|  |  |  |         """Initializes a new ProcessPoolExecutor instance.
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         Args: | 
					
						
							|  |  |  |             max_workers: The maximum number of processes that can be used to | 
					
						
							|  |  |  |                 execute the given calls. If None or not given then as many | 
					
						
							|  |  |  |                 worker processes will be created as the machine has processors. | 
					
						
							|  |  |  |         """
 | 
					
						
							| 
									
										
										
										
											2011-01-03 00:07:01 +00:00
										 |  |  |         _check_system_limits() | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |         if max_workers is None: | 
					
						
							|  |  |  |             self._max_workers = multiprocessing.cpu_count() | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             self._max_workers = max_workers | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Make the call queue slightly larger than the number of processes to | 
					
						
							|  |  |  |         # prevent the worker processes from idling. But don't make it too big | 
					
						
							|  |  |  |         # because futures in the call queue cannot be cancelled. | 
					
						
							|  |  |  |         self._call_queue = multiprocessing.Queue(self._max_workers + | 
					
						
							|  |  |  |                                                  EXTRA_QUEUED_CALLS) | 
					
						
							| 
									
										
										
										
											2011-07-16 01:51:58 +02:00
										 |  |  |         # Killed worker processes can produce spurious "broken pipe" | 
					
						
							|  |  |  |         # tracebacks in the queue's own worker thread. But we detect killed | 
					
						
							|  |  |  |         # processes anyway, so silence the tracebacks. | 
					
						
							|  |  |  |         self._call_queue._ignore_epipe = True | 
					
						
							| 
									
										
										
										
											2011-04-12 17:58:11 +02:00
										 |  |  |         self._result_queue = SimpleQueue() | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |         self._work_ids = queue.Queue() | 
					
						
							|  |  |  |         self._queue_management_thread = None | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  |         # Map of pids to processes | 
					
						
							|  |  |  |         self._processes = {} | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |         # Shutdown is a two-step process. | 
					
						
							|  |  |  |         self._shutdown_thread = False | 
					
						
							|  |  |  |         self._shutdown_lock = threading.Lock() | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  |         self._broken = False | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |         self._queue_count = 0 | 
					
						
							|  |  |  |         self._pending_work_items = {} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _start_queue_management_thread(self): | 
					
						
							| 
									
										
										
										
											2011-03-26 19:29:44 +01:00
										 |  |  |         # When the executor gets lost, the weakref callback will wake up | 
					
						
							|  |  |  |         # the queue management thread. | 
					
						
							|  |  |  |         def weakref_cb(_, q=self._result_queue): | 
					
						
							|  |  |  |             q.put(None) | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |         if self._queue_management_thread is None: | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  |             # Start the processes so that their sentinels are known. | 
					
						
							|  |  |  |             self._adjust_process_count() | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |             self._queue_management_thread = threading.Thread( | 
					
						
							| 
									
										
										
										
											2011-05-03 16:34:42 +02:00
										 |  |  |                     target=_queue_management_worker, | 
					
						
							| 
									
										
										
										
											2011-03-26 19:29:44 +01:00
										 |  |  |                     args=(weakref.ref(self, weakref_cb), | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |                           self._processes, | 
					
						
							|  |  |  |                           self._pending_work_items, | 
					
						
							|  |  |  |                           self._work_ids, | 
					
						
							|  |  |  |                           self._call_queue, | 
					
						
							| 
									
										
										
										
											2011-04-12 17:48:46 +02:00
										 |  |  |                           self._result_queue)) | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |             self._queue_management_thread.daemon = True | 
					
						
							|  |  |  |             self._queue_management_thread.start() | 
					
						
							| 
									
										
										
										
											2011-03-26 19:29:44 +01:00
										 |  |  |             _threads_queues[self._queue_management_thread] = self._result_queue | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def _adjust_process_count(self): | 
					
						
							|  |  |  |         for _ in range(len(self._processes), self._max_workers): | 
					
						
							|  |  |  |             p = multiprocessing.Process( | 
					
						
							|  |  |  |                     target=_process_worker, | 
					
						
							|  |  |  |                     args=(self._call_queue, | 
					
						
							| 
									
										
										
										
											2011-04-12 17:48:46 +02:00
										 |  |  |                           self._result_queue)) | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |             p.start() | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  |             self._processes[p.pid] = p | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def submit(self, fn, *args, **kwargs): | 
					
						
							|  |  |  |         with self._shutdown_lock: | 
					
						
							| 
									
										
										
										
											2011-06-08 17:21:55 +02:00
										 |  |  |             if self._broken: | 
					
						
							|  |  |  |                 raise BrokenProcessPool('A child process terminated ' | 
					
						
							|  |  |  |                     'abruptly, the process pool is not usable anymore') | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |             if self._shutdown_thread: | 
					
						
							|  |  |  |                 raise RuntimeError('cannot schedule new futures after shutdown') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             f = _base.Future() | 
					
						
							|  |  |  |             w = _WorkItem(f, fn, args, kwargs) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             self._pending_work_items[self._queue_count] = w | 
					
						
							|  |  |  |             self._work_ids.put(self._queue_count) | 
					
						
							|  |  |  |             self._queue_count += 1 | 
					
						
							| 
									
										
										
										
											2011-03-26 19:29:44 +01:00
										 |  |  |             # Wake up queue management thread | 
					
						
							|  |  |  |             self._result_queue.put(None) | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |             self._start_queue_management_thread() | 
					
						
							|  |  |  |             return f | 
					
						
							|  |  |  |     submit.__doc__ = _base.Executor.submit.__doc__ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def shutdown(self, wait=True): | 
					
						
							|  |  |  |         with self._shutdown_lock: | 
					
						
							|  |  |  |             self._shutdown_thread = True | 
					
						
							| 
									
										
										
										
											2011-03-26 19:29:44 +01:00
										 |  |  |         if self._queue_management_thread: | 
					
						
							|  |  |  |             # Wake up queue management thread | 
					
						
							|  |  |  |             self._result_queue.put(None) | 
					
						
							|  |  |  |             if wait: | 
					
						
							| 
									
										
										
										
											2010-09-18 22:35:02 +00:00
										 |  |  |                 self._queue_management_thread.join() | 
					
						
							|  |  |  |         # To reduce the risk of openning too many files, remove references to | 
					
						
							|  |  |  |         # objects that use file descriptors. | 
					
						
							|  |  |  |         self._queue_management_thread = None | 
					
						
							|  |  |  |         self._call_queue = None | 
					
						
							|  |  |  |         self._result_queue = None | 
					
						
							|  |  |  |         self._processes = None | 
					
						
							|  |  |  |     shutdown.__doc__ = _base.Executor.shutdown.__doc__ | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | atexit.register(_python_exit) |