mirror of
https://github.com/golang/go.git
synced 2026-02-07 02:09:55 +00:00
net/http: try to drain response body upon closing for better connection re-use
Currently, we have a rather inconsistent behavior in terms of whether a connection can be re-used or not when an HTTP body is not read to completion: - In HTTP/2, not reading bodies to completion is not an issue, since a new HTTP/2 stream can be created on the same TCP connection. - In HTTP/1 server, we discard up to 256 KiB of unconsumed request body, to potentially allow re-use. - In HTTP/1 client, we do not do anything, and fail to re-use a TCP connection if there are any unconsumed response body at all. This has led to some confusion. For example, some users have mistakenly discarded response body for HTTP/2 when doing so is not needed. Manually discarding response body can also be disadvantageous if the body is excessively large or is a never-ending stream. To solve this issue, this CL makes it so that closing a response body will cause any remaining content to be drained, up to a limit of 256 KiB or 50 milliseconds, whichever one is reached first. This allows better connection re-use for HTTP/1, and most users can now avoid having to manually drain their response body. For #77370 Change-Id: I71e1227fc9cf5f901362c8e234320817f6b0be24 Reviewed-on: https://go-review.googlesource.com/c/go/+/737720 Reviewed-by: Nicholas Husin <husin@google.com> LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com> Reviewed-by: Damien Neil <dneil@google.com>
This commit is contained in:
parent
d99be5c444
commit
11d5284363
3 changed files with 146 additions and 1 deletions
|
|
@ -33,6 +33,7 @@ var (
|
|||
Export_shouldCopyHeaderOnRedirect = shouldCopyHeaderOnRedirect
|
||||
Export_writeStatusLine = writeStatusLine
|
||||
Export_is408Message = is408Message
|
||||
MaxPostCloseReadTime = maxPostCloseReadTime
|
||||
)
|
||||
|
||||
var MaxWriteWaitBeforeConnReuse = &maxWriteWaitBeforeConnReuse
|
||||
|
|
|
|||
|
|
@ -2288,6 +2288,33 @@ func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritte
|
|||
// closing a net.Conn that is now owned by the caller.
|
||||
var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
|
||||
|
||||
// maxPostCloseReadBytes is the max number of bytes that a client is willing to
|
||||
// read when draining the response body of any unread bytes after it has been
|
||||
// closed. This number is chosen for consistency with maxPostHandlerReadBytes.
|
||||
const maxPostCloseReadBytes = 256 << 10
|
||||
|
||||
// maxPostCloseReadTime defines the maximum amount of time that a client is
|
||||
// willing to spend on draining a response body of any unread bytes after it
|
||||
// has been closed.
|
||||
const maxPostCloseReadTime = 50 * time.Millisecond
|
||||
|
||||
func maybeDrainBody(body io.Reader) bool {
|
||||
drainedCh := make(chan bool, 1)
|
||||
go func() {
|
||||
if _, err := io.CopyN(io.Discard, body, maxPostCloseReadBytes+1); err == io.EOF {
|
||||
drainedCh <- true
|
||||
} else {
|
||||
drainedCh <- false
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case drained := <-drainedCh:
|
||||
return drained
|
||||
case <-time.After(maxPostCloseReadTime):
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (pc *persistConn) readLoop() {
|
||||
closeErr := errReadLoopExiting // default value, if not changed below
|
||||
defer func() {
|
||||
|
|
@ -2449,6 +2476,9 @@ func (pc *persistConn) readLoop() {
|
|||
// reading the response body. (or for cancellation or death)
|
||||
select {
|
||||
case bodyEOF := <-waitForBodyRead:
|
||||
if !bodyEOF && resp.ContentLength <= maxPostCloseReadBytes {
|
||||
bodyEOF = maybeDrainBody(body.body)
|
||||
}
|
||||
alive = alive &&
|
||||
bodyEOF &&
|
||||
!pc.sawEOF &&
|
||||
|
|
|
|||
|
|
@ -22,11 +22,11 @@ import (
|
|||
"fmt"
|
||||
"go/token"
|
||||
"internal/nettrace"
|
||||
"internal/synctest"
|
||||
"io"
|
||||
"log"
|
||||
mrand "math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
. "net/http"
|
||||
"net/http/httptest"
|
||||
"net/http/httptrace"
|
||||
|
|
@ -44,6 +44,7 @@ import (
|
|||
"sync/atomic"
|
||||
"testing"
|
||||
"testing/iotest"
|
||||
"testing/synctest"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/http/httpguts"
|
||||
|
|
@ -474,6 +475,119 @@ func testTransportReadToEndReusesConn(t *testing.T, mode testMode) {
|
|||
}
|
||||
}
|
||||
|
||||
// In HTTP/1, if a response body has not been fully read by the time it is
|
||||
// closed, we try to drain it, up to a maximum byte and time limit. If we
|
||||
// manage to drain it before the next request, the connection is re-used;
|
||||
// otherwise, a new connection is made.
|
||||
func TestTransportNotReadToEndConnectionReuse(t *testing.T) {
|
||||
run(t, testTransportNotReadToEndConnectionReuse, []testMode{http1Mode, https1Mode})
|
||||
}
|
||||
func testTransportNotReadToEndConnectionReuse(t *testing.T, mode testMode) {
|
||||
tests := []struct {
|
||||
name string
|
||||
bodyLen int
|
||||
contentLenKnown bool
|
||||
headRequest bool
|
||||
timeBetweenReqs time.Duration
|
||||
responseTime time.Duration
|
||||
wantReuse bool
|
||||
}{
|
||||
{
|
||||
name: "unconsumed body within drain limit",
|
||||
bodyLen: 200 * 1024,
|
||||
timeBetweenReqs: http.MaxPostCloseReadTime,
|
||||
wantReuse: true,
|
||||
},
|
||||
{
|
||||
name: "unconsumed body within drain limit with known length",
|
||||
bodyLen: 200 * 1024,
|
||||
contentLenKnown: true,
|
||||
timeBetweenReqs: http.MaxPostCloseReadTime,
|
||||
wantReuse: true,
|
||||
},
|
||||
{
|
||||
name: "unconsumed body larger than drain limit",
|
||||
bodyLen: 500 * 1024,
|
||||
timeBetweenReqs: http.MaxPostCloseReadTime,
|
||||
wantReuse: false,
|
||||
},
|
||||
{
|
||||
name: "unconsumed body larger than drain limit with known length",
|
||||
bodyLen: 500 * 1024,
|
||||
contentLenKnown: true,
|
||||
timeBetweenReqs: http.MaxPostCloseReadTime,
|
||||
wantReuse: false,
|
||||
},
|
||||
{
|
||||
name: "new requests start before drain for old requests are finished",
|
||||
bodyLen: 200 * 1024,
|
||||
timeBetweenReqs: 0,
|
||||
responseTime: time.Minute,
|
||||
wantReuse: false,
|
||||
},
|
||||
{
|
||||
// Server handler will always return no body when handling a HEAD
|
||||
// request, which should always allow connection re-use.
|
||||
name: "unconsumed body larger than drain limit for HEAD request",
|
||||
bodyLen: 500 * 1024,
|
||||
headRequest: true,
|
||||
wantReuse: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
subtest := func(t *testing.T) {
|
||||
addrSeen := make(map[string]int)
|
||||
ts := newClientServerTest(t, mode, HandlerFunc(func(w ResponseWriter, r *Request) {
|
||||
addrSeen[r.RemoteAddr]++
|
||||
time.Sleep(tc.responseTime)
|
||||
if tc.contentLenKnown {
|
||||
w.Header().Add("Content-Length", strconv.Itoa(tc.bodyLen))
|
||||
}
|
||||
w.Write(slices.Repeat([]byte("a"), tc.bodyLen))
|
||||
}), optFakeNet).ts
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for range 10 {
|
||||
wg.Go(func() {
|
||||
method := http.MethodGet
|
||||
if tc.headRequest {
|
||||
method = http.MethodHead
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
req, err := http.NewRequestWithContext(ctx, method, ts.URL, nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
resp, err := ts.Client().Do(req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Errorf("expected HTTP 200, got: %v", resp.StatusCode)
|
||||
}
|
||||
resp.Body.Close()
|
||||
// Context cancellation and body read after the body has been
|
||||
// closed should not affect connection re-use.
|
||||
cancel()
|
||||
if n, err := resp.Body.Read([]byte{}); n != 0 || err == nil {
|
||||
t.Errorf("read after body has been closed should not succeed, but read %v byte with %v error", n, err)
|
||||
}
|
||||
})
|
||||
time.Sleep(tc.timeBetweenReqs)
|
||||
synctest.Wait()
|
||||
}
|
||||
wg.Wait()
|
||||
if (len(addrSeen) == 1) != tc.wantReuse {
|
||||
t.Errorf("want connection reuse to be %v, but %v connections were created", tc.wantReuse, len(addrSeen))
|
||||
}
|
||||
}
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
synctest.Test(t, subtest)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransportMaxPerHostIdleConns(t *testing.T) {
|
||||
run(t, testTransportMaxPerHostIdleConns, []testMode{http1Mode})
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue