"""Utilities for deriving JIT unwind information from DWARF CFI.""" import dataclasses import pathlib import re import typing _LLVMRun = typing.Callable[..., typing.Awaitable[str]] @dataclasses.dataclass(frozen=True) class UnwindInfo: code_alignment_factor: int data_alignment_factor: int return_address_register: int cfa_register: int cfa_offset: int frame_pointer_register: int frame_pointer_offset: int return_address_offset: int @dataclasses.dataclass(frozen=True) class ELFUnwindConfig: frame_pointer: str return_address: str register_numbers: typing.Mapping[str, int] call_instruction_prefixes: tuple[str, ...] def is_call_instruction(self, instruction: str) -> bool: return instruction.startswith(self.call_instruction_prefixes) @dataclasses.dataclass(frozen=True) class _UnwindRow: pc: int cfa_register: str cfa_offset: int saved_registers: dict[str, int] class ELFUnwindInfo: def __init__( self, target_name: str, *, config: ELFUnwindConfig, verbose: bool = False, llvm_version: str, llvm_tools_install_dir: str | None = None, llvm_run: _LLVMRun, ) -> None: self.target_name = target_name self.config = config self.verbose = verbose self.llvm_version = llvm_version self.llvm_tools_install_dir = llvm_tools_install_dir self.llvm_run = llvm_run @staticmethod def _parse_dwarfdump_int( dump: str, field: str, *, required: bool = True ) -> int | None: match = re.search(rf"^\s*{field}:\s+(-?\d+)$", dump, re.MULTILINE) if match is None: if required: raise ValueError(f"missing {field} in llvm-dwarfdump output") return None return int(match.group(1)) @staticmethod def _parse_dwarfdump_rows(dump: str) -> list[_UnwindRow]: row_pattern = re.compile( r"^\s*0x(?P[0-9a-f]+):\s+" r"CFA=(?P[A-Z][A-Z0-9]*)" r"(?P[+-]\d+)?" r"(?::\s*(?P.*))?$" ) saved_pattern = re.compile( r"(?P[A-Z][A-Z0-9]*)=\[CFA(?P[+-]\d+)?\]" ) rows = [] for line in dump.splitlines(): row_match = row_pattern.match(line) if row_match is None: continue saved_registers = {} saved = row_match["saved"] if saved: for saved_match in saved_pattern.finditer(saved): offset = saved_match["offset"] saved_registers[saved_match["register"]] = ( int(offset) if offset is not None else 0 ) cfa_offset = row_match["cfa_offset"] rows.append( _UnwindRow( pc=int(row_match["pc"], 16), cfa_register=row_match["cfa_register"], cfa_offset=int(cfa_offset) if cfa_offset is not None else 0, saved_registers=saved_registers, ) ) if not rows: raise ValueError("missing interpreted CFI rows in llvm-dwarfdump output") return rows @staticmethod def _parse_objdump_instructions(dump: str) -> list[tuple[int, str]]: instructions = [] for line in dump.splitlines(): match = re.match( r"^\s*(?P[0-9a-f]+):\s+" r"(?:(?:[0-9a-f]{2}|[0-9a-f]{8})\s+)+" r"(?P.+)$", line, ) if match: instructions.append( ( int(match["pc"], 16), re.sub(r"\s+", " ", match["instruction"].strip()), ) ) if not instructions: raise ValueError("missing instructions in llvm-objdump output") return instructions def _reg_number(self, register: str) -> int: try: return self.config.register_numbers[register] except KeyError as exc: raise ValueError( f"unsupported register {register!r} in llvm-dwarfdump output" ) from exc @staticmethod def _encoded_cfa_offset(byte_offset: int, data_alignment_factor: int) -> int: if data_alignment_factor == 0: raise ValueError("DWARF data alignment factor must not be zero") if byte_offset % data_alignment_factor: raise ValueError( f"offset {byte_offset} is not a multiple of " f"data alignment factor {data_alignment_factor}" ) return byte_offset // data_alignment_factor async def _read_objdump(self, output: pathlib.Path) -> str: return await self.llvm_run( "llvm-objdump", ["-d", f"{output}"], echo=self.verbose, llvm_version=self.llvm_version, llvm_tools_install_dir=self.llvm_tools_install_dir, ) async def _read_eh_frame(self, output: pathlib.Path) -> str: return await self.llvm_run( "llvm-dwarfdump", ["--eh-frame", f"{output}"], echo=self.verbose, llvm_version=self.llvm_version, llvm_tools_install_dir=self.llvm_tools_install_dir, ) def _executor_call_pc(self, disassembly: str) -> int: calls = [ pc for pc, instruction in self._parse_objdump_instructions(disassembly) if self.config.is_call_instruction(instruction) ] if len(calls) != 1: raise ValueError( f"{self.target_name} JIT shim should contain exactly one executor call" ) call_pc = calls[0] return call_pc def _active_row(self, eh_frame: str, call_pc: int) -> _UnwindRow: rows = self._parse_dwarfdump_rows(eh_frame) active_rows = [row for row in rows if row.pc <= call_pc] if not active_rows: raise ValueError( f"{self.target_name} JIT shim has no CFI row for executor call " f"at 0x{call_pc:x}" ) return max(active_rows, key=lambda row: row.pc) def _check_saved_registers(self, row: _UnwindRow) -> None: if ( self.config.frame_pointer not in row.saved_registers or self.config.return_address not in row.saved_registers ): raise ValueError( f"{self.target_name} JIT shim CFI row at 0x{row.pc:x} " f"does not save {self.config.frame_pointer} and " f"{self.config.return_address}" ) def _build_unwind_info(self, eh_frame: str, active_row: _UnwindRow) -> UnwindInfo: code_alignment_factor = self._parse_dwarfdump_int( eh_frame, "Code alignment factor" ) data_alignment_factor = self._parse_dwarfdump_int( eh_frame, "Data alignment factor" ) return_address_register = self._parse_dwarfdump_int( eh_frame, "Return address column" ) assert code_alignment_factor is not None assert data_alignment_factor is not None assert return_address_register is not None return UnwindInfo( code_alignment_factor=code_alignment_factor, data_alignment_factor=data_alignment_factor, return_address_register=return_address_register, cfa_register=self._reg_number(active_row.cfa_register), cfa_offset=active_row.cfa_offset, frame_pointer_register=self._reg_number(self.config.frame_pointer), frame_pointer_offset=self._encoded_cfa_offset( active_row.saved_registers[self.config.frame_pointer], data_alignment_factor, ), return_address_offset=self._encoded_cfa_offset( active_row.saved_registers[self.config.return_address], data_alignment_factor, ), ) async def extract(self, output: pathlib.Path) -> UnwindInfo: disassembly = await self._read_objdump(output) call_pc = self._executor_call_pc(disassembly) eh_frame = await self._read_eh_frame(output) active_row = self._active_row(eh_frame, call_pc) self._check_saved_registers(active_row) return self._build_unwind_info(eh_frame, active_row)