Best Practices
Guidelines for building production-ready pipelines with pipz.
Design Principles
1. Single Responsibility
Each processor should do one thing well:
// Define identities as package-level variables
var (
ValidateEmailID = pipz.NewIdentity("validate_email", "Validates email format")
NormalizeEmailID = pipz.NewIdentity("normalize_email", "Normalizes email to lowercase")
ProcessEmailID = pipz.NewIdentity("process_email", "Processes email (does too much!)")
)
// Good: Focused processors
validateEmail := pipz.Apply(ValidateEmailID, func(ctx context.Context, u User) (User, error) {
if !isValidEmail(u.Email) {
return u, errors.New("invalid email format")
}
return u, nil
})
normalizeEmail := pipz.Transform(NormalizeEmailID, func(ctx context.Context, u User) User {
u.Email = strings.ToLower(strings.TrimSpace(u.Email))
return u
})
// Bad: Doing too much
processEmail := pipz.Apply(ProcessEmailID, func(ctx context.Context, u User) (User, error) {
// Validating AND normalizing AND checking duplicates
u.Email = strings.ToLower(strings.TrimSpace(u.Email))
if !isValidEmail(u.Email) {
return u, errors.New("invalid email")
}
if emailExists(u.Email) {
return u, errors.New("email exists")
}
return u, nil
})
2. Identity-Driven Design
Store Identity objects as package-level variables. This isn't just good style - it's what makes your pipelines composable and modifiable at runtime.
Why Identity Variables are Critical
Identity objects serve as keys that enable dynamic pipeline modification. Because Identity is a comparable struct, you must use the same Identity variable for both creation and lookup:
// Define identities as package-level variables
var (
ValidateOrderID = pipz.NewIdentity("validate_order", "Validates order structure and totals")
CalculatePriceID = pipz.NewIdentity("calculate_price", "Calculates order pricing")
ProcessPaymentID = pipz.NewIdentity("process_payment", "Processes payment transaction")
)
// Use the same Identity when creating processors
validateOrder := pipz.Apply(ValidateOrderID, validateFunc)
calculatePrice := pipz.Transform(CalculatePriceID, priceFunc)
processPayment := pipz.Apply(ProcessPaymentID, paymentFunc)
// Later, use the same Identity for pipeline modification:
orderPipeline.After(ValidateOrderID, fraudDetection)
orderPipeline.Replace(ProcessPaymentID, testPaymentProcessor)
orderPipeline.Remove(CalculatePriceID) // For free orders
Without stored Identity variables, you lose this composability - creating new Identity objects inline makes dynamic modification impossible since they won't match.
The Pattern
// Define all identities as package-level variables - these become your pipeline's "keys"
var (
// Processor identities
ValidateOrderID = pipz.NewIdentity("validate_order", "Validates order structure and totals")
CheckInventoryID = pipz.NewIdentity("check_inventory", "Checks inventory availability")
CalculatePriceID = pipz.NewIdentity("calculate_price", "Calculates subtotal, tax, and total")
ApplyDiscountsID = pipz.NewIdentity("apply_discounts", "Applies applicable discounts")
ProcessPaymentID = pipz.NewIdentity("process_payment", "Processes payment transaction")
UpdateInventoryID = pipz.NewIdentity("update_inventory", "Updates inventory levels")
SendConfirmationID = pipz.NewIdentity("send_confirmation", "Sends order confirmation")
UpdateCRMID = pipz.NewIdentity("update_crm", "Updates CRM system")
MarkPendingID = pipz.NewIdentity("mark_pending", "Marks order as pending")
ApplyBulkDiscountID = pipz.NewIdentity("apply_bulk_discount", "Applies bulk order discount")
// Connector identities
OrderPipelineID = pipz.NewIdentity("order-pipeline", "Complete order processing pipeline")
PaymentFlowID = pipz.NewIdentity("payment-flow", "Payment processing with retry")
NotificationsID = pipz.NewIdentity("notification-flow", "Notification with timeout")
ExpressCheckoutID = pipz.NewIdentity("express-checkout", "Express checkout pipeline")
BulkOrdersID = pipz.NewIdentity("bulk-orders", "Bulk order processing pipeline")
)
// Use Identity variables when creating processors
func createOrderProcessors() map[pipz.Identity]pipz.Chainable[Order] {
return map[pipz.Identity]pipz.Chainable[Order]{
ValidateOrderID: pipz.Apply(ValidateOrderID, func(ctx context.Context, o Order) (Order, error) {
if o.Total <= 0 {
return o, errors.New("invalid order total")
}
return o, nil
}),
CheckInventoryID: pipz.Apply(CheckInventoryID, func(ctx context.Context, o Order) (Order, error) {
for _, item := range o.Items {
if !inventory.Has(item.SKU, item.Quantity) {
return o, fmt.Errorf("insufficient inventory for %s", item.SKU)
}
}
return o, nil
}),
CalculatePriceID: pipz.Transform(CalculatePriceID, func(ctx context.Context, o Order) Order {
o.Subtotal = calculateSubtotal(o.Items)
o.Tax = calculateTax(o.Subtotal, o.ShippingAddress)
o.Total = o.Subtotal + o.Tax + o.ShippingCost
return o
}),
}
}
// Build pipelines using Identity variables
func createOrderPipeline() pipz.Chainable[Order] {
seq := pipz.NewSequence[Order](OrderPipelineID)
seq.Register(
pipz.Apply(ValidateOrderID, validateOrder),
pipz.Apply(CheckInventoryID, checkInventory),
pipz.Transform(CalculatePriceID, calculatePrice),
pipz.Transform(ApplyDiscountsID, applyDiscounts),
// Payment with retry
pipz.NewRetry(PaymentFlowID,
pipz.Apply(ProcessPaymentID, processPayment),
3,
),
pipz.Apply(UpdateInventoryID, updateInventory),
// Notifications with timeout
pipz.NewTimeout(NotificationsID,
pipz.Effect(SendConfirmationID, sendConfirmation),
5*time.Second,
),
)
return seq
}
// Example: Dynamic pipeline composition
func createCustomPipeline(config Config) pipz.Chainable[Order] {
// For dynamic names, create Identity inline when name is runtime-determined
customPipelineID := pipz.NewIdentity(config.Name, "Custom pipeline based on config")
seq := pipz.NewSequence[Order](customPipelineID)
// Always validate - using the shared Identity variable
seq.Register(pipz.Apply(ValidateOrderID, validateOrder))
// Conditionally add processors
if config.CheckInventory {
seq.Register(pipz.Apply(CheckInventoryID, checkInventory))
}
seq.Register(pipz.Transform(CalculatePriceID, calculatePrice))
if config.DiscountsEnabled {
seq.Register(pipz.Transform(ApplyDiscountsID, applyDiscounts))
}
return seq
}
Benefits of Identity-driven design:
- Composability: Same Identity enables pipeline modification (After, Before, Remove, Replace)
- No Magic Strings: Variables prevent typos and runtime errors
- IDE Support: Auto-completion and refactoring work perfectly
- Debugging: Error messages show meaningful processor names with descriptions
- Searchability: Easy to find all usages of a processor
- Self-Documenting: Identity descriptions explain what each processor does
// Example: Error messages are more helpful with Identity names
result, err := pipeline.Process(ctx, order)
if err != nil {
// Error: "failed at [order-pipeline -> validate_order]: invalid order total"
// Much clearer than: "failed at [pipeline1 -> proc1]: invalid order total"
}
// Example: Easy to modify pipelines using Identity variables
func addFraudCheck(seq *pipz.Sequence[Order]) {
// Use the same ValidateOrderID variable for lookup
var FraudCheckID = pipz.NewIdentity("fraud_check", "Checks for fraudulent orders")
fraudCheck := pipz.Apply(FraudCheckID, checkFraud)
seq.After(ValidateOrderID, fraudCheck) // Insert after validation
}
3. Explicit Error Handling
Make error handling visible and intentional:
// Define identities as package-level variables
var (
PaymentFlowID = pipz.NewIdentity("payment-flow", "Payment processing with error handling")
ValidatePaymentID = pipz.NewIdentity("validate", "Validates payment details")
ChargeCardID = pipz.NewIdentity("charge", "Charges payment card")
NotifyHandlerID = pipz.NewIdentity("notify-with-logging", "Notification with error logging")
NotifyID = pipz.NewIdentity("notify", "Sends notification")
LogNotifyErrorID = pipz.NewIdentity("log_notification_error", "Logs notification errors")
BadProcessID = pipz.NewIdentity("process", "Processes payment (too much in one place)")
)
// Good: Clear error strategy with Identity variables
payment := pipz.NewSequence(PaymentFlowID,
pipz.Apply(ValidatePaymentID, validatePayment),
// Critical operation with retry
pipz.RetryWithBackoff(
pipz.Apply(ChargeCardID, chargeCard),
3,
time.Second,
),
// Non-critical with error handler
pipz.NewHandle(NotifyHandlerID,
pipz.Effect(NotifyID, sendNotification),
pipz.Effect(LogNotifyErrorID, func(ctx context.Context, err *pipz.Error[Order]) error {
log.Printf("Notification failed for order %s: %v", err.InputData.ID, err.Err)
return nil
}),
),
)
// Bad: Hidden error handling
payment := pipz.Apply(BadProcessID, func(ctx context.Context, p Payment) (Payment, error) {
// Validation, charging, and notification all mixed together
// Error handling buried in function
})
4. Type-Safe Routes
Use custom types for routing decisions:
// Good: Type-safe enum
type OrderPriority int
const (
PriorityStandard OrderPriority = iota
PriorityExpress
PriorityOvernight
)
router := pipz.Switch(
func(ctx context.Context, o Order) OrderPriority {
if o.ShippingMethod == "overnight" {
return PriorityOvernight
}
// ...
},
map[OrderPriority]pipz.Chainable[Order]{
PriorityStandard: standardFulfillment,
PriorityExpress: expressFulfillment,
PriorityOvernight: overnightFulfillment,
},
)
// Bad: Magic strings
router := pipz.Switch(
func(ctx context.Context, o Order) string {
return o.ShippingMethod // "standard", "express", etc.
},
map[string]pipz.Chainable[Order]{
"standard": standardFulfillment,
"express": expressFulfillment,
// Easy to typo, no compile-time checking
},
)
Pipeline Patterns
Pattern: Concurrent vs Sequential
Understanding when to use Concurrent versus Sequential is critical for performance:
Use Concurrent for I/O Operations
Concurrent is designed for operations with real latency where parallel execution provides benefit:
// Define identities as package-level variables
var (
SendNotificationsID = pipz.NewIdentity("send-notifications", "Sends notifications in parallel")
EnrichDataID = pipz.NewIdentity("enrich-data", "Fetches enrichment data in parallel")
)
// GOOD: Parallel API calls with actual network latency
notifications := pipz.NewConcurrent(SendNotificationsID,
nil, // No reducer needed for side effects
sendEmailNotification, // API call to email service
sendSMSNotification, // API call to SMS gateway
updateCRMSystem, // API call to CRM
pushToAnalytics, // API call to analytics service
)
// GOOD: Multiple database queries that don't depend on each other
enrichment := pipz.NewConcurrent(EnrichDataID,
nil, // No reducer needed
fetchUserProfile, // Database query
fetchAccountHistory, // Database query
fetchPreferences, // Database query
)
Use Sequential for Fast Operations
Sequential is better for CPU-bound operations or fast validations:
// Define identities as package-level variables
var ValidationsID = pipz.NewIdentity("validate", "Validates user data")
// BAD: Using Concurrent for simple validations
validations := pipz.NewConcurrent(ValidationsID, nil, // ❌ Don't do this!
validateEmail, // Simple regex check
validateAge, // Number comparison
validateCountry, // String check
)
// GOOD: Sequential for fast operations
validations := pipz.NewSequence(ValidationsID, // ✅ Better!
validateEmail,
validateAge,
validateCountry,
)
Performance Implications
Concurrent creates copies of your data (using the Cloner interface) for goroutine isolation:
// This happens internally in Concurrent:
// 1. data.Clone() called for each processor
// 2. Goroutine spawned for each
// 3. Original input returned (modifications discarded)
// For simple operations, the overhead of:
// - Cloning data
// - Spawning goroutines
// - Channel communication
// ...exceeds the benefit of parallelism
Rule of thumb: If an operation takes less than 10ms, use Sequential.
Pattern: Validation First
Always validate early to fail fast:
// Define identities as package-level variables
var (
ValidationPipelineID = pipz.NewIdentity("validation-pipeline", "Validation-first processing")
ValidateStructureID = pipz.NewIdentity("validate_structure", "Validates data structure")
ValidateBusinessRulesID = pipz.NewIdentity("validate_business_rules", "Validates business rules")
EnrichFromAPIID = pipz.NewIdentity("enrich_from_api", "Enriches data from API")
CalculatePricingID = pipz.NewIdentity("calculate_pricing", "Calculates pricing")
SaveToDatabaseID = pipz.NewIdentity("save_to_database", "Saves to database")
)
pipeline := pipz.NewSequence(ValidationPipelineID,
// Validate first - cheap and catches errors early
pipz.Apply(ValidateStructureID, validateStructure),
pipz.Apply(ValidateBusinessRulesID, validateBusinessRules),
// Then expensive operations
pipz.Apply(EnrichFromAPIID, enrichFromAPI),
pipz.Apply(CalculatePricingID, calculatePricing),
pipz.Apply(SaveToDatabaseID, saveToDatabase),
)
Pattern: Graceful Degradation
Continue processing even when non-critical operations fail:
// Define identities as package-level variables
var (
ProcessingPipelineID = pipz.NewIdentity("processing-pipeline", "Processing with graceful degradation")
ValidateID = pipz.NewIdentity("validate", "Validates input")
ProcessID = pipz.NewIdentity("process", "Processes data")
SaveID = pipz.NewIdentity("save", "Saves data")
AddRecommendationsID = pipz.NewIdentity("add_recommendations", "Adds product recommendations")
NotificationsID = pipz.NewIdentity("notifications", "Sends notifications in parallel")
EmailID = pipz.NewIdentity("email", "Sends email")
LogEmailErrorID = pipz.NewIdentity("log_email_error", "Logs email errors")
AnalyticsID = pipz.NewIdentity("analytics", "Tracks analytics")
EmailHandlerID = pipz.NewIdentity("email-handler", "Email with error handling")
)
pipeline := pipz.NewSequence(ProcessingPipelineID,
// Critical path
pipz.Apply(ValidateID, validate),
pipz.Apply(ProcessID, process),
pipz.Apply(SaveID, save),
// Best-effort enrichments
pipz.Enrich(AddRecommendationsID, func(ctx context.Context, order Order) (Order, error) {
recs, err := recommendationService.Get(ctx, order.UserID)
if err != nil {
return order, err // Enrich will ignore this
}
order.Recommendations = recs
return order, nil
}),
// Non-blocking notifications
pipz.NewConcurrent(NotificationsID, nil,
pipz.NewHandle(EmailHandlerID,
pipz.Effect(EmailID, sendEmail),
pipz.Effect(LogEmailErrorID, logError),
),
pipz.Effect(AnalyticsID, trackAnalytics),
),
)
Pattern: Bulkhead Isolation
Isolate failures to prevent cascade. The key insight is that bulkheads are simple to connect when operating on the same type:
// Define identities as package-level variables
var (
OrderPipelineID = pipz.NewIdentity("order-pipeline", "Core order processing")
NotificationTimeoutID = pipz.NewIdentity("notification-timeout", "Timeout for notifications")
NotificationConcurrentID = pipz.NewIdentity("notifications", "Parallel notifications")
FullPipelineID = pipz.NewIdentity("full-pipeline", "Full order processing with notifications")
LogNotifyFailureID = pipz.NewIdentity("log_notification_failure", "Logs notification failures")
NotificationHandlerID = pipz.NewIdentity("notification-handler", "Handles notification errors")
)
// Separate pipelines for different concerns
orderPipeline := pipz.NewSequence(OrderPipelineID,
validateOrder,
processPayment,
updateInventory,
)
// Isolated notification pipeline
notificationPipeline := pipz.NewTimeout(NotificationTimeoutID,
pipz.NewConcurrent(NotificationConcurrentID, nil,
sendEmail,
sendSMS,
updateCRM,
),
5*time.Second, // Don't let notifications block orders
)
// Compose with isolation - easy because both operate on Order type
fullPipeline := pipz.NewSequence(FullPipelineID,
orderPipeline,
pipz.NewHandle(NotificationHandlerID,
notificationPipeline,
pipz.Effect(LogNotifyFailureID, logError),
),
)
Pattern: Orchestration Pipeline
The ultimate pattern is to build workflows as individual pipelines for discrete types, then create an orchestration pipeline that coordinates them:
// Step 1: Define all identities as package-level variables
var (
// Domain pipeline identities
OrderWorkflowID = pipz.NewIdentity("order-workflow", "Order validation and pricing")
CustomerWorkflowID = pipz.NewIdentity("customer-workflow", "Customer validation and credit check")
InvoiceWorkflowID = pipz.NewIdentity("invoice-workflow", "Invoice generation and formatting")
// Orchestrator identities
OrchestratorID = pipz.NewIdentity("orchestrator", "Orchestrates order workflow processing")
ProcessOrderID = pipz.NewIdentity("process_order", "Processes order through order pipeline")
ProcessCustomerInvoiceID = pipz.NewIdentity("process_customer_invoice", "Processes customer and invoice in parallel")
FinalizeID = pipz.NewIdentity("finalize", "Final coordination and confirmation")
)
// Step 2: Build discrete pipelines for each type
type OrderWorkflow struct {
Order Order
Customer Customer
Invoice Invoice
Config WorkflowConfig
}
// Individual pipelines for each domain object
orderPipeline := pipz.NewSequence[Order](OrderWorkflowID,
validateOrder,
calculatePricing,
applyDiscounts,
)
customerPipeline := pipz.NewSequence[Customer](CustomerWorkflowID,
validateCustomer,
checkCreditLimit,
updateLoyaltyPoints,
)
invoicePipeline := pipz.NewSequence[Invoice](InvoiceWorkflowID,
generateInvoice,
applyTaxes,
formatForDisplay,
)
// Step 3: Create orchestration pipeline that coordinates execution
orchestrator := pipz.NewSequence[OrderWorkflow](OrchestratorID,
// Process order first
pipz.Apply(ProcessOrderID, func(ctx context.Context, wf OrderWorkflow) (OrderWorkflow, error) {
processed, err := orderPipeline.Process(ctx, wf.Order)
if err != nil {
return wf, fmt.Errorf("order processing failed: %w", err)
}
wf.Order = processed
return wf, nil
}),
// Process customer in parallel with invoice if order succeeded
pipz.Apply(ProcessCustomerInvoiceID, func(ctx context.Context, wf OrderWorkflow) (OrderWorkflow, error) {
var wg sync.WaitGroup
var custErr, invErr error
wg.Add(2)
// Process customer
go func() {
defer wg.Done()
processed, err := customerPipeline.Process(ctx, wf.Customer)
if err != nil {
custErr = err
return
}
wf.Customer = processed
}()
// Process invoice
go func() {
defer wg.Done()
// Invoice needs order data
wf.Invoice.OrderID = wf.Order.ID
wf.Invoice.Amount = wf.Order.Total
processed, err := invoicePipeline.Process(ctx, wf.Invoice)
if err != nil {
invErr = err
return
}
wf.Invoice = processed
}()
wg.Wait()
// Handle errors based on config
if custErr != nil && wf.Config.CustomerRequired {
return wf, fmt.Errorf("customer processing failed: %w", custErr)
}
if invErr != nil && wf.Config.InvoiceRequired {
return wf, fmt.Errorf("invoice processing failed: %w", invErr)
}
return wf, nil
}),
// Final coordination
pipz.Apply(FinalizeID, func(ctx context.Context, wf OrderWorkflow) (OrderWorkflow, error) {
// Any final coordination logic
if wf.Config.SendConfirmation {
// Send order confirmation with all processed data
sendOrderConfirmation(wf.Order, wf.Customer, wf.Invoice)
}
return wf, nil
}),
)
// Usage
workflow := OrderWorkflow{
Order: order,
Customer: customer,
Invoice: Invoice{},
Config: WorkflowConfig{
CustomerRequired: true,
InvoiceRequired: false,
SendConfirmation: true,
},
}
result, err := orchestrator.Process(ctx, workflow)
This pattern provides:
- Type Safety: Each pipeline operates on its specific type
- Reusability: Individual pipelines can be reused in different workflows
- Testability: Each pipeline can be tested independently
- Flexibility: Orchestration logic can change without modifying domain pipelines
- Clear Dependencies: The orchestrator explicitly manages data flow between pipelines
Pattern: Feature Flags
Support gradual rollouts and A/B testing:
// Define identities as package-level variables
var FeatureFlagPipelineID = pipz.NewIdentity("feature-flag-pipeline", "Pipeline with feature flags")
func createPipeline(features FeatureFlags) pipz.Chainable[Order] {
seq := pipz.NewSequence[Order](FeatureFlagPipelineID)
// Core processors always included
seq.Register(validateOrder, calculatePricing)
// Conditionally add processors
if features.IsEnabled("fraud-detection-v2") {
seq.Register(fraudDetectionV2)
} else {
seq.Register(fraudDetectionV1)
}
if features.IsEnabled("loyalty-points") {
seq.Register(calculateLoyaltyPoints)
}
seq.Register(chargePayment, fulfillOrder)
return seq
}
Error Handling Strategies
Error Propagation Pattern
Understanding how errors flow through your pipelines is critical for proper handling:
┌──────────────────────────────────────────────────────────────────┐
│ Error Propagation in Pipelines │
└──────────────────────────────────────────────────────────────────┘
Sequential Pipeline:
═══════════════════
Pipeline A
├─→ Processor 1 [✓] Success
│ ↓
├─→ Processor 2 [✗] Failure ──→ Error[T]{Stage: "proc2", Cause: err}
│ ↓ ↓
│ Pipeline Stops ▼
│ Returns to caller
└─→ Processor 3 [⋯] Never runs
Nested Pipeline:
════════════════
Main Pipeline
├─→ Validate [✓]
│ ↓
├─→ Sub-Pipeline ──┐
│ ├─→ Step A [✓] │
│ ├─→ Step B [✗]─┼──→ Error wrapped with sub-pipeline context
│ └─→ Step C [⋯] │ ↓
│ │ ▼
│ └──→ Error[T]{Stage: "sub-pipeline.step-b"}
│ ↓
└─→ Save [⋯] Never runs ▼
Main Pipeline Stops
Concurrent Pipeline:
════════════════════
Concurrent Connector
├─→ Service A [✓] Completes
├─→ Service B [✗] Fails ──→ Error logged but doesn't stop others
├─→ Service C [✓] Completes
└─→ Returns original input (errors don't propagate up)
With Error Handler:
════════════════════
Pipeline with Recovery
├─→ Process [✗] ──→ Error[T]
│ ↓ ↓
│ Fallback ←──────────┘
│ ↓
└─→ Continue [✓] Pipeline continues with fallback result
Key Insights:
- Sequential stops on first error - Subsequent processors never run
- Errors carry context - Stage name, cause, and data state at failure
- Concurrent doesn't propagate errors - Failures are isolated
- Fallback enables recovery - Convert errors back to success path
Strategy: Categorize and Route
type ErrorCategory string
const (
CategoryValidation ErrorCategory = "validation"
CategoryTransient ErrorCategory = "transient"
CategoryBusiness ErrorCategory = "business"
CategorySystem ErrorCategory = "system"
)
func handleError(ctx context.Context, failure FailedOrder) (FailedOrder, error) {
category := categorizeError(failure.Error)
switch category {
case CategoryValidation:
// Don't retry bad input
return sendToDeadLetter(ctx, failure)
case CategoryTransient:
// Retry with backoff
return pipz.RetryWithBackoff(
reprocessOrder,
5,
time.Second,
).Process(ctx, failure)
case CategoryBusiness:
// Needs human intervention
return sendToManualReview(ctx, failure)
case CategorySystem:
// Alert ops team
alertOps(failure.Error)
return sendToDeadLetter(ctx, failure)
}
return failure, failure.Error
}
Strategy: Rate Limiting and Circuit Breaking
Rate Limiting Best Practices
Use RateLimiter to protect downstream services and respect API limits:
// Define identities as package-level variables
var (
GlobalLimiterID = pipz.NewIdentity("global-limit", "Global system rate limit")
ServiceLimiterID = pipz.NewIdentity("stripe-limit", "Stripe API rate limit")
EndpointLimiterID = pipz.NewIdentity("charges-limit", "Charges endpoint rate limit")
PaymentPipelineID = pipz.NewIdentity("payment-pipeline", "Payment processing pipeline")
ChargeID = pipz.NewIdentity("charge", "Processes payment charge")
)
// Pattern: Layer rate limits (global -> service -> endpoint)
globalLimiter := pipz.NewRateLimiter(GlobalLimiterID, 10000, 1000)
serviceLimiter := pipz.NewRateLimiter(ServiceLimiterID, 100, 20) // Stripe's actual limits
endpointLimiter := pipz.NewRateLimiter(EndpointLimiterID, 50, 10)
pipeline := pipz.NewSequence(PaymentPipelineID,
globalLimiter, // Global system limit
serviceLimiter, // Per-service limit
endpointLimiter, // Per-endpoint limit
pipz.Apply(ChargeID, makePayment),
)
// Pattern: Dynamic rate adjustment based on conditions
func adjustRateLimit(limiter *pipz.RateLimiter[Request], config Config) {
if config.OffPeakHours {
limiter.SetRate(1000) // Higher rate during off-peak
} else {
limiter.SetRate(100) // Lower rate during peak
}
if config.PremiumTier {
limiter.SetMode("wait") // Wait for premium users
} else {
limiter.SetMode("drop") // Fail fast for basic users
}
}
// Pattern: Per-user rate limiting with Switch
// Define identities as package-level variables
var (
UserRateLimitID = pipz.NewIdentity("user-rate-limit", "Routes by user tier")
PremiumRateID = pipz.NewIdentity("premium-rate", "Premium tier rate limit")
StandardRateID = pipz.NewIdentity("standard-rate", "Standard tier rate limit")
FreeRateID = pipz.NewIdentity("free-rate", "Free tier rate limit")
)
userLimiter := pipz.NewSwitch(UserRateLimitID,
func(ctx context.Context, req Request) string {
return getUserTier(req.UserID)
},
).
AddRoute("premium", pipz.NewRateLimiter(PremiumRateID, 1000, 100)).
AddRoute("standard", pipz.NewRateLimiter(StandardRateID, 100, 10)).
AddRoute("free", pipz.NewRateLimiter(FreeRateID, 10, 1))
Circuit Breaker Best Practices
Use CircuitBreaker to prevent cascade failures and give services time to recover:
// Define identities as package-level variables
var (
StripeBreakerID = pipz.NewIdentity("stripe-breaker", "Circuit breaker for Stripe API")
StripeChargeID = pipz.NewIdentity("stripe-charge", "Charges via Stripe")
DBBreakerID = pipz.NewIdentity("db-breaker", "Circuit breaker for database")
DBSaveID = pipz.NewIdentity("db-save", "Saves to database")
)
// Pattern: Circuit breaker per external dependency
stripeBreaker := pipz.NewCircuitBreaker(StripeBreakerID,
pipz.Apply(StripeChargeID, chargeStripe),
5, // Open after 5 failures
30*time.Second, // Try recovery after 30s
)
dbBreaker := pipz.NewCircuitBreaker(DBBreakerID,
pipz.Apply(DBSaveID, saveToDatabase),
10, // More tolerant for internal services
time.Minute, // Longer recovery time
)
// Pattern: Combine with rate limiting for comprehensive protection
// Define identities as package-level variables
var (
ResilientAPIID = pipz.NewIdentity("resilient-api", "Resilient API pipeline")
APIRateID = pipz.NewIdentity("api-rate", "API rate limiter")
APIBreakerID = pipz.NewIdentity("api-breaker", "API circuit breaker")
APIRetryID = pipz.NewIdentity("api-retry", "API retry handler")
APICallID = pipz.NewIdentity("api-call", "External API call")
)
resilientAPI := pipz.NewSequence(ResilientAPIID,
pipz.NewRateLimiter(APIRateID, 100, 20), // Rate limit first
pipz.NewCircuitBreaker(APIBreakerID, // Then circuit break
pipz.NewRetry(APIRetryID, // With retry inside
pipz.Apply(APICallID, callExternalAPI),
3,
),
5, 30*time.Second,
),
)
// Pattern: Graduated thresholds based on service type
func createCircuitBreaker(service string, serviceType ServiceType, processor pipz.Chainable[Request]) *pipz.CircuitBreaker[Request] {
// Dynamic Identity when service name is determined at runtime
breakerID := pipz.NewIdentity(service+"-breaker", "Circuit breaker for "+service)
switch serviceType {
case ServiceTypeExternal:
// External services: fail fast, longer recovery
return pipz.NewCircuitBreaker(breakerID, processor, 3, 2*time.Minute)
case ServiceTypeInternal:
// Internal services: more tolerant, shorter recovery
return pipz.NewCircuitBreaker(breakerID, processor, 10, 30*time.Second)
case ServiceTypeCritical:
// Critical services: very tolerant, quick recovery attempts
return pipz.NewCircuitBreaker(breakerID, processor, 20, 10*time.Second)
default:
return pipz.NewCircuitBreaker(breakerID, processor, 5, time.Minute)
}
}
// Pattern: Monitor and adjust circuit breakers
func monitorCircuits(breakers map[string]*pipz.CircuitBreaker[Request]) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
for name, breaker := range breakers {
state := breaker.GetState()
metrics.Gauge("circuit.state", stateToValue(state), "service", name)
switch state {
case "open":
log.Warn("Circuit breaker open", "service", name)
// Consider alerting operations team
case "half-open":
log.Info("Circuit breaker testing recovery", "service", name)
}
}
}
}
Combined Resilience Patterns
// Pattern: Complete resilience stack
func createResilientProcessor(name string, processor pipz.Chainable[Request]) pipz.Chainable[Request] {
// Create dynamic identities when name is determined at runtime
resilientID := pipz.NewIdentity(name+"-resilient", "Resilient wrapper for "+name)
rateID := pipz.NewIdentity(name+"-rate", "Rate limiter for "+name)
timeoutID := pipz.NewIdentity(name+"-timeout", "Timeout for "+name)
breakerID := pipz.NewIdentity(name+"-breaker", "Circuit breaker for "+name)
retryID := pipz.NewIdentity(name+"-retry", "Retry handler for "+name)
return pipz.NewSequence(resilientID,
// 1. Rate limiting (protect downstream)
pipz.NewRateLimiter(rateID, 100, 20),
// 2. Timeout (bound operation time)
pipz.NewTimeout(timeoutID,
// 3. Circuit breaker (prevent cascade failures)
pipz.NewCircuitBreaker(breakerID,
// 4. Retry (handle transient failures)
pipz.NewRetry(retryID, processor, 3),
5, 30*time.Second,
),
10*time.Second,
),
)
}
// Pattern: Service mesh style protection
// Define identities as package-level variables
var (
ServiceMeshID = pipz.NewIdentity("service-mesh", "Service mesh with fallback")
FallbackBreakerID = pipz.NewIdentity("fallback-breaker", "Fallback service circuit breaker")
FallbackTimeoutID = pipz.NewIdentity("fallback-timeout", "Fallback service timeout")
)
serviceCall := pipz.NewFallback(ServiceMeshID,
// Primary service with full protection
createResilientProcessor("primary", primaryService),
// Fallback service with lighter protection
pipz.NewCircuitBreaker(FallbackBreakerID,
pipz.NewTimeout(FallbackTimeoutID, fallbackService, 5*time.Second),
3, 10*time.Second,
),
)
Configuration Best Practices
// Pattern: Configuration-driven circuit breaker settings
type CircuitConfig struct {
FailureThreshold int `yaml:"failure_threshold"`
SuccessThreshold int `yaml:"success_threshold"`
ResetTimeout time.Duration `yaml:"reset_timeout"`
}
type RateConfig struct {
Rate float64 `yaml:"rate"`
Burst int `yaml:"burst"`
Mode string `yaml:"mode"`
}
func configureConnectors(breaker *pipz.CircuitBreaker[Request], limiter *pipz.RateLimiter[Request], cfg Config) {
// Circuit breaker configuration
breaker.SetFailureThreshold(cfg.Circuit.FailureThreshold).
SetSuccessThreshold(cfg.Circuit.SuccessThreshold).
SetResetTimeout(cfg.Circuit.ResetTimeout)
// Rate limiter configuration
limiter.SetRate(cfg.Rate.Rate).
SetBurst(cfg.Rate.Burst).
SetMode(cfg.Rate.Mode)
}
Production Checklist
Design
- Each processor has single responsibility
- Error handling is explicit and appropriate
- Uses type-safe routing (no magic strings)
- Validates input early
- Non-critical operations don't block critical path
Resilience
- Timeouts on all external calls
- Retry logic for transient failures
- Circuit breakers for external dependencies
- Rate limiting to protect downstream services
- Graduated failure thresholds based on service type
- Graceful degradation for features
- Bulkhead isolation between components
- Combined resilience patterns (rate limit + circuit break + retry)
Observability
- Metrics on all processors
- Distributed tracing enabled
- Structured logging throughout
- Error categorization and alerting
- Performance benchmarks
Operations
- Feature flags for gradual rollout
- Dead letter queue for failed items
- Manual intervention workflow
- Runbooks for common issues
- Load testing completed
Anti-Patterns to Avoid
- God Processor: One processor doing everything
- Silent Failures: Swallowing errors without logging
- Unbounded Retries: Retrying forever without backoff
- Missing Timeouts: No time bounds on operations
- Shared Mutable State: Processors modifying shared data
- Magic Strings: Using strings instead of typed constants
- Hidden Dependencies: Processors with side effects on external state
- Circular Fallbacks: Creating recursive fallback chains that can cause stack overflow
Next Steps
- Performance Guide - Optimize for production
- Error Recovery - Advanced error patterns