Backoff
Retry with exponential backoff for handling transient failures.
Overview
Backoff retries a failing processor with exponentially increasing delays between attempts. Unlike Retry, which uses fixed delays, Backoff lengthens each wait to reduce load on a struggling system and give it more time to recover.
Function Signature
func NewBackoff[T any](
identity Identity,
processor Chainable[T],
maxAttempts int,
baseDelay time.Duration,
) *Backoff[T]
Type Parameters
T - The data type being processed
Parameters
- identity (Identity) - Identifier used for debugging and in error paths
- processor (Chainable[T]) - The processor to retry on failure
- maxAttempts (int) - Maximum total number of attempts, including the first (minimum 1)
- baseDelay (time.Duration) - Delay before the first retry; each later retry waits twice as long as the previous one
Returns
Returns a *Backoff[T] that implements Chainable[T].
Behavior
- Exponential growth: Each retry doubles the previous delay
- No delay after final attempt: Returns the error immediately when the last attempt fails
- Context aware: Respects cancellation and deadlines
- Error preservation: Maintains complete error context from failures
- Thread-safe: Safe for concurrent use
Retry Backoff Visualization
┌──────────────────────────────────────────────────────────────────┐
│ Exponential Backoff Pattern │
└──────────────────────────────────────────────────────────────────┘
Base Delay: 100ms, Max Attempts: 5
Attempt 1 ──[✗]──→ Wait 100ms ──┐
▼
Attempt 2 ──[✗]──→ Wait 200ms ──┐
▼
Attempt 3 ──[✗]──→ Wait 400ms ──┐
▼
Attempt 4 ──[✗]──→ Wait 800ms ──┐
▼
Attempt 5 ──[✗]──→ Final Failure (no delay)
│
▼
Return Error
Total Time: 100ms + 200ms + 400ms + 800ms = 1.5 seconds (plus processing time)
Timeline View:
═════════════
Time: 0ms 100ms 300ms 700ms 1500ms
│ │ │ │ │
Attempt: [1]──►[2]───►[3]───────►[4]────────────►[5]
↑ ↑ ↑ ↑ ↑
Try Retry Retry Retry Final
Success Case (succeeds on attempt 3):
══════════════════════════════════════
Attempt 1 ──[✗]──→ Wait 100ms ──┐
▼
Attempt 2 ──[✗]──→ Wait 200ms ──┐
▼
Attempt 3 ──[✓]──→ Success! Return result
Total Time: 100ms + 200ms = 300ms (plus processing time)
Delay Progression
Given a base delay of 1 second:
- 1st retry: 1 second delay
- 2nd retry: 2 seconds delay
- 3rd retry: 4 seconds delay
- 4th retry: 8 seconds delay
- And so on...
Methods
SetMaxAttempts
Updates the maximum number of attempts, including the first.
func (b *Backoff[T]) SetMaxAttempts(n int) *Backoff[T]
SetBaseDelay
Updates the base delay duration.
func (b *Backoff[T]) SetBaseDelay(d time.Duration) *Backoff[T]
GetMaxAttempts
Returns the current maximum attempts setting.
func (b *Backoff[T]) GetMaxAttempts() int
GetBaseDelay
Returns the current base delay setting.
func (b *Backoff[T]) GetBaseDelay() time.Duration
Identity
Returns the identity of this connector.
func (b *Backoff[T]) Identity() Identity
Schema
Returns the schema representation of this connector.
func (b *Backoff[T]) Schema() Node
Basic Usage
// Define identities upfront
var (
APIRetryID = pipz.NewIdentity("api-retry", "Retry external API calls with exponential backoff")
CallAPIID = pipz.NewIdentity("call-api", "Call external API")
)
// Retry API calls with exponential backoff
apiCall := pipz.NewBackoff(
APIRetryID,
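pipz.Apply(CallAPIID, func(ctx context.Context, req Request) (Request, error) {
// Apply must return the same type it receives, so the result is stored
// on the request value (Response is an illustrative field)
resp, err := externalAPI.Call(ctx, req)
if err != nil {
return req, err
}
req.Response = resp
return req, nil
}),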
5, // Max 5 attempts
100*time.Millisecond, // Start with 100ms delay
)
// Delays will be: 100ms, 200ms, 400ms, 800ms
Common Patterns
Network Request Handling
// Define identities upfront
var (
HTTPBackoffID = pipz.NewIdentity("http-backoff", "HTTP client with exponential backoff for server errors")
HTTPRequestID = pipz.NewIdentity("http-request", "Execute HTTP request")
)
// Robust HTTP client with backoff
httpClient := pipz.NewBackoff(
HTTPBackoffID,
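pipz.Apply(HTTPRequestID, func(ctx context.Context, req HTTPRequest) (HTTPRequest, error) {
// Bind the outgoing request to ctx so cancellation also aborts in-flight calls
resp, err := client.Do(req.ToHTTP().WithContext(ctx))
if err != nil {
return req, err
}
// Retry on 5xx errors; close the body before the next attempt
if resp.StatusCode >= 500 {
resp.Body.Close()
return req, fmt.Errorf("server error: %d", resp.StatusCode)
}
// Apply keeps input and output types the same, so the parsed result is
// stored on req (Response is an illustrative field)
req.Response, err = parseResponse(resp)
return req, err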
}),
4, // 4 attempts total
500*time.Millisecond, // Start with 500ms
)
// Total possible delay: 500ms + 1s + 2s = 3.5s
Database Operations
// Define identities upfront
var (
DBRetryID = pipz.NewIdentity("db-retry", "Retry database operations with backoff for deadlocks")
UpdateID = pipz.NewIdentity("update", "Update database record")
)
// Database operations with backoff for lock contention
dbOperation := pipz.NewBackoff(
DBRetryID,
pipz.Apply(UpdateID, func(ctx context.Context, data Record) (Record, error) {
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return data, err
}
defer tx.Rollback()
// Perform operations
if err := updateRecord(tx, data); err != nil {
return data, err
}
return data, tx.Commit()
}),
3, // 3 attempts for deadlocks
50*time.Millisecond, // Short initial delay
)
Message Queue Processing
// Define identities upfront
var (
MessageRetryID = pipz.NewIdentity("message-retry", "Retry message processing with exponential backoff")
ProcessMessageID = pipz.NewIdentity("process-message", "Parse, validate, and send message")
SendID = pipz.NewIdentity("send", "Send message to queue")
)
// Retry message processing with increasing delays
messageProcessor := pipz.NewBackoff(
MessageRetryID,
pipz.NewSequence(
ProcessMessageID,
parseMessage,
validateMessage,
pipz.Apply(SendID, func(ctx context.Context, msg Message) (Message, error) {
return queue.Send(ctx, msg)
}),
),
6, // More attempts for async operations
1*time.Second, // Start with 1 second
)
// Max total delay: 1s + 2s + 4s + 8s + 16s = 31s
Combined with Circuit Breaker
// Define identities upfront
var (
CircuitID = pipz.NewIdentity("circuit", "Circuit breaker for external service")
BackoffID = pipz.NewIdentity("backoff", "Exponential backoff for service calls")
)
// Backoff with circuit breaker for external services
resilientService := pipz.NewCircuitBreaker(
CircuitID,
pipz.NewBackoff(
BackoffID,
externalServiceCall,
3, // Attempts per call; an exhausted backoff counts as one breaker failure
200*time.Millisecond,
),
10, // Failure threshold
5*time.Minute, // Recovery timeout
)
Configuration Patterns
Dynamic Configuration
// Define identity upfront
var DynamicID = pipz.NewIdentity("dynamic", "Dynamically configured backoff")
backoff := pipz.NewBackoff(
DynamicID,
processor,
3,
1*time.Second,
)
// Adjust based on load
if highLoad() {
backoff.SetMaxAttempts(5).SetBaseDelay(2*time.Second)
} else {
backoff.SetMaxAttempts(3).SetBaseDelay(500*time.Millisecond)
}
Environment-Based Configuration
// Define identities upfront
var (
ProdBackoffID = pipz.NewIdentity("prod-backoff", "Production backoff configuration")
StageBackoffID = pipz.NewIdentity("stage-backoff", "Staging backoff configuration")
DevBackoffID = pipz.NewIdentity("dev-backoff", "Development backoff configuration")
)
func createBackoff(env string) *pipz.Backoff[Data] {
switch env {
case "production":
return pipz.NewBackoff(
ProdBackoffID,
processor, 5, 1*time.Second)
case "staging":
return pipz.NewBackoff(
StageBackoffID,
processor, 3, 500*time.Millisecond)
default:
return pipz.NewBackoff(
DevBackoffID,
processor, 2, 100*time.Millisecond)
}
}
Error Handling
Backoff preserves complete error context:
result, err := backoff.Process(ctx, data)
if err != nil {
var pipeErr *pipz.Error[Data]
if errors.As(err, &pipeErr) {
// The error path includes the backoff's identity
// GetMaxAttempts is the configured limit; a timeout can stop retries earlier
fmt.Printf("Failed after up to %d attempts at: %v\n",
backoff.GetMaxAttempts(), pipeErr.Path)
// Check if it was a timeout during backoff
if pipeErr.IsTimeout() {
log.Println("backoff interrupted by timeout")
}
}
}
Comparison with Retry
Backoff vs Retry
| Feature | Backoff | Retry |
|---|---|---|
| Delay pattern | Exponential (1s, 2s, 4s...) | Fixed (1s, 1s, 1s...) |
| Use case | Transient failures, overload | Quick failures, network blips |
| Total time | Grows exponentially | Grows linearly |
| System load | Reduces pressure over time | Constant pressure |
// Define identities upfront
var (
ExponentialID = pipz.NewIdentity("exponential", "Exponential backoff for overloaded systems")
FixedID = pipz.NewIdentity("fixed", "Fixed delay retry for brief interruptions")
)
// Backoff: Good for overloaded systems
backoff := pipz.NewBackoff(
ExponentialID,
processor, 4, 1*time.Second)
// Delays: 1s, 2s, 4s (total: 7s)
// Retry: Good for brief interruptions
retry := pipz.NewRetry(
FixedID,
processor, 4, 1*time.Second)
// Delays: 1s, 1s, 1s (total: 3s)
Performance Characteristics
- Memory: O(1) - No additional allocations per retry
- Goroutines: No additional goroutines created
- Overhead: ~10ns + sleep time per retry
- Context checks: Performed before each retry
Gotchas
❌ Don't use tiny base delays
// Define identity upfront
var TooFastID = pipz.NewIdentity("too-fast", "Backoff with microsecond delays")
// WRONG - Delays too small to be meaningful
backoff := pipz.NewBackoff(
TooFastID,
processor, 5, 1*time.Microsecond)
// Results in: 1μs, 2μs, 4μs, 8μs - essentially no delay
✅ Use meaningful base delays
// Define identity upfront
var ReasonableID = pipz.NewIdentity("reasonable", "Backoff with reasonable delays for system recovery")
// RIGHT - Delays that allow recovery
backoff := pipz.NewBackoff(
ReasonableID,
processor, 5, 100*time.Millisecond)
// Results in: 100ms, 200ms, 400ms, 800ms - gives system time to recover
❌ Don't use too many attempts
// Define identity upfront
var ExcessiveID = pipz.NewIdentity("excessive", "Backoff with too many attempts")
// WRONG - Could wait extremely long
backoff := pipz.NewBackoff(
ExcessiveID,
processor, 10, 1*time.Second)
// Final delay would be 256 seconds, with 511 seconds (about 8.5 minutes) of total waiting!
✅ Balance attempts with total delay
// Define identity upfront
var BalancedID = pipz.NewIdentity("balanced", "Backoff with balanced attempts and delays")
// RIGHT - Reasonable total delay
backoff := pipz.NewBackoff(
BalancedID,
processor, 5, 500*time.Millisecond)
// Max total: 500ms + 1s + 2s + 4s = 7.5s
❌ Don't ignore context cancellation
// WRONG - Not checking context
for i := 0; i < maxAttempts; i++ {
result, err := process(data)
if err == nil {
return result, nil
}
time.Sleep(delay) // Ignores context!
delay *= 2
}
✅ Respect context cancellation
// Define identity upfront
var ContextAwareID = pipz.NewIdentity("context-aware", "Context-aware backoff")
// RIGHT - Backoff handles this automatically
backoff := pipz.NewBackoff(
ContextAwareID,
processor, 5, 1*time.Second)
// Automatically stops on context cancellation
Best Practices
- Start with small delays - Begin with 50-500ms for most operations
- Limit max attempts - Usually 3-5 attempts is sufficient
- Calculate total time - Ensure max total delay is acceptable
- Match delay to failure type - Network: 100ms+, Database: 50ms+, API: 500ms+
- Monitor retry metrics - Track retry rates and success rates
- Consider circuit breakers - Combine with circuit breakers for system protection
- Log retry attempts - Include attempt number in logs for debugging
Testing
// Define identities upfront
var (
FlakyID = pipz.NewIdentity("flaky", "Flaky processor for testing")
TestID = pipz.NewIdentity("test", "Test backoff processor")
SlowID = pipz.NewIdentity("slow", "Always-failing processor")
TimeoutTestID = pipz.NewIdentity("timeout-test", "Backoff with timeout test")
)
func TestBackoff(t *testing.T) {
attempts := 0
processor := pipz.Apply(
FlakyID,
func(ctx context.Context, n int) (int, error) {
attempts++
if attempts < 3 {
return 0, errors.New("transient error")
}
return n * 2, nil
})
backoff := pipz.NewBackoff(
TestID,
processor, 5, 10*time.Millisecond)
start := time.Now()
result, err := backoff.Process(context.Background(), 5)
duration := time.Since(start)
assert.NoError(t, err)
assert.Equal(t, 10, result)
assert.Equal(t, 3, attempts)
// Verify exponential delays (10ms + 20ms = 30ms minimum)
assert.GreaterOrEqual(t, duration, 30*time.Millisecond)
}
func TestBackoffTimeout(t *testing.T) {
processor := pipz.Apply(
SlowID,
func(ctx context.Context, n int) (int, error) {
return 0, errors.New("always fails")
})
backoff := pipz.NewBackoff(
TimeoutTestID,
processor, 10, 100*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
defer cancel()
_, err := backoff.Process(ctx, 5)
var pipeErr *pipz.Error[int]
require.Error(t, err)
require.True(t, errors.As(err, &pipeErr))
assert.True(t, pipeErr.IsTimeout())
}
See Also
- Retry - Fixed-delay retry pattern
- CircuitBreaker - Prevent cascading failures
- Timeout - Bound operation time
- Fallback - Alternative processing on failure