zoobzio December 12, 2025 Edit this page

pipz Quick Reference

Processor Selection (10 seconds)

| If you need to...         | Use...    | Can fail?   | Example              |
|---------------------------|-----------|-------------|----------------------|
| Transform data (pure)     | Transform | No          | strings.ToUpper      |
| Transform data (can fail) | Apply     | Yes         | Parse JSON, validate |
| Side effects only         | Effect    | Yes         | Logging, metrics     |
| Conditional changes       | Mutate    | No          | Feature flags        |
| Optional enhancement      | Enrich    | Logs errors | Add metadata         |
| Pass/block data           | Filter    | No          | Access control       |

Connector Decision (10 seconds)

Need parallel? ──No──→ Need conditions? ──No──→ Sequence
      │                       │
      │                      Yes → Switch
      │
     Yes → Bounded? ──Yes──→ WorkerPool
            │
           No → Need all results? ──No──→ Fastest? → Race
                      │                        │
                      │                       Best? → Contest
                      │
                     Yes → Fire & forget? ──Yes──→ Scaffold
                                 │
                                No → Concurrent

Common Patterns (Copy & Paste)

Basic Pipeline

// Define identities
var (
    PipelineID  = pipz.NewIdentity("name", "Pipeline description")
    ValidateID  = pipz.NewIdentity("validate", "Validate input data")
    NormalizeID = pipz.NewIdentity("normalize", "Normalize data format")
    LogID       = pipz.NewIdentity("log", "Log processed data")
)

pipeline := pipz.NewSequence[T](PipelineID,
    pipz.Apply(ValidateID, validateFunc),
    pipz.Transform(NormalizeID, normalizeFunc),
    pipz.Effect(LogID, logFunc),
)

Retry with Backoff

var APIID = pipz.NewIdentity("api", "Retry API calls on failure")

reliable := pipz.NewRetry(APIID, apiCall, 3)

Circuit Breaker

var ServiceID = pipz.NewIdentity("service", "Protect downstream service")

protected := pipz.NewCircuitBreaker(ServiceID, processor, 5, 30*time.Second).
    SetSuccessThreshold(2) // Optional: require 2 successes to close from half-open

Rate Limiting (Singleton!)

var (
    RateLimitID = pipz.NewIdentity("api", "Limit API request rate")
    ProcessorID = pipz.NewIdentity("processor", "The rate-limited processor")
    limiter     = pipz.NewRateLimiter(RateLimitID, 100, 10, // 100/sec, burst 10
                      pipz.Apply(ProcessorID, processFunc)).
                      SetMode("wait") // Or "drop"
)

Timeout Protection

var SlowID = pipz.NewIdentity("slow", "Enforce timeout on slow operations")

bounded := pipz.NewTimeout(SlowID, processor, 5*time.Second)

Fallback on Error

var SafeID = pipz.NewIdentity("safe", "Fallback to safe default")

safe := pipz.NewFallback(SafeID, riskyOp, safeDefault)

Parallel Processing

var (
    NotifyID  = pipz.NewIdentity("notify", "Send parallel notifications")
    LimitedID = pipz.NewIdentity("limited", "Process with worker pool")
)

// Unbounded - Type must implement Cloner[T]
parallel := pipz.NewConcurrent[T](NotifyID,
    sendEmail, sendSMS, logEvent,
)

// Bounded - limit to N concurrent operations
pool := pipz.NewWorkerPool[T](LimitedID, 5,
    apiCall1, apiCall2, apiCall3, // ... many more
)

Conditional Routing

var RouteID = pipz.NewIdentity("route", "Route by customer tier")

router := pipz.NewSwitch[T](RouteID, routeFunc).
    AddRoute("premium", premiumPipeline).
    AddRoute("standard", standardPipeline)

Resilient API Call

var (
    RateID    = pipz.NewIdentity("rate", "Rate limit API calls")
    BreakerID = pipz.NewIdentity("breaker", "Protect API with circuit breaker")
    TimeoutID = pipz.NewIdentity("timeout", "Enforce API timeout")
    RetryID   = pipz.NewIdentity("retry", "Retry failed API calls")
)

api := pipz.NewRateLimiter(RateID, 100, 10, // 100/sec, burst 10
    pipz.NewCircuitBreaker(BreakerID,
        pipz.NewTimeout(TimeoutID,
            pipz.NewRetry(RetryID, apiCall, 3),
            5*time.Second,
        ),
        5, 30*time.Second, // 5 failures, 30s reset
    ),
)

Error Pipeline

var (
    ErrorsID   = pipz.NewIdentity("errors", "Handle pipeline errors")
    LogErrorID = pipz.NewIdentity("log", "Log error details")
    ClassifyID = pipz.NewIdentity("classify", "Classify error type")
    RecoverID  = pipz.NewIdentity("recover", "Select recovery strategy")
)

errorHandler := pipz.NewSequence[*pipz.Error[T]](ErrorsID,
    pipz.Effect(LogErrorID, logError),
    pipz.Apply(ClassifyID, classifyError),
    pipz.NewSwitch(RecoverID, selectRecovery),
)

Pipeline with Tracing

var (
    OrderPipelineID = pipz.NewIdentity("order-processing", "Main order flow")
    InternalSeqID   = pipz.NewIdentity("order-steps", "Processing sequence")
)

// Wrap any chainable with execution context
pipeline := pipz.NewPipeline(OrderPipelineID,
    pipz.NewSequence(InternalSeqID, validate, enrich, save),
)

// Extract IDs in processors or signal handlers
func myProcessor(ctx context.Context, data T) (T, error) {
    execID, _ := pipz.ExecutionIDFromContext(ctx)  // Unique per call
    pipeID, _ := pipz.PipelineIDFromContext(ctx)   // Stable per pipeline
    // ...
}

Clone Implementation

type Data struct {
    Items []Item
    Meta  map[string]string
}

func (d Data) Clone() Data {
    // Deep copy slices
    items := make([]Item, len(d.Items))
    copy(items, d.Items)

    // Deep copy maps
    meta := make(map[string]string, len(d.Meta))
    for k, v := range d.Meta {
        meta[k] = v
    }

    return Data{Items: items, Meta: meta}
}

Error Handling

Access Error Details

result, err := pipeline.Process(ctx, data)
if err != nil {
    var pipeErr *pipz.Error[T]
    if errors.As(err, &pipeErr) {
        fmt.Printf("Failed at: %v\n", pipeErr.Path)
        fmt.Printf("Cause: %v\n", pipeErr.Err)
        fmt.Printf("Data state: %+v\n", pipeErr.InputData)
        fmt.Printf("Duration: %v\n", pipeErr.Duration)
        if pipeErr.Timeout {
            fmt.Println("Operation timed out")
        }
    }
}

Handle Specific Errors

var (
    RecoverErrorsID = pipz.NewIdentity("recover", "Recover from specific errors")
    HandleErrorID   = pipz.NewIdentity("handle", "Handle temporary errors")
)

pipz.NewHandle(RecoverErrorsID, pipeline,
    pipz.Effect(HandleErrorID, func(ctx context.Context, pipeErr *pipz.Error[T]) error {
        if errors.Is(pipeErr.Err, ErrTemporary) {
            log.Printf("Temporary error at %v: %v", pipeErr.Path, pipeErr.Err)
            // Perform cleanup or notification
        }
        return nil // Handler errors don't affect flow
    }),
)

Testing Patterns

Mock Processor

type Mock[T any] struct {
    identity pipz.Identity
    Returns  T
    Error    error
}

func NewMock[T any](id pipz.Identity, returns T, err error) *Mock[T] {
    return &Mock[T]{identity: id, Returns: returns, Error: err}
}

func (m *Mock[T]) Process(ctx context.Context, data T) (T, error) {
    return m.Returns, m.Error
}

func (m *Mock[T]) Identity() pipz.Identity { return m.identity }
func (m *Mock[T]) Schema() pipz.Node       { return pipz.Node{Identity: m.identity, Type: "mock"} }
func (m *Mock[T]) Close() error            { return nil }

Test Error Location

_, err := pipeline.Process(ctx, data)
var pipeErr *pipz.Error[T]
if errors.As(err, &pipeErr) {
    // Path contains the full chain of identities, last element is where failure occurred
    lastID := pipeErr.Path[len(pipeErr.Path)-1]
    assert.Equal(t, "expected-stage", lastID.Name)
}

Gotchas & Tips

❌ Common Mistakes

  1. Creating rate limiters per request
// WRONG - New instance each time
func handle(req Request) {
    limiter := pipz.NewRateLimiter(...) // ❌
}
  2. Shallow copying in Clone()
// WRONG - Shares slice memory
func (d Data) Clone() Data {
    return Data{Items: d.Items} // ❌
}
  3. Not checking context in long operations
// WRONG - Ignores cancellation
func process(ctx context.Context, data T) (T, error) {
    time.Sleep(10 * time.Second) // ❌
    return data, nil
}

✅ Best Practices

  1. Singleton rate limiters
// RIGHT - Shared instance
var limiter = pipz.NewRateLimiter(...) // ✅
  2. Deep copy in Clone()
// RIGHT - New memory
func (d Data) Clone() Data {
    items := make([]Item, len(d.Items))
    copy(items, d.Items) // ✅
    return Data{Items: items}
}
  3. Respect context
// RIGHT - Cancellable
func process(ctx context.Context, data T) (T, error) {
    select {
    case <-time.After(10 * time.Second):
        return data, nil
    case <-ctx.Done():
        return data, ctx.Err() // ✅
    }
}

Type Constraints

| Connector  | Requires               |
|------------|------------------------|
| Concurrent | T implements Cloner[T] |
| Race       | T implements Cloner[T] |
| Contest    | T implements Cloner[T] |
| All others | Any type T             |

Performance Tips

  • Sequence: O(n) - minimize processor count
  • Concurrent: Overhead from goroutines - use for expensive operations
  • Transform: Zero allocations - prefer over Apply when possible
  • Effect: Use for metrics/logging without data copy
  • Switch: Single branch execution - efficient routing

Context Patterns

Add request ID

type ctxKey string // use an unexported key type, not a bare string, to avoid collisions

ctx := context.WithValue(ctx, ctxKey("request-id"), uuid.New())

Set timeout

ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

Check cancellation

select {
case <-ctx.Done():
    return ctx.Err()
default:
    // Continue processing
}