mirror of
https://github.com/golang/go.git
synced 2025-12-08 06:10:04 +00:00
net/http/httputil: rewrite flushing code, disable on Server-Sent Events
* Rewrite the flushing code to not use a persistent goroutine, which also simplifies testing. * Define the meaning of a negative flush interval. Its meaning doesn't change, but now it's locked in, and then we can use it to optimize the performance of the non-buffered case to avoid use of an AfterFunc. * Support (internal-only) special casing of FlushInterval values per request/response. * For now, treat Server-Sent Event responses as unbuffered. (or rather, immediately flushed from the buffer per-write) Fixes #27816 Change-Id: Ie0f975c997daa3db539504137c741a96d7022665 Reviewed-on: https://go-review.googlesource.com/c/137335 Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
This commit is contained in:
parent
ffc7bc55f3
commit
5440bfc2ea
2 changed files with 102 additions and 43 deletions
|
|
@ -18,10 +18,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// onExitFlushLoop is a callback set by tests to detect the state of the
|
|
||||||
// flushLoop() goroutine.
|
|
||||||
var onExitFlushLoop func()
|
|
||||||
|
|
||||||
// ReverseProxy is an HTTP Handler that takes an incoming request and
|
// ReverseProxy is an HTTP Handler that takes an incoming request and
|
||||||
// sends it to another server, proxying the response back to the
|
// sends it to another server, proxying the response back to the
|
||||||
// client.
|
// client.
|
||||||
|
|
@ -42,6 +38,12 @@ type ReverseProxy struct {
|
||||||
// to flush to the client while copying the
|
// to flush to the client while copying the
|
||||||
// response body.
|
// response body.
|
||||||
// If zero, no periodic flushing is done.
|
// If zero, no periodic flushing is done.
|
||||||
|
// A negative value means to flush immediately
|
||||||
|
// after each write to the client.
|
||||||
|
// The FlushInterval is ignored when ReverseProxy
|
||||||
|
// recognizes a response as a streaming response;
|
||||||
|
// for such reponses, writes are flushed to the client
|
||||||
|
// immediately.
|
||||||
FlushInterval time.Duration
|
FlushInterval time.Duration
|
||||||
|
|
||||||
// ErrorLog specifies an optional logger for errors
|
// ErrorLog specifies an optional logger for errors
|
||||||
|
|
@ -271,7 +273,7 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||||
fl.Flush()
|
fl.Flush()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = p.copyResponse(rw, res.Body)
|
err = p.copyResponse(rw, res.Body, p.flushInterval(req, res))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
// Since we're streaming the response, if we run into an error all we can do
|
// Since we're streaming the response, if we run into an error all we can do
|
||||||
|
|
@ -332,15 +334,28 @@ func removeConnectionHeaders(h http.Header) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader) error {
|
// flushInterval returns the p.FlushInterval value, conditionally
|
||||||
if p.FlushInterval != 0 {
|
// overriding its value for a specific request/response.
|
||||||
|
func (p *ReverseProxy) flushInterval(req *http.Request, res *http.Response) time.Duration {
|
||||||
|
resCT := res.Header.Get("Content-Type")
|
||||||
|
|
||||||
|
// For Server-Sent Events responses, flush immediately.
|
||||||
|
// The MIME type is defined in https://www.w3.org/TR/eventsource/#text-event-stream
|
||||||
|
if resCT == "text/event-stream" {
|
||||||
|
return -1 // negative means immediately
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: more specific cases? e.g. res.ContentLength == -1?
|
||||||
|
return p.FlushInterval
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *ReverseProxy) copyResponse(dst io.Writer, src io.Reader, flushInterval time.Duration) error {
|
||||||
|
if flushInterval != 0 {
|
||||||
if wf, ok := dst.(writeFlusher); ok {
|
if wf, ok := dst.(writeFlusher); ok {
|
||||||
mlw := &maxLatencyWriter{
|
mlw := &maxLatencyWriter{
|
||||||
dst: wf,
|
dst: wf,
|
||||||
latency: p.FlushInterval,
|
latency: flushInterval,
|
||||||
done: make(chan bool),
|
|
||||||
}
|
}
|
||||||
go mlw.flushLoop()
|
|
||||||
defer mlw.stop()
|
defer mlw.stop()
|
||||||
dst = mlw
|
dst = mlw
|
||||||
}
|
}
|
||||||
|
|
@ -403,34 +418,44 @@ type writeFlusher interface {
|
||||||
|
|
||||||
type maxLatencyWriter struct {
|
type maxLatencyWriter struct {
|
||||||
dst writeFlusher
|
dst writeFlusher
|
||||||
latency time.Duration
|
latency time.Duration // non-zero; negative means to flush immediately
|
||||||
|
|
||||||
mu sync.Mutex // protects Write + Flush
|
mu sync.Mutex // protects t, flushPending, and dst.Flush
|
||||||
done chan bool
|
t *time.Timer
|
||||||
|
flushPending bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *maxLatencyWriter) Write(p []byte) (int, error) {
|
func (m *maxLatencyWriter) Write(p []byte) (n int, err error) {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
return m.dst.Write(p)
|
n, err = m.dst.Write(p)
|
||||||
|
if m.latency < 0 {
|
||||||
|
m.dst.Flush()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if m.flushPending {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if m.t == nil {
|
||||||
|
m.t = time.AfterFunc(m.latency, m.delayedFlush)
|
||||||
|
} else {
|
||||||
|
m.t.Reset(m.latency)
|
||||||
|
}
|
||||||
|
m.flushPending = true
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *maxLatencyWriter) flushLoop() {
|
func (m *maxLatencyWriter) delayedFlush() {
|
||||||
t := time.NewTicker(m.latency)
|
m.mu.Lock()
|
||||||
defer t.Stop()
|
defer m.mu.Unlock()
|
||||||
for {
|
m.dst.Flush()
|
||||||
select {
|
m.flushPending = false
|
||||||
case <-m.done:
|
}
|
||||||
if onExitFlushLoop != nil {
|
|
||||||
onExitFlushLoop()
|
func (m *maxLatencyWriter) stop() {
|
||||||
}
|
m.mu.Lock()
|
||||||
return
|
defer m.mu.Unlock()
|
||||||
case <-t.C:
|
if m.t != nil {
|
||||||
m.mu.Lock()
|
m.t.Stop()
|
||||||
m.dst.Flush()
|
|
||||||
m.mu.Unlock()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *maxLatencyWriter) stop() { m.done <- true }
|
|
||||||
|
|
|
||||||
|
|
@ -297,10 +297,6 @@ func TestReverseProxyFlushInterval(t *testing.T) {
|
||||||
proxyHandler := NewSingleHostReverseProxy(backendURL)
|
proxyHandler := NewSingleHostReverseProxy(backendURL)
|
||||||
proxyHandler.FlushInterval = time.Microsecond
|
proxyHandler.FlushInterval = time.Microsecond
|
||||||
|
|
||||||
done := make(chan bool)
|
|
||||||
onExitFlushLoop = func() { done <- true }
|
|
||||||
defer func() { onExitFlushLoop = nil }()
|
|
||||||
|
|
||||||
frontend := httptest.NewServer(proxyHandler)
|
frontend := httptest.NewServer(proxyHandler)
|
||||||
defer frontend.Close()
|
defer frontend.Close()
|
||||||
|
|
||||||
|
|
@ -314,13 +310,6 @@ func TestReverseProxyFlushInterval(t *testing.T) {
|
||||||
if bodyBytes, _ := ioutil.ReadAll(res.Body); string(bodyBytes) != expected {
|
if bodyBytes, _ := ioutil.ReadAll(res.Body); string(bodyBytes) != expected {
|
||||||
t.Errorf("got body %q; expected %q", bodyBytes, expected)
|
t.Errorf("got body %q; expected %q", bodyBytes, expected)
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
// OK
|
|
||||||
case <-time.After(5 * time.Second):
|
|
||||||
t.Error("maxLatencyWriter flushLoop() never exited")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReverseProxyCancelation(t *testing.T) {
|
func TestReverseProxyCancelation(t *testing.T) {
|
||||||
|
|
@ -946,3 +935,48 @@ func TestReverseProxy_PanicBodyError(t *testing.T) {
|
||||||
req, _ := http.NewRequest("GET", "http://foo.tld/", nil)
|
req, _ := http.NewRequest("GET", "http://foo.tld/", nil)
|
||||||
rproxy.ServeHTTP(httptest.NewRecorder(), req)
|
rproxy.ServeHTTP(httptest.NewRecorder(), req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSelectFlushInterval(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
p *ReverseProxy
|
||||||
|
req *http.Request
|
||||||
|
res *http.Response
|
||||||
|
want time.Duration
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "default",
|
||||||
|
res: &http.Response{},
|
||||||
|
p: &ReverseProxy{FlushInterval: 123},
|
||||||
|
want: 123,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "server-sent events overrides non-zero",
|
||||||
|
res: &http.Response{
|
||||||
|
Header: http.Header{
|
||||||
|
"Content-Type": {"text/event-stream"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
p: &ReverseProxy{FlushInterval: 123},
|
||||||
|
want: -1,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "server-sent events overrides zero",
|
||||||
|
res: &http.Response{
|
||||||
|
Header: http.Header{
|
||||||
|
"Content-Type": {"text/event-stream"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
p: &ReverseProxy{FlushInterval: 0},
|
||||||
|
want: -1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got := tt.p.flushInterval(tt.req, tt.res)
|
||||||
|
if got != tt.want {
|
||||||
|
t.Errorf("flushLatency = %v; want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue