Files
httpx/balancer/health.go
Aleksey Shakhmatov 01478be0dc Replace balancer panic with deferred error; test HealthChecker
A malformed endpoint URL panicked inside Transport, crashing the host app
(often at startup from external config). Capture the parse error and surface
it from the transport on first use instead. Add the previously untested
HealthChecker coverage (initial probe, recovery, Stop termination, unknown
endpoint), raising balancer coverage from ~41% to ~87%. Default the health
probe path to /healthz to match this library's own server.
2026-05-23 13:47:33 +03:00

175 lines
3.7 KiB
Go

package balancer
import (
"context"
"io"
"net/http"
"sync"
"time"
)
// Defaults used by newHealthChecker for any setting that no HealthOption
// overrides.
const (
// defaultHealthInterval is the time between background probe rounds.
defaultHealthInterval = 10 * time.Second
// defaultHealthPath is appended to each endpoint URL to build the probe URL.
defaultHealthPath = "/healthz"
// defaultHealthTimeout is used as the Timeout of the probing http.Client.
defaultHealthTimeout = 5 * time.Second
)
// HealthOption configures the HealthChecker.
// Options are applied by newHealthChecker in the order they are passed, on
// top of the package defaults and before the HTTP client is created.
type HealthOption func(*HealthChecker)
// WithHealthInterval overrides how often the background loop probes the
// endpoints. When the option is not supplied the interval is 10 seconds.
//
// d should be positive: the loop hands the interval to time.NewTicker, which
// does not accept non-positive durations.
func WithHealthInterval(d time.Duration) HealthOption {
	return func(hc *HealthChecker) { hc.interval = d }
}
// WithHealthPath sets the HTTP path to probe for health checks.
// Default is "/healthz" (defaultHealthPath).
//
// The path is appended verbatim to the endpoint URL when the probe request is
// built, so it should start with "/" unless the endpoint URLs already end
// with one.
func WithHealthPath(path string) HealthOption {
	return func(h *HealthChecker) {
		h.path = path
	}
}
// WithHealthTimeout overrides the per-request timeout of a health probe.
// When the option is not supplied the timeout is 5 seconds.
//
// The value becomes the Timeout of the http.Client that sends the probes;
// per net/http, a zero Timeout means the client applies no timeout at all.
func WithHealthTimeout(d time.Duration) HealthOption {
	return func(hc *HealthChecker) { hc.timeout = d }
}
// HealthChecker periodically probes endpoints to determine their health status.
// The latest result for each endpoint is stored under its URL and exposed
// through IsHealthy and Healthy.
type HealthChecker struct {
// interval is the period of the ticker that drives the background loop.
interval time.Duration
// path is appended to an endpoint URL to form the probe request URL.
path string
// timeout is copied into client.Timeout when the checker is constructed.
timeout time.Duration
// client sends the probe requests.
client *http.Client
// mu guards status.
mu sync.RWMutex
// status maps an endpoint URL to the result of its most recent probe.
status map[string]bool
// cancel cancels the context shared by the background loop and its probes.
// It is nil until Start has been called.
cancel context.CancelFunc
// stopped is closed by the background loop when it returns; Stop waits on it.
stopped chan struct{}
}
// newHealthChecker builds a HealthChecker from the package defaults and then
// applies opts in the order given. The HTTP client is created after the
// options have run so that it picks up the final timeout.
//
// A non-positive interval (for example WithHealthInterval(0) fed from unset
// configuration) is replaced with defaultHealthInterval. The background loop
// passes the interval to time.NewTicker, which panics for values <= 0, and
// that panic would fire in a goroutine the caller has no way to recover from.
func newHealthChecker(opts ...HealthOption) *HealthChecker {
	h := &HealthChecker{
		interval: defaultHealthInterval,
		path:     defaultHealthPath,
		timeout:  defaultHealthTimeout,
		status:   make(map[string]bool),
	}
	for _, opt := range opts {
		opt(h)
	}
	// Validate after all options so the last writer cannot leave the checker
	// in a state that crashes the loop goroutine.
	if h.interval <= 0 {
		h.interval = defaultHealthInterval
	}
	h.client = &http.Client{
		Timeout: h.timeout,
	}
	return h
}
// Start begins the background health checking loop for the given endpoints.
// An initial probe is run synchronously so that unhealthy endpoints are
// detected before the first request; Start therefore blocks until every
// endpoint has answered or its probe has failed.
//
// Calling Start on a checker that is already running stops the previous loop
// first. Without that, the earlier goroutine would keep running with a cancel
// function nobody holds any more, and Stop could only ever wait for the most
// recent loop.
//
// Start and Stop are not synchronised with each other and must not be called
// concurrently.
func (h *HealthChecker) Start(endpoints []Endpoint) {
	// No-op on a fresh checker; otherwise cancels and waits for the old loop.
	h.Stop()

	// Mark all healthy as a safe default, then immediately probe.
	h.mu.Lock()
	for _, ep := range endpoints {
		h.status[ep.URL] = true
	}
	h.mu.Unlock()

	ctx, cancel := context.WithCancel(context.Background())
	h.cancel = cancel
	h.stopped = make(chan struct{})

	// Run initial probe synchronously so callers don't hit stale state.
	h.probe(ctx, endpoints)

	go h.loop(ctx, endpoints)
}
// Stop cancels the background health checking goroutine and blocks until that
// goroutine has signalled its exit. On a checker that was never started it
// returns immediately.
func (h *HealthChecker) Stop() {
	if h.cancel == nil {
		// Start has not run, so there is no goroutine to wait for.
		return
	}
	h.cancel()
	<-h.stopped
}
// IsHealthy reports the most recently recorded health of ep. An endpoint the
// checker has no record of is reported as unhealthy.
func (h *HealthChecker) IsHealthy(ep Endpoint) bool {
	h.mu.RLock()
	// A missing key yields the zero value false, which is exactly the
	// "unknown endpoint" answer.
	known := h.status[ep.URL]
	h.mu.RUnlock()
	return known
}
// Healthy filters endpoints down to those whose latest recorded probe
// succeeded, preserving their input order. The result is always a non-nil
// slice, possibly empty; endpoints unknown to the checker are left out.
func (h *HealthChecker) Healthy(endpoints []Endpoint) []Endpoint {
	alive := make([]Endpoint, 0, len(endpoints))

	h.mu.RLock()
	defer h.mu.RUnlock()

	for i := range endpoints {
		candidate := endpoints[i]
		if !h.status[candidate.URL] {
			continue
		}
		alive = append(alive, candidate)
	}
	return alive
}
// loop is the body of the background goroutine. It probes endpoints once per
// h.interval tick until ctx is cancelled, then closes h.stopped so that Stop
// can return.
//
// h.interval must be positive; time.NewTicker panics otherwise.
func (h *HealthChecker) loop(ctx context.Context, endpoints []Endpoint) {
	// Registered first so it runs last: the exit signal is sent only after
	// the ticker has been released.
	defer close(h.stopped)

	tick := time.NewTicker(h.interval)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			h.probe(ctx, endpoints)
		case <-done:
			return
		}
	}
}
// probe checks every endpoint concurrently, one goroutine per endpoint, and
// records each result in h.status. It returns once all checks have finished.
//
// A failed check observed after ctx has been cancelled is not recorded: the
// request was aborted by shutdown, which says nothing about the endpoint, and
// writing it would flip every in-flight endpoint to unhealthy on Stop.
func (h *HealthChecker) probe(ctx context.Context, endpoints []Endpoint) {
	var wg sync.WaitGroup
	wg.Add(len(endpoints))
	for _, ep := range endpoints {
		// ep is passed as an argument so each goroutine owns its endpoint
		// regardless of the loop-variable semantics of the Go version in use.
		go func(ep Endpoint) {
			defer wg.Done()
			healthy := h.check(ctx, ep)
			if !healthy && ctx.Err() != nil {
				// Cancelled mid-probe; keep the last real observation.
				return
			}
			h.mu.Lock()
			h.status[ep.URL] = healthy
			h.mu.Unlock()
		}(ep)
	}
	wg.Wait()
}
// check performs one health probe against ep: a GET to the endpoint URL with
// h.path appended, bound to ctx and sent through h.client. It returns true
// only when a response arrives and its status code is in the 2xx range; a
// request that cannot be built or sent counts as unhealthy.
func (h *HealthChecker) check(ctx context.Context, ep Endpoint) bool {
	target := ep.URL + h.path

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
	if err != nil {
		return false
	}

	resp, err := h.client.Do(req)
	if err != nil {
		return false
	}
	defer resp.Body.Close()

	// Drain the body before closing so the transport can reuse the
	// connection. The result is deliberately ignored: only the status code
	// decides health.
	_, _ = io.Copy(io.Discard, resp.Body)

	code := resp.StatusCode
	return code >= http.StatusOK && code < http.StatusMultipleChoices
}