zoobzio December 12, 2025

Core Concepts

The Pipeline Mental Model

Think of pipz as a conveyor belt for your data. Each processor is a station that transforms, validates, or enriches data as it passes through. Connectors determine how data flows between stations - sequentially, in parallel, or conditionally.

Pipeline Lifecycle & Data Flow

┌──────────────────────────────────────────────────────────────────┐
│                     Complete Pipeline Lifecycle                  │
└──────────────────────────────────────────────────────────────────┘

┌──────────┐      ┌──────────┐      ┌──────────┐      ┌──────────┐
│  Input   │─────→│ Validate │─────→│Transform │─────→│  Output  │
│   Data   │      │  Stage   │      │  Stage   │      │   Data   │
└──────────┘      └──────────┘      └──────────┘      └──────────┘
      │                 │                 │                 │
      ▼                 ▼                 ▼                 ▼
┌──────────┐      ┌──────────┐      ┌──────────┐      ┌──────────┐
│ Context  │      │  Error   │      │  Error   │      │  Result  │
│  State   │      │ Handling │      │ Handling │      │  State   │
└──────────┘      └──────────┘      └──────────┘      └──────────┘

Legend:
─────→  Data flow
  ▼     Context/Error propagation
  [✓]   Success state
  [✗]   Failure state

Data Flow Patterns

Sequential:  Input → [A] → [B] → [C] → Output
                      ↓     ↓     ↓
                    (T,e) (T,e) (T,e)

Parallel:    Input → ┌─[A]─┐
                     ├─[B]─┼→ Output
                     └─[C]─┘

Conditional: Input → [?] → [A] → Output
                      ↓
                     [B] → Output

Error Flow:  Input → [A] → [B] ✗ → Error[T]
                            ↓
                          Stop

The power comes from composition: small, focused functions combine into complex workflows. This isn't just functional programming - it's a structured way to organize business logic that scales from simple validations to distributed systems.

The Chainable Interface

Everything in pipz implements a single interface:

type Chainable[T any] interface {
    Process(context.Context, T) (T, error)
    Identity() Identity
    Schema() Node
    Close() error
}

This simplicity is deliberate. Any type implementing this interface can be:

  • Used in any pipeline
  • Composed with any other Chainable
  • Tested in isolation
  • Replaced at runtime

You can implement Chainable directly for custom needs, or use the provided wrappers for common patterns.

Processors: Transform Your Data

Processors are stateless functions that act on a single piece of data. pipz provides wrappers for common patterns:

Processor   Purpose                    Can Fail?     Modifies Data?   Example Use
Transform   Pure transformation        No            Yes              Formatting, calculations
Apply       Fallible transformation    Yes           Yes              Parsing, validation
Effect      Side effects               Yes           No               Logging, metrics
Mutate      Conditional modification   No            Yes/No           Feature flags
Enrich      Optional enhancement       Logs errors   Yes              Adding metadata
Handle      Error observation          Yes           No               Cleanup, notifications

Example: Building a Validation Pipeline

// Define identities upfront
var (
    ValidateEmailID = pipz.NewIdentity("validate-email", "Validates email format")
    NormalizeDataID = pipz.NewIdentity("normalize-data", "Normalizes user data")
    AuditLogID      = pipz.NewIdentity("audit-log", "Logs validation events")
    UserFlowID      = pipz.NewIdentity("user-flow", "User validation flow")
)

// Create processors
validators := []pipz.Chainable[User]{
    pipz.Apply(ValidateEmailID, func(ctx context.Context, u User) (User, error) {
        if !strings.Contains(u.Email, "@") {
            return u, errors.New("invalid email")
        }
        return u, nil
    }),
    pipz.Transform(NormalizeDataID, func(ctx context.Context, u User) User {
        u.Email = strings.ToLower(u.Email)
        return u
    }),
    pipz.Effect(AuditLogID, func(ctx context.Context, u User) error {
        log.Printf("User validated: %s", u.Email)
        return nil
    }),
}

Connectors: Control the Flow

Connectors compose processors and control execution flow:

Sequential Processing

Sequence - Process data through steps in order:

pipeline := pipz.NewSequence(UserFlowID, validators...)

Parallel Processing

These require T to implement Cloner[T] for safe concurrent execution:

Concurrent - Run all processors simultaneously:

var NotifyID = pipz.NewIdentity("notify", "Sends notifications")

notifications := pipz.NewConcurrent(NotifyID,
    sendEmail,
    sendSMS,
    updateMetrics,
)

Race - Return first successful result:

var FetchID = pipz.NewIdentity("fetch", "Fetches from multiple sources")

fetch := pipz.NewRace(FetchID,
    primaryDB,
    replicaDB,
    cache,
)

Contest - Return first result meeting criteria:

var (
    QualityID     = pipz.NewIdentity("quality", "Finds best quality result")
    QualityCheckF = func(ctx context.Context, result Result) bool {
        return result.Confidence > 0.9
    }
)

quality := pipz.NewContest(QualityID, QualityCheckF,
    aiModel1, aiModel2, aiModel3,
)

Conditional Processing

Switch - Route based on conditions:

var (
    RouterID = pipz.NewIdentity("router", "Routes requests by tier")
    RouterF  = func(ctx context.Context, req Request) string {
        if req.Premium {
            return "premium"
        }
        return "standard"
    }
)

router := pipz.NewSwitch(RouterID, RouterF).
    AddRoute("premium", premiumPipeline).
    AddRoute("standard", standardPipeline)

Error Handling

Fallback - Provide alternative on failure:

var SafeID = pipz.NewIdentity("safe", "Safe operation with fallback")

safe := pipz.NewFallback(SafeID, riskyOperation, safeDefault)

Retry - Retry transient failures:

var APIID = pipz.NewIdentity("api", "Retries API calls")

reliable := pipz.NewRetry(APIID, apiCall, 3)

Resilience

CircuitBreaker - Prevent cascading failures:

var ServiceID = pipz.NewIdentity("service", "Protected service")

protected := pipz.NewCircuitBreaker(ServiceID, processor, 5, 30*time.Second)

RateLimiter - Control throughput:

var RateLimitID = pipz.NewIdentity("rate-limit", "API rate limiter")

throttled := pipz.NewRateLimiter[Request](RateLimitID, 100, 10) // 100/sec, burst 10

Timeout - Bound execution time:

var SlowID = pipz.NewIdentity("slow", "Timeout protection")

bounded := pipz.NewTimeout(SlowID, processor, 5*time.Second)

Execution Context

Pipeline - Wrap pipelines with execution context for distributed tracing:

var (
    OrderPipelineID = pipz.NewIdentity("order-processing", "Main order flow")
    InternalSeqID   = pipz.NewIdentity("order-steps", "Processing sequence")
)

// Build processing logic
sequence := pipz.NewSequence(InternalSeqID, validate, enrich, save)

// Wrap with Pipeline for execution context
pipeline := pipz.NewPipeline(OrderPipelineID, sequence)

// Each Process() call gets a unique execution ID
result, err := pipeline.Process(ctx, order)

Pipeline injects two correlation IDs into context:

  • Execution ID - Unique per Process() call, for tracing individual requests
  • Pipeline ID - Stable across executions, for grouping by pipeline

Extract these in signal handlers or processors:

if execID, ok := pipz.ExecutionIDFromContext(ctx); ok {
    log.Printf("Execution: %s", execID)
}
if pipeID, ok := pipz.PipelineIDFromContext(ctx); ok {
    log.Printf("Pipeline: %s", pipeID)
}

Type Safety Through Generics

pipz leverages Go generics for compile-time type safety:

// Define identities upfront
var (
    UserPipelineID = pipz.NewIdentity("user-pipeline", "User processing pipeline")
    ConvertID      = pipz.NewIdentity("convert", "Converts user to order")
)

// Type is locked at pipeline creation
pipeline := pipz.NewSequence[User](UserPipelineID)

// This won't compile - type mismatch
pipeline.Register(processOrder) // Error: expects Chainable[User], got Chainable[Order]

// Transform between types explicitly
converter := pipz.Apply(ConvertID, func(ctx context.Context, u User) (Order, error) {
    return u.CreateOrder()
})

The Cloner Constraint

For concurrent processing, your type must implement Cloner[T]:

type Data struct {
    Values []int
}

func (d Data) Clone() Data {
    newValues := make([]int, len(d.Values))
    copy(newValues, d.Values)
    return Data{Values: newValues}
}
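
The deep copy matters because a plain struct assignment shares the slice's backing array. This runnable check contrasts the two:

```go
package main

import "fmt"

type Data struct {
	Values []int
}

// Clone deep-copies the slice so the clone gets its own backing array.
func (d Data) Clone() Data {
	newValues := make([]int, len(d.Values))
	copy(newValues, d.Values)
	return Data{Values: newValues}
}

func main() {
	orig := Data{Values: []int{1, 2, 3}}

	shallow := orig          // struct copy: both share one backing array
	shallow.Values[0] = 99   // ...so this mutates orig too
	fmt.Println(orig.Values) // [99 2 3]

	orig.Values[0] = 1
	deep := orig.Clone()
	deep.Values[0] = 99
	fmt.Println(orig.Values) // [1 2 3]: the clone is isolated
}
```

The same rule applies to maps and pointer fields: anything shared by reference must be copied in Clone().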

Error Philosophy

Errors in pipz are first-class citizens with rich context:

type Error[T any] struct {
    Path      []Name        // Full path through pipeline
    Err       error         // The underlying error
    InputData T             // Data at failure
    Timestamp time.Time     // When the error occurred
    Duration  time.Duration // How long before failure
    Timeout   bool          // Was it a timeout?
    Canceled  bool          // Was it canceled?
}

This design enables:

  • Precise debugging: Know exactly where and why failures occur
  • Error recovery: Access data state for compensation
  • Error pipelines: Process errors through their own pipelines

Error Pipeline Pattern

// Define identities upfront
var (
    ErrorHandlerID = pipz.NewIdentity("error-handler", "Error handling pipeline")
    LogID          = pipz.NewIdentity("log", "Logs errors")
    ClassifyID     = pipz.NewIdentity("classify", "Classifies error types")
    RecoveryID     = pipz.NewIdentity("recovery", "Selects recovery strategy")
    MainID         = pipz.NewIdentity("main", "Main processing pipeline")
    RecoverID      = pipz.NewIdentity("recover", "Error recovery")
)

// Errors are just data - process them like anything else
errorPipeline := pipz.NewSequence[*pipz.Error[Order]](ErrorHandlerID,
    pipz.Effect(LogID, logError),
    pipz.Apply(ClassifyID, classifyError),
    pipz.NewSwitch(RecoveryID, selectRecoveryStrategy),
)

// Use with main pipeline
mainPipeline := pipz.NewFallback(MainID,
    orderProcessing,
    pipz.Handle(RecoverID, errorPipeline),
)

Best Practices

  1. Name processors with identities - Use descriptive names and descriptions
  2. Keep processors focused - Each should do one thing well
  3. Compose, don't configure - Build complex behavior from simple parts
  4. Test in isolation - Each processor should be independently testable
  5. Handle context - Always respect cancellation and timeouts
  6. Clone properly - Deep copy slices and maps in Clone() methods

Next Steps