zoobzio December 12, 2025

Introduction to pipz

pipz is a Go library for building type-safe, composable data pipelines. It leverages Go's generics to provide compile-time type safety while offering a simple, functional API for complex data transformations.

Why pipz?

The Problem

Building data processing pipelines in Go often involves:

  • Writing boilerplate code for error handling
  • Managing complex control flow (retries, timeouts, fallbacks)
  • Dealing with interface{} and runtime type assertions
  • Struggling to test individual components in isolation
  • Rewriting processing logic that is hard to reuse

The Solution

pipz provides:

  • Type Safety: Full compile-time type checking with generics
  • Composability: Small, reusable components that combine easily
  • Error Handling: Built-in patterns for retries, fallbacks, and recovery
  • Testability: Each component is independently testable
  • Performance: Optimized design with minimal allocations

Core Philosophy

pipz follows these principles:

  1. Interface-First Design: Everything implements Chainable[T] - a single, simple interface
  2. Your Code, Your Way: Implement the interface directly or use our convenience wrappers
  3. Composition over Configuration: Build complex behavior by combining simple pieces
  4. Type Safety First: No interface{}, no runtime type assertions
  5. Errors are Values: Explicit error handling at every step
  6. Context Awareness: Full support for cancellation and timeouts

Key Concepts

The Chainable Interface

Chainable[T] is the foundation of pipz. Everything implements this one interface:

type Chainable[T any] interface {
    Process(context.Context, T) (T, error)
    Identity() Identity
    Schema() Node
    Close() error
}

Any type implementing this interface can be used in a pipeline. This gives you complete flexibility:

  • Implement it directly for custom processors
  • Use the provided wrapper functions for common patterns
  • Mix both approaches in the same pipeline

Processors

The atomic units that transform data. You can create them by:

  1. Direct Implementation: Implement Chainable[T] for full control
  2. Wrapper Functions: Use Transform, Apply, Effect, etc. for convenience

Connectors

Mutable components that combine any Chainable[T] implementations into more complex behaviors:

  • NewSequence: Run processors in order
  • NewSwitch: Route to different processors based on conditions
  • NewConcurrent: Run multiple processors in parallel
  • NewRace: Use the first successful result
  • And many more...

Pipelines

Managed sequences of processors with introspection and modification capabilities.

Use Cases

pipz excels at:

  • ETL (Extract, Transform, Load) pipelines
  • API request/response processing
  • Event stream processing
  • Payment processing with failover
  • Content moderation pipelines
  • Data validation workflows
  • Microservice orchestration

What Makes pipz Different?

Unlike traditional pipeline libraries, pipz:

  • Uses Go generics for complete type safety
  • Requires no code generation or reflection
  • Has minimal external dependencies (clockz, capitan, uuid)
  • Provides both functional and object-oriented APIs
  • Includes battle-tested patterns (retry, timeout, fallback)
  • Returns rich error context showing the exact failure path
  • Supports both declarative and dynamic pipeline construction
  • Treats errors as data flowing through pipelines, so the same Switch, Concurrent, and Sequence patterns can drive sophisticated error recovery

A Unique Approach to Error Handling

Most frameworks treat errors as exceptions or callbacks. pipz treats them as data that flows through pipelines. This means you can build sophisticated error recovery flows using the exact same tools you use for regular data processing:

// Define identities as package-level variables
var (
    ErrorRecoveryID    = pipz.NewIdentity("error-recovery", "Error recovery pipeline")
    CategorizeID       = pipz.NewIdentity("categorize", "Categorizes errors by type")
    SeverityRouterID   = pipz.NewIdentity("severity-router", "Routes errors by severity")
    ParallelRecoveryID = pipz.NewIdentity("parallel-recovery", "Parallel recovery actions")
    OrderProcessingID  = pipz.NewIdentity("order-processing", "Order processing with error handling")
)

// Error recovery pipeline - same tools, same patterns!
errorRecovery := pipz.NewSequence[*pipz.Error[Order]](ErrorRecoveryID,
    pipz.Transform(CategorizeID, categorizeError),
    pipz.NewSwitch(SeverityRouterID, routeBySeverity),
    pipz.NewConcurrent(ParallelRecoveryID, notifyCustomer, updateInventory),
)

// Errors flow through this pipeline automatically
robustPipeline := pipz.NewHandle(OrderProcessingID, mainPipeline, errorRecovery)

This pattern enables type-safe, composable, and testable error handling that scales with your application complexity. See Safety and Reliability for the full power of this approach.

Next Steps