mirror of
https://github.com/golang/go.git
synced 2025-12-08 06:10:04 +00:00
database/sql: reduce lock contention in Stmt.connStmt
Previouslly, Stmt.connStmt calls DB.connIfFree on each Stmt.css. Since Stmt.connStmt locks Stmt.mu, a concurrent use of Stmt causes lock contention on Stmt.mu. Additionally, DB.connIfFree locks DB.mu which is shared by DB.addDep and DB.removeDep. This change removes DB.connIfFree and makes use of a first unused connection in idle connection pool to reduce lock contention without making it complicated. Fixes #9484 On EC2 c3.8xlarge (E5-2680 v2 @ 2.80GHz * 32 vCPU): benchmark old ns/op new ns/op delta BenchmarkManyConcurrentQuery-8 40249 34721 -13.73% BenchmarkManyConcurrentQuery-16 45610 40176 -11.91% BenchmarkManyConcurrentQuery-32 109831 43179 -60.69% benchmark old allocs new allocs delta BenchmarkManyConcurrentQuery-8 25 25 +0.00% BenchmarkManyConcurrentQuery-16 25 25 +0.00% BenchmarkManyConcurrentQuery-32 25 25 +0.00% benchmark old bytes new bytes delta BenchmarkManyConcurrentQuery-8 3980 3969 -0.28% BenchmarkManyConcurrentQuery-16 3980 3982 +0.05% BenchmarkManyConcurrentQuery-32 3993 3990 -0.08% Change-Id: Ic96296922c465bac38a260018c58324dae1531d9 Reviewed-on: https://go-review.googlesource.com/2207 Reviewed-by: Mikio Hara <mikioh.mikioh@gmail.com>
This commit is contained in:
parent
3acb9fd98e
commit
1b61a97811
2 changed files with 70 additions and 111 deletions
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
var drivers = make(map[string]driver.Driver)
|
var drivers = make(map[string]driver.Driver)
|
||||||
|
|
@ -211,6 +212,10 @@ var ErrNoRows = errors.New("sql: no rows in result set")
|
||||||
type DB struct {
|
type DB struct {
|
||||||
driver driver.Driver
|
driver driver.Driver
|
||||||
dsn string
|
dsn string
|
||||||
|
// numClosed is an atomic counter which represents a total number of
|
||||||
|
// closed connections. Stmt.openStmt checks it before cleaning closed
|
||||||
|
// connections in Stmt.css.
|
||||||
|
numClosed uint64
|
||||||
|
|
||||||
mu sync.Mutex // protects following fields
|
mu sync.Mutex // protects following fields
|
||||||
freeConn []*driverConn
|
freeConn []*driverConn
|
||||||
|
|
@ -246,7 +251,7 @@ type driverConn struct {
|
||||||
// guarded by db.mu
|
// guarded by db.mu
|
||||||
inUse bool
|
inUse bool
|
||||||
onPut []func() // code (with db.mu held) run when conn is next returned
|
onPut []func() // code (with db.mu held) run when conn is next returned
|
||||||
dbmuClosed bool // same as closed, but guarded by db.mu, for connIfFree
|
dbmuClosed bool // same as closed, but guarded by db.mu, for removeClosedStmtLocked
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dc *driverConn) releaseConn(err error) {
|
func (dc *driverConn) releaseConn(err error) {
|
||||||
|
|
@ -329,6 +334,7 @@ func (dc *driverConn) finalClose() error {
|
||||||
dc.db.maybeOpenNewConnections()
|
dc.db.maybeOpenNewConnections()
|
||||||
dc.db.mu.Unlock()
|
dc.db.mu.Unlock()
|
||||||
|
|
||||||
|
atomic.AddUint64(&dc.db.numClosed, 1)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -683,42 +689,6 @@ var (
|
||||||
errConnBusy = errors.New("database/sql: internal sentinel error: conn is busy")
|
errConnBusy = errors.New("database/sql: internal sentinel error: conn is busy")
|
||||||
)
|
)
|
||||||
|
|
||||||
// connIfFree returns (wanted, nil) if wanted is still a valid conn and
|
|
||||||
// isn't in use.
|
|
||||||
//
|
|
||||||
// The error is errConnClosed if the connection if the requested connection
|
|
||||||
// is invalid because it's been closed.
|
|
||||||
//
|
|
||||||
// The error is errConnBusy if the connection is in use.
|
|
||||||
func (db *DB) connIfFree(wanted *driverConn) (*driverConn, error) {
|
|
||||||
db.mu.Lock()
|
|
||||||
defer db.mu.Unlock()
|
|
||||||
if wanted.dbmuClosed {
|
|
||||||
return nil, errConnClosed
|
|
||||||
}
|
|
||||||
if wanted.inUse {
|
|
||||||
return nil, errConnBusy
|
|
||||||
}
|
|
||||||
idx := -1
|
|
||||||
for ii, v := range db.freeConn {
|
|
||||||
if v == wanted {
|
|
||||||
idx = ii
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if idx >= 0 {
|
|
||||||
db.freeConn = append(db.freeConn[:idx], db.freeConn[idx+1:]...)
|
|
||||||
wanted.inUse = true
|
|
||||||
return wanted, nil
|
|
||||||
}
|
|
||||||
// TODO(bradfitz): shouldn't get here. After Go 1.1, change this to:
|
|
||||||
// panic("connIfFree call requested a non-closed, non-busy, non-free conn")
|
|
||||||
// Which passes all the tests, but I'm too paranoid to include this
|
|
||||||
// late in Go 1.1.
|
|
||||||
// Instead, treat it like a busy connection:
|
|
||||||
return nil, errConnBusy
|
|
||||||
}
|
|
||||||
|
|
||||||
// putConnHook is a hook for testing.
|
// putConnHook is a hook for testing.
|
||||||
var putConnHook func(*DB, *driverConn)
|
var putConnHook func(*DB, *driverConn)
|
||||||
|
|
||||||
|
|
@ -856,9 +826,10 @@ func (db *DB) prepare(query string) (*Stmt, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
stmt := &Stmt{
|
stmt := &Stmt{
|
||||||
db: db,
|
db: db,
|
||||||
query: query,
|
query: query,
|
||||||
css: []connStmt{{dc, si}},
|
css: []connStmt{{dc, si}},
|
||||||
|
lastNumClosed: atomic.LoadUint64(&db.numClosed),
|
||||||
}
|
}
|
||||||
db.addDep(stmt, stmt)
|
db.addDep(stmt, stmt)
|
||||||
db.putConn(dc, nil)
|
db.putConn(dc, nil)
|
||||||
|
|
@ -1293,6 +1264,10 @@ type Stmt struct {
|
||||||
// used if tx == nil and one is found that has idle
|
// used if tx == nil and one is found that has idle
|
||||||
// connections. If tx != nil, txsi is always used.
|
// connections. If tx != nil, txsi is always used.
|
||||||
css []connStmt
|
css []connStmt
|
||||||
|
|
||||||
|
// lastNumClosed is copied from db.numClosed when Stmt is created
|
||||||
|
// without tx and closed connections in css are removed.
|
||||||
|
lastNumClosed uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exec executes a prepared statement with the given arguments and
|
// Exec executes a prepared statement with the given arguments and
|
||||||
|
|
@ -1346,6 +1321,32 @@ func resultFromStatement(ds driverStmt, args ...interface{}) (Result, error) {
|
||||||
return driverResult{ds.Locker, resi}, nil
|
return driverResult{ds.Locker, resi}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// removeClosedStmtLocked removes closed conns in s.css.
|
||||||
|
//
|
||||||
|
// To avoid lock contention on DB.mu, we do it only when
|
||||||
|
// s.db.numClosed - s.lastNum is large enough.
|
||||||
|
func (s *Stmt) removeClosedStmtLocked() {
|
||||||
|
t := len(s.css)/2 + 1
|
||||||
|
if t > 10 {
|
||||||
|
t = 10
|
||||||
|
}
|
||||||
|
dbClosed := atomic.LoadUint64(&s.db.numClosed)
|
||||||
|
if dbClosed-s.lastNumClosed < uint64(t) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.db.mu.Lock()
|
||||||
|
for i := 0; i < len(s.css); i++ {
|
||||||
|
if s.css[i].dc.dbmuClosed {
|
||||||
|
s.css[i] = s.css[len(s.css)-1]
|
||||||
|
s.css = s.css[:len(s.css)-1]
|
||||||
|
i--
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.db.mu.Unlock()
|
||||||
|
s.lastNumClosed = dbClosed
|
||||||
|
}
|
||||||
|
|
||||||
// connStmt returns a free driver connection on which to execute the
|
// connStmt returns a free driver connection on which to execute the
|
||||||
// statement, a function to call to release the connection, and a
|
// statement, a function to call to release the connection, and a
|
||||||
// statement bound to that connection.
|
// statement bound to that connection.
|
||||||
|
|
@ -1372,35 +1373,15 @@ func (s *Stmt) connStmt() (ci *driverConn, releaseConn func(error), si driver.St
|
||||||
return ci, releaseConn, s.txsi.si, nil
|
return ci, releaseConn, s.txsi.si, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < len(s.css); i++ {
|
s.removeClosedStmtLocked()
|
||||||
v := s.css[i]
|
|
||||||
_, err := s.db.connIfFree(v.dc)
|
|
||||||
if err == nil {
|
|
||||||
s.mu.Unlock()
|
|
||||||
return v.dc, v.dc.releaseConn, v.si, nil
|
|
||||||
}
|
|
||||||
if err == errConnClosed {
|
|
||||||
// Lazily remove dead conn from our freelist.
|
|
||||||
s.css[i] = s.css[len(s.css)-1]
|
|
||||||
s.css = s.css[:len(s.css)-1]
|
|
||||||
i--
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
// If all connections are busy, either wait for one to become available (if
|
|
||||||
// we've already hit the maximum number of open connections) or create a
|
|
||||||
// new one.
|
|
||||||
//
|
|
||||||
// TODO(bradfitz): or always wait for one? make configurable later?
|
// TODO(bradfitz): or always wait for one? make configurable later?
|
||||||
dc, err := s.db.conn()
|
dc, err := s.db.conn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do another pass over the list to see whether this statement has
|
|
||||||
// already been prepared on the connection assigned to us.
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
for _, v := range s.css {
|
for _, v := range s.css {
|
||||||
if v.dc == dc {
|
if v.dc == dc {
|
||||||
|
|
|
||||||
|
|
@ -1764,56 +1764,6 @@ func doConcurrentTest(t testing.TB, ct concurrentTest) {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func manyConcurrentQueries(t testing.TB) {
|
|
||||||
maxProcs, numReqs := 16, 500
|
|
||||||
if testing.Short() {
|
|
||||||
maxProcs, numReqs = 4, 50
|
|
||||||
}
|
|
||||||
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(maxProcs))
|
|
||||||
|
|
||||||
db := newTestDB(t, "people")
|
|
||||||
defer closeDB(t, db)
|
|
||||||
|
|
||||||
stmt, err := db.Prepare("SELECT|people|name|")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
defer stmt.Close()
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
|
||||||
wg.Add(numReqs)
|
|
||||||
|
|
||||||
reqs := make(chan bool)
|
|
||||||
defer close(reqs)
|
|
||||||
|
|
||||||
for i := 0; i < maxProcs*2; i++ {
|
|
||||||
go func() {
|
|
||||||
for range reqs {
|
|
||||||
rows, err := stmt.Query()
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("error on query: %v", err)
|
|
||||||
wg.Done()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
var name string
|
|
||||||
for rows.Next() {
|
|
||||||
rows.Scan(&name)
|
|
||||||
}
|
|
||||||
rows.Close()
|
|
||||||
|
|
||||||
wg.Done()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < numReqs; i++ {
|
|
||||||
reqs <- true
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestIssue6081(t *testing.T) {
|
func TestIssue6081(t *testing.T) {
|
||||||
db := newTestDB(t, "people")
|
db := newTestDB(t, "people")
|
||||||
defer closeDB(t, db)
|
defer closeDB(t, db)
|
||||||
|
|
@ -1985,3 +1935,31 @@ func BenchmarkConcurrentRandom(b *testing.B) {
|
||||||
doConcurrentTest(b, ct)
|
doConcurrentTest(b, ct)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func BenchmarkManyConcurrentQueries(b *testing.B) {
|
||||||
|
b.ReportAllocs()
|
||||||
|
// To see lock contention in Go 1.4, 16~ cores and 128~ goroutines are required.
|
||||||
|
const parallelism = 16
|
||||||
|
|
||||||
|
db := newTestDB(b, "magicquery")
|
||||||
|
defer closeDB(b, db)
|
||||||
|
db.SetMaxIdleConns(runtime.GOMAXPROCS(0) * parallelism)
|
||||||
|
|
||||||
|
stmt, err := db.Prepare("SELECT|magicquery|op|op=?,millis=?")
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
defer stmt.Close()
|
||||||
|
|
||||||
|
b.SetParallelism(parallelism)
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
for pb.Next() {
|
||||||
|
rows, err := stmt.Query("sleep", 1)
|
||||||
|
if err != nil {
|
||||||
|
b.Error(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rows.Close()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue