runtime: eliminate global span queue [green tea]

This change removes the locked global span queue and replaces the
fixed-size local span queue with a variable-sized local span queue. The
variable-sized local span queue grows as needed to accomodate local
work. With no global span queue either, GC workers balance work amongst
themselves by stealing from each other.

The new variable-sized local span queues are inspired by the P-local
deque underlying sync.Pool. Unlike the sync.Pool deque, however, both
the owning P and stealing Ps take spans from the tail, making this
incarnation a strict queue, not a deque. This is intentional, since we
want a queue-like order to encourage objects to accumulate on each span.

These variable-sized local span queues are crucial to mark termination,
just like the global span queue was. To avoid hitting the ragged barrier
too often, we must check whether any Ps have any spans on their
variable-sized local span queues. We maintain a per-P atomic bitmask
(another pMask) that contains this state. We can also use this to speed
up stealing by skipping Ps that don't have any local spans.

The variable-sized local span queues are slower than the old fixed-size
local span queues because of the additional indirection, so this change
adds a non-atomic local fixed-size queue. This risks getting work stuck
on it, so, similarly to how workbufs work, each worker will occasionally
dump some spans onto its local variable-sized queue. This scales much
more nicely than dumping to a global queue, but is still visible to all
other Ps.

For #73581.

Change-Id: I814f54d9c3cc7fa7896167746e9823f50943ac22
Reviewed-on: https://go-review.googlesource.com/c/go/+/700496
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
This commit is contained in:
Michael Anthony Knyszek 2025-08-15 17:09:05 +00:00 committed by Michael Knyszek
parent 7bc1935db5
commit d7a38adf4c
11 changed files with 570 additions and 557 deletions

View file

@ -1289,30 +1289,6 @@ func MSpanCountAlloc(ms *MSpan, bits []byte) int {
return result
}
type MSpanQueue mSpanQueue
func (q *MSpanQueue) Size() int {
return (*mSpanQueue)(q).n
}
func (q *MSpanQueue) Push(s *MSpan) {
(*mSpanQueue)(q).push((*mspan)(s))
}
func (q *MSpanQueue) Pop() *MSpan {
s := (*mSpanQueue)(q).pop()
return (*MSpan)(s)
}
func (q *MSpanQueue) TakeAll(p *MSpanQueue) {
(*mSpanQueue)(q).takeAll((*mSpanQueue)(p))
}
func (q *MSpanQueue) PopN(n int) MSpanQueue {
p := (*mSpanQueue)(q).popN(n)
return (MSpanQueue)(p)
}
const (
TimeHistSubBucketBits = timeHistSubBucketBits
TimeHistNumSubBuckets = timeHistNumSubBuckets

View file

@ -881,199 +881,6 @@ func TestWeakToStrongMarkTermination(t *testing.T) {
}
}
func TestMSpanQueue(t *testing.T) {
expectSize := func(t *testing.T, q *runtime.MSpanQueue, want int) {
t.Helper()
if got := q.Size(); got != want {
t.Errorf("expected size %d, got %d", want, got)
}
}
expectMSpan := func(t *testing.T, got, want *runtime.MSpan, op string) {
t.Helper()
if got != want {
t.Errorf("expected mspan %p from %s, got %p", want, op, got)
}
}
makeSpans := func(t *testing.T, n int) ([]*runtime.MSpan, func()) {
t.Helper()
spans := make([]*runtime.MSpan, 0, n)
for range cap(spans) {
spans = append(spans, runtime.AllocMSpan())
}
return spans, func() {
for i, s := range spans {
runtime.FreeMSpan(s)
spans[i] = nil
}
}
}
t.Run("Empty", func(t *testing.T) {
var q runtime.MSpanQueue
expectSize(t, &q, 0)
expectMSpan(t, q.Pop(), nil, "pop")
})
t.Run("PushPop", func(t *testing.T) {
s := runtime.AllocMSpan()
defer runtime.FreeMSpan(s)
var q runtime.MSpanQueue
q.Push(s)
expectSize(t, &q, 1)
expectMSpan(t, q.Pop(), s, "pop")
expectMSpan(t, q.Pop(), nil, "pop")
})
t.Run("PushPopPushPop", func(t *testing.T) {
s0 := runtime.AllocMSpan()
defer runtime.FreeMSpan(s0)
s1 := runtime.AllocMSpan()
defer runtime.FreeMSpan(s1)
var q runtime.MSpanQueue
// Push and pop s0.
q.Push(s0)
expectSize(t, &q, 1)
expectMSpan(t, q.Pop(), s0, "pop")
expectMSpan(t, q.Pop(), nil, "pop")
// Push and pop s1.
q.Push(s1)
expectSize(t, &q, 1)
expectMSpan(t, q.Pop(), s1, "pop")
expectMSpan(t, q.Pop(), nil, "pop")
})
t.Run("PushPushPopPop", func(t *testing.T) {
s0 := runtime.AllocMSpan()
defer runtime.FreeMSpan(s0)
s1 := runtime.AllocMSpan()
defer runtime.FreeMSpan(s1)
var q runtime.MSpanQueue
q.Push(s0)
expectSize(t, &q, 1)
q.Push(s1)
expectSize(t, &q, 2)
expectMSpan(t, q.Pop(), s0, "pop")
expectMSpan(t, q.Pop(), s1, "pop")
expectMSpan(t, q.Pop(), nil, "pop")
})
t.Run("EmptyTakeAll", func(t *testing.T) {
var q runtime.MSpanQueue
var p runtime.MSpanQueue
expectSize(t, &p, 0)
expectSize(t, &q, 0)
p.TakeAll(&q)
expectSize(t, &p, 0)
expectSize(t, &q, 0)
expectMSpan(t, q.Pop(), nil, "pop")
expectMSpan(t, p.Pop(), nil, "pop")
})
t.Run("Push4TakeAll", func(t *testing.T) {
spans, free := makeSpans(t, 4)
defer free()
var q runtime.MSpanQueue
for i, s := range spans {
expectSize(t, &q, i)
q.Push(s)
expectSize(t, &q, i+1)
}
var p runtime.MSpanQueue
p.TakeAll(&q)
expectSize(t, &p, 4)
for i := range p.Size() {
expectMSpan(t, p.Pop(), spans[i], "pop")
}
expectSize(t, &p, 0)
expectMSpan(t, q.Pop(), nil, "pop")
expectMSpan(t, p.Pop(), nil, "pop")
})
t.Run("Push4Pop3", func(t *testing.T) {
spans, free := makeSpans(t, 4)
defer free()
var q runtime.MSpanQueue
for i, s := range spans {
expectSize(t, &q, i)
q.Push(s)
expectSize(t, &q, i+1)
}
p := q.PopN(3)
expectSize(t, &p, 3)
expectSize(t, &q, 1)
for i := range p.Size() {
expectMSpan(t, p.Pop(), spans[i], "pop")
}
expectMSpan(t, q.Pop(), spans[len(spans)-1], "pop")
expectSize(t, &p, 0)
expectSize(t, &q, 0)
expectMSpan(t, q.Pop(), nil, "pop")
expectMSpan(t, p.Pop(), nil, "pop")
})
t.Run("Push4Pop0", func(t *testing.T) {
spans, free := makeSpans(t, 4)
defer free()
var q runtime.MSpanQueue
for i, s := range spans {
expectSize(t, &q, i)
q.Push(s)
expectSize(t, &q, i+1)
}
p := q.PopN(0)
expectSize(t, &p, 0)
expectSize(t, &q, 4)
for i := range q.Size() {
expectMSpan(t, q.Pop(), spans[i], "pop")
}
expectSize(t, &p, 0)
expectSize(t, &q, 0)
expectMSpan(t, q.Pop(), nil, "pop")
expectMSpan(t, p.Pop(), nil, "pop")
})
t.Run("Push4Pop4", func(t *testing.T) {
spans, free := makeSpans(t, 4)
defer free()
var q runtime.MSpanQueue
for i, s := range spans {
expectSize(t, &q, i)
q.Push(s)
expectSize(t, &q, i+1)
}
p := q.PopN(4)
expectSize(t, &p, 4)
expectSize(t, &q, 0)
for i := range p.Size() {
expectMSpan(t, p.Pop(), spans[i], "pop")
}
expectSize(t, &p, 0)
expectMSpan(t, q.Pop(), nil, "pop")
expectMSpan(t, p.Pop(), nil, "pop")
})
t.Run("Push4Pop5", func(t *testing.T) {
spans, free := makeSpans(t, 4)
defer free()
var q runtime.MSpanQueue
for i, s := range spans {
expectSize(t, &q, i)
q.Push(s)
expectSize(t, &q, i+1)
}
p := q.PopN(5)
expectSize(t, &p, 4)
expectSize(t, &q, 0)
for i := range p.Size() {
expectMSpan(t, p.Pop(), spans[i], "pop")
}
expectSize(t, &p, 0)
expectMSpan(t, q.Pop(), nil, "pop")
expectMSpan(t, p.Pop(), nil, "pop")
})
}
func TestDetectFinalizerAndCleanupLeaks(t *testing.T) {
got := runTestProg(t, "testprog", "DetectFinalizerAndCleanupLeaks", "GODEBUG=checkfinalizers=1")
sp := strings.SplitN(got, "detected possible issues with cleanups and/or finalizers", 2)

View file

@ -326,7 +326,7 @@ type workType struct {
full lfstack // lock-free list of full blocks workbuf
_ cpu.CacheLinePad // prevents false-sharing between full and empty
empty lfstack // lock-free list of empty blocks workbuf
_ cpu.CacheLinePad // prevents false-sharing between empty and nproc/nwait
_ cpu.CacheLinePad // prevents false-sharing between empty and wbufSpans
wbufSpans struct {
lock mutex
@ -337,12 +337,24 @@ type workType struct {
// one of the workbuf lists.
busy mSpanList
}
_ cpu.CacheLinePad // prevents false-sharing between wbufSpans and spanq
_ cpu.CacheLinePad // prevents false-sharing between wbufSpans and spanWorkMask
// Global queue of spans to scan.
// spanqMask is a bitmap indicating which Ps have local work worth stealing.
// Set or cleared by the owning P, cleared by stealing Ps.
//
// spanqMask is like a proxy for a global queue. An important invariant is that
// forced flushing like gcw.dispose must set this bit on any P that has local
// span work.
spanqMask pMask
_ cpu.CacheLinePad // prevents false-sharing between spanqMask and everything else
// List of all spanSPMCs.
//
// Only used if goexperiment.GreenTeaGC.
spanq spanQueue
spanSPMCs struct {
lock mutex // no lock rank because it's a leaf lock (see mklockrank.go).
all *spanSPMC
}
// Restore 64-bit alignment on 32-bit.
// _ uint32
@ -711,8 +723,9 @@ func gcStart(trigger gcTrigger) {
traceRelease(trace)
}
// Check that all Ps have finished deferred mcache flushes.
// Check and setup per-P state.
for _, p := range allp {
// Check that all Ps have finished deferred mcache flushes.
if fg := p.mcache.flushGen.Load(); fg != mheap_.sweepgen {
println("runtime: p", p.id, "flushGen", fg, "!= sweepgen", mheap_.sweepgen)
throw("p mcache not flushed")
@ -923,6 +936,7 @@ top:
// TODO(austin): Break up these workbufs to
// better distribute work.
pp.gcw.dispose()
// Collect the flushedWork flag.
if pp.gcw.flushedWork {
atomic.Xadd(&gcMarkDoneFlushed, 1)
@ -1623,17 +1637,6 @@ func gcEndWork() (last bool) {
return incnwait == work.nproc && !gcMarkWorkAvailable()
}
// gcMarkWorkAvailable reports whether there's any non-local work available to do.
func gcMarkWorkAvailable() bool {
if !work.full.empty() || !work.spanq.empty() {
return true // global work available
}
if work.markrootNext < work.markrootJobs {
return true // root scan work available
}
return false
}
// gcMark runs the mark (or, for concurrent GC, mark termination)
// All gcWork caches must be empty.
// STW is in effect at this point.
@ -1644,8 +1647,8 @@ func gcMark(startTime int64) {
work.tstart = startTime
// Check that there's no marking work remaining.
if work.full != 0 || work.markrootNext < work.markrootJobs || !work.spanq.empty() {
print("runtime: full=", hex(work.full), " next=", work.markrootNext, " jobs=", work.markrootJobs, " nDataRoots=", work.nDataRoots, " nBSSRoots=", work.nBSSRoots, " nSpanRoots=", work.nSpanRoots, " nStackRoots=", work.nStackRoots, " spanq.n=", work.spanq.size(), "\n")
if work.full != 0 || work.markrootNext < work.markrootJobs {
print("runtime: full=", hex(work.full), " next=", work.markrootNext, " jobs=", work.markrootJobs, " nDataRoots=", work.nDataRoots, " nBSSRoots=", work.nBSSRoots, " nSpanRoots=", work.nSpanRoots, " nStackRoots=", work.nStackRoots, "\n")
panic("non-empty mark queue after concurrent mark")
}
@ -1761,10 +1764,12 @@ func gcSweep(mode gcMode) bool {
// Sweep all spans eagerly.
for sweepone() != ^uintptr(0) {
}
// Free workbufs eagerly.
// Free workbufs and span rings eagerly.
prepareFreeWorkbufs()
for freeSomeWbufs(false) {
}
for freeSomeSpanSPMCs(false) {
}
// All "free" events for this mark/sweep cycle have
// now happened, so we can make this profile cycle
// available immediately.

View file

@ -666,6 +666,7 @@ func gcAssistAlloc1(gp *g, scanWork int64) {
gp.gcAssistBytes = 0
return
}
// Track time spent in this assist. Since we're on the
// system stack, this is non-preemptible, so we can
// just measure start and end time.
@ -1231,14 +1232,18 @@ func gcDrain(gcw *gcWork, flags gcDrainFlags) {
var b uintptr
var s objptr
if b = gcw.tryGetObjFast(); b == 0 {
if s = gcw.tryGetSpan(false); s == 0 {
if s = gcw.tryGetSpanFast(); s == 0 {
if b = gcw.tryGetObj(); b == 0 {
if s = gcw.tryGetSpan(); s == 0 {
// Flush the write barrier
// buffer; this may create
// more work.
wbBufFlush()
if b = gcw.tryGetObj(); b == 0 {
s = gcw.tryGetSpan(true)
if s = gcw.tryGetSpan(); s == 0 {
s = gcw.tryStealSpan()
}
}
}
}
}
@ -1327,13 +1332,15 @@ func gcDrainN(gcw *gcWork, scanWork int64) int64 {
var b uintptr
var s objptr
if b = gcw.tryGetObjFast(); b == 0 {
if s = gcw.tryGetSpan(false); s == 0 {
if s = gcw.tryGetSpanFast(); s == 0 {
if b = gcw.tryGetObj(); b == 0 {
if s = gcw.tryGetSpan(); s == 0 {
// Flush the write barrier
// buffer; this may create
// more work.
wbBufFlush()
if b = gcw.tryGetObj(); b == 0 {
if s = gcw.tryGetSpan(); s == 0 {
// Try to do a root job.
if work.markrootNext < work.markrootJobs {
job := atomic.Xadd(&work.markrootNext, +1) - 1
@ -1342,7 +1349,9 @@ func gcDrainN(gcw *gcWork, scanWork int64) int64 {
continue
}
}
s = gcw.tryGetSpan(true)
s = gcw.tryStealSpan()
}
}
}
}
}

View file

@ -37,7 +37,6 @@
package runtime
import (
"internal/cpu"
"internal/goarch"
"internal/runtime/atomic"
"internal/runtime/gc"
@ -300,6 +299,12 @@ func tryDeferToSpanScan(p uintptr, gcw *gcWork) bool {
if q.tryAcquire() {
if gcw.spanq.put(makeObjPtr(base, objIndex)) {
if gcphase == _GCmark {
// This is intentionally racy; the bit set here might get
// stomped on by a stealing P. See the comment in tryStealSpan
// for an explanation as to why this is OK.
if !work.spanqMask.read(uint32(gcw.id)) {
work.spanqMask.set(gcw.id)
}
gcw.mayNeedWorker = true
}
gcw.flushedWork = true
@ -308,260 +313,487 @@ func tryDeferToSpanScan(p uintptr, gcw *gcWork) bool {
return true
}
// tryGetSpanFast attempts to get an entire span to scan.
func (w *gcWork) tryGetSpanFast() objptr {
return w.spanq.tryGetFast()
}
// tryGetSpan attempts to get an entire span to scan.
func (w *gcWork) tryGetSpan(slow bool) objptr {
if s := w.spanq.get(); s != 0 {
func (w *gcWork) tryGetSpan() objptr {
if s := w.spanq.tryGetFast(); s != 0 {
return s
}
if slow {
// Check the global span queue.
if s := work.spanq.get(w); s != 0 {
// "Steal" from ourselves.
if s := w.spanq.steal(&w.spanq); s != 0 {
return s
}
// Attempt to steal spans to scan from other Ps.
return spanQueueSteal(w)
// We failed to get any local work, so we're fresh out.
// Nobody else is going to add work for us. Clear our bit.
if work.spanqMask.read(uint32(w.id)) {
work.spanqMask.clear(w.id)
}
return 0
}
// spanQueue is a concurrent safe queue of mspans. Each mspan is represented
// as an objptr whose spanBase is the base address of the span.
// spanQueue is a P-local stealable span queue.
type spanQueue struct {
avail atomic.Bool // optimization to check emptiness w/o the lock
_ cpu.CacheLinePad // prevents false-sharing between lock and avail
lock mutex
q mSpanQueue
}
func (q *spanQueue) empty() bool {
return !q.avail.Load()
}
func (q *spanQueue) size() int {
return q.q.n
}
// putBatch adds a whole batch of spans to the queue.
func (q *spanQueue) putBatch(batch []objptr) {
var list mSpanQueue
for _, p := range batch {
s := spanOfUnchecked(p.spanBase())
s.scanIdx = p.objIndex()
list.push(s)
}
lock(&q.lock)
if q.q.n == 0 {
q.avail.Store(true)
}
q.q.takeAll(&list)
unlock(&q.lock)
}
// get tries to take a span off the queue.
//
// Returns a non-zero objptr on success. Also, moves additional
// spans to gcw's local span queue.
func (q *spanQueue) get(gcw *gcWork) objptr {
if q.empty() {
return 0
}
lock(&q.lock)
if q.q.n == 0 {
unlock(&q.lock)
return 0
}
n := q.q.n/int(gomaxprocs) + 1
if n > q.q.n {
n = q.q.n
}
if max := len(gcw.spanq.ring) / 2; n > max {
n = max
}
newQ := q.q.popN(n)
if q.q.n == 0 {
q.avail.Store(false)
}
unlock(&q.lock)
s := newQ.pop()
for newQ.n > 0 {
s := newQ.pop()
gcw.spanq.put(makeObjPtr(s.base(), s.scanIdx))
}
return makeObjPtr(s.base(), s.scanIdx)
}
// localSpanQueue is a P-local ring buffer of objptrs that represent spans.
// Accessed without a lock.
//
// Multi-consumer, single-producer. The only producer is the P that owns this
// queue, but any other P may consume from it.
//
// This is based on the scheduler runqueues. If making changes there, consider
// also making them here.
type localSpanQueue struct {
head atomic.Uint32
tail atomic.Uint32
// head, tail, and ring represent a local non-thread-safe ring buffer.
head, tail uint32
ring [256]objptr
// putsSinceDrain counts the number of put calls since the last drain.
putsSinceDrain int
// chain contains state visible to other Ps.
//
// In particular, that means a linked chain of single-producer multi-consumer
// ring buffers where the single producer is this P only.
//
// This linked chain structure is based off the sync.Pool dequeue.
chain struct {
// head is the spanSPMC to put to. This is only accessed
// by the producer, so doesn't need to be synchronized.
head *spanSPMC
// tail is the spanSPMC to steal from. This is accessed
// by consumers, so reads and writes must be atomic.
tail atomic.UnsafePointer // *spanSPMC
}
}
// put adds s to the queue. Returns true if put flushed to the global queue
// because it was full.
func (q *localSpanQueue) put(s objptr) (flushed bool) {
for {
h := q.head.Load() // synchronize with consumers
t := q.tail.Load()
if t-h < uint32(len(q.ring)) {
q.ring[t%uint32(len(q.ring))] = s
q.tail.Store(t + 1) // Makes the item avail for consumption.
// putFast tries to put s onto the queue, but may fail if it's full.
func (q *spanQueue) putFast(s objptr) (ok bool) {
if q.tail-q.head == uint32(len(q.ring)) {
return false
}
if q.putSlow(s, h, t) {
q.ring[q.tail%uint32(len(q.ring))] = s
q.tail++
return true
}
// The queue is not full, now the put above must succeed.
// put puts s onto the queue.
//
// Returns whether the caller should spin up a new worker.
func (q *spanQueue) put(s objptr) bool {
// The constants below define the period of and volume of
// spans we spill to the spmc chain when the local queue is
// not full.
//
// spillPeriod must be > spillMax, otherwise that sets the
// effective maximum size of our local span queue. Even if
// we have a span ring of size N, but we flush K spans every
// K puts, then K becomes our effective maximum length. When
// spillPeriod > spillMax, then we're always spilling spans
// at a slower rate than we're accumulating them.
const (
// spillPeriod defines how often to check if we should
// spill some spans, counted in the number of calls to put.
spillPeriod = 64
// spillMax defines, at most, how many spans to drain with
// each spill.
spillMax = 16
)
if q.putFast(s) {
// Occasionally try to spill some work to generate parallelism.
q.putsSinceDrain++
if q.putsSinceDrain >= spillPeriod {
// Reset even if we don't drain, so we don't check every time.
q.putsSinceDrain = 0
// Try to drain some spans. Don't bother if there's very
// few of them or there's already spans in the spmc chain.
n := min((q.tail-q.head)/2, spillMax)
if n > 4 && q.chainEmpty() {
q.drain(n)
return true
}
}
return false
}
// putSlow is a helper for put to move spans to the global queue.
// Returns true on success, false on failure (nothing moved).
func (q *localSpanQueue) putSlow(s objptr, h, t uint32) bool {
var batch [len(q.ring)/2 + 1]objptr
// First, grab a batch from local queue.
n := t - h
n = n / 2
if n != uint32(len(q.ring)/2) {
throw("localSpanQueue.putSlow: queue is not full")
// We're out of space. Drain out our local spans.
q.drain(uint32(len(q.ring)) / 2)
if !q.putFast(s) {
throw("failed putFast after drain")
}
return true
}
// flush publishes all spans in the local queue to the spmc chain.
func (q *spanQueue) flush() {
n := q.tail - q.head
if n == 0 {
return
}
q.drain(n)
}
// empty returns true if there's no more work on the queue.
//
// Not thread-safe. Must only be called by the owner of q.
func (q *spanQueue) empty() bool {
// Check the local queue for work.
if q.tail-q.head > 0 {
return false
}
return q.chainEmpty()
}
// chainEmpty returns true if the spmc chain is empty.
//
// Thread-safe.
func (q *spanQueue) chainEmpty() bool {
// Check the rest of the rings for work.
r := (*spanSPMC)(q.chain.tail.Load())
for r != nil {
if !r.empty() {
return false
}
r = (*spanSPMC)(r.prev.Load())
}
return true
}
// drain publishes n spans from the local queue to the spmc chain.
func (q *spanQueue) drain(n uint32) {
q.putsSinceDrain = 0
if q.chain.head == nil {
// N.B. We target 1024, but this may be bigger if the physical
// page size is bigger, or if we can fit more uintptrs into a
// physical page. See newSpanSPMC docs.
r := newSpanSPMC(1024)
q.chain.head = r
q.chain.tail.StoreNoWB(unsafe.Pointer(r))
}
// Try to drain some of the queue to the head spmc.
if q.tryDrain(q.chain.head, n) {
return
}
// No space. Create a bigger spmc and add it to the chain.
// Double the size of the next one, up to a maximum.
//
// We double each time so we can avoid taking this slow path
// in the future, which involves a global lock. Ideally we want
// to hit a steady-state where the deepest any queue goes during
// a mark phase can fit in the ring.
//
// However, we still set a maximum on this. We set the maximum
// to something large to amortize the cost of lock acquisition, but
// still at a reasonable size for big heaps and/or a lot of Ps (which
// tend to be correlated).
//
// It's not too bad to burn relatively large-but-fixed amounts of per-P
// memory if we need to deal with really, really deep queues, since the
// constants of proportionality are small. Simultaneously, we want to
// avoid a situation where a single worker ends up queuing O(heap)
// work and then forever retains a queue of that size.
const maxCap = 1 << 20 / goarch.PtrSize
newCap := q.chain.head.cap * 2
if newCap > maxCap {
newCap = maxCap
}
newHead := newSpanSPMC(newCap)
if !q.tryDrain(newHead, n) {
throw("failed to put span on newly-allocated spanSPMC")
}
q.chain.head.prev.StoreNoWB(unsafe.Pointer(newHead))
q.chain.head = newHead
}
// tryDrain attempts to drain n spans from q's local queue to the chain.
//
// Returns whether it succeeded.
func (q *spanQueue) tryDrain(r *spanSPMC, n uint32) bool {
if q.head+n > q.tail {
throw("attempt to drain too many elements")
}
h := r.head.Load() // synchronize with consumers
t := r.tail.Load()
rn := t - h
if rn+n <= r.cap {
for i := uint32(0); i < n; i++ {
batch[i] = q.ring[(h+i)%uint32(len(q.ring))]
*r.slot(t + i) = q.ring[(q.head+i)%uint32(len(q.ring))]
}
if !q.head.CompareAndSwap(h, h+n) { // Commits consume.
return false
}
batch[n] = s
work.spanq.putBatch(batch[:])
r.tail.Store(t + n) // Makes the items avail for consumption.
q.head += n
return true
}
return false
}
// get attempts to take a span off the queue. Might fail if the
// queue is empty. May be called by multiple threads, but callers
// are better off using stealFrom to amortize the cost of stealing.
// This method is intended for use by the owner of this queue.
func (q *localSpanQueue) get() objptr {
for {
h := q.head.Load()
t := q.tail.Load()
if t == h {
// tryGetFast attempts to get a span from the local queue, but may fail if it's empty,
// returning false.
func (q *spanQueue) tryGetFast() objptr {
if q.tail-q.head == 0 {
return 0
}
s := q.ring[h%uint32(len(q.ring))]
if q.head.CompareAndSwap(h, h+1) {
s := q.ring[q.head%uint32(len(q.ring))]
q.head++
return s
}
// steal takes some spans from the ring chain of another span queue.
//
// q == q2 is OK.
func (q *spanQueue) steal(q2 *spanQueue) objptr {
r := (*spanSPMC)(q2.chain.tail.Load())
if r == nil {
return 0
}
for {
// It's important that we load the next pointer
// *before* popping the tail. In general, r may be
// transiently empty, but if next is non-nil before
// the pop and the pop fails, then r is permanently
// empty, which is the only condition under which it's
// safe to drop r from the chain.
r2 := (*spanSPMC)(r.prev.Load())
// Try to refill from one of the rings
if s := q.refill(r); s != 0 {
return s
}
if r2 == nil {
// This is the only ring. It's empty right
// now, but could be pushed to in the future.
return 0
}
// The tail of the chain has been drained, so move on
// to the next ring. Try to drop it from the chain
// so the next consumer doesn't have to look at the empty
// ring again.
if q2.chain.tail.CompareAndSwapNoWB(unsafe.Pointer(r), unsafe.Pointer(r2)) {
r.dead.Store(true)
}
r = r2
}
}
func (q *localSpanQueue) empty() bool {
h := q.head.Load()
t := q.tail.Load()
return t == h
// refill takes some spans from r and puts them into q's local queue.
//
// One span is removed from the stolen spans and returned on success.
// Failure to steal returns a zero objptr.
//
// steal is thread-safe with respect to r.
func (q *spanQueue) refill(r *spanSPMC) objptr {
if q.tail-q.head != 0 {
throw("steal with local work available")
}
// stealFrom takes spans from q2 and puts them into q1. One span is removed
// from the stolen spans and returned on success. Failure to steal returns a
// zero objptr.
func (q1 *localSpanQueue) stealFrom(q2 *localSpanQueue) objptr {
writeHead := q1.tail.Load()
// Steal some spans.
var n uint32
for {
h := q2.head.Load() // load-acquire, synchronize with other consumers
t := q2.tail.Load() // load-acquire, synchronize with the producer
h := r.head.Load() // load-acquire, synchronize with other consumers
t := r.tail.Load() // load-acquire, synchronize with the producer
n = t - h
n = n - n/2
if n == 0 {
return 0
}
if n > uint32(len(q2.ring)/2) { // read inconsistent h and t
if n > r.cap { // read inconsistent h and t
continue
}
n = min(n, uint32(len(q.ring)/2))
for i := uint32(0); i < n; i++ {
c := q2.ring[(h+i)%uint32(len(q2.ring))]
q1.ring[(writeHead+i)%uint32(len(q1.ring))] = c
q.ring[i] = *r.slot(h + i)
}
if q2.head.CompareAndSwap(h, h+n) {
if r.head.CompareAndSwap(h, h+n) {
break
}
}
n--
c := q1.ring[(writeHead+n)%uint32(len(q1.ring))]
if n == 0 {
return c
}
h := q1.head.Load()
if writeHead-h+n >= uint32(len(q1.ring)) {
throw("localSpanQueue.stealFrom: queue overflow")
}
q1.tail.Store(writeHead + n)
return c
// Update local queue head and tail to reflect new buffered values.
q.head = 0
q.tail = n
// Pop off the head of the queue and return it.
return q.tryGetFast()
}
// drain moves all spans in the queue to the global queue.
// spanSPMC is a ring buffer of objptrs that represent spans.
// Accessed without a lock.
//
// Returns true if anything was moved.
func (q *localSpanQueue) drain() bool {
var batch [len(q.ring)]objptr
// Single-producer, multi-consumer. The only producer is the P that owns this
// queue, but any other P may consume from it.
//
// ## Invariants for memory management
//
// 1. All spanSPMCs are allocated from mheap_.spanSPMCAlloc.
// 2. All allocated spanSPMCs must be on the work.spanSPMCs list.
// 3. spanSPMCs may only be allocated if gcphase != _GCoff.
// 4. spanSPMCs may only be deallocated if gcphase == _GCoff.
//
// Invariants (3) and (4) ensure that we do not need to concern ourselves with
// tricky reuse issues that stem from not knowing when a thread is truly done
// with a spanSPMC. For example, two threads could load the same spanSPMC from
// the tail of the chain. One thread is then paused while the other steals the
// last few elements off of it. It's not safe to free at that point since the
// other thread will still inspect that spanSPMC, and we have no way of knowing
// without more complex and/or heavyweight synchronization.
//
// Instead, we rely on the global synchronization inherent to GC phases, and
// the fact that spanSPMCs are only ever used during the mark phase, to ensure
// memory safety. This means we temporarily waste some memory, but it's only
// until the end of the mark phase.
type spanSPMC struct {
_ sys.NotInHeap
var n uint32
for {
var h uint32
for {
h = q.head.Load()
t := q.tail.Load()
n = t - h
if n == 0 {
// allnext is the link to the next spanSPMC on the work.spanSPMCs list.
// This is used to find and free dead spanSPMCs. Protected by
// work.spanSPMCs.lock.
allnext *spanSPMC
// dead indicates whether the spanSPMC is no longer in use.
// Protected by the CAS to the prev field of the spanSPMC pointing
// to this spanSPMC. That is, whoever wins that CAS takes ownership
// of marking this spanSPMC as dead. See spanQueue.steal for details.
dead atomic.Bool
// prev is the next link up a spanQueue's SPMC chain, from tail to head,
// hence the name "prev." Set by a spanQueue's producer, cleared by a
// CAS in spanQueue.steal.
prev atomic.UnsafePointer // *spanSPMC
// head, tail, cap, and ring together represent a fixed-size SPMC lock-free
// ring buffer of size cap. The ring buffer contains objptr values.
head atomic.Uint32
tail atomic.Uint32
cap uint32 // cap(ring))
ring *objptr
}
// newSpanSPMC allocates and initializes a new spmc with the provided capacity.
//
// newSpanSPMC may override the capacity with a larger one if the provided one would
// waste memory.
func newSpanSPMC(cap uint32) *spanSPMC {
lock(&work.spanSPMCs.lock)
r := (*spanSPMC)(mheap_.spanSPMCAlloc.alloc())
r.allnext = work.spanSPMCs.all
work.spanSPMCs.all = r
unlock(&work.spanSPMCs.lock)
// If cap < the capacity of a single physical page, round up.
pageCap := uint32(physPageSize / goarch.PtrSize) // capacity of a single page
if cap < pageCap {
cap = pageCap
}
if cap&(cap-1) != 0 {
throw("spmc capacity must be a power of 2")
}
r.cap = cap
ring := sysAlloc(uintptr(cap)*unsafe.Sizeof(objptr(0)), &memstats.gcMiscSys, "GC span queue")
atomic.StorepNoWB(unsafe.Pointer(&r.ring), ring)
return r
}
// empty returns true if the spmc is empty.
//
// empty is thread-safe.
func (r *spanSPMC) empty() bool {
h := r.head.Load()
t := r.tail.Load()
return t == h
}
// deinit frees any resources the spanSPMC is holding onto and zeroes it.
func (r *spanSPMC) deinit() {
sysFree(unsafe.Pointer(r.ring), uintptr(r.cap)*unsafe.Sizeof(objptr(0)), &memstats.gcMiscSys)
r.ring = nil
r.dead.Store(false)
r.prev.StoreNoWB(nil)
r.head.Store(0)
r.tail.Store(0)
r.cap = 0
}
// slot returns a pointer to slot i%r.cap.
func (r *spanSPMC) slot(i uint32) *objptr {
idx := uintptr(i & (r.cap - 1))
return (*objptr)(unsafe.Add(unsafe.Pointer(r.ring), idx*unsafe.Sizeof(objptr(0))))
}
// freeSomeSpanSPMCs frees some spanSPMCs back to the OS and returns
// true if it should be called again to free more.
func freeSomeSpanSPMCs(preemptible bool) bool {
// TODO(mknyszek): This is arbitrary, but some kind of limit is necessary
// to help bound delays to cooperatively preempt ourselves.
const batchSize = 64
// According to the SPMC memory management invariants, we can only free
// spanSPMCs outside of the mark phase. We ensure we do this in two ways.
//
// 1. We take the work.spanSPMCs lock, which we need anyway. This ensures
// that we are non-preemptible. If this path becomes lock-free, we will
// need to become non-preemptible in some other way.
// 2. Once we are non-preemptible, we check the gcphase, and back out if
// it's not safe.
//
// This way, we ensure that we don't start freeing if we're in the wrong
// phase, and the phase can't change on us while we're freeing.
lock(&work.spanSPMCs.lock)
if gcphase != _GCoff || work.spanSPMCs.all == nil {
unlock(&work.spanSPMCs.lock)
return false
}
if n <= uint32(len(q.ring)) {
rp := &work.spanSPMCs.all
gp := getg()
more := true
for i := 0; i < batchSize && !(preemptible && gp.preempt); i++ {
r := *rp
if r == nil {
more = false
break
}
// Read inconsistent h and t.
}
for i := uint32(0); i < n; i++ {
batch[i] = q.ring[(h+i)%uint32(len(q.ring))]
}
if q.head.CompareAndSwap(h, h+n) { // Commits consume.
break
if r.dead.Load() {
// It's dead. Deinitialize and free it.
*rp = r.allnext
r.deinit()
mheap_.spanSPMCAlloc.free(unsafe.Pointer(r))
} else {
// Still alive, likely in some P's chain.
// Skip it.
rp = &r.allnext
}
}
if !q.empty() {
throw("drained local span queue, but not empty")
unlock(&work.spanSPMCs.lock)
return more
}
work.spanq.putBatch(batch[:n])
return true
}
// spanQueueSteal attempts to steal a span from another P's local queue.
// tryStealSpan attempts to steal a span from another P's local queue.
//
// Returns a non-zero objptr on success.
func spanQueueSteal(gcw *gcWork) objptr {
func (w *gcWork) tryStealSpan() objptr {
pp := getg().m.p.ptr()
for enum := stealOrder.start(cheaprand()); !enum.done(); enum.next() {
if !work.spanqMask.read(enum.position()) {
continue
}
p2 := allp[enum.position()]
if pp == p2 {
continue
}
if s := gcw.spanq.stealFrom(&p2.gcw.spanq); s != 0 {
if s := w.spanq.steal(&p2.gcw.spanq); s != 0 {
return s
}
// N.B. This is intentionally racy. We may stomp on a mask set by
// a P that just put a bunch of work into its local queue.
//
// This is OK because the ragged barrier in gcMarkDone will set
// the bit on each P if there's local work we missed. This race
// should generally be rare, since the window between noticing
// an empty local queue and this bit being set is quite small.
work.spanqMask.clear(int32(enum.position()))
}
return 0
}
@ -903,6 +1135,23 @@ func (w *gcWork) flushScanStats(dst *[gc.NumSizeClasses]sizeClassScanStats) {
clear(w.stats[:])
}
// gcMarkWorkAvailable reports whether there's any non-local work available to do.
//
// This is a heavyweight check and must only be used for correctness, not
// as a hint.
func gcMarkWorkAvailable() bool {
if !work.full.empty() {
return true // global work available
}
if work.markrootNext < work.markrootJobs {
return true // root scan work available
}
if work.spanqMask.any() {
return true // stealable local work available
}
return false
}
// scanObject scans the object starting at b, adding pointers to gcw.
// b must point to the beginning of a heap object or an oblet.
// scanObject consults the GC bitmap for the pointer mask and the

View file

@ -54,31 +54,34 @@ func (q *spanInlineMarkBits) tryAcquire() bool {
}
type spanQueue struct {
_ uint32 // To match alignment padding requirements for atomically-accessed variables in workType.
}
func (q *spanQueue) flush() {
}
func (q *spanQueue) empty() bool {
return true
}
func (q *spanQueue) size() int {
return 0
type spanSPMC struct {
_ sys.NotInHeap
}
type localSpanQueue struct {
}
func (q *localSpanQueue) drain() bool {
func freeSomeSpanSPMCs(preemptible bool) bool {
return false
}
func (q *localSpanQueue) empty() bool {
return true
}
type objptr uintptr
func (w *gcWork) tryGetSpan(steal bool) objptr {
func (w *gcWork) tryGetSpanFast() objptr {
return 0
}
func (w *gcWork) tryGetSpan() objptr {
return 0
}
func (w *gcWork) tryStealSpan() objptr {
return 0
}
@ -116,6 +119,17 @@ func (w *gcWork) flushScanStats(dst *[gc.NumSizeClasses]sizeClassScanStats) {
clear(w.stats[:])
}
// gcMarkWorkAvailable reports whether there's any non-local work available to do.
func gcMarkWorkAvailable() bool {
if !work.full.empty() {
return true // global work available
}
if work.markrootNext < work.markrootJobs {
return true // root scan work available
}
return false
}
// scanObject scans the object starting at b, adding pointers to gcw.
// b must point to the beginning of a heap object or an oblet.
// scanObject consults the GC bitmap for the pointer mask and the

View file

@ -714,7 +714,7 @@ func (c *gcControllerState) enlistWorker() {
// (the scheduler will already prefer to spin up a new
// dedicated worker over an idle one).
if sched.npidle.Load() != 0 && sched.nmspinning.Load() == 0 {
wakep()
wakep() // Likely to consume our worker request.
return
}
}

View file

@ -307,6 +307,10 @@ func bgsweep(c chan int) {
// N.B. freeSomeWbufs is already batched internally.
goschedIfBusy()
}
for freeSomeSpanSPMCs(true) {
// N.B. freeSomeSpanSPMCs is already batched internally.
goschedIfBusy()
}
lock(&sweep.lock)
if !isSweepDone() {
// This can happen if a GC runs between

View file

@ -55,9 +55,10 @@ func init() {
// | Priority | Work queue | Restrictions | Function |
// |----------------------------------------------------------|
// | 1 | Workbufs | P-local | tryGetObjFast |
// | 2 | Span queue | P-local | tryGetSpan(false) | [greenteagc]
// | 2 | Span queue | P-local | tryGetSpanFast | [greenteagc]
// | 3 | Workbufs | None | tryGetObj |
// | 4 | Span queue | None | tryGetSpan(true) | [greenteagc]
// | 4 | Span queue | None | tryGetSpan | [greenteagc]
// | 5 | Span queue | None | tryStealSpan | [greenteagc]
// +----------------------------------------------------------+
//
// The rationale behind this ordering comes from two insights:
@ -80,6 +81,8 @@ func init() {
// gcWork may locally hold GC work buffers. This can be done by
// disabling preemption (systemstack or acquirem).
type gcWork struct {
id int32 // same ID as the parent P
// wbuf1 and wbuf2 are the primary and secondary work buffers.
//
// This can be thought of as a stack of both work buffers'
@ -103,7 +106,7 @@ type gcWork struct {
// spanq is a queue of spans to process.
//
// Only used if goexperiment.GreenTeaGC.
spanq localSpanQueue
spanq spanQueue
// ptrBuf is a temporary buffer used by span scanning.
ptrBuf *[pageSize / goarch.PtrSize]uintptr
@ -318,7 +321,18 @@ func (w *gcWork) dispose() {
}
w.wbuf2 = nil
}
if w.spanq.drain() {
if !w.spanq.empty() {
w.spanq.flush() // Flush any local work.
// There's globally-visible work now, so make everyone aware of it.
//
// Note that we need to make everyone aware even if flush didn't
// flush any local work. The global work was always visible, but
// the bitmap bit may have been unset.
//
// See the comment in tryStealSpan, which explains how it relies
// on this behavior.
work.spanqMask.set(w.id)
w.flushedWork = true
}
if w.bytesMarked != 0 {

View file

@ -213,13 +213,14 @@ type mheap struct {
pad [(cpu.CacheLinePadSize - unsafe.Sizeof(mcentral{})%cpu.CacheLinePadSize) % cpu.CacheLinePadSize]byte
}
spanalloc fixalloc // allocator for span*
cachealloc fixalloc // allocator for mcache*
specialfinalizeralloc fixalloc // allocator for specialfinalizer*
specialCleanupAlloc fixalloc // allocator for specialCleanup*
specialCheckFinalizerAlloc fixalloc // allocator for specialCheckFinalizer*
specialTinyBlockAlloc fixalloc // allocator for specialTinyBlock*
specialprofilealloc fixalloc // allocator for specialprofile*
spanalloc fixalloc // allocator for span
spanSPMCAlloc fixalloc // allocator for spanSPMC, protected by work.spanSPMCs.lock
cachealloc fixalloc // allocator for mcache
specialfinalizeralloc fixalloc // allocator for specialfinalizer
specialCleanupAlloc fixalloc // allocator for specialCleanup
specialCheckFinalizerAlloc fixalloc // allocator for specialCheckFinalizer
specialTinyBlockAlloc fixalloc // allocator for specialTinyBlock
specialprofilealloc fixalloc // allocator for specialprofile
specialReachableAlloc fixalloc // allocator for specialReachable
specialPinCounterAlloc fixalloc // allocator for specialPinCounter
specialWeakHandleAlloc fixalloc // allocator for specialWeakHandle
@ -793,6 +794,7 @@ func (h *mheap) init() {
lockInit(&h.speciallock, lockRankMheapSpecial)
h.spanalloc.init(unsafe.Sizeof(mspan{}), recordspan, unsafe.Pointer(h), &memstats.mspan_sys)
h.spanSPMCAlloc.init(unsafe.Sizeof(spanSPMC{}), nil, nil, &memstats.gcMiscSys)
h.cachealloc.init(unsafe.Sizeof(mcache{}), nil, nil, &memstats.mcache_sys)
h.specialfinalizeralloc.init(unsafe.Sizeof(specialfinalizer{}), nil, nil, &memstats.other_sys)
h.specialCleanupAlloc.init(unsafe.Sizeof(specialCleanup{}), nil, nil, &memstats.other_sys)
@ -1937,86 +1939,6 @@ func (list *mSpanList) takeAll(other *mSpanList) {
other.first, other.last = nil, nil
}
// mSpanQueue is like an mSpanList but is FIFO instead of LIFO and may
// be allocated on the stack. (mSpanList can be visible from the mspan
// itself, so it is marked as not-in-heap).
type mSpanQueue struct {
head, tail *mspan
n int
}
// push adds s to the end of the queue.
func (q *mSpanQueue) push(s *mspan) {
if s.next != nil {
throw("span already on list")
}
if q.tail == nil {
q.tail, q.head = s, s
} else {
q.tail.next = s
q.tail = s
}
q.n++
}
// pop removes a span from the head of the queue, if any.
func (q *mSpanQueue) pop() *mspan {
if q.head == nil {
return nil
}
s := q.head
q.head = s.next
s.next = nil
if q.head == nil {
q.tail = nil
}
q.n--
return s
}
// takeAll removes all the spans from q2 and adds them to the end of q1, in order.
func (q1 *mSpanQueue) takeAll(q2 *mSpanQueue) {
if q2.head == nil {
return
}
if q1.head == nil {
*q1 = *q2
} else {
q1.tail.next = q2.head
q1.tail = q2.tail
q1.n += q2.n
}
q2.tail = nil
q2.head = nil
q2.n = 0
}
// popN removes n spans from the head of the queue and returns them as a new queue.
func (q *mSpanQueue) popN(n int) mSpanQueue {
var newQ mSpanQueue
if n <= 0 {
return newQ
}
if n >= q.n {
newQ = *q
q.tail = nil
q.head = nil
q.n = 0
return newQ
}
s := q.head
for range n - 1 {
s = s.next
}
q.n -= n
newQ.head = q.head
newQ.tail = s
newQ.n = n
q.head = s.next
s.next = nil
return newQ
}
const (
// _KindSpecialTinyBlock indicates that a given allocation is a tiny block.
// Ordered before KindSpecialFinalizer and KindSpecialCleanup so that it

View file

@ -5735,6 +5735,7 @@ func setcpuprofilerate(hz int32) {
// previously destroyed p, and transitions it to status _Pgcstop.
func (pp *p) init(id int32) {
pp.id = id
pp.gcw.id = id
pp.status = _Pgcstop
pp.sudogcache = pp.sudogbuf[:0]
pp.deferpool = pp.deferpoolbuf[:0]
@ -5890,6 +5891,7 @@ func procresize(nprocs int32) *p {
idlepMask = idlepMask.resize(nprocs)
timerpMask = timerpMask.resize(nprocs)
work.spanqMask = work.spanqMask.resize(nprocs)
unlock(&allpLock)
}
@ -5954,6 +5956,7 @@ func procresize(nprocs int32) *p {
allp = allp[:nprocs]
idlepMask = idlepMask.resize(nprocs)
timerpMask = timerpMask.resize(nprocs)
work.spanqMask = work.spanqMask.resize(nprocs)
unlock(&allpLock)
}
@ -6892,6 +6895,16 @@ func (p pMask) clear(id int32) {
atomic.And(&p[word], ^mask)
}
// any returns true if any bit in p is set.
func (p pMask) any() bool {
for i := range p {
if atomic.Load(&p[i]) != 0 {
return true
}
}
return false
}
// resize resizes the pMask and returns a new one.
//
// The result may alias p, so callers are encouraged to