Architecture
Overview
pipz is designed as a composable data processing library built on a single, uniform interface. The architecture emphasizes type safety, immutability, and clean separation of concerns to enable developers to build maintainable and testable data pipelines.
Core Design Principles
- Single Interface Pattern: Everything implements Chainable[T], enabling seamless composition
- Type Safety: Leverages Go generics (1.21+) for compile-time type checking
- Immutable Processors: Adapter functions are immutable values ensuring thread safety
- Mutable Connectors: Container types that manage state and configuration
- Fail-Fast Execution: Processing stops at the first error, simplifying error handling
- Context-Aware: All operations support context for cancellation and timeout control
System Architecture
┌──────────────────────────────────────────────────────────────┐
│ Application │
└───────────────────────┬──────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Chainable[T] Interface │
│ Process(ctx, T) → (T, *Error) │
│ Name() → Name │
└────────────┬─────────────────────────────┬───────────────────┘
│ │
▼ ▼
┌────────────────────────────┐ ┌──────────────────────────────┐
│ Processors (Values) │ │ Connectors (Pointers) │
├────────────────────────────┤ ├──────────────────────────────┤
│ • Transform - Pure function│ │ • Sequence - Sequential flow │
│ • Apply - Can fail │ │ • Concurrent - Parallel exec │
│ • Effect - Side effects │ │ • Race - First success │
│ • Mutate - Conditional │ │ • Contest - First meeting │
│ • Enrich - Optional enhance│ │ • Switch - Conditional branch│
│ • Filter - Pass/block data │ │ • Fallback - Error recovery │
│ • Handle - Error transform │ │ • Retry - Retry on failure │
│ • Scaffold - Development │ │ • CircuitBreaker - Fail fast │
│ │ │ • RateLimiter - Control flow │
│ │ │ • Timeout - Time boundaries │
└────────────────────────────┘ └──────────────────────────────┘
Component Relationships
Processors (Adapters)
Processors are lightweight wrappers around user functions that implement the Chainable interface:
type processor[T any] struct {
    name Name
    fn   func(context.Context, T) (T, error)
}
Key characteristics:
- Immutable: Once created, cannot be modified
- Stateless: No internal state, pure function wrappers
- Thread-Safe: Can be used concurrently without synchronization
- Composable: Can be combined using connectors
Connectors (Composition)
Connectors manage the composition and execution flow of multiple Chainables:
type connector[T any] struct {
    name       Name
    processors []Chainable[T]
    // Additional state (mutex, config, etc.)
}
Key characteristics:
- Mutable: Can be modified at runtime (add/remove processors)
- Stateful: May maintain internal state (circuit breaker state, rate limits)
- Configurable: Support runtime configuration changes
- Orchestrators: Control execution flow and error handling
Data Flow Architecture
Sequential Processing
Input → [Processor 1] → [Processor 2] → [Processor 3] → Output
↓ error ↓ error ↓ error
Return Return Return
The Sequence connector processes data through each step sequentially, stopping at the first error.
Parallel Processing
┌→ [Processor 1] →┐
Input →─┼→ [Processor 2] →┼→ Aggregation → Output
└→ [Processor 3] →┘
Parallel connectors (Concurrent, Race, Contest) require T to implement Cloner[T] for safe concurrent processing.
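Why cloning matters can be sketched with the standard library; runConcurrent below is an illustrative stand-in for the Concurrent pattern, not the pipz implementation:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Order carries a slice, so a shallow copy would share backing memory.
type Order struct {
	ID   int
	Tags []string
}

// Clone satisfies the Cloner[T] contract described above by deep-copying
// the slice, giving each branch its own isolated data.
func (o Order) Clone() Order {
	tags := make([]string, len(o.Tags))
	copy(tags, o.Tags)
	return Order{ID: o.ID, Tags: tags}
}

// runConcurrent hands each branch its own clone, so branches cannot race
// on shared state and the caller's value is never mutated.
func runConcurrent(ctx context.Context, in Order, branches ...func(context.Context, Order)) {
	var wg sync.WaitGroup
	for _, b := range branches {
		wg.Add(1)
		go func(b func(context.Context, Order)) {
			defer wg.Done()
			b(ctx, in.Clone()) // isolated copy per branch
		}(b)
	}
	wg.Wait()
}

func main() {
	o := Order{ID: 1, Tags: []string{"new"}}
	runConcurrent(context.Background(), o,
		func(_ context.Context, c Order) { c.Tags[0] = "audited" },
		func(_ context.Context, c Order) { c.Tags = append(c.Tags, "notified") },
	)
	fmt.Println(o.Tags) // original untouched: [new]
}
```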
Conditional Processing
┌─[condition]─→ [Branch A] →┐
Input →─┤ ├→ Output
└─[default]──→ [Branch B] →┘
The Switch connector routes data based on conditions, similar to a switch statement.
Execution Flow Diagram
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Context │────→│ Pipeline │────→│ Processor │
└─────────────┘ └─────────────┘ └─────────────┘
│ │ │
│ ▼ ▼
│ ┌─────────────┐ ┌─────────────┐
└────────────→│ Router │────→│ Transform │
└─────────────┘ └─────────────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│Error Handler│←────│ Result │
└─────────────┘ └─────────────┘
Error Handling Architecture
Error Type Hierarchy
type Error[T any] struct {
    Path      []Name        // Full path through pipeline where error occurred
    Err       error         // The underlying error
    InputData T             // Data state at error time
    Timestamp time.Time     // When the error occurred
    Duration  time.Duration // How long before failure
    Timeout   bool          // Was it a timeout?
    Canceled  bool          // Was it canceled?
}
Error handling patterns:
- Fail-Fast: Default behavior, stop on first error
- Recovery: Use Fallback for error recovery with alternate processing
- Transformation: Use Handle to transform errors into valid data
- Resilience: Use Retry, CircuitBreaker, and RateLimiter for fault tolerance
Memory Model
Cloner Interface
For concurrent processing, data must be cloneable:
type Cloner[T any] interface {
    Clone() T
}
This ensures:
- Thread safety in parallel operations
- Data isolation between concurrent branches
- Prevention of race conditions
Context Propagation
Every operation receives a context, enabling:
- Request-scoped values
- Cancellation propagation
- Timeout enforcement
- Tracing and monitoring integration
Pipeline Execution Context
The Pipeline connector provides semantic execution context for distributed tracing:
┌──────────────────────────────────────────────────────────────┐
│ Pipeline Wrapper │
├──────────────────────────────────────────────────────────────┤
│ Identity: "order-processing" │
│ Pipeline ID: 550e8400-e29b-41d4-a716-446655440000 (stable) │
└───────────────────────┬──────────────────────────────────────┘
│
▼ Process(ctx, data)
┌──────────────────────────────────────────────────────────────┐
│ Context Injection │
│ + Execution ID: a8b9c0d1-... (unique per call) │
│ + Pipeline ID: 550e8400-... (from Identity) │
└───────────────────────┬──────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Root Chainable │
│ (Sequence, Concurrent, etc.) │
│ │
│ All nested connectors receive enriched context │
│ Signals emitted include correlation IDs │
└──────────────────────────────────────────────────────────────┘
This enables:
- Request Correlation: Link all signals from a single execution
- Pipeline Grouping: Aggregate metrics by pipeline identity
- Distributed Tracing: Propagate IDs to external systems
- Debug Context: Know which pipeline and execution produced logs
Extension Points
Custom Processors
Create custom processors by wrapping functions:
func CustomProcessor[T any](id Identity, fn func(context.Context, T) (T, error)) Chainable[T] {
    return Apply(id, fn)
}
Custom Connectors
Implement the Chainable interface for custom composition logic:
type CustomConnector[T any] struct {
    identity Identity
    // Your fields
}

func (c *CustomConnector[T]) Process(ctx context.Context, data T) (T, error) {
    // Your logic
    return data, nil
}

func (c *CustomConnector[T]) Identity() Identity {
    return c.identity
}

func (c *CustomConnector[T]) Schema() Node {
    return Node{Identity: c.identity, Type: "custom"}
}

func (c *CustomConnector[T]) Close() error {
    return nil
}
Integration Points
Common integration patterns:
- HTTP Middleware: Wrap pipelines as HTTP handlers
- Message Queue Consumers: Process messages through pipelines
- Batch Processing: Use pipelines in batch job frameworks
- Stream Processing: Integrate with streaming platforms
- Service Mesh: Use as sidecar processing logic
Observability Architecture
Hook System
Pipz integrates with capitan to provide type-safe event hooks for monitoring and debugging. Stateful connectors emit signals at critical decision points:
┌──────────────────────────────────────────────────────────────┐
│ Application Code │
└───────────────────────┬──────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Stateful Connectors │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │CircuitBreaker│ │ RateLimiter │ │ WorkerPool │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼──────────────────┼──────────────────┼──────────────┘
│ │ │
│ capitan.Emit() │ │
▼ ▼ ▼
┌──────────────────────────────────────────────────────────────┐
│ Capitan Event Bus │
│ (Async, Per-Signal Workers) │
└───────────────────────┬──────────────────────────────────────┘
│
▼
┌───────────────┴───────────────┐
▼ ▼
┌──────────────┐ ┌──────────────────┐
│ Observers │ │ Hook Handlers │
├──────────────┤ ├──────────────────┤
│ • Metrics │ │ • Logging │
│ • Alerting │ │ • Tracing │
│ • Debugging │ │ • Custom Logic │
└──────────────┘ └──────────────────┘
Signal Emission Points
Signals are emitted at state transitions and decision points:
CircuitBreaker:
- circuitbreaker.opened - Threshold reached
- circuitbreaker.closed - Recovery successful
- circuitbreaker.half-open - Testing recovery
- circuitbreaker.rejected - Request blocked
RateLimiter:
- ratelimiter.allowed - Token consumed
- ratelimiter.throttled - Waiting for token
- ratelimiter.dropped - Request dropped
WorkerPool:
- workerpool.saturated - All workers busy
- workerpool.acquired - Worker slot taken
- workerpool.released - Worker slot freed
Asynchronous Processing
All events are processed asynchronously via per-signal worker goroutines:
Emit() → Buffered Channel → Worker Goroutine → Handler
↓ (size: 16) ↓ ↓
Returns Isolated Panic Safe
Immediately Execution
This architecture ensures:
- Zero impact on pipeline performance
- Isolation between different signal types
- Panic safety with automatic recovery
- Backpressure via configurable buffers
See Hooks Documentation for detailed usage and examples.
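The buffered-channel-plus-worker model in the diagram above can be sketched with the standard library; emitter is an illustrative stand-in for capitan's internals, not its API:

```go
package main

import (
	"fmt"
	"sync"
)

// emitter models the per-signal worker pattern: Emit costs only a buffered
// channel send, and a dedicated goroutine drains events with panic recovery.
type emitter struct {
	events chan string
	wg     sync.WaitGroup
}

func newEmitter(handler func(string)) *emitter {
	e := &emitter{events: make(chan string, 16)} // buffer size from the diagram
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		for ev := range e.events {
			func() {
				defer func() { recover() }() // a panicking handler cannot kill the worker
				handler(ev)
			}()
		}
	}()
	return e
}

// Emit returns as soon as the event is buffered; a full buffer applies backpressure.
func (e *emitter) Emit(ev string) { e.events <- ev }

// Close drains remaining events and waits for the worker to finish.
func (e *emitter) Close() {
	close(e.events)
	e.wg.Wait()
}

func main() {
	var seen []string
	e := newEmitter(func(ev string) { seen = append(seen, ev) })
	e.Emit("circuitbreaker.opened")
	e.Emit("circuitbreaker.closed")
	e.Close()
	fmt.Println(seen) // [circuitbreaker.opened circuitbreaker.closed]
}
```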
Performance Considerations
Optimization Strategies
- Processor Granularity: Balance between too many small processors (overhead) and large monolithic ones (reduced reusability)
- Parallel Execution: Use Concurrent for independent operations
- Early Filtering: Place Filter processors early to reduce downstream processing
- Resource Pooling: Reuse expensive resources across pipeline executions
- Context Timeouts: Set appropriate timeouts to prevent hanging operations
Benchmarking Results
The library includes comprehensive benchmarks showing:
- Minimal overhead for processor wrapping (~2-5ns)
- Linear scaling for sequential processing
- Near-linear scaling for parallel processing with proper data isolation
Security Considerations
Input Validation
Always validate input at pipeline boundaries:
// Define identities upfront
var (
    SecureID   = pipz.NewIdentity("secure", "Secure processing pipeline")
    SanitizeID = pipz.NewIdentity("sanitize", "Sanitizes input")
    ValidateID = pipz.NewIdentity("validate", "Validates data")
)

pipeline := pipz.NewSequence(SecureID,
    pipz.Transform(SanitizeID, sanitizeInput),
    pipz.Apply(ValidateID, validateData),
    // ... rest of pipeline
)
Resource Limits
Protect against resource exhaustion:
- Use Timeout for time boundaries
- Use RateLimiter for throughput control
- Use CircuitBreaker for cascading failure prevention
Error Information
Be careful with error details in production:
- The Error[T] type includes the data state at failure
- Consider sanitizing sensitive data in error states
- Use structured logging for audit trails
Future Architecture Considerations
Planned Enhancements
- Distributed Execution: Support for distributed pipeline execution
- Persistent State: Durable state management for long-running pipelines
- Visual Pipeline Builder: Tool for visual pipeline composition
- Schema Evolution: Support for data schema versioning
API Stability
The core Chainable[T] interface is stable and will remain backward compatible. New features will be added through:
- New adapter functions
- New connector types
- Optional interfaces for advanced features
- Configuration options on existing types
Summary
The pipz architecture provides a clean, composable foundation for building data processing pipelines. By adhering to a single interface and clear separation between processors and connectors, it enables developers to build complex data flows from simple, testable components while maintaining type safety and performance.