zoobzio December 12, 2025 Edit this page

Handle

Provides error observation and handling capabilities for processors.

Function Signature

func NewHandle[T any](
    identity Identity,
    processor Chainable[T],
    errorHandler Chainable[*Error[T]],
) *Handle[T]

Parameters

  • identity (Identity) - Identifier for the connector used in debugging
  • processor - Main processor that might fail
  • errorHandler - Pipeline that processes errors (receives *Error[T])

Returns

Returns a *Handle[T] that implements Chainable[T].

Behavior

  • Error observation - Handler processes errors for side effects (logging, cleanup)
  • Error pass-through - Original errors always propagate after handling
  • Error as data - Errors flow through the error handler pipeline
  • Handler errors ignored - Handler failures don't affect error propagation
  • Success pass-through - Successful results bypass error handler

Key Insight

Handle provides error observation and cleanup. By wrapping a processor with Handle, you're saying "when this fails, I need to do something about it" - whether that's logging, cleanup, notifications, or compensation. The error always propagates after handling.

Example

// Log errors with context
logged := pipz.NewHandle(
    pipz.NewIdentity("order-logging", "Logs order processing errors"),
    processOrder,
    pipz.Effect(
        pipz.NewIdentity("log-error", "Logs order failure details"),
        func(ctx context.Context, err *pipz.Error[Order]) error {
            log.Printf("Order %s failed at %s: %v",
                err.InputData.ID, err.Path, err.Err)
            metrics.Increment("order.failures")
            return nil
        },
    ),
)

// Clean up resources on failure
withCleanup := pipz.NewHandle(
    pipz.NewIdentity("inventory-management", "Manages inventory with cleanup"),
    pipz.NewSequence[Order](
        reserveInventory,
        chargePayment,
        confirmOrder,
    ),
    pipz.Effect(
        pipz.NewIdentity("cleanup", "Releases inventory on failure"),
        func(ctx context.Context, err *pipz.Error[Order]) error {
            if err.InputData.ReservationID != "" {
                log.Printf("Releasing inventory for failed order %s", err.InputData.ID)
                inventory.Release(err.InputData.ReservationID)
            }
            return nil
        },
    ),
)

// Send notifications on failure
notifying := pipz.NewHandle(
    pipz.NewIdentity("payment-alerts", "Alerts on payment failures"),
    processPayment,
    pipz.Effect(
        pipz.NewIdentity("notify", "Sends alert for high-value failures"),
        func(ctx context.Context, err *pipz.Error[Payment]) error {
            if err.InputData.Amount > 10000 {
                // Alert on large payment failures
                alerting.SendHighValuePaymentFailure(err.InputData, err.Err)
            }
            return nil
        },
    ),
)

When to Use

Use Handle when:

  • You need to observe errors without stopping them (logging, metrics)
  • You need to perform cleanup on failure (release resources)
  • Errors require logging with additional context
  • You want to send notifications or alerts on failure
  • You need to implement compensation logic
  • You want to collect metrics about failures
  • Different errors require different side effects

When NOT to Use

Don't use Handle when:

  • You just need to suppress errors (use Fallback with default value)
  • Simple retry is sufficient (use Retry)
  • You want to transform errors into values (use Fallback)
  • No cleanup or side effects are needed on failure
  • You want to recover from errors (use Fallback - Handle only observes)

Error Handler Access

The error handler receives *Error[T] with full context:

type Error[T any] struct {
    Path      []Identity    // Full path through processors
    Err       error         // Original error
    InputData T             // Input when error occurred
    Timeout   bool          // Was this a timeout?
    Canceled  bool          // Was this cancelled?
    Timestamp time.Time     // When the error occurred
    Duration  time.Duration // Processing time before error
}

Common Patterns

// Resource cleanup pattern
inventoryCleanup := pipz.NewHandle(
    pipz.NewIdentity("order-with-cleanup", "Order processing with inventory cleanup"),
    pipz.NewSequence[Order](
        validateOrder,
        reserveInventory,
        chargePayment,
        shipOrder,
    ),
    pipz.Effect(
        pipz.NewIdentity("release-inventory", "Releases inventory on failure"),
        func(ctx context.Context, err *pipz.Error[Order]) error {
            if reservation := err.InputData.ReservationID; reservation != "" {
                log.Printf("Releasing inventory reservation %s after failure", reservation)
                if releaseErr := inventory.Release(reservation); releaseErr != nil {
                    log.Printf("Failed to release inventory: %v", releaseErr)
                }
            }
            return nil
        },
    ),
)

// Monitoring and alerting
monitoredPayment := pipz.NewHandle(
    pipz.NewIdentity("payment-monitoring", "Payment processing with monitoring"),
    processPayment,
    pipz.Effect(
        pipz.NewIdentity("monitor", "Records payment metrics and alerts"),
        func(ctx context.Context, err *pipz.Error[Payment]) error {
            metrics.RecordPaymentFailure(err.InputData.Method, err.Err)

            if err.InputData.Amount > alertThreshold {
                alerting.NotifyHighValueFailure(err.InputData, err.Err)
            }

            if err.Timeout {
                log.Printf("Payment timeout after %v", err.Duration)
                metrics.RecordTimeout("payment", err.Duration)
            }

            return nil
        },
    ),
)

// Compensation pattern
compensatingTransaction := pipz.NewHandle(
    pipz.NewIdentity("transfer-with-compensation", "Transfer with compensating transaction"),
    pipz.NewSequence[Transfer](
        debitSource,
        creditDestination,
        recordTransaction,
    ),
    pipz.Effect(
        pipz.NewIdentity("compensate", "Compensates for partial transfer failure"),
        func(ctx context.Context, err *pipz.Error[Transfer]) error {
            // Determine how far we got
            failedAt := err.Path[len(err.Path)-1]

            switch failedAt {
            case "recordTransaction":
                // Both debit and credit succeeded, just logging failed
                log.Printf("Transaction completed but not recorded: %v", err.InputData)
                // Try to record in backup system
                backupLog.Record(err.InputData)

            case "creditDestination":
                // Debit succeeded but credit failed - must reverse
                log.Printf("Reversing debit due to credit failure")
                if reverseErr := reverseDebit(err.InputData); reverseErr != nil {
                    // Critical - manual intervention needed
                    alerting.CriticalAlert("Failed to reverse debit", err.InputData, reverseErr)
                }
            }

            return nil
        },
    ),
)

Advanced Error Flows

// Define identities upfront
var (
    ParallelRecoveryID = pipz.NewIdentity("parallel-recovery", "Parallel error handling and recovery")
    LogID              = pipz.NewIdentity("log", "Logs error to central system")
    MetricsID          = pipz.NewIdentity("metrics", "Updates error dashboard")
    BackupID           = pipz.NewIdentity("backup", "Saves failed request")
    NotifyID           = pipz.NewIdentity("notify", "Notifies on-call team")
)

// Parallel error handling
parallelRecovery := pipz.NewHandle(ParallelRecoveryID,
    mainProcess,
    pipz.NewConcurrent[*pipz.Error[Data]](
        pipz.Effect(LogID, logToCentralSystem),
        pipz.Effect(MetricsID, updateDashboard),
        pipz.Effect(BackupID, saveFailedRequest),
        pipz.Apply(NotifyID, notifyOnCallTeam),
    ),
)

// Define identities for nested handling
var (
    OuterID    = pipz.NewIdentity("outer", "Outer error handler")
    InnerID    = pipz.NewIdentity("inner", "Inner error handler")
    InnerLogID = pipz.NewIdentity("inner-log", "Logs inner errors")
    OuterLogID = pipz.NewIdentity("outer-log", "Logs outer errors")
    BatchID    = pipz.NewIdentity("batch", "Batch processing with error aggregation")
    CollectID  = pipz.NewIdentity("collect", "Collects and aggregates errors")
)

// Nested error handling
nestedHandling := pipz.NewHandle(OuterID,
    pipz.NewHandle(InnerID,
        riskyOperation,
        pipz.Effect(InnerLogID, logInnerError),
    ),
    pipz.Effect(OuterLogID,
        func(ctx context.Context, err *pipz.Error[Data]) error {
            // This catches errors from both riskyOperation and inner-log
            log.Printf("Outer handler: %v", err)
            return nil
        },
    ),
)

// Error aggregation for batch processing
batchErrors := pipz.NewHandle(BatchID,
    batchProcessor,
    pipz.Apply(CollectID,
        func(ctx context.Context, err *pipz.Error[Batch]) (*pipz.Error[Batch], error) {
            errorCollector.Add(err)
            if errorCollector.Count() > errorThreshold {
                // Trigger batch error recovery
                triggerBatchRecovery(errorCollector.GetAll())
            }
            return err, nil
        },
    ),
)

Gotchas

❌ Don't use Handle to suppress errors

// Define identities upfront
var (
    SuppressID = pipz.NewIdentity("suppress", "Attempt to suppress errors")
    LogID      = pipz.NewIdentity("log", "Logs error")
)

// WRONG - Handle doesn't suppress errors
handle := pipz.NewHandle(SuppressID,
    failingProcessor,
    pipz.Effect(LogID, logError), // Error still propagates!
)

✅ Use Fallback to recover from errors

// Define identity upfront
var RecoverID = pipz.NewIdentity("recover", "Recovery with fallback")

// RIGHT - Fallback provides recovery
fallback := pipz.NewFallback(RecoverID,
    failingProcessor,
    defaultProcessor, // This runs on error
)

❌ Don't ignore handler errors in critical cleanup

// WRONG - Critical cleanup might fail silently
handle := pipz.NewHandle(
    pipz.NewIdentity("cleanup", "Cleanup with ignored errors"),
    processor,
    pipz.Effect(
        pipz.NewIdentity("release", "Releases resources"),
        func(ctx context.Context, err *pipz.Error[Data]) error {
            return releaseResources() // Error is ignored!
        },
    ),
)

✅ Log critical cleanup failures

// RIGHT - Track cleanup failures
handle := pipz.NewHandle(
    pipz.NewIdentity("cleanup", "Cleanup with error tracking"),
    processor,
    pipz.Effect(
        pipz.NewIdentity("release", "Releases resources with logging"),
        func(ctx context.Context, err *pipz.Error[Data]) error {
            if releaseErr := releaseResources(); releaseErr != nil {
                log.Printf("CRITICAL: Failed to release resources: %v", releaseErr)
                alerting.SendCritical("Resource leak", releaseErr)
            }
            return nil // Handler errors don't affect flow anyway
        },
    ),
)

Best Practices

// Clear separation of concerns
// GOOD: Handle for cleanup, Fallback for recovery
var WithRecoveryID = pipz.NewIdentity("with-recovery", "Pattern with cleanup and recovery")
goodPattern := pipz.NewFallback(WithRecoveryID,
    pipz.NewHandle(
        pipz.NewIdentity("with-cleanup", "Operation with cleanup"),
        riskyOperation,
        pipz.Effect(
            pipz.NewIdentity("cleanup", "Cleans up resources on failure"),
            func(ctx context.Context, err *pipz.Error[Data]) error {
                // Clean up resources
                cleanup(err.InputData)
                return nil
            },
        ),
    ),
    fallbackOperation,  // This provides the recovery
)

// Resource management
// GOOD: Always clean up acquired resources
fileProcessor := pipz.NewHandle(
    pipz.NewIdentity("file-processing", "Processes file with cleanup"),
    pipz.Apply(
        pipz.NewIdentity("process", "Processes file"),
        func(ctx context.Context, path string) (Result, error) {
            file, err := os.Open(path)
            if err != nil {
                return Result{}, err
            }
            defer file.Close()
            // ... processing ...
        },
    ),
    pipz.Effect(
        pipz.NewIdentity("cleanup-temp", "Cleans up temporary files"),
        func(ctx context.Context, err *pipz.Error[string]) error {
            // Clean up any temporary files created
            tempPath := filepath.Join(os.TempDir(), filepath.Base(err.InputData))
            os.Remove(tempPath)
            return nil
        },
    ),
)

// Comprehensive monitoring
// GOOD: Collect all relevant metrics
monitoredService := pipz.NewHandle(
    pipz.NewIdentity("monitored", "Service with comprehensive monitoring"),
    externalService,
    pipz.Effect(
        pipz.NewIdentity("metrics", "Records service metrics and errors"),
        func(ctx context.Context, err *pipz.Error[Request]) error {
            labels := map[string]string{
                "service": "external",
                "method":  err.InputData.Method,
                "error":   errorType(err.Err),
            }

            metrics.RecordError(labels)
            metrics.RecordLatency(err.Duration, labels)

            if err.Timeout {
                metrics.RecordTimeout(labels)
            }

            return nil
        },
    ),
)

See Also

  • Fallback - For simple primary/backup patterns
  • Retry - For retry logic
  • Switch - Often used within error handlers
  • Concurrent - For parallel error handling