99 lines
2.2 KiB
Go
99 lines
2.2 KiB
Go
package dbx
|
|
|
|
import (
|
|
"context"
|
|
"math"
|
|
"math/rand/v2"
|
|
"time"
|
|
)
|
|
|
|
// retrier executes operations with retry and node fallback.
|
|
type retrier struct {
|
|
cfg RetryConfig
|
|
logger Logger
|
|
metrics *MetricsHook
|
|
}
|
|
|
|
func newRetrier(cfg RetryConfig, logger Logger, metrics *MetricsHook) *retrier {
|
|
return &retrier{cfg: cfg, logger: logger, metrics: metrics}
|
|
}
|
|
|
|
// isRetryable checks the custom classifier first, then falls back to the default.
|
|
func (r *retrier) isRetryable(err error) bool {
|
|
if r.cfg.RetryableErrors != nil {
|
|
return r.cfg.RetryableErrors(err)
|
|
}
|
|
return IsRetryable(err)
|
|
}
|
|
|
|
// do executes fn on the given nodes in order, retrying on retryable errors.
|
|
// For writes, pass a single-element slice with the master.
|
|
// For reads, pass [replicas..., master] for fallback.
|
|
func (r *retrier) do(ctx context.Context, nodes []*Node, fn func(ctx context.Context, n *Node) error) error {
|
|
var lastErr error
|
|
|
|
for attempt := 0; attempt < r.cfg.MaxAttempts; attempt++ {
|
|
if ctx.Err() != nil {
|
|
if lastErr != nil {
|
|
return lastErr
|
|
}
|
|
return ctx.Err()
|
|
}
|
|
|
|
for _, node := range nodes {
|
|
if !node.IsHealthy() {
|
|
continue
|
|
}
|
|
|
|
err := fn(ctx, node)
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
lastErr = err
|
|
|
|
if !r.isRetryable(err) {
|
|
return err
|
|
}
|
|
|
|
if r.metrics != nil && r.metrics.OnRetry != nil {
|
|
r.metrics.OnRetry(ctx, node.name, attempt+1, err)
|
|
}
|
|
r.logger.Warn(ctx, "dbx: retryable error",
|
|
"node", node.name,
|
|
"attempt", attempt+1,
|
|
"error", err,
|
|
)
|
|
}
|
|
|
|
if attempt < r.cfg.MaxAttempts-1 {
|
|
delay := r.backoff(attempt)
|
|
t := time.NewTimer(delay)
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Stop()
|
|
if lastErr != nil {
|
|
return lastErr
|
|
}
|
|
return ctx.Err()
|
|
case <-t.C:
|
|
}
|
|
}
|
|
}
|
|
|
|
if lastErr == nil {
|
|
return ErrNoHealthyNode
|
|
}
|
|
return newRetryError(r.cfg.MaxAttempts, lastErr)
|
|
}
|
|
|
|
// backoff returns the delay for the given attempt with jitter.
|
|
func (r *retrier) backoff(attempt int) time.Duration {
|
|
delay := float64(r.cfg.BaseDelay) * math.Pow(2, float64(attempt))
|
|
if delay > float64(r.cfg.MaxDelay) {
|
|
delay = float64(r.cfg.MaxDelay)
|
|
}
|
|
// add jitter: 75%-125% of computed delay
|
|
jitter := 0.75 + rand.Float64()*0.5
|
|
return time.Duration(delay * jitter)
|
|
}
|