mirror of
https://github.com/golang/go.git
synced 2025-12-08 06:10:04 +00:00
exp/sql: finish transactions, flesh out types, docs
Fixes #2328 (float, bool) R=rsc, r CC=golang-dev https://golang.org/cl/5294067
This commit is contained in:
parent
cefee3c919
commit
8089e57812
8 changed files with 434 additions and 85 deletions
|
|
@ -10,7 +10,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"runtime"
|
||||
"sync"
|
||||
|
||||
"exp/sql/driver"
|
||||
|
|
@ -192,13 +191,13 @@ func (db *DB) Exec(query string, args ...interface{}) (Result, error) {
|
|||
|
||||
// If the driver does not implement driver.Execer, we need
|
||||
// a connection.
|
||||
conn, err := db.conn()
|
||||
ci, err := db.conn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer db.putConn(conn)
|
||||
defer db.putConn(ci)
|
||||
|
||||
if execer, ok := conn.(driver.Execer); ok {
|
||||
if execer, ok := ci.(driver.Execer); ok {
|
||||
resi, err := execer.Exec(query, args)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -206,7 +205,7 @@ func (db *DB) Exec(query string, args ...interface{}) (Result, error) {
|
|||
return result{resi}, nil
|
||||
}
|
||||
|
||||
sti, err := conn.Prepare(query)
|
||||
sti, err := ci.Prepare(query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -233,18 +232,26 @@ func (db *DB) Query(query string, args ...interface{}) (*Rows, error) {
|
|||
// Row's Scan method is called.
|
||||
func (db *DB) QueryRow(query string, args ...interface{}) *Row {
|
||||
rows, err := db.Query(query, args...)
|
||||
if err != nil {
|
||||
return &Row{err: err}
|
||||
}
|
||||
return &Row{rows: rows}
|
||||
return &Row{rows: rows, err: err}
|
||||
}
|
||||
|
||||
// Begin starts a transaction. The isolation level is dependent on
|
||||
// Begin starts a transaction. The isolation level is dependent on
|
||||
// the driver.
|
||||
func (db *DB) Begin() (*Tx, error) {
|
||||
// TODO(bradfitz): add another method for beginning a transaction
|
||||
// at a specific isolation level.
|
||||
panic(todo())
|
||||
ci, err := db.conn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
txi, err := ci.Begin()
|
||||
if err != nil {
|
||||
db.putConn(ci)
|
||||
return nil, fmt.Errorf("sql: failed to Begin transaction: %v", err)
|
||||
}
|
||||
return &Tx{
|
||||
db: db,
|
||||
ci: ci,
|
||||
txi: txi,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// DriverDatabase returns the database's underlying driver.
|
||||
|
|
@ -253,41 +260,158 @@ func (db *DB) Driver() driver.Driver {
|
|||
}
|
||||
|
||||
// Tx is an in-progress database transaction.
|
||||
//
|
||||
// A transaction must end with a call to Commit or Rollback.
|
||||
//
|
||||
// After a call to Commit or Rollback, all operations on the
|
||||
// transaction fail with ErrTransactionFinished.
|
||||
type Tx struct {
|
||||
db *DB
|
||||
|
||||
// ci is owned exclusively until Commit or Rollback, at which point
|
||||
// it's returned with putConn.
|
||||
ci driver.Conn
|
||||
txi driver.Tx
|
||||
|
||||
// cimu is held while somebody is using ci (between grabConn
|
||||
// and releaseConn)
|
||||
cimu sync.Mutex
|
||||
|
||||
// done transitions from false to true exactly once, on Commit
|
||||
// or Rollback. once done, all operations fail with
|
||||
// ErrTransactionFinished.
|
||||
done bool
|
||||
}
|
||||
|
||||
var ErrTransactionFinished = errors.New("sql: Transaction has already been committed or rolled back")
|
||||
|
||||
func (tx *Tx) close() {
|
||||
if tx.done {
|
||||
panic("double close") // internal error
|
||||
}
|
||||
tx.done = true
|
||||
tx.db.putConn(tx.ci)
|
||||
tx.ci = nil
|
||||
tx.txi = nil
|
||||
}
|
||||
|
||||
func (tx *Tx) grabConn() (driver.Conn, error) {
|
||||
if tx.done {
|
||||
return nil, ErrTransactionFinished
|
||||
}
|
||||
tx.cimu.Lock()
|
||||
return tx.ci, nil
|
||||
}
|
||||
|
||||
func (tx *Tx) releaseConn() {
|
||||
tx.cimu.Unlock()
|
||||
}
|
||||
|
||||
// Commit commits the transaction.
|
||||
func (tx *Tx) Commit() error {
|
||||
panic(todo())
|
||||
if tx.done {
|
||||
return ErrTransactionFinished
|
||||
}
|
||||
defer tx.close()
|
||||
return tx.txi.Commit()
|
||||
}
|
||||
|
||||
// Rollback aborts the transaction.
|
||||
func (tx *Tx) Rollback() error {
|
||||
panic(todo())
|
||||
if tx.done {
|
||||
return ErrTransactionFinished
|
||||
}
|
||||
defer tx.close()
|
||||
return tx.txi.Rollback()
|
||||
}
|
||||
|
||||
// Prepare creates a prepared statement.
|
||||
//
|
||||
// The statement is only valid within the scope of this transaction.
|
||||
func (tx *Tx) Prepare(query string) (*Stmt, error) {
|
||||
panic(todo())
|
||||
// TODO(bradfitz): the restriction that the returned statement
|
||||
// is only valid for this Transaction is lame and negates a
|
||||
// lot of the benefit of prepared statements. We could be
|
||||
// more efficient here and either provide a method to take an
|
||||
// existing Stmt (created on perhaps a different Conn), and
|
||||
// re-create it on this Conn if necessary. Or, better: keep a
|
||||
// map in DB of query string to Stmts, and have Stmt.Execute
|
||||
// do the right thing and re-prepare if the Conn in use
|
||||
// doesn't have that prepared statement. But we'll want to
|
||||
// avoid caching the statement in the case where we only call
|
||||
// conn.Prepare implicitly (such as in db.Exec or tx.Exec),
|
||||
// but the caller package can't be holding a reference to the
|
||||
// returned statement. Perhaps just looking at the reference
|
||||
// count (by noting Stmt.Close) would be enough. We might also
|
||||
// want a finalizer on Stmt to drop the reference count.
|
||||
ci, err := tx.grabConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer tx.releaseConn()
|
||||
|
||||
si, err := ci.Prepare(query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stmt := &Stmt{
|
||||
db: tx.db,
|
||||
tx: tx,
|
||||
txsi: si,
|
||||
query: query,
|
||||
}
|
||||
return stmt, nil
|
||||
}
|
||||
|
||||
// Exec executes a query that doesn't return rows.
|
||||
// For example: an INSERT and UPDATE.
|
||||
func (tx *Tx) Exec(query string, args ...interface{}) {
|
||||
panic(todo())
|
||||
func (tx *Tx) Exec(query string, args ...interface{}) (Result, error) {
|
||||
ci, err := tx.grabConn()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer tx.releaseConn()
|
||||
|
||||
if execer, ok := ci.(driver.Execer); ok {
|
||||
resi, err := execer.Exec(query, args)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result{resi}, nil
|
||||
}
|
||||
|
||||
sti, err := ci.Prepare(query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer sti.Close()
|
||||
resi, err := sti.Exec(args)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result{resi}, nil
|
||||
}
|
||||
|
||||
// Query executes a query that returns rows, typically a SELECT.
|
||||
func (tx *Tx) Query(query string, args ...interface{}) (*Rows, error) {
|
||||
panic(todo())
|
||||
if tx.done {
|
||||
return nil, ErrTransactionFinished
|
||||
}
|
||||
stmt, err := tx.Prepare(query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer stmt.Close()
|
||||
return stmt.Query(args...)
|
||||
}
|
||||
|
||||
// QueryRow executes a query that is expected to return at most one row.
|
||||
// QueryRow always return a non-nil value. Errors are deferred until
|
||||
// Row's Scan method is called.
|
||||
func (tx *Tx) QueryRow(query string, args ...interface{}) *Row {
|
||||
panic(todo())
|
||||
rows, err := tx.Query(query, args...)
|
||||
return &Row{rows: rows, err: err}
|
||||
}
|
||||
|
||||
// connStmt is a prepared statement on a particular connection.
|
||||
|
|
@ -302,24 +426,28 @@ type Stmt struct {
|
|||
db *DB // where we came from
|
||||
query string // that created the Sttm
|
||||
|
||||
mu sync.Mutex
|
||||
closed bool
|
||||
css []connStmt // can use any that have idle connections
|
||||
}
|
||||
// If in a transaction, else both nil:
|
||||
tx *Tx
|
||||
txsi driver.Stmt
|
||||
|
||||
func todo() string {
|
||||
_, file, line, _ := runtime.Caller(1)
|
||||
return fmt.Sprintf("%s:%d: TODO: implement", file, line)
|
||||
mu sync.Mutex // protects the rest of the fields
|
||||
closed bool
|
||||
|
||||
// css is a list of underlying driver statement interfaces
|
||||
// that are valid on particular connections. This is only
|
||||
// used if tx == nil and one is found that has idle
|
||||
// connections. If tx != nil, txsi is always used.
|
||||
css []connStmt
|
||||
}
|
||||
|
||||
// Exec executes a prepared statement with the given arguments and
|
||||
// returns a Result summarizing the effect of the statement.
|
||||
func (s *Stmt) Exec(args ...interface{}) (Result, error) {
|
||||
ci, si, err := s.connStmt()
|
||||
_, releaseConn, si, err := s.connStmt()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer s.db.putConn(ci)
|
||||
defer releaseConn()
|
||||
|
||||
if want := si.NumInput(); len(args) != want {
|
||||
return nil, fmt.Errorf("db: expected %d arguments, got %d", want, len(args))
|
||||
|
|
@ -353,11 +481,29 @@ func (s *Stmt) Exec(args ...interface{}) (Result, error) {
|
|||
return result{resi}, nil
|
||||
}
|
||||
|
||||
func (s *Stmt) connStmt(args ...interface{}) (driver.Conn, driver.Stmt, error) {
|
||||
// connStmt returns a free driver connection on which to execute the
|
||||
// statement, a function to call to release the connection, and a
|
||||
// statement bound to that connection.
|
||||
func (s *Stmt) connStmt() (ci driver.Conn, releaseConn func(), si driver.Stmt, err error) {
|
||||
s.mu.Lock()
|
||||
if s.closed {
|
||||
return nil, nil, errors.New("db: statement is closed")
|
||||
s.mu.Unlock()
|
||||
err = errors.New("db: statement is closed")
|
||||
return
|
||||
}
|
||||
|
||||
// In a transaction, we always use the connection that the
|
||||
// transaction was created on.
|
||||
if s.tx != nil {
|
||||
s.mu.Unlock()
|
||||
ci, err = s.tx.grabConn() // blocks, waiting for the connection.
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
releaseConn = func() { s.tx.releaseConn() }
|
||||
return ci, releaseConn, s.txsi, nil
|
||||
}
|
||||
|
||||
var cs connStmt
|
||||
match := false
|
||||
for _, v := range s.css {
|
||||
|
|
@ -375,11 +521,11 @@ func (s *Stmt) connStmt(args ...interface{}) (driver.Conn, driver.Stmt, error) {
|
|||
if !match {
|
||||
ci, err := s.db.conn()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
si, err := ci.Prepare(s.query)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
s.mu.Lock()
|
||||
cs = connStmt{ci, si}
|
||||
|
|
@ -387,13 +533,15 @@ func (s *Stmt) connStmt(args ...interface{}) (driver.Conn, driver.Stmt, error) {
|
|||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
return cs.ci, cs.si, nil
|
||||
conn := cs.ci
|
||||
releaseConn = func() { s.db.putConn(conn) }
|
||||
return conn, releaseConn, cs.si, nil
|
||||
}
|
||||
|
||||
// Query executes a prepared query statement with the given arguments
|
||||
// and returns the query results as a *Rows.
|
||||
func (s *Stmt) Query(args ...interface{}) (*Rows, error) {
|
||||
ci, si, err := s.connStmt(args...)
|
||||
ci, releaseConn, si, err := s.connStmt()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -405,11 +553,13 @@ func (s *Stmt) Query(args ...interface{}) (*Rows, error) {
|
|||
s.db.putConn(ci)
|
||||
return nil, err
|
||||
}
|
||||
// Note: ownership of ci passes to the *Rows
|
||||
// Note: ownership of ci passes to the *Rows, to be freed
|
||||
// with releaseConn.
|
||||
rows := &Rows{
|
||||
db: s.db,
|
||||
ci: ci,
|
||||
rowsi: rowsi,
|
||||
db: s.db,
|
||||
ci: ci,
|
||||
releaseConn: releaseConn,
|
||||
rowsi: rowsi,
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
|
|
@ -436,19 +586,24 @@ func (s *Stmt) QueryRow(args ...interface{}) *Row {
|
|||
// Close closes the statement.
|
||||
func (s *Stmt) Close() error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock() // TODO(bradfitz): move this unlock after 'closed = true'?
|
||||
defer s.mu.Unlock()
|
||||
if s.closed {
|
||||
return nil
|
||||
}
|
||||
s.closed = true
|
||||
for _, v := range s.css {
|
||||
if ci, match := s.db.connIfFree(v.ci); match {
|
||||
v.si.Close()
|
||||
s.db.putConn(ci)
|
||||
} else {
|
||||
// TODO(bradfitz): care that we can't close
|
||||
// this statement because the statement's
|
||||
// connection is in use?
|
||||
|
||||
if s.tx != nil {
|
||||
s.txsi.Close()
|
||||
} else {
|
||||
for _, v := range s.css {
|
||||
if ci, match := s.db.connIfFree(v.ci); match {
|
||||
v.si.Close()
|
||||
s.db.putConn(ci)
|
||||
} else {
|
||||
// TODO(bradfitz): care that we can't close
|
||||
// this statement because the statement's
|
||||
// connection is in use?
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
|
@ -468,9 +623,10 @@ func (s *Stmt) Close() error {
|
|||
// err = rows.Error() // get any Error encountered during iteration
|
||||
// ...
|
||||
type Rows struct {
|
||||
db *DB
|
||||
ci driver.Conn // owned; must be returned when Rows is closed
|
||||
rowsi driver.Rows
|
||||
db *DB
|
||||
ci driver.Conn // owned; must call putconn when closed to release
|
||||
releaseConn func()
|
||||
rowsi driver.Rows
|
||||
|
||||
closed bool
|
||||
lastcols []interface{}
|
||||
|
|
@ -538,7 +694,7 @@ func (rs *Rows) Close() error {
|
|||
}
|
||||
rs.closed = true
|
||||
err := rs.rowsi.Close()
|
||||
rs.db.putConn(rs.ci)
|
||||
rs.releaseConn()
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue