mirror of
https://github.com/python/cpython.git
synced 2025-12-31 20:43:36 +00:00
120 lines
3.7 KiB
Python
120 lines
3.7 KiB
Python
"""Thin Python wrapper around C binary writer for profiling data."""
|
|
|
|
import time
|
|
|
|
import _remote_debugging
|
|
|
|
from .collector import Collector
|
|
|
|
# Compression type constants (must match binary_io.h)
|
|
COMPRESSION_NONE = 0
|
|
COMPRESSION_ZSTD = 1
|
|
|
|
|
|
def _resolve_compression(compression):
|
|
"""Resolve compression type from string or int.
|
|
|
|
Args:
|
|
compression: 'auto', 'zstd', 'none', or int (0/1)
|
|
|
|
Returns:
|
|
int: Compression type constant
|
|
"""
|
|
if isinstance(compression, int):
|
|
return compression
|
|
|
|
compression = compression.lower()
|
|
if compression == 'none':
|
|
return COMPRESSION_NONE
|
|
elif compression == 'zstd':
|
|
return COMPRESSION_ZSTD
|
|
elif compression == 'auto':
|
|
# Auto: use zstd if available, otherwise none
|
|
if _remote_debugging.zstd_available():
|
|
return COMPRESSION_ZSTD
|
|
return COMPRESSION_NONE
|
|
else:
|
|
raise ValueError(f"Unknown compression type: {compression}")
|
|
|
|
|
|
class BinaryCollector(Collector):
|
|
"""High-performance binary collector using C implementation.
|
|
|
|
This collector writes profiling data directly to a binary file format
|
|
with optional zstd compression. All I/O is performed in C for maximum
|
|
throughput.
|
|
|
|
The binary format uses string/frame deduplication and varint encoding
|
|
for efficient storage.
|
|
"""
|
|
|
|
def __init__(self, filename, sample_interval_usec, *, skip_idle=False,
|
|
compression='auto'):
|
|
"""Create a new binary collector.
|
|
|
|
Args:
|
|
filename: Path to output binary file
|
|
sample_interval_usec: Sampling interval in microseconds
|
|
skip_idle: If True, skip idle threads (not used in binary format)
|
|
compression: 'auto', 'zstd', 'none', or int (0=none, 1=zstd)
|
|
"""
|
|
self.filename = filename
|
|
self.sample_interval_usec = sample_interval_usec
|
|
self.skip_idle = skip_idle
|
|
|
|
compression_type = _resolve_compression(compression)
|
|
start_time_us = int(time.monotonic() * 1_000_000)
|
|
self._writer = _remote_debugging.BinaryWriter(
|
|
filename, sample_interval_usec, start_time_us, compression=compression_type
|
|
)
|
|
|
|
def collect(self, stack_frames, timestamp_us=None):
|
|
"""Collect profiling data from stack frames.
|
|
|
|
This passes stack_frames directly to the C writer which handles
|
|
all encoding and buffering.
|
|
|
|
Args:
|
|
stack_frames: List of InterpreterInfo objects from _remote_debugging
|
|
timestamp_us: Optional timestamp in microseconds. If not provided,
|
|
uses time.monotonic() to generate one.
|
|
"""
|
|
if timestamp_us is None:
|
|
timestamp_us = int(time.monotonic() * 1_000_000)
|
|
self._writer.write_sample(stack_frames, timestamp_us)
|
|
|
|
def collect_failed_sample(self):
|
|
"""Record a failed sample attempt (no-op for binary format)."""
|
|
pass
|
|
|
|
def export(self, filename=None):
|
|
"""Finalize and close the binary file.
|
|
|
|
Args:
|
|
filename: Ignored (binary files are written incrementally)
|
|
"""
|
|
self._writer.finalize()
|
|
|
|
@property
|
|
def total_samples(self):
|
|
return self._writer.total_samples
|
|
|
|
def get_stats(self):
|
|
"""Get encoding statistics.
|
|
|
|
Returns:
|
|
Dict with encoding statistics including repeat/full/suffix/pop-push
|
|
record counts, frames written/saved, and compression ratio.
|
|
"""
|
|
return self._writer.get_stats()
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
"""Context manager exit - finalize unless there was an error."""
|
|
if exc_type is None:
|
|
self._writer.finalize()
|
|
else:
|
|
self._writer.close()
|
|
return False
|