Fix retry body replay and jitter panic; drive delays via internal/clock
Gate the retry decision on body rewindability: when an idempotent request's body cannot be replayed (no GetBody), the current response is now returned as-is instead of looping with an empty body or surfacing a stale, already-drained response. Guard ExponentialBackoff against rand.Int64N panicking when delay/2 rounds to zero. Use internal/clock for inter-attempt delays so retry timing is consistent with the rest of the codebase and testable without real sleeps.
This commit is contained in:
@@ -44,8 +44,11 @@ func (b *exponentialBackoff) Delay(attempt int) time.Duration {
|
||||
}
|
||||
|
||||
if b.withJitter {
|
||||
jitter := time.Duration(rand.Int64N(int64(delay / 2)))
|
||||
delay += jitter
|
||||
// Guard against rand.Int64N panicking on a non-positive argument when
|
||||
// delay is small enough that delay/2 rounds to zero.
|
||||
if half := int64(delay / 2); half > 0 {
|
||||
delay += time.Duration(rand.Int64N(half))
|
||||
}
|
||||
}
|
||||
|
||||
if delay > b.max {
|
||||
|
||||
@@ -1,12 +1,17 @@
|
||||
package retry
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"time"
|
||||
|
||||
"git.codelab.vc/pkg/httpx/internal/clock"
|
||||
)
|
||||
|
||||
type options struct {
|
||||
maxAttempts int // default 3
|
||||
backoff Backoff // default ExponentialBackoff(100ms, 5s, true)
|
||||
policy Policy // default: defaultPolicy (retry on 5xx and network errors)
|
||||
retryAfter bool // default true, respect Retry-After header
|
||||
maxAttempts int // default 3
|
||||
backoff Backoff // default ExponentialBackoff(100ms, 5s, true)
|
||||
policy Policy // default: defaultPolicy (retry on 5xx and network errors)
|
||||
retryAfter bool // default true, respect Retry-After header
|
||||
clk clock.Clock // time source for backoff delays (real by default)
|
||||
}
|
||||
|
||||
// Option configures the retry transport.
|
||||
@@ -18,6 +23,17 @@ func defaults() options {
|
||||
backoff: ExponentialBackoff(100*time.Millisecond, 5*time.Second, true),
|
||||
policy: defaultPolicy{},
|
||||
retryAfter: true,
|
||||
clk: clock.System(),
|
||||
}
|
||||
}
|
||||
|
||||
// withClock sets the clock used for inter-attempt delays. Unexported; for
|
||||
// deterministic tests.
|
||||
func withClock(c clock.Clock) Option {
|
||||
return func(o *options) {
|
||||
if c != nil {
|
||||
o.clk = c
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -37,18 +37,16 @@ func Transport(opts ...Option) middleware.Middleware {
|
||||
var exhausted bool
|
||||
|
||||
for attempt := range cfg.maxAttempts {
|
||||
// For retries (attempt > 0), restore the request body.
|
||||
if attempt > 0 {
|
||||
if req.GetBody != nil {
|
||||
body, bodyErr := req.GetBody()
|
||||
if bodyErr != nil {
|
||||
return resp, bodyErr
|
||||
}
|
||||
req.Body = body
|
||||
} else if req.Body != nil {
|
||||
// Body was consumed and cannot be re-created.
|
||||
return resp, err
|
||||
// For retries (attempt > 0) the body was consumed by the
|
||||
// previous attempt; restore it via GetBody. The rewindability
|
||||
// check below guarantees GetBody is set whenever we loop with a
|
||||
// non-nil body, so this branch is always safe.
|
||||
if attempt > 0 && req.GetBody != nil {
|
||||
body, bodyErr := req.GetBody()
|
||||
if bodyErr != nil {
|
||||
return nil, bodyErr
|
||||
}
|
||||
req.Body = body
|
||||
}
|
||||
|
||||
resp, err = next.RoundTrip(req)
|
||||
@@ -64,6 +62,13 @@ func Transport(opts ...Option) middleware.Middleware {
|
||||
break
|
||||
}
|
||||
|
||||
// If the body cannot be rewound, a retry would replay with an
|
||||
// empty body. Return the current result as-is instead of
|
||||
// draining it and looping with a corrupted request.
|
||||
if req.Body != nil && req.GetBody == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// Compute delay: use backoff or policy delay, whichever is larger.
|
||||
delay := cfg.backoff.Delay(attempt)
|
||||
if policyDelay > delay {
|
||||
@@ -84,12 +89,12 @@ func Transport(opts ...Option) middleware.Middleware {
|
||||
}
|
||||
|
||||
// Wait for the delay or context cancellation.
|
||||
timer := time.NewTimer(delay)
|
||||
timer := cfg.clk.NewTimer(delay)
|
||||
select {
|
||||
case <-req.Context().Done():
|
||||
timer.Stop()
|
||||
return nil, req.Context().Err()
|
||||
case <-timer.C:
|
||||
case <-timer.C():
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,9 +123,9 @@ func (defaultPolicy) ShouldRetry(_ int, req *http.Request, resp *http.Response,
|
||||
|
||||
switch resp.StatusCode {
|
||||
case http.StatusTooManyRequests, // 429
|
||||
http.StatusBadGateway, // 502
|
||||
http.StatusBadGateway, // 502
|
||||
http.StatusServiceUnavailable, // 503
|
||||
http.StatusGatewayTimeout: // 504
|
||||
http.StatusGatewayTimeout: // 504
|
||||
return true, 0
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.codelab.vc/pkg/httpx/internal/clock"
|
||||
"git.codelab.vc/pkg/httpx/middleware"
|
||||
)
|
||||
|
||||
@@ -229,6 +230,83 @@ func TestTransport(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
// TestTransport_BodyNotRewindable verifies that an idempotent request whose
|
||||
// body cannot be replayed (no GetBody) is returned as-is rather than retried
|
||||
// with an empty body or a stale, already-drained response.
|
||||
func TestTransport_BodyNotRewindable(t *testing.T) {
|
||||
var calls atomic.Int32
|
||||
rt := Transport(
|
||||
WithMaxAttempts(3),
|
||||
WithBackoff(ConstantBackoff(time.Millisecond)),
|
||||
)(mockTransport(func(req *http.Request) (*http.Response, error) {
|
||||
calls.Add(1)
|
||||
io.Copy(io.Discard, req.Body) // a real transport consumes the body
|
||||
return statusResponse(http.StatusServiceUnavailable), nil
|
||||
}))
|
||||
|
||||
// PUT is idempotent (the policy would retry a 503), but with GetBody unset
|
||||
// the body cannot be rewound.
|
||||
req, _ := http.NewRequest(http.MethodPut, "http://example.com", strings.NewReader("data"))
|
||||
req.GetBody = nil
|
||||
|
||||
resp, err := rt.RoundTrip(req)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if resp == nil || resp.StatusCode != http.StatusServiceUnavailable {
|
||||
t.Fatalf("expected the original 503 response, got %v", resp)
|
||||
}
|
||||
if got := calls.Load(); got != 1 {
|
||||
t.Fatalf("expected exactly 1 call (no rewind retry), got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestTransport_InjectedClock verifies that backoff delays are driven by the
|
||||
// configured clock, so retries are deterministic without real sleeps.
|
||||
func TestTransport_InjectedClock(t *testing.T) {
|
||||
clk := clock.Mock(time.Now())
|
||||
var calls atomic.Int32
|
||||
rt := Transport(
|
||||
WithMaxAttempts(2),
|
||||
WithBackoff(ConstantBackoff(time.Hour)), // would block forever on a real clock
|
||||
withClock(clk),
|
||||
)(mockTransport(func(req *http.Request) (*http.Response, error) {
|
||||
calls.Add(1)
|
||||
return statusResponse(http.StatusServiceUnavailable), nil
|
||||
}))
|
||||
|
||||
req, _ := http.NewRequest(http.MethodGet, "http://example.com", nil)
|
||||
|
||||
done := make(chan struct{})
|
||||
var resp *http.Response
|
||||
var err error
|
||||
go func() {
|
||||
resp, err = rt.RoundTrip(req)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Drive the backoff via the mock clock. Advancing repeatedly is robust
|
||||
// against the timer being created slightly after the first attempt.
|
||||
for {
|
||||
clk.Advance(time.Hour)
|
||||
select {
|
||||
case <-done:
|
||||
goto finished
|
||||
case <-time.After(time.Millisecond):
|
||||
}
|
||||
}
|
||||
finished:
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if resp.StatusCode != http.StatusServiceUnavailable {
|
||||
t.Fatalf("expected 503, got %d", resp.StatusCode)
|
||||
}
|
||||
if got := calls.Load(); got != 2 {
|
||||
t.Fatalf("expected 2 calls, got %d", got)
|
||||
}
|
||||
}
|
||||
|
||||
// policyFunc adapts a function into a Policy.
|
||||
type policyFunc func(int, *http.Request, *http.Response, error) (bool, time.Duration)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user