Safety and Reliability Guide
pipz is designed with safety and reliability as core principles. This guide covers the built-in protections that make your pipelines robust in production environments.
Automatic Panic Recovery
Every processor and connector in pipz includes panic recovery. It is always enabled and cannot be switched off, so an unexpected panic inside a pipeline processor is returned as an error instead of crashing your application.
What Gets Protected
Complete Coverage:
- All processors: Apply, Transform, Effect, Mutate, Enrich
- All connectors: Sequence, Concurrent, WorkerPool, Scaffold, Switch, Fallback, Race, Contest, Retry, Backoff, Timeout, Handle, Filter, RateLimiter, CircuitBreaker
- All user-defined functions passed to processors
- All condition functions, transformation functions, and side-effect functions
How It Works
// Define identity upfront
var RiskyID = pipz.NewIdentity("risky", "Processor that may panic")
// ANY panic in this function will be automatically recovered
riskyProcessor := pipz.Apply(RiskyID, func(ctx context.Context, data string) (string, error) {
// All of these panics are automatically handled:
if data == "bounds" {
arr := []int{1, 2, 3}
return fmt.Sprintf("%d", arr[10]), nil // Index out of bounds - RECOVERED
}
if data == "nil" {
var ptr *string
return *ptr, nil // Nil pointer dereference - RECOVERED
}
if data == "assert" {
var val interface{} = "not a number"
num := val.(int) // Type assertion failure - RECOVERED
return fmt.Sprintf("%d", num), nil
}
if data == "explicit" {
panic("something went wrong!") // Explicit panic - RECOVERED
}
return data, nil
})
// Use it safely - panics become regular errors
result, err := riskyProcessor.Process(ctx, "nil")
if err != nil {
fmt.Printf("Caught panic as error: %v\n", err)
// Output: risky failed after 123μs: panic in processor "risky": panic occurred: runtime error: invalid memory address or nil pointer dereference
}
Security Sanitization
Panic messages often contain sensitive information that shouldn't be exposed in logs or error responses. pipz automatically sanitizes panic messages to prevent information leakage:
Memory Address Sanitization:
// Original panic: "invalid memory access at 0x1234567890abcdef"
// Sanitized: "panic occurred: invalid memory access at 0x***"
File Path Sanitization:
// Original panic: "error in /sensitive/internal/path/file.go:123"
// Sanitized: "panic occurred (file path sanitized)"
Stack Trace Sanitization:
// Original panic: "error\ngoroutine 1 [running]:\nruntime.main()\n..."
// Sanitized: "panic occurred (stack trace sanitized)"
Length Limiting:
// Very long panic messages (>200 chars) are truncated:
// "panic occurred (message truncated for security)"
Sanitization Patterns:
The sanitization process applies these specific patterns:
- Memory addresses: Replaces hex sequences after 0x with 0x***
- File paths: Detects / or \ characters and replaces entire message
- Stack traces: Detects goroutine or runtime. keywords and replaces entire message
- Length limit: Messages longer than 200 characters are truncated
- Nil panics: Converts nil panic values to "unknown panic (nil value)"
Performance Impact
Panic recovery is implemented with minimal performance overhead:
- Cost: ~20 nanoseconds per operation (measured via benchmarks)
- Implementation: Uses Go's built-in defer and recover() mechanisms
- Zero allocation in the normal (no-panic) path
- Always enabled: There are no build tags or configuration options, which keeps the implementation simple
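For readers unfamiliar with the defer and recover() pattern mentioned above, here is a minimal standard-library sketch of how a panic can be turned into an ordinary error. The safeCall helper and its error text are hypothetical and only illustrate the general technique; pipz's internal code may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// safeCall runs fn and converts any panic into an ordinary error.
// Hypothetical helper for illustration; it is not part of the pipz API.
func safeCall[T any](name string, fn func(T) (T, error), in T) (out T, err error) {
	defer func() {
		if r := recover(); r != nil {
			var zero T
			out = zero // a failed call yields the zero value
			err = fmt.Errorf("panic in processor %q: %v", name, r)
		}
	}()
	return fn(in)
}

func main() {
	risky := func(s string) (string, error) {
		if s == "nil" {
			var p *string
			return *p, nil // nil pointer dereference panics here
		}
		if s == "" {
			return "", errors.New("empty input") // ordinary error path
		}
		return s, nil
	}
	for _, in := range []string{"ok", "", "nil"} {
		out, err := safeCall("risky", risky, in)
		fmt.Printf("in=%q out=%q err=%v\n", in, out, err)
	}
}
```

The deferred closure only does work when recover() returns a non-nil value, which is why the normal path stays cheap.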
Real-World Example
// Processing user data that might come from untrusted sources
var ParseInputID = pipz.NewIdentity("parse-input", "Parses untrusted user input")
processUserInput := pipz.Apply(ParseInputID, func(ctx context.Context, input string) (UserData, error) {
// Third-party JSON library might panic on malformed input
var userData UserData
if err := someJSONLibrary.Unmarshal([]byte(input), &userData); err != nil {
return userData, err
}
// Index access is guarded by the length check; len of a nil slice is 0,
// so a null "scores" field cannot cause an out-of-range panic here
if len(userData.Scores) > 0 {
userData.AverageScore = userData.Scores[0]
}
return userData, nil
})
// Define identity upfront
var UserPipelineID = pipz.NewIdentity("user-pipeline", "User input processing pipeline")
// Even with malicious or malformed input, your application won't crash
pipeline := pipz.NewSequence(UserPipelineID,
processUserInput,
validateUserData,
enrichWithDefaults,
)
// Oversized or malformed input of the kind that can trigger panics in parsing code
maliciousInput := `{"scores": null, "data": "` + strings.Repeat("x", 100000) + `"}`
result, err := pipeline.Process(ctx, maliciousInput)
if err != nil {
// The application continues running, panic is converted to error
log.Printf("Input processing failed safely: %v", err)
return handleBadInput(err)
}
Error Handling Integration
Panic recovery integrates seamlessly with pipz's error handling system:
Error Path and Context
result, err := panickyPipeline.Process(ctx, data)
if err != nil {
var pipeErr *pipz.Error[DataType]
if errors.As(err, &pipeErr) {
fmt.Printf("Pipeline: %s\n", strings.Join(pipeErr.Path, " -> "))
fmt.Printf("Duration: %v\n", pipeErr.Duration)
fmt.Printf("Input data: %+v\n", pipeErr.InputData)
// Check if the underlying error is a panic
if strings.Contains(pipeErr.Error(), "panic in processor") {
fmt.Println("This was a recovered panic")
}
}
}
Error Recovery Patterns
Use Handle to process panic errors specifically:
// Define identities upfront
var (
ResilientID = pipz.NewIdentity("resilient", "Resilient pipeline with panic handling")
PanicHandlerID = pipz.NewIdentity("panic-handler", "Handles panics from risky processor")
LogPanicID = pipz.NewIdentity("log-panic", "Logs panic errors")
)
pipeline := pipz.NewSequence(ResilientID,
// Handle must wrap the processor whose failures it observes
pipz.NewHandle(PanicHandlerID, riskyProcessor,
pipz.Effect(LogPanicID, func(ctx context.Context, err *pipz.Error[DataType]) error {
if strings.Contains(err.Error(), "panic in processor") {
log.Printf("ALERT: Panic recovered in %s: %v",
strings.Join(err.Path, "->"), err.Err)
// Send to monitoring system, etc.
}
return nil
}),
),
nextProcessor,
)
Reliability Through Layered Protection
Combine panic recovery with other reliability patterns for maximum resilience:
// Define identities upfront
var (
FortressID = pipz.NewIdentity("fortress", "Multi-layered protection pipeline")
ValidateID = pipz.NewIdentity("validate", "Input validation")
TimeoutID = pipz.NewIdentity("timeout", "Timeout protection")
RetryID = pipz.NewIdentity("retry", "Retry on transient failures")
CircuitID = pipz.NewIdentity("circuit", "Circuit breaker for cascading failures")
FallbackID = pipz.NewIdentity("fallback", "Fallback for persistent failures")
)
// Multi-layered protection
resilientPipeline := pipz.NewSequence(FortressID,
pipz.Apply(ValidateID, validateInput), // Input validation
pipz.NewTimeout(TimeoutID, // Timeout protection
pipz.NewRetry(RetryID, // Retry on transient failures
pipz.NewCircuitBreaker(CircuitID, // Circuit breaker for cascading failures
pipz.NewFallback(FallbackID, // Fallback for persistent failures
riskyMainProcessor, // Main logic (protected by panic recovery)
safeDefaultProcessor, // Safe fallback (also protected by panic recovery)
),
),
),
30*time.Second,
),
)
In this architecture:
- Input validation prevents known bad data
- Timeout prevents hanging operations
- Retry handles transient failures
- Circuit breaker prevents cascade failures
- Fallback provides alternate processing paths
- Panic recovery (automatic) catches unexpected failures
- Rich error context helps with debugging and monitoring
Monitoring and Observability
Track panic occurrences for operational insights:
// Define identities upfront
var (
MonitoredID = pipz.NewIdentity("monitored", "Pipeline with panic monitoring")
MonitorID = pipz.NewIdentity("monitor", "Monitor and track panics")
TrackPanicsID = pipz.NewIdentity("track-panics", "Tracks panic occurrences")
)
var panicCounter int64
monitoredPipeline := pipz.NewSequence(MonitoredID,
// Handle wraps riskyProcessor so that its recovered panics reach the handler
pipz.NewHandle(MonitorID, riskyProcessor,
pipz.Effect(TrackPanicsID, func(ctx context.Context, err *pipz.Error[Data]) error {
if strings.Contains(err.Error(), "panic in processor") {
atomic.AddInt64(&panicCounter, 1)
// Extract processor name from panic error, dropping the surrounding quotes
parts := strings.Split(err.Error(), "panic in processor ")
if len(parts) > 1 {
processorName := strings.Trim(strings.Split(parts[1], ":")[0], `"`)
metrics.IncrementCounter("pipz.panics", map[string]string{
"processor": processorName,
"pipeline": strings.Join(err.Path[:len(err.Path)-1], "->"),
})
}
}
return nil
}),
),
continueProcessor,
)
Best Practices for Safety
1. Trust the Safety Net, But Don't Abuse It
// Define identities upfront
var (
ParseID = pipz.NewIdentity("parse", "Parses input data")
BadID = pipz.NewIdentity("bad", "Bad processor using panic for flow control")
)
// Good - Panic recovery handles unexpected failures
processor := pipz.Apply(ParseID, func(ctx context.Context, data string) (Result, error) {
result, err := someLibrary.Parse(data)
return result, err // Library might panic, that's handled
})
// Bad - Don't use panic recovery as flow control
badProcessor := pipz.Apply(BadID, func(ctx context.Context, data string) (Result, error) {
if data == "invalid" {
panic("invalid data") // Use proper error returns instead!
}
return Result{}, nil
})
2. Combine with Proper Error Handling
// Define identity upfront
var GoodID = pipz.NewIdentity("good", "Good processor with proper error handling")
// Good - Handle expected errors properly, let panic recovery handle unexpected ones
goodProcessor := pipz.Apply(GoodID, func(ctx context.Context, data string) (Result, error) {
if data == "" {
return Result{}, errors.New("empty input") // Expected error
}
// Unexpected panics from third-party code are automatically handled
return complexThirdPartyOperation(data), nil
})
3. Log and Monitor Panics
// Define identity upfront
var PanicMonitorID = pipz.NewIdentity("panic-monitor", "Monitors and logs panics")
// Set up monitoring to track panic frequency
panicMonitor := pipz.Effect(PanicMonitorID, func(ctx context.Context, err *pipz.Error[Data]) error {
if strings.Contains(err.Error(), "panic in processor") {
log.WithFields(log.Fields{
"processor": extractProcessorName(err),
"path": strings.Join(err.Path, "->"),
"duration": err.Duration,
"input": fmt.Sprintf("%+v", err.InputData),
}).Warn("Panic recovered in pipeline")
}
return nil
})
4. Test Panic Scenarios
func TestPanicRecovery(t *testing.T) {
// Define identity upfront
var TestPanicID = pipz.NewIdentity("test-panic", "Test processor that panics")
panicProcessor := pipz.Transform(TestPanicID, func(ctx context.Context, data string) string {
if data == "panic" {
panic("test panic")
}
return data
})
result, err := panicProcessor.Process(context.Background(), "panic")
// Verify panic was recovered as error
assert.Error(t, err)
assert.Contains(t, err.Error(), "panic in processor")
assert.Equal(t, "", result) // Result should be zero value
// Verify normal operation still works
result, err = panicProcessor.Process(context.Background(), "normal")
assert.NoError(t, err)
assert.Equal(t, "normal", result)
}
Production Deployment Considerations
Security
- Information Leakage Prevention: Panic sanitization prevents accidental exposure of internal details
- Safe Defaults: Failed operations return zero values, preventing undefined behavior
- Attack Surface Reduction: Malformed input cannot crash your application through panics
Reliability
- Graceful Degradation: Panics become errors in the normal error handling flow
- Service Continuity: One panicking operation doesn't crash the entire service
- Error Context: Full path and input data for debugging (timing not tracked for panics)
Performance
- Minimal Overhead: ~20ns per operation is typically negligible
- No Allocations: The recovery mechanism allocates only when a panic actually occurs
- Predictable Behavior: Same error handling patterns whether errors are panics or regular errors
Troubleshooting Panic-Related Issues
High Panic Frequency
If you're seeing many panics in your logs:
- Identify the source: Use error path information to pinpoint problematic processors
- Review input validation: Ensure data is properly validated before processing
- Check third-party libraries: Some libraries may panic on edge cases
- Consider upstream changes: Has input data format changed recently?
Performance Impact
If panic recovery is impacting performance:
- Benchmark without panics: ~20ns overhead should be negligible in most cases
- Check panic frequency: High panic rates indicate underlying issues
- Profile allocation: Ensure panics aren't causing excessive allocations
- Consider alternatives: If panics are frequent, address root causes rather than relying on recovery
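To check the overhead on your own hardware, a rough micro-benchmark of the defer/recover pattern can be run as a plain program using testing.Benchmark. This sketch measures the general Go pattern with hypothetical functions (work, guarded); it does not measure pipz itself, and the numbers will vary by machine and Go version.

```go
package main

import (
	"fmt"
	"testing"
)

// work is a trivial function; noinline keeps the call from being optimized away.
//
//go:noinline
func work(n int) int { return n + 1 }

// guarded wraps work with the defer/recover pattern.
func guarded(n int) (out int, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic: %v", r)
		}
	}()
	return work(n), nil
}

// sink prevents the compiler from discarding the results.
var sink int

func benchDirect(b *testing.B) {
	for i := 0; i < b.N; i++ {
		sink = work(i)
	}
}

func benchGuarded(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		sink, _ = guarded(i)
	}
}

func main() {
	direct := testing.Benchmark(benchDirect)
	wrapped := testing.Benchmark(benchGuarded)
	fmt.Printf("direct:  %s\n", direct)
	fmt.Printf("guarded: %s %s\n", wrapped, wrapped.MemString())
}
```

The difference between the two ns/op figures approximates the per-call cost of the guard in the no-panic path.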
Debugging Sanitized Messages
If you need more details from sanitized panic messages for debugging:
- Use development logging: Log full errors in development environments
- Add debug processors: Wrap risky operations with additional logging
- Structured error handling: Return structured errors instead of relying on panics
- Unit test edge cases: Identify and test specific panic scenarios
The goal is to combine automatic safety with proper engineering practices for maximum reliability.