Handle
Provides error observation and handling capabilities for processors.
Function Signature
func NewHandle[T any](
identity Identity,
processor Chainable[T],
errorHandler Chainable[*Error[T]],
) *Handle[T]
Parameters
- identity (Identity) - Identifier for the connector, used in debugging
- processor (Chainable[T]) - Main processor that might fail
- errorHandler (Chainable[*Error[T]]) - Pipeline that processes errors (receives *Error[T])
Returns
Returns a *Handle[T] that implements Chainable[T].
Behavior
- Error observation - Handler processes errors for side effects (logging, cleanup)
- Error pass-through - Original errors always propagate after handling
- Error as data - Errors flow through the error handler pipeline
- Handler errors ignored - Handler failures don't affect error propagation
- Success pass-through - Successful results bypass error handler
Key Insight
Handle provides error observation and cleanup. By wrapping a processor with Handle, you're saying "when this fails, I need to do something about it" - whether that's logging, cleanup, notifications, or compensation. The error always propagates after handling.
Example
// Log errors with context
logged := pipz.NewHandle(
pipz.NewIdentity("order-logging", "Logs order processing errors"),
processOrder,
pipz.Effect(
pipz.NewIdentity("log-error", "Logs order failure details"),
func(ctx context.Context, err *pipz.Error[Order]) error {
log.Printf("Order %s failed at %s: %v",
err.InputData.ID, err.Path, err.Err)
metrics.Increment("order.failures")
return nil
},
),
)
// Clean up resources on failure
withCleanup := pipz.NewHandle(
pipz.NewIdentity("inventory-management", "Manages inventory with cleanup"),
pipz.NewSequence[Order](
reserveInventory,
chargePayment,
confirmOrder,
),
pipz.Effect(
pipz.NewIdentity("cleanup", "Releases inventory on failure"),
func(ctx context.Context, err *pipz.Error[Order]) error {
if err.InputData.ReservationID != "" {
log.Printf("Releasing inventory for failed order %s", err.InputData.ID)
inventory.Release(err.InputData.ReservationID)
}
return nil
},
),
)
// Send notifications on failure
notifying := pipz.NewHandle(
pipz.NewIdentity("payment-alerts", "Alerts on payment failures"),
processPayment,
pipz.Effect(
pipz.NewIdentity("notify", "Sends alert for high-value failures"),
func(ctx context.Context, err *pipz.Error[Payment]) error {
if err.InputData.Amount > 10000 {
// Alert on large payment failures
alerting.SendHighValuePaymentFailure(err.InputData, err.Err)
}
return nil
},
),
)
When to Use
Use Handle when:
- You need to observe errors without stopping them (logging, metrics)
- You need to perform cleanup on failure (release resources)
- Errors require logging with additional context
- You want to send notifications or alerts on failure
- You need to implement compensation logic
- You want to collect metrics about failures
- Different errors require different side effects
When NOT to Use
Don't use Handle when:
- You just need to suppress errors (use Fallback with a default value)
- Simple retry is sufficient (use Retry)
- You want to transform errors into values (use Fallback)
- No cleanup or side effects are needed on failure
- You want to recover from errors (use Fallback - Handle only observes)
Error Handler Access
The error handler receives *Error[T] with full context:
type Error[T any] struct {
Path []Identity // Full path through processors
Err error // Original error
InputData T // Input when error occurred
Timeout bool // Was this a timeout?
Canceled bool // Was this cancelled?
Timestamp time.Time // When the error occurred
Duration time.Duration // Processing time before error
}
Common Patterns
// Resource cleanup pattern
inventoryCleanup := pipz.NewHandle(
pipz.NewIdentity("order-with-cleanup", "Order processing with inventory cleanup"),
pipz.NewSequence[Order](
validateOrder,
reserveInventory,
chargePayment,
shipOrder,
),
pipz.Effect(
pipz.NewIdentity("release-inventory", "Releases inventory on failure"),
func(ctx context.Context, err *pipz.Error[Order]) error {
if reservation := err.InputData.ReservationID; reservation != "" {
log.Printf("Releasing inventory reservation %s after failure", reservation)
if releaseErr := inventory.Release(reservation); releaseErr != nil {
log.Printf("Failed to release inventory: %v", releaseErr)
}
}
return nil
},
),
)
// Monitoring and alerting
monitoredPayment := pipz.NewHandle(
pipz.NewIdentity("payment-monitoring", "Payment processing with monitoring"),
processPayment,
pipz.Effect(
pipz.NewIdentity("monitor", "Records payment metrics and alerts"),
func(ctx context.Context, err *pipz.Error[Payment]) error {
metrics.RecordPaymentFailure(err.InputData.Method, err.Err)
if err.InputData.Amount > alertThreshold {
alerting.NotifyHighValueFailure(err.InputData, err.Err)
}
if err.Timeout {
log.Printf("Payment timeout after %v", err.Duration)
metrics.RecordTimeout("payment", err.Duration)
}
return nil
},
),
)
// Compensation pattern
compensatingTransaction := pipz.NewHandle(
pipz.NewIdentity("transfer-with-compensation", "Transfer with compensating transaction"),
pipz.NewSequence[Transfer](
debitSource,
creditDestination,
recordTransaction,
),
pipz.Effect(
pipz.NewIdentity("compensate", "Compensates for partial transfer failure"),
func(ctx context.Context, err *pipz.Error[Transfer]) error {
// Determine how far we got
failedAt := err.Path[len(err.Path)-1]
switch failedAt {
case "recordTransaction":
// Both debit and credit succeeded, just logging failed
log.Printf("Transaction completed but not recorded: %v", err.InputData)
// Try to record in backup system
backupLog.Record(err.InputData)
case "creditDestination":
// Debit succeeded but credit failed - must reverse
log.Printf("Reversing debit due to credit failure")
if reverseErr := reverseDebit(err.InputData); reverseErr != nil {
// Critical - manual intervention needed
alerting.CriticalAlert("Failed to reverse debit", err.InputData, reverseErr)
}
}
return nil
},
),
)
Advanced Error Flows
// Define identities upfront
var (
ParallelRecoveryID = pipz.NewIdentity("parallel-recovery", "Parallel error handling and recovery")
LogID = pipz.NewIdentity("log", "Logs error to central system")
MetricsID = pipz.NewIdentity("metrics", "Updates error dashboard")
BackupID = pipz.NewIdentity("backup", "Saves failed request")
NotifyID = pipz.NewIdentity("notify", "Notifies on-call team")
)
// Parallel error handling
parallelRecovery := pipz.NewHandle(ParallelRecoveryID,
mainProcess,
pipz.NewConcurrent[*pipz.Error[Data]](
pipz.Effect(LogID, logToCentralSystem),
pipz.Effect(MetricsID, updateDashboard),
pipz.Effect(BackupID, saveFailedRequest),
pipz.Apply(NotifyID, notifyOnCallTeam),
),
)
// Define identities for nested handling
var (
OuterID = pipz.NewIdentity("outer", "Outer error handler")
InnerID = pipz.NewIdentity("inner", "Inner error handler")
InnerLogID = pipz.NewIdentity("inner-log", "Logs inner errors")
OuterLogID = pipz.NewIdentity("outer-log", "Logs outer errors")
BatchID = pipz.NewIdentity("batch", "Batch processing with error aggregation")
CollectID = pipz.NewIdentity("collect", "Collects and aggregates errors")
)
// Nested error handling
nestedHandling := pipz.NewHandle(OuterID,
pipz.NewHandle(InnerID,
riskyOperation,
pipz.Effect(InnerLogID, logInnerError),
),
pipz.Effect(OuterLogID,
func(ctx context.Context, err *pipz.Error[Data]) error {
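// This sees errors from riskyOperation after the inner handler has observed them;
// failures of inner-log itself are ignored by the inner Handle and never reach here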
log.Printf("Outer handler: %v", err)
return nil
},
),
)
// Error aggregation for batch processing
batchErrors := pipz.NewHandle(BatchID,
batchProcessor,
pipz.Apply(CollectID,
func(ctx context.Context, err *pipz.Error[Batch]) (*pipz.Error[Batch], error) {
errorCollector.Add(err)
if errorCollector.Count() > errorThreshold {
// Trigger batch error recovery
triggerBatchRecovery(errorCollector.GetAll())
}
return err, nil
},
),
)
Gotchas
❌ Don't use Handle to suppress errors
// Define identities upfront
var (
SuppressID = pipz.NewIdentity("suppress", "Attempt to suppress errors")
LogID = pipz.NewIdentity("log", "Logs error")
)
// WRONG - Handle doesn't suppress errors
handle := pipz.NewHandle(SuppressID,
failingProcessor,
pipz.Effect(LogID, logError), // Error still propagates!
)
✅ Use Fallback to recover from errors
// Define identity upfront
var RecoverID = pipz.NewIdentity("recover", "Recovery with fallback")
// RIGHT - Fallback provides recovery
fallback := pipz.NewFallback(RecoverID,
failingProcessor,
defaultProcessor, // This runs on error
)
❌ Don't ignore handler errors in critical cleanup
// WRONG - Critical cleanup might fail silently
handle := pipz.NewHandle(
pipz.NewIdentity("cleanup", "Cleanup with ignored errors"),
processor,
pipz.Effect(
pipz.NewIdentity("release", "Releases resources"),
func(ctx context.Context, err *pipz.Error[Data]) error {
return releaseResources() // Error is ignored!
},
),
)
✅ Log critical cleanup failures
// RIGHT - Track cleanup failures
handle := pipz.NewHandle(
pipz.NewIdentity("cleanup", "Cleanup with error tracking"),
processor,
pipz.Effect(
pipz.NewIdentity("release", "Releases resources with logging"),
func(ctx context.Context, err *pipz.Error[Data]) error {
if releaseErr := releaseResources(); releaseErr != nil {
log.Printf("CRITICAL: Failed to release resources: %v", releaseErr)
alerting.SendCritical("Resource leak", releaseErr)
}
return nil // Handler errors don't affect flow anyway
},
),
)
Best Practices
// Clear separation of concerns
// GOOD: Handle for cleanup, Fallback for recovery
var WithRecoveryID = pipz.NewIdentity("with-recovery", "Pattern with cleanup and recovery")
goodPattern := pipz.NewFallback(WithRecoveryID,
pipz.NewHandle(
pipz.NewIdentity("with-cleanup", "Operation with cleanup"),
riskyOperation,
pipz.Effect(
pipz.NewIdentity("cleanup", "Cleans up resources on failure"),
func(ctx context.Context, err *pipz.Error[Data]) error {
// Clean up resources
cleanup(err.InputData)
return nil
},
),
),
fallbackOperation, // This provides the recovery
)
// Resource management
// GOOD: Always clean up acquired resources
fileProcessor := pipz.NewHandle(
pipz.NewIdentity("file-processing", "Processes file with cleanup"),
pipz.Apply(
pipz.NewIdentity("process", "Processes file"),
func(ctx context.Context, path string) (Result, error) {
file, err := os.Open(path)
if err != nil {
return Result{}, err
}
defer file.Close()
// ... processing ...
},
),
pipz.Effect(
pipz.NewIdentity("cleanup-temp", "Cleans up temporary files"),
func(ctx context.Context, err *pipz.Error[string]) error {
// Clean up any temporary files created
tempPath := filepath.Join(os.TempDir(), filepath.Base(err.InputData))
os.Remove(tempPath)
return nil
},
),
)
// Comprehensive monitoring
// GOOD: Collect all relevant metrics
monitoredService := pipz.NewHandle(
pipz.NewIdentity("monitored", "Service with comprehensive monitoring"),
externalService,
pipz.Effect(
pipz.NewIdentity("metrics", "Records service metrics and errors"),
func(ctx context.Context, err *pipz.Error[Request]) error {
labels := map[string]string{
"service": "external",
"method": err.InputData.Method,
"error": errorType(err.Err),
}
metrics.RecordError(labels)
metrics.RecordLatency(err.Duration, labels)
if err.Timeout {
metrics.RecordTimeout(labels)
}
return nil
},
),
)
See Also
- Fallback - For simple primary/backup patterns
- Retry - For retry logic
- Switch - Often used within error handlers
- Concurrent - For parallel error handling