zoobzio December 12, 2025 Edit this page

Best Practices

Guidelines for building production-ready pipelines with pipz.

Design Principles

1. Single Responsibility

Each processor should do one thing well:

// Define identities as package-level variables
var (
    ValidateEmailID = pipz.NewIdentity("validate_email", "Validates email format")
    NormalizeEmailID = pipz.NewIdentity("normalize_email", "Normalizes email to lowercase")
    ProcessEmailID = pipz.NewIdentity("process_email", "Processes email (does too much!)")
)

// Good: Focused processors
validateEmail := pipz.Apply(ValidateEmailID, func(ctx context.Context, u User) (User, error) {
    if !isValidEmail(u.Email) {
        return u, errors.New("invalid email format")
    }
    return u, nil
})

normalizeEmail := pipz.Transform(NormalizeEmailID, func(ctx context.Context, u User) User {
    u.Email = strings.ToLower(strings.TrimSpace(u.Email))
    return u
})

// Bad: Doing too much
processEmail := pipz.Apply(ProcessEmailID, func(ctx context.Context, u User) (User, error) {
    // Validating AND normalizing AND checking duplicates
    u.Email = strings.ToLower(strings.TrimSpace(u.Email))
    if !isValidEmail(u.Email) {
        return u, errors.New("invalid email")
    }
    if emailExists(u.Email) {
        return u, errors.New("email exists")
    }
    return u, nil
})

2. Identity-Driven Design

Store Identity objects as package-level variables. This isn't just good style - it's what makes your pipelines composable and modifiable at runtime.

Why Identity Variables are Critical

Identity objects serve as keys that enable dynamic pipeline modification. Because Identity is a comparable struct, you must use the same Identity variable for both creation and lookup:

// Define identities as package-level variables
var (
    ValidateOrderID = pipz.NewIdentity("validate_order", "Validates order structure and totals")
    CalculatePriceID = pipz.NewIdentity("calculate_price", "Calculates order pricing")
    ProcessPaymentID = pipz.NewIdentity("process_payment", "Processes payment transaction")
)

// Use the same Identity when creating processors
validateOrder := pipz.Apply(ValidateOrderID, validateFunc)
calculatePrice := pipz.Transform(CalculatePriceID, priceFunc)
processPayment := pipz.Apply(ProcessPaymentID, paymentFunc)

// Later, use the same Identity for pipeline modification:
orderPipeline.After(ValidateOrderID, fraudDetection)
orderPipeline.Replace(ProcessPaymentID, testPaymentProcessor)
orderPipeline.Remove(CalculatePriceID) // For free orders

Without stored Identity variables, you lose this composability - creating new Identity objects inline makes dynamic modification impossible since they won't match.

The Pattern

// Define all identities as package-level variables - these become your pipeline's "keys"
var (
    // Processor identities
    ValidateOrderID     = pipz.NewIdentity("validate_order", "Validates order structure and totals")
    CheckInventoryID    = pipz.NewIdentity("check_inventory", "Checks inventory availability")
    CalculatePriceID    = pipz.NewIdentity("calculate_price", "Calculates subtotal, tax, and total")
    ApplyDiscountsID    = pipz.NewIdentity("apply_discounts", "Applies applicable discounts")
    ProcessPaymentID    = pipz.NewIdentity("process_payment", "Processes payment transaction")
    UpdateInventoryID   = pipz.NewIdentity("update_inventory", "Updates inventory levels")
    SendConfirmationID  = pipz.NewIdentity("send_confirmation", "Sends order confirmation")
    UpdateCRMID         = pipz.NewIdentity("update_crm", "Updates CRM system")
    MarkPendingID       = pipz.NewIdentity("mark_pending", "Marks order as pending")
    ApplyBulkDiscountID = pipz.NewIdentity("apply_bulk_discount", "Applies bulk order discount")

    // Connector identities
    OrderPipelineID      = pipz.NewIdentity("order-pipeline", "Complete order processing pipeline")
    PaymentFlowID        = pipz.NewIdentity("payment-flow", "Payment processing with retry")
    NotificationsID      = pipz.NewIdentity("notification-flow", "Notification with timeout")
    ExpressCheckoutID    = pipz.NewIdentity("express-checkout", "Express checkout pipeline")
    BulkOrdersID         = pipz.NewIdentity("bulk-orders", "Bulk order processing pipeline")
)

// Use Identity variables when creating processors
func createOrderProcessors() map[pipz.Identity]pipz.Chainable[Order] {
    return map[pipz.Identity]pipz.Chainable[Order]{
        ValidateOrderID: pipz.Apply(ValidateOrderID, func(ctx context.Context, o Order) (Order, error) {
            if o.Total <= 0 {
                return o, errors.New("invalid order total")
            }
            return o, nil
        }),

        CheckInventoryID: pipz.Apply(CheckInventoryID, func(ctx context.Context, o Order) (Order, error) {
            for _, item := range o.Items {
                if !inventory.Has(item.SKU, item.Quantity) {
                    return o, fmt.Errorf("insufficient inventory for %s", item.SKU)
                }
            }
            return o, nil
        }),

        CalculatePriceID: pipz.Transform(CalculatePriceID, func(ctx context.Context, o Order) Order {
            o.Subtotal = calculateSubtotal(o.Items)
            o.Tax = calculateTax(o.Subtotal, o.ShippingAddress)
            o.Total = o.Subtotal + o.Tax + o.ShippingCost
            return o
        }),
    }
}

// Build pipelines using Identity variables
func createOrderPipeline() pipz.Chainable[Order] {
    seq := pipz.NewSequence[Order](OrderPipelineID)
    seq.Register(
        pipz.Apply(ValidateOrderID, validateOrder),
        pipz.Apply(CheckInventoryID, checkInventory),
        pipz.Transform(CalculatePriceID, calculatePrice),
        pipz.Transform(ApplyDiscountsID, applyDiscounts),

        // Payment with retry
        pipz.NewRetry(PaymentFlowID,
            pipz.Apply(ProcessPaymentID, processPayment),
            3,
        ),

        pipz.Apply(UpdateInventoryID, updateInventory),

        // Notifications with timeout
        pipz.NewTimeout(NotificationsID,
            pipz.Effect(SendConfirmationID, sendConfirmation),
            5*time.Second,
        ),
    )
    return seq
}

// Example: Dynamic pipeline composition
func createCustomPipeline(config Config) pipz.Chainable[Order] {
    // For dynamic names, create Identity inline when name is runtime-determined
    customPipelineID := pipz.NewIdentity(config.Name, "Custom pipeline based on config")
    seq := pipz.NewSequence[Order](customPipelineID)

    // Always validate - using the shared Identity variable
    seq.Register(pipz.Apply(ValidateOrderID, validateOrder))

    // Conditionally add processors
    if config.CheckInventory {
        seq.Register(pipz.Apply(CheckInventoryID, checkInventory))
    }

    seq.Register(pipz.Transform(CalculatePriceID, calculatePrice))

    if config.DiscountsEnabled {
        seq.Register(pipz.Transform(ApplyDiscountsID, applyDiscounts))
    }

    return seq
}

Benefits of Identity-driven design:

  1. Composability: Same Identity enables pipeline modification (After, Before, Remove, Replace)
  2. No Magic Strings: Variables prevent typos and runtime errors
  3. IDE Support: Auto-completion and refactoring work perfectly
  4. Debugging: Error messages show meaningful processor names with descriptions
  5. Searchability: Easy to find all usages of a processor
  6. Self-Documenting: Identity descriptions explain what each processor does

// Example: Error messages are more helpful with Identity names
result, err := pipeline.Process(ctx, order)
if err != nil {
    // Error: "failed at [order-pipeline -> validate_order]: invalid order total"
    // Much clearer than: "failed at [pipeline1 -> proc1]: invalid order total"
}

// Example: Easy to modify pipelines using Identity variables
func addFraudCheck(seq *pipz.Sequence[Order]) {
    fraudCheckID := pipz.NewIdentity("fraud_check", "Checks for fraudulent orders")
    fraudCheck := pipz.Apply(fraudCheckID, checkFraud)
    // Use the same ValidateOrderID variable for lookup
    seq.After(ValidateOrderID, fraudCheck) // Insert after validation
}

3. Explicit Error Handling

Make error handling visible and intentional:

// Define identities as package-level variables
var (
    PaymentFlowID     = pipz.NewIdentity("payment-flow", "Payment processing with error handling")
    ValidatePaymentID = pipz.NewIdentity("validate", "Validates payment details")
    ChargeCardID      = pipz.NewIdentity("charge", "Charges payment card")
    NotifyHandlerID   = pipz.NewIdentity("notify-with-logging", "Notification with error logging")
    NotifyID          = pipz.NewIdentity("notify", "Sends notification")
    LogNotifyErrorID  = pipz.NewIdentity("log_notification_error", "Logs notification errors")
    BadProcessID      = pipz.NewIdentity("process", "Processes payment (too much in one place)")
)

// Good: Clear error strategy with Identity variables
payment := pipz.NewSequence(PaymentFlowID,
    pipz.Apply(ValidatePaymentID, validatePayment),

    // Critical operation with retry
    pipz.RetryWithBackoff(
        pipz.Apply(ChargeCardID, chargeCard),
        3,
        time.Second,
    ),

    // Non-critical with error handler
    pipz.NewHandle(NotifyHandlerID,
        pipz.Effect(NotifyID, sendNotification),
        pipz.Effect(LogNotifyErrorID, func(ctx context.Context, err *pipz.Error[Order]) error {
            log.Printf("Notification failed for order %s: %v", err.InputData.ID, err.Err)
            return nil
        }),
    ),
)

// Bad: Hidden error handling
payment := pipz.Apply(BadProcessID, func(ctx context.Context, p Payment) (Payment, error) {
    // Validation, charging, and notification all mixed together
    // Error handling buried in function
})

4. Type-Safe Routes

Use custom types for routing decisions:

// Good: Type-safe enum
type OrderPriority int
const (
    PriorityStandard OrderPriority = iota
    PriorityExpress
    PriorityOvernight
)

router := pipz.Switch(
    func(ctx context.Context, o Order) OrderPriority {
        if o.ShippingMethod == "overnight" {
            return PriorityOvernight
        }
        // ...
    },
    map[OrderPriority]pipz.Chainable[Order]{
        PriorityStandard:  standardFulfillment,
        PriorityExpress:   expressFulfillment,
        PriorityOvernight: overnightFulfillment,
    },
)

// Bad: Magic strings
router := pipz.Switch(
    func(ctx context.Context, o Order) string {
        return o.ShippingMethod // "standard", "express", etc.
    },
    map[string]pipz.Chainable[Order]{
        "standard": standardFulfillment,
        "express":  expressFulfillment,
        // Easy to typo, no compile-time checking
    },
)

Pipeline Patterns

Pattern: Concurrent vs Sequential

Understanding when to use Concurrent versus Sequential is critical for performance:

Use Concurrent for I/O Operations

Concurrent is designed for operations with real latency where parallel execution provides benefit:

// Define identities as package-level variables
var (
    SendNotificationsID = pipz.NewIdentity("send-notifications", "Sends notifications in parallel")
    EnrichDataID        = pipz.NewIdentity("enrich-data", "Fetches enrichment data in parallel")
)

// GOOD: Parallel API calls with actual network latency
notifications := pipz.NewConcurrent(SendNotificationsID,
    nil, // No reducer needed for side effects
    sendEmailNotification,     // API call to email service
    sendSMSNotification,       // API call to SMS gateway
    updateCRMSystem,           // API call to CRM
    pushToAnalytics,           // API call to analytics service
)

// GOOD: Multiple database queries that don't depend on each other
enrichment := pipz.NewConcurrent(EnrichDataID,
    nil, // No reducer needed
    fetchUserProfile,          // Database query
    fetchAccountHistory,       // Database query
    fetchPreferences,          // Database query
)

Use Sequential for Fast Operations

Sequential is better for CPU-bound operations or fast validations:

// Define identities as package-level variables
var ValidationsID = pipz.NewIdentity("validate", "Validates user data")

// BAD: Using Concurrent for simple validations
validations := pipz.NewConcurrent(ValidationsID, nil,  // ❌ Don't do this!
    validateEmail,     // Simple regex check
    validateAge,       // Number comparison
    validateCountry,   // String check
)

// GOOD: Sequential for fast operations
validations := pipz.NewSequence(ValidationsID,     // ✅ Better!
    validateEmail,
    validateAge,
    validateCountry,
)

Performance Implications

Concurrent creates copies of your data (using the Cloner interface) for goroutine isolation:

// This happens internally in Concurrent:
// 1. data.Clone() called for each processor
// 2. Goroutine spawned for each
// 3. Original input returned (modifications discarded)

// For simple operations, the overhead of:
// - Cloning data
// - Spawning goroutines
// - Channel communication
// ...exceeds the benefit of parallelism

Rule of thumb: If an operation takes less than 10ms, use Sequential.

Pattern: Validation First

Always validate early to fail fast:

// Define identities as package-level variables
var (
    ValidationPipelineID   = pipz.NewIdentity("validation-pipeline", "Validation-first processing")
    ValidateStructureID    = pipz.NewIdentity("validate_structure", "Validates data structure")
    ValidateBusinessRulesID = pipz.NewIdentity("validate_business_rules", "Validates business rules")
    EnrichFromAPIID        = pipz.NewIdentity("enrich_from_api", "Enriches data from API")
    CalculatePricingID     = pipz.NewIdentity("calculate_pricing", "Calculates pricing")
    SaveToDatabaseID       = pipz.NewIdentity("save_to_database", "Saves to database")
)

pipeline := pipz.NewSequence(ValidationPipelineID,
    // Validate first - cheap and catches errors early
    pipz.Apply(ValidateStructureID, validateStructure),
    pipz.Apply(ValidateBusinessRulesID, validateBusinessRules),

    // Then expensive operations
    pipz.Apply(EnrichFromAPIID, enrichFromAPI),
    pipz.Apply(CalculatePricingID, calculatePricing),
    pipz.Apply(SaveToDatabaseID, saveToDatabase),
)

Pattern: Graceful Degradation

Continue processing even when non-critical operations fail:

// Define identities as package-level variables
var (
    ProcessingPipelineID = pipz.NewIdentity("processing-pipeline", "Processing with graceful degradation")
    ValidateID           = pipz.NewIdentity("validate", "Validates input")
    ProcessID            = pipz.NewIdentity("process", "Processes data")
    SaveID               = pipz.NewIdentity("save", "Saves data")
    AddRecommendationsID = pipz.NewIdentity("add_recommendations", "Adds product recommendations")
    NotificationsID      = pipz.NewIdentity("notifications", "Sends notifications in parallel")
    EmailID              = pipz.NewIdentity("email", "Sends email")
    LogEmailErrorID      = pipz.NewIdentity("log_email_error", "Logs email errors")
    AnalyticsID          = pipz.NewIdentity("analytics", "Tracks analytics")
    EmailHandlerID       = pipz.NewIdentity("email-handler", "Email with error handling")
)

pipeline := pipz.NewSequence(ProcessingPipelineID,
    // Critical path
    pipz.Apply(ValidateID, validate),
    pipz.Apply(ProcessID, process),
    pipz.Apply(SaveID, save),

    // Best-effort enrichments
    pipz.Enrich(AddRecommendationsID, func(ctx context.Context, order Order) (Order, error) {
        recs, err := recommendationService.Get(ctx, order.UserID)
        if err != nil {
            return order, err // Enrich will ignore this
        }
        order.Recommendations = recs
        return order, nil
    }),

    // Non-blocking notifications
    pipz.NewConcurrent(NotificationsID, nil,
        pipz.NewHandle(EmailHandlerID,
            pipz.Effect(EmailID, sendEmail),
            pipz.Effect(LogEmailErrorID, logError),
        ),
        pipz.Effect(AnalyticsID, trackAnalytics),
    ),
)

Pattern: Bulkhead Isolation

Isolate failures to prevent cascading outages. The key insight is that bulkhead pipelines are simple to connect when they operate on the same type:

// Define identities as package-level variables
var (
    OrderPipelineID          = pipz.NewIdentity("order-pipeline", "Core order processing")
    NotificationTimeoutID    = pipz.NewIdentity("notification-timeout", "Timeout for notifications")
    NotificationConcurrentID = pipz.NewIdentity("notifications", "Parallel notifications")
    FullPipelineID           = pipz.NewIdentity("full-pipeline", "Full order processing with notifications")
    LogNotifyFailureID       = pipz.NewIdentity("log_notification_failure", "Logs notification failures")
    NotificationHandlerID    = pipz.NewIdentity("notification-handler", "Handles notification errors")
)

// Separate pipelines for different concerns
orderPipeline := pipz.NewSequence(OrderPipelineID,
    validateOrder,
    processPayment,
    updateInventory,
)

// Isolated notification pipeline
notificationPipeline := pipz.NewTimeout(NotificationTimeoutID,
    pipz.NewConcurrent(NotificationConcurrentID, nil,
        sendEmail,
        sendSMS,
        updateCRM,
    ),
    5*time.Second, // Don't let notifications block orders
)

// Compose with isolation - easy because both operate on Order type
fullPipeline := pipz.NewSequence(FullPipelineID,
    orderPipeline,
    pipz.NewHandle(NotificationHandlerID,
        notificationPipeline,
        pipz.Effect(LogNotifyFailureID, logError),
    ),
)

Pattern: Orchestration Pipeline

The ultimate pattern is to build workflows as individual pipelines for discrete types, then create an orchestration pipeline that coordinates them:

// Step 1: Define all identities as package-level variables
var (
    // Domain pipeline identities
    OrderWorkflowID    = pipz.NewIdentity("order-workflow", "Order validation and pricing")
    CustomerWorkflowID = pipz.NewIdentity("customer-workflow", "Customer validation and credit check")
    InvoiceWorkflowID  = pipz.NewIdentity("invoice-workflow", "Invoice generation and formatting")

    // Orchestrator identities
    OrchestratorID           = pipz.NewIdentity("orchestrator", "Orchestrates order workflow processing")
    ProcessOrderID           = pipz.NewIdentity("process_order", "Processes order through order pipeline")
    ProcessCustomerInvoiceID = pipz.NewIdentity("process_customer_invoice", "Processes customer and invoice in parallel")
    FinalizeID               = pipz.NewIdentity("finalize", "Final coordination and confirmation")
)

// Step 2: Build discrete pipelines for each type
type OrderWorkflow struct {
    Order    Order
    Customer Customer
    Invoice  Invoice
    Config   WorkflowConfig
}

// Individual pipelines for each domain object
orderPipeline := pipz.NewSequence[Order](OrderWorkflowID,
    validateOrder,
    calculatePricing,
    applyDiscounts,
)

customerPipeline := pipz.NewSequence[Customer](CustomerWorkflowID,
    validateCustomer,
    checkCreditLimit,
    updateLoyaltyPoints,
)

invoicePipeline := pipz.NewSequence[Invoice](InvoiceWorkflowID,
    generateInvoice,
    applyTaxes,
    formatForDisplay,
)

// Step 3: Create orchestration pipeline that coordinates execution
orchestrator := pipz.NewSequence[OrderWorkflow](OrchestratorID,
    // Process order first
    pipz.Apply(ProcessOrderID, func(ctx context.Context, wf OrderWorkflow) (OrderWorkflow, error) {
        processed, err := orderPipeline.Process(ctx, wf.Order)
        if err != nil {
            return wf, fmt.Errorf("order processing failed: %w", err)
        }
        wf.Order = processed
        return wf, nil
    }),

    // Process customer in parallel with invoice if order succeeded
    pipz.Apply(ProcessCustomerInvoiceID, func(ctx context.Context, wf OrderWorkflow) (OrderWorkflow, error) {
        var wg sync.WaitGroup
        var custErr, invErr error

        wg.Add(2)

        // Process customer
        go func() {
            defer wg.Done()
            processed, err := customerPipeline.Process(ctx, wf.Customer)
            if err != nil {
                custErr = err
                return
            }
            wf.Customer = processed
        }()

        // Process invoice
        go func() {
            defer wg.Done()
            // Invoice needs order data
            wf.Invoice.OrderID = wf.Order.ID
            wf.Invoice.Amount = wf.Order.Total

            processed, err := invoicePipeline.Process(ctx, wf.Invoice)
            if err != nil {
                invErr = err
                return
            }
            wf.Invoice = processed
        }()

        wg.Wait()

        // Handle errors based on config
        if custErr != nil && wf.Config.CustomerRequired {
            return wf, fmt.Errorf("customer processing failed: %w", custErr)
        }
        if invErr != nil && wf.Config.InvoiceRequired {
            return wf, fmt.Errorf("invoice processing failed: %w", invErr)
        }

        return wf, nil
    }),

    // Final coordination
    pipz.Apply(FinalizeID, func(ctx context.Context, wf OrderWorkflow) (OrderWorkflow, error) {
        // Any final coordination logic
        if wf.Config.SendConfirmation {
            // Send order confirmation with all processed data
            sendOrderConfirmation(wf.Order, wf.Customer, wf.Invoice)
        }
        return wf, nil
    }),
)

// Usage
workflow := OrderWorkflow{
    Order:    order,
    Customer: customer,
    Invoice:  Invoice{},
    Config: WorkflowConfig{
        CustomerRequired: true,
        InvoiceRequired:  false,
        SendConfirmation: true,
    },
}

result, err := orchestrator.Process(ctx, workflow)

This pattern provides:

  • Type Safety: Each pipeline operates on its specific type
  • Reusability: Individual pipelines can be reused in different workflows
  • Testability: Each pipeline can be tested independently
  • Flexibility: Orchestration logic can change without modifying domain pipelines
  • Clear Dependencies: The orchestrator explicitly manages data flow between pipelines

Pattern: Feature Flags

Support gradual rollouts and A/B testing:

// Define identities as package-level variables
var FeatureFlagPipelineID = pipz.NewIdentity("feature-flag-pipeline", "Pipeline with feature flags")

func createPipeline(features FeatureFlags) pipz.Chainable[Order] {
    seq := pipz.NewSequence[Order](FeatureFlagPipelineID)

    // Core processors always included
    seq.Register(validateOrder, calculatePricing)

    // Conditionally add processors
    if features.IsEnabled("fraud-detection-v2") {
        seq.Register(fraudDetectionV2)
    } else {
        seq.Register(fraudDetectionV1)
    }

    if features.IsEnabled("loyalty-points") {
        seq.Register(calculateLoyaltyPoints)
    }

    seq.Register(chargePayment, fulfillOrder)
    return seq
}

Error Handling Strategies

Error Propagation Pattern

Understanding how errors flow through your pipelines is critical for proper handling:

┌──────────────────────────────────────────────────────────────────┐
│                    Error Propagation in Pipelines                │
└──────────────────────────────────────────────────────────────────┘

Sequential Pipeline:
═══════════════════
Pipeline A
    ├─→ Processor 1 [✓] Success
    │       ↓
    ├─→ Processor 2 [✗] Failure ──→ Error[T]{Stage: "proc2", Cause: err}
    │       ↓                            ↓
    │    Pipeline Stops                  ▼
    │                              Returns to caller
    └─→ Processor 3 [⋯] Never runs

Nested Pipeline:
════════════════
Main Pipeline
    ├─→ Validate [✓]
    │       ↓
    ├─→ Sub-Pipeline ──┐
    │   ├─→ Step A [✓] │
    │   ├─→ Step B [✗]─┼──→ Error wrapped with sub-pipeline context
    │   └─→ Step C [⋯] │         ↓
    │                  │         ▼
    │                  └──→ Error[T]{Stage: "sub-pipeline.step-b"}
    │                            ↓
    └─→ Save [⋯] Never runs      ▼
                           Main Pipeline Stops

Concurrent Pipeline:
════════════════════
Concurrent Connector
    ├─→ Service A [✓] Completes
    ├─→ Service B [✗] Fails ──→ Error logged but doesn't stop others
    ├─→ Service C [✓] Completes
    └─→ Returns original input (errors don't propagate up)

With Error Handler:
════════════════════
Pipeline with Recovery
    ├─→ Process [✗] ──→ Error[T]
    │       ↓               ↓
    │   Fallback ←──────────┘
    │       ↓
    └─→ Continue [✓] Pipeline continues with fallback result

Key Insights:

  • Sequential stops on first error - Subsequent processors never run
  • Errors carry context - Stage name, cause, and data state at failure
  • Concurrent doesn't propagate errors - Failures are isolated
  • Fallback enables recovery - Convert errors back to success path

Strategy: Categorize and Route

type ErrorCategory string

const (
    CategoryValidation ErrorCategory = "validation"
    CategoryTransient  ErrorCategory = "transient"
    CategoryBusiness   ErrorCategory = "business"
    CategorySystem     ErrorCategory = "system"
)

func handleError(ctx context.Context, failure FailedOrder) (FailedOrder, error) {
    category := categorizeError(failure.Error)
    
    switch category {
    case CategoryValidation:
        // Don't retry bad input
        return sendToDeadLetter(ctx, failure)
        
    case CategoryTransient:
        // Retry with backoff
        return pipz.RetryWithBackoff(
            reprocessOrder,
            5,
            time.Second,
        ).Process(ctx, failure)
        
    case CategoryBusiness:
        // Needs human intervention
        return sendToManualReview(ctx, failure)
        
    case CategorySystem:
        // Alert ops team
        alertOps(failure.Error)
        return sendToDeadLetter(ctx, failure)
    }
    
    return failure, failure.Error
}

Strategy: Rate Limiting and Circuit Breaking

Rate Limiting Best Practices

Use RateLimiter to protect downstream services and respect API limits:

// Define identities as package-level variables
var (
    GlobalLimiterID   = pipz.NewIdentity("global-limit", "Global system rate limit")
    ServiceLimiterID  = pipz.NewIdentity("stripe-limit", "Stripe API rate limit")
    EndpointLimiterID = pipz.NewIdentity("charges-limit", "Charges endpoint rate limit")
    PaymentPipelineID = pipz.NewIdentity("payment-pipeline", "Payment processing pipeline")
    ChargeID          = pipz.NewIdentity("charge", "Processes payment charge")
)

// Pattern: Layer rate limits (global -> service -> endpoint)
globalLimiter := pipz.NewRateLimiter(GlobalLimiterID, 10000, 1000)
serviceLimiter := pipz.NewRateLimiter(ServiceLimiterID, 100, 20)  // Stripe's actual limits
endpointLimiter := pipz.NewRateLimiter(EndpointLimiterID, 50, 10)

pipeline := pipz.NewSequence(PaymentPipelineID,
    globalLimiter,      // Global system limit
    serviceLimiter,     // Per-service limit
    endpointLimiter,    // Per-endpoint limit
    pipz.Apply(ChargeID, makePayment),
)

// Pattern: Dynamic rate adjustment based on conditions
func adjustRateLimit(limiter *pipz.RateLimiter[Request], config Config) {
    if config.OffPeakHours {
        limiter.SetRate(1000)  // Higher rate during off-peak
    } else {
        limiter.SetRate(100)   // Lower rate during peak
    }

    if config.PremiumTier {
        limiter.SetMode("wait")  // Wait for premium users
    } else {
        limiter.SetMode("drop")  // Fail fast for basic users
    }
}

// Pattern: Per-user rate limiting with Switch
// Define identities as package-level variables
var (
    UserRateLimitID   = pipz.NewIdentity("user-rate-limit", "Routes by user tier")
    PremiumRateID     = pipz.NewIdentity("premium-rate", "Premium tier rate limit")
    StandardRateID    = pipz.NewIdentity("standard-rate", "Standard tier rate limit")
    FreeRateID        = pipz.NewIdentity("free-rate", "Free tier rate limit")
)

userLimiter := pipz.NewSwitch(UserRateLimitID,
    func(ctx context.Context, req Request) string {
        return getUserTier(req.UserID)
    },
).
AddRoute("premium", pipz.NewRateLimiter(PremiumRateID, 1000, 100)).
AddRoute("standard", pipz.NewRateLimiter(StandardRateID, 100, 10)).
AddRoute("free", pipz.NewRateLimiter(FreeRateID, 10, 1))

Circuit Breaker Best Practices

Use CircuitBreaker to prevent cascade failures and give services time to recover:

// Define identities as package-level variables
var (
    StripeBreakerID = pipz.NewIdentity("stripe-breaker", "Circuit breaker for Stripe API")
    StripeChargeID  = pipz.NewIdentity("stripe-charge", "Charges via Stripe")
    DBBreakerID     = pipz.NewIdentity("db-breaker", "Circuit breaker for database")
    DBSaveID        = pipz.NewIdentity("db-save", "Saves to database")
)

// Pattern: Circuit breaker per external dependency
stripeBreaker := pipz.NewCircuitBreaker(StripeBreakerID,
    pipz.Apply(StripeChargeID, chargeStripe),
    5,                    // Open after 5 failures
    30*time.Second,       // Try recovery after 30s
)

dbBreaker := pipz.NewCircuitBreaker(DBBreakerID,
    pipz.Apply(DBSaveID, saveToDatabase),
    10,                   // More tolerant for internal services
    time.Minute,          // Longer recovery time
)

// Pattern: Combine with rate limiting for comprehensive protection
// Define identities as package-level variables
var (
    ResilientAPIID = pipz.NewIdentity("resilient-api", "Resilient API pipeline")
    APIRateID      = pipz.NewIdentity("api-rate", "API rate limiter")
    APIBreakerID   = pipz.NewIdentity("api-breaker", "API circuit breaker")
    APIRetryID     = pipz.NewIdentity("api-retry", "API retry handler")
    APICallID      = pipz.NewIdentity("api-call", "External API call")
)

resilientAPI := pipz.NewSequence(ResilientAPIID,
    pipz.NewRateLimiter(APIRateID, 100, 20),  // Rate limit first
    pipz.NewCircuitBreaker(APIBreakerID,       // Then circuit break
        pipz.NewRetry(APIRetryID,              // With retry inside
            pipz.Apply(APICallID, callExternalAPI),
            3,
        ),
        5, 30*time.Second,
    ),
)

// Pattern: Graduated thresholds based on service type
func createCircuitBreaker(service string, serviceType ServiceType, processor pipz.Chainable[Request]) *pipz.CircuitBreaker[Request] {
    // Dynamic Identity when service name is determined at runtime
    breakerID := pipz.NewIdentity(service+"-breaker", "Circuit breaker for "+service)
    switch serviceType {
    case ServiceTypeExternal:
        // External services: fail fast, longer recovery
        return pipz.NewCircuitBreaker(breakerID, processor, 3, 2*time.Minute)
    case ServiceTypeInternal:
        // Internal services: more tolerant, shorter recovery
        return pipz.NewCircuitBreaker(breakerID, processor, 10, 30*time.Second)
    case ServiceTypeCritical:
        // Critical services: very tolerant, quick recovery attempts
        return pipz.NewCircuitBreaker(breakerID, processor, 20, 10*time.Second)
    default:
        return pipz.NewCircuitBreaker(breakerID, processor, 5, time.Minute)
    }
}

// Pattern: Monitor and adjust circuit breakers
func monitorCircuits(breakers map[string]*pipz.CircuitBreaker[Request]) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    
    for range ticker.C {
        for name, breaker := range breakers {
            state := breaker.GetState()
            metrics.Gauge("circuit.state", stateToValue(state), "service", name)
            
            switch state {
            case "open":
                log.Warn("Circuit breaker open", "service", name)
                // Consider alerting operations team
            case "half-open":
                log.Info("Circuit breaker testing recovery", "service", name)
            }
        }
    }
}

Combined Resilience Patterns

// Pattern: Complete resilience stack
//
// createResilientProcessor layers the full protection stack around
// processor, outermost to innermost:
//
//	rate limit -> timeout -> circuit breaker -> retry -> processor
//
// Rate limiting protects the downstream, the timeout bounds total latency,
// the breaker stops cascade failures, and retries absorb transient errors.
func createResilientProcessor(name string, processor pipz.Chainable[Request]) pipz.Chainable[Request] {
    // Identities are derived from the runtime-supplied name so each wrapper
    // is individually addressable later.

    // 4. Retry (handle transient failures)
    withRetry := pipz.NewRetry(
        pipz.NewIdentity(name+"-retry", "Retry handler for "+name),
        processor, 3,
    )

    // 3. Circuit breaker (prevent cascade failures)
    withBreaker := pipz.NewCircuitBreaker(
        pipz.NewIdentity(name+"-breaker", "Circuit breaker for "+name),
        withRetry, 5, 30*time.Second,
    )

    // 2. Timeout (bound operation time)
    withTimeout := pipz.NewTimeout(
        pipz.NewIdentity(name+"-timeout", "Timeout for "+name),
        withBreaker, 10*time.Second,
    )

    // 1. Rate limiting (protect downstream)
    return pipz.NewSequence(
        pipz.NewIdentity(name+"-resilient", "Resilient wrapper for "+name),
        pipz.NewRateLimiter(pipz.NewIdentity(name+"-rate", "Rate limiter for "+name), 100, 20),
        withTimeout,
    )
}

// Pattern: Service mesh style protection
// Identities are package-level variables so the exact same values can be
// used later for runtime lookup/modification of these connectors.
var (
    ServiceMeshID     = pipz.NewIdentity("service-mesh", "Service mesh with fallback")
    FallbackBreakerID = pipz.NewIdentity("fallback-breaker", "Fallback service circuit breaker")
    FallbackTimeoutID = pipz.NewIdentity("fallback-timeout", "Fallback service timeout")
)

// serviceCall tries the fully-protected primary first and only routes to the
// fallback when the primary fails.
serviceCall := pipz.NewFallback(ServiceMeshID,
    // Primary service with full protection (rate limit + timeout + breaker + retry)
    createResilientProcessor("primary", primaryService),

    // Fallback service with lighter protection: a breaker (3 failures,
    // 10s reset) around a 5s timeout — no rate limiting or retries.
    pipz.NewCircuitBreaker(FallbackBreakerID,
        pipz.NewTimeout(FallbackTimeoutID, fallbackService, 5*time.Second),
        3, 10*time.Second,
    ),
)

Configuration Best Practices

// Pattern: Configuration-driven circuit breaker settings
//
// CircuitConfig holds circuit breaker tuning loaded from YAML.
type CircuitConfig struct {
    FailureThreshold int           `yaml:"failure_threshold"` // consecutive failures before the breaker opens
    SuccessThreshold int           `yaml:"success_threshold"` // successes in half-open before closing again
    ResetTimeout     time.Duration `yaml:"reset_timeout"`     // how long to stay open before probing recovery
}

// RateConfig holds rate limiter tuning loaded from YAML.
// Fields are gofmt-aligned to match the sibling CircuitConfig declaration.
type RateConfig struct {
    Rate  float64 `yaml:"rate"`  // sustained rate (requests per second)
    Burst int     `yaml:"burst"` // burst capacity above the sustained rate
    Mode  string  `yaml:"mode"`  // limiter behavior when over the limit — presumably "wait" or "drop"; TODO confirm allowed values
}

// configureConnectors pushes config-file values onto already-constructed
// connectors. The setters are fluent (each returns its receiver), so calling
// them one per line is equivalent to chaining.
func configureConnectors(breaker *pipz.CircuitBreaker[Request], limiter *pipz.RateLimiter[Request], cfg Config) {
    // Circuit breaker configuration
    cb := cfg.Circuit
    breaker.SetFailureThreshold(cb.FailureThreshold)
    breaker.SetSuccessThreshold(cb.SuccessThreshold)
    breaker.SetResetTimeout(cb.ResetTimeout)

    // Rate limiter configuration
    rl := cfg.Rate
    limiter.SetRate(rl.Rate)
    limiter.SetBurst(rl.Burst)
    limiter.SetMode(rl.Mode)
}

Production Checklist

Design

  • Each processor has single responsibility
  • Error handling is explicit and appropriate
  • Uses type-safe routing (no magic strings)
  • Validates input early
  • Non-critical operations don't block critical path

Resilience

  • Timeouts on all external calls
  • Retry logic for transient failures
  • Circuit breakers for external dependencies
  • Rate limiting to protect downstream services
  • Graduated failure thresholds based on service type
  • Graceful degradation for features
  • Bulkhead isolation between components
  • Combined resilience patterns (rate limit + circuit break + retry)

Observability

  • Metrics on all processors
  • Distributed tracing enabled
  • Structured logging throughout
  • Error categorization and alerting
  • Performance benchmarks

Operations

  • Feature flags for gradual rollout
  • Dead letter queue for failed items
  • Manual intervention workflow
  • Runbooks for common issues
  • Load testing completed

Anti-Patterns to Avoid

  1. God Processor: One processor doing everything
  2. Silent Failures: Swallowing errors without logging
  3. Unbounded Retries: Retrying forever without backoff
  4. Missing Timeouts: No time bounds on operations
  5. Shared Mutable State: Processors modifying shared data
  6. Magic Strings: Using strings instead of typed constants
  7. Hidden Dependencies: Processors with side effects on external state
  8. Circular Fallbacks: Creating recursive fallback chains that can cause stack overflow

Next Steps