from abc import ABC, abstractmethod from .constants import ( DEFAULT_LOCATION, THREAD_STATUS_HAS_GIL, THREAD_STATUS_ON_CPU, THREAD_STATUS_GIL_REQUESTED, THREAD_STATUS_UNKNOWN, THREAD_STATUS_HAS_EXCEPTION, _INTERNAL_FRAME_SUFFIXES, ) try: from _remote_debugging import FrameInfo except ImportError: # Fallback definition if _remote_debugging is not available FrameInfo = None def normalize_location(location): """Normalize location to a 4-tuple format. Args: location: tuple (lineno, end_lineno, col_offset, end_col_offset) or None Returns: tuple: (lineno, end_lineno, col_offset, end_col_offset) """ if location is None: return DEFAULT_LOCATION return location def extract_lineno(location): """Extract lineno from location. Args: location: tuple (lineno, end_lineno, col_offset, end_col_offset) or None Returns: int: The line number (0 for synthetic frames) """ if location is None: return 0 return location[0] def _is_internal_frame(frame): if isinstance(frame, tuple): filename = frame[0] if frame else "" else: filename = getattr(frame, "filename", "") if not filename: return False return filename.endswith(_INTERNAL_FRAME_SUFFIXES) def filter_internal_frames(frames): if not frames: return frames return [f for f in frames if not _is_internal_frame(f)] class Collector(ABC): @abstractmethod def collect(self, stack_frames, timestamps_us=None): """Collect profiling data from stack frames. Args: stack_frames: List of InterpreterInfo objects timestamps_us: Optional list of timestamps in microseconds. If provided (from binary replay with RLE batching), use these instead of current time. If None, collectors should use time.monotonic() or similar. The list may contain multiple timestamps when samples are batched together (same stack, different times). """ def collect_failed_sample(self): """Collect data about a failed sample attempt.""" @abstractmethod def export(self, filename): """Export collected data to a file.""" @staticmethod def _filter_internal_frames(frames): return filter_internal_frames(frames) def _iter_all_frames(self, stack_frames, skip_idle=False): for interpreter_info in stack_frames: for thread_info in interpreter_info.threads: # skip_idle now means: skip if thread is not actively running # A thread is "active" if it has the GIL OR is on CPU if skip_idle: status_flags = thread_info.status has_gil = bool(status_flags & THREAD_STATUS_HAS_GIL) on_cpu = bool(status_flags & THREAD_STATUS_ON_CPU) if not (has_gil or on_cpu): continue frames = thread_info.frame_info if frames: # Filter out internal profiler frames from the bottom of the stack frames = self._filter_internal_frames(frames) if frames: yield frames, thread_info.thread_id def _iter_async_frames(self, awaited_info_list): # Phase 1: Index tasks and build parent relationships with pre-computed selection task_map, child_to_parent, all_task_ids, all_parent_ids = self._build_task_graph(awaited_info_list) # Phase 2: Find leaf tasks (tasks not awaited by anyone) leaf_task_ids = self._find_leaf_tasks(all_task_ids, all_parent_ids) # Phase 3: Build linear stacks from each leaf to root (optimized - no sorting!) yield from self._build_linear_stacks(leaf_task_ids, task_map, child_to_parent) def _iter_stacks(self, stack_frames, skip_idle=False): """Yield (frames, thread_id) for all stacks, handling both sync and async modes.""" if stack_frames and hasattr(stack_frames[0], "awaited_by"): for frames, thread_id, _ in self._iter_async_frames(stack_frames): if frames: yield frames, thread_id else: for frames, thread_id in self._iter_all_frames(stack_frames, skip_idle=skip_idle): if frames: yield frames, thread_id def _build_task_graph(self, awaited_info_list): task_map = {} child_to_parent = {} # Maps child_id -> (selected_parent_id, parent_count) all_task_ids = set() all_parent_ids = set() # Track ALL parent IDs for leaf detection for awaited_info in awaited_info_list: thread_id = awaited_info.thread_id for task_info in awaited_info.awaited_by: task_id = task_info.task_id task_map[task_id] = (task_info, thread_id) all_task_ids.add(task_id) # Pre-compute selected parent and count for optimization if task_info.awaited_by: parent_ids = [p.task_name for p in task_info.awaited_by] parent_count = len(parent_ids) # Track ALL parents for leaf detection all_parent_ids.update(parent_ids) # Use min() for O(n) instead of sorted()[0] which is O(n log n) selected_parent = min(parent_ids) if parent_count > 1 else parent_ids[0] child_to_parent[task_id] = (selected_parent, parent_count) return task_map, child_to_parent, all_task_ids, all_parent_ids def _find_leaf_tasks(self, all_task_ids, all_parent_ids): # Leaves are tasks that are not parents of any other task return all_task_ids - all_parent_ids def _build_linear_stacks(self, leaf_task_ids, task_map, child_to_parent): for leaf_id in leaf_task_ids: frames = [] visited = set() current_id = leaf_id thread_id = None # Follow the single parent chain from leaf to root while current_id is not None: # Cycle detection if current_id in visited: break visited.add(current_id) # Check if task exists in task_map if current_id not in task_map: break task_info, tid = task_map[current_id] # Set thread_id from first task if thread_id is None: thread_id = tid # Add all frames from all coroutines in this task if task_info.coroutine_stack: for coro_info in task_info.coroutine_stack: for frame in coro_info.call_stack: frames.append(frame) # Get pre-computed parent info (no sorting needed!) parent_info = child_to_parent.get(current_id) # Add task boundary marker with parent count annotation if multiple parents task_name = task_info.task_name or "Task-" + str(task_info.task_id) if parent_info: selected_parent, parent_count = parent_info if parent_count > 1: task_name = f"{task_name} ({parent_count} parents)" frames.append(FrameInfo(("", None, task_name, None))) current_id = selected_parent else: # Root task - no parent frames.append(FrameInfo(("", None, task_name, None))) current_id = None # Yield the complete stack if we collected any frames if frames and thread_id is not None: yield frames, thread_id, leaf_id def _is_gc_frame(self, frame): if isinstance(frame, tuple): funcname = frame[2] if len(frame) >= 3 else "" else: funcname = getattr(frame, "funcname", "") return "" in funcname or "gc_collect" in funcname def _collect_thread_status_stats(self, stack_frames): """Collect aggregate and per-thread status statistics from a sample. Returns: tuple: (aggregate_status_counts, has_gc_frame, per_thread_stats) - aggregate_status_counts: dict with has_gil, on_cpu, has_exception, etc. - has_gc_frame: bool indicating if any thread has GC frames - per_thread_stats: dict mapping thread_id to per-thread counts """ status_counts = { "has_gil": 0, "on_cpu": 0, "gil_requested": 0, "unknown": 0, "has_exception": 0, "total": 0, } has_gc_frame = False per_thread_stats = {} for interpreter_info in stack_frames: threads = getattr(interpreter_info, "threads", []) for thread_info in threads: status_counts["total"] += 1 # Track thread status using bit flags status_flags = getattr(thread_info, "status", 0) if status_flags & THREAD_STATUS_HAS_GIL: status_counts["has_gil"] += 1 if status_flags & THREAD_STATUS_ON_CPU: status_counts["on_cpu"] += 1 if status_flags & THREAD_STATUS_GIL_REQUESTED: status_counts["gil_requested"] += 1 if status_flags & THREAD_STATUS_UNKNOWN: status_counts["unknown"] += 1 if status_flags & THREAD_STATUS_HAS_EXCEPTION: status_counts["has_exception"] += 1 # Track per-thread statistics thread_id = getattr(thread_info, "thread_id", None) if thread_id is not None: if thread_id not in per_thread_stats: per_thread_stats[thread_id] = { "has_gil": 0, "on_cpu": 0, "gil_requested": 0, "unknown": 0, "has_exception": 0, "total": 0, "gc_samples": 0, } thread_stats = per_thread_stats[thread_id] thread_stats["total"] += 1 if status_flags & THREAD_STATUS_HAS_GIL: thread_stats["has_gil"] += 1 if status_flags & THREAD_STATUS_ON_CPU: thread_stats["on_cpu"] += 1 if status_flags & THREAD_STATUS_GIL_REQUESTED: thread_stats["gil_requested"] += 1 if status_flags & THREAD_STATUS_UNKNOWN: thread_stats["unknown"] += 1 if status_flags & THREAD_STATUS_HAS_EXCEPTION: thread_stats["has_exception"] += 1 # Check for GC frames in this thread frames = getattr(thread_info, "frame_info", None) if frames: for frame in frames: if self._is_gc_frame(frame): thread_stats["gc_samples"] += 1 has_gc_frame = True break return status_counts, has_gc_frame, per_thread_stats