internal/trace: add end-of-generation signal to trace

This change promotes the EvEndOfGeneration event to a real event that
appears in the trace.

This allows the trace parser to unambiguously distinguish truncated
traces from broken traces. It also simplifies much of the parsing logic,
since batch spilling is no longer necessary.

Fixes #73904.

Change-Id: I37c359b32b6b5f894825aafc02921adeaacf2595
Reviewed-on: https://go-review.googlesource.com/c/go/+/693398
Reviewed-by: Carlos Amedee <carlos@golang.org>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Michael Anthony Knyszek 2025-08-05 21:37:07 +00:00 committed by Michael Knyszek
parent cb814bd5bc
commit 4a7fde922f
11 changed files with 284 additions and 146 deletions
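To make the benefit concrete, here is a minimal sketch (not the actual parser code; the constant value and the one-byte "batches" are assumptions for illustration) of how an explicit in-band end-of-generation byte lets a reader distinguish a trace that was cleanly cut at a generation boundary from one that is broken mid-generation:

package main

import (
	"bufio"
	"errors"
	"io"
	"strings"
)

// evEndOfGeneration stands in for tracev2.EvEndOfGeneration; the real
// value is defined in internal/trace/tracev2.
const evEndOfGeneration byte = 0x23 // illustrative value only

// checkTrace reports whether the stream ends on a generation boundary.
// For simplicity, every non-sentinel byte stands in for a whole batch;
// the real parser decodes complete batches at this point.
func checkTrace(r *bufio.Reader) error {
	inGeneration := false
	for {
		b, err := r.ReadByte()
		if err == io.EOF {
			if inGeneration {
				// EOF mid-generation: the trace is broken or was
				// cut off mid-stream.
				return errors.New("incomplete generation; trace likely truncated")
			}
			return nil // EOF right after an end-of-generation byte: clean cut.
		}
		if err != nil {
			return err
		}
		if b == evEndOfGeneration {
			inGeneration = false
			continue
		}
		inGeneration = true
	}
}

func main() {
	ok := strings.NewReader("ab\x23")   // two "batches", then end-of-generation
	bad := strings.NewReader("ab\x23c") // a trailing partial generation
	println(checkTrace(bufio.NewReader(ok)) == nil)  // true
	println(checkTrace(bufio.NewReader(bad)) == nil) // false
}

Without the signal, EOF after a complete batch is ambiguous, which is why the pre-1.26 parser had to speculatively read (and spill) the first batch of the following generation.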


@@ -44,6 +44,10 @@ func (b *batch) isSyncBatch(ver version.Version) bool {
(tracev2.EventType(b.data[0]) == tracev2.EvSync && ver >= version.Go125))
}
func (b *batch) isEndOfGeneration() bool {
return b.exp == tracev2.NoExperiment && len(b.data) > 0 && tracev2.EventType(b.data[0]) == tracev2.EvEndOfGeneration
}
// readBatch reads the next full batch from r.
func readBatch(r interface {
io.Reader
@@ -54,6 +58,9 @@ func readBatch(r interface {
if err != nil {
return batch{}, 0, err
}
if typ := tracev2.EventType(b); typ == tracev2.EvEndOfGeneration {
return batch{m: NoThread, exp: tracev2.NoExperiment, data: []byte{b}}, 0, nil
}
if typ := tracev2.EventType(b); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
return batch{}, 0, fmt.Errorf("expected batch event, got event %d", typ)
}


@@ -9,6 +9,7 @@ import (
"bytes"
"cmp"
"encoding/binary"
"errors"
"fmt"
"io"
"slices"
@@ -32,22 +33,102 @@ type generation struct {
*evTable
}
// readGeneration buffers and decodes the structural elements of a trace generation
// out of r.
func readGeneration(r *bufio.Reader, ver version.Version) (*generation, error) {
if ver < version.Go126 {
return nil, errors.New("internal error: readGeneration called for <1.26 trace")
}
g := &generation{
evTable: &evTable{
pcs: make(map[uint64]frame),
},
batches: make(map[ThreadID][]batch),
}
// Read batches one at a time until we hit the end of the generation.
for {
b, gen, err := readBatch(r)
if err == io.EOF {
if len(g.batches) != 0 {
return nil, errors.New("incomplete generation found; trace likely truncated")
}
return nil, nil // All done.
}
if err != nil {
return nil, err
}
if g.gen == 0 {
// Initialize gen.
g.gen = gen
}
if b.isEndOfGeneration() {
break
}
if gen == 0 {
// 0 is a sentinel used by the runtime, so we'll never see it.
return nil, fmt.Errorf("invalid generation number %d", gen)
}
if gen != g.gen {
return nil, fmt.Errorf("broken trace: missing end-of-generation event, or generations are interleaved")
}
if g.minTs == 0 || b.time < g.minTs {
g.minTs = b.time
}
if err := processBatch(g, b, ver); err != nil {
return nil, err
}
}
// Check some invariants.
if g.freq == 0 {
return nil, fmt.Errorf("no frequency event found")
}
if !g.hasClockSnapshot {
return nil, fmt.Errorf("no clock snapshot event found")
}
// N.B. Trust that the batch order is correct. We can't validate the batch order
// by timestamp because the timestamps could just be plain wrong. The source of
// truth is the order things appear in the trace and the partial order sequence
// numbers on certain events. If it turns out the batch order is actually incorrect
// we'll very likely fail to advance a partial order from the frontier.
// Compactify stacks and strings for better lookup performance later.
g.stacks.compactify()
g.strings.compactify()
// Validate stacks.
if err := validateStackStrings(&g.stacks, &g.strings, g.pcs); err != nil {
return nil, err
}
// Now that we have the frequency, fix up CPU samples.
fixUpCPUSamples(g.cpuSamples, g.freq)
return g, nil
}
// spilledBatch represents a batch that was read out for the next generation,
// while reading the previous one. It's passed on when parsing the next
// generation.
//
// Used only for trace versions < Go126.
type spilledBatch struct {
gen uint64
*batch
}
// readGeneration buffers and decodes the structural elements of a trace generation
// readGenerationWithSpill buffers and decodes the structural elements of a trace generation
// out of r. spill is the first batch of the new generation (already buffered and
// parsed from reading the last generation). Returns the generation and the first
// batch read of the next generation, if any.
//
// If gen is non-nil, it is valid and must be processed before handling the returned
// error.
func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
func readGenerationWithSpill(r *bufio.Reader, spill *spilledBatch, ver version.Version) (*generation, *spilledBatch, error) {
if ver >= version.Go126 {
return nil, nil, errors.New("internal error: readGenerationWithSpill called for Go 1.26+ trace")
}
g := &generation{
evTable: &evTable{
pcs: make(map[uint64]frame),
@@ -56,6 +137,7 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (
}
// Process the spilled batch.
if spill != nil {
// Process the spilled batch, which contains real data.
g.gen = spill.gen
g.minTs = spill.batch.time
if err := processBatch(g, *spill.batch, ver); err != nil {
@@ -63,8 +145,7 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (
}
spill = nil
}
// Read batches one at a time until we either hit EOF or
// the next generation.
// Read batches one at a time until we hit the next generation.
var spillErr error
for {
b, gen, err := readBatch(r)
@@ -73,7 +154,7 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (
}
if err != nil {
if g.gen != 0 {
// This is an error reading the first batch of the next generation.
// This may be an error reading the first batch of the next generation.
// This is fine. Let's forge ahead assuming that what we've got so
// far is fine.
spillErr = err
@@ -89,7 +170,8 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (
// Initialize gen.
g.gen = gen
}
if gen == g.gen+1 { // TODO: advance this the same way the runtime does.
if gen == g.gen+1 {
// TODO: Increment the generation with wraparound the same way the runtime does.
spill = &spilledBatch{gen: gen, batch: &b}
break
}
@@ -134,15 +216,8 @@ func readGeneration(r *bufio.Reader, spill *spilledBatch, ver version.Version) (
return nil, nil, err
}
// Fix up the CPU sample timestamps, now that we have freq.
for i := range g.cpuSamples {
s := &g.cpuSamples[i]
s.time = g.freq.mul(timestamp(s.time))
}
// Sort the CPU samples.
slices.SortFunc(g.cpuSamples, func(a, b cpuSample) int {
return cmp.Compare(a.time, b.time)
})
// Now that we have the frequency, fix up CPU samples.
fixUpCPUSamples(g.cpuSamples, g.freq)
return g, spill, spillErr
}
@@ -174,6 +249,8 @@ func processBatch(g *generation, b batch, ver version.Version) error {
if err := addExperimentalBatch(g.expBatches, b); err != nil {
return err
}
case b.isEndOfGeneration():
return errors.New("internal error: unexpectedly processing EndOfGeneration; broken trace?")
default:
if _, ok := g.batches[b.m]; !ok {
g.batchMs = append(g.batchMs, b.m)
@@ -512,3 +589,15 @@ func addExperimentalBatch(expBatches map[tracev2.Experiment][]ExperimentalBatch,
})
return nil
}
func fixUpCPUSamples(samples []cpuSample, freq frequency) {
// Fix up the CPU sample timestamps.
for i := range samples {
s := &samples[i]
s.time = freq.mul(timestamp(s.time))
}
// Sort the CPU samples.
slices.SortFunc(samples, func(a, b cpuSample) int {
return cmp.Compare(a.time, b.time)
})
}


@@ -322,6 +322,14 @@ func (g *Generation) writeEventsTo(tw *raw.TextWriter) {
}
}
b.writeEventsTo(tw)
// Write end-of-generation event if necessary.
if g.trace.ver >= version.Go126 {
tw.WriteEvent(raw.Event{
Version: g.trace.ver,
Ev: tracev2.EvEndOfGeneration,
})
}
}
func (g *Generation) newStructuralBatch() *Batch {


@@ -6,6 +6,7 @@ package trace
import (
"bufio"
"errors"
"fmt"
"io"
"slices"
@@ -22,18 +23,28 @@ import (
// event as the first event, and a Sync event as the last event.
// (There may also be any number of Sync events in the middle.)
type Reader struct {
version version.Version
r *bufio.Reader
lastTs Time
gen *generation
version version.Version
r *bufio.Reader
lastTs Time
gen *generation
frontier []*batchCursor
cpuSamples []cpuSample
order ordering
syncs int
done bool
// Spill state.
//
// Traces before Go 1.26 had no explicit end-of-generation signal, so the
// first batch of the next generation had to be parsed just to detect that
// a new generation had begun. That batch is then "spilled" so we don't
// lose track of it when parsing the next generation.
//
// This is unnecessary as of Go 1.26, which has an explicit end-of-generation
// signal.
spill *spilledBatch
spillErr error // error from reading spill
spillErrSync bool // whether we emitted a Sync before reporting spillErr
frontier []*batchCursor
cpuSamples []cpuSample
order ordering
syncs int
done bool
v1Events *traceV1Converter
}
@@ -54,7 +65,7 @@ func NewReader(r io.Reader) (*Reader, error) {
return &Reader{
v1Events: convertV1Trace(tr),
}, nil
case version.Go122, version.Go123, version.Go125:
case version.Go122, version.Go123, version.Go125, version.Go126:
return &Reader{
version: v,
r: br,
@@ -139,52 +150,14 @@ func (r *Reader) ReadEvent() (e Event, err error) {
// Check if we need to refresh the generation.
if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
if r.spillErr != nil {
if r.spillErrSync {
return Event{}, r.spillErr
}
r.spillErrSync = true
r.syncs++
return syncEvent(nil, r.lastTs, r.syncs), nil
if r.version < version.Go126 {
return r.nextGenWithSpill()
}
if r.gen != nil && r.spill == nil {
// If we have a generation from the last read,
// and there's nothing left in the frontier, and
// there's no spilled batch, indicating that there's
// no further generation, it means we're done.
// Emit the final sync event.
r.done = true
r.syncs++
return syncEvent(nil, r.lastTs, r.syncs), nil
gen, err := readGeneration(r.r, r.version)
if err != nil {
return Event{}, err
}
// Read the next generation.
r.gen, r.spill, r.spillErr = readGeneration(r.r, r.spill, r.version)
if r.gen == nil {
r.spillErrSync = true
r.syncs++
return syncEvent(nil, r.lastTs, r.syncs), nil
}
// Reset CPU samples cursor.
r.cpuSamples = r.gen.cpuSamples
// Reset frontier.
for _, m := range r.gen.batchMs {
batches := r.gen.batches[m]
bc := &batchCursor{m: m}
ok, err := bc.nextEvent(batches, r.gen.freq)
if err != nil {
return Event{}, err
}
if !ok {
// Turns out there aren't actually any events in these batches.
continue
}
r.frontier = heapInsert(r.frontier, bc)
}
r.syncs++
// Always emit a sync event at the beginning of the generation.
return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil
return r.installGen(gen)
}
tryAdvance := func(i int) (bool, error) {
bc := r.frontier[i]
@@ -251,6 +224,78 @@ func (r *Reader) ReadEvent() (e Event, err error) {
return ev, nil
}
// nextGenWithSpill reads the next generation and installs it via
// installGen, while also handling any spilled batches.
func (r *Reader) nextGenWithSpill() (Event, error) {
if r.version >= version.Go126 {
return Event{}, errors.New("internal error: nextGenWithSpill called for Go 1.26+ trace")
}
if r.spillErr != nil {
if r.spillErrSync {
return Event{}, r.spillErr
}
r.spillErrSync = true
r.syncs++
return syncEvent(nil, r.lastTs, r.syncs), nil
}
if r.gen != nil && r.spill == nil {
// If we have a generation from the last read, there's nothing left
// in the frontier, and there's no spilled batch, then there is no
// further generation and we're done.
// Emit the final sync event.
r.done = true
r.syncs++
return syncEvent(nil, r.lastTs, r.syncs), nil
}
// Read the next generation.
var gen *generation
gen, r.spill, r.spillErr = readGenerationWithSpill(r.r, r.spill, r.version)
if gen == nil {
r.gen = nil
r.spillErrSync = true
r.syncs++
return syncEvent(nil, r.lastTs, r.syncs), nil
}
return r.installGen(gen)
}
// installGen installs the new generation into the Reader and returns
// a Sync event for the new generation.
func (r *Reader) installGen(gen *generation) (Event, error) {
if gen == nil {
// Emit the final sync event.
r.gen = nil
r.done = true
r.syncs++
return syncEvent(nil, r.lastTs, r.syncs), nil
}
r.gen = gen
// Reset CPU samples cursor.
r.cpuSamples = r.gen.cpuSamples
// Reset frontier.
for _, m := range r.gen.batchMs {
batches := r.gen.batches[m]
bc := &batchCursor{m: m}
ok, err := bc.nextEvent(batches, r.gen.freq)
if err != nil {
return Event{}, err
}
if !ok {
// Turns out there aren't actually any events in these batches.
continue
}
r.frontier = heapInsert(r.frontier, bc)
}
r.syncs++
// Always emit a sync event at the beginning of the generation.
return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil
}
func dumpFrontier(frontier []*batchCursor) string {
var sb strings.Builder
for _, bc := range frontier {


@@ -87,8 +87,8 @@ const (
EvSync // start of a sync batch [...EvFrequency|EvClockSnapshot]
EvClockSnapshot // snapshot of trace, mono and wall clocks [timestamp, mono, sec, nsec]
// Reserved internal in-band end-of-generation signal. Must never appear in the trace. Added in Go 1.25.
// This could be used as an explicit in-band end-of-generation signal in the future.
// In-band end-of-generation signal. Added in Go 1.26.
// In Go 1.25 it was used only internally.
EvEndOfGeneration
NumEvents


@@ -21,7 +21,8 @@ const (
Go122 Version = 22 // v2
Go123 Version = 23 // v2
Go125 Version = 25 // v2
Current = Go125
Go126 Version = 26 // v2
Current = Go126
)
var versions = map[Version][]tracev2.EventSpec{
@@ -33,7 +34,8 @@ var versions = map[Version][]tracev2.EventSpec{
Go122: tracev2.Specs()[:tracev2.EvUserLog+1], // All events after are Go 1.23+.
Go123: tracev2.Specs()[:tracev2.EvExperimentalBatch+1], // All events after are Go 1.25+.
Go125: tracev2.Specs(),
Go125: tracev2.Specs()[:tracev2.EvClockSnapshot+1], // All events after are Go 1.26+.
Go126: tracev2.Specs(),
}
// Specs returns the set of event.Specs for this version.


@@ -754,24 +754,7 @@ func traceRegisterLabelsAndReasons(gen uintptr) {
// was on has been returned, ReadTrace returns nil. The caller must copy the
// returned data before calling ReadTrace again.
// ReadTrace must be called from one goroutine at a time.
func ReadTrace() []byte {
for {
buf := readTrace()
// Skip over the end-of-generation signal which must not appear
// in the final trace.
if len(buf) == 1 && tracev2.EventType(buf[0]) == tracev2.EvEndOfGeneration {
continue
}
return buf
}
}
// readTrace is the implementation of ReadTrace, except with an additional
// in-band signal as to when the buffer is for a new generation.
//
//go:linkname readTrace runtime/trace.runtime_readTrace
func readTrace() (buf []byte) {
func ReadTrace() (buf []byte) {
top:
var park bool
systemstack(func() {
@@ -842,7 +825,7 @@ func readTrace0() (buf []byte, park bool) {
if !trace.headerWritten {
trace.headerWritten = true
unlock(&trace.lock)
return []byte("go 1.25 trace\x00\x00\x00"), false
return []byte("go 1.26 trace\x00\x00\x00"), false
}
// Read the next buffer.


@@ -12,72 +12,77 @@ import (
// timestamp is an unprocessed timestamp.
type timestamp uint64
// batch represents a batch of trace events.
// It is unparsed except for its header.
type batch struct {
m threadID
time timestamp
gen uint64
data []byte
}
// threadID is the runtime-internal M structure's ID. This is unique
// for each OS thread.
type threadID int64
// readBatch copies b and parses the trace batch header inside.
// Returns the batch, the generation, bytes read, and an error.
func readBatch(b []byte) (batch, uint64, uint64, error) {
// Returns the batch, bytes read, and an error.
func readBatch(b []byte) (batch, uint64, error) {
if len(b) == 0 {
return batch{}, 0, 0, fmt.Errorf("batch is empty")
return batch{}, 0, fmt.Errorf("batch is empty")
}
data := make([]byte, len(b))
if nw := copy(data, b); nw != len(b) {
return batch{}, 0, 0, fmt.Errorf("unexpected error copying batch")
}
// Read batch header byte.
if typ := tracev2.EventType(b[0]); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
return batch{}, 0, 1, fmt.Errorf("expected batch event, got event %d", typ)
}
copy(data, b)
// Read the batch header: gen (generation), thread (M) ID, base timestamp
// for the batch.
// Read batch header byte.
if typ := tracev2.EventType(b[0]); typ == tracev2.EvEndOfGeneration {
if len(b) != 1 {
return batch{}, 1, fmt.Errorf("unexpected end of generation in batch of size >1")
}
return batch{data: data}, 1, nil
}
if typ := tracev2.EventType(b[0]); typ != tracev2.EvEventBatch && typ != tracev2.EvExperimentalBatch {
return batch{}, 1, fmt.Errorf("expected batch event, got event %d", typ)
}
total := 1
b = b[1:]
// Read the generation.
gen, n, err := readUvarint(b)
if err != nil {
return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch gen: %w", err)
}
total += n
b = b[n:]
m, n, err := readUvarint(b)
if err != nil {
return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch M ID: %w", err)
}
total += n
b = b[n:]
ts, n, err := readUvarint(b)
if err != nil {
return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch timestamp: %w", err)
return batch{}, uint64(total + n), fmt.Errorf("error reading batch gen: %w", err)
}
total += n
b = b[n:]
// Read in the size of the batch to follow.
// Read the M (discard it).
_, n, err = readUvarint(b)
if err != nil {
return batch{}, uint64(total + n), fmt.Errorf("error reading batch M ID: %w", err)
}
total += n
b = b[n:]
// Read the timestamp.
ts, n, err := readUvarint(b)
if err != nil {
return batch{}, uint64(total + n), fmt.Errorf("error reading batch timestamp: %w", err)
}
total += n
b = b[n:]
// Read the size of the batch to follow.
size, n, err := readUvarint(b)
if err != nil {
return batch{}, gen, uint64(total + n), fmt.Errorf("error reading batch size: %w", err)
return batch{}, uint64(total + n), fmt.Errorf("error reading batch size: %w", err)
}
if size > tracev2.MaxBatchSize {
return batch{}, gen, uint64(total + n), fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
return batch{}, uint64(total + n), fmt.Errorf("invalid batch size %d, maximum is %d", size, tracev2.MaxBatchSize)
}
total += n
total += int(size)
if total != len(data) {
return batch{}, uint64(total), fmt.Errorf("expected complete batch")
}
data = data[:total]
// Return the batch.
return batch{
m: threadID(m),
gen: gen,
time: timestamp(ts),
data: data,
}, gen, uint64(total), nil
}, uint64(total), nil
}


@@ -141,9 +141,9 @@ func (fr *FlightRecorder) WriteTo(w io.Writer) (n int64, err error) {
// Write all the data.
for _, gen := range gens {
for _, batch := range gen.batches {
for _, data := range gen.batches {
// Write batch data.
nw, err = w.Write(batch.data)
nw, err = w.Write(data)
n += int64(nw)
if err != nil {
return n, err


@@ -41,21 +41,21 @@ func (w *recorder) Write(b []byte) (n int, err error) {
if len(b) == n {
return 0, nil
}
ba, gen, nb, err := readBatch(b[n:]) // Every write from the runtime is guaranteed to be a complete batch.
ba, nb, err := readBatch(b[n:]) // Every write from the runtime is guaranteed to be a complete batch.
if err != nil {
return len(b) - int(nb) - n, err
}
n += int(nb)
// Append the batch to the current generation.
if r.active.gen == 0 {
r.active.gen = gen
if ba.gen != 0 && r.active.gen == 0 {
r.active.gen = ba.gen
}
if r.active.minTime == 0 || r.active.minTime > r.freq.mul(ba.time) {
if ba.time != 0 && (r.active.minTime == 0 || r.active.minTime > r.freq.mul(ba.time)) {
r.active.minTime = r.freq.mul(ba.time)
}
r.active.size += len(ba.data)
r.active.batches = append(r.active.batches, ba)
r.active.batches = append(r.active.batches, ba.data)
return len(b), nil
}
@@ -99,7 +99,7 @@ type rawGeneration struct {
gen uint64
size int
minTime eventTime
batches []batch
batches [][]byte
}
func traceTimeNow(freq frequency) eventTime {


@@ -155,7 +155,7 @@ func (t *traceMultiplexer) startLocked() error {
t.subscribersMu.Unlock()
go func() {
header := runtime_readTrace()
header := runtime.ReadTrace()
if traceStartWriter != nil {
traceStartWriter.Write(header)
}
@@ -164,10 +164,16 @@ func (t *traceMultiplexer) startLocked() error {
}
for {
data := runtime_readTrace()
data := runtime.ReadTrace()
if data == nil {
break
}
if traceStartWriter != nil {
traceStartWriter.Write(data)
}
if flightRecorder != nil {
flightRecorder.Write(data)
}
if len(data) == 1 && tracev2.EventType(data[0]) == tracev2.EvEndOfGeneration {
if flightRecorder != nil {
flightRecorder.endGeneration()
@@ -187,13 +193,6 @@ func (t *traceMultiplexer) startLocked() error {
if frIsNew {
flightRecorder.Write(header)
}
} else {
if traceStartWriter != nil {
traceStartWriter.Write(data)
}
if flightRecorder != nil {
flightRecorder.Write(data)
}
}
}
}()