Troubleshooting Guide
This guide helps diagnose and resolve common issues when working with pipz pipelines. Start with the Common Gotchas section for the most frequent mistakes.
Common Gotchas (Quick Reference)
These are the most common mistakes that cause bugs in pipz pipelines:
1. Creating Rate Limiters or Circuit Breakers Per Request
❌ WRONG:
func handleRequest(req Request) Response {
limiterID := pipz.NewIdentity("api", "API rate limiter")
limiter := pipz.NewRateLimiter(limiterID, 100, 10) // New instance each call!
return limiter.Process(ctx, req) // Useless!
}
✅ RIGHT:
// Package-level Identity and limiter - shared across all requests
var APILimiterID = pipz.NewIdentity("api", "API rate limiter")
var apiLimiter = pipz.NewRateLimiter(APILimiterID, 100, 10)
func handleRequest(req Request) Response {
return apiLimiter.Process(ctx, req)
}
2. Forgetting to Implement Clone() Properly
❌ WRONG:
func (d Data) Clone() Data {
return Data{Items: d.Items} // Shares slice memory!
}
✅ RIGHT:
func (d Data) Clone() Data {
items := make([]Item, len(d.Items))
copy(items, d.Items)
return Data{Items: items}
}
3. Using Transform for Operations That Can Fail
❌ WRONG:
var ParseID = pipz.NewIdentity("parse", "Parse JSON data")
transform := pipz.Transform(ParseID, func(ctx context.Context, s string) Data {
data, _ := json.Unmarshal([]byte(s), &Data{}) // Error ignored!
return data
})
✅ RIGHT:
var ParseID = pipz.NewIdentity("parse", "Parse JSON data")
apply := pipz.Apply(ParseID, func(ctx context.Context, s string) (Data, error) {
var data Data
err := json.Unmarshal([]byte(s), &data)
return data, err
})
4. Not Respecting Context Cancellation
❌ WRONG:
var SlowID = pipz.NewIdentity("slow", "Slow processor")
processor := pipz.Apply(SlowID, func(ctx context.Context, data Data) (Data, error) {
time.Sleep(10 * time.Second) // Ignores context!
return data, nil
})
✅ RIGHT:
var SlowID = pipz.NewIdentity("slow", "Slow processor")
processor := pipz.Apply(SlowID, func(ctx context.Context, data Data) (Data, error) {
select {
case <-time.After(10 * time.Second):
return data, nil
case <-ctx.Done():
return data, ctx.Err()
}
})
5. Using Race/Contest for Operations with Side Effects
❌ WRONG:
var (
PaymentsRaceID = pipz.NewIdentity("payments", "Race for payments (wrong!)")
StripeID = pipz.NewIdentity("stripe", "Charge via Stripe")
PayPalID = pipz.NewIdentity("paypal", "Charge via PayPal")
SquareID = pipz.NewIdentity("square", "Charge via Square")
)
race := pipz.NewRace(PaymentsRaceID,
pipz.Apply(StripeID, chargeStripe), // Charges!
pipz.Apply(PayPalID, chargePayPal), // Also charges!
pipz.Apply(SquareID, chargeSquare), // Triple charge!
)
✅ RIGHT:
var (
FetchRaceID = pipz.NewIdentity("fetch", "Race to fetch data")
CacheID = pipz.NewIdentity("cache", "Fetch from cache")
DBID = pipz.NewIdentity("db", "Fetch from database")
APIID = pipz.NewIdentity("api", "Fetch from API")
)
race := pipz.NewRace(FetchRaceID,
pipz.Apply(CacheID, fetchFromCache),
pipz.Apply(DBID, fetchFromDB),
pipz.Apply(APIID, fetchFromAPI),
)
6. Expecting Results from Concurrent
❌ WRONG:
var (
ModifyID = pipz.NewIdentity("modify", "Modify values (wrong!)")
DoubleID = pipz.NewIdentity("double", "Double the value")
)
concurrent := pipz.NewConcurrent(ModifyID, nil,
pipz.Transform(DoubleID, func(ctx context.Context, n int) int {
return n * 2 // Result is discarded without reducer!
}),
)
result, _ := concurrent.Process(ctx, 5)
// result is still 5, not 10!
✅ RIGHT:
var (
EffectsID = pipz.NewIdentity("effects", "Side effects")
LogID = pipz.NewIdentity("log", "Log data")
MetricsID = pipz.NewIdentity("metrics", "Update metrics")
)
concurrent := pipz.NewConcurrent(EffectsID, nil,
pipz.Effect(LogID, logData),
pipz.Effect(MetricsID, updateMetrics),
)
7. Using Switch Without Default for Unknown Cases
❌ WRONG:
var RouterID = pipz.NewIdentity("router", "Routes by type")
router := pipz.NewSwitch(RouterID,
func(ctx context.Context, data Data) string {
return data.Type // Could be anything!
},
).
AddRoute("A", processA).
AddRoute("B", processB)
// Missing routes will cause runtime errors!
✅ RIGHT:
var RouterID = pipz.NewIdentity("router", "Routes by type")
router := pipz.NewSwitch(RouterID,
func(ctx context.Context, data Data) string {
return data.Type
},
).
AddRoute("A", processA).
AddRoute("B", processB).
Default(processUnknown) // Safety net
8. Ignoring Pipeline Errors
❌ WRONG:
result, _ := pipeline.Process(ctx, data) // Error ignored!
processResult(result) // May be zero value!
✅ RIGHT:
result, err := pipeline.Process(ctx, data)
if err != nil {
var pipeErr *pipz.Error[Data]
if errors.As(err, &pipeErr) {
// Path contains the chain of Identity values
if len(pipeErr.Path) > 0 {
log.Printf("Failed at %s: %v", pipeErr.Path[len(pipeErr.Path)-1].Name(), pipeErr.Err)
}
}
return handleError(err)
}
processResult(result)
9. Nested Timeouts with Wrong Duration
❌ WRONG:
var (
OuterTimeoutID = pipz.NewIdentity("outer", "Outer timeout")
InnerTimeoutID = pipz.NewIdentity("inner", "Inner timeout")
)
timeout := pipz.NewTimeout(OuterTimeoutID,
pipz.NewTimeout(InnerTimeoutID, processor, 10*time.Second), // Longer!
5*time.Second, // Shorter - inner never gets full time
)
✅ RIGHT:
var (
OuterTimeoutID = pipz.NewIdentity("outer", "Outer timeout")
StepsID = pipz.NewIdentity("steps", "Sequential steps")
Step1TimeoutID = pipz.NewIdentity("step1", "Step 1 timeout")
Step2TimeoutID = pipz.NewIdentity("step2", "Step 2 timeout")
)
timeout := pipz.NewTimeout(OuterTimeoutID,
pipz.NewSequence(StepsID,
pipz.NewTimeout(Step1TimeoutID, step1, 5*time.Second),
pipz.NewTimeout(Step2TimeoutID, step2, 5*time.Second),
),
12*time.Second, // Accommodates both with buffer
)
10. Using Enrich for Required Operations
❌ WRONG:
var ValidateID = pipz.NewIdentity("validate", "Validate user")
enrich := pipz.Enrich(ValidateID, func(ctx context.Context, user User) (User, error) {
if !isValid(user) {
return user, errors.New("invalid") // Error is swallowed!
}
return user, nil
})
✅ RIGHT:
var ValidateID = pipz.NewIdentity("validate", "Validate user")
apply := pipz.Apply(ValidateID, func(ctx context.Context, user User) (User, error) {
if !isValid(user) {
return user, errors.New("invalid") // Fails the pipeline
}
return user, nil
})
Built-in Safety Features
Automatic Panic Recovery
Every processor and connector in pipz includes built-in panic recovery. This means:
- No crashes: Panics are automatically caught and converted to errors
- Security sanitization: Sensitive information (memory addresses, file paths, stack traces) is stripped from panic messages
- Rich error context: Panic errors include full pipeline path and input data (timing not tracked for panics)
- Always enabled: No configuration needed, minimal performance overhead (~20ns per operation)
- Complete coverage: All connectors (Sequence, Concurrent, WorkerPool, etc.) and all processors (Apply, Transform, Effect, etc.) are protected
// This will NOT crash your application
var RiskyID = pipz.NewIdentity("risky", "Risky processor that may panic")
panickyProcessor := pipz.Transform(RiskyID, func(ctx context.Context, data string) string {
if data == "boom" {
panic("something went wrong!") // Automatically recovered
}
return data
})
// The panic is converted to a proper error
result, err := panickyProcessor.Process(ctx, "boom")
if err != nil {
// err will be a *pipz.Error with sanitized panic message
fmt.Printf("Caught panic: %v\n", err)
// Output: risky failed after 0s: panic in processor "risky": panic occurred: something went wrong!
}
Security Benefits:
- Prevents accidental leakage of internal memory addresses
- Sanitizes file paths that might contain sensitive directory names
- Removes stack trace information that could expose internal structure
- Truncates excessively long panic messages
When You'll See This:
- Third-party library panics in your processor functions
- Array/slice bounds errors
- Nil pointer dereferences
- Type assertion failures
- Any other unexpected panics in user code
No Action Required: Panic recovery is automatic and always enabled. Your pipeline will continue running, treating panics as regular errors in the pipeline flow.
Common Issues and Solutions
1. Pipeline Execution Issues
Pipeline Stops Unexpectedly
Symptom: Pipeline execution halts without clear error message.
Possible Causes:
- Context cancellation or timeout
- Panic in a processor function (automatically recovered by pipz)
- Deadlock in concurrent operations
Solutions:
// Check for context issues
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
result, err := pipeline.Process(ctx, data)
if err != nil {
// Check if it's a context error
if errors.Is(err.Cause, context.DeadlineExceeded) {
log.Println("Pipeline timed out")
} else if errors.Is(err.Cause, context.Canceled) {
log.Println("Pipeline was cancelled")
}
// Check if it's a panic that was automatically recovered
var panicErr *pipz.Error[T]
if errors.As(err, &panicErr) {
// Look for panic error in the chain
if strings.Contains(panicErr.Error(), "panic in processor") {
log.Printf("Processor panic was automatically recovered: %v", panicErr.Err)
}
}
}
// NOTE: Manual panic recovery is unnecessary - pipz handles all panics automatically
// All processors and connectors include built-in panic recovery with security sanitization
Pipeline Returns Unexpected Results
Symptom: Output doesn't match expected transformation.
Possible Causes:
- Processors registered in wrong order
- Mutation of shared state
- Missing Clone() implementation for concurrent processing
Solutions:
// Verify processor order
var PipelineID = pipz.NewIdentity("pipeline", "Data processing pipeline")
seq := pipz.NewSequence[Data](PipelineID)
seq.Iterate(func(p pipz.Chainable[Data]) bool {
fmt.Printf("Processor: %s\n", p.Name())
return true // continue iteration
})
// Ensure proper cloning for concurrent operations
type Data struct {
Values []int
}
func (d Data) Clone() Data {
// Deep copy slice to prevent shared state
newValues := make([]int, len(d.Values))
copy(newValues, d.Values)
return Data{Values: newValues}
}
2. Memory and Performance Issues
High Memory Usage
Symptom: Memory consumption grows unexpectedly.
Possible Causes:
- Large data structures not being garbage collected
- Goroutine leaks in concurrent processors
- Buffered channels not being drained
Solutions:
// Use streaming for large datasets
func processLargeDataset(ctx context.Context, reader io.Reader) error {
scanner := bufio.NewScanner(reader)
pipeline := createPipeline()
for scanner.Scan() {
data := parseData(scanner.Text())
result, err := pipeline.Process(ctx, data)
if err != nil {
return err
}
// Process result immediately, don't accumulate
handleResult(result)
}
return scanner.Err()
}
// Ensure goroutines are properly cleaned up
var ParallelID = pipz.NewIdentity("parallel", "Parallel processing")
concurrent := pipz.NewConcurrent(ParallelID, nil, processors...)
// Always use context with timeout/cancellation
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
result, err := concurrent.Process(ctx, data)
Slow Pipeline Execution
Symptom: Pipeline takes longer than expected to complete.
Possible Causes:
- Sequential processing of independent operations
- Inefficient processor implementations
- Excessive context switching in concurrent operations
Solutions:
// Profile to identify bottlenecks
import _ "net/http/pprof"
// In your main:
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
// Use concurrent processing for independent operations
// Instead of:
var SlowSeqID = pipz.NewIdentity("slow", "Sequential processing")
sequential := pipz.NewSequence(SlowSeqID,
fetchUserData, // 100ms
fetchOrderData, // 150ms
fetchInventory, // 200ms
)
// Use:
var FastConcID = pipz.NewIdentity("fast", "Concurrent processing")
concurrent := pipz.NewConcurrent(FastConcID, nil,
fetchUserData,
fetchOrderData,
fetchInventory,
)
// Total time: ~200ms instead of 450ms
3. Error Handling Issues
Errors Not Being Caught
Symptom: Errors pass through without being handled.
Possible Causes:
- Using Transform instead of Apply for fallible operations
- Not checking error returns
- Enrich processor swallowing errors
Solutions:
// Use Apply for operations that can fail
var ParseID = pipz.NewIdentity("parse", "Parse JSON data")
// Wrong:
processor := pipz.Transform(ParseID, func(ctx context.Context, s string) Data {
var d Data
json.Unmarshal([]byte(s), &d) // Error ignored!
return d
})
// Correct:
processor := pipz.Apply(ParseID, func(ctx context.Context, s string) (Data, error) {
var d Data
err := json.Unmarshal([]byte(s), &d)
return d, err
})
// Always check pipeline errors
result, err := pipeline.Process(ctx, input)
if err != nil {
log.Printf("Pipeline failed at stage '%s': %v", err.Stage, err.Cause)
// Access the data state at failure
log.Printf("Data state at failure: %+v", err.State)
}
Error Recovery Not Working
Symptom: Fallback processors not executing on error.
Possible Causes:
- Incorrect Fallback configuration
- Error occurring after fallback point
- Context already cancelled
Solutions:
// Ensure fallback is properly configured
var (
RecoverID = pipz.NewIdentity("recover", "Recovery fallback")
DefaultID = pipz.NewIdentity("default", "Default value provider")
)
fallback := pipz.NewFallback(RecoverID,
primaryPipeline,
pipz.Transform(DefaultID, func(ctx context.Context, data T) T {
return getDefaultValue()
}),
)
// Check that context is still valid
if ctx.Err() != nil {
log.Printf("Context already cancelled: %v", ctx.Err())
}
4. Type Safety Issues
Compilation Errors with Generics
Symptom: "cannot infer T" or type mismatch errors.
Possible Causes:
- Type inference limitations
- Incompatible processor types in sequence
- Missing type parameters
Solutions:
// Explicitly specify type parameters when needed
// Instead of:
var PipelineID = pipz.NewIdentity("pipeline", "My pipeline")
seq := pipz.NewSequence(PipelineID) // Error: cannot infer T
// Use:
seq := pipz.NewSequence[MyDataType](PipelineID)
// Ensure all processors handle the same type
type User struct { /* fields */ }
type Order struct { /* fields */ }
// This won't compile:
var MixedID = pipz.NewIdentity("mixed", "Mixed type pipeline")
pipeline := pipz.NewSequence(MixedID,
processUser, // Chainable[User]
processOrder, // Chainable[Order] - Type mismatch!
)
// Use transformation to change types:
var (
CorrectID = pipz.NewIdentity("correct", "Correct type pipeline")
ConvertID = pipz.NewIdentity("convert", "Convert User to Order")
)
pipeline := pipz.NewSequence(CorrectID,
processUser,
pipz.Apply(ConvertID, func(ctx context.Context, u User) (Order, error) {
return convertUserToOrder(u)
}),
processOrder,
)
5. Concurrency Issues
Race Conditions
Symptom: Inconsistent results, data corruption, panics.
Possible Causes:
- Shared mutable state
- Missing Clone() implementation
- Unsafe concurrent access
Solutions:
// Implement proper cloning
type Data struct {
mu sync.Mutex // Don't include mutex in clone!
values map[string]int
}
func (d Data) Clone() Data {
newValues := make(map[string]int, len(d.values))
for k, v := range d.values {
newValues[k] = v
}
return Data{values: newValues}
}
// Use synchronization for shared resources
var (
counter int64
)
var CountID = pipz.NewIdentity("count", "Count requests")
processor := pipz.Effect(CountID, func(ctx context.Context, data T) error {
atomic.AddInt64(&counter, 1)
return nil
})
Deadlocks
Symptom: Pipeline hangs indefinitely.
Possible Causes:
- Circular dependencies
- Channel deadlocks
- Mutex ordering issues
Solutions:
// Always use timeouts
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Detect deadlocks with runtime checks
import _ "runtime/debug"
// In development:
runtime.SetBlockProfileRate(1)
// Avoid circular dependencies
// Bad:
proc1 := createProcessor(proc2) // proc1 depends on proc2
proc2 := createProcessor(proc1) // proc2 depends on proc1
// Good:
proc1 := createProcessor(nil)
proc2 := createProcessor(nil)
// Configure dependencies after creation if needed
6. Fallback Chain Stack Overflow
Symptom: Stack overflow errors during error handling, infinite recursion in fallback chains.
Example Stack Trace Pattern:
goroutine 1 [running]:
runtime.main()
fallback.Process(ctx, data)
fallback.Process(ctx, data) // Same fallback called recursively
fallback.Process(ctx, data)
...
runtime: goroutine stack exceeds 1000000000-byte limit
Possible Causes:
- Circular references in fallback chains
- Fallback A points to Fallback B which points back to Fallback A
- Complex multi-level circular dependencies
Solutions:
// Identify the circular dependency
// Look for patterns like:
var (
F1ID = pipz.NewIdentity("f1", "Fallback 1")
F2ID = pipz.NewIdentity("f2", "Fallback 2")
F3ID = pipz.NewIdentity("f3", "Fallback 3")
)
fallback1 := pipz.NewFallback(F1ID, proc1, fallback2)
fallback2 := pipz.NewFallback(F2ID, proc2, fallback3)
fallback3 := pipz.NewFallback(F3ID, proc3, fallback1) // ← Creates circle
// Fix: Use linear hierarchy instead
var (
PrimaryID = pipz.NewIdentity("primary", "Primary fallback")
SecondaryID = pipz.NewIdentity("secondary", "Secondary fallback")
)
fallback := pipz.NewFallback(PrimaryID,
proc1,
pipz.NewFallback(SecondaryID,
proc2,
proc3, // Terminal processor - no further fallbacks
),
)
Prevention: Design fallback chains as hierarchical trees, not circular graphs.
7. Circuit Breaker Issues
Circuit Breaker Not Opening
Symptom: Failed requests continue despite threshold.
Possible Causes:
- Threshold too high
- Window size too large
- Errors not properly propagated
Solutions:
var (
CBApiID = pipz.NewIdentity("api", "API circuit breaker")
ApiID = pipz.NewIdentity("api", "API call")
)
cb := pipz.NewCircuitBreaker(CBApiID,
apiProcessor,
5, // Open after 5 failures
time.Minute, // Cooldown period
)
// Ensure errors are returned, not swallowed
apiProcessor := pipz.Apply(ApiID, func(ctx context.Context, data T) (T, error) {
resp, err := callAPI(data)
if err != nil {
return data, err // Must return error for circuit breaker to count
}
if resp.StatusCode >= 500 {
return data, fmt.Errorf("server error: %d", resp.StatusCode)
}
return processResponse(resp)
})
8. Rate Limiter Issues
Rate Limiting Not Working
Symptom: Requests exceed configured rate.
Possible Causes:
- Incorrect rate configuration
- Multiple rate limiter instances
- Time synchronization issues
Solutions:
// Use a single rate limiter instance with package-level Identity
var RateLimiterID = pipz.NewIdentity("api", "API rate limiter")
var rateLimiter = pipz.NewRateLimiter(RateLimiterID, 100, 10)
// Don't create new instances per request
// Wrong:
func handleRequest(data T) (T, error) {
rlID := pipz.NewIdentity("api", "API rate limiter")
rl := pipz.NewRateLimiter(rlID, 100, 10) // New instance each time!
return rl.Process(ctx, data)
}
// Correct:
func handleRequest(data T) (T, error) {
return rateLimiter.Process(ctx, data) // Reuse single instance
}
Debugging Techniques
1. Pipeline Inspection
// Inspect pipeline structure
func inspectPipeline[T any](seq *pipz.Sequence[T]) {
fmt.Printf("Pipeline: %s\n", seq.Name())
seq.Iterate(func(p pipz.Chainable[T]) bool {
fmt.Printf(" - %s\n", p.Name())
return true
})
}
// Add debug logging
var (
DebugPipelineID = pipz.NewIdentity("debug", "Debug pipeline")
LogInputID = pipz.NewIdentity("log-input", "Log input data")
LogOutputID = pipz.NewIdentity("log-output", "Log output data")
)
debugPipeline := pipz.NewSequence(DebugPipelineID,
pipz.Effect(LogInputID, func(ctx context.Context, data T) error {
log.Printf("Input: %+v", data)
return nil
}),
actualProcessor,
pipz.Effect(LogOutputID, func(ctx context.Context, data T) error {
log.Printf("Output: %+v", data)
return nil
}),
)
2. Error Analysis
// Detailed error logging
result, err := pipeline.Process(ctx, data)
if err != nil {
var pipeErr *pipz.Error[T]
if errors.As(err, &pipeErr) {
log.Printf("Pipeline failed:")
// Path is []Identity - show the full path
var pathNames []string
for _, id := range pipeErr.Path {
pathNames = append(pathNames, id.Name())
}
log.Printf(" Path: %s", strings.Join(pathNames, " -> "))
log.Printf(" Error: %v", pipeErr.Err)
log.Printf(" Type: %T", pipeErr.Err)
log.Printf(" InputData: %+v", pipeErr.InputData)
// Check for specific error types
var validationErr *ValidationError
if errors.As(pipeErr.Err, &validationErr) {
log.Printf(" Validation failed: %s", validationErr.Field)
}
}
}
3. Performance Profiling
// Time individual processors
func timeProcessor[T any](id pipz.Identity, p pipz.Chainable[T]) pipz.Chainable[T] {
return pipz.Apply(id, func(ctx context.Context, data T) (T, error) {
start := time.Now()
result, err := p.Process(ctx, data)
duration := time.Since(start)
log.Printf("Processor %s took %v", p.Name(), duration)
return result, err
})
}
// Memory profiling
import (
"runtime"
"runtime/pprof"
)
func profileMemory() {
f, _ := os.Create("mem.prof")
defer f.Close()
runtime.GC()
pprof.WriteHeapProfile(f)
}
Getting Help
If you're still experiencing issues:
- Read the Tests: Test files often demonstrate edge cases and proper usage
- Enable Debug Logging: Set up detailed logging to trace execution
- Create a Minimal Reproduction: Isolate the issue in a small, reproducible example
- File an Issue: Report bugs at https://github.com/zoobzio/pipz/issues with:
- Go version
- pipz version
- Minimal code to reproduce
- Expected vs actual behavior
- Any error messages or stack traces
Common Patterns for Resilience
Defensive Pipeline Construction
// Define identities upfront
var (
ResilientPipelineID = pipz.NewIdentity("resilient", "Resilient pipeline")
ValidateInputID = pipz.NewIdentity("validate", "Validate input")
RateLimitID = pipz.NewIdentity("rate", "Rate limiter")
CircuitBreakerID = pipz.NewIdentity("circuit", "Circuit breaker")
TimeoutID = pipz.NewIdentity("timeout", "Timeout protection")
RetryID = pipz.NewIdentity("retry", "Retry logic")
RecoverFallbackID = pipz.NewIdentity("recover", "Error recovery fallback")
)
// Combine multiple resilience patterns
resilientPipeline := pipz.NewSequence(ResilientPipelineID,
// Input validation
pipz.Apply(ValidateInputID, validateInput),
// Rate limiting
pipz.NewRateLimiter(RateLimitID, 100, 10,
// Circuit breaker
pipz.NewCircuitBreaker(CircuitBreakerID,
// Timeout protection
pipz.NewTimeout(TimeoutID,
// Retry logic
pipz.NewRetry(RetryID, actualProcessor, 3),
5*time.Second,
),
5, time.Minute,
),
),
// Error recovery
pipz.NewFallback(RecoverFallbackID,
riskyProcessor,
safeDefault,
),
)
This layered approach provides multiple levels of protection against various failure modes.