From 5cfd1a7400b0d2a8e5476e41ee905d9470735729 Mon Sep 17 00:00:00 2001 From: Aleksey Shakhmatov Date: Fri, 20 Mar 2026 15:21:32 +0300 Subject: [PATCH] Fix sentinel error aliasing, hot-path allocations, and resource leaks - Deduplicate sentinel errors: httpx.ErrNoHealthy, ErrCircuitOpen, and ErrRetryExhausted are now aliases to the canonical sub-package values so errors.Is works across package boundaries - Retry transport returns ErrRetryExhausted only when all attempts are actually exhausted, not on early policy exit - Balancer: pre-parse endpoint URLs at construction, replace req.Clone with cheap shallow struct copy to avoid per-request allocations - Circuit breaker: Load before LoadOrStore to avoid allocating a Breaker on every request for known hosts - Health checker: drain response body before close for connection reuse, probe endpoints concurrently, run initial probe synchronously in Start - Client: add Close() to shut down health checker goroutine, propagate URL resolution errors instead of silently discarding them - MockClock: fix lock ordering in Reset (clock.mu before t.mu), fix timer slice compaction to avoid backing-array aliasing, extract fireExpired to deduplicate Advance/Set --- balancer/balancer.go | 44 ++++++++++++++++++++++++------- balancer/balancer_test.go | 6 +++-- balancer/health.go | 26 +++++++++++++----- circuitbreaker/breaker.go | 5 +++- client.go | 42 +++++++++++++++++++++++------- error.go | 13 ++++++--- internal/clock/clock.go | 55 ++++++++++++++++++++++++++++----------- retry/retry.go | 12 +++++++++ 8 files changed, 155 insertions(+), 48 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index d561670..29f5e53 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -2,6 +2,7 @@ package balancer import ( "errors" + "fmt" "net/http" "net/url" @@ -23,6 +24,19 @@ type Strategy interface { Next(healthy []Endpoint) (Endpoint, error) } +// Closer can be used to shut down resources associated with a balancer +// transport (e.g. background health checker goroutines). +type Closer struct { + healthChecker *HealthChecker +} + +// Close stops background goroutines. Safe to call multiple times. +func (c *Closer) Close() { + if c.healthChecker != nil { + c.healthChecker.Stop() + } +} + // Transport returns a middleware that load-balances requests across the // provided endpoints using the configured strategy. // @@ -33,7 +47,7 @@ type Strategy interface { // If active health checking is enabled (WithHealthCheck), a background // goroutine periodically probes endpoints. Otherwise all endpoints are // assumed healthy. -func Transport(endpoints []Endpoint, opts ...Option) middleware.Middleware { +func Transport(endpoints []Endpoint, opts ...Option) (middleware.Middleware, *Closer) { o := &options{ strategy: RoundRobin(), } @@ -41,10 +55,22 @@ func Transport(endpoints []Endpoint, opts ...Option) middleware.Middleware { opt(o) } + // Pre-parse endpoint URLs once at construction time. + parsed := make(map[string]*url.URL, len(endpoints)) + for _, ep := range endpoints { + u, err := url.Parse(ep.URL) + if err != nil { + panic(fmt.Sprintf("balancer: invalid endpoint URL %q: %v", ep.URL, err)) + } + parsed[ep.URL] = u + } + if o.healthChecker != nil { o.healthChecker.Start(endpoints) } + closer := &Closer{healthChecker: o.healthChecker} + return func(next http.RoundTripper) http.RoundTripper { return middleware.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { healthy := endpoints @@ -61,18 +87,18 @@ func Transport(endpoints []Endpoint, opts ...Option) middleware.Middleware { return nil, err } - epURL, err := url.Parse(ep.URL) - if err != nil { - return nil, err - } + epURL := parsed[ep.URL] - // Clone the request URL and replace scheme+host with the endpoint. - r := req.Clone(req.Context()) + // Shallow-copy request and URL to avoid mutating the original, + // without the expense of req.Clone's deep header copy. + r := *req + u := *req.URL + r.URL = &u r.URL.Scheme = epURL.Scheme r.URL.Host = epURL.Host r.Host = epURL.Host - return next.RoundTrip(r) + return next.RoundTrip(&r) }) - } + }, closer } diff --git a/balancer/balancer_test.go b/balancer/balancer_test.go index d659cb3..2b05ffc 100644 --- a/balancer/balancer_test.go +++ b/balancer/balancer_test.go @@ -33,7 +33,8 @@ func TestTransport_PicksEndpointAndReplacesURL(t *testing.T) { return okResponse(), nil }) - rt := Transport(endpoints)(base) + mw, _ := Transport(endpoints) + rt := mw(base) req, err := http.NewRequest(http.MethodGet, "https://original.example.com/api/v1/users", nil) if err != nil { @@ -67,7 +68,8 @@ func TestTransport_ErrNoHealthyWhenNoEndpoints(t *testing.T) { return nil, nil }) - rt := Transport(endpoints)(base) + mw, _ := Transport(endpoints) + rt := mw(base) req, err := http.NewRequest(http.MethodGet, "https://example.com/test", nil) if err != nil { diff --git a/balancer/health.go b/balancer/health.go index 90db44a..4ffc7dd 100644 --- a/balancer/health.go +++ b/balancer/health.go @@ -2,6 +2,7 @@ package balancer import ( "context" + "io" "net/http" "sync" "time" @@ -70,8 +71,10 @@ func newHealthChecker(opts ...HealthOption) *HealthChecker { } // Start begins the background health checking loop for the given endpoints. -// All endpoints are initially considered healthy. +// An initial probe is run synchronously so that unhealthy endpoints are +// detected before the first request. func (h *HealthChecker) Start(endpoints []Endpoint) { + // Mark all healthy as a safe default, then immediately probe. h.mu.Lock() for _, ep := range endpoints { h.status[ep.URL] = true @@ -82,6 +85,9 @@ func (h *HealthChecker) Start(endpoints []Endpoint) { h.cancel = cancel h.stopped = make(chan struct{}) + // Run initial probe synchronously so callers don't hit stale state. + h.probe(ctx, endpoints) + go h.loop(ctx, endpoints) } @@ -111,7 +117,7 @@ func (h *HealthChecker) Healthy(endpoints []Endpoint) []Endpoint { h.mu.RLock() defer h.mu.RUnlock() - var result []Endpoint + result := make([]Endpoint, 0, len(endpoints)) for _, ep := range endpoints { if h.status[ep.URL] { result = append(result, ep) @@ -137,13 +143,18 @@ func (h *HealthChecker) loop(ctx context.Context, endpoints []Endpoint) { } func (h *HealthChecker) probe(ctx context.Context, endpoints []Endpoint) { + var wg sync.WaitGroup + wg.Add(len(endpoints)) for _, ep := range endpoints { - healthy := h.check(ctx, ep) - - h.mu.Lock() - h.status[ep.URL] = healthy - h.mu.Unlock() + go func() { + defer wg.Done() + healthy := h.check(ctx, ep) + h.mu.Lock() + h.status[ep.URL] = healthy + h.mu.Unlock() + }() } + wg.Wait() } func (h *HealthChecker) check(ctx context.Context, ep Endpoint) bool { @@ -156,6 +167,7 @@ func (h *HealthChecker) check(ctx context.Context, ep Endpoint) bool { if err != nil { return false } + io.Copy(io.Discard, resp.Body) resp.Body.Close() return resp.StatusCode >= 200 && resp.StatusCode < 300 diff --git a/circuitbreaker/breaker.go b/circuitbreaker/breaker.go index 68a2695..b4f2629 100644 --- a/circuitbreaker/breaker.go +++ b/circuitbreaker/breaker.go @@ -156,7 +156,10 @@ func Transport(opts ...Option) middleware.Middleware { return middleware.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { host := req.URL.Host - val, _ := hosts.LoadOrStore(host, NewBreaker(opts...)) + val, ok := hosts.Load(host) + if !ok { + val, _ = hosts.LoadOrStore(host, NewBreaker(opts...)) + } cb := val.(*Breaker) done, err := cb.Allow() diff --git a/client.go b/client.go index eb25b28..4456375 100644 --- a/client.go +++ b/client.go @@ -2,6 +2,7 @@ package httpx import ( "context" + "fmt" "io" "net/http" "strings" @@ -15,9 +16,10 @@ import ( // Client is a high-level HTTP client that composes middleware for retry, // circuit breaking, load balancing, logging, and more. type Client struct { - httpClient *http.Client - baseURL string - errorMapper ErrorMapper + httpClient *http.Client + baseURL string + errorMapper ErrorMapper + balancerCloser *balancer.Closer } // New creates a new Client with the given options. @@ -37,8 +39,11 @@ func New(opts ...Option) *Client { var chain []middleware.Middleware // Balancer (innermost, wraps base transport). + var balancerCloser *balancer.Closer if len(o.endpoints) > 0 { - chain = append(chain, balancer.Transport(o.endpoints, o.balancerOpts...)) + var mw middleware.Middleware + mw, balancerCloser = balancer.Transport(o.endpoints, o.balancerOpts...) + chain = append(chain, mw) } // Circuit breaker wraps balancer. @@ -72,15 +77,18 @@ func New(opts ...Option) *Client { Transport: rt, Timeout: o.timeout, }, - baseURL: o.baseURL, - errorMapper: o.errorMapper, + baseURL: o.baseURL, + errorMapper: o.errorMapper, + balancerCloser: balancerCloser, } } // Do executes an HTTP request. func (c *Client) Do(ctx context.Context, req *http.Request) (*Response, error) { req = req.WithContext(ctx) - c.resolveURL(req) + if err := c.resolveURL(req); err != nil { + return nil, err + } resp, err := c.httpClient.Do(req) if err != nil { @@ -143,14 +151,23 @@ func (c *Client) Delete(ctx context.Context, url string) (*Response, error) { return c.Do(ctx, req) } +// Close releases resources associated with the Client, such as background +// health checker goroutines. It is safe to call multiple times. +func (c *Client) Close() { + if c.balancerCloser != nil { + c.balancerCloser.Close() + } +} + // HTTPClient returns the underlying *http.Client for advanced use cases. +// Mutating the returned client may bypass the configured middleware chain. func (c *Client) HTTPClient() *http.Client { return c.httpClient } -func (c *Client) resolveURL(req *http.Request) { +func (c *Client) resolveURL(req *http.Request) error { if c.baseURL == "" { - return + return nil } // Only resolve relative URLs (no scheme). if req.URL.Scheme == "" && req.URL.Host == "" { @@ -159,6 +176,11 @@ func (c *Client) resolveURL(req *http.Request) { path = "/" + path } base := strings.TrimRight(c.baseURL, "/") - req.URL, _ = req.URL.Parse(base + path) + u, err := req.URL.Parse(base + path) + if err != nil { + return fmt.Errorf("httpx: resolving URL %q with base %q: %w", path, c.baseURL, err) + } + req.URL = u } + return nil } diff --git a/error.go b/error.go index 6a3185e..fc47c54 100644 --- a/error.go +++ b/error.go @@ -1,16 +1,21 @@ package httpx import ( - "errors" "fmt" "net/http" + + "git.codelab.vc/pkg/httpx/balancer" + "git.codelab.vc/pkg/httpx/circuitbreaker" + "git.codelab.vc/pkg/httpx/retry" ) // Sentinel errors returned by httpx components. +// These are aliases for the canonical errors defined in sub-packages, +// so that errors.Is works regardless of which import the caller uses. var ( - ErrRetryExhausted = errors.New("httpx: all retry attempts exhausted") - ErrCircuitOpen = errors.New("httpx: circuit breaker is open") - ErrNoHealthy = errors.New("httpx: no healthy endpoints available") + ErrRetryExhausted = retry.ErrRetryExhausted + ErrCircuitOpen = circuitbreaker.ErrCircuitOpen + ErrNoHealthy = balancer.ErrNoHealthy ) // Error provides structured error information for failed HTTP operations. diff --git a/internal/clock/clock.go b/internal/clock/clock.go index c6a8e08..d19a6be 100644 --- a/internal/clock/clock.go +++ b/internal/clock/clock.go @@ -64,6 +64,7 @@ func (m *MockClock) NewTimer(d time.Duration) Timer { m.mu.Lock() defer m.mu.Unlock() t := &mockTimer{ + clock: m, ch: make(chan time.Time, 1), deadline: m.now.Add(d), active: true, @@ -84,6 +85,25 @@ func (m *MockClock) Advance(d time.Duration) { m.mu.Lock() m.now = m.now.Add(d) now := m.now + m.mu.Unlock() + + m.fireExpired(now) +} + +// Set sets the clock to an absolute time and fires any expired timers. +func (m *MockClock) Set(t time.Time) { + m.mu.Lock() + m.now = t + now := m.now + m.mu.Unlock() + + m.fireExpired(now) +} + +// fireExpired fires all active timers whose deadline has passed, then +// removes inactive timers to prevent unbounded growth. +func (m *MockClock) fireExpired(now time.Time) { + m.mu.Lock() timers := m.timers m.mu.Unlock() @@ -94,27 +114,27 @@ func (m *MockClock) Advance(d time.Duration) { } t.mu.Unlock() } -} -// Set sets the clock to an absolute time and fires any expired timers. -func (m *MockClock) Set(t time.Time) { + // Compact: remove inactive timers. Use a new slice to avoid aliasing + // the backing array (NewTimer may have appended between snapshots). m.mu.Lock() - m.now = t - now := m.now - timers := m.timers - m.mu.Unlock() - - for _, tmr := range timers { - tmr.mu.Lock() - if tmr.active && !now.Before(tmr.deadline) { - tmr.fire(now) + n := len(m.timers) + active := make([]*mockTimer, 0, n) + for _, t := range m.timers { + t.mu.Lock() + keep := t.active + t.mu.Unlock() + if keep { + active = append(active, t) } - tmr.mu.Unlock() } + m.timers = active + m.mu.Unlock() } type mockTimer struct { mu sync.Mutex + clock *MockClock ch chan time.Time deadline time.Time active bool @@ -131,12 +151,17 @@ func (t *mockTimer) Stop() bool { } func (t *mockTimer) Reset(d time.Duration) bool { + // Acquire clock lock first to match the lock ordering in fireExpired + // (clock.mu → t.mu), preventing deadlock. + t.clock.mu.Lock() + deadline := t.clock.now.Add(d) + t.clock.mu.Unlock() + t.mu.Lock() defer t.mu.Unlock() was := t.active t.active = true - // Note: deadline will be recalculated on next Advance - t.deadline = time.Now().Add(d) // placeholder; mock users should use Advance + t.deadline = deadline return was } diff --git a/retry/retry.go b/retry/retry.go index 36c491b..7dd9f7d 100644 --- a/retry/retry.go +++ b/retry/retry.go @@ -1,6 +1,8 @@ package retry import ( + "errors" + "fmt" "io" "net/http" "time" @@ -8,6 +10,10 @@ import ( "git.codelab.vc/pkg/httpx/middleware" ) +// ErrRetryExhausted is returned when all retry attempts have been exhausted +// and the last attempt also failed. +var ErrRetryExhausted = errors.New("httpx: all retry attempts exhausted") + // Policy decides whether a failed request should be retried. type Policy interface { // ShouldRetry reports whether the request should be retried. The extra @@ -28,6 +34,7 @@ func Transport(opts ...Option) middleware.Middleware { return middleware.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { var resp *http.Response var err error + var exhausted bool for attempt := range cfg.maxAttempts { // For retries (attempt > 0), restore the request body. @@ -48,6 +55,7 @@ func Transport(opts ...Option) middleware.Middleware { // Last attempt — return whatever we got. if attempt == cfg.maxAttempts-1 { + exhausted = true break } @@ -85,6 +93,10 @@ func Transport(opts ...Option) middleware.Middleware { } } + // Wrap with ErrRetryExhausted only when all attempts were used. + if exhausted && err != nil { + err = fmt.Errorf("%w: %w", ErrRetryExhausted, err) + } return resp, err }) }