zoobzio December 12, 2025 Edit this page

Recipe: Building Pipelines

A complete, production-ready pipeline example covering validation, enrichment, resilience patterns, and observability.

The Scenario

We'll build a user registration pipeline that:

  1. Validates user input
  2. Checks for existing accounts
  3. Enriches data with defaults
  4. Creates the account
  5. Sends welcome emails
  6. Logs for analytics

Step 1: Define Your Data Model

package main

import (
    "context"
    "errors"
    "fmt"
    "strings"
    "time"
    
    "github.com/zoobzio/pipz"
)

// User is the record that flows through every registration pipeline in this
// recipe. Callers provide Email, Username, Password, FullName, and optionally
// Country and Preferences; enrichUser normalizes Email and fills in Country,
// Preferences defaults, CreatedAt, and ID.
//
// It has no slice, map, or pointer-typed fields, which is why the value-copy
// Clone method is enough for Concurrent.
type User struct {
    // ID is (re)assigned by enrichUser via generateID.
    ID           string
    // Email is trimmed and lowercased by enrichUser.
    Email        string
    Username     string
    // NOTE(review): the trailing comment says this is hashed, but validateUser
    // checks its length as a plaintext password and main passes a plaintext
    // value. Hash it after validation, and never print or log it.
    Password     string // hashed
    FullName     string
    // Country is filled from detectCountry by enrichUser when left empty.
    Country      string
    // Verified is not set by any step shown in this recipe.
    Verified     bool
    // CreatedAt is set to time.Now() by enrichUser.
    CreatedAt    time.Time
    Preferences  UserPreferences
}

// UserPreferences holds per-user settings. enrichUser defaults an empty
// Language to "en" and an empty Theme to "light"; Newsletter is left as the
// caller supplied it.
type UserPreferences struct {
    Newsletter  bool
    Language    string
    Theme       string
}

Step 2: Define Your Keys (Constants)

Define every processor and connector name as a constant. These names are the "keys" you use later to reference, insert, replace, and observe individual steps:

// Every processor and connector is addressed by one of the names below.
// Keeping them as constants makes them the single source of truth for
// lookups, runtime modification, and observability.

// Validation processor names.
const (
	ProcessorValidate       = "validate"
	ProcessorCheckDuplicate = "check_duplicate"
)

// ProcessorEnrich names the transformation step that normalizes user data.
const ProcessorEnrich = "enrich"

// ProcessorSave names the persistence step.
const ProcessorSave = "save"

// Post-registration processor names.
const (
	ProcessorSendWelcome  = "send_welcome"
	ProcessorLogAnalytics = "log_analytics"
)

// Pipeline and connector names.
const (
	PipelineRegistration      = "registration"
	ConnectorPostRegistration = "post-registration"
	ConnectorEmailHandle      = "email-with-error-handling"
	ConnectorSaveRetry        = "save-with-retry"
)

Step 3: Define Your Business Logic

Write the core business logic as pure functions:

// validateUser rejects registration input that is missing required fields or
// is obviously malformed. It only reads user; ctx is accepted so the function
// matches the processor signature.
//
// Rules:
//   - Email must not be blank (whitespace-only counts as missing) and must have
//     the shape local@domain with both parts non-empty. This is a deliberately
//     shallow check, not full RFC 5322 parsing.
//   - Username must be at least 3 bytes long.
//   - Password must be at least 8 bytes long.
//
// Surrounding whitespace in Email is tolerated here because enrichUser trims
// it later in the pipeline.
func validateUser(ctx context.Context, user User) error {
	email := strings.TrimSpace(user.Email)
	if email == "" {
		return errors.New("email is required")
	}
	// Split on the last "@" so the domain part is what a mail server would
	// actually use; "@", "a@" and "@b" are all rejected.
	at := strings.LastIndex(email, "@")
	if at <= 0 || at == len(email)-1 {
		return errors.New("invalid email format")
	}
	if len(user.Username) < 3 {
		return errors.New("username must be at least 3 characters")
	}
	if len(user.Password) < 8 {
		return errors.New("password must be at least 8 characters")
	}
	return nil
}

// checkDuplicate returns an error if user's email already belongs to an
// existing account.
//
// This step runs before enrichUser normalizes the email, so it normalizes the
// address itself (trim + lowercase) before the lookup. Without that,
// "Admin@Example.com " would not match "admin@example.com" and would slip past
// the check. The set of existing emails is simulated in-process; a real app
// would query its datastore using ctx.
func checkDuplicate(ctx context.Context, user User) error {
	// Simulate database check
	existingEmails := map[string]bool{
		"admin@example.com": true,
		"test@example.com":  true,
	}

	normalized := strings.ToLower(strings.TrimSpace(user.Email))
	if existingEmails[normalized] {
		return fmt.Errorf("email %s already registered", user.Email)
	}
	return nil
}

// enrichUser returns a normalized copy of user with defaults applied:
//   - Email is trimmed of surrounding whitespace and lowercased.
//   - An empty Country is filled from detectCountry.
//   - An empty Preferences.Language becomes "en" and an empty
//     Preferences.Theme becomes "light".
//   - CreatedAt is always set to the current time and ID is always set to a
//     fresh generateID value, overwriting anything the caller passed.
func enrichUser(ctx context.Context, user User) User {
	enriched := user

	trimmed := strings.TrimSpace(enriched.Email)
	enriched.Email = strings.ToLower(trimmed)

	if enriched.Country == "" {
		enriched.Country = detectCountry(ctx)
	}

	prefs := &enriched.Preferences
	if prefs.Language == "" {
		prefs.Language = "en"
	}
	if prefs.Theme == "" {
		prefs.Theme = "light"
	}

	enriched.CreatedAt = time.Now()
	enriched.ID = generateID()
	return enriched
}

// saveUser persists user. In this example it only announces the save on
// stdout and always succeeds; a real implementation would write to a
// database with ctx (for example, return db.Save(ctx, user)).
func saveUser(ctx context.Context, user User) error {
	fmt.Println("Saving user:", user.Email)
	return nil
}

// sendWelcomeEmail delivers the welcome message for a newly created account.
// This example only prints the recipient and always succeeds. A real
// implementation would call an email service with ctx, passing user.Email as
// the recipient and user.FullName as the display name.
func sendWelcomeEmail(ctx context.Context, user User) error {
	fmt.Println("Sending welcome email to:", user.Email)
	return nil
}

// logRegistration records the registration for analytics by printing the
// username and country. It never fails.
func logRegistration(ctx context.Context, user User) error {
	fmt.Println("[ANALYTICS] New user registered:", user.Username, "from", user.Country)
	return nil
}

// logEmailError is the error handler for the welcome-email step. It prints
// the underlying cause (err.Err) and returns nil itself.
func logEmailError(ctx context.Context, err *pipz.Error[User]) error {
	fmt.Println("Failed to send welcome email:", err.Err)
	return nil
}

// detectCountry returns the country code assigned to users who did not supply
// one. This example always answers "US"; a real implementation might run a
// GeoIP lookup using request data carried in ctx.
func detectCountry(ctx context.Context) string {
	const fallbackCountry = "US"
	return fallbackCountry
}

// generateID builds a user ID from the current wall-clock time in
// nanoseconds, e.g. "user_1700000000000000000".
//
// NOTE(review): two calls that observe the same clock reading (concurrent
// registrations, or a platform with a coarse clock) produce the same ID. A
// real app should use a database-generated key or a random/sequenced ID.
func generateID() string {
	nanos := time.Now().UnixNano()
	return fmt.Sprint("user_", nanos)
}

Step 4: Define Identities and Processors

Define identities upfront, then wrap your business logic with pipz processors:

// Names that the identities below previously spelled as inline string
// literals. They are declared as constants to follow the Step 2 convention
// that every processor and connector name is a constant.
const (
	ProcessorLogEmailError     = "log_email_error"
	PipelineRobustRegistration = "robust-registration"
)

// Identities pair each name with a human-readable description. They are
// created once at package level and shared by the processors and connectors
// that reference them.
var (
	// Processor identities
	ValidateID       = pipz.NewIdentity(ProcessorValidate, "Validates user input fields")
	CheckDuplicateID = pipz.NewIdentity(ProcessorCheckDuplicate, "Checks for duplicate user accounts")
	EnrichID         = pipz.NewIdentity(ProcessorEnrich, "Enriches user data with defaults")
	SaveID           = pipz.NewIdentity(ProcessorSave, "Saves user to database")
	SendWelcomeID    = pipz.NewIdentity(ProcessorSendWelcome, "Sends welcome email to user")
	LogAnalyticsID   = pipz.NewIdentity(ProcessorLogAnalytics, "Logs registration to analytics system")
	LogEmailErrorID  = pipz.NewIdentity(ProcessorLogEmailError, "Logs email sending errors")

	// Connector identities
	RegistrationID       = pipz.NewIdentity(PipelineRegistration, "Complete user registration pipeline")
	PostRegistrationID   = pipz.NewIdentity(ConnectorPostRegistration, "Parallel post-registration tasks")
	EmailHandleID        = pipz.NewIdentity(ConnectorEmailHandle, "Email sending with error recovery")
	SaveRetryID          = pipz.NewIdentity(ConnectorSaveRetry, "Database save with exponential backoff")
	RobustRegistrationID = pipz.NewIdentity(PipelineRobustRegistration, "Robust registration with retry and error handling")
)

// Validation processors: read-only checks that return an error on bad input.
var (
	ValidateUser   = pipz.Effect(ValidateID, validateUser)
	CheckDuplicate = pipz.Effect(CheckDuplicateID, checkDuplicate)
)

// EnrichUser normalizes the user and fills in defaults.
var EnrichUser = pipz.Transform(EnrichID, enrichUser)

// SaveUser persists the enriched user.
var SaveUser = pipz.Effect(SaveID, saveUser)

// SendWelcome and LogAnalytics are the post-registration side effects.
var (
	SendWelcome  = pipz.Effect(SendWelcomeID, sendWelcomeEmail)
	LogAnalytics = pipz.Effect(LogAnalyticsID, logRegistration)
)

// LogEmailError is the handler paired with SendWelcome in the email Handle
// connector.
var LogEmailError = pipz.Effect(LogEmailErrorID, logEmailError)

// Clone satisfies the Cloner constraint that Concurrent places on its data
// type, so each parallel branch works on its own copy. User holds no slice,
// map, or pointer-typed fields, so a plain value copy is a complete clone.
func (u User) Clone() User {
	duplicate := u
	return duplicate
}

Step 5: Define Your Connectors

Using the identities defined in Step 4, compose the processors into sequences and connectors:

// RegistrationPipeline is the basic registration flow. Steps run in order:
// validation (ValidateUser, CheckDuplicate), transformation (EnrichUser),
// persistence (SaveUser), and finally SendWelcome and LogAnalytics in
// parallel inside the post-registration Concurrent connector.
var RegistrationPipeline = pipz.NewSequence[User](RegistrationID,
	ValidateUser,
	CheckDuplicate,
	EnrichUser,
	SaveUser,
	pipz.NewConcurrent(PostRegistrationID, SendWelcome, LogAnalytics),
)

// EmailWithErrorHandling sends the welcome email and hands any failure to
// the LogEmailError handler.
var EmailWithErrorHandling = pipz.NewHandle(EmailHandleID, SendWelcome, LogEmailError)

// SaveWithRetry wraps SaveUser in a Backoff connector configured with 3
// attempts and a 100ms base delay.
var SaveWithRetry = pipz.NewBackoff(SaveRetryID, SaveUser, 3, 100*time.Millisecond)

// RobustRegistrationPipeline has the same shape as RegistrationPipeline, but
// it saves through SaveWithRetry and sends email through
// EmailWithErrorHandling, so save failures are retried with backoff and email
// failures are reported to LogEmailError.
var RobustRegistrationPipeline = pipz.NewSequence[User](RobustRegistrationID,
	ValidateUser,
	CheckDuplicate,
	EnrichUser,
	SaveWithRetry,
	pipz.NewConcurrent(PostRegistrationID, EmailWithErrorHandling, LogAnalytics),
)

Step 6: Create Functions to Execute Pipelines

// RegisterUser runs user through RegistrationPipeline and returns the
// enriched user, or the pipeline's error.
func RegisterUser(ctx context.Context, user User) (registered User, err error) {
	registered, err = RegistrationPipeline.Process(ctx, user)
	return registered, err
}

// RegisterUserRobust runs user through RobustRegistrationPipeline, which
// retries the save and routes email failures to an error handler.
func RegisterUserRobust(ctx context.Context, user User) (registered User, err error) {
	registered, err = RobustRegistrationPipeline.Process(ctx, user)
	return registered, err
}

// main registers a sample user with the basic pipeline and reports the result.
//
// On failure it prints the path of the failing stage when the error is a
// *pipz.Error[User]; otherwise it prints the plain error. On success it prints
// only non-sensitive fields. Dumping the whole User with %+v would also
// write the Password field to stdout.
func main() {
	ctx := context.Background()

	newUser := User{
		Email:    "john.doe@example.com",
		Username: "johndoe",
		Password: "securepassword123", // Would be hashed in real app
		FullName: "John Doe",
	}

	registered, err := RegisterUser(ctx, newUser)
	if err != nil {
		var pipeErr *pipz.Error[User]
		if errors.As(err, &pipeErr) {
			fmt.Printf("Registration failed at %v: %v\n", pipeErr.Path, pipeErr.Err)
			return
		}
		fmt.Printf("Registration failed: %v\n", err)
		return
	}

	fmt.Printf("Successfully registered: id=%s email=%s username=%s country=%s created=%s\n",
		registered.ID, registered.Email, registered.Username, registered.Country,
		registered.CreatedAt.Format(time.RFC3339))
}

Step 7: Dynamic Pipeline Modification

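Sequences can be modified at runtime by inserting or replacing steps by name. These calls mutate the shared pipeline, so every subsequent execution sees the change: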

// FraudCheckID identifies the fraud-screening processor added at runtime.
var FraudCheckID = pipz.NewIdentity("fraud_check", "Checks user against fraud detection database")

// NewEmailProviderID identifies the alternative welcome-email sender. It
// reuses the ProcessorSendWelcome name, presumably so the replacement is
// addressed by the same key as the step it replaces.
var NewEmailProviderID = pipz.NewIdentity(ProcessorSendWelcome, "Sends welcome email using new provider")

// Define processors
var (
    FraudCheck = pipz.Effect(FraudCheckID, func(ctx context.Context, user User) error {
        // Check against fraud database
        fmt.Printf("Checking user %s for fraud indicators\n", user.Email)
        return nil
    })

    NewEmailSender = pipz.Effect(NewEmailProviderID, func(ctx context.Context, user User) error {
        fmt.Printf("[NEW PROVIDER] Sending welcome email to: %s\n", user.Email)
        return nil
    })
)

// AddFraudDetection inserts FraudCheck into RegistrationPipeline immediately
// after the step named ProcessorCheckDuplicate, so it runs after validation
// and duplicate detection but before enrichment.
//
// The insertion applies to every registration, not only high-risk domains.
// Neither this function nor FraudCheck filters by domain.
//
// NOTE(review): this mutates the shared package-level pipeline, so it affects
// every later RegisterUser call. Any value returned by After is discarded
// here. Confirm in the pipz Sequence API whether it reports an error (e.g.
// when the named step is missing), and handle it if so.
func AddFraudDetection() {
    // Insert after validation but before enrichment
    RegistrationPipeline.After(ProcessorCheckDuplicate, FraudCheck)
}

// UseNewEmailProvider asks RegistrationPipeline to replace the step named
// ProcessorSendWelcome with NewEmailSender (e.g. for A/B testing a new email
// provider).
//
// NOTE(review): in RegistrationPipeline, SendWelcome is not a direct step of
// the sequence. It is nested inside the post-registration Concurrent
// connector. If Sequence.Replace only searches its direct children (verify
// against the pipz docs), this call will not find it. In that case, replace
// the processor inside the Concurrent, or rebuild that connector, instead.
// Any value returned by Replace is discarded here.
func UseNewEmailProvider() {
    RegistrationPipeline.Replace(ProcessorSendWelcome, NewEmailSender)
}

Step 8: Add Conditional Logic

Add premium user handling with the Switch connector:

// Processor names for the onboarding steps.
const (
	ProcessorRegularOnboarding = "regular_onboarding"
	ProcessorPremiumOnboarding = "premium_onboarding"
	ProcessorAssignManager     = "assign_account_manager"
)

// Router and pipeline names for conditional registration.
const (
	RouterUserType         = "user-type-router"
	PipelinePremiumFlow    = "premium-flow"
	PipelineConditionalReg = "conditional-registration"
)

// UserType is the routing key returned by detectUserType and matched by the
// user-type Switch.
type UserType string

// TypeRegular routes a user to the standard onboarding flow.
const TypeRegular UserType = "regular"

// TypePremium routes a user to the premium onboarding flow.
const TypePremium UserType = "premium"

// detectUserType chooses the onboarding route for user from the domain of
// their email address. Addresses whose domain is one of the premium domains
// (compared case-insensitively) get TypePremium; everything else gets
// TypeRegular.
//
// The domain is the text after the last "@", which is the part that actually
// routes mail. Taking the first "@" would let "a@company.com@evil.com"
// masquerade as a premium address. An email with no "@" at all is treated as
// regular rather than causing an index-out-of-range panic.
func detectUserType(ctx context.Context, user User) UserType {
	// Premium domains get premium accounts
	premiumDomains := []string{"company.com", "enterprise.org"}

	at := strings.LastIndex(user.Email, "@")
	if at < 0 {
		return TypeRegular
	}
	emailDomain := user.Email[at+1:]

	for _, domain := range premiumDomains {
		if strings.EqualFold(emailDomain, domain) {
			return TypePremium
		}
	}
	return TypeRegular
}

// regularOnboarding runs the standard onboarding flow. This example only
// announces the flow and never fails.
func regularOnboarding(ctx context.Context, u User) error {
	fmt.Print("Starting regular onboarding flow\n")
	return nil
}

// premiumOnboarding runs the premium onboarding flow. This example only
// announces the flow and never fails.
func premiumOnboarding(ctx context.Context, u User) error {
	fmt.Print("Starting premium onboarding flow\n")
	return nil
}

// assignAccountManager assigns a dedicated account manager to a premium
// user. This example only announces the assignment and never fails.
func assignAccountManager(ctx context.Context, u User) error {
	fmt.Print("Assigning dedicated account manager\n")
	return nil
}

// RegularOnboardingID identifies the standard onboarding processor.
var RegularOnboardingID = pipz.NewIdentity(ProcessorRegularOnboarding, "Executes regular user onboarding flow")

// PremiumOnboardingID identifies the premium onboarding processor.
var PremiumOnboardingID = pipz.NewIdentity(ProcessorPremiumOnboarding, "Executes premium user onboarding flow")

// AssignManagerID identifies the account-manager assignment processor.
var AssignManagerID = pipz.NewIdentity(ProcessorAssignManager, "Assigns dedicated account manager to user")

// PremiumFlowID identifies the premium onboarding sequence.
var PremiumFlowID = pipz.NewIdentity(PipelinePremiumFlow, "Premium user onboarding workflow")

// UserTypeRouterID identifies the Switch that routes by user type.
var UserTypeRouterID = pipz.NewIdentity(RouterUserType, "Routes users to appropriate onboarding flow")

// ConditionalRegID identifies the conditional registration pipeline.
var ConditionalRegID = pipz.NewIdentity(PipelineConditionalReg, "Registration with conditional user type routing")

// RegularOnboarding wraps regularOnboarding as a pipeline step.
var RegularOnboarding = pipz.Effect(RegularOnboardingID, regularOnboarding)

// PremiumOnboarding wraps premiumOnboarding as a pipeline step.
var PremiumOnboarding = pipz.Effect(PremiumOnboardingID, premiumOnboarding)

// AssignManager wraps assignAccountManager as a pipeline step.
var AssignManager = pipz.Effect(AssignManagerID, assignAccountManager)

// PremiumFlow onboards a premium user and then assigns an account manager.
var PremiumFlow = pipz.NewSequence[User](PremiumFlowID, PremiumOnboarding, AssignManager)

// UserTypeRouter sends each user down the route matching the UserType that
// detectUserType returns for them.
var UserTypeRouter = pipz.NewSwitch(UserTypeRouterID, detectUserType).
	AddRoute(TypeRegular, RegularOnboarding).
	AddRoute(TypePremium, PremiumFlow)

// ConditionalRegistrationPipeline runs the common registration steps
// (validate, duplicate check, enrich, save) and then hands off to
// UserTypeRouter.
var ConditionalRegistrationPipeline = pipz.NewSequence[User](ConditionalRegID,
	ValidateUser,
	CheckDuplicate,
	EnrichUser,
	SaveUser,
	UserTypeRouter,
)

// RegisterUserConditional runs user through ConditionalRegistrationPipeline,
// which routes to regular or premium onboarding after the account is saved.
func RegisterUserConditional(ctx context.Context, user User) (registered User, err error) {
	registered, err = ConditionalRegistrationPipeline.Process(ctx, user)
	return registered, err
}

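Step 9: Test Your Pipeline

Put table-driven tests in a _test.go file (it needs the context and testing imports) and exercise the pipeline end to end: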

func TestRegistrationPipeline(t *testing.T) {
    pipeline := createRegistrationPipeline()
    
    tests := []struct {
        name    string
        user    User
        wantErr bool
    }{
        {
            name: "valid user",
            user: User{
                Email:    "valid@example.com",
                Username: "validuser",
                Password: "password123",
            },
            wantErr: false,
        },
        {
            name: "invalid email",
            user: User{
                Email:    "invalid",
                Username: "validuser",
                Password: "password123",
            },
            wantErr: true,
        },
        {
            name: "duplicate email",
            user: User{
                Email:    "admin@example.com",
                Username: "newadmin",
                Password: "password123",
            },
            wantErr: true,
        },
    }
    
    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            _, err := pipeline.Process(context.Background(), tt.user)
            if (err != nil) != tt.wantErr {
                t.Errorf("Process() error = %v, wantErr %v", err, tt.wantErr)
            }
        })
    }
}

Step 10: Production Resilience

For production deployments with external service calls, layer additional resilience patterns:

// Connector names for the resilience layers around external calls.
const (
	ConnectorAPITimeout   = "api-timeout"
	ConnectorAPIBreaker   = "api-circuit-breaker"
	ConnectorAPIRateLimit = "api-rate-limit"
)

// PipelineProduction names the fully hardened registration pipeline.
const PipelineProduction = "production-registration"

// APITimeoutID identifies the timeout wrapper on external calls.
var APITimeoutID = pipz.NewIdentity(ConnectorAPITimeout, "Enforces timeout on external calls")

// APIBreakerID identifies the circuit breaker protecting external services.
var APIBreakerID = pipz.NewIdentity(ConnectorAPIBreaker, "Prevents cascading failures to external services")

// APIRateLimitID identifies the rate limiter in front of external APIs.
var APIRateLimitID = pipz.NewIdentity(ConnectorAPIRateLimit, "Limits request rate to external APIs")

// ProductionRegID identifies the production registration pipeline.
var ProductionRegID = pipz.NewIdentity(PipelineProduction, "Production registration with full resilience")

// ResilientSave layers resilience around SaveUser, outermost first:
// Timeout (5s) → CircuitBreaker (opens after 5 failures, tries recovery after
// 30s) → Backoff (3 attempts, 100ms base delay) → SaveUser.
//
// NOTE(review): the inner Backoff reuses SaveRetryID, the identity already
// used by SaveWithRetry. Give it its own identity if the two connectors must
// be told apart in signals and error paths.
var ResilientSave = pipz.NewTimeout(
	APITimeoutID,
	pipz.NewCircuitBreaker(
		APIBreakerID,
		pipz.NewBackoff(SaveRetryID, SaveUser, 3, 100*time.Millisecond),
		5,              // Open after 5 failures
		30*time.Second, // Try recovery after 30s
	),
	5*time.Second, // 5 second timeout
)

// RateLimitedEmail puts a rate limiter (10 requests per second, burst of 5)
// in front of EmailWithErrorHandling to protect the external email API.
var RateLimitedEmail = pipz.NewRateLimiter(APIRateLimitID, 10, 5, EmailWithErrorHandling)

// ProductionRegistrationPipeline is the registration flow with the full
// resilience stack: the save goes through ResilientSave, and the welcome
// email is rate limited while analytics logging runs alongside it.
var ProductionRegistrationPipeline = pipz.NewSequence[User](ProductionRegID,
	ValidateUser,
	CheckDuplicate,
	EnrichUser,
	ResilientSave,
	pipz.NewConcurrent(PostRegistrationID, RateLimitedEmail, LogAnalytics),
)

Step 11: Observability with Pipeline Wrapper

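Register signal hooks for alerting, then wrap your pipeline with pipz.NewPipeline so that each execution carries an ID you can use for distributed tracing and correlation: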

import "github.com/zoobzio/capitan"

// Set up signal handlers before creating pipelines
func init() {
    // Monitor circuit breaker state
    capitan.Hook(pipz.SignalCircuitBreakerOpened, func(ctx context.Context, e *capitan.Event) {
        execID, _ := pipz.ExecutionIDFromContext(ctx)
        name, _ := pipz.FieldName.From(e)
        log.Printf("[ALERT] Circuit opened: %s (exec=%s)", name, execID)
    })

    // Track retries
    capitan.Hook(pipz.SignalRetryExhausted, func(ctx context.Context, e *capitan.Event) {
        execID, _ := pipz.ExecutionIDFromContext(ctx)
        name, _ := pipz.FieldName.From(e)
        attempts, _ := pipz.FieldMaxAttempts.From(e)
        log.Printf("[ERROR] Retry exhausted: %s after %d attempts (exec=%s)", name, attempts, execID)
    })

    // Monitor rate limiting
    capitan.Hook(pipz.SignalRateLimiterDropped, func(ctx context.Context, e *capitan.Event) {
        name, _ := pipz.FieldName.From(e)
        log.Printf("[WARN] Request dropped by rate limiter: %s", name)
    })
}

// ObservableRegistration wraps ProductionRegistrationPipeline with
// pipz.NewPipeline to give each execution its own context. The signal hooks
// read the execution ID from that context via pipz.ExecutionIDFromContext.
var ObservableRegistration = pipz.NewPipeline(ProductionRegID, ProductionRegistrationPipeline)

// RegisterUserProduction registers user through ObservableRegistration. Each
// call gets a unique execution ID for correlation.
func RegisterUserProduction(ctx context.Context, user User) (registered User, err error) {
	registered, err = ObservableRegistration.Process(ctx, user)
	return registered, err
}

Key Takeaways

  1. Start Simple: Begin with basic processors and compose them
  2. Add Robustness Gradually: Layer in retry, timeout, circuit breaker, and rate limiting
  3. Use the Right Connector: Sequence for ordered steps, Concurrent for independent parallel work, Switch for routing, and Handle for error handling
  4. Handle Errors Appropriately: Critical vs non-critical operations
  5. Wrap for Observability: Use Pipeline wrapper for correlation IDs
  6. Test Each Component: Processors are independently testable

See Also