mirror of
https://github.com/golang/go.git
synced 2025-12-08 06:10:04 +00:00
net/http: add Transport.NewClientConn
For #75772 Change-Id: Iad7607b40636bab1faf8653455e92e9700309003 Reviewed-on: https://go-review.googlesource.com/c/go/+/722223 Reviewed-by: Nicholas Husin <nsh@golang.org> Reviewed-by: Nicholas Husin <husin@google.com> Auto-Submit: Damien Neil <dneil@google.com> LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
This commit is contained in:
parent
6465818435
commit
0921e1db83
6 changed files with 949 additions and 25 deletions
10
api/next/75772.txt
Normal file
10
api/next/75772.txt
Normal file
|
|
@ -0,0 +1,10 @@
|
||||||
|
pkg net/http, method (*ClientConn) Available() int #75772
|
||||||
|
pkg net/http, method (*ClientConn) Close() error #75772
|
||||||
|
pkg net/http, method (*ClientConn) Err() error #75772
|
||||||
|
pkg net/http, method (*ClientConn) InFlight() int #75772
|
||||||
|
pkg net/http, method (*ClientConn) Release() #75772
|
||||||
|
pkg net/http, method (*ClientConn) Reserve() error #75772
|
||||||
|
pkg net/http, method (*ClientConn) RoundTrip(*Request) (*Response, error) #75772
|
||||||
|
pkg net/http, method (*ClientConn) SetStateHook(func(*ClientConn)) #75772
|
||||||
|
pkg net/http, method (*Transport) NewClientConn(context.Context, string, string) (*ClientConn, error) #75772
|
||||||
|
pkg net/http, type ClientConn struct #75772
|
||||||
5
doc/next/6-stdlib/99-minor/net/http/75772.md
Normal file
5
doc/next/6-stdlib/99-minor/net/http/75772.md
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
The new [Transport.NewClientConn] method returns a client connection
|
||||||
|
to an HTTP server.
|
||||||
|
Most users should continue to use [Transport.RoundTrip] to make requests,
|
||||||
|
which manages a pool of connection.
|
||||||
|
`NewClientConn` is useful for users who need to implement their own conection management.
|
||||||
456
src/net/http/clientconn.go
Normal file
456
src/net/http/clientconn.go
Normal file
|
|
@ -0,0 +1,456 @@
|
||||||
|
// Copyright 2025 The Go Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package http
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"net/http/httptrace"
|
||||||
|
"net/url"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A ClientConn is a client connection to an HTTP server.
|
||||||
|
//
|
||||||
|
// Unlike a [Transport], a ClientConn represents a single connection.
|
||||||
|
// Most users should use a Transport rather than creating client connections directly.
|
||||||
|
type ClientConn struct {
|
||||||
|
cc genericClientConn
|
||||||
|
|
||||||
|
stateHookMu sync.Mutex
|
||||||
|
userStateHook func(*ClientConn)
|
||||||
|
stateHookRunning bool
|
||||||
|
lastAvailable int
|
||||||
|
lastInFlight int
|
||||||
|
lastClosed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// newClientConner is the interface implemented by HTTP/2 transports to create new client conns.
|
||||||
|
//
|
||||||
|
// The http package (this package) needs a way to ask the http2 package to
|
||||||
|
// create a client connection.
|
||||||
|
//
|
||||||
|
// Transport.TLSNextProto["h2"] contains a function which appears to do this,
|
||||||
|
// but for historical reasons it does not: The TLSNextProto function adds a
|
||||||
|
// *tls.Conn to the http2.Transport's connection pool and returns a RoundTripper
|
||||||
|
// which is backed by that connection pool. NewClientConn needs a way to get a
|
||||||
|
// single client connection out of the http2 package.
|
||||||
|
//
|
||||||
|
// The http2 package registers a RoundTripper with Transport.RegisterProtocol.
|
||||||
|
// If this RoundTripper implements newClientConner, then Transport.NewClientConn will use
|
||||||
|
// it to create new HTTP/2 client connections.
|
||||||
|
type newClientConner interface {
|
||||||
|
// NewClientConn creates a new client connection from a net.Conn.
|
||||||
|
//
|
||||||
|
// The RoundTripper returned by NewClientConn must implement genericClientConn.
|
||||||
|
// (We don't define NewClientConn as returning genericClientConn,
|
||||||
|
// because either we'd need to make genericClientConn an exported type
|
||||||
|
// or define it as a type alias. Neither is particularly appealing.)
|
||||||
|
//
|
||||||
|
// The state hook passed here is the internal state hook
|
||||||
|
// (ClientConn.maybeRunStateHook). The internal state hook calls
|
||||||
|
// the user state hook (if any), which is set by the user with
|
||||||
|
// ClientConn.SetStateHook.
|
||||||
|
//
|
||||||
|
// The client connection should arrange to call the internal state hook
|
||||||
|
// when the connection closes, when requests complete, and when the
|
||||||
|
// connection concurrency limit changes.
|
||||||
|
//
|
||||||
|
// The client connection must call the internal state hook when the connection state
|
||||||
|
// changes asynchronously, such as when a request completes.
|
||||||
|
//
|
||||||
|
// The internal state hook need not be called after synchronous changes to the state:
|
||||||
|
// Close, Reserve, Release, and RoundTrip calls which don't start a request
|
||||||
|
// do not need to call the hook.
|
||||||
|
//
|
||||||
|
// The general idea is that if we call (for example) Close,
|
||||||
|
// we know that the connection state has probably changed and we
|
||||||
|
// don't need the state hook to tell us that.
|
||||||
|
// However, if the connection closes asynchronously
|
||||||
|
// (because, for example, the other end of the conn closed it),
|
||||||
|
// the state hook needs to inform us.
|
||||||
|
NewClientConn(nc net.Conn, internalStateHook func()) (RoundTripper, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// genericClientConn is an interface implemented by HTTP/2 client conns
|
||||||
|
// returned from newClientConner.NewClientConn.
|
||||||
|
//
|
||||||
|
// See the newClientConner doc comment for more information.
|
||||||
|
type genericClientConn interface {
|
||||||
|
Close() error
|
||||||
|
Err() error
|
||||||
|
RoundTrip(req *Request) (*Response, error)
|
||||||
|
Reserve() error
|
||||||
|
Release()
|
||||||
|
Available() int
|
||||||
|
InFlight() int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClientConn creates a new client connection to the given address.
|
||||||
|
//
|
||||||
|
// If scheme is "http", the connection is unencrypted.
|
||||||
|
// If scheme is "https", the connection uses TLS.
|
||||||
|
//
|
||||||
|
// The protocol used for the new connection is determined by the scheme,
|
||||||
|
// Transport.Protocols configuration field, and protocols supported by the
|
||||||
|
// server. See Transport.Protocols for more details.
|
||||||
|
//
|
||||||
|
// If Transport.Proxy is set and indicates that a request sent to the given
|
||||||
|
// address should use a proxy, the new connection uses that proxy.
|
||||||
|
//
|
||||||
|
// NewClientConn always creates a new connection,
|
||||||
|
// even if the Transport has an existing cached connection to the given host.
|
||||||
|
//
|
||||||
|
// The new connection is not added to the Transport's connection cache,
|
||||||
|
// and will not be used by [Transport.RoundTrip].
|
||||||
|
// It does not count against the MaxIdleConns and MaxConnsPerHost limits.
|
||||||
|
//
|
||||||
|
// The caller is responsible for closing the new connection.
|
||||||
|
func (t *Transport) NewClientConn(ctx context.Context, scheme, address string) (*ClientConn, error) {
|
||||||
|
t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
|
||||||
|
|
||||||
|
switch scheme {
|
||||||
|
case "http", "https":
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("net/http: invalid scheme %q", scheme)
|
||||||
|
}
|
||||||
|
|
||||||
|
host, port, err := net.SplitHostPort(address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if port == "" {
|
||||||
|
port = schemePort(scheme)
|
||||||
|
}
|
||||||
|
|
||||||
|
var proxyURL *url.URL
|
||||||
|
if t.Proxy != nil {
|
||||||
|
// Transport.Proxy takes a *Request, so create a fake one to pass it.
|
||||||
|
req := &Request{
|
||||||
|
ctx: ctx,
|
||||||
|
Method: "GET",
|
||||||
|
URL: &url.URL{
|
||||||
|
Scheme: scheme,
|
||||||
|
Host: host,
|
||||||
|
Path: "/",
|
||||||
|
},
|
||||||
|
Proto: "HTTP/1.1",
|
||||||
|
ProtoMajor: 1,
|
||||||
|
ProtoMinor: 1,
|
||||||
|
Header: make(Header),
|
||||||
|
Body: NoBody,
|
||||||
|
Host: host,
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
proxyURL, err = t.Proxy(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cm := connectMethod{
|
||||||
|
targetScheme: scheme,
|
||||||
|
targetAddr: net.JoinHostPort(host, port),
|
||||||
|
proxyURL: proxyURL,
|
||||||
|
}
|
||||||
|
|
||||||
|
// The state hook is a bit tricky:
|
||||||
|
// The persistConn has a state hook which calls ClientConn.maybeRunStateHook,
|
||||||
|
// which in turn calls the user-provided state hook (if any).
|
||||||
|
//
|
||||||
|
// ClientConn.maybeRunStateHook handles debouncing hook calls for both
|
||||||
|
// HTTP/1 and HTTP/2.
|
||||||
|
//
|
||||||
|
// Since there's no need to change the persistConn's hook, we set it at creation time.
|
||||||
|
cc := &ClientConn{}
|
||||||
|
const isClientConn = true
|
||||||
|
pconn, err := t.dialConn(ctx, cm, isClientConn, cc.maybeRunStateHook)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Note that cc.maybeRunStateHook may have been called
|
||||||
|
// in the short window between dialConn and now.
|
||||||
|
// This is fine.
|
||||||
|
cc.stateHookMu.Lock()
|
||||||
|
defer cc.stateHookMu.Unlock()
|
||||||
|
if pconn.alt != nil {
|
||||||
|
// If pconn.alt is set, this is a connection implemented in another package
|
||||||
|
// (probably x/net/http2) or the bundled copy in h2_bundle.go.
|
||||||
|
gc, ok := pconn.alt.(genericClientConn)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("http: NewClientConn returned something that is not a ClientConn")
|
||||||
|
}
|
||||||
|
cc.cc = gc
|
||||||
|
cc.lastAvailable = gc.Available()
|
||||||
|
} else {
|
||||||
|
// This is an HTTP/1 connection.
|
||||||
|
pconn.availch = make(chan struct{}, 1)
|
||||||
|
pconn.availch <- struct{}{}
|
||||||
|
cc.cc = http1ClientConn{pconn}
|
||||||
|
cc.lastAvailable = 1
|
||||||
|
}
|
||||||
|
return cc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the connection.
|
||||||
|
// Outstanding RoundTrip calls are interrupted.
|
||||||
|
func (cc *ClientConn) Close() error {
|
||||||
|
defer cc.maybeRunStateHook()
|
||||||
|
return cc.cc.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Err reports any fatal connection errors.
|
||||||
|
// It returns nil if the connection is usable.
|
||||||
|
// If it returns non-nil, the connection can no longer be used.
|
||||||
|
func (cc *ClientConn) Err() error {
|
||||||
|
return cc.cc.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateClientConnRequest(req *Request) error {
|
||||||
|
if req.URL == nil {
|
||||||
|
return errors.New("http: nil Request.URL")
|
||||||
|
}
|
||||||
|
if req.Header == nil {
|
||||||
|
return errors.New("http: nil Request.Header")
|
||||||
|
}
|
||||||
|
// Validate the outgoing headers.
|
||||||
|
if err := validateHeaders(req.Header); err != "" {
|
||||||
|
return fmt.Errorf("http: invalid header %s", err)
|
||||||
|
}
|
||||||
|
// Validate the outgoing trailers too.
|
||||||
|
if err := validateHeaders(req.Trailer); err != "" {
|
||||||
|
return fmt.Errorf("http: invalid trailer %s", err)
|
||||||
|
}
|
||||||
|
if req.Method != "" && !validMethod(req.Method) {
|
||||||
|
return fmt.Errorf("http: invalid method %q", req.Method)
|
||||||
|
}
|
||||||
|
if req.URL.Host == "" {
|
||||||
|
return errors.New("http: no Host in request URL")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RoundTrip implements the [RoundTripper] interface.
|
||||||
|
//
|
||||||
|
// The request is sent on the client connection,
|
||||||
|
// regardless of the URL being requested or any proxy settings.
|
||||||
|
//
|
||||||
|
// If the connection is at its concurrency limit,
|
||||||
|
// RoundTrip waits for the connection to become available
|
||||||
|
// before sending the request.
|
||||||
|
func (cc *ClientConn) RoundTrip(req *Request) (*Response, error) {
|
||||||
|
defer cc.maybeRunStateHook()
|
||||||
|
if err := validateClientConnRequest(req); err != nil {
|
||||||
|
cc.Release()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return cc.cc.RoundTrip(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Available reports the number of requests that may be sent
|
||||||
|
// to the connection without blocking.
|
||||||
|
// It returns 0 if the connection is closed.
|
||||||
|
func (cc *ClientConn) Available() int {
|
||||||
|
return cc.cc.Available()
|
||||||
|
}
|
||||||
|
|
||||||
|
// InFlight reports the number of requests in flight,
|
||||||
|
// including reserved requests.
|
||||||
|
// It returns 0 if the connection is closed.
|
||||||
|
func (cc *ClientConn) InFlight() int {
|
||||||
|
return cc.cc.InFlight()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reserve reserves a concurrency slot on the connection.
|
||||||
|
// If Reserve returns nil, one additional RoundTrip call may be made
|
||||||
|
// without waiting for an existing request to complete.
|
||||||
|
//
|
||||||
|
// The reserved concurrency slot is accounted as an in-flight request.
|
||||||
|
// A successful call to RoundTrip will decrement the Available count
|
||||||
|
// and increment the InFlight count.
|
||||||
|
//
|
||||||
|
// Each successful call to Reserve should be followed by exactly one call
|
||||||
|
// to RoundTrip or Release, which will consume or release the reservation.
|
||||||
|
//
|
||||||
|
// If the connection is closed or at its concurrency limit,
|
||||||
|
// Reserve returns an error.
|
||||||
|
func (cc *ClientConn) Reserve() error {
|
||||||
|
defer cc.maybeRunStateHook()
|
||||||
|
return cc.cc.Reserve()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Release releases an unused concurrency slot reserved by Reserve.
|
||||||
|
// If there are no reserved concurrency slots, it has no effect.
|
||||||
|
func (cc *ClientConn) Release() {
|
||||||
|
defer cc.maybeRunStateHook()
|
||||||
|
cc.cc.Release()
|
||||||
|
}
|
||||||
|
|
||||||
|
// shouldRunStateHook returns the user's state hook if we should call it,
|
||||||
|
// or nil if we don't need to call it at this time.
|
||||||
|
func (cc *ClientConn) shouldRunStateHook(stopRunning bool) func(*ClientConn) {
|
||||||
|
cc.stateHookMu.Lock()
|
||||||
|
defer cc.stateHookMu.Unlock()
|
||||||
|
if cc.cc == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if stopRunning {
|
||||||
|
cc.stateHookRunning = false
|
||||||
|
}
|
||||||
|
if cc.userStateHook == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if cc.stateHookRunning {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
available = cc.Available()
|
||||||
|
inFlight = cc.InFlight()
|
||||||
|
closed = cc.Err() != nil
|
||||||
|
)
|
||||||
|
var hook func(*ClientConn)
|
||||||
|
if available > cc.lastAvailable || inFlight < cc.lastInFlight || closed != cc.lastClosed {
|
||||||
|
hook = cc.userStateHook
|
||||||
|
cc.stateHookRunning = true
|
||||||
|
}
|
||||||
|
cc.lastAvailable = available
|
||||||
|
cc.lastInFlight = inFlight
|
||||||
|
cc.lastClosed = closed
|
||||||
|
return hook
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cc *ClientConn) maybeRunStateHook() {
|
||||||
|
hook := cc.shouldRunStateHook(false)
|
||||||
|
if hook == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Run the hook synchronously.
|
||||||
|
//
|
||||||
|
// This means that if, for example, the user calls resp.Body.Close to finish a request,
|
||||||
|
// the Close call will synchronously run the hook, giving the hook the chance to
|
||||||
|
// return the ClientConn to a connection pool before the next request is made.
|
||||||
|
hook(cc)
|
||||||
|
// The connection state may have changed while the hook was running,
|
||||||
|
// in which case we need to run it again.
|
||||||
|
//
|
||||||
|
// If we do need to run the hook again, do so in a new goroutine to avoid blocking
|
||||||
|
// the current goroutine indefinitely.
|
||||||
|
hook = cc.shouldRunStateHook(true)
|
||||||
|
if hook != nil {
|
||||||
|
go func() {
|
||||||
|
for hook != nil {
|
||||||
|
hook(cc)
|
||||||
|
hook = cc.shouldRunStateHook(true)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetStateHook arranges for f to be called when the state of the connection changes.
|
||||||
|
// At most one call to f is made at a time.
|
||||||
|
// If the connection's state has changed since it was created,
|
||||||
|
// f is called immediately in a separate goroutine.
|
||||||
|
// f may be called synchronously from RoundTrip or Response.Body.Close.
|
||||||
|
//
|
||||||
|
// If SetStateHook is called multiple times, the new hook replaces the old one.
|
||||||
|
// If f is nil, no further calls will be made to f after SetStateHook returns.
|
||||||
|
//
|
||||||
|
// f is called when Available increases (more requests may be sent on the connection),
|
||||||
|
// InFlight decreases (existing requests complete), or Err begins returning non-nil
|
||||||
|
// (the connection is no longer usable).
|
||||||
|
func (cc *ClientConn) SetStateHook(f func(*ClientConn)) {
|
||||||
|
cc.stateHookMu.Lock()
|
||||||
|
cc.userStateHook = f
|
||||||
|
cc.stateHookMu.Unlock()
|
||||||
|
cc.maybeRunStateHook()
|
||||||
|
}
|
||||||
|
|
||||||
|
// http1ClientConn is a genericClientConn implementation backed by
|
||||||
|
// an HTTP/1 *persistConn (pconn.alt is nil).
|
||||||
|
type http1ClientConn struct {
|
||||||
|
pconn *persistConn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cc http1ClientConn) RoundTrip(req *Request) (*Response, error) {
|
||||||
|
ctx := req.Context()
|
||||||
|
trace := httptrace.ContextClientTrace(ctx)
|
||||||
|
|
||||||
|
// Convert Request.Cancel into context cancelation.
|
||||||
|
ctx, cancel := context.WithCancelCause(req.Context())
|
||||||
|
if req.Cancel != nil {
|
||||||
|
go awaitLegacyCancel(ctx, cancel, req)
|
||||||
|
}
|
||||||
|
|
||||||
|
treq := &transportRequest{Request: req, trace: trace, ctx: ctx, cancel: cancel}
|
||||||
|
resp, err := cc.pconn.roundTrip(treq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
resp.Request = req
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cc http1ClientConn) Close() error {
|
||||||
|
cc.pconn.close(errors.New("ClientConn closed"))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cc http1ClientConn) Err() error {
|
||||||
|
select {
|
||||||
|
case <-cc.pconn.closech:
|
||||||
|
return cc.pconn.closed
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cc http1ClientConn) Available() int {
|
||||||
|
cc.pconn.mu.Lock()
|
||||||
|
defer cc.pconn.mu.Unlock()
|
||||||
|
if cc.pconn.closed != nil || cc.pconn.reserved || cc.pconn.inFlight {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cc http1ClientConn) InFlight() int {
|
||||||
|
cc.pconn.mu.Lock()
|
||||||
|
defer cc.pconn.mu.Unlock()
|
||||||
|
if cc.pconn.closed == nil && (cc.pconn.reserved || cc.pconn.inFlight) {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cc http1ClientConn) Reserve() error {
|
||||||
|
cc.pconn.mu.Lock()
|
||||||
|
defer cc.pconn.mu.Unlock()
|
||||||
|
if cc.pconn.closed != nil {
|
||||||
|
return cc.pconn.closed
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-cc.pconn.availch:
|
||||||
|
default:
|
||||||
|
return errors.New("connection is unavailable")
|
||||||
|
}
|
||||||
|
cc.pconn.reserved = true
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cc http1ClientConn) Release() {
|
||||||
|
cc.pconn.mu.Lock()
|
||||||
|
defer cc.pconn.mu.Unlock()
|
||||||
|
if cc.pconn.reserved {
|
||||||
|
select {
|
||||||
|
case cc.pconn.availch <- struct{}{}:
|
||||||
|
default:
|
||||||
|
panic("cannot release reservation")
|
||||||
|
}
|
||||||
|
cc.pconn.reserved = false
|
||||||
|
}
|
||||||
|
}
|
||||||
374
src/net/http/clientconn_test.go
Normal file
374
src/net/http/clientconn_test.go
Normal file
|
|
@ -0,0 +1,374 @@
|
||||||
|
// Copyright 2025 The Go Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package http_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"testing/synctest"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTransportNewClientConnRoundTrip(t *testing.T) { run(t, testTransportNewClientConnRoundTrip) }
|
||||||
|
func testTransportNewClientConnRoundTrip(t *testing.T, mode testMode) {
|
||||||
|
cst := newClientServerTest(t, mode, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
io.WriteString(w, req.Host)
|
||||||
|
}), optFakeNet)
|
||||||
|
|
||||||
|
scheme := mode.Scheme() // http or https
|
||||||
|
cc, err := cst.tr.NewClientConn(t.Context(), scheme, cst.ts.Listener.Addr().String())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer cc.Close()
|
||||||
|
|
||||||
|
// Send requests for a couple different domains.
|
||||||
|
// All use the same connection.
|
||||||
|
for _, host := range []string{"example.tld", "go.dev"} {
|
||||||
|
req, _ := http.NewRequest("GET", fmt.Sprintf("%v://%v/", scheme, host), nil)
|
||||||
|
resp, err := cc.RoundTrip(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
got, _ := io.ReadAll(resp.Body)
|
||||||
|
if string(got) != host {
|
||||||
|
t.Errorf("got response body %q, want %v", got, host)
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
|
||||||
|
// CloseIdleConnections does not close connections created by NewClientConn.
|
||||||
|
cst.tr.CloseIdleConnections()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cc.Err(); err != nil {
|
||||||
|
t.Errorf("before close: ClientConn.Err() = %v, want nil", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cc.Close()
|
||||||
|
if err := cc.Err(); err == nil {
|
||||||
|
t.Errorf("after close: ClientConn.Err() = nil, want error")
|
||||||
|
}
|
||||||
|
|
||||||
|
req, _ := http.NewRequest("GET", scheme+"://example.tld/", nil)
|
||||||
|
resp, err := cc.RoundTrip(req)
|
||||||
|
if err == nil {
|
||||||
|
resp.Body.Close()
|
||||||
|
t.Errorf("after close: cc.RoundTrip succeeded, want error")
|
||||||
|
}
|
||||||
|
t.Log(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newClientConnTest(t testing.TB, mode testMode, h http.HandlerFunc, opts ...any) (*clientServerTest, *http.ClientConn) {
|
||||||
|
if h == nil {
|
||||||
|
h = func(w http.ResponseWriter, req *http.Request) {}
|
||||||
|
}
|
||||||
|
cst := newClientServerTest(t, mode, h, opts...)
|
||||||
|
cc, err := cst.tr.NewClientConn(t.Context(), mode.Scheme(), cst.ts.Listener.Addr().String())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() {
|
||||||
|
cc.Close()
|
||||||
|
})
|
||||||
|
synctest.Wait()
|
||||||
|
return cst, cc
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestClientConnReserveAll reserves every concurrency slot on a connection.
|
||||||
|
func TestClientConnReserveAll(t *testing.T) { runSynctest(t, testClientConnReserveAll) }
|
||||||
|
func testClientConnReserveAll(t *testing.T, mode testMode) {
|
||||||
|
cst, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
|
||||||
|
s.HTTP2 = &http.HTTP2Config{
|
||||||
|
MaxConcurrentStreams: 3,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
want := 1
|
||||||
|
switch mode {
|
||||||
|
case http2Mode, http2UnencryptedMode:
|
||||||
|
want = cst.ts.Config.HTTP2.MaxConcurrentStreams
|
||||||
|
}
|
||||||
|
available := cc.Available()
|
||||||
|
if available != want {
|
||||||
|
t.Fatalf("cc.Available() = %v, want %v", available, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reserve every available concurrency slot on the connection.
|
||||||
|
for i := range available {
|
||||||
|
if err := cc.Reserve(); err != nil {
|
||||||
|
t.Fatalf("cc.Reserve() #%v = %v, want nil", i, err)
|
||||||
|
}
|
||||||
|
if got, want := cc.Available(), available-i-1; got != want {
|
||||||
|
t.Fatalf("cc.Available() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
if got, want := cc.InFlight(), i+1; got != want {
|
||||||
|
t.Fatalf("cc.InFlight() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The next reservation attempt should fail, since every slot is consumed.
|
||||||
|
if err := cc.Reserve(); err == nil {
|
||||||
|
t.Fatalf("cc.Reserve() = nil, want error")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestClientConnReserveParallel starts concurrent goroutines which reserve every
|
||||||
|
// concurrency slot on a connection.
|
||||||
|
func TestClientConnReserveParallel(t *testing.T) { runSynctest(t, testClientConnReserveParallel) }
|
||||||
|
func testClientConnReserveParallel(t *testing.T, mode testMode) {
|
||||||
|
_, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
|
||||||
|
s.HTTP2 = &http.HTTP2Config{
|
||||||
|
MaxConcurrentStreams: 3,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
var (
|
||||||
|
wg sync.WaitGroup
|
||||||
|
mu sync.Mutex
|
||||||
|
success int
|
||||||
|
failure int
|
||||||
|
)
|
||||||
|
available := cc.Available()
|
||||||
|
const extra = 2
|
||||||
|
for range available + extra {
|
||||||
|
wg.Go(func() {
|
||||||
|
err := cc.Reserve()
|
||||||
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
if err == nil {
|
||||||
|
success++
|
||||||
|
} else {
|
||||||
|
failure++
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if got, want := success, available; got != want {
|
||||||
|
t.Errorf("%v successful reservations, want %v", got, want)
|
||||||
|
}
|
||||||
|
if got, want := failure, extra; got != want {
|
||||||
|
t.Errorf("%v failed reservations, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestClientConnReserveRelease repeatedly reserves and releases concurrency slots.
|
||||||
|
func TestClientConnReserveRelease(t *testing.T) { runSynctest(t, testClientConnReserveRelease) }
|
||||||
|
func testClientConnReserveRelease(t *testing.T, mode testMode) {
|
||||||
|
_, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
|
||||||
|
s.HTTP2 = &http.HTTP2Config{
|
||||||
|
MaxConcurrentStreams: 3,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
available := cc.Available()
|
||||||
|
for i := range 2 * available {
|
||||||
|
if err := cc.Reserve(); err != nil {
|
||||||
|
t.Fatalf("cc.Reserve() #%v = %v, want nil", i, err)
|
||||||
|
}
|
||||||
|
cc.Release()
|
||||||
|
}
|
||||||
|
|
||||||
|
if got, want := cc.Available(), available; got != want {
|
||||||
|
t.Fatalf("cc.Available() = %v, want %v", available, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestClientConnReserveAndConsume reserves a concurrency slot on a connection,
|
||||||
|
// and then verifies that various events consume the reservation.
|
||||||
|
func TestClientConnReserveAndConsume(t *testing.T) {
|
||||||
|
for _, test := range []struct {
|
||||||
|
name string
|
||||||
|
consume func(t *testing.T, cc *http.ClientConn, mode testMode)
|
||||||
|
handler func(w http.ResponseWriter, req *http.Request, donec chan struct{})
|
||||||
|
h1Closed bool
|
||||||
|
}{{
|
||||||
|
// Explicit release.
|
||||||
|
name: "release",
|
||||||
|
consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
|
||||||
|
cc.Release()
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
// Invalid request sent to RoundTrip.
|
||||||
|
name: "invalid field name",
|
||||||
|
consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
|
||||||
|
req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
|
||||||
|
req.Header["invalid field name"] = []string{"x"}
|
||||||
|
_, err := cc.RoundTrip(req)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("RoundTrip succeeded, want failure")
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
// Successful request/response cycle.
|
||||||
|
name: "body close",
|
||||||
|
consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
|
||||||
|
req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
|
||||||
|
resp, err := cc.RoundTrip(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("RoundTrip: %v", err)
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
},
|
||||||
|
}, {
|
||||||
|
// Request context canceled before headers received.
|
||||||
|
name: "cancel",
|
||||||
|
consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
|
||||||
|
ctx, cancel := context.WithCancel(t.Context())
|
||||||
|
go func() {
|
||||||
|
req, _ := http.NewRequestWithContext(ctx, "GET", mode.Scheme()+"://example.tld/", nil)
|
||||||
|
_, err := cc.RoundTrip(req)
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("RoundTrip succeeded, want failure")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
synctest.Wait()
|
||||||
|
cancel()
|
||||||
|
},
|
||||||
|
handler: func(w http.ResponseWriter, req *http.Request, donec chan struct{}) {
|
||||||
|
<-donec
|
||||||
|
},
|
||||||
|
// An HTTP/1 connection is closed after a request is canceled on it.
|
||||||
|
h1Closed: true,
|
||||||
|
}, {
|
||||||
|
// Response body closed before full response received.
|
||||||
|
name: "early body close",
|
||||||
|
consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
|
||||||
|
req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
|
||||||
|
resp, err := cc.RoundTrip(req)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("RoundTrip: %v", err)
|
||||||
|
}
|
||||||
|
t.Logf("%T", resp.Body)
|
||||||
|
resp.Body.Close()
|
||||||
|
},
|
||||||
|
handler: func(w http.ResponseWriter, req *http.Request, donec chan struct{}) {
|
||||||
|
w.WriteHeader(200)
|
||||||
|
http.NewResponseController(w).Flush()
|
||||||
|
<-donec
|
||||||
|
},
|
||||||
|
// An HTTP/1 connection is closed after a request is canceled on it.
|
||||||
|
h1Closed: true,
|
||||||
|
}} {
|
||||||
|
t.Run(test.name, func(t *testing.T) {
|
||||||
|
runSynctest(t, func(t *testing.T, mode testMode) {
|
||||||
|
donec := make(chan struct{})
|
||||||
|
defer close(donec)
|
||||||
|
handler := func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
if test.handler != nil {
|
||||||
|
test.handler(w, req, donec)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, cc := newClientConnTest(t, mode, handler, optFakeNet)
|
||||||
|
stateHookCalls := 0
|
||||||
|
cc.SetStateHook(func(cc *http.ClientConn) {
|
||||||
|
stateHookCalls++
|
||||||
|
})
|
||||||
|
synctest.Wait()
|
||||||
|
stateHookCalls = 0 // ignore any initial update call
|
||||||
|
|
||||||
|
avail := cc.Available()
|
||||||
|
if err := cc.Reserve(); err != nil {
|
||||||
|
t.Fatalf("cc.Reserve() = %v, want nil", err)
|
||||||
|
}
|
||||||
|
synctest.Wait()
|
||||||
|
if got, want := stateHookCalls, 0; got != want {
|
||||||
|
t.Errorf("connection state hook calls: %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
test.consume(t, cc, mode)
|
||||||
|
synctest.Wait()
|
||||||
|
|
||||||
|
// State hook should be called, either to report the
|
||||||
|
// connection availability increasing or the connection closing.
|
||||||
|
if got, want := stateHookCalls, 1; got != want {
|
||||||
|
t.Errorf("connection state hook calls: %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
if test.h1Closed && (mode == http1Mode || mode == https1Mode) {
|
||||||
|
if got, want := cc.Available(), 0; got != want {
|
||||||
|
t.Errorf("cc.Available() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
if got, want := cc.InFlight(), 0; got != want {
|
||||||
|
t.Errorf("cc.InFlight() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
if err := cc.Err(); err == nil {
|
||||||
|
t.Errorf("cc.Err() = nil, want closed connection")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if got, want := cc.Available(), avail; got != want {
|
||||||
|
t.Errorf("cc.Available() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
if got, want := cc.InFlight(), 0; got != want {
|
||||||
|
t.Errorf("cc.InFlight() = %v, want %v", got, want)
|
||||||
|
}
|
||||||
|
if err := cc.Err(); err != nil {
|
||||||
|
t.Errorf("cc.Err() = %v, want nil", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if cc.Available() > 0 {
|
||||||
|
if err := cc.Reserve(); err != nil {
|
||||||
|
t.Errorf("cc.Reserve() = %v, want success", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestClientConnRoundTripBlocks verifies that RoundTrip blocks until a concurrency
|
||||||
|
// slot is available on a connection.
|
||||||
|
func TestClientConnRoundTripBlocks(t *testing.T) { runSynctest(t, testClientConnRoundTripBlocks) }
|
||||||
|
func testClientConnRoundTripBlocks(t *testing.T, mode testMode) {
|
||||||
|
var handlerCalls atomic.Int64
|
||||||
|
requestc := make(chan struct{})
|
||||||
|
handler := func(w http.ResponseWriter, req *http.Request) {
|
||||||
|
handlerCalls.Add(1)
|
||||||
|
<-requestc
|
||||||
|
}
|
||||||
|
_, cc := newClientConnTest(t, mode, handler, optFakeNet, func(s *http.Server) {
|
||||||
|
s.HTTP2 = &http.HTTP2Config{
|
||||||
|
MaxConcurrentStreams: 3,
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
available := cc.Available()
|
||||||
|
var responses atomic.Int64
|
||||||
|
const extra = 2
|
||||||
|
for range available + extra {
|
||||||
|
go func() {
|
||||||
|
req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
|
||||||
|
resp, err := cc.RoundTrip(req)
|
||||||
|
responses.Add(1)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("RoundTrip: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
synctest.Wait()
|
||||||
|
if got, want := int(handlerCalls.Load()), available; got != want {
|
||||||
|
t.Errorf("got %v handler calls, want %v", got, want)
|
||||||
|
}
|
||||||
|
if got, want := int(responses.Load()), 0; got != want {
|
||||||
|
t.Errorf("got %v responses, want %v", got, want)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := range available + extra {
|
||||||
|
requestc <- struct{}{}
|
||||||
|
synctest.Wait()
|
||||||
|
if got, want := int(responses.Load()), i+1; got != want {
|
||||||
|
t.Errorf("got %v responses, want %v", got, want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -46,6 +46,16 @@ const (
|
||||||
http2UnencryptedMode = testMode("h2unencrypted") // HTTP/2
|
http2UnencryptedMode = testMode("h2unencrypted") // HTTP/2
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (m testMode) Scheme() string {
|
||||||
|
switch m {
|
||||||
|
case http1Mode, http2UnencryptedMode:
|
||||||
|
return "http"
|
||||||
|
case https1Mode, http2Mode:
|
||||||
|
return "https"
|
||||||
|
}
|
||||||
|
panic("unknown testMode")
|
||||||
|
}
|
||||||
|
|
||||||
type testNotParallelOpt struct{}
|
type testNotParallelOpt struct{}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
|
||||||
|
|
@ -1067,6 +1067,22 @@ func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
|
||||||
return errConnBroken
|
return errConnBroken
|
||||||
}
|
}
|
||||||
pconn.markReused()
|
pconn.markReused()
|
||||||
|
if pconn.isClientConn {
|
||||||
|
// internalStateHook is always set for conns created by NewClientConn.
|
||||||
|
defer pconn.internalStateHook()
|
||||||
|
pconn.mu.Lock()
|
||||||
|
defer pconn.mu.Unlock()
|
||||||
|
if !pconn.inFlight {
|
||||||
|
panic("pconn is not in flight")
|
||||||
|
}
|
||||||
|
pconn.inFlight = false
|
||||||
|
select {
|
||||||
|
case pconn.availch <- struct{}{}:
|
||||||
|
default:
|
||||||
|
panic("unable to make pconn available")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
t.idleMu.Lock()
|
t.idleMu.Lock()
|
||||||
defer t.idleMu.Unlock()
|
defer t.idleMu.Unlock()
|
||||||
|
|
@ -1243,6 +1259,9 @@ func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
|
||||||
|
|
||||||
// removeIdleConn marks pconn as dead.
|
// removeIdleConn marks pconn as dead.
|
||||||
func (t *Transport) removeIdleConn(pconn *persistConn) bool {
|
func (t *Transport) removeIdleConn(pconn *persistConn) bool {
|
||||||
|
if pconn.isClientConn {
|
||||||
|
return true
|
||||||
|
}
|
||||||
t.idleMu.Lock()
|
t.idleMu.Lock()
|
||||||
defer t.idleMu.Unlock()
|
defer t.idleMu.Unlock()
|
||||||
return t.removeIdleConnLocked(pconn)
|
return t.removeIdleConnLocked(pconn)
|
||||||
|
|
@ -1625,7 +1644,8 @@ func (t *Transport) dialConnFor(w *wantConn) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
pc, err := t.dialConn(ctx, w.cm)
|
const isClientConn = false
|
||||||
|
pc, err := t.dialConn(ctx, w.cm, isClientConn, nil)
|
||||||
delivered := w.tryDeliver(pc, err, time.Time{})
|
delivered := w.tryDeliver(pc, err, time.Time{})
|
||||||
if err == nil && (!delivered || pc.alt != nil) {
|
if err == nil && (!delivered || pc.alt != nil) {
|
||||||
// pconn was not passed to w,
|
// pconn was not passed to w,
|
||||||
|
|
@ -1746,7 +1766,7 @@ type erringRoundTripper interface {
|
||||||
|
|
||||||
var testHookProxyConnectTimeout = context.WithTimeout
|
var testHookProxyConnectTimeout = context.WithTimeout
|
||||||
|
|
||||||
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
|
func (t *Transport) dialConn(ctx context.Context, cm connectMethod, isClientConn bool, internalStateHook func()) (pconn *persistConn, err error) {
|
||||||
pconn = &persistConn{
|
pconn = &persistConn{
|
||||||
t: t,
|
t: t,
|
||||||
cacheKey: cm.key(),
|
cacheKey: cm.key(),
|
||||||
|
|
@ -1755,6 +1775,8 @@ func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *pers
|
||||||
closech: make(chan struct{}),
|
closech: make(chan struct{}),
|
||||||
writeErrCh: make(chan error, 1),
|
writeErrCh: make(chan error, 1),
|
||||||
writeLoopDone: make(chan struct{}),
|
writeLoopDone: make(chan struct{}),
|
||||||
|
isClientConn: isClientConn,
|
||||||
|
internalStateHook: internalStateHook,
|
||||||
}
|
}
|
||||||
trace := httptrace.ContextClientTrace(ctx)
|
trace := httptrace.ContextClientTrace(ctx)
|
||||||
wrapErr := func(err error) error {
|
wrapErr := func(err error) error {
|
||||||
|
|
@ -1927,6 +1949,21 @@ func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *pers
|
||||||
t.Protocols != nil &&
|
t.Protocols != nil &&
|
||||||
t.Protocols.UnencryptedHTTP2() &&
|
t.Protocols.UnencryptedHTTP2() &&
|
||||||
!t.Protocols.HTTP1()
|
!t.Protocols.HTTP1()
|
||||||
|
|
||||||
|
if isClientConn && (unencryptedHTTP2 || (pconn.tlsState != nil && pconn.tlsState.NegotiatedProtocol == "h2")) {
|
||||||
|
altProto, _ := t.altProto.Load().(map[string]RoundTripper)
|
||||||
|
h2, ok := altProto["https"].(newClientConner)
|
||||||
|
if !ok {
|
||||||
|
return nil, errors.New("http: HTTP/2 implementation does not support NewClientConn (update golang.org/x/net?)")
|
||||||
|
}
|
||||||
|
alt, err := h2.NewClientConn(pconn.conn, internalStateHook)
|
||||||
|
if err != nil {
|
||||||
|
pconn.conn.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt, isClientConn: true}, nil
|
||||||
|
}
|
||||||
|
|
||||||
if unencryptedHTTP2 {
|
if unencryptedHTTP2 {
|
||||||
next, ok := t.TLSNextProto[nextProtoUnencryptedHTTP2]
|
next, ok := t.TLSNextProto[nextProtoUnencryptedHTTP2]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
@ -2091,8 +2128,10 @@ type persistConn struct {
|
||||||
reqch chan requestAndChan // written by roundTrip; read by readLoop
|
reqch chan requestAndChan // written by roundTrip; read by readLoop
|
||||||
writech chan writeRequest // written by roundTrip; read by writeLoop
|
writech chan writeRequest // written by roundTrip; read by writeLoop
|
||||||
closech chan struct{} // closed when conn closed
|
closech chan struct{} // closed when conn closed
|
||||||
|
availch chan struct{} // ClientConn only: contains a value when conn is usable
|
||||||
isProxy bool
|
isProxy bool
|
||||||
sawEOF bool // whether we've seen EOF from conn; owned by readLoop
|
sawEOF bool // whether we've seen EOF from conn; owned by readLoop
|
||||||
|
isClientConn bool // whether this is a ClientConn (outside any pool)
|
||||||
readLimit int64 // bytes allowed to be read; owned by readLoop
|
readLimit int64 // bytes allowed to be read; owned by readLoop
|
||||||
// writeErrCh passes the request write error (usually nil)
|
// writeErrCh passes the request write error (usually nil)
|
||||||
// from the writeLoop goroutine to the readLoop which passes
|
// from the writeLoop goroutine to the readLoop which passes
|
||||||
|
|
@ -2111,6 +2150,10 @@ type persistConn struct {
|
||||||
closed error // set non-nil when conn is closed, before closech is closed
|
closed error // set non-nil when conn is closed, before closech is closed
|
||||||
canceledErr error // set non-nil if conn is canceled
|
canceledErr error // set non-nil if conn is canceled
|
||||||
reused bool // whether conn has had successful request/response and is being reused.
|
reused bool // whether conn has had successful request/response and is being reused.
|
||||||
|
reserved bool // ClientConn only: concurrency slot reserved
|
||||||
|
inFlight bool // ClientConn only: request is in flight
|
||||||
|
internalStateHook func() // ClientConn state hook
|
||||||
|
|
||||||
// mutateHeaderFunc is an optional func to modify extra
|
// mutateHeaderFunc is an optional func to modify extra
|
||||||
// headers on each outbound request before it's written. (the
|
// headers on each outbound request before it's written. (the
|
||||||
// original Request given to RoundTrip is not modified)
|
// original Request given to RoundTrip is not modified)
|
||||||
|
|
@ -2250,6 +2293,9 @@ func (pc *persistConn) readLoop() {
|
||||||
defer func() {
|
defer func() {
|
||||||
pc.close(closeErr)
|
pc.close(closeErr)
|
||||||
pc.t.removeIdleConn(pc)
|
pc.t.removeIdleConn(pc)
|
||||||
|
if pc.internalStateHook != nil {
|
||||||
|
pc.internalStateHook()
|
||||||
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
tryPutIdleConn := func(treq *transportRequest) bool {
|
tryPutIdleConn := func(treq *transportRequest) bool {
|
||||||
|
|
@ -2753,9 +2799,32 @@ var (
|
||||||
testHookReadLoopBeforeNextRead = nop
|
testHookReadLoopBeforeNextRead = nop
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (pc *persistConn) waitForAvailability(ctx context.Context) error {
|
||||||
|
select {
|
||||||
|
case <-pc.availch:
|
||||||
|
return nil
|
||||||
|
case <-pc.closech:
|
||||||
|
return pc.closed
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
|
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
|
||||||
testHookEnterRoundTrip()
|
testHookEnterRoundTrip()
|
||||||
|
|
||||||
pc.mu.Lock()
|
pc.mu.Lock()
|
||||||
|
if pc.isClientConn {
|
||||||
|
if !pc.reserved {
|
||||||
|
pc.mu.Unlock()
|
||||||
|
if err := pc.waitForAvailability(req.ctx); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
pc.mu.Lock()
|
||||||
|
}
|
||||||
|
pc.reserved = false
|
||||||
|
pc.inFlight = true
|
||||||
|
}
|
||||||
pc.numExpectedResponses++
|
pc.numExpectedResponses++
|
||||||
headerFn := pc.mutateHeaderFunc
|
headerFn := pc.mutateHeaderFunc
|
||||||
pc.mu.Unlock()
|
pc.mu.Unlock()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue