A malformed endpoint URL panicked inside Transport, crashing the host app (often at startup from external config). Capture the parse error and surface it from the transport on first use instead. Add the previously untested HealthChecker coverage (initial probe, recovery, Stop termination, unknown endpoint), raising balancer coverage from ~41% to ~87%. Default the health probe path to /healthz to match this library's own server.
175 lines
3.7 KiB
Go
175 lines
3.7 KiB
Go
package balancer
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Defaults used by newHealthChecker when no HealthOption overrides them.
const (
	// defaultHealthInterval is the pause between two rounds of probes.
	defaultHealthInterval = 10 * time.Second

	// defaultHealthPath is the path appended to an endpoint URL to build
	// the probe request.
	defaultHealthPath = "/healthz"

	// defaultHealthTimeout is the timeout set on the probing HTTP client.
	defaultHealthTimeout = 5 * time.Second
)
|
|
|
|
// HealthOption configures the HealthChecker.
|
|
type HealthOption func(*HealthChecker)
|
|
|
|
// WithHealthInterval sets the interval between health check probes.
|
|
// Default is 10 seconds.
|
|
func WithHealthInterval(d time.Duration) HealthOption {
|
|
return func(h *HealthChecker) {
|
|
h.interval = d
|
|
}
|
|
}
|
|
|
|
// WithHealthPath sets the HTTP path to probe for health checks.
|
|
// Default is "/health".
|
|
func WithHealthPath(path string) HealthOption {
|
|
return func(h *HealthChecker) {
|
|
h.path = path
|
|
}
|
|
}
|
|
|
|
// WithHealthTimeout sets the timeout for each health check request.
|
|
// Default is 5 seconds.
|
|
func WithHealthTimeout(d time.Duration) HealthOption {
|
|
return func(h *HealthChecker) {
|
|
h.timeout = d
|
|
}
|
|
}
|
|
|
|
// HealthChecker probes a set of endpoints over HTTP at a fixed interval and
// remembers, per endpoint URL, whether the most recent probe succeeded.
type HealthChecker struct {
	// Configuration, fixed once the checker has been constructed.
	interval time.Duration // pause between probe rounds
	path     string        // appended to the endpoint URL to form the probe URL
	timeout  time.Duration // timeout configured on client
	client   *http.Client  // HTTP client used for every probe

	// mu guards status.
	mu     sync.RWMutex
	status map[string]bool // endpoint URL -> result of the latest probe

	// Lifecycle of the background loop; both are set by Start.
	cancel  context.CancelFunc // cancels the context the loop runs under
	stopped chan struct{}      // closed by the loop when it returns
}
|
|
|
|
func newHealthChecker(opts ...HealthOption) *HealthChecker {
|
|
h := &HealthChecker{
|
|
interval: defaultHealthInterval,
|
|
path: defaultHealthPath,
|
|
timeout: defaultHealthTimeout,
|
|
status: make(map[string]bool),
|
|
}
|
|
for _, opt := range opts {
|
|
opt(h)
|
|
}
|
|
h.client = &http.Client{
|
|
Timeout: h.timeout,
|
|
}
|
|
return h
|
|
}
|
|
|
|
// Start begins the background health checking loop for the given endpoints.
|
|
// An initial probe is run synchronously so that unhealthy endpoints are
|
|
// detected before the first request.
|
|
func (h *HealthChecker) Start(endpoints []Endpoint) {
|
|
// Mark all healthy as a safe default, then immediately probe.
|
|
h.mu.Lock()
|
|
for _, ep := range endpoints {
|
|
h.status[ep.URL] = true
|
|
}
|
|
h.mu.Unlock()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
h.cancel = cancel
|
|
h.stopped = make(chan struct{})
|
|
|
|
// Run initial probe synchronously so callers don't hit stale state.
|
|
h.probe(ctx, endpoints)
|
|
|
|
go h.loop(ctx, endpoints)
|
|
}
|
|
|
|
// Stop terminates the background health checking goroutine and waits for
|
|
// it to finish.
|
|
func (h *HealthChecker) Stop() {
|
|
if h.cancel != nil {
|
|
h.cancel()
|
|
<-h.stopped
|
|
}
|
|
}
|
|
|
|
// IsHealthy reports whether the given endpoint is currently healthy.
|
|
func (h *HealthChecker) IsHealthy(ep Endpoint) bool {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
|
|
healthy, ok := h.status[ep.URL]
|
|
if !ok {
|
|
return false
|
|
}
|
|
return healthy
|
|
}
|
|
|
|
// Healthy returns the subset of endpoints that are currently healthy.
|
|
func (h *HealthChecker) Healthy(endpoints []Endpoint) []Endpoint {
|
|
h.mu.RLock()
|
|
defer h.mu.RUnlock()
|
|
|
|
result := make([]Endpoint, 0, len(endpoints))
|
|
for _, ep := range endpoints {
|
|
if h.status[ep.URL] {
|
|
result = append(result, ep)
|
|
}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (h *HealthChecker) loop(ctx context.Context, endpoints []Endpoint) {
|
|
defer close(h.stopped)
|
|
|
|
ticker := time.NewTicker(h.interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
h.probe(ctx, endpoints)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *HealthChecker) probe(ctx context.Context, endpoints []Endpoint) {
|
|
var wg sync.WaitGroup
|
|
wg.Add(len(endpoints))
|
|
for _, ep := range endpoints {
|
|
go func() {
|
|
defer wg.Done()
|
|
healthy := h.check(ctx, ep)
|
|
h.mu.Lock()
|
|
h.status[ep.URL] = healthy
|
|
h.mu.Unlock()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func (h *HealthChecker) check(ctx context.Context, ep Endpoint) bool {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, ep.URL+h.path, nil)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
resp, err := h.client.Do(req)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
io.Copy(io.Discard, resp.Body)
|
|
resp.Body.Close()
|
|
|
|
return resp.StatusCode >= 200 && resp.StatusCode < 300
|
|
}
|