zoobzio December 12, 2025 Edit this page

Backoff

Retry with exponential backoff for handling transient failures.

Overview

Backoff provides intelligent retry logic with exponentially increasing delays between attempts. Unlike simple Retry which uses fixed delays, Backoff progressively increases wait times to reduce load on failing systems and improve recovery chances.

Function Signature

func NewBackoff[T any](
    identity Identity,
    processor Chainable[T],
    maxAttempts int,
    baseDelay time.Duration,
) *Backoff[T]

Type Parameters

  • T - The data type being processed

Parameters

  • identity (Identity) - Identifier for debugging and error paths
  • processor (Chainable[T]) - The processor to retry on failure
  • maxAttempts (int) - Maximum number of attempts in total, including the initial one (minimum 1)
  • baseDelay (time.Duration) - Initial delay between retries

Returns

Returns a *Backoff[T] that implements Chainable[T].

Behavior

  • Exponential growth: Each retry doubles the previous delay
  • No delay after final attempt: Fails immediately if last attempt fails
  • Context aware: Respects cancellation and deadlines
  • Error preservation: Maintains complete error context from failures
  • Thread-safe: Safe for concurrent use

Retry Backoff Visualization

┌──────────────────────────────────────────────────────────────────┐
│                    Exponential Backoff Pattern                   │
└──────────────────────────────────────────────────────────────────┘

Base Delay: 100ms, Max Attempts: 5

Attempt 1 ──[✗]──→ Wait 100ms ──┐
                                 ▼
Attempt 2 ──[✗]──→ Wait 200ms ──┐
                                 ▼
Attempt 3 ──[✗]──→ Wait 400ms ──┐
                                 ▼
Attempt 4 ──[✗]──→ Wait 800ms ──┐
                                 ▼
Attempt 5 ──[✗]──→ Final Failure (no delay)
                         │
                         ▼
                   Return Error

Total Time: 100ms + 200ms + 400ms + 800ms = 1.5 seconds

Timeline View:
═════════════
Time:    0ms   100ms  300ms      700ms           1500ms
         │      │      │          │               │
Attempt: [1]──►[2]───►[3]───────►[4]────────────►[5]
         ↑      ↑      ↑          ↑               ↑
         Try   Retry  Retry     Retry          Final

Success Case (succeeds on attempt 3):
══════════════════════════════════════
Attempt 1 ──[✗]──→ Wait 100ms ──┐
                                 ▼
Attempt 2 ──[✗]──→ Wait 200ms ──┐
                                 ▼
Attempt 3 ──[✓]──→ Success! Return result

Total Time: 100ms + 200ms = 300ms (plus processing time)

Delay Progression

Given a base delay of 1 second:

  • 1st retry: 1 second delay
  • 2nd retry: 2 seconds delay
  • 3rd retry: 4 seconds delay
  • 4th retry: 8 seconds delay
  • And so on...

Methods

SetMaxAttempts

Updates the maximum number of attempts (the initial attempt plus retries).

func (b *Backoff[T]) SetMaxAttempts(n int) *Backoff[T]

SetBaseDelay

Updates the base delay duration.

func (b *Backoff[T]) SetBaseDelay(d time.Duration) *Backoff[T]

GetMaxAttempts

Returns the current maximum attempts setting.

func (b *Backoff[T]) GetMaxAttempts() int

GetBaseDelay

Returns the current base delay setting.

func (b *Backoff[T]) GetBaseDelay() time.Duration

Identity

Returns the identity of this connector.

func (b *Backoff[T]) Identity() Identity

Schema

Returns the schema representation of this connector.

func (b *Backoff[T]) Schema() Node

Basic Usage

// Define identities upfront
var (
    APIRetryID = pipz.NewIdentity("api-retry", "Retry external API calls with exponential backoff")
    CallAPIID  = pipz.NewIdentity("call-api", "Call external API")
)

// Retry API calls with exponential backoff
apiCall := pipz.NewBackoff(
    APIRetryID,
    pipz.Apply(CallAPIID, func(ctx context.Context, req Request) (Response, error) {
        return externalAPI.Call(ctx, req)
    }),
    5,                    // Max 5 attempts
    100*time.Millisecond, // Start with 100ms delay
)

// Delays will be: 100ms, 200ms, 400ms, 800ms

Common Patterns

Network Request Handling

// Define identities upfront
var (
    HTTPBackoffID = pipz.NewIdentity("http-backoff", "HTTP client with exponential backoff for server errors")
    HTTPRequestID = pipz.NewIdentity("http-request", "Execute HTTP request")
)

// Robust HTTP client with backoff
httpClient := pipz.NewBackoff(
    HTTPBackoffID,
    pipz.Apply(HTTPRequestID, func(ctx context.Context, req HTTPRequest) (HTTPResponse, error) {
        // Bind the request to ctx so cancellation and deadlines also abort an
        // in-flight call, not only the wait between attempts.
        // NOTE(review): assumes req.ToHTTP() returns *http.Request — confirm.
        resp, err := client.Do(req.ToHTTP().WithContext(ctx))
        if err != nil {
            return HTTPResponse{}, err
        }
        // Always release the connection, including on the 5xx path below;
        // otherwise every retried attempt leaks a response body.
        defer resp.Body.Close()

        // Retry on 5xx errors
        if resp.StatusCode >= 500 {
            return HTTPResponse{}, fmt.Errorf("server error: %d", resp.StatusCode)
        }

        return parseResponse(resp)
    }),
    4,                    // 4 attempts total
    500*time.Millisecond, // Start with 500ms
)
// Total possible delay: 500ms + 1s + 2s = 3.5s

Database Operations

// Define identities upfront
var (
    DBRetryID = pipz.NewIdentity("db-retry", "Retry database operations with backoff for deadlocks")
    UpdateID  = pipz.NewIdentity("update", "Update database record")
)

// Database operations with backoff for lock contention
dbOperation := pipz.NewBackoff(
    DBRetryID,
    pipz.Apply(UpdateID, func(ctx context.Context, data Record) (Record, error) {
        tx, err := db.BeginTx(ctx, nil)
        if err != nil {
            return data, err
        }
        defer tx.Rollback()

        // Perform operations
        if err := updateRecord(tx, data); err != nil {
            return data, err
        }

        return data, tx.Commit()
    }),
    3,                  // 3 attempts for deadlocks
    50*time.Millisecond, // Short initial delay
)

Message Queue Processing

// Define identities upfront; they label each stage for debugging and error paths
var (
    MessageRetryID   = pipz.NewIdentity("message-retry", "Retry message processing with exponential backoff")
    ProcessMessageID = pipz.NewIdentity("process-message", "Parse, validate, and send message")
    SendID           = pipz.NewIdentity("send", "Send message to queue")
)

// Retry message processing with increasing delays
messageProcessor := pipz.NewBackoff(
    MessageRetryID,
    pipz.NewSequence(
        ProcessMessageID,
        parseMessage,
        validateMessage,
        pipz.Apply(SendID, func(ctx context.Context, msg Message) (Message, error) {
            return queue.Send(ctx, msg)
        }),
    ),
    6,              // More attempts for async operations
    1*time.Second,  // Start with 1 second
)
// Max total delay: 1s + 2s + 4s + 8s + 16s = 31s

Combined with Circuit Breaker

// Define identities upfront
var (
    CircuitID = pipz.NewIdentity("circuit", "Circuit breaker for external service")
    BackoffID = pipz.NewIdentity("backoff", "Exponential backoff for service calls")
)

// Backoff with circuit breaker for external services
resilientService := pipz.NewCircuitBreaker(
    CircuitID,
    pipz.NewBackoff(
        BackoffID,
        externalServiceCall,
        3,                    // Limited attempts before circuit opens
        200*time.Millisecond,
    ),
    10,  // Failure threshold
    5*time.Minute, // Recovery timeout
)

Configuration Patterns

Dynamic Configuration

// Define identity upfront
var DynamicID = pipz.NewIdentity("dynamic", "Dynamically configured backoff")

backoff := pipz.NewBackoff(
    DynamicID,
    processor,
    3,
    1*time.Second,
)

// Adjust based on load
if highLoad() {
    backoff.SetMaxAttempts(5).SetBaseDelay(2*time.Second)
} else {
    backoff.SetMaxAttempts(3).SetBaseDelay(500*time.Millisecond)
}

Environment-Based Configuration

// Define identities upfront
var (
    ProdBackoffID  = pipz.NewIdentity("prod-backoff", "Production backoff configuration")
    StageBackoffID = pipz.NewIdentity("stage-backoff", "Staging backoff configuration")
    DevBackoffID   = pipz.NewIdentity("dev-backoff", "Development backoff configuration")
)

func createBackoff(env string) *Backoff[Data] {
    switch env {
    case "production":
        return pipz.NewBackoff(
            ProdBackoffID,
            processor, 5, 1*time.Second)
    case "staging":
        return pipz.NewBackoff(
            StageBackoffID,
            processor, 3, 500*time.Millisecond)
    default:
        return pipz.NewBackoff(
            DevBackoffID,
            processor, 2, 100*time.Millisecond)
    }
}

Error Handling

Backoff preserves complete error context:

result, err := backoff.Process(ctx, data)
if err != nil {
    var pipeErr *pipz.Error[Data]
    if errors.As(err, &pipeErr) {
        // Error path includes backoff name
        fmt.Printf("Failed after %d attempts at: %v\n", 
            backoff.GetMaxAttempts(), pipeErr.Path)
        
        // Check if it was a timeout during backoff
        if pipeErr.IsTimeout() {
            log.Warn("Backoff interrupted by timeout")
        }
    }
}

Comparison with Retry

Backoff vs Retry

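| Feature       | Backoff                      | Retry                         |
|---------------|------------------------------|-------------------------------|
| Delay pattern | Exponential (1s, 2s, 4s...)  | Fixed (1s, 1s, 1s...)         |
| Use case      | Transient failures, overload | Quick failures, network blips |
| Total time    | Grows exponentially          | Grows linearly                |
| System load   | Reduces pressure over time   | Constant pressure             |
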
// Define identities upfront
var (
    ExponentialID = pipz.NewIdentity("exponential", "Exponential backoff for overloaded systems")
    FixedID       = pipz.NewIdentity("fixed", "Fixed delay retry for brief interruptions")
)

// Backoff: Good for overloaded systems
backoff := pipz.NewBackoff(
    ExponentialID,
    processor, 4, 1*time.Second)
// Delays: 1s, 2s, 4s (total: 7s)

// Retry: Good for brief interruptions
retry := pipz.NewRetry(
    FixedID,
    processor, 4, 1*time.Second)
// Delays: 1s, 1s, 1s (total: 3s)

Performance Characteristics

  • Memory: O(1) - No additional allocations per retry
  • Goroutines: No additional goroutines created
  • Overhead: ~10ns + sleep time per retry
  • Context checks: Performed before each retry

Gotchas

❌ Don't use tiny base delays

// Define identity upfront
var TooFastID = pipz.NewIdentity("too-fast", "Backoff with microsecond delays")

// WRONG - Delays too small to be meaningful
backoff := pipz.NewBackoff(
    TooFastID,
    processor, 5, 1*time.Microsecond)
// Results in: 1μs, 2μs, 4μs, 8μs - essentially no delay

✅ Use meaningful base delays

// Define identity upfront
var ReasonableID = pipz.NewIdentity("reasonable", "Backoff with reasonable delays for system recovery")

// RIGHT - Delays that allow recovery
backoff := pipz.NewBackoff(
    ReasonableID,
    processor, 5, 100*time.Millisecond)
// Results in: 100ms, 200ms, 400ms, 800ms - gives system time to recover

❌ Don't use too many attempts

// Define identity upfront
var ExcessiveID = pipz.NewIdentity("excessive", "Backoff with too many attempts")

// WRONG - Could wait extremely long
backoff := pipz.NewBackoff(
    ExcessiveID,
    processor, 10, 1*time.Second)
// Final delay would be 256 seconds, and the total wait up to 511 seconds (~8.5 minutes)!

✅ Balance attempts with total delay

// Define identity upfront
var BalancedID = pipz.NewIdentity("balanced", "Backoff with balanced attempts and delays")

// RIGHT - Reasonable total delay
backoff := pipz.NewBackoff(
    BalancedID,
    processor, 5, 500*time.Millisecond)
// Max total: 500ms + 1s + 2s + 4s = 7.5s

❌ Don't ignore context cancellation

// WRONG - Not checking context
for i := 0; i < maxAttempts; i++ {
    result, err := process(data)
    if err == nil {
        return result, nil
    }
    time.Sleep(delay) // Ignores context!
    delay *= 2
}

✅ Respect context cancellation

// Define identity upfront
var ContextAwareID = pipz.NewIdentity("context-aware", "Context-aware backoff")

// RIGHT - Backoff handles this automatically
backoff := pipz.NewBackoff(
    ContextAwareID,
    processor, 5, 1*time.Second)
// Automatically stops on context cancellation

Best Practices

  1. Start with small delays - Begin with 50-500ms for most operations
  2. Limit max attempts - Usually 3-5 attempts is sufficient
  3. Calculate total time - Ensure max total delay is acceptable
  4. Match delay to failure type - Network: 100ms+, Database: 50ms+, API: 500ms+
  5. Monitor retry metrics - Track retry rates and success rates
  6. Consider circuit breakers - Combine with circuit breakers for system protection
  7. Log retry attempts - Include attempt number in logs for debugging

Testing

// Define identities upfront
var (
    FlakyID       = pipz.NewIdentity("flaky", "Flaky processor for testing")
    TestID        = pipz.NewIdentity("test", "Test backoff processor")
    SlowID        = pipz.NewIdentity("slow", "Always-failing processor")
    TimeoutTestID = pipz.NewIdentity("timeout-test", "Backoff with timeout test")
)

// TestBackoff checks that a processor which fails twice succeeds on its third
// attempt, and that the two waits in between follow the doubling schedule.
func TestBackoff(t *testing.T) {
    var calls int
    flaky := pipz.Apply(
        FlakyID,
        func(ctx context.Context, n int) (int, error) {
            calls++
            // From the third call onwards the processor goes through.
            if calls >= 3 {
                return n * 2, nil
            }
            return 0, errors.New("transient error")
        })

    backoff := pipz.NewBackoff(TestID, flaky, 5, 10*time.Millisecond)

    began := time.Now()
    got, err := backoff.Process(context.Background(), 5)
    elapsed := time.Since(began)

    assert.NoError(t, err)
    assert.Equal(t, 10, got)
    assert.Equal(t, 3, calls)

    // Two waits precede the successful attempt: 10ms + 20ms = 30ms minimum.
    assert.GreaterOrEqual(t, elapsed, 30*time.Millisecond)
}

// TestBackoffTimeout checks that a context deadline expiring while Backoff is
// still retrying surfaces as a timeout on the returned pipz error.
func TestBackoffTimeout(t *testing.T) {
    alwaysFails := pipz.Apply(
        SlowID,
        func(ctx context.Context, n int) (int, error) {
            return 0, errors.New("always fails")
        })

    backoff := pipz.NewBackoff(TimeoutTestID, alwaysFails, 10, 100*time.Millisecond)

    // With a 100ms base delay the waits are 100ms then 200ms, so a 250ms
    // deadline is expected to expire during the second wait.
    ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
    defer cancel()

    _, err := backoff.Process(ctx, 5)
    require.Error(t, err)

    var pipeErr *pipz.Error[int]
    require.True(t, errors.As(err, &pipeErr))
    assert.True(t, pipeErr.IsTimeout())
}

See Also