Overview
Data processing in Go often means choosing between type safety and flexibility.
pipz offers both: a single interface that everything implements, with zero reflection and full compile-time checking.
type Chainable[T any] interface {
Process(context.Context, T) (T, error)
Identity() Identity
Schema() Node
Close() error
}
Implement it directly for custom processors. Use the provided wrappers for common patterns. Mix both approaches in the same pipeline.
// Custom processor - full control
type RateLimiter[T any] struct {
limiter *rate.Limiter
}
func (r *RateLimiter[T]) Process(ctx context.Context, data T) (T, error) {
if err := r.limiter.Wait(ctx); err != nil {
return data, err
}
return data, nil
}
// Define identities upfront
var (
ValidateID = pipz.NewIdentity("validate", "Validate order fields")
EnrichID = pipz.NewIdentity("enrich", "Add timestamp to order")
OrderFlowID = pipz.NewIdentity("order-flow", "Process customer orders")
)
// Built-in wrappers - convenience
validate := pipz.Apply(ValidateID, validateOrder)
enrich := pipz.Transform(EnrichID, addTimestamp)
// Compose freely
pipeline := pipz.NewSequence(OrderFlowID,
validate,
&RateLimiter[Order]{limiter: limiter},
enrich,
)
Type-safe, minimal dependencies, panic-recovered by default.
Philosophy
pipz draws inspiration from functional composition: small, focused units that combine into complex behaviors. Any type implementing Chainable[T] can participate in a pipeline. This creates a uniform abstraction that spans your entire processing flow.
// Define identities upfront
var (
ValidatePaymentID = pipz.NewIdentity("validate", "Validate payment details")
ChargeCardID = pipz.NewIdentity("charge", "Charge customer card")
RetryID = pipz.NewIdentity("retry", "Retry charge on failure")
GatewayID = pipz.NewIdentity("gateway", "Try multiple payment gateways")
BoundedID = pipz.NewIdentity("bounded", "Enforce processing timeout")
PaymentID = pipz.NewIdentity("payment", "Complete payment processing flow")
ChargeTimeoutID = pipz.NewIdentity("charge-timeout", "Bound total charge time")
MultiGatewayID = pipz.NewIdentity("multi-gateway", "Failover between gateways")
PrimaryID = pipz.NewIdentity("primary", "Retry primary gateway")
)
// Processors transform data
validate := pipz.Apply(ValidatePaymentID, validatePayment)
charge := pipz.Apply(ChargeCardID, chargeCard)
// Connectors compose processors
withRetry := pipz.NewRetry(RetryID, charge, 3)
withFallback := pipz.NewFallback(GatewayID, primaryCharge, backupCharge)
withTimeout := pipz.NewTimeout(BoundedID, pipeline, 5*time.Second)
// Everything composes the same way
robust := pipz.NewSequence(PaymentID,
validate,
pipz.NewTimeout(ChargeTimeoutID,
pipz.NewFallback(MultiGatewayID,
pipz.NewRetry(PrimaryID, primaryCharge, 3),
backupCharge,
),
10*time.Second,
),
)
Three levels of composition, one interface, complete type safety throughout.
Capabilities
A uniform interface opens possibilities:
Resilience - Retry failed operations, fall back to alternatives, enforce timeouts. Layer these patterns without changing your processing logic.
Parallelism - Run processors concurrently with Concurrent, bound parallelism with WorkerPool, race for first success with Race.
Routing - Direct data through different paths with Switch. Filter conditionally with Filter. Contest for the first result meeting criteria.
Observability - Emit typed signals on state changes. CircuitBreaker, RateLimiter, and WorkerPool broadcast their state for monitoring and alerting.
pipz provides the composition layer. What you build on top is up to you.
Priorities
Type Safety
Generics eliminate runtime type assertions. Data flows through pipelines with compile-time checking at every step.
// Define identities upfront
var (
ProcessID = pipz.NewIdentity("process", "Process customer orders")
ValidateID = pipz.NewIdentity("validate", "Validate order fields")
)
// Compile-time type checking
pipeline := pipz.NewSequence[Order](ProcessID,
pipz.Apply(ValidateID, func(_ context.Context, o Order) (Order, error) {
// o is Order, not interface{}
return o, nil
}),
)
// Type mismatch caught at compile time, not runtime
// pipeline.Process(ctx, "not an order") // Won't compile
Composability
Small processors combine into complex behaviors. Each connector serves one purpose and combines cleanly with others.
// Define identities upfront
var (
BoundedID = pipz.NewIdentity("bounded", "Enforce overall timeout")
WithRetryID = pipz.NewIdentity("with-retry", "Retry failed operations")
WithFallbackID = pipz.NewIdentity("with-fallback", "Fallback on error")
)
// Layer resilience patterns
resilient := pipz.NewTimeout(BoundedID,
pipz.NewRetry(WithRetryID,
pipz.NewFallback(WithFallbackID, primary, backup),
3,
),
5*time.Second,
)
No configuration objects. No builder patterns. Just composition.
Error Context
Errors carry their full path through the pipeline. Know exactly where failures occurred, how long operations took, and what data was being processed.
result, err := pipeline.Process(ctx, order)
if err != nil {
var pipeErr *pipz.Error[Order]
if errors.As(err, &pipeErr) {
// Path is []Identity - extract names for display
var path []string
for _, id := range pipeErr.Path {
path = append(path, id.Name())
}
fmt.Printf("Failed at: %s\n", strings.Join(path, " -> "))
fmt.Printf("Duration: %v\n", pipeErr.Duration)
fmt.Printf("Input: %+v\n", pipeErr.InputData)
}
}
// Output: Failed at: payment -> gateway -> primary
// Duration: 2.3s
// Input: Order{ID: "ORD-123", Total: 99.99}
Errors as Data
Most frameworks treat errors as exceptions. pipz treats them as data that flows through pipelines. Build error recovery using the same tools you use for regular processing.
// Define identities upfront
var (
RecoverID = pipz.NewIdentity("recover", "Error recovery pipeline")
CategorizeID = pipz.NewIdentity("categorize", "Categorize error type")
RouteID = pipz.NewIdentity("route", "Route by severity")
OrderFlowID = pipz.NewIdentity("order-flow", "Order processing with error handling")
)
// Error recovery pipeline - same patterns, same composition
errorHandler := pipz.NewSequence[*pipz.Error[Order]](RecoverID,
pipz.Transform(CategorizeID, categorizeError),
pipz.NewSwitch(RouteID, routeBySeverity),
)
// Attach to any pipeline
robust := pipz.NewHandle(OrderFlowID, mainPipeline, errorHandler)
Safety
Panics are recovered automatically with security-focused sanitization. One misbehaving processor won't crash your system or leak sensitive data in error messages.
// Panics become errors, automatically
result, err := pipeline.Process(ctx, data)
// err contains sanitized panic info if a processor panicked