diff options
Diffstat (limited to 'vim/bundle/YouCompleteMe/third_party/pythonfutures/concurrent/futures/process.py')
m--------- | vim/bundle/YouCompleteMe | 0 | ||||
-rwxr-xr-x | vim/bundle/YouCompleteMe/third_party/pythonfutures/concurrent/futures/process.py | 363 |
2 files changed, 0 insertions, 363 deletions
diff --git a/vim/bundle/YouCompleteMe b/vim/bundle/YouCompleteMe new file mode 160000 +Subproject 0de1c0c9bb13ce82172b472c676035cd47cf6a6 diff --git a/vim/bundle/YouCompleteMe/third_party/pythonfutures/concurrent/futures/process.py b/vim/bundle/YouCompleteMe/third_party/pythonfutures/concurrent/futures/process.py deleted file mode 100755 index 98684f8..0000000 --- a/vim/bundle/YouCompleteMe/third_party/pythonfutures/concurrent/futures/process.py +++ /dev/null @@ -1,363 +0,0 @@ -# Copyright 2009 Brian Quinlan. All Rights Reserved. -# Licensed to PSF under a Contributor Agreement. - -"""Implements ProcessPoolExecutor. - -The follow diagram and text describe the data-flow through the system: - -|======================= In-process =====================|== Out-of-process ==| - -+----------+ +----------+ +--------+ +-----------+ +---------+ -| | => | Work Ids | => | | => | Call Q | => | | -| | +----------+ | | +-----------+ | | -| | | ... | | | | ... | | | -| | | 6 | | | | 5, call() | | | -| | | 7 | | | | ... | | | -| Process | | ... | | Local | +-----------+ | Process | -| Pool | +----------+ | Worker | | #1..n | -| Executor | | Thread | | | -| | +----------- + | | +-----------+ | | -| | <=> | Work Items | <=> | | <= | Result Q | <= | | -| | +------------+ | | +-----------+ | | -| | | 6: call() | | | | ... | | | -| | | future | | | | 4, result | | | -| | | ... | | | | 3, except | | | -+----------+ +------------+ +--------+ +-----------+ +---------+ - -Executor.submit() called: -- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict -- adds the id of the _WorkItem to the "Work Ids" queue - -Local worker thread: -- reads work ids from the "Work Ids" queue and looks up the corresponding - WorkItem from the "Work Items" dict: if the work item has been cancelled then - it is simply removed from the dict, otherwise it is repackaged as a - _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" - until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because - calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). -- reads _ResultItems from "Result Q", updates the future stored in the - "Work Items" dict and deletes the dict entry - -Process #1..n: -- reads _CallItems from "Call Q", executes the calls, and puts the resulting - _ResultItems in "Request Q" -""" - -from __future__ import with_statement -import atexit -import multiprocessing -import threading -import weakref -import sys - -from concurrent.futures import _base - -try: - import queue -except ImportError: - import Queue as queue - -__author__ = 'Brian Quinlan (brian@sweetapp.com)' - -# Workers are created as daemon threads and processes. This is done to allow the -# interpreter to exit when there are still idle processes in a -# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, -# allowing workers to die with the interpreter has two undesirable properties: -# - The workers would still be running during interpretor shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads/processes finish. - -_threads_queues = weakref.WeakKeyDictionary() -_shutdown = False - -def _python_exit(): - global _shutdown - _shutdown = True - items = list(_threads_queues.items()) - for t, q in items: - q.put(None) - for t, q in items: - t.join() - -# Controls how many more calls than processes will be queued in the call queue. -# A smaller number will mean that processes spend more time idle waiting for -# work while a larger number will make Future.cancel() succeed less frequently -# (Futures in the call queue cannot be cancelled). -EXTRA_QUEUED_CALLS = 1 - -class _WorkItem(object): - def __init__(self, future, fn, args, kwargs): - self.future = future - self.fn = fn - self.args = args - self.kwargs = kwargs - -class _ResultItem(object): - def __init__(self, work_id, exception=None, result=None): - self.work_id = work_id - self.exception = exception - self.result = result - -class _CallItem(object): - def __init__(self, work_id, fn, args, kwargs): - self.work_id = work_id - self.fn = fn - self.args = args - self.kwargs = kwargs - -def _process_worker(call_queue, result_queue): - """Evaluates calls from call_queue and places the results in result_queue. - - This worker is run in a separate process. - - Args: - call_queue: A multiprocessing.Queue of _CallItems that will be read and - evaluated by the worker. - result_queue: A multiprocessing.Queue of _ResultItems that will written - to by the worker. - shutdown: A multiprocessing.Event that will be set as a signal to the - worker that it should exit when call_queue is empty. - """ - while True: - call_item = call_queue.get(block=True) - if call_item is None: - # Wake up queue management thread - result_queue.put(None) - return - try: - r = call_item.fn(*call_item.args, **call_item.kwargs) - except BaseException: - e = sys.exc_info()[1] - result_queue.put(_ResultItem(call_item.work_id, - exception=e)) - else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) - -def _add_call_item_to_queue(pending_work_items, - work_ids, - call_queue): - """Fills call_queue with _WorkItems from pending_work_items. - - This function never blocks. - - Args: - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids - are consumed and the corresponding _WorkItems from - pending_work_items are transformed into _CallItems and put in - call_queue. - call_queue: A multiprocessing.Queue that will be filled with _CallItems - derived from _WorkItems. - """ - while True: - if call_queue.full(): - return - try: - work_id = work_ids.get(block=False) - except queue.Empty: - return - else: - work_item = pending_work_items[work_id] - - if work_item.future.set_running_or_notify_cancel(): - call_queue.put(_CallItem(work_id, - work_item.fn, - work_item.args, - work_item.kwargs), - block=True) - else: - del pending_work_items[work_id] - continue - -def _queue_management_worker(executor_reference, - processes, - pending_work_items, - work_ids_queue, - call_queue, - result_queue): - """Manages the communication between this process and the worker processes. - - This function is run in a local thread. - - Args: - executor_reference: A weakref.ref to the ProcessPoolExecutor that owns - this thread. Used to determine if the ProcessPoolExecutor has been - garbage collected and that this function can exit. - process: A list of the multiprocessing.Process instances used as - workers. - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). - call_queue: A multiprocessing.Queue that will be filled with _CallItems - derived from _WorkItems for processing by the process workers. - result_queue: A multiprocessing.Queue of _ResultItems generated by the - process workers. - """ - nb_shutdown_processes = [0] - def shutdown_one_process(): - """Tell a worker to terminate, which will in turn wake us again""" - call_queue.put(None) - nb_shutdown_processes[0] += 1 - while True: - _add_call_item_to_queue(pending_work_items, - work_ids_queue, - call_queue) - - result_item = result_queue.get(block=True) - if result_item is not None: - work_item = pending_work_items[result_item.work_id] - del pending_work_items[result_item.work_id] - - if result_item.exception: - work_item.future.set_exception(result_item.exception) - else: - work_item.future.set_result(result_item.result) - # Check whether we should start shutting down. - executor = executor_reference() - # No more work items can be added if: - # - The interpreter is shutting down OR - # - The executor that owns this worker has been collected OR - # - The executor that owns this worker has been shutdown. - if _shutdown or executor is None or executor._shutdown_thread: - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - while nb_shutdown_processes[0] < len(processes): - shutdown_one_process() - # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS - # X. - for p in processes: - p.join() - call_queue.close() - return - del executor - -_system_limits_checked = False -_system_limited = None -def _check_system_limits(): - global _system_limits_checked, _system_limited - if _system_limits_checked: - if _system_limited: - raise NotImplementedError(_system_limited) - _system_limits_checked = True - try: - import os - nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") - except (AttributeError, ValueError): - # sysconf not available or setting not available - return - if nsems_max == -1: - # indetermine limit, assume that limit is determined - # by available memory only - return - if nsems_max >= 256: - # minimum number of semaphores available - # according to POSIX - return - _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max - raise NotImplementedError(_system_limited) - -class ProcessPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): - """Initializes a new ProcessPoolExecutor instance. - - Args: - max_workers: The maximum number of processes that can be used to - execute the given calls. If None or not given then as many - worker processes will be created as the machine has processors. - """ - _check_system_limits() - - if max_workers is None: - self._max_workers = multiprocessing.cpu_count() - else: - self._max_workers = max_workers - - # Make the call queue slightly larger than the number of processes to - # prevent the worker processes from idling. But don't make it too big - # because futures in the call queue cannot be cancelled. - self._call_queue = multiprocessing.Queue(self._max_workers + - EXTRA_QUEUED_CALLS) - self._result_queue = multiprocessing.Queue() - self._work_ids = queue.Queue() - self._queue_management_thread = None - self._processes = set() - - # Shutdown is a two-step process. - self._shutdown_thread = False - self._shutdown_lock = threading.Lock() - self._queue_count = 0 - self._pending_work_items = {} - - def _start_queue_management_thread(self): - # When the executor gets lost, the weakref callback will wake up - # the queue management thread. - def weakref_cb(_, q=self._result_queue): - q.put(None) - if self._queue_management_thread is None: - self._queue_management_thread = threading.Thread( - target=_queue_management_worker, - args=(weakref.ref(self, weakref_cb), - self._processes, - self._pending_work_items, - self._work_ids, - self._call_queue, - self._result_queue)) - self._queue_management_thread.daemon = True - self._queue_management_thread.start() - _threads_queues[self._queue_management_thread] = self._result_queue - - def _adjust_process_count(self): - for _ in range(len(self._processes), self._max_workers): - p = multiprocessing.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue)) - p.start() - self._processes.add(p) - - def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._shutdown_thread: - raise RuntimeError('cannot schedule new futures after shutdown') - - f = _base.Future() - w = _WorkItem(f, fn, args, kwargs) - - self._pending_work_items[self._queue_count] = w - self._work_ids.put(self._queue_count) - self._queue_count += 1 - # Wake up queue management thread - self._result_queue.put(None) - - self._start_queue_management_thread() - self._adjust_process_count() - return f - submit.__doc__ = _base.Executor.submit.__doc__ - - def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown_thread = True - if self._queue_management_thread: - # Wake up queue management thread - self._result_queue.put(None) - if wait: - self._queue_management_thread.join() - # To reduce the risk of openning too many files, remove references to - # objects that use file descriptors. - self._queue_management_thread = None - self._call_queue = None - self._result_queue = None - self._processes = None - shutdown.__doc__ = _base.Executor.shutdown.__doc__ - -atexit.register(_python_exit) |