- Deduplicate sentinel errors: httpx.ErrNoHealthy, ErrCircuitOpen, and ErrRetryExhausted are now aliases to the canonical sub-package values so errors.Is works across package boundaries
- Retry transport returns ErrRetryExhausted only when all attempts are actually exhausted, not on early policy exit
- Balancer: pre-parse endpoint URLs at construction, replace req.Clone with cheap shallow struct copy to avoid per-request allocations
- Circuit breaker: Load before LoadOrStore to avoid allocating a Breaker on every request for known hosts
- Health checker: drain response body before close for connection reuse, probe endpoints concurrently, run initial probe synchronously in Start
- Client: add Close() to shut down health checker goroutine, propagate URL resolution errors instead of silently discarding them
- MockClock: fix lock ordering in Reset (clock.mu before t.mu), fix timer slice compaction to avoid backing-array aliasing, extract fireExpired to deduplicate Advance/Set
175 lines
3.7 KiB
Go
175 lines
3.7 KiB
Go
package balancer
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Defaults used by newHealthChecker for any setting that no option overrides.
const (
	// defaultHealthPath is the path appended to an endpoint URL to form the
	// probe URL.
	defaultHealthPath = "/health"
	// defaultHealthInterval is the pause between background probe rounds.
	defaultHealthInterval = 10 * time.Second
	// defaultHealthTimeout is the timeout set on the probe HTTP client.
	defaultHealthTimeout = 5 * time.Second
)
|
|
|
|
// HealthOption configures the HealthChecker.
|
|
type HealthOption func(*HealthChecker)
|
|
|
|
// WithHealthInterval sets the interval between health check probes.
|
|
// Default is 10 seconds.
|
|
func WithHealthInterval(d time.Duration) HealthOption {
|
|
return func(h *HealthChecker) {
|
|
h.interval = d
|
|
}
|
|
}
|
|
|
|
// WithHealthPath sets the HTTP path to probe for health checks.
|
|
// Default is "/health".
|
|
func WithHealthPath(path string) HealthOption {
|
|
return func(h *HealthChecker) {
|
|
h.path = path
|
|
}
|
|
}
|
|
|
|
// WithHealthTimeout sets the timeout for each health check request.
|
|
// Default is 5 seconds.
|
|
func WithHealthTimeout(d time.Duration) HealthOption {
|
|
return func(h *HealthChecker) {
|
|
h.timeout = d
|
|
}
|
|
}
|
|
|
|
// HealthChecker tracks the health of a set of endpoints. It probes each one
// over HTTP at a fixed interval and records the outcome per endpoint URL.
type HealthChecker struct {
	interval time.Duration // pause between background probe rounds
	path     string        // appended to an endpoint URL to form the probe URL
	timeout  time.Duration // timeout configured on client
	client   *http.Client  // issues the probe requests

	mu     sync.RWMutex    // guards status
	status map[string]bool // endpoint URL -> last recorded health state

	cancel  context.CancelFunc // cancels the background loop; nil until Start
	stopped chan struct{}      // closed by the background loop when it exits
}
|
|
|
|
func newHealthChecker(opts ...HealthOption) *HealthChecker {
|
|
h := &HealthChecker{
|
|
interval: defaultHealthInterval,
|
|
path: defaultHealthPath,
|
|
timeout: defaultHealthTimeout,
|
|
status: make(map[string]bool),
|
|
}
|
|
for _, opt := range opts {
|
|
opt(h)
|
|
}
|
|
h.client = &http.Client{
|
|
Timeout: h.timeout,
|
|
}
|
|
return h
|
|
}
|
|
|
|
// Start begins the background health checking loop for the given endpoints.
|
|
// An initial probe is run synchronously so that unhealthy endpoints are
|
|
// detected before the first request.
|
|
func (h *HealthChecker) Start(endpoints []Endpoint) {
|
|
// Mark all healthy as a safe default, then immediately probe.
|
|
h.mu.Lock()
|
|
for _, ep := range endpoints {
|
|
h.status[ep.URL] = true
|
|
}
|
|
h.mu.Unlock()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
h.cancel = cancel
|
|
h.stopped = make(chan struct{})
|
|
|
|
// Run initial probe synchronously so callers don't hit stale state.
|
|
h.probe(ctx, endpoints)
|
|
|
|
go h.loop(ctx, endpoints)
|
|
}
|
|
|
|
// Stop terminates the background health checking goroutine and waits for
|
|
// it to finish.
|
|
func (h *HealthChecker) Stop() {
|
|
if h.cancel != nil {
|
|
h.cancel()
|
|
<-h.stopped
|
|
}
|
|
}
|
|
|
|
// IsHealthy reports whether the given endpoint is currently healthy.
|
|
func (h *HealthChecker) IsHealthy(ep Endpoint) bool {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
|
|
healthy, ok := h.status[ep.URL]
|
|
if !ok {
|
|
return false
|
|
}
|
|
return healthy
|
|
}
|
|
|
|
// Healthy returns the subset of endpoints that are currently healthy.
|
|
func (h *HealthChecker) Healthy(endpoints []Endpoint) []Endpoint {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
|
|
result := make([]Endpoint, 0, len(endpoints))
|
|
for _, ep := range endpoints {
|
|
if h.status[ep.URL] {
|
|
result = append(result, ep)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (h *HealthChecker) loop(ctx context.Context, endpoints []Endpoint) {
|
|
defer close(h.stopped)
|
|
|
|
ticker := time.NewTicker(h.interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
h.probe(ctx, endpoints)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *HealthChecker) probe(ctx context.Context, endpoints []Endpoint) {
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(endpoints))
|
|
for _, ep := range endpoints {
|
|
go func() {
|
|
defer wg.Done()
|
|
healthy := h.check(ctx, ep)
|
|
h.mu.Lock()
|
|
h.status[ep.URL] = healthy
|
|
h.mu.Unlock()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func (h *HealthChecker) check(ctx context.Context, ep Endpoint) bool {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, ep.URL+h.path, nil)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
resp, err := h.client.Do(req)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
io.Copy(io.Discard, resp.Body)
|
|
resp.Body.Close()
|
|
|
|
return resp.StatusCode >= 200 && resp.StatusCode < 300
|
|
}
|