mirror of
				https://github.com/python/cpython.git
				synced 2025-10-31 13:41:24 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			1711 lines
		
	
	
	
		
			65 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1711 lines
		
	
	
	
		
			65 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Generate the main interpreter switch.
 | |
| 
 | |
| Reads the instruction definitions from bytecodes.c.
 | |
| Writes the cases to generated_cases.c.h, which is #included in ceval.c.
 | |
| """
 | |
| 
 | |
| import argparse
 | |
| import contextlib
 | |
| import dataclasses
 | |
| import os
 | |
| import posixpath
 | |
| import re
 | |
| import sys
 | |
| import typing
 | |
| 
 | |
| import lexer as lx
 | |
| import parser
 | |
| from parser import StackEffect
 | |
| 
 | |
| HERE = os.path.dirname(__file__)
 | |
| ROOT = os.path.join(HERE, "../..")
 | |
| THIS = os.path.relpath(__file__, ROOT).replace(os.path.sep, posixpath.sep)
 | |
| 
 | |
| DEFAULT_INPUT = os.path.relpath(os.path.join(ROOT, "Python/bytecodes.c"))
 | |
| DEFAULT_OUTPUT = os.path.relpath(os.path.join(ROOT, "Python/generated_cases.c.h"))
 | |
| DEFAULT_METADATA_OUTPUT = os.path.relpath(
 | |
|     os.path.join(ROOT, "Include/internal/pycore_opcode_metadata.h")
 | |
| )
 | |
| DEFAULT_PYMETADATA_OUTPUT = os.path.relpath(
 | |
|     os.path.join(ROOT, "Lib/_opcode_metadata.py")
 | |
| )
 | |
| DEFAULT_EXECUTOR_OUTPUT = os.path.relpath(
 | |
|     os.path.join(ROOT, "Python/executor_cases.c.h")
 | |
| )
 | |
| BEGIN_MARKER = "// BEGIN BYTECODES //"
 | |
| END_MARKER = "// END BYTECODES //"
 | |
| RE_PREDICTED = (
 | |
|     r"^\s*(?:GO_TO_INSTRUCTION\(|DEOPT_IF\(.*?,\s*)(\w+)\);\s*(?://.*)?$"
 | |
| )
 | |
| UNUSED = "unused"
 | |
| BITS_PER_CODE_UNIT = 16
 | |
| 
 | |
| # Constants used instead of size for macro expansions.
 | |
| # Note: 1, 2, 4 must match actual cache entry sizes.
 | |
| OPARG_SIZES = {
 | |
|     "OPARG_FULL": 0,
 | |
|     "OPARG_CACHE_1": 1,
 | |
|     "OPARG_CACHE_2": 2,
 | |
|     "OPARG_CACHE_4": 4,
 | |
|     "OPARG_TOP": 5,
 | |
|     "OPARG_BOTTOM": 6,
 | |
| }
 | |
| 
 | |
| RESERVED_WORDS = {
 | |
|     "co_consts" : "Use FRAME_CO_CONSTS.",
 | |
|     "co_names": "Use FRAME_CO_NAMES.",
 | |
| }
 | |
| 
 | |
| arg_parser = argparse.ArgumentParser(
 | |
|     description="Generate the code for the interpreter switch.",
 | |
|     formatter_class=argparse.ArgumentDefaultsHelpFormatter,
 | |
| )
 | |
| arg_parser.add_argument(
 | |
|     "-o", "--output", type=str, help="Generated code", default=DEFAULT_OUTPUT
 | |
| )
 | |
| arg_parser.add_argument(
 | |
|     "-m", "--metadata", type=str, help="Generated C metadata", default=DEFAULT_METADATA_OUTPUT
 | |
| )
 | |
| arg_parser.add_argument(
 | |
|     "-p", "--pymetadata", type=str, help="Generated Python metadata", default=DEFAULT_PYMETADATA_OUTPUT
 | |
| )
 | |
| arg_parser.add_argument(
 | |
|     "-l", "--emit-line-directives", help="Emit #line directives", action="store_true"
 | |
| )
 | |
| arg_parser.add_argument(
 | |
|     "input", nargs=argparse.REMAINDER, help="Instruction definition file(s)"
 | |
| )
 | |
| arg_parser.add_argument(
 | |
|     "-e",
 | |
|     "--executor-cases",
 | |
|     type=str,
 | |
|     help="Write executor cases to this file",
 | |
|     default=DEFAULT_EXECUTOR_OUTPUT,
 | |
| )
 | |
| 
 | |
| 
 | |
| def effect_size(effect: StackEffect) -> tuple[int, str]:
 | |
|     """Return the 'size' impact of a stack effect.
 | |
| 
 | |
|     Returns a tuple (numeric, symbolic) where:
 | |
| 
 | |
|     - numeric is an int giving the statically analyzable size of the effect
 | |
|     - symbolic is a string representing a variable effect (e.g. 'oparg*2')
 | |
| 
 | |
|     At most one of these will be non-zero / non-empty.
 | |
|     """
 | |
|     if effect.size:
 | |
|         assert not effect.cond, "Array effects cannot have a condition"
 | |
|         return 0, effect.size
 | |
|     elif effect.cond:
 | |
|         if effect.cond in ("0", "1"):
 | |
|             return 0, effect.cond
 | |
|         return 0, f"{maybe_parenthesize(effect.cond)} ? 1 : 0"
 | |
|     else:
 | |
|         return 1, ""
 | |
| 
 | |
| 
 | |
| def maybe_parenthesize(sym: str) -> str:
 | |
|     """Add parentheses around a string if it contains an operator.
 | |
| 
 | |
|     An exception is made for '*' which is common and harmless
 | |
|     in the context where the symbolic size is used.
 | |
|     """
 | |
|     if re.match(r"^[\s\w*]+$", sym):
 | |
|         return sym
 | |
|     else:
 | |
|         return f"({sym})"
 | |
| 
 | |
| 
 | |
| def list_effect_size(effects: list[StackEffect]) -> tuple[int, str]:
 | |
|     numeric = 0
 | |
|     symbolic: list[str] = []
 | |
|     for effect in effects:
 | |
|         diff, sym = effect_size(effect)
 | |
|         numeric += diff
 | |
|         if sym:
 | |
|             symbolic.append(maybe_parenthesize(sym))
 | |
|     return numeric, " + ".join(symbolic)
 | |
| 
 | |
| 
 | |
| def string_effect_size(arg: tuple[int, str]) -> str:
 | |
|     numeric, symbolic = arg
 | |
|     if numeric and symbolic:
 | |
|         return f"{numeric} + {symbolic}"
 | |
|     elif symbolic:
 | |
|         return symbolic
 | |
|     else:
 | |
|         return str(numeric)
 | |
| 
 | |
| 
 | |
| class Formatter:
 | |
|     """Wraps an output stream with the ability to indent etc."""
 | |
| 
 | |
|     stream: typing.TextIO
 | |
|     prefix: str
 | |
|     emit_line_directives: bool = False
 | |
|     lineno: int  # Next line number, 1-based
 | |
|     filename: str  # Slightly improved stream.filename
 | |
|     nominal_lineno: int
 | |
|     nominal_filename: str
 | |
| 
 | |
|     def __init__(
 | |
|             self, stream: typing.TextIO, indent: int,
 | |
|                   emit_line_directives: bool = False, comment: str = "//",
 | |
|     ) -> None:
 | |
|         self.stream = stream
 | |
|         self.prefix = " " * indent
 | |
|         self.emit_line_directives = emit_line_directives
 | |
|         self.comment = comment
 | |
|         self.lineno = 1
 | |
|         self.filename = prettify_filename(self.stream.name)
 | |
|         self.nominal_lineno = 1
 | |
|         self.nominal_filename = self.filename
 | |
| 
 | |
|     def write_raw(self, s: str) -> None:
 | |
|         self.stream.write(s)
 | |
|         newlines = s.count("\n")
 | |
|         self.lineno += newlines
 | |
|         self.nominal_lineno += newlines
 | |
| 
 | |
|     def emit(self, arg: str) -> None:
 | |
|         if arg:
 | |
|             self.write_raw(f"{self.prefix}{arg}\n")
 | |
|         else:
 | |
|             self.write_raw("\n")
 | |
| 
 | |
|     def set_lineno(self, lineno: int, filename: str) -> None:
 | |
|         if self.emit_line_directives:
 | |
|             if lineno != self.nominal_lineno or filename != self.nominal_filename:
 | |
|                 self.emit(f'#line {lineno} "{filename}"')
 | |
|                 self.nominal_lineno = lineno
 | |
|                 self.nominal_filename = filename
 | |
| 
 | |
|     def reset_lineno(self) -> None:
 | |
|         if self.lineno != self.nominal_lineno or self.filename != self.nominal_filename:
 | |
|             self.set_lineno(self.lineno + 1, self.filename)
 | |
| 
 | |
|     @contextlib.contextmanager
 | |
|     def indent(self):
 | |
|         self.prefix += "    "
 | |
|         yield
 | |
|         self.prefix = self.prefix[:-4]
 | |
| 
 | |
|     @contextlib.contextmanager
 | |
|     def block(self, head: str, tail: str = ""):
 | |
|         if head:
 | |
|             self.emit(head + " {")
 | |
|         else:
 | |
|             self.emit("{")
 | |
|         with self.indent():
 | |
|             yield
 | |
|         self.emit("}" + tail)
 | |
| 
 | |
|     def stack_adjust(
 | |
|         self,
 | |
|         input_effects: list[StackEffect],
 | |
|         output_effects: list[StackEffect],
 | |
|     ):
 | |
|         shrink, isym = list_effect_size(input_effects)
 | |
|         grow, osym = list_effect_size(output_effects)
 | |
|         diff = grow - shrink
 | |
|         if isym and isym != osym:
 | |
|             self.emit(f"STACK_SHRINK({isym});")
 | |
|         if diff < 0:
 | |
|             self.emit(f"STACK_SHRINK({-diff});")
 | |
|         if diff > 0:
 | |
|             self.emit(f"STACK_GROW({diff});")
 | |
|         if osym and osym != isym:
 | |
|             self.emit(f"STACK_GROW({osym});")
 | |
| 
 | |
|     def declare(self, dst: StackEffect, src: StackEffect | None):
 | |
|         if dst.name == UNUSED or dst.cond == "0":
 | |
|             return
 | |
|         typ = f"{dst.type}" if dst.type else "PyObject *"
 | |
|         if src:
 | |
|             cast = self.cast(dst, src)
 | |
|             init = f" = {cast}{src.name}"
 | |
|         elif dst.cond:
 | |
|             init = " = NULL"
 | |
|         else:
 | |
|             init = ""
 | |
|         sepa = "" if typ.endswith("*") else " "
 | |
|         self.emit(f"{typ}{sepa}{dst.name}{init};")
 | |
| 
 | |
|     def assign(self, dst: StackEffect, src: StackEffect):
 | |
|         if src.name == UNUSED:
 | |
|             return
 | |
|         if src.size:
 | |
|             # Don't write sized arrays -- it's up to the user code.
 | |
|             return
 | |
|         cast = self.cast(dst, src)
 | |
|         if re.match(r"^REG\(oparg(\d+)\)$", dst.name):
 | |
|             self.emit(f"Py_XSETREF({dst.name}, {cast}{src.name});")
 | |
|         else:
 | |
|             stmt = f"{dst.name} = {cast}{src.name};"
 | |
|             if src.cond and src.cond != "1":
 | |
|                 if src.cond == "0":
 | |
|                     # It will not be executed
 | |
|                     return
 | |
|                 stmt = f"if ({src.cond}) {{ {stmt} }}"
 | |
|             self.emit(stmt)
 | |
| 
 | |
|     def cast(self, dst: StackEffect, src: StackEffect) -> str:
 | |
|         return f"({dst.type or 'PyObject *'})" if src.type != dst.type else ""
 | |
| 
 | |
| @dataclasses.dataclass
 | |
| class InstructionFlags:
 | |
|     """Construct and manipulate instruction flags"""
 | |
| 
 | |
|     HAS_ARG_FLAG: bool
 | |
|     HAS_CONST_FLAG: bool
 | |
|     HAS_NAME_FLAG: bool
 | |
|     HAS_JUMP_FLAG: bool
 | |
| 
 | |
|     def __post_init__(self):
 | |
|         self.bitmask = {
 | |
|             name : (1 << i) for i, name in enumerate(self.names())
 | |
|         }
 | |
| 
 | |
|     @staticmethod
 | |
|     def fromInstruction(instr: "AnyInstruction"):
 | |
|         return InstructionFlags(
 | |
|             HAS_ARG_FLAG=variable_used(instr, "oparg"),
 | |
|             HAS_CONST_FLAG=variable_used(instr, "FRAME_CO_CONSTS"),
 | |
|             HAS_NAME_FLAG=variable_used(instr, "FRAME_CO_NAMES"),
 | |
|             HAS_JUMP_FLAG=variable_used(instr, "JUMPBY"),
 | |
|         )
 | |
| 
 | |
|     @staticmethod
 | |
|     def newEmpty():
 | |
|         return InstructionFlags(False, False, False, False)
 | |
| 
 | |
|     def add(self, other: "InstructionFlags") -> None:
 | |
|         for name, value in dataclasses.asdict(other).items():
 | |
|             if value:
 | |
|                 setattr(self, name, value)
 | |
| 
 | |
|     def names(self, value=None):
 | |
|         if value is None:
 | |
|             return dataclasses.asdict(self).keys()
 | |
|         return [n for n, v in dataclasses.asdict(self).items() if v == value]
 | |
| 
 | |
|     def bitmap(self) -> int:
 | |
|         flags = 0
 | |
|         for name in self.names():
 | |
|             if getattr(self, name):
 | |
|                 flags |= self.bitmask[name]
 | |
|         return flags
 | |
| 
 | |
|     @classmethod
 | |
|     def emit_macros(cls, out: Formatter):
 | |
|         flags = cls.newEmpty()
 | |
|         for name, value in flags.bitmask.items():
 | |
|             out.emit(f"#define {name} ({value})");
 | |
| 
 | |
|         for name, value in flags.bitmask.items():
 | |
|             out.emit(
 | |
|                 f"#define OPCODE_{name[:-len('_FLAG')]}(OP) "
 | |
|                 f"(_PyOpcode_opcode_metadata[OP].flags & ({name}))")
 | |
| 
 | |
| 
 | |
| @dataclasses.dataclass
 | |
| class ActiveCacheEffect:
 | |
|     """Wraps a CacheEffect that is actually used, in context."""
 | |
|     effect: parser.CacheEffect
 | |
|     offset: int
 | |
| 
 | |
| 
 | |
| FORBIDDEN_NAMES_IN_UOPS = (
 | |
|     "resume_with_error",
 | |
|     "kwnames",
 | |
|     "next_instr",
 | |
|     "oparg1",  # Proxy for super-instructions like LOAD_FAST_LOAD_FAST
 | |
|     "JUMPBY",
 | |
|     "DISPATCH",
 | |
|     "INSTRUMENTED_JUMP",
 | |
|     "throwflag",
 | |
|     "exception_unwind",
 | |
|     "import_from",
 | |
|     "import_name",
 | |
|     "_PyObject_CallNoArgs",  # Proxy for BEFORE_WITH
 | |
| )
 | |
| 
 | |
| 
 | |
| # Interpreter tiers
 | |
| TIER_ONE = 1  # Specializing adaptive interpreter (PEP 659)
 | |
| TIER_TWO = 2  # Experimental tracing interpreter
 | |
| Tiers: typing.TypeAlias = typing.Literal[1, 2]
 | |
| 
 | |
| 
 | |
| @dataclasses.dataclass
 | |
| class Instruction:
 | |
|     """An instruction with additional data and code."""
 | |
| 
 | |
|     # Parts of the underlying instruction definition
 | |
|     inst: parser.InstDef
 | |
|     kind: typing.Literal["inst", "op"]
 | |
|     name: str
 | |
|     block: parser.Block
 | |
|     block_text: list[str]  # Block.text, less curlies, less PREDICT() calls
 | |
|     block_line: int  # First line of block in original code
 | |
| 
 | |
|     # Computed by constructor
 | |
|     always_exits: bool
 | |
|     cache_offset: int
 | |
|     cache_effects: list[parser.CacheEffect]
 | |
|     input_effects: list[StackEffect]
 | |
|     output_effects: list[StackEffect]
 | |
|     unmoved_names: frozenset[str]
 | |
|     instr_fmt: str
 | |
|     instr_flags: InstructionFlags
 | |
|     active_caches: list[ActiveCacheEffect]
 | |
| 
 | |
|     # Set later
 | |
|     family: parser.Family | None = None
 | |
|     predicted: bool = False
 | |
| 
 | |
|     def __init__(self, inst: parser.InstDef):
 | |
|         self.inst = inst
 | |
|         self.kind = inst.kind
 | |
|         self.name = inst.name
 | |
|         self.block = inst.block
 | |
|         self.block_text, self.check_eval_breaker, self.block_line = \
 | |
|             extract_block_text(self.block)
 | |
|         self.always_exits = always_exits(self.block_text)
 | |
|         self.cache_effects = [
 | |
|             effect for effect in inst.inputs if isinstance(effect, parser.CacheEffect)
 | |
|         ]
 | |
|         self.cache_offset = sum(c.size for c in self.cache_effects)
 | |
|         self.input_effects = [
 | |
|             effect for effect in inst.inputs if isinstance(effect, StackEffect)
 | |
|         ]
 | |
|         self.output_effects = inst.outputs  # For consistency/completeness
 | |
|         unmoved_names: set[str] = set()
 | |
|         for ieffect, oeffect in zip(self.input_effects, self.output_effects):
 | |
|             if ieffect.name == oeffect.name:
 | |
|                 unmoved_names.add(ieffect.name)
 | |
|             else:
 | |
|                 break
 | |
|         self.unmoved_names = frozenset(unmoved_names)
 | |
| 
 | |
|         self.instr_flags = InstructionFlags.fromInstruction(inst)
 | |
| 
 | |
|         self.active_caches = []
 | |
|         offset = 0
 | |
|         for effect in self.cache_effects:
 | |
|             if effect.name != UNUSED:
 | |
|                 self.active_caches.append(ActiveCacheEffect(effect, offset))
 | |
|             offset += effect.size
 | |
| 
 | |
|         if self.instr_flags.HAS_ARG_FLAG:
 | |
|             fmt = "IB"
 | |
|         else:
 | |
|             fmt = "IX"
 | |
|         if offset:
 | |
|             fmt += "C" + "0"*(offset-1)
 | |
|         self.instr_fmt = fmt
 | |
| 
 | |
|     def is_viable_uop(self) -> bool:
 | |
|         """Whether this instruction is viable as a uop."""
 | |
|         if self.name == "EXIT_TRACE":
 | |
|             return True  # This has 'return frame' but it's okay
 | |
|         if self.always_exits:
 | |
|             # print(f"Skipping {self.name} because it always exits")
 | |
|             return False
 | |
|         if self.instr_flags.HAS_ARG_FLAG:
 | |
|             # If the instruction uses oparg, it cannot use any caches
 | |
|             if self.active_caches:
 | |
|                 # print(f"Skipping {self.name} because it uses oparg and caches")
 | |
|                 return False
 | |
|         else:
 | |
|             # If it doesn't use oparg, it can have one cache entry
 | |
|             if len(self.active_caches) > 1:
 | |
|                 # print(f"Skipping {self.name} because it has >1 cache entries")
 | |
|                 return False
 | |
|         res = True
 | |
|         for forbidden in FORBIDDEN_NAMES_IN_UOPS:
 | |
|             # NOTE: To disallow unspecialized uops, use
 | |
|             # if variable_used(self.inst, forbidden):
 | |
|             if variable_used_unspecialized(self.inst, forbidden):
 | |
|                 # print(f"Skipping {self.name} because it uses {forbidden}")
 | |
|                 res = False
 | |
|         return res
 | |
| 
 | |
|     def write(self, out: Formatter, tier: Tiers = TIER_ONE) -> None:
 | |
|         """Write one instruction, sans prologue and epilogue."""
 | |
|         # Write a static assertion that a family's cache size is correct
 | |
|         if family := self.family:
 | |
|             if self.name == family.name:
 | |
|                 if cache_size := family.size:
 | |
|                     out.emit(
 | |
|                         f"static_assert({cache_size} == "
 | |
|                         f'{self.cache_offset}, "incorrect cache size");'
 | |
|                     )
 | |
| 
 | |
|         # Write input stack effect variable declarations and initializations
 | |
|         ieffects = list(reversed(self.input_effects))
 | |
|         for i, ieffect in enumerate(ieffects):
 | |
|             isize = string_effect_size(
 | |
|                 list_effect_size([ieff for ieff in ieffects[: i + 1]])
 | |
|             )
 | |
|             if ieffect.size:
 | |
|                 src = StackEffect(f"(stack_pointer - {maybe_parenthesize(isize)})", "PyObject **")
 | |
|             elif ieffect.cond:
 | |
|                 src = StackEffect(f"({ieffect.cond}) ? stack_pointer[-{maybe_parenthesize(isize)}] : NULL", "")
 | |
|             else:
 | |
|                 src = StackEffect(f"stack_pointer[-{maybe_parenthesize(isize)}]", "")
 | |
|             out.declare(ieffect, src)
 | |
| 
 | |
|         # Write output stack effect variable declarations
 | |
|         isize = string_effect_size(list_effect_size(self.input_effects))
 | |
|         input_names = {ieffect.name for ieffect in self.input_effects}
 | |
|         for i, oeffect in enumerate(self.output_effects):
 | |
|             if oeffect.name not in input_names:
 | |
|                 if oeffect.size:
 | |
|                     osize = string_effect_size(
 | |
|                         list_effect_size([oeff for oeff in self.output_effects[:i]])
 | |
|                     )
 | |
|                     offset = "stack_pointer"
 | |
|                     if isize != osize:
 | |
|                         if isize != "0":
 | |
|                             offset += f" - ({isize})"
 | |
|                         if osize != "0":
 | |
|                             offset += f" + {osize}"
 | |
|                     src = StackEffect(offset, "PyObject **")
 | |
|                     out.declare(oeffect, src)
 | |
|                 else:
 | |
|                     out.declare(oeffect, None)
 | |
| 
 | |
|         # out.emit(f"next_instr += OPSIZE({self.inst.name}) - 1;")
 | |
| 
 | |
|         self.write_body(out, 0, self.active_caches, tier=tier)
 | |
| 
 | |
|         # Skip the rest if the block always exits
 | |
|         if self.always_exits:
 | |
|             return
 | |
| 
 | |
|         # Write net stack growth/shrinkage
 | |
|         out.stack_adjust(
 | |
|             [ieff for ieff in self.input_effects],
 | |
|             [oeff for oeff in self.output_effects],
 | |
|         )
 | |
| 
 | |
|         # Write output stack effect assignments
 | |
|         oeffects = list(reversed(self.output_effects))
 | |
|         for i, oeffect in enumerate(oeffects):
 | |
|             if oeffect.name in self.unmoved_names:
 | |
|                 continue
 | |
|             osize = string_effect_size(
 | |
|                 list_effect_size([oeff for oeff in oeffects[: i + 1]])
 | |
|             )
 | |
|             if oeffect.size:
 | |
|                 dst = StackEffect(f"stack_pointer - {maybe_parenthesize(osize)}", "PyObject **")
 | |
|             else:
 | |
|                 dst = StackEffect(f"stack_pointer[-{maybe_parenthesize(osize)}]", "")
 | |
|             out.assign(dst, oeffect)
 | |
| 
 | |
|         # Write cache effect
 | |
|         if tier == TIER_ONE and self.cache_offset:
 | |
|             out.emit(f"next_instr += {self.cache_offset};")
 | |
| 
 | |
|     def write_body(
 | |
|             self,
 | |
|             out: Formatter,
 | |
|             dedent: int,
 | |
|             active_caches: list[ActiveCacheEffect],
 | |
|             tier: Tiers = TIER_ONE,
 | |
|         ) -> None:
 | |
|         """Write the instruction body."""
 | |
|         # Write cache effect variable declarations and initializations
 | |
|         for active in active_caches:
 | |
|             ceffect = active.effect
 | |
|             bits = ceffect.size * BITS_PER_CODE_UNIT
 | |
|             if bits == 64:
 | |
|                 # NOTE: We assume that 64-bit data in the cache
 | |
|                 # is always an object pointer.
 | |
|                 # If this becomes false, we need a way to specify
 | |
|                 # syntactically what type the cache data is.
 | |
|                 typ = "PyObject *"
 | |
|                 func = "read_obj"
 | |
|             else:
 | |
|                 typ = f"uint{bits}_t "
 | |
|                 func = f"read_u{bits}"
 | |
|             if tier == TIER_ONE:
 | |
|                 out.emit(
 | |
|                     f"{typ}{ceffect.name} = {func}(&next_instr[{active.offset}].cache);"
 | |
|                 )
 | |
|             else:
 | |
|                 out.emit(f"{typ}{ceffect.name} = ({typ.strip()})operand;")
 | |
| 
 | |
|         # Write the body, substituting a goto for ERROR_IF() and other stuff
 | |
|         assert dedent <= 0
 | |
|         extra = " " * -dedent
 | |
|         names_to_skip = self.unmoved_names | frozenset({UNUSED, "null"})
 | |
|         offset = 0
 | |
|         context = self.block.context
 | |
|         assert context is not None and context.owner is not None
 | |
|         filename = context.owner.filename
 | |
|         for line in self.block_text:
 | |
|             out.set_lineno(self.block_line + offset, filename)
 | |
|             offset += 1
 | |
|             if m := re.match(r"(\s*)ERROR_IF\((.+), (\w+)\);\s*(?://.*)?$", line):
 | |
|                 space, cond, label = m.groups()
 | |
|                 space = extra + space
 | |
|                 # ERROR_IF() must pop the inputs from the stack.
 | |
|                 # The code block is responsible for DECREF()ing them.
 | |
|                 # NOTE: If the label doesn't exist, just add it to ceval.c.
 | |
| 
 | |
|                 # Don't pop common input/output effects at the bottom!
 | |
|                 # These aren't DECREF'ed so they can stay.
 | |
|                 ieffs = list(self.input_effects)
 | |
|                 oeffs = list(self.output_effects)
 | |
|                 while ieffs and oeffs and ieffs[0] == oeffs[0]:
 | |
|                     ieffs.pop(0)
 | |
|                     oeffs.pop(0)
 | |
|                 ninputs, symbolic = list_effect_size(ieffs)
 | |
|                 if ninputs:
 | |
|                     label = f"pop_{ninputs}_{label}"
 | |
|                 if symbolic:
 | |
|                     out.write_raw(
 | |
|                         f"{space}if ({cond}) {{ STACK_SHRINK({symbolic}); goto {label}; }}\n"
 | |
|                     )
 | |
|                 else:
 | |
|                     out.write_raw(f"{space}if ({cond}) goto {label};\n")
 | |
|             elif m := re.match(r"(\s*)DECREF_INPUTS\(\);\s*(?://.*)?$", line):
 | |
|                 out.reset_lineno()
 | |
|                 space = extra + m.group(1)
 | |
|                 for ieff in self.input_effects:
 | |
|                     if ieff.name in names_to_skip:
 | |
|                         continue
 | |
|                     if ieff.size:
 | |
|                         out.write_raw(
 | |
|                             f"{space}for (int _i = {ieff.size}; --_i >= 0;) {{\n"
 | |
|                         )
 | |
|                         out.write_raw(f"{space}    Py_DECREF({ieff.name}[_i]);\n")
 | |
|                         out.write_raw(f"{space}}}\n")
 | |
|                     else:
 | |
|                         decref = "XDECREF" if ieff.cond else "DECREF"
 | |
|                         out.write_raw(f"{space}Py_{decref}({ieff.name});\n")
 | |
|             else:
 | |
|                 out.write_raw(extra + line)
 | |
|         out.reset_lineno()
 | |
| 
 | |
| 
 | |
| InstructionOrCacheEffect = Instruction | parser.CacheEffect
 | |
| StackEffectMapping = list[tuple[StackEffect, StackEffect]]
 | |
| 
 | |
| 
 | |
| @dataclasses.dataclass
 | |
| class Component:
 | |
|     instr: Instruction
 | |
|     input_mapping: StackEffectMapping
 | |
|     output_mapping: StackEffectMapping
 | |
|     active_caches: list[ActiveCacheEffect]
 | |
| 
 | |
|     def write_body(self, out: Formatter) -> None:
 | |
|         with out.block(""):
 | |
|             input_names = {ieffect.name for _, ieffect in self.input_mapping}
 | |
|             for var, ieffect in self.input_mapping:
 | |
|                 out.declare(ieffect, var)
 | |
|             for _, oeffect in self.output_mapping:
 | |
|                 if oeffect.name not in input_names:
 | |
|                     out.declare(oeffect, None)
 | |
| 
 | |
|             self.instr.write_body(out, -4, self.active_caches)
 | |
| 
 | |
|             for var, oeffect in self.output_mapping:
 | |
|                 out.assign(var, oeffect)
 | |
| 
 | |
| 
 | |
| MacroParts = list[Component | parser.CacheEffect]
 | |
| 
 | |
| 
 | |
| @dataclasses.dataclass
 | |
| class MacroInstruction:
 | |
|     """A macro instruction."""
 | |
| 
 | |
|     name: str
 | |
|     stack: list[StackEffect]
 | |
|     initial_sp: int
 | |
|     final_sp: int
 | |
|     instr_fmt: str
 | |
|     instr_flags: InstructionFlags
 | |
|     macro: parser.Macro
 | |
|     parts: MacroParts
 | |
|     cache_offset: int
 | |
|     predicted: bool = False
 | |
| 
 | |
| 
 | |
| @dataclasses.dataclass
 | |
| class PseudoInstruction:
 | |
|     """A pseudo instruction."""
 | |
| 
 | |
|     name: str
 | |
|     targets: list[Instruction]
 | |
|     instr_fmt: str
 | |
|     instr_flags: InstructionFlags
 | |
| 
 | |
| 
 | |
| @dataclasses.dataclass
 | |
| class OverriddenInstructionPlaceHolder:
 | |
|     name: str
 | |
| 
 | |
| 
 | |
| AnyInstruction = Instruction | MacroInstruction | PseudoInstruction
 | |
| INSTR_FMT_PREFIX = "INSTR_FMT_"
 | |
| 
 | |
| 
 | |
| class Analyzer:
 | |
|     """Parse input, analyze it, and write to output."""
 | |
| 
 | |
|     input_filenames: list[str]
 | |
|     output_filename: str
 | |
|     metadata_filename: str
 | |
|     pymetadata_filename: str
 | |
|     executor_filename: str
 | |
|     errors: int = 0
 | |
|     emit_line_directives: bool = False
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         input_filenames: list[str],
 | |
|         output_filename: str,
 | |
|         metadata_filename: str,
 | |
|         pymetadata_filename: str,
 | |
|         executor_filename: str,
 | |
|     ):
 | |
|         """Read the input file."""
 | |
|         self.input_filenames = input_filenames
 | |
|         self.output_filename = output_filename
 | |
|         self.metadata_filename = metadata_filename
 | |
|         self.pymetadata_filename = pymetadata_filename
 | |
|         self.executor_filename = executor_filename
 | |
| 
 | |
|     def error(self, msg: str, node: parser.Node) -> None:
 | |
|         lineno = 0
 | |
|         filename = "<unknown file>"
 | |
|         if context := node.context:
 | |
|             filename = context.owner.filename
 | |
|             # Use line number of first non-comment in the node
 | |
|             for token in context.owner.tokens[context.begin : context.end]:
 | |
|                 lineno = token.line
 | |
|                 if token.kind != "COMMENT":
 | |
|                     break
 | |
|         print(f"{filename}:{lineno}: {msg}", file=sys.stderr)
 | |
|         self.errors += 1
 | |
| 
 | |
|     everything: list[
 | |
|         parser.InstDef | parser.Macro | parser.Pseudo | OverriddenInstructionPlaceHolder
 | |
|     ]
 | |
|     instrs: dict[str, Instruction]  # Includes ops
 | |
|     macros: dict[str, parser.Macro]
 | |
|     macro_instrs: dict[str, MacroInstruction]
 | |
|     families: dict[str, parser.Family]
 | |
|     pseudos: dict[str, parser.Pseudo]
 | |
|     pseudo_instrs: dict[str, PseudoInstruction]
 | |
| 
 | |
|     def parse(self) -> None:
 | |
|         """Parse the source text.
 | |
| 
 | |
|         We only want the parser to see the stuff between the
 | |
|         begin and end markers.
 | |
|         """
 | |
| 
 | |
|         self.everything = []
 | |
|         self.instrs = {}
 | |
|         self.macros = {}
 | |
|         self.families = {}
 | |
|         self.pseudos = {}
 | |
| 
 | |
|         instrs_idx: dict[str, int] = dict()
 | |
| 
 | |
|         for filename in self.input_filenames:
 | |
|             self.parse_file(filename, instrs_idx)
 | |
| 
 | |
|         files = " + ".join(self.input_filenames)
 | |
|         print(
 | |
|             f"Read {len(self.instrs)} instructions/ops, "
 | |
|             f"{len(self.macros)} macros, {len(self.pseudos)} pseudos, "
 | |
|             f"and {len(self.families)} families from {files}",
 | |
|             file=sys.stderr,
 | |
|         )
 | |
| 
 | |
|     def parse_file(self, filename: str, instrs_idx: dict[str, int]) -> None:
 | |
|         with open(filename) as file:
 | |
|             src = file.read()
 | |
| 
 | |
| 
 | |
|         psr = parser.Parser(src, filename=prettify_filename(filename))
 | |
| 
 | |
|         # Skip until begin marker
 | |
|         while tkn := psr.next(raw=True):
 | |
|             if tkn.text == BEGIN_MARKER:
 | |
|                 break
 | |
|         else:
 | |
|             raise psr.make_syntax_error(
 | |
|                 f"Couldn't find {BEGIN_MARKER!r} in {psr.filename}"
 | |
|             )
 | |
|         start = psr.getpos()
 | |
| 
 | |
|         # Find end marker, then delete everything after it
 | |
|         while tkn := psr.next(raw=True):
 | |
|             if tkn.text == END_MARKER:
 | |
|                 break
 | |
|         del psr.tokens[psr.getpos() - 1 :]
 | |
| 
 | |
|         # Parse from start
 | |
|         psr.setpos(start)
 | |
|         thing: parser.InstDef | parser.Macro | parser.Pseudo | parser.Family | None
 | |
|         thing_first_token = psr.peek()
 | |
|         while thing := psr.definition():
 | |
|             if ws := [w for w in RESERVED_WORDS if variable_used(thing, w)]:
 | |
|                 self.error(f"'{ws[0]}' is a reserved word. {RESERVED_WORDS[ws[0]]}", thing)
 | |
| 
 | |
|             match thing:
 | |
|                 case parser.InstDef(name=name):
 | |
|                     if name in self.instrs:
 | |
|                         if not thing.override:
 | |
|                             raise psr.make_syntax_error(
 | |
|                                 f"Duplicate definition of '{name}' @ {thing.context} "
 | |
|                                 f"previous definition @ {self.instrs[name].inst.context}",
 | |
|                                 thing_first_token,
 | |
|                             )
 | |
|                         self.everything[instrs_idx[name]] = OverriddenInstructionPlaceHolder(name=name)
 | |
|                     if name not in self.instrs and thing.override:
 | |
|                         raise psr.make_syntax_error(
 | |
|                             f"Definition of '{name}' @ {thing.context} is supposed to be "
 | |
|                             "an override but no previous definition exists.",
 | |
|                             thing_first_token,
 | |
|                         )
 | |
|                     self.instrs[name] = Instruction(thing)
 | |
|                     instrs_idx[name] = len(self.everything)
 | |
|                     self.everything.append(thing)
 | |
|                 case parser.Macro(name):
 | |
|                     self.macros[name] = thing
 | |
|                     self.everything.append(thing)
 | |
|                 case parser.Family(name):
 | |
|                     self.families[name] = thing
 | |
|                 case parser.Pseudo(name):
 | |
|                     self.pseudos[name] = thing
 | |
|                     self.everything.append(thing)
 | |
|                 case _:
 | |
|                     typing.assert_never(thing)
 | |
|         if not psr.eof():
 | |
|             raise psr.make_syntax_error(f"Extra stuff at the end of {filename}")
 | |
| 
 | |
|     def analyze(self) -> None:
 | |
|         """Analyze the inputs.
 | |
| 
 | |
|         Raises SystemExit if there is an error.
 | |
|         """
 | |
|         self.analyze_macros_and_pseudos()
 | |
|         self.find_predictions()
 | |
|         self.map_families()
 | |
|         self.check_families()
 | |
| 
 | |
|     def find_predictions(self) -> None:
 | |
|         """Find the instructions that need PREDICTED() labels."""
 | |
|         for instr in self.instrs.values():
 | |
|             targets: set[str] = set()
 | |
|             for line in instr.block_text:
 | |
|                 if m := re.match(RE_PREDICTED, line):
 | |
|                     targets.add(m.group(1))
 | |
|             for target in targets:
 | |
|                 if target_instr := self.instrs.get(target):
 | |
|                     target_instr.predicted = True
 | |
|                 elif target_macro := self.macro_instrs.get(target):
 | |
|                     target_macro.predicted = True
 | |
|                 else:
 | |
|                     self.error(
 | |
|                         f"Unknown instruction {target!r} predicted in {instr.name!r}",
 | |
|                         instr.inst,  # TODO: Use better location
 | |
|                     )
 | |
| 
 | |
|     def map_families(self) -> None:
 | |
|         """Link instruction names back to their family, if they have one."""
 | |
|         for family in self.families.values():
 | |
|             for member in [family.name] + family.members:
 | |
|                 if member_instr := self.instrs.get(member):
 | |
|                     if member_instr.family not in (family, None):
 | |
|                         self.error(
 | |
|                             f"Instruction {member} is a member of multiple families "
 | |
|                             f"({member_instr.family.name}, {family.name}).",
 | |
|                             family,
 | |
|                         )
 | |
|                     else:
 | |
|                         member_instr.family = family
 | |
|                 elif not self.macro_instrs.get(member):
 | |
|                     self.error(
 | |
|                         f"Unknown instruction {member!r} referenced in family {family.name!r}",
 | |
|                         family,
 | |
|                     )
 | |
| 
 | |
|     def check_families(self) -> None:
 | |
|         """Check each family:
 | |
| 
 | |
|         - Must have at least 2 members
 | |
|         - All members must be known instructions
 | |
|         - All members must have the same cache, input and output effects
 | |
|         """
 | |
|         for family in self.families.values():
 | |
|             if family.name not in self.macro_instrs and family.name not in self.instrs:
 | |
|                 self.error(
 | |
|                     f"Family {family.name!r} has unknown instruction {family.name!r}",
 | |
|                     family,
 | |
|                 )
 | |
|             members = [
 | |
|                 member
 | |
|                 for member in family.members
 | |
|                 if member in self.instrs or member in self.macro_instrs
 | |
|             ]
 | |
|             if members != family.members:
 | |
|                 unknown = set(family.members) - set(members)
 | |
|                 self.error(
 | |
|                     f"Family {family.name!r} has unknown members: {unknown}", family
 | |
|                 )
 | |
|             expected_effects = self.effect_counts(family.name)
 | |
|             for member in members:
 | |
|                 member_effects = self.effect_counts(member)
 | |
|                 if member_effects != expected_effects:
 | |
|                     self.error(
 | |
|                         f"Family {family.name!r} has inconsistent "
 | |
|                         f"(cache, input, output) effects:\n"
 | |
|                         f"  {family.members[0]} = {expected_effects}; "
 | |
|                         f"{member} = {member_effects}",
 | |
|                         family,
 | |
|                     )
 | |
| 
 | |
|     def effect_counts(self, name: str) -> tuple[int, int, int]:
 | |
|         if instr := self.instrs.get(name):
 | |
|             cache = instr.cache_offset
 | |
|             input = len(instr.input_effects)
 | |
|             output = len(instr.output_effects)
 | |
|         elif mac := self.macro_instrs.get(name):
 | |
|             cache = mac.cache_offset
 | |
|             input, output = 0, 0
 | |
|             for part in mac.parts:
 | |
|                 if isinstance(part, Component):
 | |
|                     # A component may pop what the previous component pushed,
 | |
|                     # so we offset the input/output counts by that.
 | |
|                     delta_i = len(part.instr.input_effects)
 | |
|                     delta_o = len(part.instr.output_effects)
 | |
|                     offset = min(delta_i, output)
 | |
|                     input += delta_i - offset
 | |
|                     output += delta_o - offset
 | |
|         else:
 | |
|             assert False, f"Unknown instruction {name!r}"
 | |
|         return cache, input, output
 | |
| 
 | |
|     def analyze_macros_and_pseudos(self) -> None:
 | |
|         """Analyze each macro and pseudo instruction."""
 | |
|         self.macro_instrs = {}
 | |
|         self.pseudo_instrs = {}
 | |
|         for name, macro in self.macros.items():
 | |
|             self.macro_instrs[name] = self.analyze_macro(macro)
 | |
|         for name, pseudo in self.pseudos.items():
 | |
|             self.pseudo_instrs[name] = self.analyze_pseudo(pseudo)
 | |
| 
 | |
|     def analyze_macro(self, macro: parser.Macro) -> MacroInstruction:
 | |
|         components = self.check_macro_components(macro)
 | |
|         stack, initial_sp = self.stack_analysis(components)
 | |
|         sp = initial_sp
 | |
|         parts: MacroParts = []
 | |
|         flags = InstructionFlags.newEmpty()
 | |
|         offset = 0
 | |
|         for component in components:
 | |
|             match component:
 | |
|                 case parser.CacheEffect() as ceffect:
 | |
|                     parts.append(ceffect)
 | |
|                     offset += ceffect.size
 | |
|                 case Instruction() as instr:
 | |
|                     part, sp, offset = self.analyze_instruction(instr, stack, sp, offset)
 | |
|                     parts.append(part)
 | |
|                     flags.add(instr.instr_flags)
 | |
|                 case _:
 | |
|                     typing.assert_never(component)
 | |
|         final_sp = sp
 | |
|         format = "IB"
 | |
|         if offset:
 | |
|             format += "C" + "0"*(offset-1)
 | |
|         return MacroInstruction(
 | |
|             macro.name, stack, initial_sp, final_sp, format, flags, macro, parts, offset
 | |
|         )
 | |
| 
 | |
|     def analyze_pseudo(self, pseudo: parser.Pseudo) -> PseudoInstruction:
 | |
|         targets = [self.instrs[target] for target in pseudo.targets]
 | |
|         assert targets
 | |
|         # Make sure the targets have the same fmt
 | |
|         fmts = list(set([t.instr_fmt for t in targets]))
 | |
|         assert(len(fmts) == 1)
 | |
|         assert(len(list(set([t.instr_flags.bitmap() for t in targets]))) == 1)
 | |
|         return PseudoInstruction(pseudo.name, targets, fmts[0], targets[0].instr_flags)
 | |
| 
 | |
|     def analyze_instruction(
 | |
|         self, instr: Instruction, stack: list[StackEffect], sp: int, offset: int
 | |
|     ) -> tuple[Component, int, int]:
 | |
|         input_mapping: StackEffectMapping = []
 | |
|         for ieffect in reversed(instr.input_effects):
 | |
|             sp -= 1
 | |
|             input_mapping.append((stack[sp], ieffect))
 | |
|         output_mapping: StackEffectMapping = []
 | |
|         for oeffect in instr.output_effects:
 | |
|             output_mapping.append((stack[sp], oeffect))
 | |
|             sp += 1
 | |
|         active_effects: list[ActiveCacheEffect] = []
 | |
|         for ceffect in instr.cache_effects:
 | |
|             if ceffect.name != UNUSED:
 | |
|                 active_effects.append(ActiveCacheEffect(ceffect, offset))
 | |
|             offset += ceffect.size
 | |
|         return Component(instr, input_mapping, output_mapping, active_effects), sp, offset
 | |
| 
 | |
|     def check_macro_components(
 | |
|         self, macro: parser.Macro
 | |
|     ) -> list[InstructionOrCacheEffect]:
 | |
|         components: list[InstructionOrCacheEffect] = []
 | |
|         for uop in macro.uops:
 | |
|             match uop:
 | |
|                 case parser.OpName(name):
 | |
|                     if name not in self.instrs:
 | |
|                         self.error(f"Unknown instruction {name!r}", macro)
 | |
|                     components.append(self.instrs[name])
 | |
|                 case parser.CacheEffect():
 | |
|                     components.append(uop)
 | |
|                 case _:
 | |
|                     typing.assert_never(uop)
 | |
|         return components
 | |
| 
 | |
|     def stack_analysis(
 | |
|         self, components: typing.Iterable[InstructionOrCacheEffect]
 | |
|     ) -> tuple[list[StackEffect], int]:
 | |
|         """Analyze a macro.
 | |
| 
 | |
|         Ignore cache effects.
 | |
| 
 | |
|         Return the list of variables (as StackEffects) and the initial stack pointer.
 | |
|         """
 | |
|         lowest = current = highest = 0
 | |
|         conditions: dict[int, str] = {}  # Indexed by 'current'.
 | |
|         last_instr: Instruction | None = None
 | |
|         for thing in components:
 | |
|             if isinstance(thing, Instruction):
 | |
|                 last_instr = thing
 | |
|         for thing in components:
 | |
|             match thing:
 | |
|                 case Instruction() as instr:
 | |
|                     if any(
 | |
|                         eff.size for eff in instr.input_effects + instr.output_effects
 | |
|                     ):
 | |
|                         # TODO: Eventually this will be needed, at least for macros.
 | |
|                         self.error(
 | |
|                             f"Instruction {instr.name!r} has variable-sized stack effect, "
 | |
|                             "which are not supported in macro instructions",
 | |
|                             instr.inst,  # TODO: Pass name+location of macro
 | |
|                         )
 | |
|                     if any(eff.cond for eff in instr.input_effects):
 | |
|                         self.error(
 | |
|                             f"Instruction {instr.name!r} has conditional input stack effect, "
 | |
|                             "which are not supported in macro instructions",
 | |
|                             instr.inst,  # TODO: Pass name+location of macro
 | |
|                         )
 | |
|                     if any(eff.cond for eff in instr.output_effects) and instr is not last_instr:
 | |
|                         self.error(
 | |
|                             f"Instruction {instr.name!r} has conditional output stack effect, "
 | |
|                             "but is not the last instruction in a macro",
 | |
|                             instr.inst,  # TODO: Pass name+location of macro
 | |
|                         )
 | |
|                     current -= len(instr.input_effects)
 | |
|                     lowest = min(lowest, current)
 | |
|                     for eff in instr.output_effects:
 | |
|                         if eff.cond:
 | |
|                             conditions[current] = eff.cond
 | |
|                         current += 1
 | |
|                     highest = max(highest, current)
 | |
|                 case parser.CacheEffect():
 | |
|                     pass
 | |
|                 case _:
 | |
|                     typing.assert_never(thing)
 | |
|         # At this point, 'current' is the net stack effect,
 | |
|         # and 'lowest' and 'highest' are the extremes.
 | |
|         # Note that 'lowest' may be negative.
 | |
|         stack = [
 | |
|             StackEffect(f"_tmp_{i}", "", conditions.get(highest - i, ""))
 | |
|             for i in reversed(range(1, highest - lowest + 1))
 | |
|         ]
 | |
|         return stack, -lowest
 | |
| 
 | |
|     def get_stack_effect_info(
 | |
|         self, thing: parser.InstDef | parser.Macro | parser.Pseudo
 | |
|     ) -> tuple[AnyInstruction | None, str | None, str | None]:
 | |
|         def effect_str(effects: list[StackEffect]) -> str:
 | |
|             n_effect, sym_effect = list_effect_size(effects)
 | |
|             if sym_effect:
 | |
|                 return f"{sym_effect} + {n_effect}" if n_effect else sym_effect
 | |
|             return str(n_effect)
 | |
| 
 | |
|         instr: AnyInstruction | None
 | |
|         match thing:
 | |
|             case parser.InstDef():
 | |
|                 if thing.kind != "op":
 | |
|                     instr = self.instrs[thing.name]
 | |
|                     popped = effect_str(instr.input_effects)
 | |
|                     pushed = effect_str(instr.output_effects)
 | |
|                 else:
 | |
|                     instr = None
 | |
|                     popped = ""
 | |
|                     pushed = ""
 | |
|             case parser.Macro():
 | |
|                 instr = self.macro_instrs[thing.name]
 | |
|                 parts = [comp for comp in instr.parts if isinstance(comp, Component)]
 | |
|                 # Note: stack_analysis() already verifies that macro components
 | |
|                 # have no variable-sized stack effects.
 | |
|                 low = 0
 | |
|                 sp = 0
 | |
|                 high = 0
 | |
|                 pushed_symbolic: list[str] = []
 | |
|                 for comp in parts:
 | |
|                     for effect in comp.instr.input_effects:
 | |
|                         assert not effect.cond, effect
 | |
|                         assert not effect.size, effect
 | |
|                         sp -= 1
 | |
|                         low = min(low, sp)
 | |
|                     for effect in comp.instr.output_effects:
 | |
|                         assert not effect.size, effect
 | |
|                         if effect.cond:
 | |
|                             if effect.cond in ("0", "1"):
 | |
|                                 pushed_symbolic.append(effect.cond)
 | |
|                             else:
 | |
|                                 pushed_symbolic.append(maybe_parenthesize(f"{maybe_parenthesize(effect.cond)} ? 1 : 0"))
 | |
|                         sp += 1
 | |
|                         high = max(sp, high)
 | |
|                 if high != max(0, sp):
 | |
|                     # If you get this, intermediate stack growth occurs,
 | |
|                     # and stack size calculations may go awry.
 | |
|                     # E.g. [push, pop]. The fix would be for stack size
 | |
|                     # calculations to use the micro ops.
 | |
|                     self.error("Macro has virtual stack growth", thing)
 | |
|                 popped = str(-low)
 | |
|                 pushed_symbolic.append(str(sp - low - len(pushed_symbolic)))
 | |
|                 pushed = " + ".join(pushed_symbolic)
 | |
|             case parser.Pseudo():
 | |
|                 instr = self.pseudo_instrs[thing.name]
 | |
|                 popped = pushed = None
 | |
|                 # Calculate stack effect, and check that it's the the same
 | |
|                 # for all targets.
 | |
|                 for target in self.pseudos[thing.name].targets:
 | |
|                     target_instr = self.instrs.get(target)
 | |
|                     # Currently target is always an instr. This could change
 | |
|                     # in the future, e.g., if we have a pseudo targetting a
 | |
|                     # macro instruction.
 | |
|                     assert target_instr
 | |
|                     target_popped = effect_str(target_instr.input_effects)
 | |
|                     target_pushed = effect_str(target_instr.output_effects)
 | |
|                     if popped is None and pushed is None:
 | |
|                         popped, pushed = target_popped, target_pushed
 | |
|                     else:
 | |
|                         assert popped == target_popped
 | |
|                         assert pushed == target_pushed
 | |
|             case _:
 | |
|                 typing.assert_never(thing)
 | |
|         return instr, popped, pushed
 | |
| 
 | |
|     def write_stack_effect_functions(self) -> None:
 | |
|         popped_data: list[tuple[AnyInstruction, str]] = []
 | |
|         pushed_data: list[tuple[AnyInstruction, str]] = []
 | |
|         for thing in self.everything:
 | |
|             if isinstance(thing, OverriddenInstructionPlaceHolder):
 | |
|                 continue
 | |
|             instr, popped, pushed = self.get_stack_effect_info(thing)
 | |
|             if instr is not None:
 | |
|                 assert popped is not None and pushed is not None
 | |
|                 popped_data.append((instr, popped))
 | |
|                 pushed_data.append((instr, pushed))
 | |
| 
 | |
|         def write_function(
 | |
|             direction: str, data: list[tuple[AnyInstruction, str]]
 | |
|         ) -> None:
 | |
|             self.out.emit("")
 | |
|             self.out.emit("#ifndef NEED_OPCODE_METADATA")
 | |
|             self.out.emit(f"extern int _PyOpcode_num_{direction}(int opcode, int oparg, bool jump);")
 | |
|             self.out.emit("#else")
 | |
|             self.out.emit("int")
 | |
|             self.out.emit(f"_PyOpcode_num_{direction}(int opcode, int oparg, bool jump) {{")
 | |
|             self.out.emit("    switch(opcode) {")
 | |
|             for instr, effect in data:
 | |
|                 self.out.emit(f"        case {instr.name}:")
 | |
|                 self.out.emit(f"            return {effect};")
 | |
|             self.out.emit("        default:")
 | |
|             self.out.emit("            return -1;")
 | |
|             self.out.emit("    }")
 | |
|             self.out.emit("}")
 | |
|             self.out.emit("#endif")
 | |
| 
 | |
|         write_function("popped", popped_data)
 | |
|         write_function("pushed", pushed_data)
 | |
|         self.out.emit("")
 | |
| 
 | |
|     def from_source_files(self) -> str:
 | |
|         paths = f"\n{self.out.comment}   ".join(
 | |
|             prettify_filename(filename)
 | |
|             for filename in self.input_filenames
 | |
|         )
 | |
|         return f"{self.out.comment} from:\n{self.out.comment}   {paths}\n"
 | |
| 
 | |
|     def write_provenance_header(self):
 | |
|         self.out.write_raw(f"{self.out.comment} This file is generated by {THIS}\n")
 | |
|         self.out.write_raw(self.from_source_files())
 | |
|         self.out.write_raw(f"{self.out.comment} Do not edit!\n")
 | |
| 
 | |
|     def write_metadata(self) -> None:
 | |
|         """Write instruction metadata to output file."""
 | |
| 
 | |
|         # Compute the set of all instruction formats.
 | |
|         all_formats: set[str] = set()
 | |
|         for thing in self.everything:
 | |
|             match thing:
 | |
|                 case OverriddenInstructionPlaceHolder():
 | |
|                     continue
 | |
|                 case parser.InstDef():
 | |
|                     format = self.instrs[thing.name].instr_fmt
 | |
|                 case parser.Macro():
 | |
|                     format = self.macro_instrs[thing.name].instr_fmt
 | |
|                 case parser.Pseudo():
 | |
|                     format = None
 | |
|                     for target in self.pseudos[thing.name].targets:
 | |
|                         target_instr = self.instrs.get(target)
 | |
|                         assert target_instr
 | |
|                         if format is None:
 | |
|                             format = target_instr.instr_fmt
 | |
|                         else:
 | |
|                             assert format == target_instr.instr_fmt
 | |
|                 case _:
 | |
|                     typing.assert_never(thing)
 | |
|             all_formats.add(format)
 | |
|         # Turn it into a list of enum definitions.
 | |
|         format_enums = [INSTR_FMT_PREFIX + format for format in sorted(all_formats)]
 | |
| 
 | |
|         with open(self.metadata_filename, "w") as f:
 | |
|             # Create formatter
 | |
|             self.out = Formatter(f, 0)
 | |
| 
 | |
|             self.write_provenance_header()
 | |
| 
 | |
|             self.out.emit("\n#include <stdbool.h>")
 | |
| 
 | |
|             self.write_pseudo_instrs()
 | |
| 
 | |
|             self.out.emit("")
 | |
|             self.write_uop_items(lambda name, counter: f"#define {name} {counter}")
 | |
| 
 | |
|             self.write_stack_effect_functions()
 | |
| 
 | |
|             # Write type definitions
 | |
|             self.out.emit(f"enum InstructionFormat {{ {', '.join(format_enums)} }};")
 | |
| 
 | |
|             self.out.emit("")
 | |
|             self.out.emit(
 | |
|                 "#define IS_VALID_OPCODE(OP) \\\n"
 | |
|                 "    (((OP) >= 0) && ((OP) < OPCODE_METADATA_SIZE) && \\\n"
 | |
|                 "     (_PyOpcode_opcode_metadata[(OP)].valid_entry))")
 | |
| 
 | |
|             self.out.emit("")
 | |
|             InstructionFlags.emit_macros(self.out)
 | |
| 
 | |
|             self.out.emit("")
 | |
|             with self.out.block("struct opcode_metadata", ";"):
 | |
|                 self.out.emit("bool valid_entry;")
 | |
|                 self.out.emit("enum InstructionFormat instr_format;")
 | |
|                 self.out.emit("int flags;")
 | |
|             self.out.emit("")
 | |
| 
 | |
|             with self.out.block("struct opcode_macro_expansion", ";"):
 | |
|                 self.out.emit("int nuops;")
 | |
|                 self.out.emit("struct { int16_t uop; int8_t size; int8_t offset; } uops[8];")
 | |
|             self.out.emit("")
 | |
| 
 | |
|             for key, value in OPARG_SIZES.items():
 | |
|                 self.out.emit(f"#define {key} {value}")
 | |
|             self.out.emit("")
 | |
| 
 | |
|             self.out.emit("#define OPCODE_METADATA_FMT(OP) "
 | |
|                           "(_PyOpcode_opcode_metadata[(OP)].instr_format)")
 | |
|             self.out.emit("#define SAME_OPCODE_METADATA(OP1, OP2) \\")
 | |
|             self.out.emit("        (OPCODE_METADATA_FMT(OP1) == OPCODE_METADATA_FMT(OP2))")
 | |
|             self.out.emit("")
 | |
| 
 | |
|             # Write metadata array declaration
 | |
|             self.out.emit("#define OPCODE_METADATA_SIZE 512")
 | |
|             self.out.emit("#define OPCODE_UOP_NAME_SIZE 512")
 | |
|             self.out.emit("#define OPCODE_MACRO_EXPANSION_SIZE 256")
 | |
|             self.out.emit("")
 | |
|             self.out.emit("#ifndef NEED_OPCODE_METADATA")
 | |
|             self.out.emit("extern const struct opcode_metadata "
 | |
|                           "_PyOpcode_opcode_metadata[OPCODE_METADATA_SIZE];")
 | |
|             self.out.emit("extern const struct opcode_macro_expansion "
 | |
|                           "_PyOpcode_macro_expansion[OPCODE_MACRO_EXPANSION_SIZE];")
 | |
|             self.out.emit("extern const char * const _PyOpcode_uop_name[OPCODE_UOP_NAME_SIZE];")
 | |
|             self.out.emit("#else // if NEED_OPCODE_METADATA")
 | |
| 
 | |
|             self.out.emit("const struct opcode_metadata "
 | |
|                           "_PyOpcode_opcode_metadata[OPCODE_METADATA_SIZE] = {")
 | |
| 
 | |
|             # Write metadata for each instruction
 | |
|             for thing in self.everything:
 | |
|                 match thing:
 | |
|                     case OverriddenInstructionPlaceHolder():
 | |
|                         continue
 | |
|                     case parser.InstDef():
 | |
|                         if thing.kind != "op":
 | |
|                             self.write_metadata_for_inst(self.instrs[thing.name])
 | |
|                     case parser.Macro():
 | |
|                         self.write_metadata_for_macro(self.macro_instrs[thing.name])
 | |
|                     case parser.Pseudo():
 | |
|                         self.write_metadata_for_pseudo(self.pseudo_instrs[thing.name])
 | |
|                     case _:
 | |
|                         typing.assert_never(thing)
 | |
| 
 | |
|             # Write end of array
 | |
|             self.out.emit("};")
 | |
| 
 | |
|             with self.out.block(
 | |
|                 "const struct opcode_macro_expansion "
 | |
|                 "_PyOpcode_macro_expansion[OPCODE_MACRO_EXPANSION_SIZE] =",
 | |
|                 ";",
 | |
|             ):
 | |
|                 # Write macro expansion for each non-pseudo instruction
 | |
|                 for thing in self.everything:
 | |
|                     match thing:
 | |
|                         case OverriddenInstructionPlaceHolder():
 | |
|                             pass
 | |
|                         case parser.InstDef(name=name):
 | |
|                             instr = self.instrs[name]
 | |
|                             # Since an 'op' is not a bytecode, it has no expansion; but 'inst' is
 | |
|                             if instr.kind == "inst" and instr.is_viable_uop():
 | |
|                                 # Construct a dummy Component -- input/output mappings are not used
 | |
|                                 part = Component(instr, [], [], instr.active_caches)
 | |
|                                 self.write_macro_expansions(instr.name, [part])
 | |
|                             elif instr.kind == "inst" and variable_used(instr.inst, "oparg1"):
 | |
|                                 assert variable_used(instr.inst, "oparg2"), "Half super-instr?"
 | |
|                                 self.write_super_expansions(instr.name)
 | |
|                         case parser.Macro():
 | |
|                             mac = self.macro_instrs[thing.name]
 | |
|                             self.write_macro_expansions(mac.name, mac.parts)
 | |
|                         case parser.Pseudo():
 | |
|                             pass
 | |
|                         case _:
 | |
|                             typing.assert_never(thing)
 | |
| 
 | |
|             with self.out.block("const char * const _PyOpcode_uop_name[OPCODE_UOP_NAME_SIZE] =", ";"):
 | |
|                 self.write_uop_items(lambda name, counter: f"[{name}] = \"{name}\",")
 | |
| 
 | |
|             self.out.emit("#endif // NEED_OPCODE_METADATA")
 | |
| 
 | |
|         with open(self.pymetadata_filename, "w") as f:
 | |
|             # Create formatter
 | |
|             self.out = Formatter(f, 0, comment = "#")
 | |
| 
 | |
|             self.write_provenance_header()
 | |
| 
 | |
|             self.out.emit("")
 | |
|             self.out.emit("_specializations = {")
 | |
|             for name, family in self.families.items():
 | |
|                 with self.out.indent():
 | |
|                     self.out.emit(f"\"{family.name}\": [")
 | |
|                     with self.out.indent():
 | |
|                         for m in family.members:
 | |
|                             self.out.emit(f"\"{m}\",")
 | |
|                     self.out.emit(f"],")
 | |
|             self.out.emit("}")
 | |
| 
 | |
|             # Handle special case
 | |
|             self.out.emit("")
 | |
|             self.out.emit("# An irregular case:")
 | |
|             self.out.emit(
 | |
|                 "_specializations[\"BINARY_OP\"].append("
 | |
|                     "\"BINARY_OP_INPLACE_ADD_UNICODE\")")
 | |
| 
 | |
|             # Make list of specialized instructions
 | |
|             self.out.emit("")
 | |
|             self.out.emit(
 | |
|                 "_specialized_instructions = ["
 | |
|                     "opcode for family in _specializations.values() for opcode in family"
 | |
|                 "]")
 | |
| 
 | |
|     def write_pseudo_instrs(self) -> None:
 | |
|         """Write the IS_PSEUDO_INSTR macro"""
 | |
|         self.out.emit("\n\n#define IS_PSEUDO_INSTR(OP)  ( \\")
 | |
|         for op in self.pseudos:
 | |
|             self.out.emit(f"    ((OP) == {op}) || \\")
 | |
|         self.out.emit(f"    0)")
 | |
| 
 | |
|     def write_uop_items(self, make_text: typing.Callable[[str, int], str]) -> None:
 | |
|         """Write '#define XXX NNN' for each uop"""
 | |
|         counter = 300  # TODO: Avoid collision with pseudo instructions
 | |
|         seen = set()
 | |
| 
 | |
|         def add(name: str) -> None:
 | |
|             if name in seen:
 | |
|                 return
 | |
|             nonlocal counter
 | |
|             self.out.emit(make_text(name, counter))
 | |
|             counter += 1
 | |
|             seen.add(name)
 | |
| 
 | |
|         # These two are first by convention
 | |
|         add("EXIT_TRACE")
 | |
|         add("SAVE_IP")
 | |
| 
 | |
|         for instr in self.instrs.values():
 | |
|             if instr.kind == "op" and instr.is_viable_uop():
 | |
|                 add(instr.name)
 | |
| 
 | |
|     def write_macro_expansions(self, name: str, parts: MacroParts) -> None:
 | |
|         """Write the macro expansions for a macro-instruction."""
 | |
|         # TODO: Refactor to share code with write_cody(), is_viaible_uop(), etc.
 | |
|         offset = 0  # Cache effect offset
 | |
|         expansions: list[tuple[str, int, int]] = []  # [(name, size, offset), ...]
 | |
|         for part in parts:
 | |
|             if isinstance(part, Component):
 | |
|                 # All component instructions must be viable uops
 | |
|                 if not part.instr.is_viable_uop():
 | |
|                     print(f"NOTE: Part {part.instr.name} of {name} is not a viable uop")
 | |
|                     return
 | |
|                 if part.instr.instr_flags.HAS_ARG_FLAG or not part.active_caches:
 | |
|                     size, offset = OPARG_SIZES["OPARG_FULL"], 0
 | |
|                 else:
 | |
|                     # If this assert triggers, is_viable_uops() lied
 | |
|                     assert len(part.active_caches) == 1, (name, part.instr.name)
 | |
|                     cache = part.active_caches[0]
 | |
|                     size, offset = cache.effect.size, cache.offset
 | |
|                 expansions.append((part.instr.name, size, offset))
 | |
|         assert len(expansions) > 0, f"Macro {name} has empty expansion?!"
 | |
|         self.write_expansions(name, expansions)
 | |
| 
 | |
|     def write_super_expansions(self, name: str) -> None:
 | |
|         """Write special macro expansions for super-instructions.
 | |
| 
 | |
|         If you get an assertion failure here, you probably have accidentally
 | |
|         violated one of the assumptions here.
 | |
| 
 | |
|         - A super-instruction's name is of the form FIRST_SECOND where
 | |
|           FIRST and SECOND are regular instructions whose name has the
 | |
|           form FOO_BAR. Thus, there must be exactly 3 underscores.
 | |
|           Example: LOAD_CONST_STORE_FAST.
 | |
| 
 | |
|         - A super-instruction's body uses `oparg1 and `oparg2`, and no
 | |
|           other instruction's body uses those variable names.
 | |
| 
 | |
|         - A super-instruction has no active (used) cache entries.
 | |
| 
 | |
|         In the expansion, the first instruction's operand is all but the
 | |
|         bottom 4 bits of the super-instruction's oparg, and the second
 | |
|         instruction's operand is the bottom 4 bits. We use the special
 | |
|         size codes OPARG_TOP and OPARG_BOTTOM for these.
 | |
|         """
 | |
|         pieces = name.split("_")
 | |
|         assert len(pieces) == 4, f"{name} doesn't look like a super-instr"
 | |
|         name1 = "_".join(pieces[:2])
 | |
|         name2 = "_".join(pieces[2:])
 | |
|         assert name1 in self.instrs, f"{name1} doesn't match any instr"
 | |
|         assert name2 in self.instrs, f"{name2} doesn't match any instr"
 | |
|         instr1 = self.instrs[name1]
 | |
|         instr2 = self.instrs[name2]
 | |
|         assert not instr1.active_caches, f"{name1} has active caches"
 | |
|         assert not instr2.active_caches, f"{name2} has active caches"
 | |
|         expansions = [
 | |
|             (name1, OPARG_SIZES["OPARG_TOP"], 0),
 | |
|             (name2, OPARG_SIZES["OPARG_BOTTOM"], 0),
 | |
|         ]
 | |
|         self.write_expansions(name, expansions)
 | |
| 
 | |
|     def write_expansions(self, name: str, expansions: list[tuple[str, int, int]]) -> None:
 | |
|         pieces = [f"{{ {name}, {size}, {offset} }}" for name, size, offset in expansions]
 | |
|         self.out.emit(
 | |
|             f"[{name}] = "
 | |
|             f"{{ .nuops = {len(pieces)}, .uops = {{ {', '.join(pieces)} }} }},"
 | |
|         )
 | |
| 
 | |
|     def emit_metadata_entry(
 | |
|         self, name: str, fmt: str, flags: InstructionFlags
 | |
|     ) -> None:
 | |
|         flag_names = flags.names(value=True)
 | |
|         if not flag_names:
 | |
|             flag_names.append("0")
 | |
|         self.out.emit(
 | |
|             f"    [{name}] = {{ true, {INSTR_FMT_PREFIX}{fmt},"
 | |
|             f" {' | '.join(flag_names)} }},"
 | |
|         )
 | |
| 
 | |
|     def write_metadata_for_inst(self, instr: Instruction) -> None:
 | |
|         """Write metadata for a single instruction."""
 | |
|         self.emit_metadata_entry(instr.name, instr.instr_fmt, instr.instr_flags)
 | |
| 
 | |
|     def write_metadata_for_macro(self, mac: MacroInstruction) -> None:
 | |
|         """Write metadata for a macro-instruction."""
 | |
|         self.emit_metadata_entry(mac.name, mac.instr_fmt, mac.instr_flags)
 | |
| 
 | |
|     def write_metadata_for_pseudo(self, ps: PseudoInstruction) -> None:
 | |
|         """Write metadata for a macro-instruction."""
 | |
|         self.emit_metadata_entry(ps.name, ps.instr_fmt, ps.instr_flags)
 | |
| 
 | |
|     def write_instructions(self) -> None:
 | |
|         """Write instructions to output file."""
 | |
|         with open(self.output_filename, "w") as f:
 | |
|             # Create formatter
 | |
|             self.out = Formatter(f, 8, self.emit_line_directives)
 | |
| 
 | |
|             self.write_provenance_header()
 | |
| 
 | |
|             # Write and count instructions of all kinds
 | |
|             n_instrs = 0
 | |
|             n_macros = 0
 | |
|             n_pseudos = 0
 | |
|             for thing in self.everything:
 | |
|                 match thing:
 | |
|                     case OverriddenInstructionPlaceHolder():
 | |
|                         self.write_overridden_instr_place_holder(thing)
 | |
|                     case parser.InstDef():
 | |
|                         if thing.kind != "op":
 | |
|                             n_instrs += 1
 | |
|                             self.write_instr(self.instrs[thing.name])
 | |
|                     case parser.Macro():
 | |
|                         n_macros += 1
 | |
|                         self.write_macro(self.macro_instrs[thing.name])
 | |
|                     case parser.Pseudo():
 | |
|                         n_pseudos += 1
 | |
|                     case _:
 | |
|                         typing.assert_never(thing)
 | |
| 
 | |
|         print(
 | |
|             f"Wrote {n_instrs} instructions, {n_macros} macros, "
 | |
|             f"and {n_pseudos} pseudos to {self.output_filename}",
 | |
|             file=sys.stderr,
 | |
|         )
 | |
| 
 | |
|     def write_executor_instructions(self) -> None:
 | |
|         """Generate cases for the Tier 2 interpreter."""
 | |
|         with open(self.executor_filename, "w") as f:
 | |
|             self.out = Formatter(f, 8, self.emit_line_directives)
 | |
|             self.write_provenance_header()
 | |
|             for thing in self.everything:
 | |
|                 match thing:
 | |
|                     case OverriddenInstructionPlaceHolder():
 | |
|                         # TODO: Is this helpful?
 | |
|                         self.write_overridden_instr_place_holder(thing)
 | |
|                     case parser.InstDef():
 | |
|                         instr = self.instrs[thing.name]
 | |
|                         if instr.is_viable_uop():
 | |
|                             self.out.emit("")
 | |
|                             with self.out.block(f"case {thing.name}:"):
 | |
|                                 instr.write(self.out, tier=TIER_TWO)
 | |
|                                 self.out.emit("break;")
 | |
|                     case parser.Macro():
 | |
|                         pass
 | |
|                     case parser.Pseudo():
 | |
|                         pass
 | |
|                     case _:
 | |
|                         typing.assert_never(thing)
 | |
|         print(
 | |
|             f"Wrote some stuff to {self.executor_filename}",
 | |
|             file=sys.stderr,
 | |
|         )
 | |
| 
 | |
|     def write_overridden_instr_place_holder(self,
 | |
|             place_holder: OverriddenInstructionPlaceHolder) -> None:
 | |
|         self.out.emit("")
 | |
|         self.out.emit(
 | |
|             f"{self.out.comment} TARGET({place_holder.name}) overridden by later definition")
 | |
| 
 | |
|     def write_instr(self, instr: Instruction) -> None:
 | |
|         name = instr.name
 | |
|         self.out.emit("")
 | |
|         if instr.inst.override:
 | |
|             self.out.emit("{self.out.comment} Override")
 | |
|         with self.out.block(f"TARGET({name})"):
 | |
|             if instr.predicted:
 | |
|                 self.out.emit(f"PREDICTED({name});")
 | |
|             instr.write(self.out)
 | |
|             if not instr.always_exits:
 | |
|                 if instr.check_eval_breaker:
 | |
|                     self.out.emit("CHECK_EVAL_BREAKER();")
 | |
|                 self.out.emit(f"DISPATCH();")
 | |
| 
 | |
|     def write_macro(self, mac: MacroInstruction) -> None:
 | |
|         """Write code for a macro instruction."""
 | |
|         last_instr: Instruction | None = None
 | |
|         with self.wrap_macro(mac):
 | |
|             cache_adjust = 0
 | |
|             for part in mac.parts:
 | |
|                 match part:
 | |
|                     case parser.CacheEffect(size=size):
 | |
|                         cache_adjust += size
 | |
|                     case Component() as comp:
 | |
|                         last_instr = comp.instr
 | |
|                         comp.write_body(self.out)
 | |
|                         cache_adjust += comp.instr.cache_offset
 | |
| 
 | |
|             if cache_adjust:
 | |
|                 self.out.emit(f"next_instr += {cache_adjust};")
 | |
| 
 | |
|             if (
 | |
|                 (family := self.families.get(mac.name))
 | |
|                 and mac.name == family.name
 | |
|                 and (cache_size := family.size)
 | |
|             ):
 | |
|                 self.out.emit(
 | |
|                     f"static_assert({cache_size} == "
 | |
|                     f'{cache_adjust}, "incorrect cache size");'
 | |
|                 )
 | |
| 
 | |
|     @contextlib.contextmanager
 | |
|     def wrap_macro(self, mac: MacroInstruction):
 | |
|         """Boilerplate for macro instructions."""
 | |
|         # TODO: Somewhere (where?) make it so that if one instruction
 | |
|         # has an output that is input to another, and the variable names
 | |
|         # and types match and don't conflict with other instructions,
 | |
|         # that variable is declared with the right name and type in the
 | |
|         # outer block, rather than trusting the compiler to optimize it.
 | |
|         self.out.emit("")
 | |
|         with self.out.block(f"TARGET({mac.name})"):
 | |
|             if mac.predicted:
 | |
|                 self.out.emit(f"PREDICTED({mac.name});")
 | |
| 
 | |
|             # The input effects should have no conditionals.
 | |
|             # Only the output effects do (for now).
 | |
|             ieffects = [
 | |
|                 StackEffect(eff.name, eff.type) if eff.cond else eff
 | |
|                 for eff in mac.stack
 | |
|             ]
 | |
| 
 | |
|             for i, var in reversed(list(enumerate(ieffects))):
 | |
|                 src = None
 | |
|                 if i < mac.initial_sp:
 | |
|                     src = StackEffect(f"stack_pointer[-{mac.initial_sp - i}]", "")
 | |
|                 self.out.declare(var, src)
 | |
| 
 | |
|             yield
 | |
| 
 | |
|             self.out.stack_adjust(ieffects[:mac.initial_sp], mac.stack[:mac.final_sp])
 | |
| 
 | |
|             for i, var in enumerate(reversed(mac.stack[: mac.final_sp]), 1):
 | |
|                 dst = StackEffect(f"stack_pointer[-{i}]", "")
 | |
|                 self.out.assign(dst, var)
 | |
| 
 | |
|             self.out.emit(f"DISPATCH();")
 | |
| 
 | |
| 
 | |
| def prettify_filename(filename: str) -> str:
 | |
|     # Make filename more user-friendly and less platform-specific,
 | |
|     # it is only used for error reporting at this point.
 | |
|     filename = filename.replace("\\", "/")
 | |
|     if filename.startswith("./"):
 | |
|         filename = filename[2:]
 | |
|     if filename.endswith(".new"):
 | |
|         filename = filename[:-4]
 | |
|     return filename
 | |
| 
 | |
| 
 | |
| def extract_block_text(block: parser.Block) -> tuple[list[str], bool, int]:
 | |
|     # Get lines of text with proper dedent
 | |
|     blocklines = block.text.splitlines(True)
 | |
|     first_token: lx.Token = block.tokens[0]  # IndexError means the context is broken
 | |
|     block_line = first_token.begin[0]
 | |
| 
 | |
|     # Remove blank lines from both ends
 | |
|     while blocklines and not blocklines[0].strip():
 | |
|         blocklines.pop(0)
 | |
|         block_line += 1
 | |
|     while blocklines and not blocklines[-1].strip():
 | |
|         blocklines.pop()
 | |
| 
 | |
|     # Remove leading and trailing braces
 | |
|     assert blocklines and blocklines[0].strip() == "{"
 | |
|     assert blocklines and blocklines[-1].strip() == "}"
 | |
|     blocklines.pop()
 | |
|     blocklines.pop(0)
 | |
|     block_line += 1
 | |
| 
 | |
|     # Remove trailing blank lines
 | |
|     while blocklines and not blocklines[-1].strip():
 | |
|         blocklines.pop()
 | |
| 
 | |
|     # Separate CHECK_EVAL_BREAKER() macro from end
 | |
|     check_eval_breaker = \
 | |
|         blocklines != [] and blocklines[-1].strip() == "CHECK_EVAL_BREAKER();"
 | |
|     if check_eval_breaker:
 | |
|         del blocklines[-1]
 | |
| 
 | |
|     return blocklines, check_eval_breaker, block_line
 | |
| 
 | |
| 
 | |
| def always_exits(lines: list[str]) -> bool:
 | |
|     """Determine whether a block always ends in a return/goto/etc."""
 | |
|     if not lines:
 | |
|         return False
 | |
|     line = lines[-1].rstrip()
 | |
|     # Indent must match exactly (TODO: Do something better)
 | |
|     if line[:12] != " " * 12:
 | |
|         return False
 | |
|     line = line[12:]
 | |
|     return line.startswith(
 | |
|         (
 | |
|             "goto ",
 | |
|             "return ",
 | |
|             "DISPATCH",
 | |
|             "GO_TO_",
 | |
|             "Py_UNREACHABLE()",
 | |
|             "ERROR_IF(true, ",
 | |
|         )
 | |
|     )
 | |
| 
 | |
| 
 | |
| def variable_used(node: parser.Node, name: str) -> bool:
 | |
|     """Determine whether a variable with a given name is used in a node."""
 | |
|     return any(
 | |
|         token.kind == "IDENTIFIER" and token.text == name for token in node.tokens
 | |
|     )
 | |
| 
 | |
| 
 | |
| def variable_used_unspecialized(node: parser.Node, name: str) -> bool:
 | |
|     """Like variable_used(), but skips #if ENABLE_SPECIALIZATION blocks."""
 | |
|     tokens: list[lx.Token] = []
 | |
|     skipping = False
 | |
|     for i, token in enumerate(node.tokens):
 | |
|         if token.kind == "MACRO":
 | |
|             text = "".join(token.text.split())
 | |
|             # TODO: Handle nested #if
 | |
|             if text == "#if":
 | |
|                 if (
 | |
|                     i + 1 < len(node.tokens)
 | |
|                     and node.tokens[i + 1].text == "ENABLE_SPECIALIZATION"
 | |
|                 ):
 | |
|                     skipping = True
 | |
|             elif text in ("#else", "#endif"):
 | |
|                 skipping = False
 | |
|         if not skipping:
 | |
|             tokens.append(token)
 | |
|     return any(token.kind == "IDENTIFIER" and token.text == name for token in tokens)
 | |
| 
 | |
| 
 | |
| def main():
 | |
|     """Parse command line, parse input, analyze, write output."""
 | |
|     args = arg_parser.parse_args()  # Prints message and sys.exit(2) on error
 | |
|     if len(args.input) == 0:
 | |
|         args.input.append(DEFAULT_INPUT)
 | |
| 
 | |
|     # Raises OSError if input unreadable
 | |
|     a = Analyzer(args.input, args.output, args.metadata, args.pymetadata, args.executor_cases)
 | |
| 
 | |
|     if args.emit_line_directives:
 | |
|         a.emit_line_directives = True
 | |
|     a.parse()  # Raises SyntaxError on failure
 | |
|     a.analyze()  # Prints messages and sets a.errors on failure
 | |
|     if a.errors:
 | |
|         sys.exit(f"Found {a.errors} errors")
 | |
|     a.write_instructions()  # Raises OSError if output can't be written
 | |
|     a.write_metadata()
 | |
|     a.write_executor_instructions()
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     main()
 | 
