zoobzio December 12, 2025 Edit this page

Timeout

Enforces a time limit on processor execution.

Function Signature

func NewTimeout[T any](
    identity Identity,
    processor Chainable[T],
    duration time.Duration,
) *Timeout[T]

Parameters

  • identity (Identity) - Identifier with name and description for the connector used in debugging and observability
  • processor - The processor to time-bound
  • duration - Maximum allowed execution time

Returns

Returns a *Timeout[T] that implements Chainable[T].

Testing Configuration

WithClock

func (t *Timeout[T]) WithClock(clock clockz.Clock) *Timeout[T]

Sets a custom clock implementation for testing purposes. This method enables controlled time manipulation in tests using clockz.FakeClock.

Parameters:

  • clock (clockz.Clock) - Clock implementation to use

Returns: Returns the same connector instance for method chaining.

Example:

// Use fake clock in tests
fakeClock := clockz.NewFakeClock()
timeout := pipz.NewTimeout(
    pipz.NewIdentity("test", "Test timeout with fake clock for controlled time advancement"),
    processor,
    5*time.Second,
).WithClock(fakeClock)

// Advance time in test
fakeClock.Advance(6 * time.Second)

Behavior

  • Time enforcement - Cancels operation after duration
  • Context timeout - Creates a timeout context for the processor
  • Timeout errors - Returns timeout error with timing information
  • Clean cancellation - Processor should respect context cancellation
  • Error enrichment - Timeout errors include duration info

Example

// Define identities upfront
var (
    APITimeoutID     = pipz.NewIdentity("api-timeout", "Enforces 5 second timeout on external API calls")
    APICallID        = pipz.NewIdentity("api-call", "Calls external API")
    BoundedPipelineID = pipz.NewIdentity("bounded-pipeline", "Ensures entire data processing pipeline completes within 30 seconds")
    ProcessingID     = pipz.NewIdentity("processing", "Data processing sequence")
    FetchID          = pipz.NewIdentity("fetch", "Fetches data")
    TransformID      = pipz.NewIdentity("transform", "Transforms data")
    SaveID           = pipz.NewIdentity("save", "Saves data")
    GraduatedID      = pipz.NewIdentity("graduated", "Graduated timeout fallback")
    FastID           = pipz.NewIdentity("fast", "Primary service with strict 1 second timeout")
    SlowID           = pipz.NewIdentity("slow", "Backup service with generous 10 second timeout")
    ReliableID       = pipz.NewIdentity("reliable", "Reliable retry with timeout")
    BoundedOpID      = pipz.NewIdentity("bounded-op", "Each retry attempt limited to 5 seconds for slow operation")
)

// Basic timeout
fastAPI := pipz.NewTimeout(APITimeoutID,
    pipz.Apply(APICallID, callExternalAPI),
    5*time.Second,
)

// Timeout on complex pipeline
timeBounded := pipz.NewTimeout(BoundedPipelineID,
    pipz.NewSequence[Data](ProcessingID,
        pipz.Apply(FetchID, fetchData),
        pipz.Apply(TransformID, transformData),
        pipz.Apply(SaveID, saveData),
    ),
    30*time.Second, // Entire pipeline must complete in 30s
)

// Graduated timeouts
resilient := pipz.NewFallback(GraduatedID,
    pipz.NewTimeout(FastID,
        primaryService,
        1*time.Second,
    ),
    pipz.NewTimeout(SlowID,
        backupService,
        10*time.Second,
    ),
)

// Timeout with retry
reliableButSlow := pipz.NewRetry(ReliableID,
    pipz.NewTimeout(BoundedOpID,
        slowOperation,
        5*time.Second,
    ),
    3, // Retry up to 3 times, each with 5s timeout
)

When to Use

Use Timeout when:

  • Calling external services (APIs, databases)
  • Operations might hang indefinitely
  • SLAs must be enforced
  • Protecting against slow operations
  • Resource usage needs bounds
  • User-facing operations need responsiveness
  • Preventing cascading delays

When NOT to Use

Don't use Timeout when:

  • Operations are always fast (unnecessary overhead)
  • Cancellation might corrupt data (transactions)
  • Exact completion is required (financial processing)
  • Timeout would leave inconsistent state
  • Operations can't be cancelled (use monitoring instead)

Error Details

Timeout errors are marked specially:

timeout := pipz.NewTimeout(
    pipz.NewIdentity("slow-op", "Timeout for potentially slow processor with 2 second limit"),
    slowProcessor,
    2*time.Second,
)
_, err := timeout.Process(ctx, data)

if err != nil {
    var pipeErr *pipz.Error[Data]
    if errors.As(err, &pipeErr) {
        if pipeErr.Timeout {
            fmt.Printf("Operation timed out after %v", pipeErr.Duration)
            // Duration will be ~2 seconds
        }
    }
}

Context Cancellation

Processors must respect context cancellation:

// Define identities upfront
var (
    GoodProcessorID = pipz.NewIdentity("good", "Context-aware processor")
    BadProcessorID  = pipz.NewIdentity("bad", "Context-ignoring processor")
)

// GOOD: Checks context
goodProcessor := pipz.Apply(GoodProcessorID, func(ctx context.Context, data Data) (Data, error) {
    for i := 0; i < 100; i++ {
        select {
        case <-ctx.Done():
            return data, ctx.Err() // Respects timeout
        default:
            // Do work
            if err := processChunk(data, i); err != nil {
                return data, err
            }
        }
    }
    return data, nil
})

// BAD: Ignores context
badProcessor := pipz.Apply(BadProcessorID, func(ctx context.Context, data Data) (Data, error) {
    // This will continue even after timeout!
    time.Sleep(10 * time.Second)
    return data, nil
})

Common Patterns

// Tiered timeout strategy
tieredService := pipz.NewSwitch[Request](
    pipz.NewIdentity("tier-router", "Routes requests by tier with appropriate timeout limits"),
    func(ctx context.Context, req Request) string {
        return req.Tier
    },
).
AddRoute("premium", pipz.NewTimeout(
    pipz.NewIdentity("premium", "Premium tier processing with 10 second timeout"),
    processor,
    10*time.Second,
)).
AddRoute("standard", pipz.NewTimeout(
    pipz.NewIdentity("standard", "Standard tier processing with 5 second timeout"),
    processor,
    5*time.Second,
)).
AddRoute("free", pipz.NewTimeout(
    pipz.NewIdentity("free", "Free tier processing with 1 second timeout"),
    processor,
    1*time.Second,
))

// Define identities for monitoring and dynamic patterns
var (
    MonitoredID  = pipz.NewIdentity("monitored", "Monitored timeout handler")
    OperationID  = pipz.NewIdentity("operation", "Monitored operation with 3 second timeout")
    TrackID      = pipz.NewIdentity("track", "Tracks timeout metrics")
    DynamicID    = pipz.NewIdentity("dynamic", "Dynamic timeout based on priority")
    BoundedID    = pipz.NewIdentity("bounded", "Dynamically timed operation")
)

// Timeout with monitoring
monitoredTimeout := pipz.NewHandle(MonitoredID,
    pipz.NewTimeout(OperationID,
        processor,
        3*time.Second,
    ),
    pipz.Effect(TrackID, func(ctx context.Context, err *pipz.Error[Data]) error {
        if err.Timeout {
            metrics.Increment("timeouts", "operation", "myop")
            if err.Duration > 2*time.Second {
                log.Printf("Near timeout: %v", err.Duration)
            }
        }
        return nil
    }),
)

// Dynamic timeout based on context
dynamicTimeout := pipz.Apply(DynamicID, func(ctx context.Context, req Request) (Request, error) {
    timeout := 5 * time.Second // default
    if req.Priority == "high" {
        timeout = 30 * time.Second
    }

    return pipz.NewTimeout(BoundedID,
        processor,
        timeout,
    ).Process(ctx, req)
})

Advanced Patterns

// Percentage-based timeout (P95)
adaptiveTimeout := pipz.NewTimeout(
    pipz.NewIdentity("adaptive", "Adaptive timeout based on P95 historical performance metrics"),
    processor,
    calculateP95Timeout(), // Based on historical data
)

// Define identities for advanced patterns
var (
    GracefulID     = pipz.NewIdentity("graceful", "Graceful shutdown with partial results")
    OuterID        = pipz.NewIdentity("outer", "Outer timeout for multi-step sequence")
    InnerSequenceID = pipz.NewIdentity("inner-sequence", "Inner sequence with step timeouts")
    Step1ID        = pipz.NewIdentity("step1", "First step with 5 second timeout")
    Step2ID        = pipz.NewIdentity("step2", "Second step with 5 second timeout")
    Step3ID        = pipz.NewIdentity("step3", "Third step with 5 second timeout")
)

// Timeout with graceful shutdown
gracefulTimeout := pipz.Apply(GracefulID, func(ctx context.Context, batch Batch) (Batch, error) {
    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()

    results := make([]Result, 0, len(batch.Items))
    for _, item := range batch.Items {
        select {
        case <-ctx.Done():
            // Save partial results
            batch.Partial = true
            batch.Results = results
            return batch, fmt.Errorf("timeout after %d items", len(results))
        default:
            result, err := processItem(ctx, item)
            if err != nil {
                return batch, err
            }
            results = append(results, result)
        }
    }

    batch.Results = results
    return batch, nil
})

// Cascading timeouts
cascading := pipz.NewTimeout(OuterID,
    pipz.NewSequence[Data](InnerSequenceID,
        pipz.NewTimeout(Step1ID,
            step1,
            5*time.Second,
        ),
        pipz.NewTimeout(Step2ID,
            step2,
            5*time.Second,
        ),
        pipz.NewTimeout(Step3ID,
            step3,
            5*time.Second,
        ),
    ),
    12*time.Second, // Total timeout less than sum of parts
)

Gotchas

❌ Don't use timeouts that are too short

// WRONG - Timeout shorter than average response time
timeout := pipz.NewTimeout(
    pipz.NewIdentity("too-short", "Unrealistic timeout that will fail most normal requests"),
    apiCall, // Takes 2-3 seconds normally
    1*time.Second, // Will almost always timeout!
)

✅ Base timeouts on actual performance

// RIGHT - Realistic timeout
timeout := pipz.NewTimeout(
    pipz.NewIdentity("realistic", "Realistic timeout based on P99 response time measurements"),
    apiCall,
    5*time.Second, // P99 response time
)

❌ Don't ignore context in processors

// WRONG - Ignores timeout!
processor := pipz.Apply("bad", func(ctx context.Context, data Data) (Data, error) {
    time.Sleep(10 * time.Second) // Ignores context!
    return data, nil
})

✅ Check context during long operations

// RIGHT - Respects timeout
processor := pipz.Apply("good", func(ctx context.Context, data Data) (Data, error) {
    for i := 0; i < 100; i++ {
        select {
        case <-ctx.Done():
            return data, ctx.Err()
        default:
            processChunk(data, i)
        }
    }
    return data, nil
})

❌ Don't nest timeouts incorrectly

// WRONG - Inner timeout longer than outer!
timeout := pipz.NewTimeout(
    pipz.NewIdentity("outer", "Outer timeout shorter than inner timeout - misconfigured"),
    pipz.NewTimeout(
        pipz.NewIdentity("inner", "Inner timeout that will never complete fully"),
        processor,
        10*time.Second,
    ), // Longer!
    5*time.Second, // Shorter - inner never gets full time
)

✅ Make outer timeouts longer than sum of inner

// Define identities upfront
var (
    OuterTimeoutID = pipz.NewIdentity("outer", "Outer timeout with buffer")
    StepsID        = pipz.NewIdentity("steps", "Sequential steps")
    StepOneID      = pipz.NewIdentity("step1", "First step with 5 second timeout")
    StepTwoID      = pipz.NewIdentity("step2", "Second step with 5 second timeout")
)

// RIGHT - Outer accommodates inner timeouts
timeout := pipz.NewTimeout(OuterTimeoutID,
    pipz.NewSequence(StepsID,
        pipz.NewTimeout(StepOneID,
            step1,
            5*time.Second,
        ),
        pipz.NewTimeout(StepTwoID,
            step2,
            5*time.Second,
        ),
    ),
    12*time.Second, // Allows both to complete with buffer
)

Best Practices

// Set realistic timeouts
// GOOD: Based on actual performance
goodTimeout := pipz.NewTimeout(
    pipz.NewIdentity("realistic", "Statistically-based timeout using mean plus standard deviation"),
    apiCall,
    2*averageResponseTime + standardDeviation,
)

// BAD: Arbitrary timeout
badTimeout := pipz.NewTimeout(
    pipz.NewIdentity("arbitrary", "Arbitrary timeout without performance justification"),
    apiCall,
    1*time.Minute, // Why 1 minute?
)

// Define identities for best practices examples
var (
    BatchID     = pipz.NewIdentity("batch", "Batch processor with timeout")
    ProcessID   = pipz.NewIdentity("process", "Processes batch items")
    SlowLogID   = pipz.NewIdentity("slow-log", "Logs slow operations")
    MaybeSlowID = pipz.NewIdentity("maybe-slow", "Potentially slow operation")
    LogSlowID   = pipz.NewIdentity("log-slow", "Logs slow operations")
)

// Handle partial results
batchWithTimeout := pipz.NewTimeout(BatchID,
    pipz.Apply(ProcessID, func(ctx context.Context, batch Batch) (Batch, error) {
        var processed []Item
        for _, item := range batch.Items {
            if ctx.Err() != nil {
                // Return partial results
                return Batch{
                    Items:     batch.Items,
                    Processed: processed,
                    Partial:   true,
                }, ctx.Err()
            }
            processed = append(processed, process(item))
        }
        return Batch{Items: batch.Items, Processed: processed}, nil
    }),
    30*time.Second,
)

// Log slow operations
slowLogger := pipz.NewHandle(SlowLogID,
    pipz.NewTimeout(MaybeSlowID,
        processor,
        5*time.Second,
    ),
    pipz.Effect(LogSlowID, func(ctx context.Context, err *pipz.Error[Data]) error {
        // Log operations that take more than 3s
        if err.Duration > 3*time.Second {
            log.Printf("Slow operation: %v (timeout: %v)", err.Duration, err.Timeout)
        }
        return nil
    }),
)

See Also

  • Retry - Often combined with timeout
  • Fallback - For timeout recovery
  • Race - Alternative approach to timeouts
  • Handle - For timeout monitoring