Testing pipz Pipelines
Comprehensive guide to testing pipelines with the pipz testing utilities, best practices, and three-tier testing strategy.
Table of Contents
- Testing Package Overview
- MockProcessor - Testing Pipeline Behavior
- ChaosProcessor - Resilience Testing
- Assertion Helpers
- Testing Time-Dependent Components
- Test Organization Strategy
- Testing Best Practices
- Common Testing Patterns
- Testing Gotchas
Testing Package Overview
The github.com/zoobzio/pipz/testing package provides comprehensive utilities for testing pipz pipelines:
import pipztesting "github.com/zoobzio/pipz/testing"
Core Testing Components
- MockProcessor: Configurable mock implementation for testing pipeline behavior
- ChaosProcessor: Chaos engineering tool for resilience testing
- Assertion Helpers: Utilities for verifying processor calls and behaviors
- Helper Functions: Timing, parallelization, and synchronization utilities
MockProcessor - Testing Pipeline Behavior
MockProcessor provides a fully configurable mock implementation of pipz.Chainable[T] with call tracking, configurable return values, delays, and panic simulation.
Basic Mock Usage
// TestPipelineWithMock demonstrates dropping a MockProcessor into the middle
// of a Sequence: upstream transforms feed the mock, the mock's configured
// return value feeds downstream transforms, and the call-tracking assertions
// verify what the mock actually received.
func TestPipelineWithMock(t *testing.T) {
// Create a mock processor
mock := pipztesting.NewMockProcessor[string](t, "data-processor")
// Configure return values
mock.WithReturn("processed", nil)
// Define identities upfront
var (
TestPipelineID = pipz.NewIdentity("test-pipeline", "Test pipeline with mock")
PrepareID = pipz.NewIdentity("prepare", "Prepare input")
FinalizeID = pipz.NewIdentity("finalize", "Finalize output")
)
// Build pipeline with mock
pipeline := pipz.NewSequence[string](TestPipelineID,
pipz.Transform(PrepareID, strings.ToUpper),
mock,
pipz.Transform(FinalizeID, strings.TrimSpace),
)
// Process data
result, err := pipeline.Process(context.Background(), " input ")
// Verify results: the final value is the mock's canned return ("processed"),
// not a function of the input — downstream TrimSpace leaves it unchanged.
require.NoError(t, err)
assert.Equal(t, "processed", result)
// Verify mock was called exactly once, and that the upstream ToUpper ran
// before the mock (it saw "INPUT", not " input ").
pipztesting.AssertProcessed(t, mock, 1)
pipztesting.AssertProcessedWith(t, mock, "INPUT")
}
Testing Error Paths
// TestErrorHandling shows the error path through a Handle-wrapped Sequence:
// a mock configured to fail stops the sequence mid-way, the Handle's error
// transformer annotates the failed order, and the test inspects the typed
// pipz.Error to confirm where the failure happened.
func TestErrorHandling(t *testing.T) {
// Create mock that returns error
mock := pipztesting.NewMockProcessor[Order](t, "payment-processor")
mock.WithReturn(Order{}, errors.New("payment declined"))
// Define identities upfront
var (
OrderPipelineID = pipz.NewIdentity("order-pipeline", "Order processing with error handling")
ProcessID = pipz.NewIdentity("process", "Order processing sequence")
ErrorRecoveryID = pipz.NewIdentity("error-recovery", "Handle order processing errors")
)
// Build pipeline with error handling
pipeline := pipz.NewHandle[Order](OrderPipelineID,
pipz.NewSequence[Order](ProcessID,
validateOrder,
mock, // Will fail here
shipOrder,
),
pipz.Transform(ErrorRecoveryID, func(ctx context.Context, err *pipz.Error[Order]) *pipz.Error[Order] {
// Log error and mark order as failed
err.InputData.Status = "payment_failed"
return err
}),
)
order := Order{ID: "123", Amount: 99.99}
_, err := pipeline.Process(context.Background(), order)
// Verify error occurred
require.Error(t, err)
// Verify shipOrder was never called (pipeline stopped at error).
// This is proven indirectly: the last element of the error Path is the
// failing payment-processor, so nothing after it in the sequence ran.
var pipeErr *pipz.Error[Order]
require.True(t, errors.As(err, &pipeErr))
assert.Equal(t, pipz.Name("payment-processor"), pipeErr.Path[len(pipeErr.Path)-1])
assert.Equal(t, "payment_failed", pipeErr.InputData.Status)
}
Testing Delays and Timeouts
// TestTimeoutBehavior wraps a deliberately slow mock (200ms delay) in a
// Timeout connector with a shorter budget (100ms) and verifies the pipeline
// fails with context.DeadlineExceeded rather than waiting the mock out.
func TestTimeoutBehavior(t *testing.T) {
// Create mock with 200ms delay
slowMock := pipztesting.NewMockProcessor[string](t, "slow-service")
slowMock.WithReturn("result", nil).WithDelay(200 * time.Millisecond)
// Define identity upfront
var FastTimeoutID = pipz.NewIdentity("fast-timeout", "Quick timeout wrapper")
// Wrap with timeout
pipeline := pipz.NewTimeout[string](FastTimeoutID,
slowMock,
100*time.Millisecond, // Timeout before mock completes
)
// Process should timeout
_, err := pipeline.Process(context.Background(), "data")
// Verify timeout occurred
require.Error(t, err)
assert.True(t, errors.Is(err, context.DeadlineExceeded))
// Mock should still have been called: the Timeout connector abandons the
// call but the mock was invoked before the deadline fired, so its call
// counter recorded the attempt.
pipztesting.AssertProcessed(t, slowMock, 1)
}
Testing Panic Recovery
// TestPanicRecovery verifies that a panicking processor surfaces as a normal
// error (not an uncaught panic) when wrapped in a Handle connector.
func TestPanicRecovery(t *testing.T) {
// Create mock that panics
panicMock := pipztesting.NewMockProcessor[string](t, "unstable-service")
panicMock.WithPanic("database connection lost")
// Define identities upfront
var (
SafePipelineID = pipz.NewIdentity("safe-pipeline", "Pipeline with panic recovery")
RecoverID = pipz.NewIdentity("recover", "Recover from panics")
)
// Build pipeline with panic recovery
pipeline := pipz.NewHandle[string](SafePipelineID,
panicMock,
pipz.Transform(RecoverID, func(ctx context.Context, err *pipz.Error[string]) *pipz.Error[string] {
// Check if it was a panic by examining the error
if strings.Contains(err.Err.Error(), "panic") {
// Log recovered panic
log.Printf("recovered from panic: %v", err.Err)
}
return err
}),
)
// Should recover from panic
_, err := pipeline.Process(context.Background(), "test")
// Verify error but not panic
require.Error(t, err)
// NOTE(review): this assumes pipz wraps recovered panics in an error whose
// message contains the literal phrase "recovered from panic". The handler
// above only checks for "panic" — confirm the exact wording against the
// pipz version in use, or the assertion may be brittle.
assert.Contains(t, err.Error(), "recovered from panic")
}
Call History Tracking
// TestCallHistory exercises the mock's bounded call-history ring: five calls
// fit within the configured capacity of ten, so all five are retained in
// order with monotonically increasing timestamps.
func TestCallHistory(t *testing.T) {
mock := pipztesting.NewMockProcessor[int](t, "accumulator")
mock.WithReturn(0, nil).WithHistorySize(10) // Keep last 10 calls
// Process multiple values
for i := 0; i < 5; i++ {
mock.Process(context.Background(), i)
}
// Examine call history
history := mock.CallHistory()
require.Len(t, history, 5)
// Verify call order and timing: inputs are recorded in call order and each
// timestamp is strictly later than its predecessor.
for i, call := range history {
assert.Equal(t, i, call.Input)
if i > 0 {
assert.True(t, call.Timestamp.After(history[i-1].Timestamp))
}
}
}
ChaosProcessor - Resilience Testing
ChaosProcessor enables chaos engineering for pipelines by randomly injecting failures, delays, timeouts, and panics. This helps verify resilience patterns work correctly under adverse conditions.
Basic Chaos Testing
// TestPipelineResilience injects seeded, reproducible chaos (failures,
// latency, timeouts, panics) around a simple transform and verifies that a
// Retry+Handle wrapper keeps the overall pipeline mostly successful while
// the chaos stats prove faults really were injected.
func TestPipelineResilience(t *testing.T) {
// Define identities upfront
var (
ProcessID = pipz.NewIdentity("process", "Data processing transform")
ResilientID = pipz.NewIdentity("resilient", "Resilient retry wrapper")
RecoverID = pipz.NewIdentity("recover", "Error recovery handler")
HandleErrorID = pipz.NewIdentity("handle-error", "Handle processing errors")
)
// Wrap a normal processor with chaos
normalProcessor := pipz.Transform(ProcessID, func(ctx context.Context, data string) string {
return data + "_processed"
})
// Configure chaos
chaosConfig := pipztesting.ChaosConfig{
FailureRate: 0.2, // 20% failure rate
LatencyMin: 10 * time.Millisecond,
LatencyMax: 50 * time.Millisecond,
TimeoutRate: 0.1, // 10% timeout rate
PanicRate: 0.05, // 5% panic rate
Seed: 42, // Reproducible chaos
}
chaos := pipztesting.NewChaosProcessor("chaos-test", normalProcessor, chaosConfig)
// Build resilient pipeline
pipeline := pipz.NewRetry[string](ResilientID,
pipz.NewHandle[string](RecoverID,
chaos,
pipz.Transform(HandleErrorID, func(ctx context.Context, err *pipz.Error[string]) *pipz.Error[string] {
// Log and recover from errors
return err
}),
),
3, // Retry up to 3 times
)
// Run many iterations to trigger chaos
successCount := 0
failureCount := 0
for i := 0; i < 100; i++ {
result, err := pipeline.Process(context.Background(), fmt.Sprintf("request_%d", i))
if err == nil {
successCount++
assert.Contains(t, result, "_processed")
} else {
failureCount++
}
}
// With retries, success rate should be higher than failure rate
stats := chaos.Stats()
t.Logf("Chaos Stats: %s", stats)
// successCount is out of exactly 100 iterations, so the /100*100 below is a
// no-op kept only to make the "percentage" intent explicit.
t.Logf("Pipeline Success Rate: %.1f%%", float64(successCount)/100*100)
// Verify chaos was actually injected
assert.Greater(t, stats.FailedCalls+stats.TimeoutCalls+stats.PanicCalls, int64(0))
}
Testing Circuit Breaker with Chaos
// TestCircuitBreakerUnderChaos pushes 50 requests through a chaos-wrapped
// API call behind a CircuitBreaker and verifies the breaker trips (opens)
// under the injected 30% failure / 20% timeout load, then fails fast.
func TestCircuitBreakerUnderChaos(t *testing.T) {
// Define identities upfront
var (
ExternalAPIID = pipz.NewIdentity("external-api", "External API call")
APIBreakerID = pipz.NewIdentity("api-breaker", "Circuit breaker for API")
)
// Create service with intermittent failures
service := pipz.Apply(ExternalAPIID, func(ctx context.Context, req Request) (Response, error) {
// Actual API call
return callAPI(req)
})
// Add chaos to simulate network issues
chaosService := pipztesting.NewChaosProcessor("chaos-api", service,
pipztesting.ChaosConfig{
FailureRate: 0.3, // 30% failures
TimeoutRate: 0.2, // 20% timeouts
LatencyMin: 50 * time.Millisecond,
LatencyMax: 200 * time.Millisecond,
},
)
// Wrap with circuit breaker
circuitBreaker := pipz.NewCircuitBreaker[Request](APIBreakerID,
chaosService,
10, // Open after 10 failures
30*time.Second, // Recovery time
)
// Test that circuit breaker opens under chaos.
// An error whose Path begins at "api-breaker" means the breaker itself
// rejected the call (open state) rather than the wrapped service failing.
var openedAt time.Time
failuresSinceOpen := 0
for i := 0; i < 50; i++ {
_, err := circuitBreaker.Process(context.Background(), Request{ID: i})
var pipeErr *pipz.Error[Request]
if errors.As(err, &pipeErr) && pipeErr.Path[0] == "api-breaker" {
if openedAt.IsZero() {
openedAt = time.Now()
t.Logf("Circuit opened after %d requests", i)
}
failuresSinceOpen++
}
}
// Verify circuit breaker opened
assert.False(t, openedAt.IsZero(), "Circuit breaker should have opened")
// Verify fast failures after opening
assert.Greater(t, failuresSinceOpen, 10, "Should fail fast when open")
// Check chaos statistics
stats := chaosService.Stats()
t.Logf("Chaos injected: %d failures, %d timeouts out of %d calls",
stats.FailedCalls, stats.TimeoutCalls, stats.TotalCalls)
}
Load Testing with Chaos
// TestLoadWithChaos runs 10 concurrent workers for 5 seconds against a
// rate-limited, retry-wrapped, chaos-injected pipeline and asserts the
// system still achieves a minimum throughput despite the faults.
func TestLoadWithChaos(t *testing.T) {
// Define identities upfront
var (
CounterID = pipz.NewIdentity("counter", "Count processed items")
LoadTestID = pipz.NewIdentity("load-test", "Load testing pipeline")
ThrottleID = pipz.NewIdentity("throttle", "Rate limit requests")
RetryID = pipz.NewIdentity("retry", "Retry on failure")
)
// Create processor that tracks throughput
var processed atomic.Int64
processor := pipz.Effect(CounterID, func(ctx context.Context, data int) error {
processed.Add(1)
return nil
})
// Add variable chaos
chaos := pipztesting.NewChaosProcessor("variable-chaos", processor,
pipztesting.ChaosConfig{
FailureRate: 0.1,
LatencyMin: 1 * time.Millisecond,
LatencyMax: 10 * time.Millisecond,
TimeoutRate: 0.05,
},
)
// Build pipeline with rate limiting and retries
pipeline := pipz.NewSequence[int](LoadTestID,
pipz.NewRateLimiter[int](ThrottleID, 100, 10), // 100 req/s, burst 10
pipz.NewRetry[int](RetryID, chaos, 2),
)
// Generate load until the 5-second context deadline expires.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
var wg sync.WaitGroup
for i := 0; i < 10; i++ { // 10 concurrent workers
wg.Add(1)
go func(worker int) {
defer wg.Done()
for j := 0; ctx.Err() == nil; j++ {
// Error deliberately ignored: under chaos individual requests are
// expected to fail; only aggregate throughput is measured below.
pipeline.Process(ctx, worker*1000+j)
}
}(i)
}
wg.Wait()
// Analyze results
totalProcessed := processed.Load()
stats := chaos.Stats()
t.Logf("Processed: %d requests in 5 seconds", totalProcessed)
t.Logf("Chaos Stats: %s", stats)
t.Logf("Effective throughput: %.1f req/s", float64(totalProcessed)/5)
// Verify system remained stable under chaos
assert.Greater(t, totalProcessed, int64(100), "Should process reasonable amount despite chaos")
}
Assertion Helpers
The testing package provides specialized assertions for verifying processor behavior:
Basic Assertions
// TestProcessorAssertions tours the four basic assertion helpers:
// exact count, last-input, count range, and the post-Reset "no calls" check.
func TestProcessorAssertions(t *testing.T) {
mock := pipztesting.NewMockProcessor[string](t, "test-processor")
mock.WithReturn("result", nil)
// Define identity upfront
var PipelineID = pipz.NewIdentity("pipeline", "Test pipeline")
pipeline := pipz.NewSequence[string](PipelineID, mock)
// Process some data
pipeline.Process(context.Background(), "input1")
pipeline.Process(context.Background(), "input2")
// Assert exact call count
pipztesting.AssertProcessed(t, mock, 2)
// Assert last input
pipztesting.AssertProcessedWith(t, mock, "input2")
// Assert call count range (inclusive bounds)
pipztesting.AssertProcessedBetween(t, mock, 1, 3)
// Reset clears call history/counters; verify no calls remain recorded.
mock.Reset()
pipztesting.AssertNotProcessed(t, mock)
}
Waiting for Async Operations
// TestAsyncProcessing shows WaitForCalls as a polling synchronization point:
// the mock is invoked from a background goroutine after 100ms, and the test
// blocks (up to 500ms) until the expected call count is observed.
func TestAsyncProcessing(t *testing.T) {
mock := pipztesting.NewMockProcessor[string](t, "async-processor")
mock.WithReturn("done", nil)
// Process asynchronously
go func() {
time.Sleep(100 * time.Millisecond)
mock.Process(context.Background(), "async-data")
}()
// Wait for processor to be called; returns false if the timeout elapses
// before the call count is reached.
success := pipztesting.WaitForCalls(mock, 1, 500*time.Millisecond)
require.True(t, success, "Mock should have been called within timeout")
// Verify the call
pipztesting.AssertProcessedWith(t, mock, "async-data")
}
Parallel Testing Helper
// TestConcurrentSafety hammers a Concurrent connector (same Effect processor
// registered three times) from 10 parallel workers and checks the atomic
// counter for the exact expected total — any lost update would show here.
func TestConcurrentSafety(t *testing.T) {
// Define identities upfront
var (
CounterID = pipz.NewIdentity("counter", "Count processing calls")
ParallelID = pipz.NewIdentity("parallel", "Parallel execution")
)
var counter atomic.Int64
processor := pipz.Effect(CounterID, func(ctx context.Context, n int) error {
counter.Add(1)
return nil
})
pipeline := pipz.NewConcurrent[int](ParallelID,
processor,
processor,
processor,
)
// Run parallel test: 10 workers, 100 iterations each.
pipztesting.ParallelTest(t, 10, func(workerID int) {
for i := 0; i < 100; i++ {
pipeline.Process(context.Background(), workerID*1000+i)
}
})
// Each of 10 workers processed 100 items through 3 processors
expected := int64(10 * 100 * 3)
assert.Equal(t, expected, counter.Load())
}
Latency Measurement
// TestProcessorPerformance demonstrates the latency-measurement helpers
// against a processor with a known 10ms sleep.
func TestProcessorPerformance(t *testing.T) {
// Define identity upfront
var SlowID = pipz.NewIdentity("slow", "Slow transform for testing")
processor := pipz.Transform(SlowID, func(ctx context.Context, n int) int {
time.Sleep(10 * time.Millisecond)
return n * 2
})
// Measure latency
latency := pipztesting.MeasureLatency(func() {
processor.Process(context.Background(), 42)
})
assert.GreaterOrEqual(t, latency, 10*time.Millisecond)
// NOTE(review): the 20ms upper bound can flake on loaded CI machines where
// scheduler jitter exceeds 10ms; consider a looser bound or dropping it.
assert.Less(t, latency, 20*time.Millisecond)
// Measure with result
result, duration := pipztesting.MeasureLatencyWithResult(func() int {
res, _ := processor.Process(context.Background(), 21)
return res
})
assert.Equal(t, 42, result)
assert.GreaterOrEqual(t, duration, 10*time.Millisecond)
}
Testing Time-Dependent Components
For connectors that depend on time (Timeout, Backoff, CircuitBreaker, RateLimiter, WorkerPool), use clockz for deterministic testing:
// TestTimeoutWithFakeClock sketches deterministic time testing with clockz:
// advancing the fake clock past the timeout triggers the deadline without
// any real waiting.
// NOTE(review): this is a fragment — ctx, data, and processor are assumed to
// be defined by surrounding test setup not shown here.
func TestTimeoutWithFakeClock(t *testing.T) {
clock := clockz.NewFakeClock()
// Define identity upfront
var TestTimeoutID = pipz.NewIdentity("test", "Timeout with fake clock")
timeout := pipz.NewTimeout(TestTimeoutID, processor, 5*time.Second).
WithClock(clock)
// Start processing in background
go timeout.Process(ctx, data)
// Advance clock past the 5s budget to trigger the timeout deterministically.
clock.Advance(6 * time.Second)
// Verify timeout behavior
}
For detailed clockz usage, see: https://github.com/zoobzio/clockz
Test Organization Strategy
Pipz follows a three-tier testing strategy that separates concerns and enables comprehensive validation:
1. Unit Tests (Package-Level)
Located alongside source code, testing individual processors and connectors in isolation.
pipz/
├── processor_test.go # Tests individual processors
├── connector_test.go # Tests individual connectors
└── error_test.go # Tests error handling
Example Unit Test:
// processor_test.go
func TestTransformProcessor(t *testing.T) {
// Define identity upfront
var DoubleID = NewIdentity("double", "Double the input value")
processor := Transform(DoubleID, func(ctx context.Context, n int) int {
return n * 2
})
result, err := processor.Process(context.Background(), 21)
require.NoError(t, err)
assert.Equal(t, 42, result)
}
2. Integration Tests
Located in testing/integration/, testing complete pipelines and real-world scenarios.
testing/integration/
├── pipeline_flows_test.go # End-to-end pipeline tests
├── resilience_patterns_test.go # Circuit breakers, retries, fallbacks
└── real_world_test.go # Business scenario tests
Example Integration Test:
// testing/integration/resilience_patterns_test.go
func TestCircuitBreakerWithRetry(t *testing.T) {
// Define identities upfront
var (
FlakyID = pipz.NewIdentity("flaky", "Flaky service that fails initially")
BreakerID = pipz.NewIdentity("breaker", "Circuit breaker for flaky service")
RetryID = pipz.NewIdentity("retry", "Retry wrapper")
)
var callCount int64
// Flaky service that fails initially
flakyService := pipz.Apply(FlakyID, func(ctx context.Context, data string) (string, error) {
count := atomic.AddInt64(&callCount, 1)
if count <= 3 {
return "", errors.New("service unavailable")
}
return data + "_processed", nil
})
// Build resilient pipeline
pipeline := pipz.NewCircuitBreaker[string](BreakerID,
pipz.NewRetry[string](RetryID, flakyService, 3),
5, // Threshold
time.Second, // Recovery
)
// Should succeed after retries
result, err := pipeline.Process(context.Background(), "test")
require.NoError(t, err)
assert.Equal(t, "test_processed", result)
assert.Equal(t, int64(4), callCount) // 3 failures + 1 success
}
3. Benchmarks
Located in testing/benchmarks/, measuring performance and comparing implementations.
testing/benchmarks/
├── core_performance_test.go # Individual processor benchmarks
├── composition_performance_test.go # Pipeline composition benchmarks
└── comparison_test.go # Comparative benchmarks
Example Benchmark:
// testing/benchmarks/core_performance_test.go
func BenchmarkTransformProcessor(b *testing.B) {
// Define identity upfront
var DoubleID = pipz.NewIdentity("double", "Double the input value")
processor := pipz.Transform(DoubleID, func(_ context.Context, n int) int {
return n * 2
})
ctx := context.Background()
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
result, _ := processor.Process(ctx, 42)
_ = result // Prevent optimization
}
}
Testing Best Practices
1. Test Data Isolation with Clone()
Always implement proper Clone() for concurrent testing:
// TestData is a sample payload type with reference-typed fields (slice and
// map), used to demonstrate why Clone must be a deep copy.
type TestData struct {
ID string
Values []int // reference type: must be copied in Clone
Meta map[string]any // reference type: must be copied in Clone
}
// Proper deep clone implementation
func (d TestData) Clone() TestData {
values := make([]int, len(d.Values))
copy(values, d.Values)
meta := make(map[string]any, len(d.Meta))
for k, v := range d.Meta {
meta[k] = v
}
return TestData{
ID: d.ID,
Values: values,
Meta: meta,
}
}
// TestConcurrentIsolation verifies that the Concurrent connector hands each
// branch its own Clone: both Effects mutate their copy, yet the caller's
// original data is untouched afterwards.
func TestConcurrentIsolation(t *testing.T) {
// Define identities upfront
var (
ParallelID = pipz.NewIdentity("parallel", "Parallel execution")
Modify1ID = pipz.NewIdentity("modify1", "Modify values slice")
Modify2ID = pipz.NewIdentity("modify2", "Modify metadata map")
)
data := TestData{
ID: "test",
Values: []int{1, 2, 3},
Meta: map[string]any{"key": "value"},
}
// Concurrent processors should not affect each other
concurrent := pipz.NewConcurrent[TestData](ParallelID,
pipz.Effect(Modify1ID, func(ctx context.Context, d TestData) error {
d.Values[0] = 999
return nil
}),
pipz.Effect(Modify2ID, func(ctx context.Context, d TestData) error {
d.Meta["new"] = "data"
return nil
}),
)
// Snapshot taken before processing so the comparison below is meaningful.
original := data.Clone()
concurrent.Process(context.Background(), data)
// Original must be unchanged
assert.Equal(t, original, data)
}
2. Stateful Connector Testing
Stateful connectors (RateLimiter, CircuitBreaker) must be singletons:
// TestStatefulConnectorSharing shows that a single shared RateLimiter
// throttles requests across goroutines: 5 concurrent requests at 2 req/s
// (burst 1) cannot all complete immediately.
func TestStatefulConnectorSharing(t *testing.T) {
// Define identity upfront
var APILimiterID = pipz.NewIdentity("api", "API rate limiter")
// CORRECT: Shared instance maintains state
rateLimiter := pipz.NewRateLimiter[string](APILimiterID, 2, 1) // 2 req/s
// Multiple goroutines share the same limiter
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
rateLimiter.Process(context.Background(), fmt.Sprintf("req_%d", id))
}(i)
}
// NOTE(review): start is recorded after the goroutines launch; they may
// already be blocked in Process by this point, which only makes the
// measured elapsed time conservative (shorter), not wrong.
start := time.Now()
wg.Wait()
elapsed := time.Since(start)
// Should take ~2 seconds for 5 requests at 2 req/s
assert.GreaterOrEqual(t, elapsed, 2*time.Second)
}
3. Error Path Testing
Test both success and failure paths:
// TestCompleteErrorCoverage is a table-driven test covering one success case
// and two distinct failure stages; failures are pinned to the exact stage by
// inspecting the last element of the pipz.Error Path.
func TestCompleteErrorCoverage(t *testing.T) {
tests := []struct {
name string
input Order
shouldFail bool
failureStage string // expected last Path element when shouldFail is true
}{
{
name: "valid_order",
input: Order{ID: "123", Amount: 99.99},
shouldFail: false,
},
{
name: "invalid_amount",
input: Order{ID: "456", Amount: -10},
shouldFail: true,
failureStage: "validate",
},
{
name: "payment_failure",
input: Order{ID: "789", Amount: 99999}, // Triggers payment failure
shouldFail: true,
failureStage: "payment",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pipeline := buildOrderPipeline()
_, err := pipeline.Process(context.Background(), tt.input)
if tt.shouldFail {
require.Error(t, err)
var pipeErr *pipz.Error[Order]
require.True(t, errors.As(err, &pipeErr))
assert.Equal(t, pipz.Name(tt.failureStage), pipeErr.Path[len(pipeErr.Path)-1])
} else {
require.NoError(t, err)
}
})
}
}
4. Context Cancellation Testing
Ensure processors respect context:
// TestContextPropagation verifies cancellation flows through a processor:
// a blocking Apply is cancelled from the test goroutine and must return
// context.Canceled promptly (within 100ms) rather than hanging.
func TestContextPropagation(t *testing.T) {
// Define identity upfront
var BlockerID = pipz.NewIdentity("blocker", "Blocks until context cancelled")
// Create a processor that blocks until context is cancelled
blockingProcessor := pipz.Apply(BlockerID, func(ctx context.Context, data string) (string, error) {
<-ctx.Done()
return "", ctx.Err()
})
ctx, cancel := context.WithCancel(context.Background())
// Start processing in background; buffered channel so the goroutine never
// leaks even if the test times out before receiving.
done := make(chan error, 1)
go func() {
_, err := blockingProcessor.Process(ctx, "test")
done <- err
}()
// Give it time to start
time.Sleep(10 * time.Millisecond)
// Cancel context
cancel()
// Should complete quickly with cancellation error
select {
case err := <-done:
assert.ErrorIs(t, err, context.Canceled)
case <-time.After(100 * time.Millisecond):
t.Fatal("Processor did not respect context cancellation")
}
}
5. Table-Driven Tests
Use table-driven tests for comprehensive coverage:
// TestPipelineVariations is a table-driven test where each case constructs
// its own pipeline via a factory func, keeping Identity declarations scoped
// to the case that uses them.
func TestPipelineVariations(t *testing.T) {
tests := []struct {
name string
pipeline func() pipz.Chainable[int] // factory: fresh pipeline per case
input int
expected int
wantErr bool
}{
{
name: "simple_transform",
pipeline: func() pipz.Chainable[int] {
var DoubleID = pipz.NewIdentity("double", "Double value")
return pipz.Transform(DoubleID, func(_ context.Context, n int) int {
return n * 2
})
},
input: 5,
expected: 10,
},
{
name: "sequence_of_transforms",
pipeline: func() pipz.Chainable[int] {
var (
MathID = pipz.NewIdentity("math", "Math operations")
DoubleID = pipz.NewIdentity("double", "Double value")
Add10ID = pipz.NewIdentity("add10", "Add 10 to value")
)
return pipz.NewSequence[int](MathID,
pipz.Transform(DoubleID, func(_ context.Context, n int) int { return n * 2 }),
pipz.Transform(Add10ID, func(_ context.Context, n int) int { return n + 10 }),
)
},
input: 5,
expected: 20, // (5 * 2) + 10
},
{
name: "with_validation",
pipeline: func() pipz.Chainable[int] {
var ValidateID = pipz.NewIdentity("validate", "Validate non-negative")
return pipz.Apply(ValidateID, func(_ context.Context, n int) (int, error) {
if n < 0 {
return 0, errors.New("negative not allowed")
}
return n, nil
})
},
input: -5,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pipeline := tt.pipeline()
result, err := pipeline.Process(context.Background(), tt.input)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tt.expected, result)
}
})
}
}
Common Testing Patterns
Testing Dynamic Pipeline Modification
// TestDynamicPipelineModification exercises runtime mutation of a Sequence:
// Register seeds it, Push appends, and After inserts by Identity. The final
// order is step1 -> step1.5 -> step2, hence "[HELLO]!".
func TestDynamicPipelineModification(t *testing.T) {
// Define identities upfront so we can reference them for lookup
var (
DynamicID = pipz.NewIdentity("dynamic", "Dynamic test pipeline")
Step1ID = pipz.NewIdentity("step1", "Uppercase transform")
Step2ID = pipz.NewIdentity("step2", "Add exclamation")
Step15ID = pipz.NewIdentity("step1.5", "Wrap in brackets")
)
// Start with basic pipeline
seq := pipz.NewSequence[string](DynamicID)
seq.Register(
pipz.Transform(Step1ID, strings.ToUpper),
)
// Test initial configuration
result, _ := seq.Process(context.Background(), "hello")
assert.Equal(t, "HELLO", result)
// Add processor at runtime
seq.Push(pipz.Transform(Step2ID, func(_ context.Context, s string) string {
return s + "!"
}))
// Test modified pipeline
result, _ = seq.Process(context.Background(), "hello")
assert.Equal(t, "HELLO!", result)
// Insert processor in middle (use same Identity for lookup)
seq.After(Step1ID, pipz.Transform(Step15ID, func(_ context.Context, s string) string {
return "[" + s + "]"
}))
// Test final configuration: "hello" -> "HELLO" -> "[HELLO]" -> "[HELLO]!"
result, _ = seq.Process(context.Background(), "hello")
assert.Equal(t, "[HELLO]!", result)
}
Testing Pipeline Composition
// TestPipelineComposition nests two reusable Sequences (validation, then
// enrichment) inside a larger Sequence, demonstrating that pipelines compose
// because every Sequence is itself a Chainable.
func TestPipelineComposition(t *testing.T) {
// Define identities upfront
var (
ValidationID = pipz.NewIdentity("validation", "Validate user data")
ValidateEmailID = pipz.NewIdentity("validate-email", "Validate email format")
ValidateAgeID = pipz.NewIdentity("validate-age", "Validate age range")
EnrichmentID = pipz.NewIdentity("enrichment", "Enrich user data")
AddMetadataID = pipz.NewIdentity("add-metadata", "Add user metadata")
CalculateScoreID = pipz.NewIdentity("calculate-score", "Calculate user score")
UserProcessingID = pipz.NewIdentity("user-processing", "Full user processing pipeline")
SaveID = pipz.NewIdentity("save", "Save user to database")
)
// Build reusable sub-pipelines
validation := pipz.NewSequence[User](ValidationID,
pipz.Apply(ValidateEmailID, validateEmail),
pipz.Apply(ValidateAgeID, validateAge),
)
enrichment := pipz.NewSequence[User](EnrichmentID,
pipz.Transform(AddMetadataID, addMetadata),
pipz.Transform(CalculateScoreID, calculateScore),
)
// Compose into larger pipeline
fullPipeline := pipz.NewSequence[User](UserProcessingID,
validation,
enrichment,
pipz.Effect(SaveID, saveUser),
)
// Test composed pipeline end to end with a valid user.
user := User{Email: "test@example.com", Age: 25}
result, err := fullPipeline.Process(context.Background(), user)
require.NoError(t, err)
assert.NotEmpty(t, result.Metadata)
assert.Greater(t, result.Score, 0)
}
Testing Switch Routing
// TestSwitchRouting builds a Switch keyed on Request.Type with three routes
// plus a default, then table-drives one request per route — including an
// unmatched type to exercise the default handler.
func TestSwitchRouting(t *testing.T) {
// Define identities upfront
var (
RouterID = pipz.NewIdentity("request-router", "Route requests by type")
HandleQueryID = pipz.NewIdentity("handle-query", "Handle query requests")
HandleCommandID = pipz.NewIdentity("handle-command", "Handle command requests")
HandleEventID = pipz.NewIdentity("handle-event", "Handle event requests")
HandleUnknownID = pipz.NewIdentity("handle-unknown", "Handle unknown requests")
)
// Create router that processes based on type; the selector func extracts
// the routing key from each request.
router := pipz.NewSwitch[Request](RouterID,
func(_ context.Context, req Request) string {
return req.Type
},
).
AddRoute("query", pipz.Transform(HandleQueryID, handleQuery)).
AddRoute("command", pipz.Apply(HandleCommandID, handleCommand)).
AddRoute("event", pipz.Effect(HandleEventID, handleEvent)).
Default(pipz.Transform(HandleUnknownID, handleUnknown))
tests := []struct {
reqType string
expected string
}{
{"query", "query_result"},
{"command", "command_result"},
{"event", "event_result"},
{"unknown", "unknown_result"},
}
for _, tt := range tests {
t.Run(tt.reqType, func(t *testing.T) {
req := Request{Type: tt.reqType, Data: "test"}
result, err := router.Process(context.Background(), req)
require.NoError(t, err)
assert.Contains(t, result.Response, tt.expected)
})
}
}
Testing Gotchas
❌ Creating Connectors Per Request
// WRONG - New rate limiter per request (useless!)
func processRequest(req Request) Response {
limiterID := pipz.NewIdentity("api", "Rate limiter")
limiter := pipz.NewRateLimiter(limiterID, 10, 1) // New instance
return limiter.Process(ctx, req)
}
✅ Singleton Connectors
// RIGHT - Package-level Identity and limiter shared across requests
var APILimiterID = pipz.NewIdentity("api", "API rate limiter")
var apiLimiter = pipz.NewRateLimiter(APILimiterID, 10, 1)
func processRequest(req Request) Response {
return apiLimiter.Process(ctx, req)
}
❌ Shallow Clone Implementation
// WRONG - Shares memory between concurrent processors
func (d Data) Clone() Data {
return Data{
ID: d.ID,
Items: d.Items, // SHARES SLICE!
Meta: d.Meta, // SHARES MAP!
}
}
✅ Deep Clone Implementation
// RIGHT - Complete isolation
func (d Data) Clone() Data {
items := make([]Item, len(d.Items))
copy(items, d.Items)
meta := make(map[string]any, len(d.Meta))
for k, v := range d.Meta {
meta[k] = v
}
return Data{
ID: d.ID,
Items: items,
Meta: meta,
}
}
❌ Not Testing Error Paths
// WRONG - Only tests happy path
func TestPipeline(t *testing.T) {
pipeline := buildPipeline()
result, _ := pipeline.Process(ctx, validData)
assert.Equal(t, expected, result)
}
✅ Complete Path Coverage
// RIGHT - Tests success and failure
func TestPipeline(t *testing.T) {
pipeline := buildPipeline()
// Test success
result, err := pipeline.Process(ctx, validData)
require.NoError(t, err)
assert.Equal(t, expected, result)
// Test failure
_, err = pipeline.Process(ctx, invalidData)
require.Error(t, err)
var pipeErr *pipz.Error[Data]
require.True(t, errors.As(err, &pipeErr))
assert.Equal(t, pipz.Name("validation"), pipeErr.Path[len(pipeErr.Path)-1])
}
❌ Ignoring Context Cancellation
// WRONG - Doesn't respect context
func (p *SlowProcessor) Process(ctx context.Context, data Data) (Data, error) {
time.Sleep(5 * time.Second) // Blocks regardless of context
return process(data)
}
✅ Context-Aware Processing
// RIGHT - Respects cancellation
func (p *SlowProcessor) Process(ctx context.Context, data Data) (Data, error) {
select {
case <-time.After(5 * time.Second):
return process(data)
case <-ctx.Done():
return data, ctx.Err()
}
}
Summary
The pipz testing package provides comprehensive tools for validating pipeline behavior:
- MockProcessor for controlled testing with configurable behavior
- ChaosProcessor for resilience and fault tolerance testing
- Assertion helpers for verifying processor interactions
- Three-tier testing strategy separating unit, integration, and performance tests
- Best practices for avoiding common pitfalls
Effective testing ensures your pipelines are robust, performant, and handle edge cases gracefully. Use mocks for isolation, chaos for resilience validation, and follow the testing patterns to build reliable data processing systems.