aboutsummaryrefslogtreecommitdiff
path: root/vim/bundle/YouCompleteMe/third_party/pythonfutures/test_futures.py
diff options
context:
space:
mode:
Diffstat (limited to 'vim/bundle/YouCompleteMe/third_party/pythonfutures/test_futures.py')
m---------vim/bundle/YouCompleteMe0
-rwxr-xr-xvim/bundle/YouCompleteMe/third_party/pythonfutures/test_futures.py723
2 files changed, 0 insertions, 723 deletions
diff --git a/vim/bundle/YouCompleteMe b/vim/bundle/YouCompleteMe
new file mode 160000
+Subproject 0de1c0c9bb13ce82172b472c676035cd47cf6a6
diff --git a/vim/bundle/YouCompleteMe/third_party/pythonfutures/test_futures.py b/vim/bundle/YouCompleteMe/third_party/pythonfutures/test_futures.py
deleted file mode 100755
index dd7fd3e..0000000
--- a/vim/bundle/YouCompleteMe/third_party/pythonfutures/test_futures.py
+++ /dev/null
@@ -1,723 +0,0 @@
-from __future__ import with_statement
-import os
-import subprocess
-import sys
-import threading
-import functools
-import contextlib
-import logging
-import re
-import time
-
-from concurrent import futures
-from concurrent.futures._base import (
- PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
-
-try:
- import unittest2 as unittest
-except ImportError:
- import unittest
-
-try:
- from StringIO import StringIO
-except ImportError:
- from io import StringIO
-
-try:
- from test import test_support
-except ImportError:
- from test import support as test_support
-
-try:
- next
-except NameError:
- next = lambda x: x.next()
-
-
-def reap_threads(func):
- """Use this function when threads are being used. This will
- ensure that the threads are cleaned up even when the test fails.
- If threading is unavailable this function does nothing.
- """
- @functools.wraps(func)
- def decorator(*args):
- key = test_support.threading_setup()
- try:
- return func(*args)
- finally:
- test_support.threading_cleanup(*key)
- return decorator
-
-
-# Executing the interpreter in a subprocess
-def _assert_python(expected_success, *args, **env_vars):
- cmd_line = [sys.executable]
- if not env_vars:
- cmd_line.append('-E')
- # Need to preserve the original environment, for in-place testing of
- # shared library builds.
- env = os.environ.copy()
- # But a special flag that can be set to override -- in this case, the
- # caller is responsible to pass the full environment.
- if env_vars.pop('__cleanenv', None):
- env = {}
- env.update(env_vars)
- cmd_line.extend(args)
- p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- env=env)
- try:
- out, err = p.communicate()
- finally:
- subprocess._cleanup()
- p.stdout.close()
- p.stderr.close()
- rc = p.returncode
- err = strip_python_stderr(err)
- if (rc and expected_success) or (not rc and not expected_success):
- raise AssertionError(
- "Process return code is %d, "
- "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
- return rc, out, err
-
-
-def assert_python_ok(*args, **env_vars):
- """
- Assert that running the interpreter with `args` and optional environment
- variables `env_vars` is ok and return a (return code, stdout, stderr) tuple.
- """
- return _assert_python(True, *args, **env_vars)
-
-
-def strip_python_stderr(stderr):
- """Strip the stderr of a Python process from potential debug output
- emitted by the interpreter.
-
- This will typically be run on the result of the communicate() method
- of a subprocess.Popen object.
- """
- stderr = re.sub(r"\[\d+ refs\]\r?\n?$".encode(), "".encode(), stderr).strip()
- return stderr
-
-
-@contextlib.contextmanager
-def captured_stderr():
- """Return a context manager used by captured_stdout/stdin/stderr
- that temporarily replaces the sys stream *stream_name* with a StringIO."""
- logging_stream = StringIO()
- handler = logging.StreamHandler(logging_stream)
- logging.root.addHandler(handler)
-
- try:
- yield logging_stream
- finally:
- logging.root.removeHandler(handler)
-
-
-def create_future(state=PENDING, exception=None, result=None):
- f = Future()
- f._state = state
- f._exception = exception
- f._result = result
- return f
-
-
-PENDING_FUTURE = create_future(state=PENDING)
-RUNNING_FUTURE = create_future(state=RUNNING)
-CANCELLED_FUTURE = create_future(state=CANCELLED)
-CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
-EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
-SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
-
-
-def mul(x, y):
- return x * y
-
-
-def sleep_and_raise(t):
- time.sleep(t)
- raise Exception('this is an exception')
-
-def sleep_and_print(t, msg):
- time.sleep(t)
- print(msg)
- sys.stdout.flush()
-
-
-class ExecutorMixin:
- worker_count = 5
-
- def setUp(self):
- self.t1 = time.time()
- try:
- self.executor = self.executor_type(max_workers=self.worker_count)
- except NotImplementedError:
- e = sys.exc_info()[1]
- self.skipTest(str(e))
- self._prime_executor()
-
- def tearDown(self):
- self.executor.shutdown(wait=True)
- dt = time.time() - self.t1
- if test_support.verbose:
- print("%.2fs" % dt)
- self.assertLess(dt, 60, "synchronization issue: test lasted too long")
-
- def _prime_executor(self):
- # Make sure that the executor is ready to do work before running the
- # tests. This should reduce the probability of timeouts in the tests.
- futures = [self.executor.submit(time.sleep, 0.1)
- for _ in range(self.worker_count)]
-
- for f in futures:
- f.result()
-
-
-class ThreadPoolMixin(ExecutorMixin):
- executor_type = futures.ThreadPoolExecutor
-
-
-class ProcessPoolMixin(ExecutorMixin):
- executor_type = futures.ProcessPoolExecutor
-
-
-class ExecutorShutdownTest(unittest.TestCase):
- def test_run_after_shutdown(self):
- self.executor.shutdown()
- self.assertRaises(RuntimeError,
- self.executor.submit,
- pow, 2, 5)
-
- def test_interpreter_shutdown(self):
- # Test the atexit hook for shutdown of worker threads and processes
- rc, out, err = assert_python_ok('-c', """if 1:
- from concurrent.futures import %s
- from time import sleep
- from test_futures import sleep_and_print
- t = %s(5)
- t.submit(sleep_and_print, 1.0, "apple")
- """ % (self.executor_type.__name__, self.executor_type.__name__))
- # Errors in atexit hooks don't change the process exit code, check
- # stderr manually.
- self.assertFalse(err)
- self.assertEqual(out.strip(), "apple".encode())
-
- def test_hang_issue12364(self):
- fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
- self.executor.shutdown()
- for f in fs:
- f.result()
-
-
-class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
- def _prime_executor(self):
- pass
-
- def test_threads_terminate(self):
- self.executor.submit(mul, 21, 2)
- self.executor.submit(mul, 6, 7)
- self.executor.submit(mul, 3, 14)
- self.assertEqual(len(self.executor._threads), 3)
- self.executor.shutdown()
- for t in self.executor._threads:
- t.join()
-
- def test_context_manager_shutdown(self):
- with futures.ThreadPoolExecutor(max_workers=5) as e:
- executor = e
- self.assertEqual(list(e.map(abs, range(-5, 5))),
- [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
-
- for t in executor._threads:
- t.join()
-
- def test_del_shutdown(self):
- executor = futures.ThreadPoolExecutor(max_workers=5)
- executor.map(abs, range(-5, 5))
- threads = executor._threads
- del executor
-
- for t in threads:
- t.join()
-
-
-class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
- def _prime_executor(self):
- pass
-
- def test_processes_terminate(self):
- self.executor.submit(mul, 21, 2)
- self.executor.submit(mul, 6, 7)
- self.executor.submit(mul, 3, 14)
- self.assertEqual(len(self.executor._processes), 5)
- processes = self.executor._processes
- self.executor.shutdown()
-
- for p in processes:
- p.join()
-
- def test_context_manager_shutdown(self):
- with futures.ProcessPoolExecutor(max_workers=5) as e:
- processes = e._processes
- self.assertEqual(list(e.map(abs, range(-5, 5))),
- [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
-
- for p in processes:
- p.join()
-
- def test_del_shutdown(self):
- executor = futures.ProcessPoolExecutor(max_workers=5)
- list(executor.map(abs, range(-5, 5)))
- queue_management_thread = executor._queue_management_thread
- processes = executor._processes
- del executor
-
- queue_management_thread.join()
- for p in processes:
- p.join()
-
-
-class WaitTests(unittest.TestCase):
-
- def test_first_completed(self):
- future1 = self.executor.submit(mul, 21, 2)
- future2 = self.executor.submit(time.sleep, 1.5)
-
- done, not_done = futures.wait(
- [CANCELLED_FUTURE, future1, future2],
- return_when=futures.FIRST_COMPLETED)
-
- self.assertEqual(set([future1]), done)
- self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
-
- def test_first_completed_some_already_completed(self):
- future1 = self.executor.submit(time.sleep, 1.5)
-
- finished, pending = futures.wait(
- [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
- return_when=futures.FIRST_COMPLETED)
-
- self.assertEqual(
- set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
- finished)
- self.assertEqual(set([future1]), pending)
-
- def test_first_exception(self):
- future1 = self.executor.submit(mul, 2, 21)
- future2 = self.executor.submit(sleep_and_raise, 1.5)
- future3 = self.executor.submit(time.sleep, 3)
-
- finished, pending = futures.wait(
- [future1, future2, future3],
- return_when=futures.FIRST_EXCEPTION)
-
- self.assertEqual(set([future1, future2]), finished)
- self.assertEqual(set([future3]), pending)
-
- def test_first_exception_some_already_complete(self):
- future1 = self.executor.submit(divmod, 21, 0)
- future2 = self.executor.submit(time.sleep, 1.5)
-
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE,
- CANCELLED_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1, future2],
- return_when=futures.FIRST_EXCEPTION)
-
- self.assertEqual(set([SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- future1]), finished)
- self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
-
- def test_first_exception_one_already_failed(self):
- future1 = self.executor.submit(time.sleep, 2)
-
- finished, pending = futures.wait(
- [EXCEPTION_FUTURE, future1],
- return_when=futures.FIRST_EXCEPTION)
-
- self.assertEqual(set([EXCEPTION_FUTURE]), finished)
- self.assertEqual(set([future1]), pending)
-
- def test_all_completed(self):
- future1 = self.executor.submit(divmod, 2, 0)
- future2 = self.executor.submit(mul, 2, 21)
-
- finished, pending = futures.wait(
- [SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- future1,
- future2],
- return_when=futures.ALL_COMPLETED)
-
- self.assertEqual(set([SUCCESSFUL_FUTURE,
- CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- future1,
- future2]), finished)
- self.assertEqual(set(), pending)
-
- def test_timeout(self):
- future1 = self.executor.submit(mul, 6, 7)
- future2 = self.executor.submit(time.sleep, 3)
-
- finished, pending = futures.wait(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1, future2],
- timeout=1.5,
- return_when=futures.ALL_COMPLETED)
-
- self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1]), finished)
- self.assertEqual(set([future2]), pending)
-
-
-class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
-
- def test_pending_calls_race(self):
- # Issue #14406: multi-threaded race condition when waiting on all
- # futures.
- event = threading.Event()
- def future_func():
- event.wait()
- oldswitchinterval = sys.getcheckinterval()
- sys.setcheckinterval(1)
- try:
- fs = set(self.executor.submit(future_func) for i in range(100))
- event.set()
- futures.wait(fs, return_when=futures.ALL_COMPLETED)
- finally:
- sys.setcheckinterval(oldswitchinterval)
-
-
-class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
- pass
-
-
-class AsCompletedTests(unittest.TestCase):
- # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
- def test_no_timeout(self):
- future1 = self.executor.submit(mul, 2, 21)
- future2 = self.executor.submit(mul, 7, 6)
-
- completed = set(futures.as_completed(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1, future2]))
- self.assertEqual(set(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1, future2]),
- completed)
-
- def test_zero_timeout(self):
- future1 = self.executor.submit(time.sleep, 2)
- completed_futures = set()
- try:
- for future in futures.as_completed(
- [CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE,
- future1],
- timeout=0):
- completed_futures.add(future)
- except futures.TimeoutError:
- pass
-
- self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
- EXCEPTION_FUTURE,
- SUCCESSFUL_FUTURE]),
- completed_futures)
-
-
-class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
- pass
-
-
-class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
- pass
-
-
-class ExecutorTest(unittest.TestCase):
- # Executor.shutdown() and context manager usage is tested by
- # ExecutorShutdownTest.
- def test_submit(self):
- future = self.executor.submit(pow, 2, 8)
- self.assertEqual(256, future.result())
-
- def test_submit_keyword(self):
- future = self.executor.submit(mul, 2, y=8)
- self.assertEqual(16, future.result())
-
- def test_map(self):
- self.assertEqual(
- list(self.executor.map(pow, range(10), range(10))),
- list(map(pow, range(10), range(10))))
-
- def test_map_exception(self):
- i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
- self.assertEqual(next(i), (0, 1))
- self.assertEqual(next(i), (0, 1))
- self.assertRaises(ZeroDivisionError, next, i)
-
- def test_map_timeout(self):
- results = []
- try:
- for i in self.executor.map(time.sleep,
- [0, 0, 3],
- timeout=1.5):
- results.append(i)
- except futures.TimeoutError:
- pass
- else:
- self.fail('expected TimeoutError')
-
- self.assertEqual([None, None], results)
-
-
-class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
- pass
-
-
-class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
- pass
-
-
-class FutureTests(unittest.TestCase):
- def test_done_callback_with_result(self):
- callback_result = [None]
- def fn(callback_future):
- callback_result[0] = callback_future.result()
-
- f = Future()
- f.add_done_callback(fn)
- f.set_result(5)
- self.assertEqual(5, callback_result[0])
-
- def test_done_callback_with_exception(self):
- callback_exception = [None]
- def fn(callback_future):
- callback_exception[0] = callback_future.exception()
-
- f = Future()
- f.add_done_callback(fn)
- f.set_exception(Exception('test'))
- self.assertEqual(('test',), callback_exception[0].args)
-
- def test_done_callback_with_cancel(self):
- was_cancelled = [None]
- def fn(callback_future):
- was_cancelled[0] = callback_future.cancelled()
-
- f = Future()
- f.add_done_callback(fn)
- self.assertTrue(f.cancel())
- self.assertTrue(was_cancelled[0])
-
- def test_done_callback_raises(self):
- with captured_stderr() as stderr:
- raising_was_called = [False]
- fn_was_called = [False]
-
- def raising_fn(callback_future):
- raising_was_called[0] = True
- raise Exception('doh!')
-
- def fn(callback_future):
- fn_was_called[0] = True
-
- f = Future()
- f.add_done_callback(raising_fn)
- f.add_done_callback(fn)
- f.set_result(5)
- self.assertTrue(raising_was_called)
- self.assertTrue(fn_was_called)
- self.assertIn('Exception: doh!', stderr.getvalue())
-
- def test_done_callback_already_successful(self):
- callback_result = [None]
- def fn(callback_future):
- callback_result[0] = callback_future.result()
-
- f = Future()
- f.set_result(5)
- f.add_done_callback(fn)
- self.assertEqual(5, callback_result[0])
-
- def test_done_callback_already_failed(self):
- callback_exception = [None]
- def fn(callback_future):
- callback_exception[0] = callback_future.exception()
-
- f = Future()
- f.set_exception(Exception('test'))
- f.add_done_callback(fn)
- self.assertEqual(('test',), callback_exception[0].args)
-
- def test_done_callback_already_cancelled(self):
- was_cancelled = [None]
- def fn(callback_future):
- was_cancelled[0] = callback_future.cancelled()
-
- f = Future()
- self.assertTrue(f.cancel())
- f.add_done_callback(fn)
- self.assertTrue(was_cancelled[0])
-
- def test_repr(self):
- self.assertRegexpMatches(repr(PENDING_FUTURE),
- '<Future at 0x[0-9a-f]+ state=pending>')
- self.assertRegexpMatches(repr(RUNNING_FUTURE),
- '<Future at 0x[0-9a-f]+ state=running>')
- self.assertRegexpMatches(repr(CANCELLED_FUTURE),
- '<Future at 0x[0-9a-f]+ state=cancelled>')
- self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
- '<Future at 0x[0-9a-f]+ state=cancelled>')
- self.assertRegexpMatches(
- repr(EXCEPTION_FUTURE),
- '<Future at 0x[0-9a-f]+ state=finished raised IOError>')
- self.assertRegexpMatches(
- repr(SUCCESSFUL_FUTURE),
- '<Future at 0x[0-9a-f]+ state=finished returned int>')
-
- def test_cancel(self):
- f1 = create_future(state=PENDING)
- f2 = create_future(state=RUNNING)
- f3 = create_future(state=CANCELLED)
- f4 = create_future(state=CANCELLED_AND_NOTIFIED)
- f5 = create_future(state=FINISHED, exception=IOError())
- f6 = create_future(state=FINISHED, result=5)
-
- self.assertTrue(f1.cancel())
- self.assertEqual(f1._state, CANCELLED)
-
- self.assertFalse(f2.cancel())
- self.assertEqual(f2._state, RUNNING)
-
- self.assertTrue(f3.cancel())
- self.assertEqual(f3._state, CANCELLED)
-
- self.assertTrue(f4.cancel())
- self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
-
- self.assertFalse(f5.cancel())
- self.assertEqual(f5._state, FINISHED)
-
- self.assertFalse(f6.cancel())
- self.assertEqual(f6._state, FINISHED)
-
- def test_cancelled(self):
- self.assertFalse(PENDING_FUTURE.cancelled())
- self.assertFalse(RUNNING_FUTURE.cancelled())
- self.assertTrue(CANCELLED_FUTURE.cancelled())
- self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
- self.assertFalse(EXCEPTION_FUTURE.cancelled())
- self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
-
- def test_done(self):
- self.assertFalse(PENDING_FUTURE.done())
- self.assertFalse(RUNNING_FUTURE.done())
- self.assertTrue(CANCELLED_FUTURE.done())
- self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
- self.assertTrue(EXCEPTION_FUTURE.done())
- self.assertTrue(SUCCESSFUL_FUTURE.done())
-
- def test_running(self):
- self.assertFalse(PENDING_FUTURE.running())
- self.assertTrue(RUNNING_FUTURE.running())
- self.assertFalse(CANCELLED_FUTURE.running())
- self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
- self.assertFalse(EXCEPTION_FUTURE.running())
- self.assertFalse(SUCCESSFUL_FUTURE.running())
-
- def test_result_with_timeout(self):
- self.assertRaises(futures.TimeoutError,
- PENDING_FUTURE.result, timeout=0)
- self.assertRaises(futures.TimeoutError,
- RUNNING_FUTURE.result, timeout=0)
- self.assertRaises(futures.CancelledError,
- CANCELLED_FUTURE.result, timeout=0)
- self.assertRaises(futures.CancelledError,
- CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
- self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
- self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
-
- def test_result_with_success(self):
- # TODO(brian@sweetapp.com): This test is timing dependant.
- def notification():
- # Wait until the main thread is waiting for the result.
- time.sleep(1)
- f1.set_result(42)
-
- f1 = create_future(state=PENDING)
- t = threading.Thread(target=notification)
- t.start()
-
- self.assertEqual(f1.result(timeout=5), 42)
-
- def test_result_with_cancel(self):
- # TODO(brian@sweetapp.com): This test is timing dependant.
- def notification():
- # Wait until the main thread is waiting for the result.
- time.sleep(1)
- f1.cancel()
-
- f1 = create_future(state=PENDING)
- t = threading.Thread(target=notification)
- t.start()
-
- self.assertRaises(futures.CancelledError, f1.result, timeout=5)
-
- def test_exception_with_timeout(self):
- self.assertRaises(futures.TimeoutError,
- PENDING_FUTURE.exception, timeout=0)
- self.assertRaises(futures.TimeoutError,
- RUNNING_FUTURE.exception, timeout=0)
- self.assertRaises(futures.CancelledError,
- CANCELLED_FUTURE.exception, timeout=0)
- self.assertRaises(futures.CancelledError,
- CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
- self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
- IOError))
- self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
-
- def test_exception_with_success(self):
- def notification():
- # Wait until the main thread is waiting for the exception.
- time.sleep(1)
- with f1._condition:
- f1._state = FINISHED
- f1._exception = IOError()
- f1._condition.notify_all()
-
- f1 = create_future(state=PENDING)
- t = threading.Thread(target=notification)
- t.start()
-
- self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
-
-@reap_threads
-def test_main():
- try:
- test_support.run_unittest(ProcessPoolExecutorTest,
- ThreadPoolExecutorTest,
- ProcessPoolWaitTests,
- ThreadPoolWaitTests,
- ProcessPoolAsCompletedTests,
- ThreadPoolAsCompletedTests,
- FutureTests,
- ProcessPoolShutdownTest,
- ThreadPoolShutdownTest)
- finally:
- test_support.reap_children()
-
-if __name__ == "__main__":
- test_main()