mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			102 lines
		
	
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			102 lines
		
	
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import contextlib
 | 
						|
import multiprocessing as mp
 | 
						|
import multiprocessing.process
 | 
						|
import multiprocessing.util
 | 
						|
import os
 | 
						|
import threading
 | 
						|
import unittest
 | 
						|
from concurrent import futures
 | 
						|
from test import support
 | 
						|
 | 
						|
from .executor import ExecutorTest, mul
 | 
						|
from .util import BaseTestCase, ThreadPoolMixin, setup_module
 | 
						|
 | 
						|
 | 
						|
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
    """Thread-pool specific tests, run in addition to those inherited
    from ExecutorTest."""

    def test_map_submits_without_iteration(self):
        """map() must submit every call even if its result is never iterated.

        Regression test for issue 11777.
        """
        finished = []

        # The iterator returned by map() is deliberately discarded.
        self.executor.map(finished.append, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        """Without arguments, max_workers is min(32, process_cpu_count + 4)."""
        expected = min(32, (os.process_cpu_count() or 1) + 4)
        # The context manager shuts the executor down even if the
        # assertion fails.
        with self.executor_type() as executor:
            self.assertEqual(executor._max_workers, expected)

    def test_saturation(self):
        """Submitting far more blocking tasks than workers must not create
        more than max_workers threads."""
        with self.executor_type(4) as executor:
            sem = threading.Semaphore(0)
            n_tasks = 15 * executor._max_workers
            try:
                # Every task blocks on the semaphore, so no worker can
                # become idle while the pool is being inspected.
                for _ in range(n_tasks):
                    executor.submit(sem.acquire)
                self.assertEqual(len(executor._threads),
                                 executor._max_workers)
            finally:
                # Always unblock the workers: otherwise a failed assertion
                # would leave them waiting forever and the shutdown on
                # leaving the ``with`` block would never return.
                sem.release(n_tasks)

    @support.requires_gil_enabled("gh-117344: test is flaky without the GIL")
    def test_idle_thread_reuse(self):
        """Tasks run strictly one after another should share one thread."""
        with self.executor_type() as executor:
            # Each result() call waits for completion, so the worker is
            # idle again before the next task is submitted.
            executor.submit(mul, 21, 2).result()
            executor.submit(mul, 6, 7).result()
            executor.submit(mul, 3, 14).result()
            self.assertEqual(len(executor._threads), 1)

    @support.requires_fork()
    @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
    @support.requires_resource('cpu')
    def test_hang_global_shutdown_lock(self):
        # bpo-45021: _global_shutdown_lock should be reinitialized in the child
        # process, otherwise it will never exit
        def submit(pool):
            # Re-submits itself so the thread pool keeps calling submit()
            # while the processes below are being forked.
            pool.submit(submit, pool)

        with futures.ThreadPoolExecutor(1) as pool:
            pool.submit(submit, pool)

            for _ in range(50):
                with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
                    workers.submit(tuple)

    def test_executor_map_current_future_cancel(self):
        """A map() call that times out must cancel its not-yet-started calls."""
        stop_event = threading.Event()
        log = []

        def log_n_wait(ident):
            log.append(f"{ident=} started")
            try:
                stop_event.wait()
            finally:
                log.append(f"{ident=} stopped")

        with self.executor_type(max_workers=1) as pool:
            # submit work to saturate the pool
            fut = pool.submit(log_n_wait, ident="first")
            try:
                with contextlib.closing(
                    pool.map(log_n_wait, ["second", "third"], timeout=0)
                ) as gen:
                    with self.assertRaises(TimeoutError):
                        next(gen)
            finally:
                # Let the "first" task finish whatever happened above, so
                # leaving the ``with`` block cannot block on it.
                stop_event.set()
            fut.result()
        # ident='second' is cancelled as a result of raising a TimeoutError
        # ident='third' is cancelled because it remained in the collection of futures
        self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
 | 
						|
 | 
						|
 | 
						|
def setUpModule():
    """unittest module-level fixture: delegate to the shared setup_module()
    helper imported from the package's util module."""
    setup_module()
 | 
						|
 | 
						|
 | 
						|
# Run this module's tests when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
 |