[3.15] gh-150662: Stop unbounded memory growth in Tachyon --gecko collector (GH-150845) (#151000)

This commit is contained in:
Miss Islington (bot) 2026-06-06 14:45:53 +02:00 committed by GitHub
parent ec9b40d740
commit ad8a3d33be
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 452 additions and 130 deletions

View file

@ -1,8 +1,10 @@
import itertools
import io
import json
import os
import platform
import sys
import tempfile
import threading
import time
@ -61,6 +63,77 @@
PROCESS_TYPE_MAIN = 0
STACKWALK_DISABLED = 0
# Number of encoded bytes a SpillColumn keeps in memory before it appends
# them to its backing file on disk.
DEFAULT_SPILL_BUFFER_BYTES = 128 * 1024
# Compact separators: no whitespace after "," or ":" in the encoded JSON.
_JSON_SEPARATORS = (",", ":")
# Shared encoder used both for spilled values and for the streamed profile.
# allow_nan=False makes encoding NaN/Infinity raise ValueError instead of
# emitting tokens that are not valid JSON.
_JSON_ENCODER = json.JSONEncoder(
separators=_JSON_SEPARATORS, allow_nan=False
)
class SpillColumn:
    """Append-only column of JSON values that spills to a file on disk.

    Every appended value is encoded as one compact JSON document followed
    by a newline.  The encoded bytes are collected in ``buffer`` and are
    appended to ``path`` whenever the buffer reaches the configured size.
    """

    def __init__(self, directory, basename, *,
                 buffer_bytes=None):
        """Create a column backed by the file ``directory/basename``.

        *buffer_bytes* is the buffer size that triggers a flush.  When it
        is None, the module-level DEFAULT_SPILL_BUFFER_BYTES is used; that
        value is read here, at construction time.
        """
        self.path = os.path.join(directory, basename)
        self.buffer = bytearray()
        self._buffer_bytes = (
            DEFAULT_SPILL_BUFFER_BYTES if buffer_bytes is None
            else buffer_bytes
        )

    def append(self, value):
        """Encode *value* as one JSON line and buffer it, flushing when full."""
        self.buffer += (_JSON_ENCODER.encode(value) + "\n").encode("utf-8")
        if len(self.buffer) >= self._buffer_bytes:
            self.flush()

    def flush(self):
        """Append the buffered bytes to the backing file and clear the buffer.

        The file is opened in append mode even when the buffer is empty, so
        flushing a column that never received a value still creates its
        (empty) file and lets it be read back.
        """
        with open(self.path, "ab") as file:
            file.write(self.buffer)
        self.buffer.clear()

    def iter_tokens(self):
        """Yield each stored value as its JSON text, in append order.

        Pending buffered values are flushed first.  Without that step the
        reader would only see what already reached the disk: values still
        in the buffer would be dropped silently, and a column that was
        never flushed would fail with FileNotFoundError.  Because this is a
        generator, the flush happens when iteration starts.
        """
        self.flush()
        with open(self.path, encoding="utf-8") as file:
            for line in file:
                # Each line is exactly one encoded value plus its "\n".
                yield line.rstrip("\n")
class GeckoThreadSpill:
    """The spilled sample and marker columns that belong to one thread."""

    # (attribute name, file basename) for every spilled column.
    _COLUMNS = (
        ("samples_stack", "samples-stack.json"),
        ("samples_time", "samples-time.json"),
        ("markers_name", "markers-name.json"),
        ("markers_start_time", "markers-start-time.json"),
        ("markers_end_time", "markers-end-time.json"),
        ("markers_phase", "markers-phase.json"),
        ("markers_category", "markers-category.json"),
        ("markers_data", "markers-data.json"),
    )

    def __init__(self, directory, tid):
        """Create one SpillColumn per _COLUMNS entry inside *directory*.

        File names are prefixed with ``thread-<tid>-``.
        """
        stem = f"thread-{tid}-"
        for attr_name, file_name in self._COLUMNS:
            column = SpillColumn(directory, stem + file_name)
            setattr(self, attr_name, column)
        self.sample_count = 0
        self.marker_count = 0

    def append_sample(self, stack_index, time_ms):
        """Record one sample (stack index and timestamp) and count it."""
        for column, value in (
            (self.samples_stack, stack_index),
            (self.samples_time, time_ms),
        ):
            column.append(value)
        self.sample_count += 1

    def append_marker(self, name_idx, start_time, end_time, phase, category, data):
        """Record one marker, one value per marker column, and count it."""
        for column, value in (
            (self.markers_name, name_idx),
            (self.markers_start_time, start_time),
            (self.markers_end_time, end_time),
            (self.markers_phase, phase),
            (self.markers_category, category),
            (self.markers_data, data),
        ):
            column.append(value)
        self.marker_count += 1

    def prepare_read(self):
        """Flush every column so its file holds all appended values."""
        for attr_name, _ in self._COLUMNS:
            column = getattr(self, attr_name)
            column.flush()
class GeckoCollector(Collector):
aggregating = True
@ -77,6 +150,8 @@ def __init__(self, sample_interval_usec, *, skip_idle=False, opcodes=False):
# Per-thread data structures
self.threads = {} # tid -> thread data
self.spill_dir = None
self.exported = False
# Global tables
self.libs = []
@ -151,6 +226,9 @@ def collect(self, stack_frames, timestamps_us=None):
stack_frames: List of interpreter/thread frame info
timestamps_us: List of timestamps in microseconds (None for live sampling)
"""
if self.exported:
raise RuntimeError("cannot append to GeckoCollector after export")
# Handle live sampling (no timestamps provided)
if timestamps_us is None:
current_time = (time.monotonic() * 1000) - self.start_time
@ -259,15 +337,9 @@ def collect(self, stack_frames, timestamps_us=None):
stack_index = self._process_stack(thread_data, frames)
# Add samples with timestamps
samples = thread_data["samples"]
samples_stack = samples["stack"]
samples_time = samples["time"]
samples_delay = samples["eventDelay"]
thread_spill = thread_data["_spill"]
for t in times:
samples_stack.append(stack_index)
samples_time.append(t)
samples_delay.append(None)
thread_spill.append_sample(stack_index, t)
# Handle opcodes
if self.opcodes_enabled and frames:
@ -294,6 +366,8 @@ def collect(self, stack_frames, timestamps_us=None):
def _create_thread(self, tid, is_main_thread):
"""Create a new thread structure with processed profile format."""
if self.spill_dir is None:
self.spill_dir = tempfile.TemporaryDirectory()
thread = {
"name": f"Thread-{tid}",
@ -307,15 +381,6 @@ def _create_thread(self, tid, is_main_thread):
"tid": tid,
"processType": "default",
"processName": "Python Process",
# Sample data - processed format with direct arrays
"samples": {
"stack": [],
"time": [],
"eventDelay": [],
"weight": None,
"weightType": "samples",
"length": 0, # Will be updated on export
},
# Stack table - processed format
"stackTable": {
"frame": [],
@ -366,21 +431,12 @@ def _create_thread(self, tid, is_main_thread):
"functionSize": [],
"length": 0,
},
# Markers - processed format (arrays)
"markers": {
"data": [],
"name": [],
"startTime": [],
"endTime": [],
"phase": [],
"category": [],
"length": 0,
},
# Caches for deduplication
"_stackCache": {},
"_frameCache": {},
"_funcCache": {},
"_resourceCache": {},
"_spill": GeckoThreadSpill(self.spill_dir.name, tid),
}
return thread
@ -405,51 +461,42 @@ def _add_marker(self, tid, name, start_time, end_time, category):
if tid not in self.threads:
return
thread_data = self.threads[tid]
duration = end_time - start_time
name_idx = self._intern_string(name)
markers = thread_data["markers"]
markers["name"].append(name_idx)
markers["startTime"].append(start_time)
markers["endTime"].append(end_time)
markers["phase"].append(1) # 1 = interval marker
markers["category"].append(category)
markers["data"].append({
"type": name.replace(" ", ""),
"duration": duration,
"tid": tid
})
self.threads[tid]["_spill"].append_marker(
name_idx, start_time, end_time, 1, category, {
"type": name.replace(" ", ""),
"duration": duration,
"tid": tid,
}
)
def _add_opcode_interval_marker(self, tid, opcode, lineno, col_offset, funcname, start_time, end_time):
def _add_opcode_interval_marker(self, tid, opcode, lineno, col_offset,
funcname, start_time, end_time):
"""Add an interval marker for opcode execution span."""
if tid not in self.threads or opcode is None:
return
thread_data = self.threads[tid]
opcode_info = get_opcode_info(opcode)
# Use formatted opcode name (with base opcode for specialized ones)
formatted_opname = format_opcode(opcode)
name_idx = self._intern_string(formatted_opname)
markers = thread_data["markers"]
markers["name"].append(name_idx)
markers["startTime"].append(start_time)
markers["endTime"].append(end_time)
markers["phase"].append(1) # 1 = interval marker
markers["category"].append(CATEGORY_OPCODES)
markers["data"].append({
"type": "Opcode",
"opcode": opcode,
"opname": formatted_opname,
"base_opname": opcode_info["base_opname"],
"is_specialized": opcode_info["is_specialized"],
"line": lineno,
"column": col_offset if col_offset >= 0 else None,
"function": funcname,
"duration": end_time - start_time,
})
self.threads[tid]["_spill"].append_marker(
name_idx, start_time, end_time, 1, CATEGORY_OPCODES, {
"type": "Opcode",
"opcode": opcode,
"opname": formatted_opname,
"base_opname": opcode_info["base_opname"],
"is_specialized": opcode_info["is_specialized"],
"line": lineno,
"column": col_offset if col_offset >= 0 else None,
"function": funcname,
"duration": end_time - start_time,
}
)
def _process_stack(self, thread_data, frames):
"""Process a stack and return the stack index."""
@ -660,7 +707,6 @@ def _finalize_markers(self):
def export(self, filename):
"""Export the profile to a Gecko JSON file."""
if self.sample_count > 0 and self.last_sample_time > 0:
self.interval = self.last_sample_time / self.sample_count
@ -681,19 +727,30 @@ def spin():
spinner_thread = threading.Thread(target=spin, daemon=True)
spinner_thread.start()
temp_path = None
replaced = False
try:
# Finalize any open markers before building profile
self._finalize_markers()
profile = self._build_profile()
with open(filename, "w") as f:
json.dump(profile, f, separators=(",", ":"))
self._prepare_for_serialization()
output_dir = os.path.dirname(os.path.abspath(filename)) or "."
with tempfile.NamedTemporaryFile(
"w", dir=output_dir, delete=False
) as file:
temp_path = file.name
self._stream_profile(file)
os.replace(temp_path, filename)
replaced = True
finally:
self.exported = True
stop_spinner.set()
spinner_thread.join(timeout=1.0)
# Small delay to ensure the clear happens
time.sleep(0.01)
if temp_path is not None and not replaced:
try:
os.unlink(temp_path)
except FileNotFoundError:
pass
self._cleanup_spills()
print(f"Gecko profile written to {filename}")
print(
@ -727,34 +784,17 @@ def _build_marker_schema(self):
def _build_profile(self):
"""Build the complete profile structure in processed format."""
# Convert thread data to final format
threads = []
try:
self._prepare_for_serialization()
file = io.StringIO()
self._stream_profile(file)
return json.loads(file.getvalue())
finally:
self.exported = True
self._cleanup_spills()
for tid, thread_data in self.threads.items():
# Update lengths
samples = thread_data["samples"]
stack_table = thread_data["stackTable"]
frame_table = thread_data["frameTable"]
func_table = thread_data["funcTable"]
resource_table = thread_data["resourceTable"]
samples["length"] = len(samples["stack"])
stack_table["length"] = len(stack_table["frame"])
frame_table["length"] = len(frame_table["func"])
func_table["length"] = len(func_table["name"])
resource_table["length"] = len(resource_table["name"])
thread_data["markers"]["length"] = len(thread_data["markers"]["name"])
# Clean up internal caches
del thread_data["_stackCache"]
del thread_data["_frameCache"]
del thread_data["_funcCache"]
del thread_data["_resourceCache"]
threads.append(thread_data)
# Main profile structure in processed format
profile = {
def _profile_head(self):
return {
"meta": {
"interval": self.interval,
"startTime": self.start_time,
@ -784,7 +824,10 @@ def _build_profile(self):
},
},
"libs": self.libs,
"threads": threads,
}
def _profile_tail(self):
return {
"pages": [],
"shared": {
"stringArray": self.global_strings,
@ -792,4 +835,146 @@ def _build_profile(self):
},
}
return profile
def _prepare_for_serialization(self):
    """Get the collected data ready to be streamed out.

    Finalizes markers, then for every thread flushes its spill columns and
    refreshes the ``length`` field of its in-memory tables.

    Raises RuntimeError if the collector has already been exported.
    """
    if self.exported:
        raise RuntimeError("GeckoCollector has already been exported")
    self._finalize_markers()
    # (table key, column whose size defines the table's length)
    sized_tables = (
        ("stackTable", "frame"),
        ("frameTable", "func"),
        ("funcTable", "name"),
        ("resourceTable", "name"),
    )
    for thread_data in self.threads.values():
        thread_data["_spill"].prepare_read()
        for table_key, column_key in sized_tables:
            table = thread_data[table_key]
            table["length"] = len(table[column_key])
def _cleanup_spills(self):
    """Remove the temporary spill directory, if one exists, and forget it."""
    spill_dir = self.spill_dir
    if spill_dir is None:
        return
    spill_dir.cleanup()
    self.spill_dir = None
def _stream_profile(self, file):
    """Write the complete profile to *file* as one JSON object.

    Members are emitted in this order: everything from _profile_head(),
    the "threads" array (one object per entry of self.threads), then
    everything from _profile_tail().
    """
    write = file.write
    write("{")
    first = True
    for key, value in self._profile_head().items():
        first = _write_json_member(file, key, value, first)

    first = _write_member_name(file, "threads", first)
    write("[")
    needs_comma = False
    for tid, thread_data in self.threads.items():
        if needs_comma:
            write(",")
        self._stream_thread(file, tid, thread_data)
        needs_comma = True
    write("]")

    for key, value in self._profile_tail().items():
        first = _write_json_member(file, key, value, first)
    write("}")
def _stream_thread(self, file, tid, thread_data):
    """Write one thread of the profile to *file* as a JSON object.

    Scalar metadata and the in-memory tables are taken from *thread_data*;
    "samples" and "markers" are streamed from the thread's spill columns.
    *tid* is not read here: the emitted "tid" is ``thread_data["tid"]``.
    """
    metadata_keys = (
        "name",
        "isMainThread",
        "processStartupTime",
        "processShutdownTime",
        "registerTime",
        "unregisterTime",
        "pausedRanges",
        "pid",
        "tid",
        "processType",
        "processName",
    )
    table_keys = (
        "stackTable",
        "frameTable",
        "funcTable",
        "resourceTable",
        "nativeSymbols",
    )
    spill = thread_data["_spill"]
    # Look every metadata value up before anything is written.
    metadata = {key: thread_data[key] for key in metadata_keys}

    file.write("{")
    first = True
    for key, value in metadata.items():
        first = _write_json_member(file, key, value, first)

    first = _write_member_name(file, "samples", first)
    self._stream_samples(file, spill)

    for key in table_keys:
        first = _write_json_member(file, key, thread_data[key], first)

    first = _write_member_name(file, "markers", first)
    self._stream_markers(file, spill)
    file.write("}")
def _stream_samples(self, file, spill):
    """Write the "samples" table of one thread from its spill columns.

    "stack" and "time" come from the spilled columns; "eventDelay" is
    emitted as one ``null`` per sample.
    """
    sample_count = spill.sample_count
    columns = (
        ("stack", spill.samples_stack.iter_tokens()),
        ("time", spill.samples_time.iter_tokens()),
        ("eventDelay", itertools.repeat("null", sample_count)),
    )
    trailing_members = (
        ("weight", None),
        ("weightType", "samples"),
        ("length", sample_count),
    )
    _stream_column_table(file, columns, sample_count, trailing_members)
def _stream_markers(self, file, spill):
    """Write the "markers" table of one thread from its spill columns."""
    # (JSON member name, spill attribute holding that column)
    column_sources = (
        ("data", "markers_data"),
        ("name", "markers_name"),
        ("startTime", "markers_start_time"),
        ("endTime", "markers_end_time"),
        ("phase", "markers_phase"),
        ("category", "markers_category"),
    )
    columns = tuple(
        (member, getattr(spill, attr).iter_tokens())
        for member, attr in column_sources
    )
    marker_count = spill.marker_count
    _stream_column_table(
        file, columns, marker_count, (("length", marker_count),)
    )
def _write_json(file, value):
    """Write *value* to *file* as JSON, one encoder chunk at a time."""
    write = file.write
    for piece in _JSON_ENCODER.iterencode(value):
        write(piece)
def _write_member_name(file, name, first):
    """Write ``"name":`` to *file*, preceded by "," unless *first* is true.

    Always returns False, which callers store as their next *first* flag.
    """
    needs_separator = not first
    if needs_separator:
        file.write(",")
    _write_json(file, name)
    file.write(":")
    return False
def _write_json_member(file, name, value, first):
    """Write one ``"name":value`` object member and return the new *first* flag."""
    still_first = _write_member_name(file, name, first)
    _write_json(file, value)
    return still_first
def _stream_column_table(file, columns, expected_count, trailing_members=()):
    """Write a JSON object made of streamed array columns plus plain members.

    *columns* is an iterable of ``(name, token_iter)`` pairs; each becomes
    an array member that must contain exactly *expected_count* items.
    *trailing_members* is an iterable of ``(name, value)`` pairs written
    after the columns as ordinary JSON members.
    """
    file.write("{")
    is_first = True
    for column_name, tokens in columns:
        is_first = _write_member_name(file, column_name, is_first)
        _stream_array(file, tokens, expected_count, column_name)
    for member_name, member_value in trailing_members:
        is_first = _write_json_member(
            file, member_name, member_value, is_first
        )
    file.write("}")
def _stream_array(file, token_iter, expected_count, label="array"):
    """Write already-encoded JSON tokens to *file* as one JSON array.

    Tokens are written verbatim, separated by commas.  Raises RuntimeError,
    before the closing bracket is written, when the number of tokens
    differs from *expected_count*; *label* names the array in the message.
    """
    write = file.write
    write("[")
    written = 0
    for written, token in enumerate(token_iter, 1):
        if written > 1:
            write(",")
        write(token)
    if written != expected_count:
        raise RuntimeError(
            f"streamed {written} {label} items, expected {expected_count}"
        )
    write("]")

View file

@ -11,6 +11,7 @@
try:
import _remote_debugging # noqa: F401
from profiling.sampling import gecko_collector
from profiling.sampling.pstats_collector import PstatsCollector
from profiling.sampling.stack_collector import (
CollapsedStackCollector,
@ -59,6 +60,42 @@ def find_child_by_name(children, strings, substr):
return None
def export_gecko_profile(testcase, collector):
    """Export *collector* to a temporary JSON file and return the parsed data.

    The file is registered for cleanup on *testcase*, and the test fails if
    the exported file is empty.
    """
    out_file = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
    testcase.addCleanup(close_and_unlink, out_file)
    # We cannot overwrite an open file on Windows.
    out_file.close()
    out_path = out_file.name

    with captured_stdout(), captured_stderr():
        collector.export(out_path)

    testcase.assertGreater(os.path.getsize(out_path), 0)
    with open(out_path, encoding="utf-8") as stream:
        return json.load(stream)
def assert_gecko_column_lengths(testcase, table, columns):
    """Assert that every listed column of *table* has ``table["length"]`` items."""
    declared_length = table["length"]
    for name in columns:
        actual_length = len(table[name])
        testcase.assertEqual(
            actual_length, declared_length, f"{name!r} has wrong length"
        )
def gecko_marker_names(profile, markers):
    """Return the marker names, resolved through the shared string array."""
    strings = profile["shared"]["stringArray"]
    names = []
    for string_index in markers["name"]:
        names.append(strings[string_index])
    return names
def gecko_opcode_marker_data(profile):
    """Return the data dicts of the first thread's markers typed "Opcode"."""
    first_thread = profile["threads"][0]
    opcode_entries = []
    for entry in first_thread["markers"]["data"]:
        if entry.get("type") == "Opcode":
            opcode_entries.append(entry)
    return opcode_entries
class TestSampleProfilerComponents(unittest.TestCase):
"""Unit tests for individual profiler components."""
@ -583,9 +620,10 @@ def test_gecko_collector_basic(self):
# Verify samples
samples = thread_data["samples"]
self.assertEqual(len(samples["stack"]), 1)
self.assertEqual(len(samples["time"]), 1)
self.assertEqual(samples["length"], 1)
assert_gecko_column_lengths(
self, samples, ("stack", "time", "eventDelay")
)
# Verify function table structure and content
func_table = thread_data["funcTable"]
@ -622,9 +660,6 @@ def test_gecko_collector_basic(self):
@unittest.skipIf(is_emscripten, "threads not available")
def test_gecko_collector_export(self):
"""Test Gecko profile export functionality."""
gecko_out = tempfile.NamedTemporaryFile(suffix=".json", delete=False)
self.addCleanup(close_and_unlink, gecko_out)
collector = GeckoCollector(1000)
test_frames1 = [
@ -657,17 +692,7 @@ def test_gecko_collector_export(self):
collector.collect(test_frames2)
collector.collect(test_frames3)
# Export gecko profile
with captured_stdout(), captured_stderr():
collector.export(gecko_out.name)
# Verify file was created and contains valid data
self.assertTrue(os.path.exists(gecko_out.name))
self.assertGreater(os.path.getsize(gecko_out.name), 0)
# Check file contains valid JSON
with open(gecko_out.name, "r") as f:
profile_data = json.load(f)
profile_data = export_gecko_profile(self, collector)
# Should be valid Gecko profile format
self.assertIn("meta", profile_data)
@ -688,6 +713,100 @@ def test_gecko_collector_export(self):
self.assertIn("func2", string_array)
self.assertIn("other_func", string_array)
thread_data = profile_data["threads"][0]
assert_gecko_column_lengths(
self, thread_data["samples"], ("stack", "time", "eventDelay")
)
@unittest.skipIf(is_emscripten, "threads not available")
def test_gecko_collector_export_after_spill_flush(self):
    """Exported samples are complete when spill buffers flush to disk."""
    # Lower the module default to one byte for the duration of the test
    # and restore the previous value afterwards.
    saved_limit = gecko_collector.DEFAULT_SPILL_BUFFER_BYTES
    gecko_collector.DEFAULT_SPILL_BUFFER_BYTES = 1
    self.addCleanup(
        setattr, gecko_collector, "DEFAULT_SPILL_BUFFER_BYTES",
        saved_limit
    )

    collector = GeckoCollector(1000)
    frame = MockFrameInfo("file.py", 10, "func")
    thread = MockThreadInfo(1, [frame], status=THREAD_STATUS_HAS_GIL)
    interpreters = [MockInterpreterInfo(0, [thread])]
    collector.collect(interpreters, timestamps_us=[1000, 2000, 3000])

    exported = export_gecko_profile(self, collector)
    samples = exported["threads"][0]["samples"]

    # One sample per supplied timestamp.
    self.assertEqual(samples["length"], 3)
    assert_gecko_column_lengths(
        self, samples, ("stack", "time", "eventDelay")
    )
@unittest.skipIf(is_emscripten, "threads not available")
def test_gecko_collector_rejects_collect_after_export(self):
    """collect() raises RuntimeError once the collector has been exported."""
    collector = GeckoCollector(1000)
    frame = MockFrameInfo("file.py", 10, "func")
    thread = MockThreadInfo(1, [frame], status=THREAD_STATUS_HAS_GIL)
    interpreters = [MockInterpreterInfo(0, [thread])]

    collector.collect(interpreters)
    export_gecko_profile(self, collector)

    with self.assertRaisesRegex(RuntimeError, "after export"):
        collector.collect(interpreters)
@unittest.skipIf(is_emscripten, "threads not available")
def test_gecko_collector_export_failure_keeps_existing_file(self):
    """A failing export leaves the target file and its directory unchanged."""
    collector = GeckoCollector(1000)
    frame = MockFrameInfo("file.py", 10, "func")
    thread = MockThreadInfo(1, [frame], status=THREAD_STATUS_HAS_GIL)
    collector.collect([MockInterpreterInfo(0, [thread])])

    with tempfile.TemporaryDirectory() as temp_dir:
        target = os.path.join(temp_dir, "profile.json")
        with open(target, "w", encoding="utf-8") as stream:
            stream.write("existing")
        entries_before = set(os.listdir(temp_dir))

        def fail(file):
            raise OSError("boom")

        # Make the streaming step fail after export() has started.
        collector._stream_profile = fail
        with captured_stdout(), captured_stderr():
            with self.assertRaisesRegex(OSError, "boom"):
                collector.export(target)

        # The previous content survives and no stray file is left behind.
        with open(target, encoding="utf-8") as stream:
            self.assertEqual(stream.read(), "existing")
        self.assertEqual(set(os.listdir(temp_dir)), entries_before)
def test_gecko_collector_markers(self):
"""Test Gecko profile markers for GIL and CPU state tracking."""
collector = GeckoCollector(1000)
@ -771,21 +890,16 @@ def test_gecko_collector_markers(self):
self.assertIn("markers", thread_data)
markers = thread_data["markers"]
# Should have marker arrays
self.assertIn("name", markers)
self.assertIn("startTime", markers)
self.assertIn("endTime", markers)
self.assertIn("category", markers)
self.assertGreater(
markers["length"], 0, "Should have generated markers"
)
# Get marker names from string table
string_array = profile_data["shared"]["stringArray"]
marker_names = [string_array[idx] for idx in markers["name"]]
assert_gecko_column_lengths(
self, markers,
("data", "name", "startTime", "endTime", "phase", "category"),
)
# Verify we have different marker types
marker_name_set = set(marker_names)
marker_name_set = set(gecko_marker_names(profile_data, markers))
# Should have "Has GIL" markers (when thread had GIL)
self.assertIn(
@ -2659,6 +2773,7 @@ def test_gecko_collector_opcodes_enabled(self):
def test_gecko_opcode_state_tracking(self):
"""Test that GeckoCollector tracks opcode state changes."""
collector = GeckoCollector(sample_interval_usec=1000, opcodes=True)
self.addCleanup(collector._cleanup_spills)
# First sample with opcode 90 (RAISE_VARARGS)
frame1 = MockFrameInfo("test.py", 10, "func", opcode=90)
@ -2702,10 +2817,28 @@ def test_gecko_opcode_state_change_emits_marker(self):
collector.collect(frames2)
# Should have emitted a marker for the first opcode
thread_data = collector.threads[1]
markers = thread_data["markers"]
# At least one marker should have been added
self.assertGreater(len(markers["name"]), 0)
profile = collector._build_profile()
markers = profile["threads"][0]["markers"]
assert_gecko_column_lengths(
self, markers,
("data", "name", "startTime", "endTime", "phase", "category"),
)
opcode_markers = gecko_opcode_marker_data(profile)
self.assertIn(
{
"opcode": 90,
"line": 10,
"function": "func",
},
[
{
"opcode": marker["opcode"],
"line": marker["line"],
"function": marker["function"],
}
for marker in opcode_markers
],
)
def test_gecko_opcode_markers_not_emitted_when_disabled(self):
"""Test that no opcode markers when opcodes=False."""
@ -2729,8 +2862,9 @@ def test_gecko_opcode_markers_not_emitted_when_disabled(self):
]
collector.collect(frames2)
# opcode_state should not be tracked
self.assertEqual(len(collector.opcode_state), 0)
profile = collector._build_profile()
self.assertEqual(gecko_opcode_marker_data(profile), [])
self.assertEqual(profile["meta"]["markerSchema"], [])
def test_gecko_opcode_with_none_opcode(self):
"""Test that None opcode doesn't cause issues."""
@ -2746,9 +2880,8 @@ def test_gecko_opcode_with_none_opcode(self):
]
collector.collect(frames)
# Should track the state but opcode is None
self.assertIn(1, collector.opcode_state)
self.assertIsNone(collector.opcode_state[1][0])
profile = collector._build_profile()
self.assertEqual(gecko_opcode_marker_data(profile), [])
class TestCollectorFrameFormat(unittest.TestCase):

View file

@ -0,0 +1,4 @@
Fix unbounded memory growth in the ``--gecko`` collector of
:mod:`profiling.sampling`, which kept every sample and marker in memory.
Sample and marker data is now written to temporary files and streamed back
when the output file is built at the end. Patch by Pablo Galindo and Maurycy
Pawłowski-Wieroński.