diff --git a/src/context/benchmark_test.go b/src/context/benchmark_test.go
index b79232704eb..3c526dd1069 100644
--- a/src/context/benchmark_test.go
+++ b/src/context/benchmark_test.go
@@ -7,10 +7,64 @@ package context_test
 import (
	. "context"
	"fmt"
+	"runtime"
+	"sync"
	"testing"
+	"time"
 )
 
-func BenchmarkContextCancelTree(b *testing.B) {
+func BenchmarkWithTimeout(b *testing.B) {
+	for concurrency := 40; concurrency <= 4e5; concurrency *= 100 {
+		name := fmt.Sprintf("concurrency=%d", concurrency)
+		b.Run(name, func(b *testing.B) {
+			benchmarkWithTimeout(b, concurrency)
+		})
+	}
+}
+
+func benchmarkWithTimeout(b *testing.B, concurrentContexts int) {
+	gomaxprocs := runtime.GOMAXPROCS(0)
+	perPContexts := concurrentContexts / gomaxprocs
+	root := Background()
+
+	// Generate concurrent contexts.
+	var wg sync.WaitGroup
+	ccf := make([][]CancelFunc, gomaxprocs)
+	for i := range ccf {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			cf := make([]CancelFunc, perPContexts)
+			for j := range cf {
+				_, cf[j] = WithTimeout(root, time.Hour)
+			}
+			ccf[i] = cf
+		}(i)
+	}
+	wg.Wait()
+
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		wcf := make([]CancelFunc, 10)
+		for pb.Next() {
+			for i := range wcf {
+				_, wcf[i] = WithTimeout(root, time.Hour)
+			}
+			for _, f := range wcf {
+				f()
+			}
+		}
+	})
+	b.StopTimer()
+
+	for _, cf := range ccf {
+		for _, f := range cf {
+			f()
+		}
+	}
+}
+
+func BenchmarkCancelTree(b *testing.B) {
	depths := []int{1, 10, 100, 1000}
	for _, d := range depths {
		b.Run(fmt.Sprintf("depth=%d", d), func(b *testing.B) {
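A note on what this benchmark stresses: every context.WithTimeout arms a runtime timer, and its CancelFunc removes that timer again, so the RunParallel loop above is essentially an addtimer/deltimer stress test run against a backdrop of many pre-armed timers. Below is a minimal standalone sketch of the same load pattern; it is not part of the patch, and the goroutine and iteration counts are arbitrary.

// timeout_stress.go: an illustrative sketch, not part of the patch.
// Every context.WithTimeout arms a runtime timer and its CancelFunc
// deletes that timer again, so this loop exercises the runtime's
// addtimer/deltimer paths much like the benchmark above.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	root := context.Background()
	var wg sync.WaitGroup
	start := time.Now()
	for g := 0; g < 8; g++ { // a few workers, analogous to b.RunParallel
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 100000; i++ {
				_, cancel := context.WithTimeout(root, time.Hour)
				cancel() // removes the pending timer from its heap
			}
		}()
	}
	wg.Wait()
	fmt.Println("8x100000 WithTimeout+cancel in", time.Since(start))
}

With a single global timer heap, all eight workers contend on one lock; with the per-P buckets introduced in time.go below, they mostly touch distinct buckets.
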
diff --git a/src/internal/trace/parser.go b/src/internal/trace/parser.go
index 31b41bcac58..2e145129ebe 100644
--- a/src/internal/trace/parser.go
+++ b/src/internal/trace/parser.go
@@ -270,8 +270,9 @@ func parseHeader(buf []byte) (int, error) {
 // It does analyze and verify per-event-type arguments.
 func parseEvents(ver int, rawEvents []rawEvent, strings map[uint64]string) (events []*Event, stacks map[uint64][]*Frame, err error) {
	var ticksPerSec, lastSeq, lastTs int64
-	var lastG, timerGoid uint64
+	var lastG uint64
	var lastP int
+	timerGoids := make(map[uint64]bool)
	lastGs := make(map[int]uint64) // last goroutine running on P
	stacks = make(map[uint64][]*Frame)
	batches := make(map[int][]*Event) // events by P
@@ -308,7 +309,7 @@ func parseEvents(ver int, rawEvents []rawEvent, strings map[uint64]string) (even
				return
			}
		case EvTimerGoroutine:
-			timerGoid = raw.args[0]
+			timerGoids[raw.args[0]] = true
		case EvStack:
			if len(raw.args) < 2 {
				err = fmt.Errorf("EvStack has wrong number of arguments at offset 0x%x: want at least 2, got %v",
@@ -431,7 +432,7 @@ func parseEvents(ver int, rawEvents []rawEvent, strings map[uint64]string) (even
	for _, ev := range events {
		ev.Ts = int64(float64(ev.Ts-minTs) * freq)
		// Move timers and syscalls to separate fake Ps.
-		if timerGoid != 0 && ev.G == timerGoid && ev.Type == EvGoUnblock {
+		if timerGoids[ev.G] && ev.Type == EvGoUnblock {
			ev.P = TimerP
		}
		if ev.Type == EvGoSysExit {
diff --git a/src/runtime/proc.go b/src/runtime/proc.go
index 21fff7de5d4..98753551a9b 100644
--- a/src/runtime/proc.go
+++ b/src/runtime/proc.go
@@ -3863,15 +3863,11 @@ func sysmon() {
		}
		shouldRelax := true
		if osRelaxMinNS > 0 {
-			lock(&timers.lock)
-			if timers.sleeping {
-				now := nanotime()
-				next := timers.sleepUntil
-				if next-now < osRelaxMinNS {
-					shouldRelax = false
-				}
+			next := timeSleepUntil()
+			now := nanotime()
+			if next-now < osRelaxMinNS {
+				shouldRelax = false
			}
-			unlock(&timers.lock)
		}
		if shouldRelax {
			osRelax(true)
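sysmon no longer peeks into a single global timers struct; it asks timeSleepUntil (added in time.go below) for the earliest wake-up across all buckets and only relaxes the OS timer resolution when that wake-up is comfortably far away. A user-level sketch of the same min-across-shards pattern follows; all names are illustrative, and osRelaxMinNS stands in for the runtime's platform threshold.

// A sketch of computing the earliest deadline across sharded timer state,
// taking each shard's lock in turn. The answer may be slightly stale (a
// timer can be added right after a shard is unlocked), which is fine for
// a heuristic like osRelax.
package main

import (
	"fmt"
	"math"
	"sync"
	"time"
)

type shard struct {
	mu         sync.Mutex
	sleeping   bool  // a timer goroutine is sleeping on this shard
	sleepUntil int64 // nanoseconds; when that sleep ends
}

var shards [64]shard

// earliestDeadline mirrors timeSleepUntil: the minimum sleepUntil over all
// sleeping shards, locking the shards one by one instead of all at once.
func earliestDeadline() int64 {
	next := int64(math.MaxInt64)
	for i := range shards {
		s := &shards[i]
		s.mu.Lock()
		if s.sleeping && s.sleepUntil < next {
			next = s.sleepUntil
		}
		s.mu.Unlock()
	}
	return next
}

func main() {
	const osRelaxMinNS = int64(17 * time.Millisecond) // assumed threshold
	now := time.Now().UnixNano()
	shards[3].sleeping = true
	shards[3].sleepUntil = now + int64(5*time.Millisecond) // due soon
	shouldRelax := earliestDeadline()-now >= osRelaxMinNS
	fmt.Println("relax OS timer resolution:", shouldRelax) // false: a timer is due soon
}
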
diff --git a/src/runtime/time.go b/src/runtime/time.go
index abf200d7d3f..6bfa6ba1608 100644
--- a/src/runtime/time.go
+++ b/src/runtime/time.go
@@ -6,14 +6,18 @@
 
 package runtime
 
-import "unsafe"
+import (
+	"runtime/internal/sys"
+	"unsafe"
+)
 
 // Package time knows the layout of this structure.
 // If this struct changes, adjust ../time/sleep.go:/runtimeTimer.
 // For GOOS=nacl, package syscall knows the layout of this structure.
 // If this struct changes, adjust ../syscall/net_nacl.go:/runtimeTimer.
 type timer struct {
-	i int // heap index
+	tb *timersBucket // the bucket the timer lives in
+	i  int           // heap index
 
	// Timer wakes up at when, and then at when+period, ... (period > 0 only)
	// each time calling f(arg, now) in the timer goroutine, so f must be
@@ -25,7 +29,37 @@ type timer struct {
	seq uintptr
 }
 
-var timers struct {
+// timersLen is the length of the timers array.
+//
+// Ideally, this would be set to GOMAXPROCS, but that would require
+// dynamic reallocation.
+//
+// The current value is a compromise between memory usage and performance
+// that should cover the majority of GOMAXPROCS values used in the wild.
+const timersLen = 64
+
+// timers contains "per-P" timer heaps.
+//
+// Timers are queued into the timersBucket associated with the current P,
+// so each P may work with its own timers independently of other P instances.
+//
+// Each timersBucket may be associated with multiple Ps
+// if GOMAXPROCS > timersLen.
+var timers [timersLen]struct {
+	timersBucket
+
+	// The padding should eliminate false sharing
+	// between timersBucket values.
+	pad [sys.CacheLineSize - unsafe.Sizeof(timersBucket{})%sys.CacheLineSize]byte
+}
+
+func (t *timer) assignBucket() *timersBucket {
+	id := uint8(getg().m.p.ptr().id) % timersLen
+	t.tb = &timers[id].timersBucket
+	return t.tb
+}
+
+type timersBucket struct {
	lock         mutex
	gp           *g
	created      bool
@@ -51,18 +85,20 @@ func timeSleep(ns int64) {
		return
	}
 
-	t := getg().timer
+	gp := getg()
+	t := gp.timer
	if t == nil {
		t = new(timer)
-		getg().timer = t
+		gp.timer = t
	}
	*t = timer{}
	t.when = nanotime() + ns
	t.f = goroutineReady
-	t.arg = getg()
-	lock(&timers.lock)
-	addtimerLocked(t)
-	goparkunlock(&timers.lock, "sleep", traceEvGoSleep, 2)
+	t.arg = gp
+	tb := t.assignBucket()
+	lock(&tb.lock)
+	tb.addtimerLocked(t)
+	goparkunlock(&tb.lock, "sleep", traceEvGoSleep, 2)
 }
 
 // startTimer adds t to the timer heap.
@@ -89,87 +125,86 @@ func goroutineReady(arg interface{}, seq uintptr) {
 }
 
 func addtimer(t *timer) {
-	lock(&timers.lock)
-	addtimerLocked(t)
-	unlock(&timers.lock)
+	tb := t.assignBucket()
+	lock(&tb.lock)
+	tb.addtimerLocked(t)
+	unlock(&tb.lock)
 }
 
 // Add a timer to the heap and start or kick timerproc if the new timer is
 // earlier than any of the others.
 // Timers are locked.
-func addtimerLocked(t *timer) {
+func (tb *timersBucket) addtimerLocked(t *timer) {
	// when must never be negative; otherwise timerproc will overflow
	// during its delta calculation and never expire other runtime timers.
	if t.when < 0 {
		t.when = 1<<63 - 1
	}
-	t.i = len(timers.t)
-	timers.t = append(timers.t, t)
-	siftupTimer(t.i)
+	t.i = len(tb.t)
+	tb.t = append(tb.t, t)
+	tb.siftupTimer(t.i)
	if t.i == 0 {
		// siftup moved to top: new earliest deadline.
-		if timers.sleeping {
-			timers.sleeping = false
-			notewakeup(&timers.waitnote)
+		if tb.sleeping {
+			tb.sleeping = false
+			notewakeup(&tb.waitnote)
		}
-		if timers.rescheduling {
-			timers.rescheduling = false
-			goready(timers.gp, 0)
+		if tb.rescheduling {
+			tb.rescheduling = false
+			goready(tb.gp, 0)
		}
	}
-	if !timers.created {
-		timers.created = true
-		go timerproc()
+	if !tb.created {
+		tb.created = true
+		go timerproc(tb)
	}
 }
 
 // Delete timer t from the heap.
 // Do not need to update the timerproc: if it wakes up early, no big deal.
 func deltimer(t *timer) bool {
-	// Dereference t so that any panic happens before the lock is held.
-	// Discard result, because t might be moving in the heap.
-	_ = t.i
+	tb := t.tb
 
-	lock(&timers.lock)
+	lock(&tb.lock)
	// t may not be registered anymore and may have
	// a bogus i (typically 0, if generated by Go).
	// Verify it before proceeding.
	i := t.i
-	last := len(timers.t) - 1
-	if i < 0 || i > last || timers.t[i] != t {
-		unlock(&timers.lock)
+	last := len(tb.t) - 1
+	if i < 0 || i > last || tb.t[i] != t {
+		unlock(&tb.lock)
		return false
	}
	if i != last {
-		timers.t[i] = timers.t[last]
-		timers.t[i].i = i
+		tb.t[i] = tb.t[last]
+		tb.t[i].i = i
	}
-	timers.t[last] = nil
-	timers.t = timers.t[:last]
+	tb.t[last] = nil
+	tb.t = tb.t[:last]
	if i != last {
-		siftupTimer(i)
-		siftdownTimer(i)
+		tb.siftupTimer(i)
+		tb.siftdownTimer(i)
	}
-	unlock(&timers.lock)
+	unlock(&tb.lock)
	return true
 }
 
 // Timerproc runs the time-driven events.
-// It sleeps until the next event in the timers heap.
+// It sleeps until the next event in the tb heap.
 // If addtimer inserts a new earlier event, it wakes timerproc early.
-func timerproc() {
-	timers.gp = getg()
+func timerproc(tb *timersBucket) {
+	tb.gp = getg()
	for {
-		lock(&timers.lock)
-		timers.sleeping = false
+		lock(&tb.lock)
+		tb.sleeping = false
		now := nanotime()
		delta := int64(-1)
		for {
-			if len(timers.t) == 0 {
+			if len(tb.t) == 0 {
				delta = -1
				break
			}
-			t := timers.t[0]
+			t := tb.t[0]
			delta = t.when - now
			if delta > 0 {
				break
@@ -177,43 +212,43 @@ func timerproc() {
			if t.period > 0 {
				// leave in heap but adjust next time to fire
				t.when += t.period * (1 + -delta/t.period)
-				siftdownTimer(0)
+				tb.siftdownTimer(0)
			} else {
				// remove from heap
-				last := len(timers.t) - 1
+				last := len(tb.t) - 1
				if last > 0 {
-					timers.t[0] = timers.t[last]
-					timers.t[0].i = 0
+					tb.t[0] = tb.t[last]
+					tb.t[0].i = 0
				}
-				timers.t[last] = nil
-				timers.t = timers.t[:last]
+				tb.t[last] = nil
+				tb.t = tb.t[:last]
				if last > 0 {
-					siftdownTimer(0)
+					tb.siftdownTimer(0)
				}
				t.i = -1 // mark as removed
			}
			f := t.f
			arg := t.arg
			seq := t.seq
-			unlock(&timers.lock)
+			unlock(&tb.lock)
			if raceenabled {
				raceacquire(unsafe.Pointer(t))
			}
			f(arg, seq)
-			lock(&timers.lock)
+			lock(&tb.lock)
		}
		if delta < 0 || faketime > 0 {
			// No timers left - put goroutine to sleep.
-			timers.rescheduling = true
-			goparkunlock(&timers.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
+			tb.rescheduling = true
+			goparkunlock(&tb.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
			continue
		}
		// At least one timer pending. Sleep until then.
-		timers.sleeping = true
-		timers.sleepUntil = now + delta
-		noteclear(&timers.waitnote)
-		unlock(&timers.lock)
-		notetsleepg(&timers.waitnote, delta)
+		tb.sleeping = true
+		tb.sleepUntil = now + delta
+		noteclear(&tb.waitnote)
+		unlock(&tb.lock)
+		notetsleepg(&tb.waitnote, delta)
	}
 }
 
@@ -222,28 +257,68 @@ func timejump() *g {
		return nil
	}
 
-	lock(&timers.lock)
-	if !timers.created || len(timers.t) == 0 {
-		unlock(&timers.lock)
+	for i := range timers {
+		lock(&timers[i].lock)
+	}
+	gp := timejumpLocked()
+	for i := range timers {
+		unlock(&timers[i].lock)
+	}
+
+	return gp
+}
+
+func timejumpLocked() *g {
+	// Determine a timer bucket with minimum when.
+	var minT *timer
+	for i := range timers {
+		tb := &timers[i]
+		if !tb.created || len(tb.t) == 0 {
+			continue
+		}
+		t := tb.t[0]
+		if minT == nil || t.when < minT.when {
+			minT = t
+		}
+	}
+	if minT == nil || minT.when <= faketime {
		return nil
	}
-	var gp *g
-	if faketime < timers.t[0].when {
-		faketime = timers.t[0].when
-		if timers.rescheduling {
-			timers.rescheduling = false
-			gp = timers.gp
-		}
+	faketime = minT.when
+	tb := minT.tb
+	if !tb.rescheduling {
+		return nil
	}
-	unlock(&timers.lock)
-	return gp
+	tb.rescheduling = false
+	return tb.gp
+}
+
+func timeSleepUntil() int64 {
+	next := int64(1<<63 - 1)
+
+	// Determine the minimum sleepUntil across all the timer buckets.
+	//
+	// The function cannot return a precise answer,
+	// as another timer may pop in as soon as timers have been unlocked.
+	// So lock the timers one by one instead of all at once.
+	for i := range timers {
+		tb := &timers[i]
+
+		lock(&tb.lock)
+		if tb.sleeping && tb.sleepUntil < next {
+			next = tb.sleepUntil
+		}
+		unlock(&tb.lock)
+	}
+
+	return next
 }
 
 // Heap maintenance algorithms.
 
-func siftupTimer(i int) {
-	t := timers.t
+func (tb *timersBucket) siftupTimer(i int) {
+	t := tb.t
	when := t[i].when
	tmp := t[i]
	for i > 0 {
@@ -259,8 +334,8 @@
	}
 }
 
-func siftdownTimer(i int) {
-	t := timers.t
+func (tb *timersBucket) siftdownTimer(i int) {
+	t := tb.t
	n := len(t)
	when := t[i].when
	tmp := t[i]
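The timers array above is the heart of the change: timer state is sharded into timersLen buckets indexed by the current P's id, and each bucket is padded out to a cache-line multiple so neighboring buckets never share a line (false sharing). The same layout trick works in user code; here is a self-contained sketch of a sharded counter, assuming a 64-byte cache line (the runtime uses sys.CacheLineSize, which user code cannot import).

// A sketch of the sharding-plus-padding technique used by the timers array:
// a fixed array of buckets, each padded to a cache line, indexed by a cheap
// hash of the caller's identity. The runtime shards by P id; user code has
// no access to that, so this sketch takes an explicit shard hint instead.
package main

import (
	"fmt"
	"sync"
	"unsafe"
)

const (
	cacheLineSize = 64 // assumption; the runtime uses sys.CacheLineSize
	numShards     = 64 // memory/parallelism compromise, like timersLen
)

type counterShard struct {
	mu sync.Mutex
	n  int64
}

var counters [numShards]struct {
	counterShard

	// The padding should eliminate false sharing between counterShard values.
	pad [cacheLineSize - unsafe.Sizeof(counterShard{})%cacheLineSize]byte
}

func add(shardHint int) {
	c := &counters[uint8(shardHint)%numShards].counterShard
	c.mu.Lock()
	c.n++
	c.mu.Unlock()
}

func main() {
	var wg sync.WaitGroup
	for g := 0; g < 8; g++ {
		wg.Add(1)
		go func(g int) {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				add(g) // each goroutine stays on its own shard
			}
		}(g)
	}
	wg.Wait()
	total := int64(0)
	for i := range counters {
		total += counters[i].n
	}
	fmt.Println("total:", total) // 8000
}

As in the patch, a fixed shard count trades a little memory for lock and cache-line independence without the dynamic reallocation a GOMAXPROCS-sized array would need.
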
diff --git a/src/runtime/trace.go b/src/runtime/trace.go
index a2eb0ba8c39..50e4c73c838 100644
--- a/src/runtime/trace.go
+++ b/src/runtime/trace.go
@@ -408,9 +408,12 @@ func ReadTrace() []byte {
		var data []byte
		data = append(data, traceEvFrequency|0<
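The trace.go hunk is truncated here, but the parser change earlier in this patch (a single timerGoid replaced by a timerGoids set) implies the runtime can now report several timer goroutines, one per active bucket, via EvTimerGoroutine events in the trace header. A rough sketch of the consumer-side logic follows, using simplified stand-in types; the real internal/trace event structs differ.

// A sketch of why the parser needs a set once multiple timer goroutines
// exist: collect every EvTimerGoroutine id, then classify later events
// against the whole set rather than a single remembered id.
package main

import "fmt"

type rawEvent struct {
	typ  string
	args []uint64
}

func main() {
	events := []rawEvent{
		{typ: "EvTimerGoroutine", args: []uint64{7}},
		{typ: "EvTimerGoroutine", args: []uint64{42}}, // a second bucket's timer goroutine
		{typ: "EvGoUnblock", args: []uint64{42}},
	}

	// First pass: gather all timer goroutine ids into a set.
	timerGoids := make(map[uint64]bool)
	for _, ev := range events {
		if ev.typ == "EvTimerGoroutine" {
			timerGoids[ev.args[0]] = true
		}
	}

	// Second pass mirrors the parser's new per-event test.
	for _, ev := range events {
		if ev.typ == "EvGoUnblock" && timerGoids[ev.args[0]] {
			fmt.Println("goroutine", ev.args[0], "is a timer goroutine -> move to TimerP")
		}
	}
}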