diff options
Diffstat (limited to 'vim/bundle/YouCompleteMe/third_party/pythonfutures/concurrent/futures/process.py')
-rwxr-xr-x | vim/bundle/YouCompleteMe/third_party/pythonfutures/concurrent/futures/process.py | 363 |
1 files changed, 363 insertions, 0 deletions
diff --git a/vim/bundle/YouCompleteMe/third_party/pythonfutures/concurrent/futures/process.py b/vim/bundle/YouCompleteMe/third_party/pythonfutures/concurrent/futures/process.py new file mode 100755 index 0000000..98684f8 --- /dev/null +++ b/vim/bundle/YouCompleteMe/third_party/pythonfutures/concurrent/futures/process.py @@ -0,0 +1,363 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Implements ProcessPoolExecutor. + +The follow diagram and text describe the data-flow through the system: + +|======================= In-process =====================|== Out-of-process ==| + ++----------+ +----------+ +--------+ +-----------+ +---------+ +| | => | Work Ids | => | | => | Call Q | => | | +| | +----------+ | | +-----------+ | | +| | | ... | | | | ... | | | +| | | 6 | | | | 5, call() | | | +| | | 7 | | | | ... | | | +| Process | | ... | | Local | +-----------+ | Process | +| Pool | +----------+ | Worker | | #1..n | +| Executor | | Thread | | | +| | +----------- + | | +-----------+ | | +| | <=> | Work Items | <=> | | <= | Result Q | <= | | +| | +------------+ | | +-----------+ | | +| | | 6: call() | | | | ... | | | +| | | future | | | | 4, result | | | +| | | ... | | | | 3, except | | | ++----------+ +------------+ +--------+ +-----------+ +---------+ + +Executor.submit() called: +- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict +- adds the id of the _WorkItem to the "Work Ids" queue + +Local worker thread: +- reads work ids from the "Work Ids" queue and looks up the corresponding + WorkItem from the "Work Items" dict: if the work item has been cancelled then + it is simply removed from the dict, otherwise it is repackaged as a + _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" + until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because + calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). +- reads _ResultItems from "Result Q", updates the future stored in the + "Work Items" dict and deletes the dict entry + +Process #1..n: +- reads _CallItems from "Call Q", executes the calls, and puts the resulting + _ResultItems in "Request Q" +""" + +from __future__ import with_statement +import atexit +import multiprocessing +import threading +import weakref +import sys + +from concurrent.futures import _base + +try: + import queue +except ImportError: + import Queue as queue + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +# Workers are created as daemon threads and processes. This is done to allow the +# interpreter to exit when there are still idle processes in a +# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, +# allowing workers to die with the interpreter has two undesirable properties: +# - The workers would still be running during interpretor shutdown, +# meaning that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could +# be bad if the callable being evaluated has external side-effects e.g. +# writing to a file. +# +# To work around this problem, an exit handler is installed which tells the +# workers to exit when their work queues are empty and then waits until the +# threads/processes finish. + +_threads_queues = weakref.WeakKeyDictionary() +_shutdown = False + +def _python_exit(): + global _shutdown + _shutdown = True + items = list(_threads_queues.items()) + for t, q in items: + q.put(None) + for t, q in items: + t.join() + +# Controls how many more calls than processes will be queued in the call queue. +# A smaller number will mean that processes spend more time idle waiting for +# work while a larger number will make Future.cancel() succeed less frequently +# (Futures in the call queue cannot be cancelled). +EXTRA_QUEUED_CALLS = 1 + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + +class _ResultItem(object): + def __init__(self, work_id, exception=None, result=None): + self.work_id = work_id + self.exception = exception + self.result = result + +class _CallItem(object): + def __init__(self, work_id, fn, args, kwargs): + self.work_id = work_id + self.fn = fn + self.args = args + self.kwargs = kwargs + +def _process_worker(call_queue, result_queue): + """Evaluates calls from call_queue and places the results in result_queue. + + This worker is run in a separate process. + + Args: + call_queue: A multiprocessing.Queue of _CallItems that will be read and + evaluated by the worker. + result_queue: A multiprocessing.Queue of _ResultItems that will written + to by the worker. + shutdown: A multiprocessing.Event that will be set as a signal to the + worker that it should exit when call_queue is empty. + """ + while True: + call_item = call_queue.get(block=True) + if call_item is None: + # Wake up queue management thread + result_queue.put(None) + return + try: + r = call_item.fn(*call_item.args, **call_item.kwargs) + except BaseException: + e = sys.exc_info()[1] + result_queue.put(_ResultItem(call_item.work_id, + exception=e)) + else: + result_queue.put(_ResultItem(call_item.work_id, + result=r)) + +def _add_call_item_to_queue(pending_work_items, + work_ids, + call_queue): + """Fills call_queue with _WorkItems from pending_work_items. + + This function never blocks. + + Args: + pending_work_items: A dict mapping work ids to _WorkItems e.g. + {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids + are consumed and the corresponding _WorkItems from + pending_work_items are transformed into _CallItems and put in + call_queue. + call_queue: A multiprocessing.Queue that will be filled with _CallItems + derived from _WorkItems. + """ + while True: + if call_queue.full(): + return + try: + work_id = work_ids.get(block=False) + except queue.Empty: + return + else: + work_item = pending_work_items[work_id] + + if work_item.future.set_running_or_notify_cancel(): + call_queue.put(_CallItem(work_id, + work_item.fn, + work_item.args, + work_item.kwargs), + block=True) + else: + del pending_work_items[work_id] + continue + +def _queue_management_worker(executor_reference, + processes, + pending_work_items, + work_ids_queue, + call_queue, + result_queue): + """Manages the communication between this process and the worker processes. + + This function is run in a local thread. + + Args: + executor_reference: A weakref.ref to the ProcessPoolExecutor that owns + this thread. Used to determine if the ProcessPoolExecutor has been + garbage collected and that this function can exit. + process: A list of the multiprocessing.Process instances used as + workers. + pending_work_items: A dict mapping work ids to _WorkItems e.g. + {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). + call_queue: A multiprocessing.Queue that will be filled with _CallItems + derived from _WorkItems for processing by the process workers. + result_queue: A multiprocessing.Queue of _ResultItems generated by the + process workers. + """ + nb_shutdown_processes = [0] + def shutdown_one_process(): + """Tell a worker to terminate, which will in turn wake us again""" + call_queue.put(None) + nb_shutdown_processes[0] += 1 + while True: + _add_call_item_to_queue(pending_work_items, + work_ids_queue, + call_queue) + + result_item = result_queue.get(block=True) + if result_item is not None: + work_item = pending_work_items[result_item.work_id] + del pending_work_items[result_item.work_id] + + if result_item.exception: + work_item.future.set_exception(result_item.exception) + else: + work_item.future.set_result(result_item.result) + # Check whether we should start shutting down. + executor = executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. + if _shutdown or executor is None or executor._shutdown_thread: + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + while nb_shutdown_processes[0] < len(processes): + shutdown_one_process() + # If .join() is not called on the created processes then + # some multiprocessing.Queue methods may deadlock on Mac OS + # X. + for p in processes: + p.join() + call_queue.close() + return + del executor + +_system_limits_checked = False +_system_limited = None +def _check_system_limits(): + global _system_limits_checked, _system_limited + if _system_limits_checked: + if _system_limited: + raise NotImplementedError(_system_limited) + _system_limits_checked = True + try: + import os + nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") + except (AttributeError, ValueError): + # sysconf not available or setting not available + return + if nsems_max == -1: + # indetermine limit, assume that limit is determined + # by available memory only + return + if nsems_max >= 256: + # minimum number of semaphores available + # according to POSIX + return + _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max + raise NotImplementedError(_system_limited) + +class ProcessPoolExecutor(_base.Executor): + def __init__(self, max_workers=None): + """Initializes a new ProcessPoolExecutor instance. + + Args: + max_workers: The maximum number of processes that can be used to + execute the given calls. If None or not given then as many + worker processes will be created as the machine has processors. + """ + _check_system_limits() + + if max_workers is None: + self._max_workers = multiprocessing.cpu_count() + else: + self._max_workers = max_workers + + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from idling. But don't make it too big + # because futures in the call queue cannot be cancelled. + self._call_queue = multiprocessing.Queue(self._max_workers + + EXTRA_QUEUED_CALLS) + self._result_queue = multiprocessing.Queue() + self._work_ids = queue.Queue() + self._queue_management_thread = None + self._processes = set() + + # Shutdown is a two-step process. + self._shutdown_thread = False + self._shutdown_lock = threading.Lock() + self._queue_count = 0 + self._pending_work_items = {} + + def _start_queue_management_thread(self): + # When the executor gets lost, the weakref callback will wake up + # the queue management thread. + def weakref_cb(_, q=self._result_queue): + q.put(None) + if self._queue_management_thread is None: + self._queue_management_thread = threading.Thread( + target=_queue_management_worker, + args=(weakref.ref(self, weakref_cb), + self._processes, + self._pending_work_items, + self._work_ids, + self._call_queue, + self._result_queue)) + self._queue_management_thread.daemon = True + self._queue_management_thread.start() + _threads_queues[self._queue_management_thread] = self._result_queue + + def _adjust_process_count(self): + for _ in range(len(self._processes), self._max_workers): + p = multiprocessing.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue)) + p.start() + self._processes.add(p) + + def submit(self, fn, *args, **kwargs): + with self._shutdown_lock: + if self._shutdown_thread: + raise RuntimeError('cannot schedule new futures after shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._pending_work_items[self._queue_count] = w + self._work_ids.put(self._queue_count) + self._queue_count += 1 + # Wake up queue management thread + self._result_queue.put(None) + + self._start_queue_management_thread() + self._adjust_process_count() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + + def shutdown(self, wait=True): + with self._shutdown_lock: + self._shutdown_thread = True + if self._queue_management_thread: + # Wake up queue management thread + self._result_queue.put(None) + if wait: + self._queue_management_thread.join() + # To reduce the risk of openning too many files, remove references to + # objects that use file descriptors. + self._queue_management_thread = None + self._call_queue = None + self._result_queue = None + self._processes = None + shutdown.__doc__ = _base.Executor.shutdown.__doc__ + +atexit.register(_python_exit) |