mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 23:21:29 +00:00 
			
		
		
		
	Co-authored-by: Victor Stinner <vstinner@python.org> Co-authored-by: Kumar Aditya <kumaraditya@python.org>
		
			
				
	
	
		
			138 lines
		
	
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			138 lines
		
	
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import atexit
 | 
						|
import os
 | 
						|
import textwrap
 | 
						|
import unittest
 | 
						|
from test import support
 | 
						|
from test.support import script_helper
 | 
						|
from test.support import threading_helper
 | 
						|
 | 
						|
class GeneralTest(unittest.TestCase):
 | 
						|
    def test_general(self):
 | 
						|
        # Run _test_atexit.py in a subprocess since it calls atexit._clear()
 | 
						|
        script = support.findfile("_test_atexit.py")
 | 
						|
        script_helper.run_test_script(script)
 | 
						|
 | 
						|
class FunctionalTest(unittest.TestCase):
 | 
						|
    def test_shutdown(self):
 | 
						|
        # Actually test the shutdown mechanism in a subprocess
 | 
						|
        code = textwrap.dedent("""
 | 
						|
            import atexit
 | 
						|
 | 
						|
            def f(msg):
 | 
						|
                print(msg)
 | 
						|
 | 
						|
            atexit.register(f, "one")
 | 
						|
            atexit.register(f, "two")
 | 
						|
        """)
 | 
						|
        res = script_helper.assert_python_ok("-c", code)
 | 
						|
        self.assertEqual(res.out.decode().splitlines(), ["two", "one"])
 | 
						|
        self.assertFalse(res.err)
 | 
						|
 | 
						|
    def test_atexit_instances(self):
 | 
						|
        # bpo-42639: It is safe to have more than one atexit instance.
 | 
						|
        code = textwrap.dedent("""
 | 
						|
            import sys
 | 
						|
            import atexit as atexit1
 | 
						|
            del sys.modules['atexit']
 | 
						|
            import atexit as atexit2
 | 
						|
            del sys.modules['atexit']
 | 
						|
 | 
						|
            assert atexit2 is not atexit1
 | 
						|
 | 
						|
            atexit1.register(print, "atexit1")
 | 
						|
            atexit2.register(print, "atexit2")
 | 
						|
        """)
 | 
						|
        res = script_helper.assert_python_ok("-c", code)
 | 
						|
        self.assertEqual(res.out.decode().splitlines(), ["atexit2", "atexit1"])
 | 
						|
        self.assertFalse(res.err)
 | 
						|
 | 
						|
    @threading_helper.requires_working_threading()
 | 
						|
    @support.requires_resource("cpu")
 | 
						|
    @unittest.skipUnless(support.Py_GIL_DISABLED, "only meaningful without the GIL")
 | 
						|
    def test_atexit_thread_safety(self):
 | 
						|
        # GH-126907: atexit was not thread safe on the free-threaded build
 | 
						|
        source = """
 | 
						|
        from threading import Thread
 | 
						|
 | 
						|
        def dummy():
 | 
						|
            pass
 | 
						|
 | 
						|
 | 
						|
        def thready():
 | 
						|
            for _ in range(100):
 | 
						|
                atexit.register(dummy)
 | 
						|
                atexit._clear()
 | 
						|
                atexit.register(dummy)
 | 
						|
                atexit.unregister(dummy)
 | 
						|
                atexit._run_exitfuncs()
 | 
						|
 | 
						|
 | 
						|
        threads = [Thread(target=thready) for _ in range(10)]
 | 
						|
        for thread in threads:
 | 
						|
            thread.start()
 | 
						|
 | 
						|
        for thread in threads:
 | 
						|
            thread.join()
 | 
						|
        """
 | 
						|
 | 
						|
        # atexit._clear() has some evil side effects, and we don't
 | 
						|
        # want them to affect the rest of the tests.
 | 
						|
        script_helper.assert_python_ok("-c", textwrap.dedent(source))
 | 
						|
 | 
						|
 | 
						|
@support.cpython_only
 | 
						|
class SubinterpreterTest(unittest.TestCase):
 | 
						|
 | 
						|
    def test_callbacks_leak(self):
 | 
						|
        # This test shows a leak in refleak mode if atexit doesn't
 | 
						|
        # take care to free callbacks in its per-subinterpreter module
 | 
						|
        # state.
 | 
						|
        n = atexit._ncallbacks()
 | 
						|
        code = textwrap.dedent(r"""
 | 
						|
            import atexit
 | 
						|
            def f():
 | 
						|
                pass
 | 
						|
            atexit.register(f)
 | 
						|
            del atexit
 | 
						|
        """)
 | 
						|
        ret = support.run_in_subinterp(code)
 | 
						|
        self.assertEqual(ret, 0)
 | 
						|
        self.assertEqual(atexit._ncallbacks(), n)
 | 
						|
 | 
						|
    def test_callbacks_leak_refcycle(self):
 | 
						|
        # Similar to the above, but with a refcycle through the atexit
 | 
						|
        # module.
 | 
						|
        n = atexit._ncallbacks()
 | 
						|
        code = textwrap.dedent(r"""
 | 
						|
            import atexit
 | 
						|
            def f():
 | 
						|
                pass
 | 
						|
            atexit.register(f)
 | 
						|
            atexit.__atexit = atexit
 | 
						|
        """)
 | 
						|
        ret = support.run_in_subinterp(code)
 | 
						|
        self.assertEqual(ret, 0)
 | 
						|
        self.assertEqual(atexit._ncallbacks(), n)
 | 
						|
 | 
						|
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
 | 
						|
    def test_callback_on_subinterpreter_teardown(self):
 | 
						|
        # This tests if a callback is called on
 | 
						|
        # subinterpreter teardown.
 | 
						|
        expected = b"The test has passed!"
 | 
						|
        r, w = os.pipe()
 | 
						|
 | 
						|
        code = textwrap.dedent(r"""
 | 
						|
            import os
 | 
						|
            import atexit
 | 
						|
            def callback():
 | 
						|
                os.write({:d}, b"The test has passed!")
 | 
						|
            atexit.register(callback)
 | 
						|
        """.format(w))
 | 
						|
        ret = support.run_in_subinterp(code)
 | 
						|
        os.close(w)
 | 
						|
        self.assertEqual(os.read(r, len(expected)), expected)
 | 
						|
        os.close(r)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    unittest.main()
 |