Composable Data Pipelines for Go. One Interface. Infinite Composition.
Build complex data processing from small, reusable parts. Every processor and connector implements Chainable[T] — compose retry, circuit breaking, and routing without changing your logic.
Getting Started

    import (
        "context"
        "errors"
        "fmt"
        "time"

        "github.com/zoobz-io/pipz"
    )

    // Order, sendConfirmation, ctx, order, and the *ID values are assumed
    // to be defined elsewhere.

    // Small, focused processors
    validate := pipz.Apply(validateID, func(_ context.Context, o Order) (Order, error) {
        if o.Total <= 0 {
            return o, errors.New("invalid total")
        }
        return o, nil
    })
    enrich := pipz.Transform(enrichID, func(_ context.Context, o Order) Order {
        o.ProcessedAt = time.Now()
        return o
    })
    notify := pipz.Effect(notifyID, func(_ context.Context, o Order) error {
        return sendConfirmation(o)
    })

    // Compose with resilience: it's Chainable[T] all the way down
    pipeline := pipz.NewSequence(pipelineID,
        validate,
        enrich,
        pipz.NewRetry(retryID, notify, pipz.RetryConfig{MaxAttempts: 3}),
    )

    result, err := pipeline.Process(ctx, order)
    if err != nil {
        // Only read pipeErr when the error really is a *pipz.Error[Order].
        var pipeErr *pipz.Error[Order]
        if errors.As(err, &pipeErr) {
            fmt.Println(pipeErr.Path) // ["pipeline", "retry", "notify"]
        }
    }

Why Pipz?
One interface, every primitive. Processors and connectors compose without limits.
Uniform Chainable[T] Interface
Every processor, every connector, every pipeline implements one interface. Seamless composition, zero special cases.
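To make the single-interface idea concrete, here is a minimal, self-contained sketch. `Stage`, `StageFunc`, and `Sequence` are hypothetical names invented for this illustration; they are not pipz's actual `Chainable[T]` definition, which may carry more methods. The point is only that a composite which implements the same interface as its parts can be nested like any other part.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Stage is a hypothetical stand-in for a single shared interface.
// It is NOT pipz's Chainable[T]; it exists only for this sketch.
type Stage[T any] interface {
	Process(ctx context.Context, in T) (T, error)
}

// StageFunc adapts a plain function to the Stage interface.
type StageFunc[T any] func(context.Context, T) (T, error)

func (f StageFunc[T]) Process(ctx context.Context, in T) (T, error) {
	return f(ctx, in)
}

// Sequence runs its stages in order. Because it implements Stage itself,
// a Sequence can be placed inside another Sequence.
type Sequence[T any] []Stage[T]

func (s Sequence[T]) Process(ctx context.Context, in T) (T, error) {
	for _, st := range s {
		out, err := st.Process(ctx, in)
		if err != nil {
			return in, err // stop at the first failure, return the last good value
		}
		in = out
	}
	return in, nil
}

// demoCompose nests one sequence inside another and runs the result.
func demoCompose() (string, error) {
	trim := StageFunc[string](func(_ context.Context, s string) (string, error) {
		return strings.TrimSpace(s), nil
	})
	upper := StageFunc[string](func(_ context.Context, s string) (string, error) {
		return strings.ToUpper(s), nil
	})
	exclaim := StageFunc[string](func(_ context.Context, s string) (string, error) {
		return s + "!", nil
	})
	inner := Sequence[string]{trim, upper}
	outer := Sequence[string]{inner, exclaim}
	return outer.Process(context.Background(), "  hello ")
}

func main() {
	out, err := demoCompose()
	fmt.Println(out, err) // HELLO! <nil>
}
```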
Composable Resilience
Layer retry, circuit breaker, timeout, and rate limiting without changing processing logic. Nest freely.
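The layering described here can be pictured as decorators that implement the same interface as the thing they wrap. The sketch below is an assumption-laden illustration, not pipz's implementation: `Stage`, `Retry`, and `Timeout` are hypothetical types written for this example, and the timeout only takes effect if the wrapped stage honors context cancellation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Stage is a hypothetical one-method interface used only in this sketch.
type Stage[T any] interface {
	Process(ctx context.Context, in T) (T, error)
}

// StageFunc adapts a plain function to Stage.
type StageFunc[T any] func(context.Context, T) (T, error)

func (f StageFunc[T]) Process(ctx context.Context, in T) (T, error) { return f(ctx, in) }

// Retry wraps any Stage and runs it up to MaxAttempts times.
type Retry[T any] struct {
	Inner       Stage[T]
	MaxAttempts int
}

func (r Retry[T]) Process(ctx context.Context, in T) (T, error) {
	attempts := r.MaxAttempts
	if attempts < 1 {
		attempts = 1 // always try at least once
	}
	var lastErr error
	for i := 0; i < attempts; i++ {
		if err := ctx.Err(); err != nil {
			return in, err // give up once the context is cancelled or expired
		}
		out, err := r.Inner.Process(ctx, in)
		if err == nil {
			return out, nil
		}
		lastErr = err
	}
	return in, lastErr
}

// Timeout wraps any Stage and passes it a context with a deadline.
type Timeout[T any] struct {
	Inner Stage[T]
	Limit time.Duration
}

func (t Timeout[T]) Process(ctx context.Context, in T) (T, error) {
	ctx, cancel := context.WithTimeout(ctx, t.Limit)
	defer cancel()
	return t.Inner.Process(ctx, in)
}

// demoResilience nests Retry inside Timeout around a stage that fails twice.
func demoResilience() (string, int, error) {
	calls := 0
	flaky := StageFunc[string](func(_ context.Context, s string) (string, error) {
		calls++
		if calls < 3 {
			return s, errors.New("transient failure")
		}
		return s + " sent", nil
	})
	guarded := Timeout[string]{
		Inner: Retry[string]{Inner: flaky, MaxAttempts: 3},
		Limit: time.Second,
	}
	out, err := guarded.Process(context.Background(), "order-1")
	return out, calls, err
}

func main() {
	out, calls, err := demoResilience()
	fmt.Println(out, calls, err) // order-1 sent 3 <nil>
}
```

The processing function (`flaky`) is unchanged by either wrapper, which is the property the paragraph above describes.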
Rich Error Context
Errors carry the full path through the pipeline, duration, and input data at failure. Know exactly where and why.
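As a rough illustration of how an error can accumulate a path while it propagates outward, consider the sketch below. `PathError` and `named` are hypothetical and written for this example; the field names are assumptions and do not describe the layout of pipz's own `Error[T]`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"time"
)

// PathError is a hypothetical error type for this sketch only.
type PathError[T any] struct {
	Path     []string      // stage names, outermost first
	Input    T             // input seen by the failing stage
	Duration time.Duration // time spent in the failing stage
	Err      error         // underlying cause
}

func (e *PathError[T]) Error() string {
	return fmt.Sprintf("%s: %v", strings.Join(e.Path, " -> "), e.Err)
}

// Unwrap lets errors.Is and errors.As reach the underlying cause.
func (e *PathError[T]) Unwrap() error { return e.Err }

// named runs fn under a name. On failure it either creates a PathError
// or prepends its own name to the one returned by an inner stage.
func named[T any](name string, fn func(context.Context, T) (T, error)) func(context.Context, T) (T, error) {
	return func(ctx context.Context, in T) (T, error) {
		start := time.Now()
		out, err := fn(ctx, in)
		if err == nil {
			return out, nil
		}
		var pe *PathError[T]
		if errors.As(err, &pe) {
			pe.Path = append([]string{name}, pe.Path...)
			return in, pe
		}
		return in, &PathError[T]{Path: []string{name}, Input: in, Duration: time.Since(start), Err: err}
	}
}

var errEmpty = errors.New("empty input")

// demoPath fails in an inner stage and reports the recorded path and
// whether the original cause is still reachable through errors.Is.
func demoPath() ([]string, bool) {
	notify := named("notify", func(_ context.Context, s string) (string, error) {
		if s == "" {
			return s, errEmpty
		}
		return s, nil
	})
	pipeline := named("pipeline", notify)
	_, err := pipeline(context.Background(), "")
	var pe *PathError[string]
	if errors.As(err, &pe) {
		return pe.Path, errors.Is(err, errEmpty)
	}
	return nil, false
}

func main() {
	path, wrapsCause := demoPath()
	fmt.Println(path, wrapsCause) // [pipeline notify] true
}
```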
Observable Without Instrumentation
Stateful connectors emit typed capitan signals at critical transitions. Monitoring requires no code changes.
Panic-Safe Execution
Panics are recovered automatically and their messages are sanitized, so sensitive data does not leak into error messages.
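The general technique is a deferred `recover` that turns the panic into an ordinary error. The sketch below is a deliberately crude version: the hypothetical `safely` helper withholds the panic value entirely and reports only its type. The actual sanitization rules pipz applies are not described here, so treat this as an assumption about the shape of the idea rather than the library's behavior.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// safely wraps fn so that a panic becomes an error instead of crashing
// the caller. The panic value is never copied into the message, because
// it may contain secrets; only its Go type is reported.
func safely[T any](name string, fn func(context.Context, T) (T, error)) func(context.Context, T) (T, error) {
	return func(ctx context.Context, in T) (out T, err error) {
		defer func() {
			if r := recover(); r != nil {
				out = in // hand back the original input
				err = fmt.Errorf("%s: panic recovered (value of type %T withheld)", name, r)
			}
		}()
		return fn(ctx, in)
	}
}

// demoPanic runs a function that panics with a secret in its message and
// reports the resulting error text and whether the secret leaked into it.
func demoPanic() (string, bool) {
	charge := safely("charge", func(_ context.Context, s string) (string, error) {
		panic("auth failed: password=hunter2")
	})
	_, err := charge(context.Background(), "order-1")
	if err == nil {
		return "", false
	}
	return err.Error(), strings.Contains(err.Error(), "hunter2")
}

func main() {
	msg, leaked := demoPanic()
	fmt.Println(msg)
	fmt.Println("leaked:", leaked)
}
```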
Extensible Vocabulary
Libraries define domain-specific processors returning Chainable[T]. Users extend with custom implementations — all first-class.
Capabilities
Processors for transformation, connectors for orchestration — all composable through one interface.
| Feature | Description | Link |
|---|---|---|
| Processors | Transform, Apply, Effect, Mutate, Enrich, Filter — immutable wrappers around your functions. | Core Concepts |
| Flow Control | Sequence, Concurrent, WorkerPool, Race, Contest — orchestrate execution order and parallelism. | Connector Selection |
| Resilience Patterns | Retry, Backoff, CircuitBreaker, RateLimiter, Timeout, Fallback — layer without code changes. | Safety & Reliability |
| Conditional Routing | Switch for multi-way routing, Filter for gates. Route data through different paths based on content. | Building Pipelines |
| Error Pipelines | Errors as data flowing through the same patterns. Build recovery using Switch, Sequence, Fallback. | Core Concepts |
| Signal Observability | Typed capitan events for circuit state changes, rate limit hits, retry exhaustion, and timeout breaches. | Hooks |
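The conditional routing row can be pictured as a lookup from a computed key to a handler. The following sketch uses hypothetical names (`route`, `tag`, `handler`, and a stand-in `Order` type) and is not pipz's `Switch` API; it only shows the shape of content-based routing with a pass-through for unmatched keys.

```go
package main

import (
	"context"
	"fmt"
)

// Order is a stand-in type for this sketch.
type Order struct {
	Total   float64
	Handler string // records which route processed the order
}

type handler func(context.Context, Order) (Order, error)

// route computes a key from each order and dispatches to the handler
// registered for that key.
func route[K comparable](key func(Order) K, routes map[K]handler) handler {
	return func(ctx context.Context, o Order) (Order, error) {
		h, ok := routes[key(o)]
		if !ok {
			return o, nil // no matching route: pass the value through unchanged
		}
		return h(ctx, o)
	}
}

// tag returns a handler that stamps the order with the route's name.
func tag(name string) handler {
	return func(_ context.Context, o Order) (Order, error) {
		o.Handler = name
		return o, nil
	}
}

// demoRouting sends a small and a large order through the router and
// returns the name of the route each one took.
func demoRouting() []string {
	tier := func(o Order) string {
		if o.Total >= 1000 {
			return "large"
		}
		return "standard"
	}
	router := route(tier, map[string]handler{
		"large":    tag("manual-review"),
		"standard": tag("auto-approve"),
	})
	var handled []string
	for _, o := range []Order{{Total: 25}, {Total: 5000}} {
		out, _ := router(context.Background(), o)
		handled = append(handled, out.Handler)
	}
	return handled
}

func main() {
	fmt.Println(demoRouting()) // [auto-approve manual-review]
}
```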
Articles
Browse the full pipz documentation.