mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			84 lines
		
	
	
	
		
			3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			84 lines
		
	
	
	
		
			3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import unittest
 | 
						|
import os
 | 
						|
import textwrap
 | 
						|
import importlib
 | 
						|
import sys
 | 
						|
from test.support import os_helper, SHORT_TIMEOUT
 | 
						|
from test.support.script_helper import make_script
 | 
						|
 | 
						|
import subprocess
 | 
						|
 | 
						|
# Module-level default; rebound by the import below when the helper
# extension module is importable.
PROCESS_VM_READV_SUPPORTED = False

try:
    from _testexternalinspection import (
        PROCESS_VM_READV_SUPPORTED,
        get_stack_trace,
    )
except ImportError:
    # Without the extension module there is nothing to test: skip the
    # whole file at import time.
    raise unittest.SkipTest("Test only runs when _testexternalinspection is available")
 | 
						|
 | 
						|
def _make_test_script(script_dir, script_basename, source):
    """Write *source* as a script via ``make_script`` and return its result.

    ``importlib.invalidate_caches()`` is called after the file is created
    and before the value from ``make_script`` is handed back.
    """
    script_path = make_script(script_dir, script_basename, source)
    importlib.invalidate_caches()
    return script_path
 | 
						|
 | 
						|
class TestGetStackTrace(unittest.TestCase):
    """Tests for ``get_stack_trace`` from ``_testexternalinspection``."""

    @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS")
    @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, "Test only runs on Linux with process_vm_readv support")
    def test_remote_stack_trace(self):
        """Check the stack trace read from a separate child process.

        The child runs ``bar() -> baz() -> foo()``, signals readiness
        through a FIFO and then sleeps; the parent then asks for the
        child's stack and expects the frames innermost-first.
        """
        # Spawn a process with some realistic Python code
        script = textwrap.dedent("""\
            import time, sys, os
            def bar():
                for x in range(100):
                    if x == 50:
                        baz()
            def baz():
                foo()

            def foo():
                fifo = sys.argv[1]
                with open(sys.argv[1], "w") as fifo:
                    fifo.write("ready")
                time.sleep(1000)

            bar()
            """)
        stack_trace = None
        with os_helper.temp_dir() as work_dir:
            script_dir = os.path.join(work_dir, "script_pkg")
            os.mkdir(script_dir)
            fifo = os.path.join(work_dir, "the_fifo")
            os.mkfifo(fifo)
            script_name = _make_test_script(script_dir, 'script', script)
            # Bind the name before the try block so the cleanup in
            # ``finally`` does not fail with UnboundLocalError (hiding the
            # real error or the skip) when Popen itself raises.
            p = None
            try:
                p = subprocess.Popen([sys.executable, script_name, fifo])
                # The child writes "ready" inside foo() right before it
                # sleeps, so reading the FIFO synchronizes with that point.
                with open(fifo, "r") as fifo_file:
                    response = fifo_file.read()
                self.assertEqual(response, "ready")
                stack_trace = get_stack_trace(p.pid)
            except PermissionError:
                self.skipTest("Insufficient permissions to read the stack trace")
            finally:
                os.remove(fifo)
                if p is not None:
                    # kill() is sufficient; a follow-up terminate() adds
                    # nothing once the kill signal has been sent.
                    p.kill()
                    p.wait(timeout=SHORT_TIMEOUT)

            expected_stack_trace = [
                'foo',
                'baz',
                'bar',
                '<module>'
            ]
            self.assertEqual(stack_trace, expected_stack_trace)

    @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS")
    @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, "Test only runs on Linux with process_vm_readv support")
    def test_self_trace(self):
        """Check that tracing the current process reports this test first."""
        stack_trace = get_stack_trace(os.getpid())
        self.assertEqual(stack_trace[0], "test_self_trace")
 | 
						|
 | 
						|
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
 |