change HTTP access for RPC.

1. use CONNECT instead of GET.
   CONNECT has exactly the meaning
   we want; most of the time people
   connect to ip:port; we're connecting
   to /_goRPC_

2. wait for a successful HTTP response
   before assuming we are connected to
   the RPC protocol.  this allows better
   error reporting and also keeps the
   protocol alternating between speakers,
   so that the buffering in the HTTP request
   reader cannot accidentally eat some
   RPC bytes too.

gotest sometimes hangs, but not in HTTP.

gotest -match=Unknown hangs every few runs
even in a clean client.

R=r
DELTA=117  (57 added, 44 deleted, 16 changed)
OCL=31656
CL=31685
This commit is contained in:
Russ Cox 2009-07-15 10:49:47 -07:00
parent 8b7951495c
commit aa1e8064b2
3 changed files with 67 additions and 54 deletions

View file

@ -71,6 +71,43 @@ type readClose struct {
io.Closer; io.Closer;
} }
// ReadResponse reads and returns an HTTP response from r.
func ReadResponse(r *bufio.Reader) (*Response, os.Error) {
resp := new(Response);
// Parse the first line of the response.
resp.Header = make(map[string] string);
line, err := readLine(r);
if err != nil {
return nil, err;
}
f := strings.Split(line, " ", 3);
if len(f) < 3 {
return nil, &badStringError{"malformed HTTP response", line};
}
resp.Status = f[1] + " " + f[2];
resp.StatusCode, err = strconv.Atoi(f[1]);
if err != nil {
return nil, &badStringError{"malformed HTTP status code", f[1]};
}
// Parse the response headers.
for {
key, value, err := readKeyValue(r);
if err != nil {
return nil, err;
}
if key == "" {
break; // end of response header
}
resp.AddHeader(key, value);
}
return resp, nil;
}
// Send issues an HTTP request. Caller should close resp.Body when done reading it. // Send issues an HTTP request. Caller should close resp.Body when done reading it.
// //
// TODO: support persistent connections (multiple requests on a single connection). // TODO: support persistent connections (multiple requests on a single connection).
@ -90,45 +127,18 @@ func send(req *Request) (resp *Response, err os.Error) {
return nil, err; return nil, err;
} }
// Close the connection if we encounter an error during header parsing. We'll
// cancel this when we hand the connection off to our caller.
defer func() { if conn != nil { conn.Close() } }();
err = req.write(conn); err = req.write(conn);
if err != nil { if err != nil {
conn.Close();
return nil, err; return nil, err;
} }
// Parse the first line of the response.
resp = new(Response);
resp.Header = make(map[string] string);
reader := bufio.NewReader(conn); reader := bufio.NewReader(conn);
resp, err = ReadResponse(reader);
line, err := readLine(reader);
if err != nil { if err != nil {
conn.Close();
return nil, err; return nil, err;
} }
f := strings.Split(line, " ", 3);
if len(f) < 3 {
return nil, &badStringError{"malformed HTTP response", line};
}
resp.Status = f[1] + " " + f[2];
resp.StatusCode, err = strconv.Atoi(f[1]);
if err != nil {
return nil, &badStringError{"malformed HTTP status code", f[1]};
}
// Parse the response headers.
for {
key, value, err := readKeyValue(reader);
if err != nil {
return nil, err;
}
if key == "" {
break; // end of response header
}
resp.AddHeader(key, value);
}
r := io.Reader(reader); r := io.Reader(reader);
if v := resp.GetHeader("Transfer-Encoding"); v == "chunked" { if v := resp.GetHeader("Transfer-Encoding"); v == "chunked" {
@ -136,8 +146,6 @@ func send(req *Request) (resp *Response, err os.Error) {
} }
resp.Body = readClose{ r, conn }; resp.Body = readClose{ r, conn };
conn = nil; // so that defered func won't close it
err = nil;
return; return;
} }

View file

@ -5,12 +5,15 @@
package rpc package rpc
import ( import (
"bufio";
"gob"; "gob";
"http";
"io"; "io";
"log"; "log";
"net"; "net";
"os"; "os";
"rpc"; "rpc";
"strconv";
"sync"; "sync";
) )
@ -101,8 +104,19 @@ func DialHTTP(network, address string) (*Client, os.Error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
io.WriteString(conn, "GET " + rpcPath + " HTTP/1.0\n\n"); io.WriteString(conn, "CONNECT " + rpcPath + " HTTP/1.0\n\n");
return NewClient(conn), nil;
// Require successful HTTP response
// before switching to RPC protocol.
resp, err := http.ReadResponse(bufio.NewReader(conn));
if err == nil && resp.Status == connected {
return NewClient(conn), nil;
}
if err == nil {
err = os.ErrorString("unexpected HTTP response: " + resp.Status);
}
conn.Close();
return nil, &net.OpError{"dial-http", network, address, err};
} }
// Dial connects to an RPC server at the specified network address. // Dial connects to an RPC server at the specified network address.

View file

@ -250,35 +250,26 @@ func Accept(lis net.Listener) {
server.accept(lis) server.accept(lis)
} }
type bufRWC struct { // Can connect to RPC service using HTTP CONNECT to rpcPath.
r io.Reader; var rpcPath string = "/_goRPC_"
w io.Writer; var connected = "200 Connected to Go RPC"
c io.Closer;
}
func (b *bufRWC) Read(p []byte) (n int, err os.Error) {
return b.r.Read(p);
}
func (b *bufRWC) Write(p []byte) (n int, err os.Error) {
return b.w.Write(p);
}
func (b *bufRWC) Close() os.Error {
return b.c.Close();
}
func serveHTTP(c *http.Conn, req *http.Request) { func serveHTTP(c *http.Conn, req *http.Request) {
if req.Method != "CONNECT" {
c.SetHeader("Content-Type", "text/plain; charset=utf-8");
c.WriteHeader(http.StatusMethodNotAllowed);
io.WriteString(c, "405 must CONNECT to " + rpcPath + "\n");
return;
}
conn, buf, err := c.Hijack(); conn, buf, err := c.Hijack();
if err != nil { if err != nil {
log.Stderr("rpc hijacking ", c.RemoteAddr, ": ", err.String()); log.Stderr("rpc hijacking ", c.RemoteAddr, ": ", err.String());
return; return;
} }
server.serve(&bufRWC{buf, conn, conn}); io.WriteString(conn, "HTTP/1.0 " + connected + "\n\n");
server.serve(conn);
} }
var rpcPath string = "/_goRPC_"
// HandleHTTP registers an HTTP handler for RPC messages. // HandleHTTP registers an HTTP handler for RPC messages.
// It is still necessary to call http.Serve(). // It is still necessary to call http.Serve().
func HandleHTTP() { func HandleHTTP() {