net/http: pool transport gzip readers

goos: linux
goarch: amd64
pkg: net/http
             │   HEAD~1    │              HEAD              │
             │   sec/op    │    sec/op     vs base          │
ClientGzip-8   621.0µ ± 2%   616.3µ ± 10%  ~ (p=0.971 n=10)

             │    HEAD~1     │                 HEAD                 │
             │     B/op      │     B/op      vs base                │
ClientGzip-8   49.765Ki ± 0%   9.514Ki ± 2%  -80.88% (p=0.000 n=10)

             │   HEAD~1   │               HEAD                │
             │ allocs/op  │ allocs/op   vs base               │
ClientGzip-8   57.00 ± 0%   52.00 ± 0%  -8.77% (p=0.000 n=10)

Allocation saving comes from absent compress/flate.(*dictDecoder).init

This change also improves concurrent body read detection by returning an explicit error.

Updates #61353

Change-Id: I380acfca912dc009b3b9c8283e27b3526cedd546
GitHub-Last-Rev: df12f6a48af4854ba686fe431a9aeb6d9ba3c303
GitHub-Pull-Request: golang/go#61390
Reviewed-on: https://go-review.googlesource.com/c/go/+/510255
Reviewed-by: Sean Liao <sean@liao.dev>
Auto-Submit: Michael Pratt <mpratt@google.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Cherry Mui <cherryyz@google.com>
This commit is contained in:
Alexander Yastrebov 2025-09-03 10:09:08 +00:00 committed by Gopher Robot
parent 57769b5532
commit 861c90c907
2 changed files with 197 additions and 86 deletions

View file

@ -12,6 +12,7 @@ import (
"compress/gzip" "compress/gzip"
"compress/zlib" "compress/zlib"
"context" "context"
crand "crypto/rand"
"crypto/tls" "crypto/tls"
"crypto/x509" "crypto/x509"
"encoding/json" "encoding/json"
@ -5281,8 +5282,8 @@ func benchmarkClientServerParallel(b *testing.B, parallelism int, mode testMode)
func BenchmarkServer(b *testing.B) { func BenchmarkServer(b *testing.B) {
b.ReportAllocs() b.ReportAllocs()
// Child process mode; // Child process mode;
if url := os.Getenv("TEST_BENCH_SERVER_URL"); url != "" { if url := os.Getenv("GO_TEST_BENCH_SERVER_URL"); url != "" {
n, err := strconv.Atoi(os.Getenv("TEST_BENCH_CLIENT_N")) n, err := strconv.Atoi(os.Getenv("GO_TEST_BENCH_CLIENT_N"))
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -5316,8 +5317,8 @@ func BenchmarkServer(b *testing.B) {
cmd := testenv.Command(b, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkServer$") cmd := testenv.Command(b, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkServer$")
cmd.Env = append([]string{ cmd.Env = append([]string{
fmt.Sprintf("TEST_BENCH_CLIENT_N=%d", b.N), fmt.Sprintf("GO_TEST_BENCH_CLIENT_N=%d", b.N),
fmt.Sprintf("TEST_BENCH_SERVER_URL=%s", ts.URL), fmt.Sprintf("GO_TEST_BENCH_SERVER_URL=%s", ts.URL),
}, os.Environ()...) }, os.Environ()...)
out, err := cmd.CombinedOutput() out, err := cmd.CombinedOutput()
if err != nil { if err != nil {
@ -5338,68 +5339,12 @@ func getNoBody(urlStr string) (*Response, error) {
// A benchmark for profiling the client without the HTTP server code. // A benchmark for profiling the client without the HTTP server code.
// The server code runs in a subprocess. // The server code runs in a subprocess.
func BenchmarkClient(b *testing.B) { func BenchmarkClient(b *testing.B) {
b.ReportAllocs()
b.StopTimer()
defer afterTest(b)
var data = []byte("Hello world.\n") var data = []byte("Hello world.\n")
if server := os.Getenv("TEST_BENCH_SERVER"); server != "" {
// Server process mode.
port := os.Getenv("TEST_BENCH_SERVER_PORT") // can be set by user
if port == "" {
port = "0"
}
ln, err := net.Listen("tcp", "localhost:"+port)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
fmt.Println(ln.Addr().String())
HandleFunc("/", func(w ResponseWriter, r *Request) {
r.ParseForm()
if r.Form.Get("stop") != "" {
os.Exit(0)
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
w.Write(data)
})
var srv Server
log.Fatal(srv.Serve(ln))
}
// Start server process. url := startClientBenchmarkServer(b, HandlerFunc(func(w ResponseWriter, _ *Request) {
ctx, cancel := context.WithCancel(context.Background()) w.Header().Set("Content-Type", "text/html; charset=utf-8")
cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkClient$") w.Write(data)
cmd.Env = append(cmd.Environ(), "TEST_BENCH_SERVER=yes") }))
cmd.Stderr = os.Stderr
stdout, err := cmd.StdoutPipe()
if err != nil {
b.Fatal(err)
}
if err := cmd.Start(); err != nil {
b.Fatalf("subprocess failed to start: %v", err)
}
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
close(done)
}()
defer func() {
cancel()
<-done
}()
// Wait for the server in the child process to respond and tell us
// its listening address, once it's started listening:
bs := bufio.NewScanner(stdout)
if !bs.Scan() {
b.Fatalf("failed to read listening URL from child: %v", bs.Err())
}
url := "http://" + strings.TrimSpace(bs.Text()) + "/"
if _, err := getNoBody(url); err != nil {
b.Fatalf("initial probe of child process failed: %v", err)
}
// Do b.N requests to the server. // Do b.N requests to the server.
b.StartTimer() b.StartTimer()
@ -5418,12 +5363,115 @@ func BenchmarkClient(b *testing.B) {
} }
} }
b.StopTimer() b.StopTimer()
}
func startClientBenchmarkServer(b *testing.B, handler Handler) string {
b.ReportAllocs()
b.StopTimer()
if server := os.Getenv("GO_TEST_BENCH_SERVER"); server != "" {
// Server process mode.
port := os.Getenv("GO_TEST_BENCH_SERVER_PORT") // can be set by user
if port == "" {
port = "0"
}
ln, err := net.Listen("tcp", "localhost:"+port)
if err != nil {
log.Fatal(err)
}
fmt.Println(ln.Addr().String())
HandleFunc("/", func(w ResponseWriter, r *Request) {
r.ParseForm()
if r.Form.Get("stop") != "" {
os.Exit(0)
}
handler.ServeHTTP(w, r)
})
var srv Server
log.Fatal(srv.Serve(ln))
}
// Start server process.
ctx, cancel := context.WithCancel(context.Background())
cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^"+b.Name()+"$")
cmd.Env = append(cmd.Environ(), "GO_TEST_BENCH_SERVER=yes")
cmd.Stderr = os.Stderr
stdout, err := cmd.StdoutPipe()
if err != nil {
b.Fatal(err)
}
if err := cmd.Start(); err != nil {
b.Fatalf("subprocess failed to start: %v", err)
}
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
close(done)
}()
// Wait for the server in the child process to respond and tell us
// its listening address, once it's started listening:
bs := bufio.NewScanner(stdout)
if !bs.Scan() {
b.Fatalf("failed to read listening URL from child: %v", bs.Err())
}
url := "http://" + strings.TrimSpace(bs.Text()) + "/"
if _, err := getNoBody(url); err != nil {
b.Fatalf("initial probe of child process failed: %v", err)
}
// Instruct server process to stop. // Instruct server process to stop.
getNoBody(url + "?stop=yes") b.Cleanup(func() {
if err := <-done; err != nil { getNoBody(url + "?stop=yes")
b.Fatalf("subprocess failed: %v", err) if err := <-done; err != nil {
b.Fatalf("subprocess failed: %v", err)
}
cancel()
<-done
afterTest(b)
})
return url
}
func BenchmarkClientGzip(b *testing.B) {
const responseSize = 1024 * 1024
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
if _, err := io.CopyN(gz, crand.Reader, responseSize); err != nil {
b.Fatal(err)
} }
gz.Close()
data := buf.Bytes()
url := startClientBenchmarkServer(b, HandlerFunc(func(w ResponseWriter, _ *Request) {
w.Header().Set("Content-Encoding", "gzip")
w.Write(data)
}))
// Do b.N requests to the server.
b.StartTimer()
for i := 0; i < b.N; i++ {
res, err := Get(url)
if err != nil {
b.Fatalf("Get: %v", err)
}
n, err := io.Copy(io.Discard, res.Body)
res.Body.Close()
if err != nil {
b.Fatalf("ReadAll: %v", err)
}
if n != responseSize {
b.Fatalf("ReadAll: expected %d bytes, got %d", responseSize, n)
}
}
b.StopTimer()
} }
func BenchmarkServerFakeConnNoKeepAlive(b *testing.B) { func BenchmarkServerFakeConnNoKeepAlive(b *testing.B) {

View file

@ -11,6 +11,7 @@ package http
import ( import (
"bufio" "bufio"
"compress/flate"
"compress/gzip" "compress/gzip"
"container/list" "container/list"
"context" "context"
@ -2988,6 +2989,7 @@ type bodyEOFSignal struct {
} }
var errReadOnClosedResBody = errors.New("http: read on closed response body") var errReadOnClosedResBody = errors.New("http: read on closed response body")
var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body")
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
es.mu.Lock() es.mu.Lock()
@ -3037,37 +3039,98 @@ func (es *bodyEOFSignal) condfn(err error) error {
} }
// gzipReader wraps a response body so it can lazily // gzipReader wraps a response body so it can lazily
// call gzip.NewReader on the first call to Read // get gzip.Reader from the pool on the first call to Read.
// After Close is called it puts gzip.Reader to the pool immediately
// if there is no Read in progress or later when Read completes.
type gzipReader struct { type gzipReader struct {
_ incomparable _ incomparable
body *bodyEOFSignal // underlying HTTP/1 response body framing body *bodyEOFSignal // underlying HTTP/1 response body framing
zr *gzip.Reader // lazily-initialized gzip reader mu sync.Mutex // guards zr and zerr
zerr error // any error from gzip.NewReader; sticky zr *gzip.Reader
zerr error
}
type eofReader struct{}
func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
func (eofReader) ReadByte() (byte, error) { return 0, io.EOF }
var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
// gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r.
func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
zr := gzipPool.Get().(*gzip.Reader)
if err := zr.Reset(r); err != nil {
gzipPoolPut(zr)
return nil, err
}
return zr, nil
}
// gzipPoolPut puts a gzip.Reader back into the pool.
func gzipPoolPut(zr *gzip.Reader) {
// Reset will allocate bufio.Reader if we pass it anything
// other than a flate.Reader, so ensure that it's getting one.
var r flate.Reader = eofReader{}
zr.Reset(r)
gzipPool.Put(zr)
}
// acquire returns a gzip.Reader for reading response body.
// The reader must be released after use.
func (gz *gzipReader) acquire() (*gzip.Reader, error) {
gz.mu.Lock()
defer gz.mu.Unlock()
if gz.zerr != nil {
return nil, gz.zerr
}
if gz.zr == nil {
gz.zr, gz.zerr = gzipPoolGet(gz.body)
if gz.zerr != nil {
return nil, gz.zerr
}
}
ret := gz.zr
gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
return ret, nil
}
// release returns the gzip.Reader to the pool if Close was called during Read.
func (gz *gzipReader) release(zr *gzip.Reader) {
gz.mu.Lock()
defer gz.mu.Unlock()
if gz.zerr == errConcurrentReadOnResBody {
gz.zr, gz.zerr = zr, nil
} else { // errReadOnClosedResBody
gzipPoolPut(zr)
}
}
// close returns the gzip.Reader to the pool immediately or
// signals release to do so after Read completes.
func (gz *gzipReader) close() {
gz.mu.Lock()
defer gz.mu.Unlock()
if gz.zerr == nil && gz.zr != nil {
gzipPoolPut(gz.zr)
gz.zr = nil
}
gz.zerr = errReadOnClosedResBody
} }
func (gz *gzipReader) Read(p []byte) (n int, err error) { func (gz *gzipReader) Read(p []byte) (n int, err error) {
if gz.zr == nil { zr, err := gz.acquire()
if gz.zerr == nil {
gz.zr, gz.zerr = gzip.NewReader(gz.body)
}
if gz.zerr != nil {
return 0, gz.zerr
}
}
gz.body.mu.Lock()
if gz.body.closed {
err = errReadOnClosedResBody
}
gz.body.mu.Unlock()
if err != nil { if err != nil {
return 0, err return 0, err
} }
return gz.zr.Read(p) defer gz.release(zr)
return zr.Read(p)
} }
func (gz *gzipReader) Close() error { func (gz *gzipReader) Close() error {
gz.close()
return gz.body.Close() return gz.body.Close()
} }