"""Thin Python wrapper around C binary writer for profiling data.""" import time import _remote_debugging from .collector import Collector # Compression type constants (must match binary_io.h) COMPRESSION_NONE = 0 COMPRESSION_ZSTD = 1 def _resolve_compression(compression): """Resolve compression type from string or int. Args: compression: 'auto', 'zstd', 'none', or int (0/1) Returns: int: Compression type constant """ if isinstance(compression, int): return compression compression = compression.lower() if compression == 'none': return COMPRESSION_NONE elif compression == 'zstd': return COMPRESSION_ZSTD elif compression == 'auto': # Auto: use zstd if available, otherwise none if _remote_debugging.zstd_available(): return COMPRESSION_ZSTD return COMPRESSION_NONE else: raise ValueError(f"Unknown compression type: {compression}") class BinaryCollector(Collector): """High-performance binary collector using C implementation. This collector writes profiling data directly to a binary file format with optional zstd compression. All I/O is performed in C for maximum throughput. The binary format uses string/frame deduplication and varint encoding for efficient storage. """ def __init__(self, filename, sample_interval_usec, *, skip_idle=False, compression='auto'): """Create a new binary collector. Args: filename: Path to output binary file sample_interval_usec: Sampling interval in microseconds skip_idle: If True, skip idle threads (not used in binary format) compression: 'auto', 'zstd', 'none', or int (0=none, 1=zstd) """ self.filename = filename self.sample_interval_usec = sample_interval_usec self.skip_idle = skip_idle compression_type = _resolve_compression(compression) start_time_us = int(time.monotonic() * 1_000_000) self._writer = _remote_debugging.BinaryWriter( filename, sample_interval_usec, start_time_us, compression=compression_type ) def collect(self, stack_frames, timestamp_us=None): """Collect profiling data from stack frames. This passes stack_frames directly to the C writer which handles all encoding and buffering. Args: stack_frames: List of InterpreterInfo objects from _remote_debugging timestamp_us: Optional timestamp in microseconds. If not provided, uses time.monotonic() to generate one. """ if timestamp_us is None: timestamp_us = int(time.monotonic() * 1_000_000) self._writer.write_sample(stack_frames, timestamp_us) def collect_failed_sample(self): """Record a failed sample attempt (no-op for binary format).""" pass def export(self, filename=None): """Finalize and close the binary file. Args: filename: Ignored (binary files are written incrementally) """ self._writer.finalize() @property def total_samples(self): return self._writer.total_samples def get_stats(self): """Get encoding statistics. Returns: Dict with encoding statistics including repeat/full/suffix/pop-push record counts, frames written/saved, and compression ratio. """ return self._writer.get_stats() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit - finalize unless there was an error.""" if exc_type is None: self._writer.finalize() else: self._writer.close() return False