mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 05:31:20 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			173 lines
		
	
	
	
		
			5.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			173 lines
		
	
	
	
		
			5.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import re
 | |
| import sys
 | |
| import textwrap
 | |
| import unittest
 | |
| from dataclasses import dataclass
 | |
| from functools import cache
 | |
| from test import support
 | |
| from test.support.script_helper import run_python_until_end
 | |
| 
 | |
| _strace_binary = "/usr/bin/strace"
 | |
| _syscall_regex = re.compile(
 | |
|     r"(?P<syscall>[^(]*)\((?P<args>[^)]*)\)\s*[=]\s*(?P<returncode>.+)")
 | |
| _returncode_regex = re.compile(
 | |
|     br"\+\+\+ exited with (?P<returncode>\d+) \+\+\+")
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class StraceEvent:
 | |
|     syscall: str
 | |
|     args: list[str]
 | |
|     returncode: str
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class StraceResult:
 | |
|     strace_returncode: int
 | |
|     python_returncode: int
 | |
| 
 | |
|     """The event messages generated by strace. This is very similar to the
 | |
|     stderr strace produces with returncode marker section removed."""
 | |
|     event_bytes: bytes
 | |
|     stdout: bytes
 | |
|     stderr: bytes
 | |
| 
 | |
|     def events(self):
 | |
|         """Parse event_bytes data into system calls for easier processing.
 | |
| 
 | |
|         This assumes the program under inspection doesn't print any non-utf8
 | |
|         strings which would mix into the strace output."""
 | |
|         decoded_events = self.event_bytes.decode('utf-8')
 | |
|         matches = [
 | |
|             _syscall_regex.match(event)
 | |
|             for event in decoded_events.splitlines()
 | |
|         ]
 | |
|         return [
 | |
|             StraceEvent(match["syscall"],
 | |
|                         [arg.strip() for arg in (match["args"].split(","))],
 | |
|                         match["returncode"]) for match in matches if match
 | |
|         ]
 | |
| 
 | |
|     def sections(self):
 | |
|         """Find all "MARK <X>" writes and use them to make groups of events.
 | |
| 
 | |
|         This is useful to avoid variable / overhead events, like those at
 | |
|         interpreter startup or when opening a file so a test can verify just
 | |
|         the small case under study."""
 | |
|         current_section = "__startup"
 | |
|         sections = {current_section: []}
 | |
|         for event in self.events():
 | |
|             if event.syscall == 'write' and len(
 | |
|                     event.args) > 2 and event.args[1].startswith("\"MARK "):
 | |
|                 # Found a new section, don't include the write in the section
 | |
|                 # but all events until next mark should be in that section
 | |
|                 current_section = event.args[1].split(
 | |
|                     " ", 1)[1].removesuffix('\\n"')
 | |
|                 if current_section not in sections:
 | |
|                     sections[current_section] = list()
 | |
|             else:
 | |
|                 sections[current_section].append(event)
 | |
| 
 | |
|         return sections
 | |
| 
 | |
| 
 | |
| @support.requires_subprocess()
 | |
| def strace_python(code, strace_flags, check=True):
 | |
|     """Run strace and return the trace.
 | |
| 
 | |
|     Sets strace_returncode and python_returncode to `-1` on error."""
 | |
|     res = None
 | |
| 
 | |
|     def _make_error(reason, details):
 | |
|         return StraceResult(
 | |
|             strace_returncode=-1,
 | |
|             python_returncode=-1,
 | |
|             event_bytes=f"error({reason},details={details}) = -1".encode('utf-8'),
 | |
|             stdout=res.out if res else b"",
 | |
|             stderr=res.err if res else b"")
 | |
| 
 | |
|     # Run strace, and get out the raw text
 | |
|     try:
 | |
|         res, cmd_line = run_python_until_end(
 | |
|             "-c",
 | |
|             textwrap.dedent(code),
 | |
|             __run_using_command=[_strace_binary] + strace_flags,
 | |
|             # Don't want to trace our JIT's own mmap and mprotect calls:
 | |
|             PYTHON_JIT="0",
 | |
|         )
 | |
|     except OSError as err:
 | |
|         return _make_error("Caught OSError", err)
 | |
| 
 | |
|     if check and res.rc:
 | |
|         res.fail(cmd_line)
 | |
| 
 | |
|     # Get out program returncode
 | |
|     stripped = res.err.strip()
 | |
|     output = stripped.rsplit(b"\n", 1)
 | |
|     if len(output) != 2:
 | |
|         return _make_error("Expected strace events and exit code line",
 | |
|                            stripped[-50:])
 | |
| 
 | |
|     returncode_match = _returncode_regex.match(output[1])
 | |
|     if not returncode_match:
 | |
|         return _make_error("Expected to find returncode in last line.",
 | |
|                            output[1][:50])
 | |
| 
 | |
|     python_returncode = int(returncode_match["returncode"])
 | |
|     if check and python_returncode:
 | |
|         res.fail(cmd_line)
 | |
| 
 | |
|     return StraceResult(strace_returncode=res.rc,
 | |
|                         python_returncode=python_returncode,
 | |
|                         event_bytes=output[0],
 | |
|                         stdout=res.out,
 | |
|                         stderr=res.err)
 | |
| 
 | |
| 
 | |
| def get_events(code, strace_flags, prelude, cleanup):
 | |
|     # NOTE: The flush is currently required to prevent the prints from getting
 | |
|     # buffered and done all at once at exit
 | |
|     prelude = textwrap.dedent(prelude)
 | |
|     code = textwrap.dedent(code)
 | |
|     cleanup = textwrap.dedent(cleanup)
 | |
|     to_run = f"""
 | |
| print("MARK prelude", flush=True)
 | |
| {prelude}
 | |
| print("MARK code", flush=True)
 | |
| {code}
 | |
| print("MARK cleanup", flush=True)
 | |
| {cleanup}
 | |
| print("MARK __shutdown", flush=True)
 | |
|     """
 | |
|     trace = strace_python(to_run, strace_flags)
 | |
|     all_sections = trace.sections()
 | |
|     return all_sections['code']
 | |
| 
 | |
| 
 | |
| def get_syscalls(code, strace_flags, prelude="", cleanup=""):
 | |
|     """Get the syscalls which a given chunk of python code generates"""
 | |
|     events = get_events(code, strace_flags, prelude=prelude, cleanup=cleanup)
 | |
|     return [ev.syscall for ev in events]
 | |
| 
 | |
| 
 | |
| # Moderately expensive (spawns a subprocess), so share results when possible.
 | |
| @cache
 | |
| def _can_strace():
 | |
|     res = strace_python("import sys; sys.exit(0)", [], check=False)
 | |
|     assert res.events(), "Should have parsed multiple calls"
 | |
| 
 | |
|     return res.strace_returncode == 0 and res.python_returncode == 0
 | |
| 
 | |
| 
 | |
| def requires_strace():
 | |
|     if sys.platform != "linux":
 | |
|         return unittest.skip("Linux only, requires strace.")
 | |
| 
 | |
|     if support.check_sanitizer(address=True, memory=True):
 | |
|         return unittest.skip("LeakSanitizer does not work under ptrace (strace, gdb, etc)")
 | |
| 
 | |
|     return unittest.skipUnless(_can_strace(), "Requires working strace")
 | |
| 
 | |
| 
 | |
| __all__ = ["get_events", "get_syscalls", "requires_strace", "strace_python",
 | |
|            "StraceEvent", "StraceResult"]
 | 
