zoobzio December 12, 2025

Architecture

Overview

pipz is designed as a composable data processing library built on a single, uniform interface. The architecture emphasizes type safety, immutability, and clean separation of concerns to enable developers to build maintainable and testable data pipelines.

Core Design Principles

  1. Single Interface Pattern: Everything implements Chainable[T], enabling seamless composition
  2. Type Safety: Leverages Go generics (1.21+) for compile-time type checking
  3. Immutable Processors: Adapter functions are immutable values ensuring thread safety
  4. Mutable Connectors: Container types that manage state and configuration
  5. Fail-Fast Execution: Processing stops at the first error, simplifying error handling
  6. Context-Aware: All operations support context for cancellation and timeout control

System Architecture

┌──────────────────────────────────────────────────────────────┐
│                         Application                          │
└───────────────────────┬──────────────────────────────────────┘
                        │
                        ▼
┌──────────────────────────────────────────────────────────────┐
│                    Chainable[T] Interface                    │
│                  Process(ctx, T) → (T, error)                │
│                     Identity() → Identity                    │
└────────────┬─────────────────────────────┬───────────────────┘
             │                             │
             ▼                             ▼
┌────────────────────────────┐ ┌──────────────────────────────┐
│      Processors (Values)   │ │    Connectors (Pointers)     │
├────────────────────────────┤ ├──────────────────────────────┤
│ • Transform - Pure function│ │ • Sequence - Sequential flow │
│ • Apply - Can fail         │ │ • Concurrent - Parallel exec │
│ • Effect - Side effects    │ │ • Race - First success       │
│ • Mutate - Conditional     │ │ • Contest - First to meet    │
│ • Enrich - Optional enhance│ │ • Switch - Conditional branch│
│ • Filter - Pass/block data │ │ • Fallback - Error recovery  │
│ • Handle - Error transform │ │ • Retry - Retry on failure   │
│ • Scaffold - Development   │ │ • CircuitBreaker - Fail fast │
│                            │ │ • RateLimiter - Control flow │
│                            │ │ • Timeout - Time boundaries  │
└────────────────────────────┘ └──────────────────────────────┘

Component Relationships

Processors (Adapters)

Processors are lightweight wrappers around user functions that implement the Chainable interface:

type processor[T any] struct {
    identity Identity
    fn       func(context.Context, T) (T, error)
}

Key characteristics:

  • Immutable: Once created, cannot be modified
  • Stateless: No internal state, pure function wrappers
  • Thread-Safe: Can be used concurrently without synchronization
  • Composable: Can be combined using connectors

Connectors (Composition)

Connectors manage the composition and execution flow of multiple Chainables:

type connector[T any] struct {
    identity   Identity
    processors []Chainable[T]
    // Additional state (mutex, config, etc.)
}

Key characteristics:

  • Mutable: Can be modified at runtime (add/remove processors)
  • Stateful: May maintain internal state (circuit breaker state, rate limits)
  • Configurable: Support runtime configuration changes
  • Orchestrators: Control execution flow and error handling

Data Flow Architecture

Sequential Processing

Input → [Processor 1] → [Processor 2] → [Processor 3] → Output
         ↓ error          ↓ error          ↓ error
         Return           Return           Return

The Sequence connector processes data through each step sequentially, stopping at the first error.

Parallel Processing

        ┌→ [Processor 1] →┐
Input →─┼→ [Processor 2] →┼→ Aggregation → Output
        └→ [Processor 3] →┘

Parallel connectors (Concurrent, Race, Contest) require T to implement Cloner[T] for safe concurrent processing.

Conditional Processing

         ┌─[condition]─→ [Branch A] →┐
Input →─┤                            ├→ Output
         └─[default]──→ [Branch B] →┘

The Switch connector routes data based on conditions, similar to a switch statement.

Execution Flow Diagram

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Context   │────→│  Pipeline   │────→│  Processor  │
└─────────────┘     └─────────────┘     └─────────────┘
      │                    │                    │
      │                    ▼                    ▼
      │             ┌─────────────┐     ┌─────────────┐
      └────────────→│   Router    │────→│  Transform  │
                    └─────────────┘     └─────────────┘
                           │                    │
                           ▼                    ▼
                    ┌─────────────┐     ┌─────────────┐
                    │Error Handler│←────│   Result    │
                    └─────────────┘     └─────────────┘

Error Handling Architecture

Error Type Hierarchy

type Error[T any] struct {
    Path      []Name        // Full path through pipeline where error occurred
    Err       error         // The underlying error
    InputData T             // Data state at error time
    Timestamp time.Time     // When the error occurred
    Duration  time.Duration // How long before failure
    Timeout   bool          // Was it a timeout?
    Canceled  bool          // Was it canceled?
}

Error handling patterns:

  1. Fail-Fast: Default behavior, stop on first error
  2. Recovery: Use Fallback for error recovery with alternate processing
  3. Transformation: Use Handle to transform errors into valid data
  4. Resilience: Use Retry, CircuitBreaker, RateLimiter for fault tolerance

Memory Model

Cloner Interface

For concurrent processing, data must be cloneable:

type Cloner[T any] interface {
    Clone() T
}

This ensures:

  • Thread safety in parallel operations
  • Data isolation between concurrent branches
  • Prevention of race conditions
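
A typical implementation deep-copies reference-typed fields, since copying a struct value alone would leave both sides sharing the same slice or map backing storage (illustrative type, not from pipz):

```go
package main

import "fmt"

type Cloner[T any] interface {
	Clone() T
}

// Order has a slice field, so a correct Clone must copy it; a plain
// struct copy would alias the same backing array.
type Order struct {
	ID    string
	Items []string
}

func (o Order) Clone() Order {
	items := make([]string, len(o.Items))
	copy(items, o.Items)
	return Order{ID: o.ID, Items: items}
}

func main() {
	original := Order{ID: "A-1", Items: []string{"widget"}}
	branch := original.Clone()
	branch.Items[0] = "gadget" // mutate the clone only

	fmt.Println(original.Items[0], branch.Items[0]) // widget gadget
}
```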

Context Propagation

Every operation receives a context, enabling:

  • Request-scoped values
  • Cancellation propagation
  • Timeout enforcement
  • Tracing and monitoring integration

Pipeline Execution Context

The Pipeline connector provides semantic execution context for distributed tracing:

┌──────────────────────────────────────────────────────────────┐
│                       Pipeline Wrapper                        │
├──────────────────────────────────────────────────────────────┤
│  Identity: "order-processing"                                │
│  Pipeline ID: 550e8400-e29b-41d4-a716-446655440000 (stable)  │
└───────────────────────┬──────────────────────────────────────┘
                        │
                        ▼ Process(ctx, data)
┌──────────────────────────────────────────────────────────────┐
│                    Context Injection                          │
│  + Execution ID: a8b9c0d1-... (unique per call)              │
│  + Pipeline ID:  550e8400-... (from Identity)                │
└───────────────────────┬──────────────────────────────────────┘
                        │
                        ▼
┌──────────────────────────────────────────────────────────────┐
│                   Root Chainable                              │
│  (Sequence, Concurrent, etc.)                                │
│                                                              │
│  All nested connectors receive enriched context              │
│  Signals emitted include correlation IDs                     │
└──────────────────────────────────────────────────────────────┘

This enables:

  • Request Correlation: Link all signals from a single execution
  • Pipeline Grouping: Aggregate metrics by pipeline identity
  • Distributed Tracing: Propagate IDs to external systems
  • Debug Context: Know which pipeline and execution produced logs

Extension Points

Custom Processors

Create custom processors by wrapping functions:

func CustomProcessor[T any](id Identity, fn func(context.Context, T) (T, error)) Chainable[T] {
    return Apply(id, fn)
}

Custom Connectors

Implement the Chainable interface for custom composition logic:

type CustomConnector[T any] struct {
    identity Identity
    // Your fields
}

func (c *CustomConnector[T]) Process(ctx context.Context, data T) (T, error) {
    // Your logic
    return data, nil
}

func (c *CustomConnector[T]) Identity() Identity {
    return c.identity
}

func (c *CustomConnector[T]) Schema() Node {
    return Node{Identity: c.identity, Type: "custom"}
}

func (c *CustomConnector[T]) Close() error {
    return nil
}

Integration Points

Common integration patterns:

  1. HTTP Middleware: Wrap pipelines as HTTP handlers
  2. Message Queue Consumers: Process messages through pipelines
  3. Batch Processing: Use pipelines in batch job frameworks
  4. Stream Processing: Integrate with streaming platforms
  5. Service Mesh: Use as sidecar processing logic

Observability Architecture

Hook System

pipz integrates with capitan to provide type-safe event hooks for monitoring and debugging. Stateful connectors emit signals at critical decision points:

┌──────────────────────────────────────────────────────────────┐
│                      Application Code                        │
└───────────────────────┬──────────────────────────────────────┘
                        │
                        ▼
┌──────────────────────────────────────────────────────────────┐
│                   Stateful Connectors                        │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐       │
│  │CircuitBreaker│  │ RateLimiter  │  │ WorkerPool   │       │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘       │
└─────────┼──────────────────┼──────────────────┼──────────────┘
          │                  │                  │
          │ capitan.Emit()   │                  │
          ▼                  ▼                  ▼
┌──────────────────────────────────────────────────────────────┐
│                    Capitan Event Bus                         │
│              (Async, Per-Signal Workers)                     │
└───────────────────────┬──────────────────────────────────────┘
                        │
                        ▼
        ┌───────────────┴───────────────┐
        ▼                               ▼
┌──────────────┐              ┌──────────────────┐
│  Observers   │              │  Hook Handlers   │
├──────────────┤              ├──────────────────┤
│ • Metrics    │              │ • Logging        │
│ • Alerting   │              │ • Tracing        │
│ • Debugging  │              │ • Custom Logic   │
└──────────────┘              └──────────────────┘

Signal Emission Points

Signals are emitted at state transitions and decision points:

CircuitBreaker:

  • circuitbreaker.opened - Threshold reached
  • circuitbreaker.closed - Recovery successful
  • circuitbreaker.half-open - Testing recovery
  • circuitbreaker.rejected - Request blocked

RateLimiter:

  • ratelimiter.allowed - Token consumed
  • ratelimiter.throttled - Waiting for token
  • ratelimiter.dropped - Request dropped

WorkerPool:

  • workerpool.saturated - All workers busy
  • workerpool.acquired - Worker slot taken
  • workerpool.released - Worker slot freed

Asynchronous Processing

All events are processed asynchronously via per-signal worker goroutines:

Emit() → Buffered Channel → Worker Goroutine → Handler
  ↓           (size: 16)          ↓               ↓
Returns                        Isolated       Panic Safe
Immediately                   Execution

This architecture ensures:

  • Zero impact on pipeline performance
  • Isolation between different signal types
  • Panic safety with automatic recovery
  • Backpressure via configurable buffers
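
This emit path can be sketched with a buffered channel and a recovering worker goroutine. The code below illustrates the pattern only, not capitan's actual implementation; the drop-on-full policy is one possible backpressure choice:

```go
package main

import (
	"fmt"
	"sync"
)

// bus sketches the per-signal worker pattern: Emit never blocks the
// caller beyond a buffered channel send, and handler panics are contained.
type bus struct {
	events chan string
	done   sync.WaitGroup
}

func newBus(handler func(string)) *bus {
	b := &bus{events: make(chan string, 16)} // buffered, as in the diagram
	b.done.Add(1)
	go func() {
		defer b.done.Done()
		for e := range b.events {
			func() {
				defer func() {
					if r := recover(); r != nil {
						fmt.Println("handler panic recovered:", r)
					}
				}()
				handler(e)
			}()
		}
	}()
	return b
}

// Emit returns immediately; when the buffer is full the event is dropped.
func (b *bus) Emit(e string) {
	select {
	case b.events <- e:
	default:
	}
}

// Close drains the channel and waits for the worker to finish.
func (b *bus) Close() {
	close(b.events)
	b.done.Wait()
}

func main() {
	seen := 0
	b := newBus(func(e string) { seen++ })
	b.Emit("circuitbreaker.opened")
	b.Emit("ratelimiter.dropped")
	b.Close()
	fmt.Println(seen) // 2
}
```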

See Hooks Documentation for detailed usage and examples.

Performance Considerations

Optimization Strategies

  1. Processor Granularity: Balance between too many small processors (overhead) and large monolithic ones (reduced reusability)
  2. Parallel Execution: Use Concurrent for independent operations
  3. Early Filtering: Place Filter processors early to reduce downstream processing
  4. Resource Pooling: Reuse expensive resources across pipeline executions
  5. Context Timeouts: Set appropriate timeouts to prevent hanging operations

Benchmarking Results

The library includes comprehensive benchmarks showing:

  • Minimal overhead for processor wrapping (~2-5ns)
  • Linear scaling for sequential processing
  • Near-linear scaling for parallel processing with proper data isolation

Security Considerations

Input Validation

Always validate input at pipeline boundaries:

// Define identities upfront
var (
    SecureID   = pipz.NewIdentity("secure", "Secure processing pipeline")
    SanitizeID = pipz.NewIdentity("sanitize", "Sanitizes input")
    ValidateID = pipz.NewIdentity("validate", "Validates data")
)

pipeline := pipz.NewSequence(SecureID,
    pipz.Transform(SanitizeID, sanitizeInput),
    pipz.Apply(ValidateID, validateData),
    // ... rest of pipeline
)

Resource Limits

Protect against resource exhaustion:

  • Use Timeout for time boundaries
  • Use RateLimiter for throughput control
  • Use CircuitBreaker for cascading failure prevention

Error Information

Be careful with error details in production:

  • The Error[T] type includes the data state at failure
  • Consider sanitizing sensitive data in error states
  • Use structured logging for audit trails

Future Architecture Considerations

Planned Enhancements

  1. Distributed Execution: Support for distributed pipeline execution
  2. Persistent State: Durable state management for long-running pipelines
  3. Visual Pipeline Builder: Tool for visual pipeline composition
  4. Schema Evolution: Support for data schema versioning

API Stability

The core Chainable[T] interface is stable and will remain backward compatible. New features will be added through:

  • New adapter functions
  • New connector types
  • Optional interfaces for advanced features
  • Configuration options on existing types

Summary

The pipz architecture provides a clean, composable foundation for building data processing pipelines. By adhering to a single interface and clear separation between processors and connectors, it enables developers to build complex data flows from simple, testable components while maintaining type safety and performance.