Composable Data Pipelines for Go. One Interface. Infinite Composition.

Build complex data processing from small, reusable parts. Every processor and connector implements Chainable[T] — compose retry, circuit breaking, and routing without changing your logic.

Getting Started
import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/zoobz-io/pipz"
)

// Small, focused processors
validate := pipz.Apply(validateID, func(_ context.Context, o Order) (Order, error) {
    if o.Total <= 0 {
        return o, errors.New("invalid total")
    }
    return o, nil
})

enrich := pipz.Transform(enrichID, func(_ context.Context, o Order) Order {
    o.ProcessedAt = time.Now()
    return o
})

notify := pipz.Effect(notifyID, func(_ context.Context, o Order) error {
    return sendConfirmation(o)
})

// Compose with resilience — it's Chainable[T] all the way down
pipeline := pipz.NewSequence(pipelineID,
    validate,
    enrich,
    pipz.NewRetry(retryID, notify, pipz.RetryConfig{MaxAttempts: 3}),
)

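result, err := pipeline.Process(ctx, order)
if err != nil {
    // Confirm the error is a pipz error before reading its fields;
    // errors.As leaves pipeErr nil when the type does not match.
    var pipeErr *pipz.Error[Order]
    if errors.As(err, &pipeErr) {
        fmt.Println(pipeErr.Path) // ["pipeline", "retry", "notify"]
    }
}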

Test Coverage: 96%
Go Report: A+
License: MIT
Go Version: 1.24+
Latest Release: v1.0.5

Why Pipz?

One interface, every primitive. Processors and connectors compose without limits.

Uniform Chainable[T] Interface

Every processor, every connector, every pipeline implements one interface. Seamless composition, zero special cases.
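
To make the single-interface idea concrete, here is a minimal, self-contained sketch. It does not import pipz: the Chainable, Func, and Sequence types below are hypothetical stand-ins written for illustration, and the library's real definitions may differ. The point it shows is that a sequence of steps satisfies the same interface as a single step, so sequences nest inside other sequences with no special handling.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Chainable is a hypothetical stand-in for the single interface described
// above. It is NOT copied from pipz; the real definition may differ.
type Chainable[T any] interface {
	Process(ctx context.Context, in T) (T, error)
}

// Func adapts a plain function so that it satisfies Chainable.
type Func[T any] func(context.Context, T) (T, error)

func (f Func[T]) Process(ctx context.Context, in T) (T, error) { return f(ctx, in) }

// Sequence runs its steps in order. Because it also satisfies Chainable,
// a Sequence can be used as a step inside another Sequence.
type Sequence[T any] []Chainable[T]

func (s Sequence[T]) Process(ctx context.Context, in T) (T, error) {
	for _, step := range s {
		out, err := step.Process(ctx, in)
		if err != nil {
			return in, err // stop at the first failure, keep the last good value
		}
		in = out
	}
	return in, nil
}

// runExample builds an inner sequence, nests it in an outer one, and runs it.
func runExample(s string) (string, error) {
	trim := Func[string](func(_ context.Context, s string) (string, error) {
		return strings.TrimSpace(s), nil
	})
	upper := Func[string](func(_ context.Context, s string) (string, error) {
		return strings.ToUpper(s), nil
	})
	exclaim := Func[string](func(_ context.Context, s string) (string, error) {
		return s + "!", nil
	})
	inner := Sequence[string]{trim, upper}
	outer := Sequence[string]{inner, exclaim}
	return outer.Process(context.Background(), s)
}

func main() {
	out, err := runExample("  hello ")
	fmt.Println(out, err) // prints: HELLO! <nil>
}
```

The nesting in runExample is the whole argument: nothing in Sequence knows or cares whether a step is a function or another Sequence.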

Composable Resilience

Layer retry, circuit breaker, timeout, and rate limiting without changing processing logic. Nest freely.
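
The layering described above can be sketched without the library. In this illustration the Chainable, Func, and Retry types are hypothetical and are not pipz's API; the sketch only shows why a wrapper that satisfies the same interface can add retry behavior while the wrapped logic stays unchanged.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Chainable is a hypothetical stand-in interface, not pipz's definition.
type Chainable[T any] interface {
	Process(ctx context.Context, in T) (T, error)
}

// Func adapts a plain function so that it satisfies Chainable.
type Func[T any] func(context.Context, T) (T, error)

func (f Func[T]) Process(ctx context.Context, in T) (T, error) { return f(ctx, in) }

// Retry wraps any Chainable and re-runs it until it succeeds or the
// attempt budget is used up. The wrapped step needs no changes.
type Retry[T any] struct {
	Inner       Chainable[T]
	MaxAttempts int
}

func (r Retry[T]) Process(ctx context.Context, in T) (T, error) {
	attempts := r.MaxAttempts
	if attempts < 1 {
		attempts = 1 // always run the inner step at least once
	}
	var lastErr error
	for i := 0; i < attempts; i++ {
		if err := ctx.Err(); err != nil {
			return in, err // stop retrying once the context is cancelled
		}
		out, err := r.Inner.Process(ctx, in)
		if err == nil {
			return out, nil
		}
		lastErr = err
	}
	return in, lastErr
}

// flakyDemo runs a step that fails `failures` times before succeeding,
// wrapped in Retry. It returns the output, the call count, and the error.
func flakyDemo(failures, maxAttempts int) (int, int, error) {
	calls := 0
	flaky := Func[int](func(_ context.Context, n int) (int, error) {
		calls++
		if calls <= failures {
			return n, errors.New("transient failure")
		}
		return n * 2, nil
	})
	wrapped := Retry[int]{Inner: flaky, MaxAttempts: maxAttempts}
	out, err := wrapped.Process(context.Background(), 21)
	return out, calls, err
}

func main() {
	out, calls, err := flakyDemo(2, 3)
	fmt.Println(out, calls, err) // prints: 42 3 <nil>
}
```

Because Retry itself satisfies Chainable, it could in turn be wrapped by a timeout or circuit breaker built the same way.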

Rich Error Context

Errors carry the full path through the pipeline, duration, and input data at failure. Know exactly where and why.
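
One way an error could accumulate a path as it travels outward is sketched below. The PipelineError type and the named helper are hypothetical and written only for this illustration; they are not pipz's error type, whose fields and behavior may differ.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"time"
)

// PipelineError is a hypothetical error type carrying the context the text
// describes: the path of step names, the input, and the elapsed time.
type PipelineError[T any] struct {
	Path     []string
	Input    T
	Duration time.Duration
	Err      error
}

func (e *PipelineError[T]) Error() string {
	return strings.Join(e.Path, " -> ") + ": " + e.Err.Error()
}

func (e *PipelineError[T]) Unwrap() error { return e.Err }

// named wraps a step. On failure it either creates a PipelineError or,
// if an inner step already created one, prepends its own name to the path.
func named[T any](name string, step func(T) (T, error)) func(T) (T, error) {
	return func(in T) (T, error) {
		start := time.Now()
		out, err := step(in)
		if err == nil {
			return out, nil
		}
		var pe *PipelineError[T]
		if errors.As(err, &pe) {
			pe.Path = append([]string{name}, pe.Path...)
			return in, pe
		}
		return in, &PipelineError[T]{
			Path:     []string{name},
			Input:    in,
			Duration: time.Since(start),
			Err:      err,
		}
	}
}

// failingPath nests three named layers around a failing step and returns
// the recorded path joined with "/".
func failingPath() string {
	notify := named("notify", func(n int) (int, error) {
		return n, errors.New("smtp unavailable")
	})
	retry := named("retry", notify)
	pipeline := named("pipeline", retry)

	_, err := pipeline(7)
	var pe *PipelineError[int]
	if errors.As(err, &pe) {
		return strings.Join(pe.Path, "/")
	}
	return ""
}

func main() {
	fmt.Println(failingPath()) // prints: pipeline/retry/notify
}
```

In this sketch the innermost layer records the input and duration, and each outer layer only extends the path, so the caller sees the route from the outermost pipeline down to the step that failed.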

Observable Without Instrumentation

Stateful connectors emit typed capitan signals at critical transitions. Monitoring requires no code changes.

Panic-Safe Execution

Panics are recovered automatically, with security-conscious sanitization so that sensitive data does not leak into error messages.
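
The general technique behind panic-safe execution in Go is a deferred recover that converts the panic into an ordinary error. The sketch below is an illustration only: the safely helper and errPanicked value are hypothetical, and the "sanitization" here is the simplest possible assumption (discard the panic value entirely), which may be stricter or looser than what pipz actually does.

```go
package main

import (
	"errors"
	"fmt"
)

// errPanicked is deliberately generic. The recovered value is never copied
// into it, so anything sensitive in the panic message is not echoed back.
var errPanicked = errors.New("processor panicked")

// safely wraps a step so that a panic inside it surfaces as an error
// instead of crashing the caller.
func safely[T any](step func(T) (T, error)) func(T) (T, error) {
	return func(in T) (out T, err error) {
		defer func() {
			if r := recover(); r != nil {
				// Return the original input and a sanitized error.
				out, err = in, errPanicked
			}
		}()
		return step(in)
	}
}

// panicDemo runs a step whose panic message contains a secret and returns
// the error text the caller would see.
func panicDemo() string {
	risky := func(s string) (string, error) {
		panic("db password=hunter2 rejected")
	}
	_, err := safely(risky)("order-1")
	return err.Error()
}

func main() {
	fmt.Println(panicDemo()) // prints: processor panicked
}
```

A production version would likely log the recovered value somewhere access-controlled rather than drop it, but that choice is outside what this page states.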

Extensible Vocabulary

Libraries define domain-specific processors returning Chainable[T]. Users extend with custom implementations — all first-class.

Capabilities

Processors for transformation, connectors for orchestration — all composable through one interface.

Feature | Description | Link
Processors | Transform, Apply, Effect, Mutate, Enrich, Filter: immutable wrappers around your functions. | Core Concepts
Flow Control | Sequence, Concurrent, WorkerPool, Race, Contest: orchestrate execution order and parallelism. | Connector Selection
Resilience Patterns | Retry, Backoff, CircuitBreaker, RateLimiter, Timeout, Fallback: layer without code changes. | Safety & Reliability
Conditional Routing | Switch for multi-way routing, Filter for gates. Route data through different paths based on content. | Building Pipelines
Error Pipelines | Errors as data flowing through the same patterns. Build recovery using Switch, Sequence, Fallback. | Core Concepts
Signal Observability | Typed capitan events for circuit state changes, rate limit hits, retry exhaustion, and timeout breaches. | Hooks

Articles

Browse the full pipz documentation.

Overview: Type-safe, composable data pipelines for Go

Learn

Getting Started with pipz: Build your first type-safe data pipeline in 10 minutes with practical examples and best practices
Introduction to pipz: Type-safe, composable data pipelines with Go generics for robust and maintainable data processing
Core Concepts: Understanding processors, connectors, and composition patterns for building data pipelines
Architecture: System design, data flow patterns, and internal architecture of the pipz library
Hooks and Observability: Type-safe event hooks for monitoring, debugging, and observability with capitan integration

Guides

Connector Selection Guide: Decision tree and comparison matrix for choosing the right connector for your use case
Clone Implementation Guide: Comprehensive guide to implementing the Clone() method correctly for safe concurrent processing
Best Practices: Design principles, patterns, and production-ready guidelines for building robust pipelines with pipz
Testing pipz Pipelines: Comprehensive testing guide including MockProcessor, ChaosProcessor, assertions, and three-tier testing strategy
Performance Optimization Guide: Techniques and strategies for building high-performance pipelines with minimal allocations and optimal resource usage
Safety and Reliability Guide: Built-in safety features including automatic panic recovery, security sanitization, and resilience patterns
Troubleshooting Guide: Common issues, gotchas, and solutions for diagnosing and resolving problems in pipz pipelines

Cookbook

Recipe: Building Pipelines. Complete user registration pipeline with validation, enrichment, resilience, and observability
Recipe: Library Resilience. Expose pipz resilience patterns to library consumers via functional options
Recipe: Extensible Application Vocabulary. Create domain-specific APIs where library and user code compose seamlessly

Reference

pipz Quick Reference: Fast-reference cheatsheet for processor selection, connector decision trees, and common pipeline patterns
Types: Core type definitions for pipz pipelines
Processors: Function wrappers that transform, validate, and enrich data flowing through pipelines
Connectors: Composition primitives that control how processors are executed and connected