Introduction to pipz
pipz is a Go library for building type-safe, composable data pipelines. It leverages Go's generics to provide compile-time type safety while offering a simple, functional API for complex data transformations.
Why pipz?
The Problem
Building data processing pipelines in Go often involves:
- Writing boilerplate code for error handling
- Managing complex control flow (retries, timeouts, fallbacks)
- Dealing with interface{} and runtime type assertions
- Difficulty in testing individual components
- Hard-to-reuse processing logic
The Solution
pipz provides:
- Type Safety: Full compile-time type checking with generics
- Composability: Small, reusable components that combine easily
- Error Handling: Built-in patterns for retries, fallbacks, and recovery
- Testability: Each component is independently testable
- Performance: Optimized design with minimal allocations
Core Philosophy
pipz follows these principles:
- Interface-First Design: Everything implements Chainable[T], a single, simple interface
- Your Code, Your Way: Implement the interface directly or use our convenience wrappers
- Composition over Configuration: Build complex behavior by combining simple pieces
- Type Safety First: No interface{}, no runtime type assertions
- Errors are Values: Explicit error handling at every step
- Context Awareness: Full support for cancellation and timeouts
Key Concepts
The Chainable Interface
The foundation of pipz - everything implements this simple interface:
type Chainable[T any] interface {
    Process(context.Context, T) (T, error)
    Identity() Identity
    Schema() Node
    Close() error
}
Any type implementing this interface can be used in a pipeline. This gives you complete flexibility:
- Implement it directly for custom processors
- Use the provided wrapper functions for common patterns
- Mix both approaches in the same pipeline
Processors
The atomic units that transform data. You can create them by:
- Direct Implementation: Implement Chainable[T] for full control
- Wrapper Functions: Use Transform, Apply, Effect, etc. for convenience
Connectors
Mutable components that combine any Chainable[T] implementations into more complex behaviors:
- NewSequence: Run processors in order
- NewSwitch: Route to different processors based on conditions
- NewConcurrent: Run multiple processors in parallel
- NewRace: Use the first successful result
- And many more...
Pipelines
Managed sequences of processors with introspection and modification capabilities.
Use Cases
pipz excels at:
- ETL (Extract, Transform, Load) pipelines
- API request/response processing
- Event stream processing
- Payment processing with failover
- Content moderation pipelines
- Data validation workflows
- Microservice orchestration
What Makes pipz Different?
Unlike traditional pipeline libraries, pipz:
- Uses Go generics for complete type safety
- Requires no code generation or reflection
- Has minimal external dependencies (clockz, capitan, uuid)
- Provides both functional and object-oriented APIs
- Includes battle-tested patterns (retry, timeout, fallback)
- Returns rich error context showing the exact failure path
- Supports both declarative and dynamic pipeline construction
- Treats errors as data flowing through pipelines, so the same Switch, Concurrent, and Sequence patterns can be reused for sophisticated error recovery
A Unique Approach to Error Handling
Most frameworks treat errors as exceptions or callbacks. pipz treats them as data that flows through pipelines. This means you can build sophisticated error recovery flows using the exact same tools you use for regular data processing:
// Define identities as package-level variables
var (
    ErrorRecoveryID    = pipz.NewIdentity("error-recovery", "Error recovery pipeline")
    CategorizeID       = pipz.NewIdentity("categorize", "Categorizes errors by type")
    SeverityRouterID   = pipz.NewIdentity("severity-router", "Routes errors by severity")
    ParallelRecoveryID = pipz.NewIdentity("parallel-recovery", "Parallel recovery actions")
    OrderProcessingID  = pipz.NewIdentity("order-processing", "Order processing with error handling")
)

// Error recovery pipeline - same tools, same patterns!
errorRecovery := pipz.NewSequence[*pipz.Error[Order]](ErrorRecoveryID,
    pipz.Transform(CategorizeID, categorizeError),
    pipz.NewSwitch(SeverityRouterID, routeBySeverity),
    pipz.NewConcurrent(ParallelRecoveryID, notifyCustomer, updateInventory),
)

// Errors flow through this pipeline automatically
robustPipeline := pipz.NewHandle(OrderProcessingID, mainPipeline, errorRecovery)
This pattern enables type-safe, composable, and testable error handling that scales with your application complexity. See Safety and Reliability for the full power of this approach.
Next Steps
- Quick Start - Build your first pipeline in minutes
- Core Concepts - Deep dive into pipz concepts and composition patterns
- Building Pipelines - Complete production example