Getting Started with pipz
Build your first data pipeline in 10 minutes.
What is pipz?
pipz is a Go library for building type-safe, composable data pipelines. Think of it as LEGO blocks for data processing - small, focused pieces that connect together to solve complex problems.
Installation
go get github.com/zoobzio/pipz
Requires Go 1.21 or later. (Generics, which pipz relies on, were introduced in Go 1.18; pipz's minimum supported version is 1.21.)
Your First Pipeline
Let's build a simple pipeline that processes user registration:
package main
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/zoobzio/pipz"
)
// User is the value flowing through the registration pipeline.
type User struct {
Email string // raw email address; lowercased during normalization
Username string
Age int // must be >= 18 to pass validation
Verified bool // set to true by the normalize step
}
func main() {
	// Identities are declared once, up front, and reused when wiring.
	registrationID := pipz.NewIdentity("registration", "User registration pipeline")
	validateID := pipz.NewIdentity("validate", "Validates user input fields")
	normalizeID := pipz.NewIdentity("normalize", "Normalizes user data")
	logID := pipz.NewIdentity("log", "Logs successful user registration")

	// Three-step registration flow: validate -> normalize -> log.
	registration := pipz.NewSequence[User](registrationID,
		pipz.Apply(validateID, validateUser),       // may fail
		pipz.Transform(normalizeID, normalizeUser), // pure, cannot fail
		pipz.Effect(logID, logUser),                // side effect only
	)

	input := User{
		Email:    "JOHN.DOE@EXAMPLE.COM",
		Username: "johndoe123",
		Age:      25,
	}

	processed, err := registration.Process(context.Background(), input)
	if err != nil {
		fmt.Printf("Pipeline failed: %v\n", err)
		return
	}
	fmt.Printf("Processed user: %+v\n", processed)
}
// validateUser rejects a registration with a missing email or an
// underage user; otherwise the user passes through unchanged.
func validateUser(ctx context.Context, u User) (User, error) {
	switch {
	case u.Email == "":
		return u, errors.New("email required")
	case u.Age < 18:
		return u, errors.New("must be 18 or older")
	default:
		return u, nil
	}
}
// normalizeUser marks the account verified and lowercases the email.
// It is a pure transformation: it cannot fail.
func normalizeUser(ctx context.Context, u User) User {
	u.Verified = true
	u.Email = strings.ToLower(u.Email)
	return u
}
// logUser is a side effect: it prints the registered email and always
// returns nil, so it never aborts the pipeline.
func logUser(ctx context.Context, u User) error {
fmt.Printf("User registered: %s\n", u.Email)
return nil
}
Understanding the Building Blocks
Processors: Transform Your Data
pipz provides different processor types for different needs:
// Define identities upfront
var (
UpperID = pipz.NewIdentity("upper", "Converts strings to uppercase")
ParseID = pipz.NewIdentity("parse", "Parses JSON data")
LogID = pipz.NewIdentity("log", "Logs data processing")
)
// Transform: Pure functions that can't fail
upperCase := pipz.Transform(UpperID, func(ctx context.Context, s string) string {
return strings.ToUpper(s)
})
// Apply: Operations that can fail
parse := pipz.Apply(ParseID, func(ctx context.Context, s string) (Data, error) {
var data Data
err := json.Unmarshal([]byte(s), &data)
return data, err
})
// Effect: Side effects without modifying data
log := pipz.Effect(LogID, func(ctx context.Context, d Data) error {
fmt.Printf("Processing: %+v\n", d)
return nil
})
Connectors: Control the Flow
Connectors determine how processors are executed:
// Define identities upfront
var (
StepsID = pipz.NewIdentity("steps", "Sequential processing")
ParallelID = pipz.NewIdentity("parallel", "Parallel processing")
ResilientID = pipz.NewIdentity("resilient", "Resilient processing with fallback")
RouterID = pipz.NewIdentity("router", "Routes data by type")
)
// Sequence: Run steps in order
sequential := pipz.NewSequence[Data](StepsID, step1, step2, step3)
// Concurrent: Run in parallel (requires Clone())
parallel := pipz.NewConcurrent[Data](ParallelID, task1, task2, task3)
// Fallback: Try primary, use backup on failure
resilient := pipz.NewFallback[Data](ResilientID, primary, backup)
// Switch: Route based on conditions
router := pipz.NewSwitch[Data](RouterID, getType).
AddRoute("typeA", processTypeA).
AddRoute("typeB", processTypeB)
Building a Real-World Pipeline
Let's create a more realistic example - an order processing pipeline with validation, enrichment, and error handling:
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/zoobzio/pipz"
)
// Order is the value flowing through the order-processing pipeline.
type Order struct {
	ID         string
	CustomerID string
	Items      []Item // line items; deep-copied by Clone
	Total      float64
	Status     string
	CreatedAt  time.Time
}

// Item is a single order line.
type Item struct {
	ProductID string
	Quantity  int
	Price     float64
}

// Clone deep-copies the Order so concurrent pipeline branches can each
// mutate their own copy. Items is the only reference-typed field, so it
// gets a fresh backing array; everything else is copied by value.
func (o Order) Clone() Order {
	dup := o
	dup.Items = make([]Item, len(o.Items))
	copy(dup.Items, o.Items)
	return dup
}
// createOrderPipeline wires the full order flow:
// validate -> total -> enrich -> inventory -> payment -> notify -> complete.
func createOrderPipeline() pipz.Chainable[Order] {
// Define identities upfront so each stage is individually addressable
// (e.g. in error paths).
var (
OrderProcessingID = pipz.NewIdentity("order-processing", "Complete order processing pipeline")
ValidateID = pipz.NewIdentity("validate", "Validates order data")
CalculateTotalID = pipz.NewIdentity("calculate-total", "Calculates order total")
AddCustomerDataID = pipz.NewIdentity("add-customer-data", "Enriches order with customer data")
CheckInventoryID = pipz.NewIdentity("check-inventory", "Checks product inventory")
ProcessPaymentID = pipz.NewIdentity("process-payment", "Processes payment")
NotificationsID = pipz.NewIdentity("notifications", "Sends notifications")
EmailID = pipz.NewIdentity("email", "Sends email confirmation")
SMSID = pipz.NewIdentity("sms", "Sends SMS notification")
AnalyticsID = pipz.NewIdentity("analytics", "Tracks order metrics")
CompleteID = pipz.NewIdentity("complete", "Marks order as completed")
)
return pipz.NewSequence[Order](OrderProcessingID,
// Validation phase: fail fast on malformed orders.
pipz.Apply(ValidateID, validateOrder),
// Enrichment phase: compute the total, then best-effort customer data.
pipz.Transform(CalculateTotalID, calculateTotal),
pipz.Enrich(AddCustomerDataID, enrichWithCustomerData),
// Processing phase: both steps can fail and abort the pipeline.
pipz.Apply(CheckInventoryID, checkInventory),
pipz.Apply(ProcessPaymentID, processPayment),
// Parallel notifications; NewConcurrent requires Order to implement Clone().
pipz.NewConcurrent[Order](NotificationsID,
pipz.Effect(EmailID, sendEmailConfirmation),
pipz.Effect(SMSID, sendSMSNotification),
pipz.Effect(AnalyticsID, trackOrderMetrics),
),
// Final status update
pipz.Transform(CompleteID, func(ctx context.Context, o Order) Order {
o.Status = "completed"
return o
}),
)
}
// validateOrder rejects orders that have no line items or no customer.
func validateOrder(ctx context.Context, o Order) (Order, error) {
	switch {
	case len(o.Items) == 0:
		return o, errors.New("order must have items")
	case o.CustomerID == "":
		return o, errors.New("customer ID required")
	default:
		return o, nil
	}
}
// calculateTotal sums the extended price (unit price x quantity) of
// every line item into o.Total.
func calculateTotal(ctx context.Context, o Order) Order {
	var sum float64
	for i := range o.Items {
		sum += o.Items[i].Price * float64(o.Items[i].Quantity)
	}
	o.Total = sum
	return o
}
// enrichWithCustomerData tags the order status with the customer's tier.
// It is wired with pipz.Enrich, so a returned error is treated as
// best-effort: the connector logs it and the pipeline continues.
// NOTE(review): confirm Enrich's non-failing semantics against the pipz docs.
func enrichWithCustomerData(ctx context.Context, o Order) (Order, error) {
// This is optional - won't fail the pipeline
customer, err := fetchCustomer(o.CustomerID)
if err != nil {
// Log but continue
fmt.Printf("Could not enrich with customer data: %v\n", err)
return o, err // Enrich logs but doesn't fail
}
// Add customer tier for discounts, etc.
o.Status = fmt.Sprintf("processing-%s", customer.Tier)
return o, nil
}
// checkInventory verifies stock for every line item, failing on the
// first lookup error or shortage.
func checkInventory(ctx context.Context, o Order) (Order, error) {
	for i := range o.Items {
		it := o.Items[i]
		available, err := getInventoryCount(it.ProductID)
		if err != nil {
			return o, fmt.Errorf("inventory check failed: %w", err)
		}
		if available < it.Quantity {
			return o, fmt.Errorf("insufficient inventory for %s", it.ProductID)
		}
	}
	return o, nil
}
// processPayment charges the customer for o.Total. Stub implementation:
// it only prints the amount and always succeeds.
func processPayment(ctx context.Context, o Order) (Order, error) {
// Process payment...
fmt.Printf("Processing payment of $%.2f\n", o.Total)
return o, nil
}
// sendEmailConfirmation is a pipz.Effect: it observes the order without
// modifying it. Stubbed to print.
func sendEmailConfirmation(ctx context.Context, o Order) error {
fmt.Printf("Sending email for order %s\n", o.ID)
return nil
}
// sendSMSNotification mirrors the email effect over SMS (stub).
func sendSMSNotification(ctx context.Context, o Order) error {
fmt.Printf("Sending SMS for order %s\n", o.ID)
return nil
}
// trackOrderMetrics records order analytics (stub).
func trackOrderMetrics(ctx context.Context, o Order) error {
fmt.Printf("Tracking metrics for order %s: $%.2f\n", o.ID, o.Total)
return nil
}
Adding Resilience
Real-world systems need error handling, retries, and timeouts:
// createResilientPipeline layers resilience around the basic pipeline.
// Read inside-out: fallback (manual review) -> retry (3 attempts) ->
// timeout (30s over everything).
func createResilientPipeline() pipz.Chainable[Order] {
// Define identities upfront
var (
TimeoutProtectionID = pipz.NewIdentity("timeout-protection", "Protects against slow operations")
RetryOnFailureID = pipz.NewIdentity("retry-on-failure", "Retries failed operations")
WithFallbackID = pipz.NewIdentity("with-fallback", "Provides fallback processing")
FallbackID = pipz.NewIdentity("fallback", "Fallback order processing")
)
// Basic pipeline
basicPipeline := createOrderPipeline()
// Add resilience layers
return pipz.NewTimeout(TimeoutProtectionID,
pipz.NewRetry(RetryOnFailureID,
pipz.NewFallback(WithFallbackID,
basicPipeline,
pipz.Apply(FallbackID, processFallbackOrder),
),
3, // Retry up to 3 times
),
30*time.Second, // Overall timeout covering all retries
)
}
// processFallbackOrder is the degraded path taken when the main
// pipeline keeps failing: park the order for a human to review.
func processFallbackOrder(ctx context.Context, o Order) (Order, error) {
	fmt.Printf("Order %s sent for manual review\n", o.ID)
	o.Status = "pending-manual-review"
	return o, nil
}
Error Handling
pipz provides rich error information:
// handlePipelineError unpacks a *pipz.Error[Order] and prints where the
// pipeline failed, the underlying error, and the data at that point.
// Non-pipz errors are ignored.
func handlePipelineError(err error) {
	var pipeErr *pipz.Error[Order]
	if !errors.As(err, &pipeErr) {
		return
	}
	// Path is []Identity; its last element names the failing stage.
	if n := len(pipeErr.Path); n > 0 {
		fmt.Printf("Pipeline failed at: %s\n", pipeErr.Path[n-1].Name())
	}
	fmt.Printf("Error: %v\n", pipeErr.Err)
	fmt.Printf("Order state at failure: %+v\n", pipeErr.InputData)
	if pipeErr.Timeout {
		fmt.Println("Failure was due to timeout")
	}
}
func main() {
pipeline := createResilientPipeline()
// Sample two-line order; Total is computed by the pipeline itself.
order := Order{
ID: "ORD-123",
CustomerID: "CUST-456",
Items: []Item{
{ProductID: "PROD-1", Quantity: 2, Price: 29.99},
{ProductID: "PROD-2", Quantity: 1, Price: 49.99},
},
CreatedAt: time.Now(),
}
result, err := pipeline.Process(context.Background(), order)
if err != nil {
// Pull structured failure details out of the pipz error.
handlePipelineError(err)
return
}
fmt.Printf("Order processed successfully: %+v\n", result)
}
Advanced Patterns
Conditional Processing
// Define identities upfront
var (
ValueRouterID = pipz.NewIdentity("value-router", "Routes orders by value")
)
// Route orders based on value
valueRouter := pipz.NewSwitch[Order](ValueRouterID,
func(ctx context.Context, o Order) string {
if o.Total > 1000 {
return "high-value"
}
if o.Total > 100 {
return "standard"
}
return "low-value"
},
).
AddRoute("high-value", highValuePipeline).
AddRoute("standard", standardPipeline).
AddRoute("low-value", lowValuePipeline)
Rate Limiting
// Define identities upfront
var (
APILimitID = pipz.NewIdentity("api-limit", "Rate limits API calls")
ProtectedID = pipz.NewIdentity("protected", "Protected API calls")
APICallID = pipz.NewIdentity("api-call", "Calls external API")
)
// Protect external APIs
var apiLimiter = pipz.NewRateLimiter[Order](APILimitID, 100, 10)
protectedAPI := pipz.NewSequence[Order](ProtectedID,
apiLimiter,
pipz.Apply(APICallID, callExternalAPI),
)
Circuit Breaking
// Define identities upfront
var (
BreakerID = pipz.NewIdentity("breaker", "Protects against cascading failures")
)
// Prevent cascading failures
circuitBreaker := pipz.NewCircuitBreaker[Order](BreakerID,
externalService,
5, // Open after 5 failures
30*time.Second, // Try recovery after 30s
)
Testing Your Pipelines
Pipelines are easy to test:
// TestOrderPipeline exercises the happy path and a validation failure.
// NOTE(review): assumes testify's assert package is imported.
func TestOrderPipeline(t *testing.T) {
pipeline := createOrderPipeline()
// Test valid order
validOrder := Order{
ID: "TEST-1",
CustomerID: "CUST-1",
Items: []Item{{ProductID: "P1", Quantity: 1, Price: 10.00}},
}
result, err := pipeline.Process(context.Background(), validOrder)
assert.NoError(t, err)
assert.Equal(t, "completed", result.Status)
assert.Equal(t, 10.00, result.Total) // 1 x 10.00
// Test invalid order
invalidOrder := Order{ID: "TEST-2"} // Missing customer ID
_, err = pipeline.Process(context.Background(), invalidOrder)
assert.Error(t, err)
assert.Contains(t, err.Error(), "customer ID required")
}
Best Practices
- Define identities upfront as package-level variables
// Create reusable identity constants
var (
StageValidate = pipz.NewIdentity("validate", "Validates input data")
StageEnrich = pipz.NewIdentity("enrich", "Enriches with metadata")
StageProcess = pipz.NewIdentity("process", "Processes the data")
)
- Implement Clone() properly for concurrent processing
// Clone deep-copies the Items slice so concurrent branches never share
// a backing array. Note it builds a fresh Data with only Items set,
// exactly as the original example does.
func (d Data) Clone() Data {
	cp := make([]Item, len(d.Items))
	copy(cp, d.Items)
	return Data{Items: cp}
}
- Respect context cancellation
// slowOperation waits for doWork to finish while honoring context
// cancellation: if the caller gives up first, ctx.Err() is returned.
func slowOperation(ctx context.Context, data Data) (Data, error) {
	done := doWork(data)
	select {
	case result := <-done:
		return result, nil
	case <-ctx.Done():
		return data, ctx.Err()
	}
}
- Create singletons for stateful connectors
// Define identities upfront
var (
APIID = pipz.NewIdentity("api", "API rate limiter")
)
// RIGHT - Shared instance
var rateLimiter = pipz.NewRateLimiter[Data](APIID, 100, 10)
// WRONG - New instance each time: a per-call limiter keeps no shared
// state, so the rate limit is never actually enforced across calls.
func process(data Data) {
limiter := pipz.NewRateLimiter[Data](APIID, 100, 10) // Don't do this!
}
Next Steps
Now that you understand the basics:
- Explore the Core Concepts for deeper understanding
- Check the Cookbook for real-world recipes
- Browse the API Reference for detailed documentation
Common Questions
Q: When should I use pipz instead of regular Go code? A: Use pipz when you need composable, reusable data processing with good error handling and built-in patterns like retry, circuit breaking, and rate limiting.
Q: How does pipz compare to other pipeline libraries? A: pipz focuses on type safety, simplicity, and composability. It's lighter than workflow engines like Temporal but more structured than basic function chaining.
Q: Can I use pipz for streaming data? A: pipz processes one item at a time. For streaming, wrap your stream processing in pipz processors.
Q: How do I handle errors in the middle of a pipeline?
A: Use Fallback for recovery, Handle for cleanup, or check the Safety and Reliability guide.
Getting Help
- Check the Troubleshooting guide
- Open an Issue on GitHub
Happy pipelining!