cpython/Lib/profiling/sampling/heatmap_collector.py
Pablo Galindo Salgado 8801c6dec7
gh-140677 Add heatmap visualization to Tachyon sampling profiler (#140680)
Co-authored-by: Ivona Stojanovic <stojanovic.i@hotmail.com>
2025-12-02 20:33:40 +00:00

1039 lines
40 KiB
Python

"""Heatmap collector for Python profiling with line-level execution heat visualization."""
import base64
import collections
import html
import importlib.resources
import json
import os
import platform
import site
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Any
from ._css_utils import get_combined_css
from .stack_collector import StackTraceCollector
# ============================================================================
# Data Classes
# ============================================================================
@dataclass
class FileStats:
    """Aggregated sample statistics for one profiled source file."""

    # Identity of the file and how it was classified.
    filename: str
    module_name: str
    module_type: str

    # Sample totals summed over all lines of the file.
    total_samples: int
    total_self_samples: int

    # Number of distinct lines that received samples.
    num_lines: int

    # Highest per-line counts seen in the file.
    max_samples: int
    max_self_samples: int

    # Share relative to the most-sampled file; filled in after construction.
    percentage: float = 0.0
@dataclass
class TreeNode:
    """One level of the hierarchical file tree shown on the index page."""

    # Files stored directly at this level.
    files: List['FileStats'] = field(default_factory=list)
    # Aggregate sample total for this node and everything below it.
    samples: int = 0
    # Aggregate file count for this node and everything below it.
    count: int = 0
    # Child nodes keyed by the next dotted-name component.
    children: Dict[str, 'TreeNode'] = field(default_factory=dict)
@dataclass
class ColorGradient:
    """Tunable parameters for mapping a 0-1 intensity onto a heatmap color.

    The ``stop_*`` thresholds split the intensity range into bands, the
    ``alpha_*`` values give the opacity used per band, and ``cache`` stores
    already computed ``(r, g, b, alpha)`` tuples for this instance.
    """

    # Upper intensity threshold of each color band.
    stop_1: float = 0.2  # Blue to cyan transition
    stop_2: float = 0.4  # Cyan to green transition
    stop_3: float = 0.6  # Green to yellow transition
    stop_4: float = 0.8  # Yellow to orange transition
    stop_5: float = 1.0  # Orange to red transition

    # Opacity per band; the hottest band ramps from base by up to range.
    alpha_very_cold: float = 0.3
    alpha_cold: float = 0.4
    alpha_medium: float = 0.5
    alpha_warm: float = 0.6
    alpha_hot_base: float = 0.7
    alpha_hot_range: float = 0.15

    # Scale factor applied to the offset within a band.
    multiplier: int = 5

    # Memoized colors keyed by rounded intensity (one dict per instance).
    cache: Dict[float, Tuple[int, int, int, float]] = field(default_factory=dict)
# ============================================================================
# Module Path Analysis
# ============================================================================
def get_python_path_info():
    """Collect the interpreter's path roots used to classify source files.

    Every lookup is best-effort: a root that cannot be determined is simply
    left out instead of raising.

    Returns:
        dict: ``'stdlib'`` (directory containing the ``os`` module, or None),
        ``'site_packages'`` (list of global and, if it exists, user
        site-packages directories) and ``'sys_path'`` (non-empty ``sys.path``
        entries), all as ``Path`` objects.
    """
    # The standard library root is wherever the os module lives.
    stdlib_dir = None
    try:
        os_file = getattr(os, '__file__', None)
        if os_file:
            stdlib_dir = Path(os_file).parent
    except (AttributeError, OSError):
        pass  # Leave stdlib unknown

    package_dirs = []

    # Global site-packages; some environments do not provide this function.
    try:
        package_dirs.extend(Path(entry) for entry in site.getsitepackages())
    except (AttributeError, OSError):
        pass

    # Per-user site-packages, only when the directory is actually present.
    try:
        user_dir = site.getusersitepackages()
        if user_dir and Path(user_dir).exists():
            package_dirs.append(Path(user_dir))
    except (AttributeError, OSError):
        pass

    return {
        'stdlib': stdlib_dir,
        'site_packages': package_dirs,
        'sys_path': [Path(entry) for entry in sys.path if entry],
    }
def extract_module_name(filename, path_info):
    """Extract Python module name and type from file path.

    Roots are tried from most to least specific: site-packages first, then
    the standard library, then the remaining ``sys.path`` entries.
    site-packages must be checked before the standard library because on
    standard installation layouts it is a subdirectory of the stdlib
    directory; checking stdlib first would label third-party code as
    ``'stdlib'`` with names such as ``site-packages.requests.api``.

    Args:
        filename: Path to the Python file
        path_info: Dictionary from get_python_path_info()

    Returns:
        tuple: (module_name, module_type) where module_type is one of:
            'stdlib', 'site-packages', 'project', or 'other'
    """
    if not filename:
        return ('unknown', 'other')

    try:
        file_path = Path(filename)
    except (ValueError, OSError):
        return (str(filename), 'other')

    # Third-party packages (checked first, see docstring).
    for site_pkg in path_info['site_packages']:
        if _is_subpath(file_path, site_pkg):
            return (_path_to_module(file_path.relative_to(site_pkg)), 'site-packages')

    # Standard library.
    stdlib_dir = path_info['stdlib']
    if stdlib_dir and _is_subpath(file_path, stdlib_dir):
        return (_path_to_module(file_path.relative_to(stdlib_dir)), 'stdlib')

    # Any other sys.path entry counts as project code; special pseudo-files
    # such as "<string>" are never matched against sys.path.
    if not str(file_path).startswith(('<', '[')):
        for path_entry in path_info['sys_path']:
            if _is_subpath(file_path, path_entry):
                return (_path_to_module(file_path.relative_to(path_entry)), 'project')

    # Fallback: derive a name from the path as given.
    return (_path_to_module(file_path), 'other')


def _is_subpath(file_path, parent_path):
    """Return True if *file_path* can be expressed relative to *parent_path*."""
    try:
        file_path.relative_to(parent_path)
    except (ValueError, OSError):
        return False
    return True


def _path_to_module(path):
    """Convert a file path into a dotted module name.

    A ``.py`` suffix is dropped and a trailing ``__init__`` component is
    removed so that a package's ``__init__.py`` maps to the package itself.

    Args:
        path: ``str`` or ``Path`` to convert

    Returns:
        str: Path components joined with dots, or the path stem when no
        components remain.
    """
    if isinstance(path, str):
        path = Path(path)

    if path.suffix == '.py':
        path = path.with_suffix('')

    parts = path.parts
    if parts and parts[-1] == '__init__':
        parts = parts[:-1]

    return '.'.join(parts) if parts else path.stem
# ============================================================================
# Helper Classes
# ============================================================================
class _TemplateLoader:
    """Loads and caches HTML/CSS/JS templates for heatmap generation."""

    def __init__(self):
        """Load all templates and assets once."""
        # HTML page skeletons
        self.index_template = None
        self.file_template = None
        # Stylesheets and scripts inlined into the pages
        self.index_css = None
        self.index_js = None
        self.file_css = None
        self.file_js = None
        # Markup for the logo (image or placeholder)
        self.logo_html = None
        self._load_templates()

    def _load_templates(self):
        """Load all template files from _heatmap_assets.

        A missing logo is tolerated (a placeholder is used and a warning is
        printed); any other read failure is fatal.

        Raises:
            RuntimeError: If a template or script file cannot be read.
        """
        try:
            package_root = importlib.resources.files(__package__)
            heatmap_assets = package_root / "_heatmap_assets"

            def read_asset(name):
                return (heatmap_assets / name).read_text(encoding="utf-8")

            # HTML templates
            self.index_template = read_asset("heatmap_index_template.html")
            self.file_template = read_asset("heatmap_pyfile_template.html")

            # One combined stylesheet serves both the index and file pages
            combined_css = get_combined_css("heatmap")
            self.index_css = combined_css
            self.file_css = combined_css

            # Scripts
            self.index_js = read_asset("heatmap_index.js")
            self.file_js = read_asset("heatmap.js")

            # Python logo, embedded as a base64 data URI
            shared_assets = package_root / "_assets"
            try:
                logo_file = shared_assets / "python-logo-only.png"
                encoded = base64.b64encode(logo_file.read_bytes()).decode("ascii")
                self.logo_html = f'<img src="data:image/png;base64,{encoded}" alt="Python logo" class="python-logo"/>'
            except OSError as e:
                self.logo_html = '<div class="python-logo-placeholder"></div>'
                print(f"Warning: Could not load Python logo: {e}")
        except OSError as e:
            raise RuntimeError(f"Failed to load heatmap template files: {e}") from e
class _TreeBuilder:
    """Builds hierarchical tree structure from file statistics."""

    @staticmethod
    def build_file_tree(file_stats: List['FileStats']) -> Dict[str, 'TreeNode']:
        """Group files by module type and arrange each group as a tree.

        The dotted module name determines the position: every component
        except the last becomes a nested child node, and the file itself is
        stored on the node reached that way.

        Args:
            file_stats: List of FileStats objects

        Returns:
            Dictionary mapping each non-empty module type to its tree root
        """
        # Bucket by module type, keeping the fixed type order.
        by_type = {'stdlib': [], 'site-packages': [], 'project': [], 'other': []}
        for stat in file_stats:
            by_type[stat.module_type].append(stat)

        trees = {}
        for module_type, members in by_type.items():
            if not members:
                continue

            root = TreeNode()
            for stat in members:
                *package_parts, _leaf = stat.module_name.split('.')

                # Walk (creating as needed) one node per package component.
                cursor = root
                for part in package_parts:
                    if part not in cursor.children:
                        cursor.children[part] = TreeNode()
                    cursor = cursor.children[part]
                cursor.files.append(stat)

            # Fill in aggregate sample/file counts for the whole tree.
            _TreeBuilder._calculate_node_stats(root)
            trees[module_type] = root

        return trees

    @staticmethod
    def _calculate_node_stats(node: 'TreeNode') -> Tuple[int, int]:
        """Recursively compute and store aggregate statistics on *node*.

        Args:
            node: TreeNode to calculate stats for

        Returns:
            Tuple of (total_samples, file_count) covering the node's own
            files and all descendants
        """
        samples = sum(file_stat.total_samples for file_stat in node.files)
        files = len(node.files)

        for child in node.children.values():
            child_samples, child_files = _TreeBuilder._calculate_node_stats(child)
            samples += child_samples
            files += child_files

        node.samples = samples
        node.count = files
        return samples, files
class _HtmlRenderer:
    """Renders hierarchical tree structures as HTML."""

    def __init__(self, file_index: Dict[str, str], color_gradient: 'ColorGradient',
                 calculate_intensity_color_func):
        """Initialize renderer with file index and color calculation function.

        Args:
            file_index: Mapping from filenames to HTML file names
            color_gradient: ColorGradient configuration
            calculate_intensity_color_func: Callable taking an intensity and
                returning an ``(r, g, b, alpha)`` tuple
        """
        self.file_index = file_index
        self.color_gradient = color_gradient
        self.calculate_intensity_color = calculate_intensity_color_func
        # Height, in pixels, of the per-file heat bar.
        self.heatmap_bar_height = 16

    def render_hierarchical_html(self, trees: Dict[str, 'TreeNode']) -> str:
        """Build hierarchical HTML with type sections and collapsible module folders.

        Sections are emitted in a fixed order (project, stdlib, site-packages,
        other); types missing from *trees* are skipped.

        Args:
            trees: Dictionary mapping module types to tree roots

        Returns:
            Complete HTML string for all sections
        """
        type_names = {
            'stdlib': '📚 Standard Library',
            'site-packages': '📦 Site Packages',
            'project': '🏗️ Project Files',
            'other': '📄 Other Files'
        }

        sections = []
        for module_type in ['project', 'stdlib', 'site-packages', 'other']:
            if module_type not in trees:
                continue
            tree = trees[module_type]

            # Project starts expanded, others start collapsed.  The icon must
            # reflect that state: right-pointing when collapsed (same glyph
            # the folder headers use), down-pointing when expanded.
            is_collapsed = module_type in {'stdlib', 'site-packages', 'other'}
            icon = '▶' if is_collapsed else '▼'
            content_style = ' style="display: none;"' if is_collapsed else ''

            chunks = [
                '\n<div class="type-section">\n'
                '<div class="type-header" onclick="toggleTypeSection(this)">\n'
                f'<span class="type-icon">{icon}</span>\n'
                f'<span class="type-title">{type_names[module_type]}</span>\n'
                f'<span class="type-stats">({tree.count} files, {tree.samples:,} samples)</span>\n'
                '</div>\n'
                f'<div class="type-content"{content_style}>\n'
            ]

            # Root folders, hottest first
            root_folders = sorted(tree.children.items(),
                                  key=lambda x: x[1].samples, reverse=True)
            for folder_name, folder_node in root_folders:
                chunks.append(self._render_folder(folder_node, folder_name, level=1))

            # Root files (files not in any module)
            if tree.files:
                sorted_files = sorted(tree.files, key=lambda x: x.total_samples, reverse=True)
                chunks.append(' <div class="files-list">\n')
                for stat in sorted_files:
                    chunks.append(self._render_file_item(stat, indent=' '))
                chunks.append(' </div>\n')

            chunks.append(' </div>\n</div>\n')
            sections.append(''.join(chunks))

        return '\n'.join(sections)

    def _render_folder(self, node: 'TreeNode', name: str, level: int = 1) -> str:
        """Render a single folder node recursively.

        Folders are emitted collapsed; sub-folders come before files and both
        are ordered by descending sample count.

        Args:
            node: TreeNode to render
            name: Display name for the folder (HTML-escaped here)
            level: Nesting level for indentation

        Returns:
            HTML string for this folder and its contents
        """
        indent = ' ' * level
        parts = [
            f'{indent}<div class="folder-node collapsed" data-level="{level}">',
            f'{indent} <div class="folder-header" onclick="toggleFolder(this)">',
            f'{indent} <span class="folder-icon">▶</span>',
            f'{indent} <span class="folder-name">📁 {html.escape(name)}</span>',
            f'{indent} <span class="folder-stats">({node.count} files, {node.samples:,} samples)</span>',
            f'{indent} </div>',
            f'{indent} <div class="folder-content" style="display: none;">',
        ]

        # Sub-folders sorted by sample count
        subfolders = sorted(node.children.items(),
                            key=lambda x: x[1].samples, reverse=True)
        for subfolder_name, subfolder_node in subfolders:
            parts.append(self._render_folder(subfolder_node, subfolder_name, level + 1))

        # Files in this folder
        if node.files:
            sorted_files = sorted(node.files, key=lambda x: x.total_samples, reverse=True)
            parts.append(f'{indent} <div class="files-list">')
            for stat in sorted_files:
                parts.append(self._render_file_item(stat, indent=f'{indent} '))
            parts.append(f'{indent} </div>')

        parts.append(f'{indent} </div>')
        parts.append(f'{indent}</div>')
        return '\n'.join(parts)

    def _render_file_item(self, stat: 'FileStats', indent: str = '') -> str:
        """Render a single file item with heatmap bar.

        The bar's width is the file's percentage (capped at 100) in pixels and
        its color comes from the intensity color function.

        Args:
            stat: FileStats object
            indent: Indentation string

        Returns:
            HTML string for file item
        """
        full_path = html.escape(stat.filename)
        module_name = html.escape(stat.module_name)

        r, g, b, alpha = self.calculate_intensity_color(stat.percentage / 100.0)
        bg_color = f"rgba({r}, {g}, {b}, {alpha})"
        bar_width = min(stat.percentage, 100)

        html_file = self.file_index[stat.filename]

        return (f'{indent}<div class="file-item">\n'
                f'{indent} <a href="{html_file}" class="file-link" title="{full_path}">📄 {module_name}</a>\n'
                f'{indent} <span class="file-samples">{stat.total_samples:,} samples</span>\n'
                f'{indent} <div class="heatmap-bar-container"><div class="heatmap-bar" style="width: {bar_width}px; background-color: {bg_color}; height: {self.heatmap_bar_height}px;"></div></div>\n'
                f'{indent}</div>\n')
# ============================================================================
# Main Collector Class
# ============================================================================
class HeatmapCollector(StackTraceCollector):
    """Collector that generates coverage.py-style heatmap HTML output with line intensity.

    This collector creates detailed HTML reports showing which lines of code
    were executed most frequently during profiling, similar to coverage.py
    but showing execution "heat" rather than just coverage.
    """

    # Name pattern of the per-file report pages; formatted with the file's
    # position in the sorted statistics list.
    FILE_INDEX_FORMAT = "file_{:04d}.html"

    def __init__(self, *args, **kwargs):
        """Initialize the heatmap collector with data structures for analysis.

        All arguments are forwarded unchanged to the base collector.
        """
        super().__init__(*args, **kwargs)

        # Cumulative counts: every appearance of a line anywhere in a stack.
        self.line_samples = collections.Counter()
        self.file_samples = collections.defaultdict(collections.Counter)
        # Self counts: only when the line is the leaf frame.
        self.line_self_samples = collections.Counter()
        self.file_self_samples = collections.defaultdict(collections.Counter)

        # Call graph data structures for navigation
        self.call_graph = collections.defaultdict(list)
        self.callers_graph = collections.defaultdict(list)
        self.function_definitions = {}

        # Edge counting for call path analysis
        self.edge_samples = collections.Counter()

        # Statistics and metadata
        self._total_samples = 0
        self._path_info = get_python_path_info()
        self.stats = {}

        # Color gradient configuration
        self._color_gradient = ColorGradient()

        # Template loader (loads all templates once)
        self._template_loader = _TemplateLoader()

        # File index (populated during export)
        self.file_index = {}

    @property
    def _color_cache(self):
        """Compatibility property for accessing color cache."""
        return self._color_gradient.cache

    def set_stats(self, sample_interval_usec, duration_sec, sample_rate, error_rate=None, missed_samples=None, **kwargs):
        """Set profiling statistics to include in heatmap output.

        Interpreter and platform details are recorded alongside the supplied
        values.

        Args:
            sample_interval_usec: Sampling interval in microseconds
            duration_sec: Total profiling duration in seconds
            sample_rate: Effective sampling rate
            error_rate: Optional error rate during profiling
            missed_samples: Optional percentage of missed samples
            **kwargs: Additional statistics to include
        """
        self.stats = {
            "sample_interval_usec": sample_interval_usec,
            "duration_sec": duration_sec,
            "sample_rate": sample_rate,
            "error_rate": error_rate,
            "missed_samples": missed_samples,
            "python_version": sys.version,
            "python_implementation": platform.python_implementation(),
            "platform": platform.platform(),
        }
        self.stats.update(kwargs)

    def process_frames(self, frames, thread_id):
        """Process stack frames and count samples per line.

        Args:
            frames: List of frame tuples (filename, lineno, funcname)
                frames[0] is the leaf (top of stack, where execution is)
            thread_id: Thread ID for this stack trace
        """
        self._total_samples += 1

        last_index = len(frames) - 1
        for i, frame_info in enumerate(frames):
            filename, lineno, funcname = frame_info
            if not self._is_valid_frame(filename, lineno):
                continue

            # frames[0] is the leaf - where execution is actually happening
            self._record_line_sample(filename, lineno, funcname, is_leaf=(i == 0))

            # The next frame in the list is this frame's caller.
            if i < last_index:
                self._record_call_relationship(frame_info, frames[i + 1])

    def _is_valid_frame(self, filename, lineno):
        """Check if a frame should be included in the heatmap."""
        # Skip internal or invalid files
        if not filename or filename.startswith(('<', '[')):
            return False

        # Skip invalid frames with corrupted filename data
        if filename == "__init__" and lineno == 0:
            return False

        return True

    def _record_line_sample(self, filename, lineno, funcname, is_leaf=False):
        """Record a sample for a specific line.

        Args:
            filename: File the frame belongs to
            lineno: Line number of the frame
            funcname: Function name of the frame
            is_leaf: True when the frame is the top of the stack, in which
                case the self-sample counters are incremented as well
        """
        # Track cumulative samples (all occurrences in stack)
        self.line_samples[(filename, lineno)] += 1
        self.file_samples[filename][lineno] += 1

        # Track self/leaf samples (only when at top of stack)
        if is_leaf:
            self.line_self_samples[(filename, lineno)] += 1
            self.file_self_samples[filename][lineno] += 1

        # The first line seen for a function stands in for its location.
        if funcname and (filename, funcname) not in self.function_definitions:
            self.function_definitions[(filename, funcname)] = lineno

    def _record_call_relationship(self, callee_frame, caller_frame):
        """Record caller/callee relationship between adjacent frames.

        Updates the forward call graph, the reverse callers graph and the
        per-edge sample counter.
        """
        callee_filename, callee_lineno, callee_funcname = callee_frame
        caller_filename, caller_lineno, caller_funcname = caller_frame

        # Skip internal files for call graph
        if callee_filename.startswith(('<', '[')):
            return

        # Get the callee's function definition line
        callee_def_line = self.function_definitions.get(
            (callee_filename, callee_funcname), callee_lineno
        )

        # Record caller -> callee relationship
        caller_key = (caller_filename, caller_lineno)
        callee_info = (callee_filename, callee_def_line, callee_funcname)
        if callee_info not in self.call_graph[caller_key]:
            self.call_graph[caller_key].append(callee_info)

        # Record callee <- caller relationship
        callee_key = (callee_filename, callee_def_line)
        caller_info = (caller_filename, caller_lineno, caller_funcname)
        if caller_info not in self.callers_graph[callee_key]:
            self.callers_graph[callee_key].append(caller_info)

        # Count this call edge for path analysis
        self.edge_samples[(caller_key, callee_key)] += 1

    def export(self, output_path):
        """Export heatmap data as HTML files in a directory.

        Prints a warning and returns when nothing was sampled.  Any failure
        is reported on stdout and then re-raised.

        Args:
            output_path: Path where to create the heatmap output directory
        """
        if not self.file_samples:
            print("Warning: No heatmap data to export")
            return

        try:
            output_dir = self._prepare_output_directory(output_path)
            file_stats = self._calculate_file_stats()
            self._create_file_index(file_stats)

            # Generate individual file reports
            self._generate_file_reports(output_dir, file_stats)

            # Generate index page
            self._generate_index_html(output_dir / 'index.html', file_stats)

            self._print_export_summary(output_dir, file_stats)

        except Exception as e:
            print(f"Error: Failed to export heatmap: {e}")
            raise

    def _prepare_output_directory(self, output_path):
        """Create output directory for heatmap files.

        A trailing ``.html`` suffix on *output_path* is stripped so the
        output is always a directory.

        Raises:
            RuntimeError: If the directory cannot be created.
        """
        output_dir = Path(output_path)
        if output_dir.suffix == '.html':
            output_dir = output_dir.with_suffix('')

        try:
            output_dir.mkdir(exist_ok=True, parents=True)
        except OSError as e:
            raise RuntimeError(f"Failed to create output directory {output_dir}: {e}") from e

        return output_dir

    def _create_file_index(self, file_stats: List[FileStats]):
        """Create mapping from filenames to HTML file names."""
        self.file_index = {
            stat.filename: self.FILE_INDEX_FORMAT.format(i)
            for i, stat in enumerate(file_stats)
        }

    @staticmethod
    def _non_negative_lines(line_counts):
        """Return a copy of *line_counts* without negative line numbers."""
        return {line: count for line, count in line_counts.items() if line >= 0}

    def _generate_file_reports(self, output_dir, file_stats: List[FileStats]):
        """Generate HTML report for each source file."""
        for stat in file_stats:
            self._generate_file_html(
                output_dir / self.file_index[stat.filename],
                stat.filename,
                self._non_negative_lines(self.file_samples[stat.filename]),
                self._non_negative_lines(self.file_self_samples.get(stat.filename, {})),
                stat
            )

    def _print_export_summary(self, output_dir, file_stats: List[FileStats]):
        """Print summary of exported heatmap."""
        print(f"Heatmap output written to {output_dir}/")
        print(f" - Index: {output_dir / 'index.html'}")
        print(f" - {len(file_stats)} source file(s) analyzed")

    def _calculate_file_stats(self) -> List[FileStats]:
        """Calculate statistics for each file.

        Special pseudo-files and files with no non-negative line numbers are
        left out.  Each file's percentage is relative to the most-sampled
        file.

        Returns:
            List of FileStats objects sorted by total samples, descending
        """
        file_stats = []
        for filename, line_counts in self.file_samples.items():
            # Skip special frames
            if filename in ('~', '...', '.') or filename.startswith(('<', '[')):
                continue

            # Negative line numbers mark special frames
            valid_line_counts = self._non_negative_lines(line_counts)
            if not valid_line_counts:
                continue

            valid_self_counts = self._non_negative_lines(
                self.file_self_samples.get(filename, {})
            )

            module_name, module_type = extract_module_name(filename, self._path_info)

            file_stats.append(FileStats(
                filename=filename,
                module_name=module_name,
                module_type=module_type,
                total_samples=sum(valid_line_counts.values()),
                total_self_samples=sum(valid_self_counts.values()),
                num_lines=len(valid_line_counts),
                max_samples=max(valid_line_counts.values()),
                max_self_samples=max(valid_self_counts.values()) if valid_self_counts else 0,
                percentage=0.0
            ))

        # Sort by total samples and calculate percentages
        file_stats.sort(key=lambda x: x.total_samples, reverse=True)
        if file_stats:
            max_total = file_stats[0].total_samples
            for stat in file_stats:
                stat.percentage = (stat.total_samples / max_total * 100) if max_total > 0 else 0

        return file_stats

    @staticmethod
    def _format_rate(rate):
        """Turn an optional percentage into index-page display fields.

        Args:
            rate: Percentage value, or None when unavailable

        Returns:
            Tuple of (display text, bar width capped at 100, CSS class) where
            the class is "good" below 5, "warning" below 15, else "error";
            a missing rate yields ("N/A", 0, "good").
        """
        if rate is None:
            return "N/A", 0, "good"
        if rate < 5:
            css_class = "good"
        elif rate < 15:
            css_class = "warning"
        else:
            css_class = "error"
        return f"{rate:.1f}%", min(rate, 100), css_class

    def _generate_index_html(self, index_path: Path, file_stats: List[FileStats]):
        """Generate index.html with list of all profiled files.

        Raises:
            RuntimeError: If the index file cannot be written.
        """
        # Build hierarchical tree
        tree = _TreeBuilder.build_file_tree(file_stats)

        # Render tree as HTML
        renderer = _HtmlRenderer(self.file_index, self._color_gradient,
                                 self._calculate_intensity_color)
        sections_html = renderer.render_hierarchical_html(tree)

        # Format error rate and missed samples with bar classes
        error_rate_str, error_rate_width, error_rate_class = self._format_rate(
            self.stats.get('error_rate')
        )
        missed_samples_str, missed_samples_width, missed_samples_class = self._format_rate(
            self.stats.get('missed_samples')
        )

        # Populate template (placeholders are substituted in this order)
        replacements = {
            "<!-- INLINE_CSS -->": f"<style>\n{self._template_loader.index_css}\n</style>",
            "<!-- INLINE_JS -->": f"<script>\n{self._template_loader.index_js}\n</script>",
            "<!-- PYTHON_LOGO -->": self._template_loader.logo_html,
            "<!-- NUM_FILES -->": str(len(file_stats)),
            "<!-- TOTAL_SAMPLES -->": f"{self._total_samples:,}",
            "<!-- DURATION -->": f"{self.stats.get('duration_sec', 0):.1f}s",
            "<!-- SAMPLE_RATE -->": f"{self.stats.get('sample_rate', 0):.1f}",
            "<!-- ERROR_RATE -->": error_rate_str,
            "<!-- ERROR_RATE_WIDTH -->": str(error_rate_width),
            "<!-- ERROR_RATE_CLASS -->": error_rate_class,
            "<!-- MISSED_SAMPLES -->": missed_samples_str,
            "<!-- MISSED_SAMPLES_WIDTH -->": str(missed_samples_width),
            "<!-- MISSED_SAMPLES_CLASS -->": missed_samples_class,
            "<!-- SECTIONS_HTML -->": sections_html,
        }

        html_content = self._template_loader.index_template
        for placeholder, value in replacements.items():
            html_content = html_content.replace(placeholder, value)

        try:
            index_path.write_text(html_content, encoding='utf-8')
        except OSError as e:
            raise RuntimeError(f"Failed to write index file {index_path}: {e}") from e

    def _calculate_intensity_color(self, intensity: float) -> Tuple[int, int, int, float]:
        """Calculate RGB color and alpha for given intensity (0-1 range).

        Returns (r, g, b, alpha) tuple representing the heatmap color gradient:
        blue -> green -> yellow -> orange -> red

        Intensities outside 0-1 are clamped so the color components always
        stay within the valid range.  Results are cached under the intensity
        rounded to 3 decimal places.
        """
        intensity = min(max(intensity, 0.0), 1.0)

        cache = self._color_gradient.cache
        cache_key = round(intensity, 3)
        if cache_key in cache:
            return cache[cache_key]

        gradient = self._color_gradient
        m = gradient.multiplier

        # Color stops with (threshold, rgb_func, alpha_func)
        stops = [
            (gradient.stop_1,
             lambda i: (0, int(150 * i * m), 255),
             lambda i: gradient.alpha_very_cold),
            (gradient.stop_2,
             lambda i: (0, 255, int(255 * (1 - (i - gradient.stop_1) * m))),
             lambda i: gradient.alpha_cold),
            (gradient.stop_3,
             lambda i: (int(255 * (i - gradient.stop_2) * m), 255, 0),
             lambda i: gradient.alpha_medium),
            (gradient.stop_4,
             lambda i: (255, int(200 - 100 * (i - gradient.stop_3) * m), 0),
             lambda i: gradient.alpha_warm),
            (gradient.stop_5,
             lambda i: (255, int(100 * (1 - (i - gradient.stop_4) * m)), 0),
             lambda i: gradient.alpha_hot_base + gradient.alpha_hot_range * (i - gradient.stop_4) * m),
        ]

        # First band whose threshold lies above the intensity; the final
        # stop accepts everything, so a match always exists.
        _, rgb_func, alpha_func = next(
            stop for stop in stops
            if intensity < stop[0] or stop[0] == gradient.stop_5
        )
        r, g, b = rgb_func(intensity)
        result = (r, g, b, alpha_func(intensity))

        cache[cache_key] = result
        return result

    def _generate_file_html(self, output_path: Path, filename: str,
                            line_counts: Dict[int, int], self_counts: Dict[int, int],
                            file_stat: FileStats):
        """Generate HTML for a single source file with heatmap coloring.

        When the source cannot be read a single placeholder line is rendered
        instead.

        Raises:
            RuntimeError: If the report file cannot be written.
        """
        # Read source file
        try:
            source_lines = Path(filename).read_text(encoding='utf-8', errors='replace').splitlines()
        except OSError as e:
            # Only warn for names that look like real files.
            looks_special = (filename.startswith(('<', '[')) or
                             filename in ('~', '...', '.') or len(filename) < 2)
            if not looks_special:
                print(f"Warning: Could not read source file {filename}: {e}")
            source_lines = [f"# Source file not available: {filename}"]

        # Generate HTML for each line
        max_samples = max(line_counts.values()) if line_counts else 1
        max_self_samples = max(self_counts.values()) if self_counts else 1
        code_lines_html = [
            self._build_line_html(line_num, line_content, line_counts, self_counts,
                                  max_samples, max_self_samples, filename)
            for line_num, line_content in enumerate(source_lines, start=1)
        ]

        # Populate template
        replacements = {
            "<!-- FILENAME -->": html.escape(filename),
            "<!-- TOTAL_SAMPLES -->": f"{file_stat.total_samples:,}",
            "<!-- TOTAL_SELF_SAMPLES -->": f"{file_stat.total_self_samples:,}",
            "<!-- NUM_LINES -->": str(file_stat.num_lines),
            "<!-- PERCENTAGE -->": f"{file_stat.percentage:.2f}",
            "<!-- MAX_SAMPLES -->": str(file_stat.max_samples),
            "<!-- MAX_SELF_SAMPLES -->": str(file_stat.max_self_samples),
            "<!-- CODE_LINES -->": ''.join(code_lines_html),
            "<!-- INLINE_CSS -->": f"<style>\n{self._template_loader.file_css}\n</style>",
            "<!-- INLINE_JS -->": f"<script>\n{self._template_loader.file_js}\n</script>",
        }

        html_content = self._template_loader.file_template
        for placeholder, value in replacements.items():
            html_content = html_content.replace(placeholder, value)

        try:
            output_path.write_text(html_content, encoding='utf-8')
        except OSError as e:
            raise RuntimeError(f"Failed to write file {output_path}: {e}") from e

    def _build_line_html(self, line_num: int, line_content: str,
                         line_counts: Dict[int, int], self_counts: Dict[int, int],
                         max_samples: int, max_self_samples: int, filename: str) -> str:
        """Build HTML for a single line of source code.

        Lines without samples are rendered transparent with empty counters.
        Sampled lines carry three colors: the default one (self-based when
        the line has self samples, cumulative otherwise) plus one for each
        mode as data attributes.
        """
        cumulative_samples = line_counts.get(line_num, 0)
        self_samples = self_counts.get(line_num, 0)

        if cumulative_samples > 0:
            cumulative_intensity = cumulative_samples / max_samples if max_samples > 0 else 0
            self_intensity = self_samples / max_self_samples if max_self_samples > 0 and self_samples > 0 else 0

            # Default to self-based coloring
            intensity = self_intensity if self_samples > 0 else cumulative_intensity
            bg_color = self._format_color_for_intensity(intensity)

            # Pre-calculate colors for both modes (for JS toggle)
            self_bg_color = self._format_color_for_intensity(self_intensity) if self_samples > 0 else "transparent"
            cumulative_bg_color = self._format_color_for_intensity(cumulative_intensity)

            self_display = f"{self_samples:,}" if self_samples > 0 else ""
            cumulative_display = f"{cumulative_samples:,}"
            tooltip = f"Self: {self_samples:,}, Total: {cumulative_samples:,}"
        else:
            bg_color = "transparent"
            self_bg_color = "transparent"
            cumulative_bg_color = "transparent"
            self_display = ""
            cumulative_display = ""
            tooltip = ""

        # Get navigation buttons
        nav_buttons_html = self._build_navigation_buttons(filename, line_num)

        # Build line HTML
        line_html = html.escape(line_content.rstrip('\n'))
        title_attr = f' title="{html.escape(tooltip)}"' if tooltip else ""

        return (
            f' <div class="code-line" data-bg-color="{bg_color}" '
            f'data-self-color="{self_bg_color}" data-cumulative-color="{cumulative_bg_color}" '
            f'id="line-{line_num}"{title_attr}>\n'
            f' <div class="line-number">{line_num}</div>\n'
            f' <div class="line-samples-self">{self_display}</div>\n'
            f' <div class="line-samples-cumulative">{cumulative_display}</div>\n'
            f' <div class="line-content">{line_html}</div>\n'
            f' {nav_buttons_html}\n'
            f' </div>\n'
        )

    def _format_color_for_intensity(self, intensity: float) -> str:
        """Format color as rgba() string for given intensity."""
        r, g, b, alpha = self._calculate_intensity_color(intensity)
        return f"rgba({r}, {g}, {b}, {alpha})"

    def _build_navigation_buttons(self, filename: str, line_num: int) -> str:
        """Build navigation buttons for callers/callees.

        Returns:
            A wrapper div containing the caller and/or callee button, or an
            empty string when the line has neither.
        """
        line_key = (filename, line_num)
        caller_list = self._deduplicate_by_function(self.callers_graph.get(line_key, []))
        callee_list = self._deduplicate_by_function(self.call_graph.get(line_key, []))

        # Get edge counts for each caller/callee
        callers_with_counts = self._get_edge_counts(line_key, caller_list, is_caller=True)
        callees_with_counts = self._get_edge_counts(line_key, callee_list, is_caller=False)

        # Each button needs a visible, distinct label: up for callers,
        # down for callees.
        caller_btn = self._create_navigation_button(callers_with_counts, 'caller', '▲')
        callee_btn = self._create_navigation_button(callees_with_counts, 'callee', '▼')

        if caller_btn or callee_btn:
            return f'<div class="line-nav-buttons">{caller_btn}{callee_btn}</div>'
        return ''

    def _get_edge_counts(self, line_key: Tuple[str, int],
                         items: List[Tuple[str, int, str]],
                         is_caller: bool) -> List[Tuple[str, int, str, int]]:
        """Get sample counts for each caller/callee edge.

        Args:
            line_key: (filename, line) of the line being rendered
            items: (file, line, func) entries related to that line
            is_caller: True when *items* are callers of the line, False when
                they are its callees; decides the edge direction looked up

        Returns:
            (file, line, func, count) tuples sorted by count, descending
        """
        result = []
        for file, line, func in items:
            other_key = (file, line)
            edge_key = (other_key, line_key) if is_caller else (line_key, other_key)
            result.append((file, line, func, self.edge_samples.get(edge_key, 0)))

        result.sort(key=lambda x: x[3], reverse=True)
        return result

    def _deduplicate_by_function(self, items: List[Tuple[str, int, str]]) -> List[Tuple[str, int, str]]:
        """Remove duplicate entries based on (file, function) key.

        The first entry seen for each key is kept and input order is
        preserved.
        """
        seen = set()
        result = []
        for file, line, func in items:
            key = (file, func)
            if key not in seen:
                seen.add(key)
                result.append((file, line, func))
        return result

    def _create_navigation_button(self, items_with_counts: List[Tuple[str, int, str, int]],
                                  btn_class: str, arrow: str) -> str:
        """Create HTML for a navigation button with sample counts.

        Only targets that have a report page and a positive line number are
        considered.  A single target yields a direct-link button; several
        yield a button carrying a JSON menu description.

        Args:
            items_with_counts: (file, line, func, count) tuples
            btn_class: CSS class suffix, also used in the tooltip text
            arrow: Text shown inside the button

        Returns:
            Button HTML, or an empty string when no target is valid
        """
        valid_items = [
            (file, line, func, count)
            for file, line, func, count in items_with_counts
            if file in self.file_index and line > 0
        ]
        if not valid_items:
            return ""

        if len(valid_items) == 1:
            file, line, func, count = valid_items[0]
            target_html = self.file_index[file]
            nav_data = json.dumps({'link': f"{target_html}#line-{line}", 'func': func})
            title = f"Go to {btn_class}: {html.escape(func)} ({count:,} samples)"
            return f'<button class="nav-btn {btn_class}" data-nav=\'{html.escape(nav_data)}\' title="{title}">{arrow}</button>'

        # Multiple items - create menu
        total_samples = sum(count for _, _, _, count in valid_items)
        items_data = [
            {
                'file': os.path.basename(file),
                'func': func,
                'count': count,
                'link': f"{self.file_index[file]}#line-{line}"
            }
            for file, line, func, count in valid_items
        ]
        items_json = html.escape(json.dumps(items_data))
        title = f"{len(items_data)} {btn_class}s ({total_samples:,} samples)"
        return f'<button class="nav-btn {btn_class}" data-nav-multi=\'{items_json}\' title="{title}">{arrow}</button>'