mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	The ProcessPoolForkserver combined with resource_tracker starts a thread after forking, which is not supported by TSan. Also skip test_multiprocessing_fork for the same reason
		
			
				
	
	
		
			152 lines
		
	
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			152 lines
		
	
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import contextlib
 | 
						|
import logging
 | 
						|
import queue
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
import sys
 | 
						|
import io
 | 
						|
from concurrent.futures._base import BrokenExecutor
 | 
						|
from concurrent.futures.process import _check_system_limits
 | 
						|
 | 
						|
from logging.handlers import QueueHandler
 | 
						|
 | 
						|
from test import support
 | 
						|
 | 
						|
from .util import ExecutorMixin, create_executor_tests, setup_module
 | 
						|
 | 
						|
 | 
						|
INITIALIZER_STATUS = 'uninitialized'
 | 
						|
 | 
						|
def init(x):
 | 
						|
    global INITIALIZER_STATUS
 | 
						|
    INITIALIZER_STATUS = x
 | 
						|
 | 
						|
def get_init_status():
 | 
						|
    return INITIALIZER_STATUS
 | 
						|
 | 
						|
def init_fail(log_queue=None):
 | 
						|
    if log_queue is not None:
 | 
						|
        logger = logging.getLogger('concurrent.futures')
 | 
						|
        logger.addHandler(QueueHandler(log_queue))
 | 
						|
        logger.setLevel('CRITICAL')
 | 
						|
        logger.propagate = False
 | 
						|
    time.sleep(0.1)  # let some futures be scheduled
 | 
						|
    raise ValueError('error in initializer')
 | 
						|
 | 
						|
 | 
						|
class InitializerMixin(ExecutorMixin):
 | 
						|
    worker_count = 2
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        global INITIALIZER_STATUS
 | 
						|
        INITIALIZER_STATUS = 'uninitialized'
 | 
						|
        self.executor_kwargs = dict(initializer=init,
 | 
						|
                                    initargs=('initialized',))
 | 
						|
        super().setUp()
 | 
						|
 | 
						|
    def test_initializer(self):
 | 
						|
        futures = [self.executor.submit(get_init_status)
 | 
						|
                   for _ in range(self.worker_count)]
 | 
						|
 | 
						|
        for f in futures:
 | 
						|
            self.assertEqual(f.result(), 'initialized')
 | 
						|
 | 
						|
 | 
						|
class FailingInitializerMixin(ExecutorMixin):
 | 
						|
    worker_count = 2
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        if hasattr(self, "ctx"):
 | 
						|
            # Pass a queue to redirect the child's logging output
 | 
						|
            self.mp_context = self.get_context()
 | 
						|
            self.log_queue = self.mp_context.Queue()
 | 
						|
            self.executor_kwargs = dict(initializer=init_fail,
 | 
						|
                                        initargs=(self.log_queue,))
 | 
						|
        else:
 | 
						|
            # In a thread pool, the child shares our logging setup
 | 
						|
            # (see _assert_logged())
 | 
						|
            self.mp_context = None
 | 
						|
            self.log_queue = None
 | 
						|
            self.executor_kwargs = dict(initializer=init_fail)
 | 
						|
        super().setUp()
 | 
						|
 | 
						|
    def test_initializer(self):
 | 
						|
        with self._assert_logged('ValueError: error in initializer'):
 | 
						|
            try:
 | 
						|
                future = self.executor.submit(get_init_status)
 | 
						|
            except BrokenExecutor:
 | 
						|
                # Perhaps the executor is already broken
 | 
						|
                pass
 | 
						|
            else:
 | 
						|
                with self.assertRaises(BrokenExecutor):
 | 
						|
                    future.result()
 | 
						|
 | 
						|
            # At some point, the executor should break
 | 
						|
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT,
 | 
						|
                                            "executor not broken"):
 | 
						|
                if self.executor._broken:
 | 
						|
                    break
 | 
						|
 | 
						|
            # ... and from this point submit() is guaranteed to fail
 | 
						|
            with self.assertRaises(BrokenExecutor):
 | 
						|
                self.executor.submit(get_init_status)
 | 
						|
 | 
						|
    @contextlib.contextmanager
 | 
						|
    def _assert_logged(self, msg):
 | 
						|
        if self.log_queue is not None:
 | 
						|
            yield
 | 
						|
            output = []
 | 
						|
            try:
 | 
						|
                while True:
 | 
						|
                    output.append(self.log_queue.get_nowait().getMessage())
 | 
						|
            except queue.Empty:
 | 
						|
                pass
 | 
						|
        else:
 | 
						|
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
 | 
						|
                yield
 | 
						|
            output = cm.output
 | 
						|
        self.assertTrue(any(msg in line for line in output),
 | 
						|
                        output)
 | 
						|
 | 
						|
 | 
						|
create_executor_tests(globals(), InitializerMixin)
 | 
						|
create_executor_tests(globals(), FailingInitializerMixin)
 | 
						|
 | 
						|
 | 
						|
@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows")
 | 
						|
class FailingInitializerResourcesTest(unittest.TestCase):
 | 
						|
    """
 | 
						|
    Source: https://github.com/python/cpython/issues/104090
 | 
						|
    """
 | 
						|
 | 
						|
    def _test(self, test_class):
 | 
						|
        try:
 | 
						|
            _check_system_limits()
 | 
						|
        except NotImplementedError:
 | 
						|
            self.skipTest("ProcessPoolExecutor unavailable on this system")
 | 
						|
 | 
						|
        runner = unittest.TextTestRunner(stream=io.StringIO())
 | 
						|
        runner.run(test_class('test_initializer'))
 | 
						|
 | 
						|
        # GH-104090:
 | 
						|
        # Stop resource tracker manually now, so we can verify there are not leaked resources by checking
 | 
						|
        # the process exit code
 | 
						|
        from multiprocessing.resource_tracker import _resource_tracker
 | 
						|
        _resource_tracker._stop()
 | 
						|
 | 
						|
        self.assertEqual(_resource_tracker._exitcode, 0)
 | 
						|
 | 
						|
    def test_spawn(self):
 | 
						|
        self._test(ProcessPoolSpawnFailingInitializerTest)
 | 
						|
 | 
						|
    @support.skip_if_sanitizer("TSAN doesn't support threads after fork", thread=True)
 | 
						|
    def test_forkserver(self):
 | 
						|
        self._test(ProcessPoolForkserverFailingInitializerTest)
 | 
						|
 | 
						|
 | 
						|
def setUpModule():
 | 
						|
    setup_module()
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    unittest.main()
 |