mirror of
https://github.com/golang/go.git
synced 2025-12-08 06:10:04 +00:00
net/http: pool transport gzip readers
goos: linux
goarch: amd64
pkg: net/http
│ HEAD~1 │ HEAD │
│ sec/op │ sec/op vs base │
ClientGzip-8 621.0µ ± 2% 616.3µ ± 10% ~ (p=0.971 n=10)
│ HEAD~1 │ HEAD │
│ B/op │ B/op vs base │
ClientGzip-8 49.765Ki ± 0% 9.514Ki ± 2% -80.88% (p=0.000 n=10)
│ HEAD~1 │ HEAD │
│ allocs/op │ allocs/op vs base │
ClientGzip-8 57.00 ± 0% 52.00 ± 0% -8.77% (p=0.000 n=10)
Allocation saving comes from absent compress/flate.(*dictDecoder).init
This change also improves concurrent body read detection by returning an explicit error.
Updates #61353
Change-Id: I380acfca912dc009b3b9c8283e27b3526cedd546
GitHub-Last-Rev: df12f6a48af4854ba686fe431a9aeb6d9ba3c303
GitHub-Pull-Request: golang/go#61390
Reviewed-on: https://go-review.googlesource.com/c/go/+/510255
Reviewed-by: Sean Liao <sean@liao.dev>
Auto-Submit: Michael Pratt <mpratt@google.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Cherry Mui <cherryyz@google.com>
This commit is contained in:
parent
57769b5532
commit
861c90c907
2 changed files with 197 additions and 86 deletions
|
|
@ -12,6 +12,7 @@ import (
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"compress/zlib"
|
"compress/zlib"
|
||||||
"context"
|
"context"
|
||||||
|
crand "crypto/rand"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"crypto/x509"
|
"crypto/x509"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
@ -5281,8 +5282,8 @@ func benchmarkClientServerParallel(b *testing.B, parallelism int, mode testMode)
|
||||||
func BenchmarkServer(b *testing.B) {
|
func BenchmarkServer(b *testing.B) {
|
||||||
b.ReportAllocs()
|
b.ReportAllocs()
|
||||||
// Child process mode;
|
// Child process mode;
|
||||||
if url := os.Getenv("TEST_BENCH_SERVER_URL"); url != "" {
|
if url := os.Getenv("GO_TEST_BENCH_SERVER_URL"); url != "" {
|
||||||
n, err := strconv.Atoi(os.Getenv("TEST_BENCH_CLIENT_N"))
|
n, err := strconv.Atoi(os.Getenv("GO_TEST_BENCH_CLIENT_N"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
@ -5316,8 +5317,8 @@ func BenchmarkServer(b *testing.B) {
|
||||||
|
|
||||||
cmd := testenv.Command(b, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkServer$")
|
cmd := testenv.Command(b, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkServer$")
|
||||||
cmd.Env = append([]string{
|
cmd.Env = append([]string{
|
||||||
fmt.Sprintf("TEST_BENCH_CLIENT_N=%d", b.N),
|
fmt.Sprintf("GO_TEST_BENCH_CLIENT_N=%d", b.N),
|
||||||
fmt.Sprintf("TEST_BENCH_SERVER_URL=%s", ts.URL),
|
fmt.Sprintf("GO_TEST_BENCH_SERVER_URL=%s", ts.URL),
|
||||||
}, os.Environ()...)
|
}, os.Environ()...)
|
||||||
out, err := cmd.CombinedOutput()
|
out, err := cmd.CombinedOutput()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -5338,68 +5339,12 @@ func getNoBody(urlStr string) (*Response, error) {
|
||||||
// A benchmark for profiling the client without the HTTP server code.
|
// A benchmark for profiling the client without the HTTP server code.
|
||||||
// The server code runs in a subprocess.
|
// The server code runs in a subprocess.
|
||||||
func BenchmarkClient(b *testing.B) {
|
func BenchmarkClient(b *testing.B) {
|
||||||
b.ReportAllocs()
|
|
||||||
b.StopTimer()
|
|
||||||
defer afterTest(b)
|
|
||||||
|
|
||||||
var data = []byte("Hello world.\n")
|
var data = []byte("Hello world.\n")
|
||||||
if server := os.Getenv("TEST_BENCH_SERVER"); server != "" {
|
|
||||||
// Server process mode.
|
url := startClientBenchmarkServer(b, HandlerFunc(func(w ResponseWriter, _ *Request) {
|
||||||
port := os.Getenv("TEST_BENCH_SERVER_PORT") // can be set by user
|
|
||||||
if port == "" {
|
|
||||||
port = "0"
|
|
||||||
}
|
|
||||||
ln, err := net.Listen("tcp", "localhost:"+port)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Fprintln(os.Stderr, err.Error())
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
fmt.Println(ln.Addr().String())
|
|
||||||
HandleFunc("/", func(w ResponseWriter, r *Request) {
|
|
||||||
r.ParseForm()
|
|
||||||
if r.Form.Get("stop") != "" {
|
|
||||||
os.Exit(0)
|
|
||||||
}
|
|
||||||
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
||||||
w.Write(data)
|
w.Write(data)
|
||||||
})
|
}))
|
||||||
var srv Server
|
|
||||||
log.Fatal(srv.Serve(ln))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start server process.
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^BenchmarkClient$")
|
|
||||||
cmd.Env = append(cmd.Environ(), "TEST_BENCH_SERVER=yes")
|
|
||||||
cmd.Stderr = os.Stderr
|
|
||||||
stdout, err := cmd.StdoutPipe()
|
|
||||||
if err != nil {
|
|
||||||
b.Fatal(err)
|
|
||||||
}
|
|
||||||
if err := cmd.Start(); err != nil {
|
|
||||||
b.Fatalf("subprocess failed to start: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
done := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
done <- cmd.Wait()
|
|
||||||
close(done)
|
|
||||||
}()
|
|
||||||
defer func() {
|
|
||||||
cancel()
|
|
||||||
<-done
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Wait for the server in the child process to respond and tell us
|
|
||||||
// its listening address, once it's started listening:
|
|
||||||
bs := bufio.NewScanner(stdout)
|
|
||||||
if !bs.Scan() {
|
|
||||||
b.Fatalf("failed to read listening URL from child: %v", bs.Err())
|
|
||||||
}
|
|
||||||
url := "http://" + strings.TrimSpace(bs.Text()) + "/"
|
|
||||||
if _, err := getNoBody(url); err != nil {
|
|
||||||
b.Fatalf("initial probe of child process failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Do b.N requests to the server.
|
// Do b.N requests to the server.
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
|
|
@ -5418,12 +5363,115 @@ func BenchmarkClient(b *testing.B) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
b.StopTimer()
|
b.StopTimer()
|
||||||
|
}
|
||||||
|
|
||||||
|
func startClientBenchmarkServer(b *testing.B, handler Handler) string {
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.StopTimer()
|
||||||
|
|
||||||
|
if server := os.Getenv("GO_TEST_BENCH_SERVER"); server != "" {
|
||||||
|
// Server process mode.
|
||||||
|
port := os.Getenv("GO_TEST_BENCH_SERVER_PORT") // can be set by user
|
||||||
|
if port == "" {
|
||||||
|
port = "0"
|
||||||
|
}
|
||||||
|
ln, err := net.Listen("tcp", "localhost:"+port)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
fmt.Println(ln.Addr().String())
|
||||||
|
|
||||||
|
HandleFunc("/", func(w ResponseWriter, r *Request) {
|
||||||
|
r.ParseForm()
|
||||||
|
if r.Form.Get("stop") != "" {
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
|
handler.ServeHTTP(w, r)
|
||||||
|
})
|
||||||
|
var srv Server
|
||||||
|
log.Fatal(srv.Serve(ln))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start server process.
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
cmd := testenv.CommandContext(b, ctx, os.Args[0], "-test.run=^$", "-test.bench=^"+b.Name()+"$")
|
||||||
|
cmd.Env = append(cmd.Environ(), "GO_TEST_BENCH_SERVER=yes")
|
||||||
|
cmd.Stderr = os.Stderr
|
||||||
|
stdout, err := cmd.StdoutPipe()
|
||||||
|
if err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := cmd.Start(); err != nil {
|
||||||
|
b.Fatalf("subprocess failed to start: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
done := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
done <- cmd.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for the server in the child process to respond and tell us
|
||||||
|
// its listening address, once it's started listening:
|
||||||
|
bs := bufio.NewScanner(stdout)
|
||||||
|
if !bs.Scan() {
|
||||||
|
b.Fatalf("failed to read listening URL from child: %v", bs.Err())
|
||||||
|
}
|
||||||
|
url := "http://" + strings.TrimSpace(bs.Text()) + "/"
|
||||||
|
if _, err := getNoBody(url); err != nil {
|
||||||
|
b.Fatalf("initial probe of child process failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Instruct server process to stop.
|
// Instruct server process to stop.
|
||||||
|
b.Cleanup(func() {
|
||||||
getNoBody(url + "?stop=yes")
|
getNoBody(url + "?stop=yes")
|
||||||
if err := <-done; err != nil {
|
if err := <-done; err != nil {
|
||||||
b.Fatalf("subprocess failed: %v", err)
|
b.Fatalf("subprocess failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cancel()
|
||||||
|
<-done
|
||||||
|
|
||||||
|
afterTest(b)
|
||||||
|
})
|
||||||
|
|
||||||
|
return url
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkClientGzip(b *testing.B) {
|
||||||
|
const responseSize = 1024 * 1024
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
gz := gzip.NewWriter(&buf)
|
||||||
|
if _, err := io.CopyN(gz, crand.Reader, responseSize); err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
gz.Close()
|
||||||
|
|
||||||
|
data := buf.Bytes()
|
||||||
|
|
||||||
|
url := startClientBenchmarkServer(b, HandlerFunc(func(w ResponseWriter, _ *Request) {
|
||||||
|
w.Header().Set("Content-Encoding", "gzip")
|
||||||
|
w.Write(data)
|
||||||
|
}))
|
||||||
|
|
||||||
|
// Do b.N requests to the server.
|
||||||
|
b.StartTimer()
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
res, err := Get(url)
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("Get: %v", err)
|
||||||
|
}
|
||||||
|
n, err := io.Copy(io.Discard, res.Body)
|
||||||
|
res.Body.Close()
|
||||||
|
if err != nil {
|
||||||
|
b.Fatalf("ReadAll: %v", err)
|
||||||
|
}
|
||||||
|
if n != responseSize {
|
||||||
|
b.Fatalf("ReadAll: expected %d bytes, got %d", responseSize, n)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
b.StopTimer()
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkServerFakeConnNoKeepAlive(b *testing.B) {
|
func BenchmarkServerFakeConnNoKeepAlive(b *testing.B) {
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ package http
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"compress/flate"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"container/list"
|
"container/list"
|
||||||
"context"
|
"context"
|
||||||
|
|
@ -2988,6 +2989,7 @@ type bodyEOFSignal struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
var errReadOnClosedResBody = errors.New("http: read on closed response body")
|
var errReadOnClosedResBody = errors.New("http: read on closed response body")
|
||||||
|
var errConcurrentReadOnResBody = errors.New("http: concurrent read on response body")
|
||||||
|
|
||||||
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
|
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
|
||||||
es.mu.Lock()
|
es.mu.Lock()
|
||||||
|
|
@ -3037,37 +3039,98 @@ func (es *bodyEOFSignal) condfn(err error) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// gzipReader wraps a response body so it can lazily
|
// gzipReader wraps a response body so it can lazily
|
||||||
// call gzip.NewReader on the first call to Read
|
// get gzip.Reader from the pool on the first call to Read.
|
||||||
|
// After Close is called it puts gzip.Reader to the pool immediately
|
||||||
|
// if there is no Read in progress or later when Read completes.
|
||||||
type gzipReader struct {
|
type gzipReader struct {
|
||||||
_ incomparable
|
_ incomparable
|
||||||
body *bodyEOFSignal // underlying HTTP/1 response body framing
|
body *bodyEOFSignal // underlying HTTP/1 response body framing
|
||||||
zr *gzip.Reader // lazily-initialized gzip reader
|
mu sync.Mutex // guards zr and zerr
|
||||||
zerr error // any error from gzip.NewReader; sticky
|
zr *gzip.Reader
|
||||||
|
zerr error
|
||||||
|
}
|
||||||
|
|
||||||
|
type eofReader struct{}
|
||||||
|
|
||||||
|
func (eofReader) Read([]byte) (int, error) { return 0, io.EOF }
|
||||||
|
func (eofReader) ReadByte() (byte, error) { return 0, io.EOF }
|
||||||
|
|
||||||
|
var gzipPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
|
||||||
|
|
||||||
|
// gzipPoolGet gets a gzip.Reader from the pool and resets it to read from r.
|
||||||
|
func gzipPoolGet(r io.Reader) (*gzip.Reader, error) {
|
||||||
|
zr := gzipPool.Get().(*gzip.Reader)
|
||||||
|
if err := zr.Reset(r); err != nil {
|
||||||
|
gzipPoolPut(zr)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return zr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// gzipPoolPut puts a gzip.Reader back into the pool.
|
||||||
|
func gzipPoolPut(zr *gzip.Reader) {
|
||||||
|
// Reset will allocate bufio.Reader if we pass it anything
|
||||||
|
// other than a flate.Reader, so ensure that it's getting one.
|
||||||
|
var r flate.Reader = eofReader{}
|
||||||
|
zr.Reset(r)
|
||||||
|
gzipPool.Put(zr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// acquire returns a gzip.Reader for reading response body.
|
||||||
|
// The reader must be released after use.
|
||||||
|
func (gz *gzipReader) acquire() (*gzip.Reader, error) {
|
||||||
|
gz.mu.Lock()
|
||||||
|
defer gz.mu.Unlock()
|
||||||
|
if gz.zerr != nil {
|
||||||
|
return nil, gz.zerr
|
||||||
|
}
|
||||||
|
if gz.zr == nil {
|
||||||
|
gz.zr, gz.zerr = gzipPoolGet(gz.body)
|
||||||
|
if gz.zerr != nil {
|
||||||
|
return nil, gz.zerr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ret := gz.zr
|
||||||
|
gz.zr, gz.zerr = nil, errConcurrentReadOnResBody
|
||||||
|
return ret, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// release returns the gzip.Reader to the pool if Close was called during Read.
|
||||||
|
func (gz *gzipReader) release(zr *gzip.Reader) {
|
||||||
|
gz.mu.Lock()
|
||||||
|
defer gz.mu.Unlock()
|
||||||
|
if gz.zerr == errConcurrentReadOnResBody {
|
||||||
|
gz.zr, gz.zerr = zr, nil
|
||||||
|
} else { // errReadOnClosedResBody
|
||||||
|
gzipPoolPut(zr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// close returns the gzip.Reader to the pool immediately or
|
||||||
|
// signals release to do so after Read completes.
|
||||||
|
func (gz *gzipReader) close() {
|
||||||
|
gz.mu.Lock()
|
||||||
|
defer gz.mu.Unlock()
|
||||||
|
if gz.zerr == nil && gz.zr != nil {
|
||||||
|
gzipPoolPut(gz.zr)
|
||||||
|
gz.zr = nil
|
||||||
|
}
|
||||||
|
gz.zerr = errReadOnClosedResBody
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gz *gzipReader) Read(p []byte) (n int, err error) {
|
func (gz *gzipReader) Read(p []byte) (n int, err error) {
|
||||||
if gz.zr == nil {
|
zr, err := gz.acquire()
|
||||||
if gz.zerr == nil {
|
|
||||||
gz.zr, gz.zerr = gzip.NewReader(gz.body)
|
|
||||||
}
|
|
||||||
if gz.zerr != nil {
|
|
||||||
return 0, gz.zerr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
gz.body.mu.Lock()
|
|
||||||
if gz.body.closed {
|
|
||||||
err = errReadOnClosedResBody
|
|
||||||
}
|
|
||||||
gz.body.mu.Unlock()
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
return gz.zr.Read(p)
|
defer gz.release(zr)
|
||||||
|
|
||||||
|
return zr.Read(p)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gz *gzipReader) Close() error {
|
func (gz *gzipReader) Close() error {
|
||||||
|
gz.close()
|
||||||
|
|
||||||
return gz.body.Close()
|
return gz.body.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue