# Core Concepts

## The Pipeline Mental Model
Think of pipz as a conveyor belt for your data. Each processor is a station that transforms, validates, or enriches data as it passes through. Connectors determine how data flows between stations - sequentially, in parallel, or conditionally.
## Pipeline Lifecycle & Data Flow
```
┌──────────────────────────────────────────────────────────────────┐
│                   Complete Pipeline Lifecycle                    │
└──────────────────────────────────────────────────────────────────┘

┌──────────┐      ┌──────────┐      ┌──────────┐      ┌──────────┐
│  Input   │─────→│ Validate │─────→│Transform │─────→│  Output  │
│  Data    │      │  Stage   │      │  Stage   │      │  Data    │
└──────────┘      └──────────┘      └──────────┘      └──────────┘
     │                 │                 │                 │
     ▼                 ▼                 ▼                 ▼
┌──────────┐      ┌──────────┐      ┌──────────┐      ┌──────────┐
│ Context  │      │  Error   │      │  Error   │      │  Result  │
│  State   │      │ Handling │      │ Handling │      │  State   │
└──────────┘      └──────────┘      └──────────┘      └──────────┘

Legend:
  ─────→  Data flow
  ▼       Context/Error propagation
  [✓]     Success state
  [✗]     Failure state
```
### Data Flow Patterns

```
Sequential:  Input → [A] → [B] → [C] → Output
                      ↓     ↓     ↓
                    (T,e) (T,e) (T,e)

Parallel:    Input → ┌─[A]─┐
                     ├─[B]─┼→ Output
                     └─[C]─┘

Conditional: Input → [?] → [A] → Output
                      ↓
                     [B] → Output

Error Flow:  Input → [A] → [B] ✗ → Error[T]
                            ↓
                           Stop
```
The power comes from composition: small, focused functions combine into complex workflows. This isn't just functional programming - it's a structured way to organize business logic that scales from simple validations to distributed systems.
## The Chainable Interface
Everything in pipz implements a single interface:
```go
type Chainable[T any] interface {
    Process(context.Context, T) (T, error)
    Identity() Identity
    Schema() Node
    Close() error
}
```
This simplicity is deliberate. Any type implementing this interface can be:
- Used in any pipeline
- Composed with any other Chainable
- Tested in isolation
- Replaced at runtime
You can implement Chainable directly for custom needs, or use the provided wrappers for common patterns.
## Processors: Transform Your Data
Processors are immutable functions that act on data. pipz provides wrappers for common patterns:
| Processor | Purpose | Can Fail? | Modifies Data? | Example Use |
|---|---|---|---|---|
| Transform | Pure transformation | No | Yes | Formatting, calculations |
| Apply | Fallible transformation | Yes | Yes | Parsing, validation |
| Effect | Side effects | Yes | No | Logging, metrics |
| Mutate | Conditional modification | No | Yes/No | Feature flags |
| Enrich | Optional enhancement | Logs errors | Yes | Adding metadata |
| Handle | Error observation | Yes | No | Cleanup, notifications |
### Example: Building a Validation Pipeline
```go
// Define identities upfront
var (
    ValidateEmailID = pipz.NewIdentity("validate-email", "Validates email format")
    NormalizeDataID = pipz.NewIdentity("normalize-data", "Normalizes user data")
    AuditLogID      = pipz.NewIdentity("audit-log", "Logs validation events")
    UserFlowID      = pipz.NewIdentity("user-flow", "User validation flow")
)

// Create processors
validators := []pipz.Chainable[User]{
    pipz.Apply(ValidateEmailID, func(ctx context.Context, u User) (User, error) {
        if !strings.Contains(u.Email, "@") {
            return u, errors.New("invalid email")
        }
        return u, nil
    }),
    pipz.Transform(NormalizeDataID, func(ctx context.Context, u User) User {
        u.Email = strings.ToLower(u.Email)
        return u
    }),
    pipz.Effect(AuditLogID, func(ctx context.Context, u User) error {
        log.Printf("User validated: %s", u.Email)
        return nil
    }),
}
```
## Connectors: Control the Flow
Connectors compose processors and control execution flow:
### Sequential Processing

Sequence - Process data through steps in order:

```go
pipeline := pipz.NewSequence(UserFlowID, validators...)
```
### Parallel Processing

These connectors require T to implement Cloner[T] for safe concurrent execution.

Concurrent - Run all processors simultaneously:

```go
var NotifyID = pipz.NewIdentity("notify", "Sends notifications")

notifications := pipz.NewConcurrent(NotifyID,
    sendEmail,
    sendSMS,
    updateMetrics,
)
```
Race - Return first successful result:

```go
var FetchID = pipz.NewIdentity("fetch", "Fetches from multiple sources")

fetch := pipz.NewRace(FetchID,
    primaryDB,
    replicaDB,
    cache,
)
```
Contest - Return first result meeting criteria:

```go
var (
    QualityID     = pipz.NewIdentity("quality", "Finds best quality result")
    QualityCheckF = func(ctx context.Context, result Result) bool {
        return result.Confidence > 0.9
    }
)

quality := pipz.NewContest(QualityID, QualityCheckF,
    aiModel1, aiModel2, aiModel3,
)
```
### Conditional Processing

Switch - Route based on conditions:

```go
var (
    RouterID = pipz.NewIdentity("router", "Routes requests by tier")
    RouterF  = func(ctx context.Context, req Request) string {
        if req.Premium {
            return "premium"
        }
        return "standard"
    }
)

router := pipz.NewSwitch(RouterID, RouterF).
    AddRoute("premium", premiumPipeline).
    AddRoute("standard", standardPipeline)
```
### Error Handling

Fallback - Provide alternative on failure:

```go
var SafeID = pipz.NewIdentity("safe", "Safe operation with fallback")

safe := pipz.NewFallback(SafeID, riskyOperation, safeDefault)
```

Retry - Retry transient failures:

```go
var APIID = pipz.NewIdentity("api", "Retries API calls")

reliable := pipz.NewRetry(APIID, apiCall, 3)
```
### Resilience

CircuitBreaker - Prevent cascading failures:

```go
var ServiceID = pipz.NewIdentity("service", "Protected service")

protected := pipz.NewCircuitBreaker(ServiceID, processor, 5, 30*time.Second)
```

RateLimiter - Control throughput:

```go
var RateLimitID = pipz.NewIdentity("rate-limit", "API rate limiter")

throttled := pipz.NewRateLimiter[Request](RateLimitID, 100, 10) // 100/sec, burst 10
```

Timeout - Bound execution time:

```go
var SlowID = pipz.NewIdentity("slow", "Timeout protection")

bounded := pipz.NewTimeout(SlowID, processor, 5*time.Second)
```
### Execution Context

Pipeline - Wrap pipelines with execution context for distributed tracing:

```go
var (
    OrderPipelineID = pipz.NewIdentity("order-processing", "Main order flow")
    InternalSeqID   = pipz.NewIdentity("order-steps", "Processing sequence")
)

// Build processing logic
sequence := pipz.NewSequence(InternalSeqID, validate, enrich, save)

// Wrap with Pipeline for execution context
pipeline := pipz.NewPipeline(OrderPipelineID, sequence)

// Each Process() call gets a unique execution ID
result, err := pipeline.Process(ctx, order)
```
Pipeline injects two correlation IDs into the context:
- Execution ID - Unique per Process() call, for tracing individual requests
- Pipeline ID - Stable across executions, for grouping by pipeline
Extract these in signal handlers or processors:
```go
if execID, ok := pipz.ExecutionIDFromContext(ctx); ok {
    log.Printf("Execution: %s", execID)
}
if pipeID, ok := pipz.PipelineIDFromContext(ctx); ok {
    log.Printf("Pipeline: %s", pipeID)
}
```
## Type Safety Through Generics
pipz leverages Go generics for compile-time type safety:
```go
// Define identities upfront
var (
    UserPipelineID = pipz.NewIdentity("user-pipeline", "User processing pipeline")
    ConvertID      = pipz.NewIdentity("convert", "Converts user to order")
)

// Type is locked at pipeline creation
pipeline := pipz.NewSequence[User](UserPipelineID)

// This won't compile - type mismatch
pipeline.Register(processOrder) // Error: expects Chainable[User], got Chainable[Order]

// Transform between types explicitly
converter := pipz.Apply(ConvertID, func(ctx context.Context, u User) (Order, error) {
    return u.CreateOrder()
})
```
### The Cloner Constraint
For concurrent processing, your type must implement Cloner[T]:
```go
type Data struct {
    Values []int
}

func (d Data) Clone() Data {
    newValues := make([]int, len(d.Values))
    copy(newValues, d.Values)
    return Data{Values: newValues}
}
```
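A quick way to check a Clone implementation is to mutate the copy and confirm the original is untouched. The sketch below repeats the Data type so it stands alone:

```go
package main

import "fmt"

// Data repeats the type above so the snippet is self-contained.
type Data struct {
	Values []int
}

// Clone deep-copies the slice so concurrent branches cannot alias it.
func (d Data) Clone() Data {
	newValues := make([]int, len(d.Values))
	copy(newValues, d.Values)
	return Data{Values: newValues}
}

func main() {
	a := Data{Values: []int{1, 2, 3}}
	b := a.Clone()
	b.Values[0] = 99 // mutate the clone only
	fmt.Println(a.Values[0], b.Values[0]) // 1 99
}
```

A shallow copy (returning `d` directly) would pass the type checker but share the underlying slice, causing data races under Concurrent, Race, or Contest.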
## Error Philosophy
Errors in pipz are first-class citizens with rich context:
```go
type Error[T any] struct {
    Path      []Name        // Full path through pipeline
    Err       error         // The underlying error
    InputData T             // Data at failure
    Timestamp time.Time     // When the error occurred
    Duration  time.Duration // How long before failure
    Timeout   bool          // Was it a timeout?
    Canceled  bool          // Was it canceled?
}
```
This design enables:
- Precise debugging: Know exactly where and why failures occur
- Error recovery: Access data state for compensation
- Error pipelines: Process errors through their own pipelines
### Error Pipeline Pattern
```go
// Define identities upfront
var (
    ErrorHandlerID = pipz.NewIdentity("error-handler", "Error handling pipeline")
    LogID          = pipz.NewIdentity("log", "Logs errors")
    ClassifyID     = pipz.NewIdentity("classify", "Classifies error types")
    RecoveryID     = pipz.NewIdentity("recovery", "Selects recovery strategy")
    MainID         = pipz.NewIdentity("main", "Main processing pipeline")
    RecoverID      = pipz.NewIdentity("recover", "Error recovery")
)

// Errors are just data - process them like anything else
errorPipeline := pipz.NewSequence[*pipz.Error[Order]](ErrorHandlerID,
    pipz.Effect(LogID, logError),
    pipz.Apply(ClassifyID, classifyError),
    pipz.NewSwitch(RecoveryID, selectRecoveryStrategy),
)

// Use with main pipeline
mainPipeline := pipz.NewFallback(MainID,
    orderProcessing,
    pipz.Handle(RecoverID, errorPipeline),
)
```
## Best Practices
- Name processors with identities - Use descriptive names and descriptions
- Keep processors focused - Each should do one thing well
- Compose, don't configure - Build complex behavior from simple parts
- Test in isolation - Each processor should be independently testable
- Handle context - Always respect cancellation and timeouts
- Clone properly - Deep copy slices and maps in Clone() methods
## Next Steps
- Architecture - System design and internals
- Quickstart Tutorial - Build your first pipeline
- Connector Selection - Choose the right connector
- API Reference - Complete API documentation