This commit is contained in:
Y.Horie 2025-12-05 11:04:00 -05:00 committed by GitHub
commit 301549e303
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 79 additions and 11 deletions

View file

@ -102,6 +102,33 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er
})
return true
})
// Iterate over the inflight hosts
inflightHosts.Range(func(key, val any) bool {
address, ok := key.(string)
if !ok {
rangeErr = caddy.APIError{
HTTPStatus: http.StatusInternalServerError,
Err: fmt.Errorf("could not type assert upstream address"),
}
return false
}
upstream, ok := val.(*Host)
if !ok {
rangeErr = caddy.APIError{
HTTPStatus: http.StatusInternalServerError,
Err: fmt.Errorf("could not type assert upstream struct"),
}
return false
}
results = append(results, upstreamStatus{
Address: address,
NumRequests: upstream.NumRequests(),
Fails: upstream.Fails(),
})
return true
})
// If an error happened during the range, return it
if rangeErr != nil {

View file

@ -132,6 +132,16 @@ func (u *Upstream) fillHost() {
u.Host = host
}
func (u *Upstream) fillInfilghtHost(numRemaiRequests int) {
host := new(Host)
existingHost, loaded := inflightHosts.LoadOrStore(u.String(), host)
if loaded {
host = existingHost.(*Host)
}
_ = host.countRequest(numRemaiRequests)
u.Host = host
}
// Host is the basic, in-memory representation of the state of a remote host.
// Its fields are accessed atomically and Host values must not be copied.
type Host struct {
@ -268,6 +278,10 @@ func GetDialInfo(ctx context.Context) (DialInfo, bool) {
// through config reloads.
var hosts = caddy.NewUsagePool()
// inflightHosts is the global repository for hosts that are
// currently in use by inflight upstream request.
var inflightHosts = caddy.NewUsagePool()
// dialInfoVarKey is the key used for the variable that holds
// the dial info for the upstream connection.
const dialInfoVarKey = "reverse_proxy.dial_info"

View file

@ -392,6 +392,9 @@ func (h *Handler) Cleanup() error {
// remove hosts from our config from the pool
for _, upstream := range h.Upstreams {
if upstream.NumRequests() > 0 {
upstream.fillInfilghtHost(upstream.NumRequests())
}
_, _ = hosts.Delete(upstream.String())
}
@ -474,8 +477,16 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
}
var done bool
done, proxyErr = h.proxyLoopIteration(clonedReq, r, w, proxyErr, start, retries, repl, reqHeader, reqHost, next)
done, dialInfo, proxyErr := h.proxyLoopIteration(clonedReq, r, w, proxyErr, start, retries, repl, reqHeader, reqHost, next)
if done {
key := dialInfo.Address
val := inflightHosts.Load(key)
if val != nil {
host, _ := val.(*Host)
if host.NumRequests() <= 0 {
_, _ = inflightHosts.Delete(key)
}
}
break
}
if h.VerboseLogs {
@ -506,7 +517,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
// be assigned to the proxyErr value for the next iteration of the loop (or the error handled after break).
func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w http.ResponseWriter, proxyErr error, start time.Time, retries int,
repl *caddy.Replacer, reqHeader http.Header, reqHost string, next caddyhttp.Handler,
) (bool, error) {
) (bool, *DialInfo, error) {
// get the updated list of upstreams
upstreams := h.Upstreams
if h.DynamicUpstreams != nil {
@ -540,9 +551,9 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h
proxyErr = caddyhttp.Error(http.StatusServiceUnavailable, errNoUpstream)
}
if !h.LoadBalancing.tryAgain(h.ctx, start, retries, proxyErr, r, h.logger) {
return true, proxyErr
return true, nil, proxyErr
}
return false, proxyErr
return false, nil, proxyErr
}
// the dial address may vary per-request if placeholders are
@ -550,7 +561,7 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h
// DialInfo struct should have valid network address syntax
dialInfo, err := upstream.fillDialInfo(repl)
if err != nil {
return true, fmt.Errorf("making dial info: %v", err)
return true, nil, fmt.Errorf("making dial info: %v", err)
}
if c := h.logger.Check(zapcore.DebugLevel, "selected upstream"); c != nil {
@ -590,7 +601,7 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h
if proxyErr == nil || errors.Is(proxyErr, context.Canceled) {
// context.Canceled happens when the downstream client
// cancels the request, which is not our failure
return true, nil
return true, &dialInfo, nil
}
// if the roundtrip was successful, don't retry the request or
@ -598,7 +609,7 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h
// occur after the roundtrip if, for example, a response handler
// after the roundtrip returns an error)
if succ, ok := proxyErr.(roundtripSucceededError); ok {
return true, succ.error
return true, &dialInfo, succ.error
}
// remember this failure (if enabled)
@ -606,10 +617,10 @@ func (h *Handler) proxyLoopIteration(r *http.Request, origReq *http.Request, w h
// if we've tried long enough, break
if !h.LoadBalancing.tryAgain(h.ctx, start, retries, proxyErr, r, h.logger) {
return true, proxyErr
return true, &dialInfo, proxyErr
}
return false, proxyErr
return false, &dialInfo, proxyErr
}
// Mapping of the canonical form of the headers, to the RFC 6455 form,
@ -851,8 +862,14 @@ func (h Handler) addForwardedHeaders(req *http.Request) error {
func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origReq *http.Request, repl *caddy.Replacer, di DialInfo, next caddyhttp.Handler) error {
_ = di.Upstream.Host.countRequest(1)
//nolint:errcheck
defer di.Upstream.Host.countRequest(-1)
defer func() {
di.Upstream.Host.countRequest(-1)
inflightHost := inflightHosts.Load(di.Address)
if inflightHost != nil {
host, _ := inflightHost.(*Host)
host.countRequest(-1)
}
}()
// point the request to this upstream
h.directRequest(req, di)

View file

@ -194,6 +194,16 @@ func (up *UsagePool) Delete(key any) (deleted bool, err error) {
return deleted, err
}
func (up *UsagePool) Load(key any) any {
up.RLock()
defer up.RUnlock()
upv, loaded := up.pool[key]
if loaded {
return upv.value
}
return nil
}
// References returns the number of references (count of usages) to a
// key in the pool, and true if the key exists, or false otherwise.
func (up *UsagePool) References(key any) (int, bool) {