91 lines
1.6 KiB
Go
91 lines
1.6 KiB
Go
package dbx
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
)
|
|
|
|
// healthChecker periodically pings nodes and updates their health state.
|
|
type healthChecker struct {
|
|
nodes []*Node
|
|
cfg HealthCheckConfig
|
|
logger Logger
|
|
metrics *MetricsHook
|
|
stop chan struct{}
|
|
done chan struct{}
|
|
}
|
|
|
|
func newHealthChecker(nodes []*Node, cfg HealthCheckConfig, logger Logger, metrics *MetricsHook) *healthChecker {
|
|
return &healthChecker{
|
|
nodes: nodes,
|
|
cfg: cfg,
|
|
logger: logger,
|
|
metrics: metrics,
|
|
stop: make(chan struct{}),
|
|
done: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
func (h *healthChecker) start() {
|
|
go h.loop()
|
|
}
|
|
|
|
func (h *healthChecker) loop() {
|
|
defer close(h.done)
|
|
|
|
ticker := time.NewTicker(h.cfg.Interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-h.stop:
|
|
return
|
|
case <-ticker.C:
|
|
h.checkAll()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *healthChecker) checkAll() {
|
|
for _, node := range h.nodes {
|
|
h.checkNode(node)
|
|
}
|
|
}
|
|
|
|
func (h *healthChecker) checkNode(n *Node) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), h.cfg.Timeout)
|
|
defer cancel()
|
|
|
|
err := n.pool.Ping(ctx)
|
|
wasHealthy := n.healthy.Load()
|
|
|
|
if err != nil {
|
|
n.healthy.Store(false)
|
|
if wasHealthy {
|
|
h.logger.Error(ctx, "dbx: node is down",
|
|
"node", n.name,
|
|
"error", err,
|
|
)
|
|
if h.metrics != nil && h.metrics.OnNodeDown != nil {
|
|
h.metrics.OnNodeDown(ctx, n.name, err)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
n.healthy.Store(true)
|
|
if !wasHealthy {
|
|
h.logger.Info(ctx, "dbx: node is up",
|
|
"node", n.name,
|
|
)
|
|
if h.metrics != nil && h.metrics.OnNodeUp != nil {
|
|
h.metrics.OnNodeUp(ctx, n.name)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *healthChecker) shutdown() {
|
|
close(h.stop)
|
|
<-h.done
|
|
}
|