rpc: keep free lists of Request and Response structures.

Also in the common case avoid unnecessary buffering in
the channel.
Removes 13 allocations per round trip. Now at 86, down from
144 a week ago.

R=rsc, bradfitzgo, r2, rsc1
CC=golang-dev
https://golang.org/cl/4277060
This commit is contained in:
Rob Pike 2011-03-18 11:54:36 -07:00
parent add8c8db60
commit 3a6c0990a4
2 changed files with 72 additions and 21 deletions

View file

@ -39,8 +39,9 @@ type Call struct {
// There may be multiple outstanding Calls associated // There may be multiple outstanding Calls associated
// with a single Client. // with a single Client.
type Client struct { type Client struct {
mutex sync.Mutex // protects pending, seq mutex sync.Mutex // protects pending, seq, request
sending sync.Mutex sending sync.Mutex
request Request
seq uint64 seq uint64
codec ClientCodec codec ClientCodec
pending map[uint64]*Call pending map[uint64]*Call
@ -79,21 +80,21 @@ func (client *Client) send(c *Call) {
client.mutex.Unlock() client.mutex.Unlock()
// Encode and send the request. // Encode and send the request.
request := new(Request)
client.sending.Lock() client.sending.Lock()
defer client.sending.Unlock() defer client.sending.Unlock()
request.Seq = c.seq client.request.Seq = c.seq
request.ServiceMethod = c.ServiceMethod client.request.ServiceMethod = c.ServiceMethod
if err := client.codec.WriteRequest(request, c.Args); err != nil { if err := client.codec.WriteRequest(&client.request, c.Args); err != nil {
panic("rpc: client encode error: " + err.String()) panic("rpc: client encode error: " + err.String())
} }
} }
func (client *Client) input() { func (client *Client) input() {
var err os.Error var err os.Error
var response Response
for err == nil { for err == nil {
response := new(Response) response = Response{}
err = client.codec.ReadResponseHeader(response) err = client.codec.ReadResponseHeader(&response)
if err != nil { if err != nil {
if err == os.EOF && !client.closing { if err == os.EOF && !client.closing {
err = io.ErrUnexpectedEOF err = io.ErrUnexpectedEOF
@ -281,6 +282,6 @@ func (client *Client) Call(serviceMethod string, args interface{}, reply interfa
if client.shutdown { if client.shutdown {
return ErrShutdown return ErrShutdown
} }
call := <-client.Go(serviceMethod, args, reply, nil).Done call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error return call.Error
} }

View file

@ -154,23 +154,29 @@ type service struct {
// but documented here as an aid to debugging, such as when analyzing // but documented here as an aid to debugging, such as when analyzing
// network traffic. // network traffic.
type Request struct { type Request struct {
ServiceMethod string // format: "Service.Method" ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client Seq uint64 // sequence number chosen by client
next *Request // for free list in Server
} }
// Response is a header written before every RPC return. It is used internally // Response is a header written before every RPC return. It is used internally
// but documented here as an aid to debugging, such as when analyzing // but documented here as an aid to debugging, such as when analyzing
// network traffic. // network traffic.
type Response struct { type Response struct {
ServiceMethod string // echoes that of the Request ServiceMethod string // echoes that of the Request
Seq uint64 // echoes that of the request Seq uint64 // echoes that of the request
Error string // error, if any. Error string // error, if any.
next *Response // for free list in Server
} }
// Server represents an RPC Server. // Server represents an RPC Server.
type Server struct { type Server struct {
sync.Mutex // protects the serviceMap sync.Mutex // protects the serviceMap
serviceMap map[string]*service serviceMap map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
respLock sync.Mutex // protects freeResp
freeResp *Response
} }
// NewServer returns a new Server. // NewServer returns a new Server.
@ -296,8 +302,8 @@ func _new(t *reflect.PtrType) *reflect.PtrValue {
return v return v
} }
func sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) { func (server *Server) sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec ServerCodec, errmsg string) {
resp := new(Response) resp := server.getResponse()
// Encode the response header // Encode the response header
resp.ServiceMethod = req.ServiceMethod resp.ServiceMethod = req.ServiceMethod
if errmsg != "" { if errmsg != "" {
@ -311,6 +317,7 @@ func sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec Se
log.Println("rpc: writing response:", err) log.Println("rpc: writing response:", err)
} }
sending.Unlock() sending.Unlock()
server.freeResponse(resp)
} }
func (m *methodType) NumCalls() (n uint) { func (m *methodType) NumCalls() (n uint) {
@ -320,7 +327,7 @@ func (m *methodType) NumCalls() (n uint) {
return n return n
} }
func (s *service) call(sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) { func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
mtype.Lock() mtype.Lock()
mtype.numCalls++ mtype.numCalls++
mtype.Unlock() mtype.Unlock()
@ -333,7 +340,8 @@ func (s *service) call(sending *sync.Mutex, mtype *methodType, req *Request, arg
if errInter != nil { if errInter != nil {
errmsg = errInter.(os.Error).String() errmsg = errInter.(os.Error).String()
} }
sendResponse(sending, req, replyv.Interface(), codec, errmsg) server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
server.freeRequest(req)
} }
type gobServerCodec struct { type gobServerCodec struct {
@ -395,7 +403,8 @@ func (server *Server) ServeCodec(codec ServerCodec) {
// send a response if we actually managed to read a header. // send a response if we actually managed to read a header.
if req != nil { if req != nil {
sendResponse(sending, req, invalidRequest, codec, err.String()) server.sendResponse(sending, req, invalidRequest, codec, err.String())
server.freeRequest(req)
} }
continue continue
} }
@ -411,16 +420,57 @@ func (server *Server) ServeCodec(codec ServerCodec) {
} }
break break
} }
sendResponse(sending, req, replyv.Interface(), codec, err.String()) server.sendResponse(sending, req, replyv.Interface(), codec, err.String())
continue continue
} }
go service.call(sending, mtype, req, argv, replyv, codec) go service.call(server, sending, mtype, req, argv, replyv, codec)
} }
codec.Close() codec.Close()
} }
func (server *Server) getRequest() *Request {
server.reqLock.Lock()
req := server.freeReq
if req == nil {
req = new(Request)
} else {
server.freeReq = req.next
*req = Request{}
}
server.reqLock.Unlock()
return req
}
func (server *Server) freeRequest(req *Request) {
server.reqLock.Lock()
req.next = server.freeReq
server.freeReq = req
server.reqLock.Unlock()
}
func (server *Server) getResponse() *Response {
server.respLock.Lock()
resp := server.freeResp
if resp == nil {
resp = new(Response)
} else {
server.freeResp = resp.next
*resp = Response{}
}
server.respLock.Unlock()
return resp
}
func (server *Server) freeResponse(resp *Response) {
server.respLock.Lock()
resp.next = server.freeResp
server.freeResp = resp
server.respLock.Unlock()
}
func (server *Server) readRequest(codec ServerCodec) (req *Request, service *service, mtype *methodType, err os.Error) { func (server *Server) readRequest(codec ServerCodec) (req *Request, service *service, mtype *methodType, err os.Error) {
// Grab the request header. // Grab the request header.
req = new(Request) req = server.getRequest()
err = codec.ReadRequestHeader(req) err = codec.ReadRequestHeader(req)
if err != nil { if err != nil {
req = nil req = nil