sync: new Cond implementation

Change Cond implementation to use a notification list such that waiters
can first register for a notification, release the lock, then actually
wait. Signalers never have to park anymore.

This is intended to address an issue in the previous implementation
where a goroutine calling Wait concurrently with a Broadcast could
consume a wakeup intended for a goroutine that was already waiting,
so Broadcast could fail to signal all existing waiters (see
TestCondSignalStealing, added below).
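
The core of the change is the register-then-wait split. As a sketch of
the idea (a self-contained model, not the runtime code: a sync.Mutex and
a channel per waiter stand in for the runtime's internal lock and
goroutine parking; the names mirror the new notifyList below):

	package main

	import (
		"fmt"
		"sync"
	)

	// notifyList models the ticket-based list added to runtime/sema.go.
	type notifyList struct {
		mu     sync.Mutex
		wait   uint32 // ticket handed to the next waiter that registers
		notify uint32 // next ticket to be notified
		parked map[uint32]chan struct{}
	}

	// add hands out a ticket. In Cond.Wait this runs *before* the caller
	// releases its own lock, so a registered waiter is visible to every
	// later Signal/Broadcast even if it has not parked yet.
	func (l *notifyList) add() uint32 {
		l.mu.Lock()
		defer l.mu.Unlock()
		t := l.wait
		l.wait++
		return t
	}

	// waitFor parks until ticket t has been notified.
	func (l *notifyList) waitFor(t uint32) {
		l.mu.Lock()
		if int32(t-l.notify) < 0 { // already notified; cf. less() below
			l.mu.Unlock()
			return
		}
		ch := make(chan struct{})
		l.parked[t] = ch
		l.mu.Unlock()
		<-ch // park
	}

	// notifyOne wakes the oldest outstanding ticket. It never parks.
	func (l *notifyList) notifyOne() {
		l.mu.Lock()
		defer l.mu.Unlock()
		if l.notify == l.wait {
			return // nobody registered
		}
		t := l.notify
		l.notify++
		if ch, ok := l.parked[t]; ok {
			delete(l.parked, t)
			close(ch)
		}
		// If ticket t has not parked yet, its waitFor will observe
		// t < notify and return without blocking.
	}

	func main() {
		l := &notifyList{parked: make(map[uint32]chan struct{})}
		t := l.add() // register first...
		done := make(chan struct{})
		go func() {
			l.waitFor(t) // ...then park
			fmt.Println("waiter woken")
			close(done)
		}()
		l.notifyOne() // wakes ticket t whether or not it has parked yet
		<-done
	}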

Results of the existing benchmark are below.

                        iterations   original ns/op   new ns/op    diff
BenchmarkCond1-48          2000000              745         755   +1.3%
BenchmarkCond2-48          1000000             1545        1532   -0.8%
BenchmarkCond4-48           300000             3833        3896   +1.6%
BenchmarkCond8-48           200000            10049       10257   +2.1%
BenchmarkCond16-48          100000            21123       21236   +0.5%
BenchmarkCond32-48           30000            40393       41097   +1.7%
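
The -48 suffix is GOMAXPROCS; the benchmarks hand a wakeup around N+1
goroutines through a single Cond, so each op is roughly one Wait/wake
handoff. For reference, the shape of the benchmark is approximately the
following (a simplified reconstruction, not the exact cond_test.go code):

	package cond_test

	import (
		"sync"
		"testing"
	)

	func benchmarkCond(b *testing.B, waiters int) {
		c := sync.NewCond(&sync.Mutex{})
		done := make(chan bool)
		id := 0

		for routine := 0; routine < waiters+1; routine++ {
			go func() {
				for i := 0; i < b.N; i++ {
					c.L.Lock()
					if id == -1 { // some goroutine finished; wind down
						c.L.Unlock()
						break
					}
					id++
					if id == waiters+1 {
						// Last to arrive: start the next round.
						id = 0
						c.Broadcast()
					} else {
						c.Wait()
					}
					c.L.Unlock()
				}
				c.L.Lock()
				id = -1 // tell the others to stop
				c.Broadcast()
				c.L.Unlock()
				done <- true
			}()
		}
		for routine := 0; routine < waiters+1; routine++ {
			<-done
		}
	}

	func BenchmarkCond1(b *testing.B)  { benchmarkCond(b, 1) }
	func BenchmarkCond32(b *testing.B) { benchmarkCond(b, 32) }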

Fixes #14064

Change-Id: I083466d61593a791a034df61f5305adfb8f1c7f9
Reviewed-on: https://go-review.googlesource.com/18892
Reviewed-by: Dmitry Vyukov <dvyukov@google.com>
Reviewed-by: Austin Clements <austin@google.com>
Run-TryBot: Caleb Spare <cespare@gmail.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Author:    Wedson Almeida Filho, 2016-01-24 19:23:48 +01:00
Committer: Austin Clements
parent 87151c82b6
commit 8e7072ca83
6 changed files with 242 additions and 145 deletions

src/cmd/compile/internal/gc/select.go

@@ -329,7 +329,7 @@ func selecttype(size int32) *Type {
 	sudog.List.Append(Nod(ODCLFIELD, newname(Lookup("prev")), typenod(Ptrto(Types[TUINT8]))))
 	sudog.List.Append(Nod(ODCLFIELD, newname(Lookup("elem")), typenod(Ptrto(Types[TUINT8]))))
 	sudog.List.Append(Nod(ODCLFIELD, newname(Lookup("releasetime")), typenod(Types[TUINT64])))
-	sudog.List.Append(Nod(ODCLFIELD, newname(Lookup("nrelease")), typenod(Types[TINT32])))
+	sudog.List.Append(Nod(ODCLFIELD, newname(Lookup("ticket")), typenod(Types[TUINT32])))
 	sudog.List.Append(Nod(ODCLFIELD, newname(Lookup("waitlink")), typenod(Ptrto(Types[TUINT8]))))
 	typecheck(&sudog, Etype)
 	sudog.Type.Noalg = true

src/runtime/runtime2.go

@@ -211,7 +211,8 @@ type gobuf struct {
 }
 
 // Known to compiler.
-// Changes here must also be made in src/cmd/internal/gc/select.go's selecttype.
+// Changes here must also be made in src/cmd/compile/internal/gc/select.go's
+// selecttype.
 type sudog struct {
 	g           *g
 	selectdone  *uint32
@@ -219,7 +220,7 @@ type sudog struct {
 	prev        *sudog
 	elem        unsafe.Pointer // data element
 	releasetime int64
-	nrelease    int32  // -1 for acquire
+	ticket      uint32
 	waitlink    *sudog // g.waiting list
 }

src/runtime/sema.go

@@ -62,6 +62,13 @@ func net_runtime_Semrelease(addr *uint32) {
 	semrelease(addr)
 }
 
+func readyWithTime(s *sudog, traceskip int) {
+	if s.releasetime != 0 {
+		s.releasetime = cputicks()
+	}
+	goready(s.g, traceskip)
+}
+
 // Called from runtime.
 func semacquire(addr *uint32, profile bool) {
 	gp := getg()
@@ -141,10 +148,7 @@ func semrelease(addr *uint32) {
 	}
 	unlock(&root.lock)
 	if s != nil {
-		if s.releasetime != 0 {
-			s.releasetime = cputicks()
-		}
-		goready(s.g, 4)
+		readyWithTime(s, 5)
 	}
 }
@@ -193,101 +197,158 @@ func (root *semaRoot) dequeue(s *sudog) {
 	s.prev = nil
 }
 
-// Synchronous semaphore for sync.Cond.
-type syncSema struct {
-	lock mutex
-	head *sudog
-	tail *sudog
-}
-
-// syncsemacquire waits for a pairing syncsemrelease on the same semaphore s.
-//go:linkname syncsemacquire sync.runtime_Syncsemacquire
-func syncsemacquire(s *syncSema) {
-	lock(&s.lock)
-	if s.head != nil && s.head.nrelease > 0 {
-		// Have pending release, consume it.
-		var wake *sudog
-		s.head.nrelease--
-		if s.head.nrelease == 0 {
-			wake = s.head
-			s.head = wake.next
-			if s.head == nil {
-				s.tail = nil
-			}
-		}
-		unlock(&s.lock)
-		if wake != nil {
-			wake.next = nil
-			goready(wake.g, 4)
-		}
-	} else {
-		// Enqueue itself.
-		w := acquireSudog()
-		w.g = getg()
-		w.nrelease = -1
-		w.next = nil
-		w.releasetime = 0
-		t0 := int64(0)
-		if blockprofilerate > 0 {
-			t0 = cputicks()
-			w.releasetime = -1
-		}
-		if s.tail == nil {
-			s.head = w
-		} else {
-			s.tail.next = w
-		}
-		s.tail = w
-		goparkunlock(&s.lock, "semacquire", traceEvGoBlockCond, 3)
-		if t0 != 0 {
-			blockevent(w.releasetime-t0, 2)
-		}
-		releaseSudog(w)
-	}
-}
-
-// syncsemrelease waits for n pairing syncsemacquire on the same semaphore s.
-//go:linkname syncsemrelease sync.runtime_Syncsemrelease
-func syncsemrelease(s *syncSema, n uint32) {
-	lock(&s.lock)
-	for n > 0 && s.head != nil && s.head.nrelease < 0 {
-		// Have pending acquire, satisfy it.
-		wake := s.head
-		s.head = wake.next
-		if s.head == nil {
-			s.tail = nil
-		}
-		if wake.releasetime != 0 {
-			wake.releasetime = cputicks()
-		}
-		wake.next = nil
-		goready(wake.g, 4)
-		n--
-	}
-	if n > 0 {
-		// enqueue itself
-		w := acquireSudog()
-		w.g = getg()
-		w.nrelease = int32(n)
-		w.next = nil
-		w.releasetime = 0
-		if s.tail == nil {
-			s.head = w
-		} else {
-			s.tail.next = w
-		}
-		s.tail = w
-		goparkunlock(&s.lock, "semarelease", traceEvGoBlockCond, 3)
-		releaseSudog(w)
-	} else {
-		unlock(&s.lock)
-	}
-}
-
-//go:linkname syncsemcheck sync.runtime_Syncsemcheck
-func syncsemcheck(sz uintptr) {
-	if sz != unsafe.Sizeof(syncSema{}) {
-		print("runtime: bad syncSema size - sync=", sz, " runtime=", unsafe.Sizeof(syncSema{}), "\n")
-		throw("bad syncSema size")
-	}
-}
+// notifyList is a ticket-based notification list used to implement sync.Cond.
+//
+// It must be kept in sync with the sync package.
+type notifyList struct {
+	// wait is the ticket number of the next waiter. It is atomically
+	// incremented outside the lock.
+	wait uint32
+
+	// notify is the ticket number of the next waiter to be notified. It can
+	// be read outside the lock, but is only written to with lock held.
+	//
+	// Both wait & notify can wrap around, and such cases will be correctly
+	// handled as long as their "unwrapped" difference is bounded by 2^31.
+	// For this not to be the case, we'd need to have 2^31+ goroutines
+	// blocked on the same condvar, which is currently not possible.
+	notify uint32
+
+	// List of parked waiters.
+	lock mutex
+	head *sudog
+	tail *sudog
+}
+
+// less checks if a < b, considering a & b running counts that may overflow the
+// 32-bit range, and that their "unwrapped" difference is always less than 2^31.
+func less(a, b uint32) bool {
+	return int32(a-b) < 0
+}
+
+// notifyListAdd adds the caller to a notify list such that it can receive
+// notifications. The caller must eventually call notifyListWait to wait for
+// such a notification, passing the returned ticket number.
+//go:linkname notifyListAdd sync.runtime_notifyListAdd
+func notifyListAdd(l *notifyList) uint32 {
+	// This may be called concurrently, for example, when called from
+	// sync.Cond.Wait while holding a RWMutex in read mode.
+	return atomic.Xadd(&l.wait, 1) - 1
+}
+
+// notifyListWait waits for a notification. If one has been sent since
+// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
+//go:linkname notifyListWait sync.runtime_notifyListWait
+func notifyListWait(l *notifyList, t uint32) {
+	lock(&l.lock)
+
+	// Return right away if this ticket has already been notified.
+	if less(t, l.notify) {
+		unlock(&l.lock)
+		return
+	}
+
+	// Enqueue itself.
+	s := acquireSudog()
+	s.g = getg()
+	s.ticket = t
+	s.releasetime = 0
+	t0 := int64(0)
+	if blockprofilerate > 0 {
+		t0 = cputicks()
+		s.releasetime = -1
+	}
+	if l.tail == nil {
+		l.head = s
+	} else {
+		l.tail.next = s
+	}
+	l.tail = s
+	goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3)
+	if t0 != 0 {
+		blockevent(s.releasetime-t0, 2)
+	}
+	releaseSudog(s)
+}
+
+// notifyListNotifyAll notifies all entries in the list.
+//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
+func notifyListNotifyAll(l *notifyList) {
+	// Fast-path: if there are no new waiters since the last notification
+	// we don't need to acquire the lock.
+	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
+		return
+	}
+
+	// Pull the list out into a local variable, waiters will be readied
+	// outside the lock.
+	lock(&l.lock)
+	s := l.head
+	l.head = nil
+	l.tail = nil
+
+	// Update the next ticket to be notified. We can set it to the current
+	// value of wait because any previous waiters are already in the list
+	// or will notice that they have already been notified when trying to
+	// add themselves to the list.
+	atomic.Store(&l.notify, atomic.Load(&l.wait))
+	unlock(&l.lock)
+
+	// Go through the local list and ready all waiters.
+	for s != nil {
+		next := s.next
+		s.next = nil
+		readyWithTime(s, 4)
+		s = next
+	}
+}
+
+// notifyListNotifyOne notifies one entry in the list.
+//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
+func notifyListNotifyOne(l *notifyList) {
+	// Fast-path: if there are no new waiters since the last notification
+	// we don't need to acquire the lock at all.
+	if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
+		return
+	}
+
+	lock(&l.lock)
+
+	// Re-check under the lock if we need to do anything.
+	t := l.notify
+	if t == atomic.Load(&l.wait) {
+		unlock(&l.lock)
+		return
+	}
+
+	// Update the next notify ticket number, and try to find the G that
+	// needs to be notified. If it hasn't made it to the list yet we won't
+	// find it, but it won't park itself once it sees the new notify number.
+	atomic.Store(&l.notify, t+1)
+	for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
+		if s.ticket == t {
+			n := s.next
+			if p != nil {
+				p.next = n
+			} else {
+				l.head = n
+			}
+			if n == nil {
+				l.tail = p
+			}
+			unlock(&l.lock)
+			s.next = nil
+			readyWithTime(s, 4)
+			return
+		}
+	}
+	unlock(&l.lock)
+}
+
+//go:linkname notifyListCheck sync.runtime_notifyListCheck
+func notifyListCheck(sz uintptr) {
+	if sz != unsafe.Sizeof(notifyList{}) {
+		print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n")
+		throw("bad notifyList size")
+	}
+}
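
The one subtle piece of arithmetic above is the wrap-safe ticket
comparison. A standalone check of less at the 32-bit boundary (the
example values are illustrative, not from the commit):

	package main

	import "fmt"

	// less is the helper from the diff: a < b for uint32 tickets that may
	// wrap around, valid while their true difference stays below 2^31.
	func less(a, b uint32) bool {
		return int32(a-b) < 0
	}

	func main() {
		fmt.Println(less(1, 2)) // true
		fmt.Println(less(2, 1)) // false
		// Wraparound: ticket 0xffffffff was issued just before the
		// counter wrapped to 0, so it still orders below ticket 1.
		fmt.Println(less(0xffffffff, 1)) // true: int32(0xfffffffe) == -2
	}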

src/sync/cond.go

@@ -5,7 +5,6 @@
 package sync
 
 import (
-	"internal/race"
 	"sync/atomic"
 	"unsafe"
 )
@@ -24,8 +23,7 @@ type Cond struct {
 	// L is held while observing or changing the condition
 	L Locker
 
-	sema    syncSema
-	waiters uint32 // number of waiters
+	notify  notifyList
 	checker copyChecker
 }
@@ -52,15 +50,9 @@ func NewCond(l Locker) *Cond {
 //
 func (c *Cond) Wait() {
 	c.checker.check()
-	if race.Enabled {
-		race.Disable()
-	}
-	atomic.AddUint32(&c.waiters, 1)
-	if race.Enabled {
-		race.Enable()
-	}
+	t := runtime_notifyListAdd(&c.notify)
 	c.L.Unlock()
-	runtime_Syncsemacquire(&c.sema)
+	runtime_notifyListWait(&c.notify, t)
 	c.L.Lock()
 }
@@ -69,7 +61,8 @@ func (c *Cond) Wait() {
 // It is allowed but not required for the caller to hold c.L
 // during the call.
 func (c *Cond) Signal() {
-	c.signalImpl(false)
+	c.checker.check()
+	runtime_notifyListNotifyOne(&c.notify)
 }
@@ -77,34 +70,8 @@ func (c *Cond) Signal() {
 // It is allowed but not required for the caller to hold c.L
 // during the call.
 func (c *Cond) Broadcast() {
-	c.signalImpl(true)
-}
-
-func (c *Cond) signalImpl(all bool) {
 	c.checker.check()
-	if race.Enabled {
-		race.Disable()
-	}
-	for {
-		old := atomic.LoadUint32(&c.waiters)
-		if old == 0 {
-			if race.Enabled {
-				race.Enable()
-			}
-			return
-		}
-		new := old - 1
-		if all {
-			new = 0
-		}
-		if atomic.CompareAndSwapUint32(&c.waiters, old, new) {
-			if race.Enabled {
-				race.Enable()
-			}
-			runtime_Syncsemrelease(&c.sema, old-new)
-			return
-		}
-	}
+	runtime_notifyListNotifyAll(&c.notify)
 }
 
 // copyChecker holds back pointer to itself to detect object copying.

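For users of the package the API is unchanged; the usual Cond idiom below
is what the new implementation must keep correct, and the Broadcast is
now guaranteed to wake every goroutine already blocked in Wait (a minimal
usage example, not part of the commit):

	package main

	import (
		"fmt"
		"sync"
	)

	func main() {
		var (
			mu    sync.Mutex
			cond  = sync.NewCond(&mu)
			ready bool
		)

		var wg sync.WaitGroup
		for i := 0; i < 4; i++ {
			wg.Add(1)
			go func(id int) {
				defer wg.Done()
				mu.Lock()
				for !ready { // always re-check the condition in a loop
					cond.Wait() // registers, unlocks mu, parks, relocks
				}
				mu.Unlock()
				fmt.Printf("waiter %d released\n", id)
			}(i)
		}

		mu.Lock()
		ready = true
		mu.Unlock()
		cond.Broadcast() // must release all four waiters
		wg.Wait()
	}
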
src/sync/cond_test.go

@@ -8,6 +8,7 @@ import (
 	"runtime"
 	"testing"
+	"time"
 )
 
 func TestCondSignal(t *testing.T) {
@@ -183,6 +184,64 @@ func TestRace(t *testing.T) {
 	<-done
 }
 
+func TestCondSignalStealing(t *testing.T) {
+	for iters := 0; iters < 1000; iters++ {
+		var m Mutex
+		cond := NewCond(&m)
+
+		// Start a waiter.
+		ch := make(chan struct{})
+		go func() {
+			m.Lock()
+			ch <- struct{}{}
+			cond.Wait()
+			m.Unlock()
+
+			ch <- struct{}{}
+		}()
+
+		<-ch
+		m.Lock()
+		m.Unlock()
+
+		// We know that the waiter is in the cond.Wait() call because we
+		// synchronized with it, then acquired/released the mutex it was
+		// holding when we synchronized.
+		//
+		// Start two goroutines that will race: one will broadcast on
+		// the cond var, the other will wait on it.
+		//
+		// The new waiter may or may not get notified, but the first one
+		// has to be notified.
+		done := false
+		go func() {
+			cond.Broadcast()
+		}()
+
+		go func() {
+			m.Lock()
+			for !done {
+				cond.Wait()
+			}
+			m.Unlock()
+		}()
+
+		// Check that the first waiter does get signaled.
+		select {
+		case <-ch:
+		case <-time.After(2 * time.Second):
+			t.Fatalf("First waiter didn't get broadcast.")
+		}
+
+		// Release the second waiter in case it didn't get the
+		// broadcast.
+		m.Lock()
+		done = true
+		m.Unlock()
+		cond.Broadcast()
+	}
+}
+
 func TestCondCopy(t *testing.T) {
 	defer func() {
 		err := recover()

src/sync/runtime.go

@@ -19,24 +19,33 @@ func runtime_Semacquire(s *uint32)
 // library and should not be used directly.
 func runtime_Semrelease(s *uint32)
 
-// Approximation of syncSema in runtime/sema.go.
-type syncSema struct {
+// Approximation of notifyList in runtime/sema.go. Size and alignment must
+// agree.
+type notifyList struct {
+	wait   uint32
+	notify uint32
 	lock   uintptr
 	head   unsafe.Pointer
 	tail   unsafe.Pointer
 }
 
-// Syncsemacquire waits for a pairing Syncsemrelease on the same semaphore s.
-func runtime_Syncsemacquire(s *syncSema)
+// See runtime/sema.go for documentation.
+func runtime_notifyListAdd(l *notifyList) uint32
 
-// Syncsemrelease waits for n pairing Syncsemacquire on the same semaphore s.
-func runtime_Syncsemrelease(s *syncSema, n uint32)
+// See runtime/sema.go for documentation.
+func runtime_notifyListWait(l *notifyList, t uint32)
 
-// Ensure that sync and runtime agree on size of syncSema.
-func runtime_Syncsemcheck(size uintptr)
+// See runtime/sema.go for documentation.
+func runtime_notifyListNotifyAll(l *notifyList)
+
+// See runtime/sema.go for documentation.
+func runtime_notifyListNotifyOne(l *notifyList)
+
+// Ensure that sync and runtime agree on size of notifyList.
+func runtime_notifyListCheck(size uintptr)
 
 func init() {
-	var s syncSema
-	runtime_Syncsemcheck(unsafe.Sizeof(s))
+	var n notifyList
+	runtime_notifyListCheck(unsafe.Sizeof(n))
 }
 
 // Active spinning runtime support.
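
The init check above is how sync fails fast if the two notifyList
definitions ever drift apart. A sketch of that idiom with both mirror
types in one package (illustrative only; the real definitions live in
runtime/sema.go and sync/runtime.go):

	package main

	import "unsafe"

	// runtimeNotifyList stands in for runtime.notifyList.
	type runtimeNotifyList struct {
		wait   uint32
		notify uint32
		lock   uintptr // the runtime's mutex is one pointer-sized word
		head   unsafe.Pointer
		tail   unsafe.Pointer
	}

	// syncNotifyList stands in for the sync package's approximation.
	type syncNotifyList struct {
		wait   uint32
		notify uint32
		lock   uintptr
		head   unsafe.Pointer
		tail   unsafe.Pointer
	}

	func init() {
		if unsafe.Sizeof(runtimeNotifyList{}) != unsafe.Sizeof(syncNotifyList{}) {
			panic("notifyList definitions out of sync")
		}
	}

	func main() {}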