mirror of
				https://github.com/python/cpython.git
				synced 2025-10-25 18:54:53 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			105 lines
		
	
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			105 lines
		
	
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import atexit
 | |
| import os
 | |
| import textwrap
 | |
| import unittest
 | |
| from test import support
 | |
| from test.support import script_helper
 | |
| 
 | |
| 
 | |
| class GeneralTest(unittest.TestCase):
 | |
|     def test_general(self):
 | |
|         # Run _test_atexit.py in a subprocess since it calls atexit._clear()
 | |
|         script = support.findfile("_test_atexit.py")
 | |
|         script_helper.run_test_script(script)
 | |
| 
 | |
| class FunctionalTest(unittest.TestCase):
 | |
|     def test_shutdown(self):
 | |
|         # Actually test the shutdown mechanism in a subprocess
 | |
|         code = textwrap.dedent("""
 | |
|             import atexit
 | |
| 
 | |
|             def f(msg):
 | |
|                 print(msg)
 | |
| 
 | |
|             atexit.register(f, "one")
 | |
|             atexit.register(f, "two")
 | |
|         """)
 | |
|         res = script_helper.assert_python_ok("-c", code)
 | |
|         self.assertEqual(res.out.decode().splitlines(), ["two", "one"])
 | |
|         self.assertFalse(res.err)
 | |
| 
 | |
|     def test_atexit_instances(self):
 | |
|         # bpo-42639: It is safe to have more than one atexit instance.
 | |
|         code = textwrap.dedent("""
 | |
|             import sys
 | |
|             import atexit as atexit1
 | |
|             del sys.modules['atexit']
 | |
|             import atexit as atexit2
 | |
|             del sys.modules['atexit']
 | |
| 
 | |
|             assert atexit2 is not atexit1
 | |
| 
 | |
|             atexit1.register(print, "atexit1")
 | |
|             atexit2.register(print, "atexit2")
 | |
|         """)
 | |
|         res = script_helper.assert_python_ok("-c", code)
 | |
|         self.assertEqual(res.out.decode().splitlines(), ["atexit2", "atexit1"])
 | |
|         self.assertFalse(res.err)
 | |
| 
 | |
| 
 | |
@support.cpython_only
class SubinterpreterTest(unittest.TestCase):
    # Tests for atexit behaviour when the module is used from a
    # subinterpreter started with support.run_in_subinterp().

    def test_callbacks_leak(self):
        # This test shows a leak in refleak mode if atexit doesn't
        # take care to free callbacks in its per-subinterpreter module
        # state.
        n = atexit._ncallbacks()
        code = textwrap.dedent(r"""
            import atexit
            def f():
                pass
            atexit.register(f)
            del atexit
        """)
        ret = support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        # The callback count seen from this interpreter must be unchanged
        # by the registration made inside the subinterpreter.
        self.assertEqual(atexit._ncallbacks(), n)

    def test_callbacks_leak_refcycle(self):
        # Similar to the above, but with a refcycle through the atexit
        # module.
        n = atexit._ncallbacks()
        code = textwrap.dedent(r"""
            import atexit
            def f():
                pass
            atexit.register(f)
            atexit.__atexit = atexit
        """)
        ret = support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        self.assertEqual(atexit._ncallbacks(), n)

    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_callback_on_subinterpreter_teardown(self):
        # This tests if a callback is called on
        # subinterpreter teardown.
        expected = b"The test has passed!"
        r, w = os.pipe()
        # Release the read end even when an assertion below fails.
        self.addCleanup(os.close, r)
        try:
            code = textwrap.dedent(r"""
                import os
                import atexit
                def callback():
                    os.write({:d}, b"The test has passed!")
                atexit.register(callback)
            """.format(w))
            ret = support.run_in_subinterp(code)
        finally:
            # Always close the write end, and do it before reading: if the
            # callback never wrote, the read then sees EOF instead of
            # blocking on a still-open pipe.
            os.close(w)
        # Same success check as the sibling tests in this class.
        self.assertEqual(ret, 0)
        self.assertEqual(os.read(r, len(expected)), expected)
 | |
| 
 | |
| 
 | |
# Run the tests in this file when it is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
 | 
