mirror of
				https://github.com/python/cpython.git
				synced 2025-11-04 07:31:38 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			522 lines
		
	
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			522 lines
		
	
	
	
		
			17 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import unittest
 | 
						|
import string
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import sysconfig
 | 
						|
import os
 | 
						|
import pathlib
 | 
						|
import shutil
 | 
						|
from test import support
 | 
						|
from test.support.script_helper import (
 | 
						|
    make_script,
 | 
						|
    assert_python_failure,
 | 
						|
    assert_python_ok,
 | 
						|
)
 | 
						|
from test.support.os_helper import temp_dir
 | 
						|
 | 
						|
 | 
						|
# Skip the whole module on platforms without subprocess support.
if not support.has_subprocess_support:
    raise unittest.SkipTest("test module requires subprocess")

# gh-109580: these tests crash randomly when Python is built with a
# sanitizer (ASAN, MSAN or UBSAN), so skip the whole module on such builds.
if support.check_sanitizer(address=True, memory=True, ub=True):
    raise unittest.SkipTest("test crash randomly on ASAN/MSAN/UBSAN build")
 | 
						|
 | 
						|
 | 
						|
def supports_trampoline_profiling():
    """Return True if this build was configured with the perf trampoline.

    Reads the ``PY_HAVE_PERF_TRAMPOLINE`` build configuration variable; an
    unset or falsy value means "not supported", otherwise the value must
    convert to the integer 1.
    """
    flag = sysconfig.get_config_var("PY_HAVE_PERF_TRAMPOLINE")
    return bool(flag) and int(flag) == 1
 | 
						|
 | 
						|
 | 
						|
# Nothing below can work on a build without the perf trampoline, so skip
# the whole module.
if not supports_trampoline_profiling():
    raise unittest.SkipTest("perf trampoline profiling not supported")
 | 
						|
 | 
						|
 | 
						|
class TestPerfTrampoline(unittest.TestCase):
 | 
						|
    def setUp(self):
 | 
						|
        super().setUp()
 | 
						|
        self.perf_files = set(pathlib.Path("/tmp/").glob("perf-*.map"))
 | 
						|
 | 
						|
    def tearDown(self) -> None:
 | 
						|
        super().tearDown()
 | 
						|
        files_to_delete = (
 | 
						|
            set(pathlib.Path("/tmp/").glob("perf-*.map")) - self.perf_files
 | 
						|
        )
 | 
						|
        for file in files_to_delete:
 | 
						|
            file.unlink()
 | 
						|
 | 
						|
    def test_trampoline_works(self):
 | 
						|
        code = """if 1:
 | 
						|
                def foo():
 | 
						|
                    pass
 | 
						|
 | 
						|
                def bar():
 | 
						|
                    foo()
 | 
						|
 | 
						|
                def baz():
 | 
						|
                    bar()
 | 
						|
 | 
						|
                baz()
 | 
						|
                """
 | 
						|
        with temp_dir() as script_dir:
 | 
						|
            script = make_script(script_dir, "perftest", code)
 | 
						|
            with subprocess.Popen(
 | 
						|
                [sys.executable, "-Xperf", script],
 | 
						|
                text=True,
 | 
						|
                stderr=subprocess.PIPE,
 | 
						|
                stdout=subprocess.PIPE,
 | 
						|
            ) as process:
 | 
						|
                stdout, stderr = process.communicate()
 | 
						|
 | 
						|
        self.assertEqual(stderr, "")
 | 
						|
        self.assertEqual(stdout, "")
 | 
						|
 | 
						|
        perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
 | 
						|
        self.assertTrue(perf_file.exists())
 | 
						|
        perf_file_contents = perf_file.read_text()
 | 
						|
        perf_lines = perf_file_contents.splitlines()
 | 
						|
        expected_symbols = [
 | 
						|
            f"py::foo:{script}",
 | 
						|
            f"py::bar:{script}",
 | 
						|
            f"py::baz:{script}",
 | 
						|
        ]
 | 
						|
        for expected_symbol in expected_symbols:
 | 
						|
            perf_line = next(
 | 
						|
                (line for line in perf_lines if expected_symbol in line), None
 | 
						|
            )
 | 
						|
            self.assertIsNotNone(
 | 
						|
                perf_line, f"Could not find {expected_symbol} in perf file"
 | 
						|
            )
 | 
						|
            perf_addr = perf_line.split(" ")[0]
 | 
						|
            self.assertFalse(
 | 
						|
                perf_addr.startswith("0x"), "Address should not be prefixed with 0x"
 | 
						|
            )
 | 
						|
            self.assertTrue(
 | 
						|
                set(perf_addr).issubset(string.hexdigits),
 | 
						|
                "Address should contain only hex characters",
 | 
						|
            )
 | 
						|
 | 
						|
    def test_trampoline_works_with_forks(self):
 | 
						|
        code = """if 1:
 | 
						|
                import os, sys
 | 
						|
 | 
						|
                def foo_fork():
 | 
						|
                    pass
 | 
						|
 | 
						|
                def bar_fork():
 | 
						|
                    foo_fork()
 | 
						|
 | 
						|
                def baz_fork():
 | 
						|
                    bar_fork()
 | 
						|
 | 
						|
                def foo():
 | 
						|
                    pid = os.fork()
 | 
						|
                    if pid == 0:
 | 
						|
                        print(os.getpid())
 | 
						|
                        baz_fork()
 | 
						|
                    else:
 | 
						|
                        _, status = os.waitpid(-1, 0)
 | 
						|
                        sys.exit(status)
 | 
						|
 | 
						|
                def bar():
 | 
						|
                    foo()
 | 
						|
 | 
						|
                def baz():
 | 
						|
                    bar()
 | 
						|
 | 
						|
                baz()
 | 
						|
                """
 | 
						|
        with temp_dir() as script_dir:
 | 
						|
            script = make_script(script_dir, "perftest", code)
 | 
						|
            with subprocess.Popen(
 | 
						|
                [sys.executable, "-Xperf", script],
 | 
						|
                text=True,
 | 
						|
                stderr=subprocess.PIPE,
 | 
						|
                stdout=subprocess.PIPE,
 | 
						|
            ) as process:
 | 
						|
                stdout, stderr = process.communicate()
 | 
						|
 | 
						|
        self.assertEqual(process.returncode, 0)
 | 
						|
        self.assertEqual(stderr, "")
 | 
						|
        child_pid = int(stdout.strip())
 | 
						|
        perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
 | 
						|
        perf_child_file = pathlib.Path(f"/tmp/perf-{child_pid}.map")
 | 
						|
        self.assertTrue(perf_file.exists())
 | 
						|
        self.assertTrue(perf_child_file.exists())
 | 
						|
 | 
						|
        perf_file_contents = perf_file.read_text()
 | 
						|
        self.assertIn(f"py::foo:{script}", perf_file_contents)
 | 
						|
        self.assertIn(f"py::bar:{script}", perf_file_contents)
 | 
						|
        self.assertIn(f"py::baz:{script}", perf_file_contents)
 | 
						|
 | 
						|
        child_perf_file_contents = perf_child_file.read_text()
 | 
						|
        self.assertIn(f"py::foo_fork:{script}", child_perf_file_contents)
 | 
						|
        self.assertIn(f"py::bar_fork:{script}", child_perf_file_contents)
 | 
						|
        self.assertIn(f"py::baz_fork:{script}", child_perf_file_contents)
 | 
						|
 | 
						|
    def test_sys_api(self):
 | 
						|
        code = """if 1:
 | 
						|
                import sys
 | 
						|
                def foo():
 | 
						|
                    pass
 | 
						|
 | 
						|
                def spam():
 | 
						|
                    pass
 | 
						|
 | 
						|
                def bar():
 | 
						|
                    sys.deactivate_stack_trampoline()
 | 
						|
                    foo()
 | 
						|
                    sys.activate_stack_trampoline("perf")
 | 
						|
                    spam()
 | 
						|
 | 
						|
                def baz():
 | 
						|
                    bar()
 | 
						|
 | 
						|
                sys.activate_stack_trampoline("perf")
 | 
						|
                baz()
 | 
						|
                """
 | 
						|
        with temp_dir() as script_dir:
 | 
						|
            script = make_script(script_dir, "perftest", code)
 | 
						|
            with subprocess.Popen(
 | 
						|
                [sys.executable, script],
 | 
						|
                text=True,
 | 
						|
                stderr=subprocess.PIPE,
 | 
						|
                stdout=subprocess.PIPE,
 | 
						|
            ) as process:
 | 
						|
                stdout, stderr = process.communicate()
 | 
						|
 | 
						|
        self.assertEqual(stderr, "")
 | 
						|
        self.assertEqual(stdout, "")
 | 
						|
 | 
						|
        perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
 | 
						|
        self.assertTrue(perf_file.exists())
 | 
						|
        perf_file_contents = perf_file.read_text()
 | 
						|
        self.assertNotIn(f"py::foo:{script}", perf_file_contents)
 | 
						|
        self.assertIn(f"py::spam:{script}", perf_file_contents)
 | 
						|
        self.assertIn(f"py::bar:{script}", perf_file_contents)
 | 
						|
        self.assertIn(f"py::baz:{script}", perf_file_contents)
 | 
						|
 | 
						|
    def test_sys_api_with_existing_trampoline(self):
 | 
						|
        code = """if 1:
 | 
						|
                import sys
 | 
						|
                sys.activate_stack_trampoline("perf")
 | 
						|
                sys.activate_stack_trampoline("perf")
 | 
						|
                """
 | 
						|
        assert_python_ok("-c", code)
 | 
						|
 | 
						|
    def test_sys_api_with_invalid_trampoline(self):
 | 
						|
        code = """if 1:
 | 
						|
                import sys
 | 
						|
                sys.activate_stack_trampoline("invalid")
 | 
						|
                """
 | 
						|
        rc, out, err = assert_python_failure("-c", code)
 | 
						|
        self.assertIn("invalid backend: invalid", err.decode())
 | 
						|
 | 
						|
    def test_sys_api_get_status(self):
 | 
						|
        code = """if 1:
 | 
						|
                import sys
 | 
						|
                sys.activate_stack_trampoline("perf")
 | 
						|
                assert sys.is_stack_trampoline_active() is True
 | 
						|
                sys.deactivate_stack_trampoline()
 | 
						|
                assert sys.is_stack_trampoline_active() is False
 | 
						|
                """
 | 
						|
        assert_python_ok("-c", code)
 | 
						|
 | 
						|
 | 
						|
def is_unwinding_reliable_with_frame_pointers():
    """Return True if the build flags allow frame-pointer based unwinding.

    Looks at the ``PY_CORE_CFLAGS`` build variable: the interpreter must have
    been compiled with ``no-omit-frame-pointer`` and the flags must not
    mention ``_Py_JIT``.  Missing or empty flags count as unreliable.
    """
    core_cflags = sysconfig.get_config_var("PY_CORE_CFLAGS")
    if core_cflags:
        keeps_frame_pointers = "no-omit-frame-pointer" in core_cflags
        built_with_jit = "_Py_JIT" in core_cflags
        return keeps_frame_pointers and not built_with_jit
    return False
 | 
						|
 | 
						|
 | 
						|
def perf_command_works():
    """Return True if a usable Linux ``perf`` command is available.

    Two probes are made: ``perf --help`` must run and mention ``perf.data``,
    and a trivial ``perf record`` of a child interpreter printing "hello"
    must succeed with "hello" present in the combined output.
    """
    try:
        help_text = subprocess.check_output(["perf", "--help"], text=True)
    except (subprocess.SubprocessError, OSError):
        return False

    # perf version does not return a version number on Fedora. Use presence
    # of "perf.data" in help as indicator that it's perf from Linux tools.
    if "perf.data" not in help_text:
        return False

    # Check that we can run a simple perf run
    with temp_dir() as script_dir:
        record_cmd = (
            "perf",
            "record",
            "-g",
            "--call-graph=fp",
            "-o",
            script_dir + "/perf_output.perf",
            "--",
            sys.executable,
            "-c",
            'print("hello")',
        )
        try:
            # stderr is folded into stdout so a single string is inspected.
            record_output = subprocess.check_output(
                record_cmd, cwd=script_dir, text=True, stderr=subprocess.STDOUT
            )
        except (subprocess.SubprocessError, OSError):
            return False
        return "hello" in record_output
 | 
						|
 | 
						|
 | 
						|
def run_perf(cwd, *args, use_jit=False, **env_vars):
    """Record a command under ``perf`` and return the ``perf script`` output.

    Parameters:
        cwd: directory in which the perf data files are written.  It is only
            used to build file names; it is not passed to the child
            processes as their working directory.
        *args: the command line to profile, appended after ``--``.
        use_jit: when true, record with DWARF call graphs and post-process
            the recording with ``perf inject -j`` before reading it back.
        **env_vars: extra environment variables for the perf commands; when
            given they are layered on top of a copy of ``os.environ``.

    Returns:
        A ``(stdout, stderr)`` tuple of the ``perf script`` output, decoded
        as UTF-8 with undecodable bytes replaced.

    Raises:
        ValueError: if ``perf record`` or ``perf inject`` exits non-zero.
        subprocess.CalledProcessError: if ``perf script`` exits non-zero.
    """
    if env_vars:
        env = os.environ.copy()
        env.update(env_vars)
    else:
        env = None
    output_file = cwd + "/perf_output.perf"
    if not use_jit:
        base_cmd = ("perf", "record", "-g", "--call-graph=fp", "-o", output_file, "--")
    else:
        base_cmd = (
            "perf",
            "record",
            "-g",
            "--call-graph=dwarf,65528",
            "-F99",
            "-k1",
            "-o",
            output_file,
            "--",
        )
    proc = subprocess.run(
        base_cmd + args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
    )
    if proc.returncode:
        # Show perf's own diagnostics (as text, not a bytes repr) before
        # failing.
        print(proc.stderr.decode("utf-8", "replace"), file=sys.stderr)
        raise ValueError(f"Perf failed with return code {proc.returncode}")

    if use_jit:
        jit_output_file = cwd + "/jit_output.dump"
        command = ("perf", "inject", "-j", "-i", output_file, "-o", jit_output_file)
        proc = subprocess.run(
            command, stderr=subprocess.PIPE, stdout=subprocess.PIPE, env=env
        )
        if proc.returncode:
            print(proc.stderr.decode("utf-8", "replace"), file=sys.stderr)
            raise ValueError(f"Perf failed with return code {proc.returncode}")
        # Move the injected recording over the raw one so that the
        # "perf script" step below reads the post-processed data.
        os.rename(jit_output_file, output_file)

    proc = subprocess.run(
        ("perf", "script", "-i", output_file),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
        check=True,
    )
    return proc.stdout.decode("utf-8", "replace"), proc.stderr.decode(
        "utf-8", "replace"
    )
 | 
						|
 | 
						|
 | 
						|
class TestPerfProfilerMixin:
 | 
						|
    def run_perf(self, script_dir, perf_mode, script):
 | 
						|
        raise NotImplementedError()
 | 
						|
 | 
						|
    def test_python_calls_appear_in_the_stack_if_perf_activated(self):
 | 
						|
        with temp_dir() as script_dir:
 | 
						|
            code = """if 1:
 | 
						|
                def foo(n):
 | 
						|
                    x = 0
 | 
						|
                    for i in range(n):
 | 
						|
                        x += i
 | 
						|
 | 
						|
                def bar(n):
 | 
						|
                    foo(n)
 | 
						|
 | 
						|
                def baz(n):
 | 
						|
                    bar(n)
 | 
						|
 | 
						|
                baz(10000000)
 | 
						|
                """
 | 
						|
            script = make_script(script_dir, "perftest", code)
 | 
						|
            stdout, stderr = self.run_perf(script_dir, script)
 | 
						|
            self.assertEqual(stderr, "")
 | 
						|
 | 
						|
            self.assertIn(f"py::foo:{script}", stdout)
 | 
						|
            self.assertIn(f"py::bar:{script}", stdout)
 | 
						|
            self.assertIn(f"py::baz:{script}", stdout)
 | 
						|
 | 
						|
    def test_python_calls_do_not_appear_in_the_stack_if_perf_deactivated(self):
 | 
						|
        with temp_dir() as script_dir:
 | 
						|
            code = """if 1:
 | 
						|
                def foo(n):
 | 
						|
                    x = 0
 | 
						|
                    for i in range(n):
 | 
						|
                        x += i
 | 
						|
 | 
						|
                def bar(n):
 | 
						|
                    foo(n)
 | 
						|
 | 
						|
                def baz(n):
 | 
						|
                    bar(n)
 | 
						|
 | 
						|
                baz(10000000)
 | 
						|
                """
 | 
						|
            script = make_script(script_dir, "perftest", code)
 | 
						|
            stdout, stderr = self.run_perf(
 | 
						|
                script_dir, script, activate_trampoline=False
 | 
						|
            )
 | 
						|
            self.assertEqual(stderr, "")
 | 
						|
 | 
						|
            self.assertNotIn(f"py::foo:{script}", stdout)
 | 
						|
            self.assertNotIn(f"py::bar:{script}", stdout)
 | 
						|
            self.assertNotIn(f"py::baz:{script}", stdout)
 | 
						|
 | 
						|
@unittest.skipUnless(perf_command_works(), "perf command doesn't work")
@unittest.skipUnless(
    is_unwinding_reliable_with_frame_pointers(),
    "Unwinding is unreliable with frame pointers",
)
class TestPerfProfiler(unittest.TestCase, TestPerfProfilerMixin):
    """Run the shared perf tests with frame-pointer call graphs and -Xperf."""

    def run_perf(self, script_dir, script, activate_trampoline=True):
        if activate_trampoline:
            return run_perf(script_dir, sys.executable, "-Xperf", script)
        return run_perf(script_dir, sys.executable, script)

    def setUp(self):
        super().setUp()
        # Remember the map files that already exist so that tearDown() only
        # removes the ones that appeared while the test was running.
        self.perf_files = set(pathlib.Path("/tmp/").glob("perf-*.map"))

    def tearDown(self) -> None:
        super().tearDown()
        files_to_delete = (
            set(pathlib.Path("/tmp/").glob("perf-*.map")) - self.perf_files
        )
        for file in files_to_delete:
            # /tmp is shared with other processes, so a file seen by glob()
            # may already be gone by the time we try to remove it.
            file.unlink(missing_ok=True)

    def test_pre_fork_compile(self):
        # The script compiles trampoline entries for all of its functions
        # before forking; the child prints its PID so that its own map file
        # can be located.
        code = """if 1:
                import sys
                import os
                import sysconfig
                from _testinternalcapi import (
                    compile_perf_trampoline_entry,
                    perf_trampoline_set_persist_after_fork,
                )

                def foo_fork():
                    pass

                def bar_fork():
                    foo_fork()

                def foo():
                    import time; time.sleep(1)

                def bar():
                    foo()

                def compile_trampolines_for_all_functions():
                    perf_trampoline_set_persist_after_fork(1)
                    for _, obj in globals().items():
                        if callable(obj) and hasattr(obj, '__code__'):
                            compile_perf_trampoline_entry(obj.__code__)

                if __name__ == "__main__":
                    compile_trampolines_for_all_functions()
                    pid = os.fork()
                    if pid == 0:
                        print(os.getpid())
                        bar_fork()
                    else:
                        bar()
                """

        with temp_dir() as script_dir:
            script = make_script(script_dir, "perftest", code)
            with subprocess.Popen(
                [sys.executable, "-Xperf", script],
                # Same meaning as the legacy universal_newlines=True; spelled
                # like the other Popen calls in this file.
                text=True,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
            ) as process:
                stdout, stderr = process.communicate()

        self.assertEqual(process.returncode, 0)
        self.assertNotIn("Error:", stderr)
        child_pid = int(stdout.strip())
        perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
        perf_child_file = pathlib.Path(f"/tmp/perf-{child_pid}.map")
        self.assertTrue(perf_file.exists())
        self.assertTrue(perf_child_file.exists())

        perf_file_contents = perf_file.read_text()
        self.assertIn(f"py::foo:{script}", perf_file_contents)
        self.assertIn(f"py::bar:{script}", perf_file_contents)
        self.assertIn(f"py::foo_fork:{script}", perf_file_contents)
        self.assertIn(f"py::bar_fork:{script}", perf_file_contents)

        child_perf_file_contents = perf_child_file.read_text()
        self.assertIn(f"py::foo_fork:{script}", child_perf_file_contents)
        self.assertIn(f"py::bar_fork:{script}", child_perf_file_contents)

        # Pre-compiled perf-map entries of a forked process must be
        # identical in both the parent and child perf-map files.
        perf_file_lines = perf_file_contents.split("\n")
        for line in perf_file_lines:
            if f"py::foo_fork:{script}" in line or f"py::bar_fork:{script}" in line:
                self.assertIn(line, child_perf_file_contents)
 | 
						|
 | 
						|
 | 
						|
def _is_perf_vesion_at_least(major, minor):
    """Return True if ``perf --version`` reports at least *major*.*minor*.

    Returns False when perf cannot be run or when no version number can be
    recognised in its output, so evaluating this in a decorator never raises.
    """
    # Imported locally because this function is the only user of re here.
    import re

    # The output of perf --version looks like "perf version 6.7-3" but
    # it can also be "perf version 5.15.143".  Only the leading dotted run
    # of digits is used, so a non-numeric suffix in the third field (for
    # example ".rc1" or ".g<hash>") is ignored rather than fed to int().
    try:
        output = subprocess.check_output(["perf", "--version"], text=True)
    except (subprocess.SubprocessError, OSError):
        return False
    fields = output.split()
    if len(fields) < 3:
        return False
    match = re.match(r"\d+(?:\.\d+)*", fields[2])
    if match is None:
        return False
    version = tuple(map(int, match.group().split(".")))
    return version >= (major, minor)
 | 
						|
 | 
						|
 | 
						|
@unittest.skipUnless(perf_command_works(), "perf command doesn't work")
@unittest.skipUnless(_is_perf_vesion_at_least(6, 6), "perf command may not work due to a perf bug")
class TestPerfProfilerWithDwarf(unittest.TestCase, TestPerfProfilerMixin):
    """Run the shared perf tests with DWARF call graphs and -Xperf_jit."""

    def run_perf(self, script_dir, script, activate_trampoline=True):
        # use_jit=True is passed in both cases; only the -Xperf_jit
        # interpreter option depends on activate_trampoline.
        if activate_trampoline:
            return run_perf(
                script_dir, sys.executable, "-Xperf_jit", script, use_jit=True
            )
        return run_perf(script_dir, sys.executable, script, use_jit=True)

    def setUp(self):
        super().setUp()
        # Remember the JIT dump files and jitted objects that already exist
        # so that tearDown() only removes the ones created during the test.
        self.perf_files = set(pathlib.Path("/tmp/").glob("jit*.dump"))
        self.perf_files |= set(pathlib.Path("/tmp/").glob("jitted-*.so"))

    def tearDown(self) -> None:
        super().tearDown()
        files_to_delete = set(pathlib.Path("/tmp/").glob("jit*.dump"))
        files_to_delete |= set(pathlib.Path("/tmp/").glob("jitted-*.so"))
        files_to_delete = files_to_delete - self.perf_files
        for file in files_to_delete:
            # /tmp is shared with other processes, so a file seen by glob()
            # may already be gone by the time we try to remove it.
            file.unlink(missing_ok=True)
 | 
						|
 | 
						|
 | 
						|
# Allow running this test file directly as a script.
if __name__ == "__main__":
    unittest.main()
 |