mirror of
https://github.com/python/cpython.git
synced 2026-05-05 10:01:07 +00:00
236 lines
8.3 KiB
Python
236 lines
8.3 KiB
Python
"""Utilities for deriving JIT unwind information from DWARF CFI."""
|
|
|
|
import dataclasses
|
|
import pathlib
|
|
import re
|
|
import typing
|
|
|
|
_LLVMRun = typing.Callable[..., typing.Awaitable[str]]
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class UnwindInfo:
|
|
code_alignment_factor: int
|
|
data_alignment_factor: int
|
|
return_address_register: int
|
|
cfa_register: int
|
|
cfa_offset: int
|
|
frame_pointer_register: int
|
|
frame_pointer_offset: int
|
|
return_address_offset: int
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class ELFUnwindConfig:
|
|
frame_pointer: str
|
|
return_address: str
|
|
register_numbers: typing.Mapping[str, int]
|
|
call_instruction_prefixes: tuple[str, ...]
|
|
|
|
def is_call_instruction(self, instruction: str) -> bool:
|
|
return instruction.startswith(self.call_instruction_prefixes)
|
|
|
|
|
|
@dataclasses.dataclass(frozen=True)
|
|
class _UnwindRow:
|
|
pc: int
|
|
cfa_register: str
|
|
cfa_offset: int
|
|
saved_registers: dict[str, int]
|
|
|
|
|
|
class ELFUnwindInfo:
|
|
def __init__(
|
|
self,
|
|
target_name: str,
|
|
*,
|
|
config: ELFUnwindConfig,
|
|
verbose: bool = False,
|
|
llvm_version: str,
|
|
llvm_tools_install_dir: str | None = None,
|
|
llvm_run: _LLVMRun,
|
|
) -> None:
|
|
self.target_name = target_name
|
|
self.config = config
|
|
self.verbose = verbose
|
|
self.llvm_version = llvm_version
|
|
self.llvm_tools_install_dir = llvm_tools_install_dir
|
|
self.llvm_run = llvm_run
|
|
|
|
@staticmethod
|
|
def _parse_dwarfdump_int(
|
|
dump: str, field: str, *, required: bool = True
|
|
) -> int | None:
|
|
match = re.search(rf"^\s*{field}:\s+(-?\d+)$", dump, re.MULTILINE)
|
|
if match is None:
|
|
if required:
|
|
raise ValueError(f"missing {field} in llvm-dwarfdump output")
|
|
return None
|
|
return int(match.group(1))
|
|
|
|
@staticmethod
|
|
def _parse_dwarfdump_rows(dump: str) -> list[_UnwindRow]:
|
|
row_pattern = re.compile(
|
|
r"^\s*0x(?P<pc>[0-9a-f]+):\s+"
|
|
r"CFA=(?P<cfa_register>[A-Z][A-Z0-9]*)"
|
|
r"(?P<cfa_offset>[+-]\d+)?"
|
|
r"(?::\s*(?P<saved>.*))?$"
|
|
)
|
|
saved_pattern = re.compile(
|
|
r"(?P<register>[A-Z][A-Z0-9]*)=\[CFA(?P<offset>[+-]\d+)?\]"
|
|
)
|
|
rows = []
|
|
for line in dump.splitlines():
|
|
row_match = row_pattern.match(line)
|
|
if row_match is None:
|
|
continue
|
|
saved_registers = {}
|
|
saved = row_match["saved"]
|
|
if saved:
|
|
for saved_match in saved_pattern.finditer(saved):
|
|
offset = saved_match["offset"]
|
|
saved_registers[saved_match["register"]] = (
|
|
int(offset) if offset is not None else 0
|
|
)
|
|
cfa_offset = row_match["cfa_offset"]
|
|
rows.append(
|
|
_UnwindRow(
|
|
pc=int(row_match["pc"], 16),
|
|
cfa_register=row_match["cfa_register"],
|
|
cfa_offset=int(cfa_offset) if cfa_offset is not None else 0,
|
|
saved_registers=saved_registers,
|
|
)
|
|
)
|
|
if not rows:
|
|
raise ValueError("missing interpreted CFI rows in llvm-dwarfdump output")
|
|
return rows
|
|
|
|
@staticmethod
|
|
def _parse_objdump_instructions(dump: str) -> list[tuple[int, str]]:
|
|
instructions = []
|
|
for line in dump.splitlines():
|
|
match = re.match(
|
|
r"^\s*(?P<pc>[0-9a-f]+):\s+"
|
|
r"(?:(?:[0-9a-f]{2}|[0-9a-f]{8})\s+)+"
|
|
r"(?P<instruction>.+)$",
|
|
line,
|
|
)
|
|
if match:
|
|
instructions.append(
|
|
(
|
|
int(match["pc"], 16),
|
|
re.sub(r"\s+", " ", match["instruction"].strip()),
|
|
)
|
|
)
|
|
if not instructions:
|
|
raise ValueError("missing instructions in llvm-objdump output")
|
|
return instructions
|
|
|
|
def _reg_number(self, register: str) -> int:
|
|
try:
|
|
return self.config.register_numbers[register]
|
|
except KeyError as exc:
|
|
raise ValueError(
|
|
f"unsupported register {register!r} in llvm-dwarfdump output"
|
|
) from exc
|
|
|
|
@staticmethod
|
|
def _encoded_cfa_offset(byte_offset: int, data_alignment_factor: int) -> int:
|
|
if data_alignment_factor == 0:
|
|
raise ValueError("DWARF data alignment factor must not be zero")
|
|
if byte_offset % data_alignment_factor:
|
|
raise ValueError(
|
|
f"offset {byte_offset} is not a multiple of "
|
|
f"data alignment factor {data_alignment_factor}"
|
|
)
|
|
return byte_offset // data_alignment_factor
|
|
|
|
async def _read_objdump(self, output: pathlib.Path) -> str:
|
|
return await self.llvm_run(
|
|
"llvm-objdump",
|
|
["-d", f"{output}"],
|
|
echo=self.verbose,
|
|
llvm_version=self.llvm_version,
|
|
llvm_tools_install_dir=self.llvm_tools_install_dir,
|
|
)
|
|
|
|
async def _read_eh_frame(self, output: pathlib.Path) -> str:
|
|
return await self.llvm_run(
|
|
"llvm-dwarfdump",
|
|
["--eh-frame", f"{output}"],
|
|
echo=self.verbose,
|
|
llvm_version=self.llvm_version,
|
|
llvm_tools_install_dir=self.llvm_tools_install_dir,
|
|
)
|
|
|
|
def _executor_call_pc(self, disassembly: str) -> int:
|
|
calls = [
|
|
pc
|
|
for pc, instruction in self._parse_objdump_instructions(disassembly)
|
|
if self.config.is_call_instruction(instruction)
|
|
]
|
|
if len(calls) != 1:
|
|
raise ValueError(
|
|
f"{self.target_name} JIT shim should contain exactly one executor call"
|
|
)
|
|
call_pc = calls[0]
|
|
return call_pc
|
|
|
|
def _active_row(self, eh_frame: str, call_pc: int) -> _UnwindRow:
|
|
rows = self._parse_dwarfdump_rows(eh_frame)
|
|
active_rows = [row for row in rows if row.pc <= call_pc]
|
|
if not active_rows:
|
|
raise ValueError(
|
|
f"{self.target_name} JIT shim has no CFI row for executor call "
|
|
f"at 0x{call_pc:x}"
|
|
)
|
|
return max(active_rows, key=lambda row: row.pc)
|
|
|
|
def _check_saved_registers(self, row: _UnwindRow) -> None:
|
|
if (
|
|
self.config.frame_pointer not in row.saved_registers
|
|
or self.config.return_address not in row.saved_registers
|
|
):
|
|
raise ValueError(
|
|
f"{self.target_name} JIT shim CFI row at 0x{row.pc:x} "
|
|
f"does not save {self.config.frame_pointer} and "
|
|
f"{self.config.return_address}"
|
|
)
|
|
|
|
def _build_unwind_info(self, eh_frame: str, active_row: _UnwindRow) -> UnwindInfo:
|
|
code_alignment_factor = self._parse_dwarfdump_int(
|
|
eh_frame, "Code alignment factor"
|
|
)
|
|
data_alignment_factor = self._parse_dwarfdump_int(
|
|
eh_frame, "Data alignment factor"
|
|
)
|
|
return_address_register = self._parse_dwarfdump_int(
|
|
eh_frame, "Return address column"
|
|
)
|
|
assert code_alignment_factor is not None
|
|
assert data_alignment_factor is not None
|
|
assert return_address_register is not None
|
|
return UnwindInfo(
|
|
code_alignment_factor=code_alignment_factor,
|
|
data_alignment_factor=data_alignment_factor,
|
|
return_address_register=return_address_register,
|
|
cfa_register=self._reg_number(active_row.cfa_register),
|
|
cfa_offset=active_row.cfa_offset,
|
|
frame_pointer_register=self._reg_number(self.config.frame_pointer),
|
|
frame_pointer_offset=self._encoded_cfa_offset(
|
|
active_row.saved_registers[self.config.frame_pointer],
|
|
data_alignment_factor,
|
|
),
|
|
return_address_offset=self._encoded_cfa_offset(
|
|
active_row.saved_registers[self.config.return_address],
|
|
data_alignment_factor,
|
|
),
|
|
)
|
|
|
|
async def extract(self, output: pathlib.Path) -> UnwindInfo:
|
|
disassembly = await self._read_objdump(output)
|
|
call_pc = self._executor_call_pc(disassembly)
|
|
eh_frame = await self._read_eh_frame(output)
|
|
active_row = self._active_row(eh_frame, call_pc)
|
|
self._check_saved_registers(active_row)
|
|
return self._build_unwind_info(eh_frame, active_row)
|