Recipe: Building Pipelines
A complete, production-ready pipeline example covering validation, enrichment, resilience patterns, and observability.
The Scenario
We'll build a user registration pipeline that:
- Validates user input
- Checks for existing accounts
- Enriches data with defaults
- Creates the account
- Sends welcome emails
- Logs for analytics
Step 1: Define Your Data Model
package main
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/zoobzio/pipz"
)
// User is the registration record passed through every pipeline stage.
// Fields sharing a type are declared together; declaration order is kept
// so printed output (e.g. %+v) is unchanged.
type User struct {
	ID, Email, Username string
	Password            string // hashed
	FullName, Country   string
	Verified            bool
	CreatedAt           time.Time
	Preferences         UserPreferences
}

// UserPreferences holds per-user settings; enrichUser fills in Language
// and Theme when they are left empty.
type UserPreferences struct {
	Newsletter      bool
	Language, Theme string
}
Step 2: Define Your Keys (Constants)
Define all processor and connector names as constants - these are the "keys" to your system:
// Every processor and connector name is declared once, here, as an untyped
// string constant. Identities and runtime edits (After/Replace) refer to
// these names, so they act as the keys of the system.

// Validation processors.
const (
	ProcessorValidate       = "validate"
	ProcessorCheckDuplicate = "check_duplicate"
)

// Transformation, persistence, and post-registration processors.
const (
	ProcessorEnrich       = "enrich"
	ProcessorSave         = "save"
	ProcessorSendWelcome  = "send_welcome"
	ProcessorLogAnalytics = "log_analytics"
)

// Pipeline and connector names.
const (
	PipelineRegistration      = "registration"
	ConnectorPostRegistration = "post-registration"
	ConnectorEmailHandle      = "email-with-error-handling"
	ConnectorSaveRetry        = "save-with-retry"
)
Step 3: Define Your Business Logic
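Write the core business logic as plain Go functions with no dependency on pipz, so each one can be tested on its own (they are not strictly pure — some print or read the clock):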
// validateUser rejects registrations whose required fields are missing or
// malformed. It only inspects the user; nothing is modified.
//
// The email is checked after trimming surrounding whitespace (enrichUser
// trims it later anyway), so a whitespace-only email is reported as missing
// rather than malformed. An address must contain an "@" with a non-empty
// part on each side. The username minimum is measured in characters
// (runes), not bytes, so a short name made of multi-byte characters cannot
// slip past the limit.
func validateUser(ctx context.Context, user User) error {
	email := strings.TrimSpace(user.Email)
	if email == "" {
		return errors.New("email is required")
	}
	// The domain is whatever follows the final "@"; both sides must be non-empty.
	if at := strings.LastIndex(email, "@"); at <= 0 || at == len(email)-1 {
		return errors.New("invalid email format")
	}
	if len([]rune(user.Username)) < 3 {
		return errors.New("username must be at least 3 characters")
	}
	if len(user.Password) < 8 {
		return errors.New("password must be at least 8 characters")
	}
	return nil
}
// checkDuplicate fails when the user's email already belongs to an account.
//
// The lookup uses the same normalization that enrichUser applies (trim
// whitespace, then lower-case). Every pipeline in this recipe runs this step
// before enrichUser, so without normalizing here "Admin@Example.com" or
// " admin@example.com" would pass the check and later be stored as the
// already-registered "admin@example.com".
func checkDuplicate(ctx context.Context, user User) error {
	// Simulate a database check against a fixed set of registered addresses.
	switch strings.ToLower(strings.TrimSpace(user.Email)) {
	case "admin@example.com", "test@example.com":
		return fmt.Errorf("email %s already registered", user.Email)
	}
	return nil
}
// enrichUser returns a copy of user with the email normalized (trimmed, then
// lower-cased), missing Country/Language/Theme defaults filled in, and a
// fresh creation timestamp and ID assigned.
func enrichUser(ctx context.Context, user User) User {
	user.Email = strings.TrimSpace(user.Email)
	user.Email = strings.ToLower(user.Email)

	if user.Country == "" {
		user.Country = detectCountry(ctx)
	}

	prefs := &user.Preferences
	if prefs.Language == "" {
		prefs.Language = "en"
	}
	if prefs.Theme == "" {
		prefs.Theme = "light"
	}

	user.CreatedAt = time.Now()
	user.ID = generateID()
	return user
}
// saveUser persists the user. This recipe only simulates the write by
// printing the email; a real implementation would return the database
// result, e.g. `return db.Save(ctx, user)`.
func saveUser(ctx context.Context, user User) error {
	email := user.Email
	fmt.Printf("Saving user: %s\n", email)
	return nil
}
// sendWelcomeEmail greets a newly registered user. Here it only prints the
// recipient; a real implementation would hand a message addressed to
// user.Email (greeting user.FullName) to an email service and return that
// service's error.
func sendWelcomeEmail(ctx context.Context, user User) error {
	recipient := user.Email
	fmt.Printf("Sending welcome email to: %s\n", recipient)
	return nil
}
// logRegistration emits an analytics line with the new user's username and
// country. It never fails.
func logRegistration(ctx context.Context, user User) error {
	username, country := user.Username, user.Country
	fmt.Printf("[ANALYTICS] New user registered: %s from %s\n", username, country)
	return nil
}
// Error handlers

// logEmailError is the handler attached to the welcome-email Handle
// connector. It prints the underlying cause and returns nil, so the handler
// itself never fails.
func logEmailError(ctx context.Context, err *pipz.Error[User]) error {
	cause := err.Err
	fmt.Printf("Failed to send welcome email: %v\n", cause)
	return nil
}
// Helper functions

// detectCountry returns the country assigned to users who did not supply
// one. It is hard-coded here; a real app would use GeoIP or similar.
func detectCountry(ctx context.Context) string {
	const fallbackCountry = "US"
	return fallbackCountry
}
// generateID builds a user ID of the form "user_<unix nanoseconds>".
// NOTE(review): two calls within the same clock tick would collide; fine for
// this demo, not for production.
func generateID() string {
	nanos := time.Now().UnixNano()
	return fmt.Sprintf("user_%d", nanos)
}
Step 4: Define Identities and Processors
Define identities upfront, then wrap your business logic with pipz processors:
// Names for the two identities not covered by the Step 2 constants.
// Declaring them keeps every identity name a constant instead of an inline
// string literal.
const (
	ProcessorLogEmailError     = "log_email_error"
	PipelineRobustRegistration = "robust-registration"
)

// Identities are declared once, up front, and shared by the processors and
// connectors below. Each pairs a name constant with a human-readable
// description.
var (
	// Processor identities
	ValidateID       = pipz.NewIdentity(ProcessorValidate, "Validates user input fields")
	CheckDuplicateID = pipz.NewIdentity(ProcessorCheckDuplicate, "Checks for duplicate user accounts")
	EnrichID         = pipz.NewIdentity(ProcessorEnrich, "Enriches user data with defaults")
	SaveID           = pipz.NewIdentity(ProcessorSave, "Saves user to database")
	SendWelcomeID    = pipz.NewIdentity(ProcessorSendWelcome, "Sends welcome email to user")
	LogAnalyticsID   = pipz.NewIdentity(ProcessorLogAnalytics, "Logs registration to analytics system")
	LogEmailErrorID  = pipz.NewIdentity(ProcessorLogEmailError, "Logs email sending errors")
	// Connector identities
	RegistrationID       = pipz.NewIdentity(PipelineRegistration, "Complete user registration pipeline")
	PostRegistrationID   = pipz.NewIdentity(ConnectorPostRegistration, "Parallel post-registration tasks")
	EmailHandleID        = pipz.NewIdentity(ConnectorEmailHandle, "Email sending with error recovery")
	SaveRetryID          = pipz.NewIdentity(ConnectorSaveRetry, "Database save with exponential backoff")
	RobustRegistrationID = pipz.NewIdentity(PipelineRobustRegistration, "Robust registration with retry and error handling")
)
// Processors wrap the plain business functions so they can be composed into
// connectors. Effect wraps functions that return only an error; Transform
// wraps functions that return a new User.

// ValidateUser rejects malformed registration input.
var ValidateUser = pipz.Effect(ValidateID, validateUser)

// CheckDuplicate rejects emails that are already registered.
var CheckDuplicate = pipz.Effect(CheckDuplicateID, checkDuplicate)

// EnrichUser normalizes the user and fills in defaults.
var EnrichUser = pipz.Transform(EnrichID, enrichUser)

// SaveUser persists the user.
var SaveUser = pipz.Effect(SaveID, saveUser)

// SendWelcome sends the welcome email.
var SendWelcome = pipz.Effect(SendWelcomeID, sendWelcomeEmail)

// LogAnalytics records the registration for analytics.
var LogAnalytics = pipz.Effect(LogAnalyticsID, logRegistration)

// LogEmailError handles the *pipz.Error[User] produced by a failed email send.
var LogEmailError = pipz.Effect(LogEmailErrorID, logEmailError)
// Clone satisfies the Cloner constraint that Concurrent requires (presumably
// so each parallel branch receives an independent copy of the user).
//
// A plain value copy is sufficient because every field is either a value
// type or immutable. Strings cannot be mutated in place, time.Time is meant
// to be copied by value, and Preferences holds only bools and strings. If a
// slice, map, or pointer field is ever added, deep-copy it here.
func (u User) Clone() User {
	clone := u
	return clone
}
Step 5: Define Your Connectors
Using the identities defined in Step 4, compose the processors into sequences and connectors:
// Composed connectors.

// RegistrationPipeline is the basic flow: validate, check for duplicates,
// enrich, save, then run the post-registration tasks in parallel.
var RegistrationPipeline = pipz.NewSequence[User](RegistrationID,
	ValidateUser,   // validation
	CheckDuplicate, // validation
	EnrichUser,     // transformation
	SaveUser,       // persistence
	pipz.NewConcurrent(PostRegistrationID, SendWelcome, LogAnalytics), // post-registration, in parallel
)

// EmailWithErrorHandling sends the welcome email and routes any failure to
// LogEmailError.
var EmailWithErrorHandling = pipz.NewHandle(EmailHandleID, SendWelcome, LogEmailError)

// SaveWithRetry retries SaveUser with exponential backoff. The arguments 3
// and 100ms are presumably the attempt limit and base delay.
var SaveWithRetry = pipz.NewBackoff(SaveRetryID, SaveUser, 3, 100*time.Millisecond)

// RobustRegistrationPipeline hardens RegistrationPipeline. The save is
// retried, and the email step has its own error handler, so non-critical
// post-registration work should not fail the registration.
// NOTE(review): PostRegistrationID is also the identity of the Concurrent in
// RegistrationPipeline; confirm two connector instances may share it.
var RobustRegistrationPipeline = pipz.NewSequence[User](RobustRegistrationID,
	ValidateUser,
	CheckDuplicate,
	EnrichUser,
	SaveWithRetry,
	pipz.NewConcurrent(PostRegistrationID, EmailWithErrorHandling, LogAnalytics),
)
Step 6: Create Functions to Execute Pipelines
// RegisterUser runs user through the basic RegistrationPipeline and returns
// the resulting user or the error reported by the pipeline.
func RegisterUser(ctx context.Context, user User) (User, error) {
	registered, err := RegistrationPipeline.Process(ctx, user)
	return registered, err
}
// RegisterUserRobust runs user through RobustRegistrationPipeline, which adds
// a retried save and error-handled email sending.
func RegisterUserRobust(ctx context.Context, user User) (User, error) {
	registered, err := RobustRegistrationPipeline.Process(ctx, user)
	return registered, err
}
// main registers a sample user with the basic pipeline and prints the
// outcome. When the failure is a *pipz.Error, the path of the failing
// component is printed alongside the cause.
func main() {
	newUser := User{
		Email:    "john.doe@example.com",
		Username: "johndoe",
		Password: "securepassword123", // Would be hashed in real app
		FullName: "John Doe",
	}
	ctx := context.Background()

	registered, err := RegisterUser(ctx, newUser)
	if err != nil {
		var pipeErr *pipz.Error[User]
		if errors.As(err, &pipeErr) {
			fmt.Printf("Registration failed at %v: %v\n", pipeErr.Path, pipeErr.Err)
			return
		}
		fmt.Printf("Registration failed: %v\n", err)
		return
	}

	// Never print credentials, hashed or not: blank the password on a copy
	// before dumping the whole record with %+v.
	safe := registered
	safe.Password = "[REDACTED]"
	fmt.Printf("Successfully registered: %+v\n", safe)
}
Step 7: Dynamic Pipeline Modification
Pipelines can be modified at runtime:
// FraudCheckID identifies the fraud-check processor inserted at runtime.
var FraudCheckID = pipz.NewIdentity("fraud_check", "Checks user against fraud detection database")

// NewEmailProviderID reuses the send_welcome name, matching the processor
// that NewEmailSender is meant to replace.
var NewEmailProviderID = pipz.NewIdentity(ProcessorSendWelcome, "Sends welcome email using new provider")

// FraudCheck is a placeholder fraud screen: it logs the email and always
// passes. A real implementation would query a fraud database.
var FraudCheck = pipz.Effect(FraudCheckID, func(ctx context.Context, user User) error {
	fmt.Printf("Checking user %s for fraud indicators\n", user.Email)
	return nil
})

// NewEmailSender is the alternative welcome-email sender used for A/B tests.
var NewEmailSender = pipz.Effect(NewEmailProviderID, func(ctx context.Context, user User) error {
	fmt.Printf("[NEW PROVIDER] Sending welcome email to: %s\n", user.Email)
	return nil
})
// AddFraudDetection inserts FraudCheck into RegistrationPipeline directly
// after the check_duplicate processor, i.e. after validation and before
// enrichment. As written it applies to every registration, not only to
// high-risk domains.
// NOTE(review): any result returned by After is discarded here — confirm
// whether it reports errors (e.g. name not found) and handle them.
// NOTE(review): this mutates a shared package-level pipeline; confirm pipz
// makes that safe while Process calls are in flight.
func AddFraudDetection() {
	// Insert after validation but before enrichment
	RegistrationPipeline.After(ProcessorCheckDuplicate, FraudCheck)
}
// UseNewEmailProvider swaps the processor named send_welcome in
// RegistrationPipeline for NewEmailSender (e.g. for A/B testing).
// NewEmailSender's identity reuses the send_welcome name.
// NOTE(review): SendWelcome is not a direct step of RegistrationPipeline; it
// sits inside the post-registration Concurrent. Verify that Replace on the
// outer Sequence reaches nested processors. If it only searches direct
// children, this call will not find send_welcome.
// NOTE(review): any result returned by Replace is discarded — confirm and
// handle it.
func UseNewEmailProvider() {
	RegistrationPipeline.Replace(ProcessorSendWelcome, NewEmailSender)
}
Step 8: Add Conditional Logic
Add premium user handling with the Switch connector:
// Names for the Step 8 onboarding processors.
const (
	ProcessorRegularOnboarding = "regular_onboarding"
	ProcessorPremiumOnboarding = "premium_onboarding"
	ProcessorAssignManager     = "assign_account_manager"
)

// Names for the Step 8 router and pipelines.
const (
	RouterUserType         = "user-type-router"
	PipelinePremiumFlow    = "premium-flow"
	PipelineConditionalReg = "conditional-registration"
)
// UserType is the route key the user-type Switch dispatches on.
type UserType string

// Route keys understood by UserTypeRouter.
const (
	TypeRegular = UserType("regular")
	TypePremium = UserType("premium")
)
// premiumDomains lists the email domains whose users are routed to the
// premium onboarding flow. It is kept at package level so it is not rebuilt
// on every call.
var premiumDomains = []string{"company.com", "enterprise.org"}

// detectUserType chooses the onboarding route from the domain of the user's
// email address. It returns TypePremium for a premium domain and
// TypeRegular otherwise.
//
// The domain is the text after the last "@", compared case-insensitively.
// An address without "@" is treated as regular, so the function cannot panic
// when used outside a pipeline that validated the email first.
func detectUserType(ctx context.Context, user User) UserType {
	at := strings.LastIndex(user.Email, "@")
	if at < 0 {
		return TypeRegular
	}
	domain := strings.TrimSpace(user.Email[at+1:])
	for _, premium := range premiumDomains {
		if strings.EqualFold(domain, premium) {
			return TypePremium
		}
	}
	return TypeRegular
}
// regularOnboarding starts the standard onboarding flow. In this recipe it
// only prints a message and never fails.
func regularOnboarding(ctx context.Context, u User) error {
	fmt.Print("Starting regular onboarding flow\n")
	return nil
}
// premiumOnboarding starts the premium onboarding flow. In this recipe it
// only prints a message and never fails.
func premiumOnboarding(ctx context.Context, u User) error {
	fmt.Print("Starting premium onboarding flow\n")
	return nil
}
// assignAccountManager assigns a dedicated account manager to a premium
// user. In this recipe it only prints a message and never fails.
func assignAccountManager(ctx context.Context, u User) error {
	fmt.Print("Assigning dedicated account manager\n")
	return nil
}
// Step 8 processor identities, one per onboarding step.
var (
	RegularOnboardingID = pipz.NewIdentity(ProcessorRegularOnboarding, "Executes regular user onboarding flow")
	PremiumOnboardingID = pipz.NewIdentity(ProcessorPremiumOnboarding, "Executes premium user onboarding flow")
	AssignManagerID     = pipz.NewIdentity(ProcessorAssignManager, "Assigns dedicated account manager to user")
)

// Step 8 connector identities: the premium flow, the router, and the
// conditional registration pipeline.
var (
	PremiumFlowID    = pipz.NewIdentity(PipelinePremiumFlow, "Premium user onboarding workflow")
	UserTypeRouterID = pipz.NewIdentity(RouterUserType, "Routes users to appropriate onboarding flow")
	ConditionalRegID = pipz.NewIdentity(PipelineConditionalReg, "Registration with conditional user type routing")
)
// RegularOnboarding wraps regularOnboarding as a processor.
var RegularOnboarding = pipz.Effect(RegularOnboardingID, regularOnboarding)

// PremiumOnboarding wraps premiumOnboarding as a processor.
var PremiumOnboarding = pipz.Effect(PremiumOnboardingID, premiumOnboarding)

// AssignManager wraps assignAccountManager as a processor.
var AssignManager = pipz.Effect(AssignManagerID, assignAccountManager)
// PremiumFlow runs premium onboarding and then assigns an account manager.
var PremiumFlow = pipz.NewSequence[User](PremiumFlowID, PremiumOnboarding, AssignManager)

// UserTypeRouter sends each user down the flow chosen by detectUserType.
// Regular users get RegularOnboarding; premium users get PremiumFlow.
var UserTypeRouter = pipz.NewSwitch(UserTypeRouterID, detectUserType).
	AddRoute(TypeRegular, RegularOnboarding).
	AddRoute(TypePremium, PremiumFlow)

// ConditionalRegistrationPipeline runs the shared registration steps and
// then hands the saved user to UserTypeRouter.
var ConditionalRegistrationPipeline = pipz.NewSequence[User](ConditionalRegID,
	ValidateUser,
	CheckDuplicate,
	EnrichUser,
	SaveUser,
	UserTypeRouter,
)
// RegisterUserConditional registers user and then routes them to regular or
// premium onboarding via ConditionalRegistrationPipeline.
func RegisterUserConditional(ctx context.Context, user User) (User, error) {
	registered, err := ConditionalRegistrationPipeline.Process(ctx, user)
	return registered, err
}
Step 9: Test Your Pipeline
func TestRegistrationPipeline(t *testing.T) {
pipeline := createRegistrationPipeline()
tests := []struct {
name string
user User
wantErr bool
}{
{
name: "valid user",
user: User{
Email: "valid@example.com",
Username: "validuser",
Password: "password123",
},
wantErr: false,
},
{
name: "invalid email",
user: User{
Email: "invalid",
Username: "validuser",
Password: "password123",
},
wantErr: true,
},
{
name: "duplicate email",
user: User{
Email: "admin@example.com",
Username: "newadmin",
Password: "password123",
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := pipeline.Process(context.Background(), tt.user)
if (err != nil) != tt.wantErr {
t.Errorf("Process() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
Step 10: Production Resilience
For production deployments with external service calls, layer additional resilience patterns:
// Connector names for the external-call resilience wrappers.
const (
	ConnectorAPIBreaker   = "api-circuit-breaker"
	ConnectorAPIRateLimit = "api-rate-limit"
	ConnectorAPITimeout   = "api-timeout"
)

// PipelineProduction names the fully hardened registration pipeline.
const PipelineProduction = "production-registration"
// APIBreakerID identifies the circuit breaker guarding external calls.
var APIBreakerID = pipz.NewIdentity(ConnectorAPIBreaker, "Prevents cascading failures to external services")

// APIRateLimitID identifies the rate limiter in front of the email API.
var APIRateLimitID = pipz.NewIdentity(ConnectorAPIRateLimit, "Limits request rate to external APIs")

// APITimeoutID identifies the timeout wrapper on external calls.
var APITimeoutID = pipz.NewIdentity(ConnectorAPITimeout, "Enforces timeout on external calls")

// ProductionRegID identifies the production registration pipeline.
var ProductionRegID = pipz.NewIdentity(PipelineProduction, "Production registration with full resilience")
// ResilientSave layers resilience around SaveUser, outermost first:
// Timeout (5s) → CircuitBreaker (opens after 5 failures, tries recovery
// after 30s) → Backoff(3, 100ms) → SaveUser.
// NOTE(review): SaveRetryID is already the identity of SaveWithRetry;
// confirm sharing it between two Backoff instances is intended.
var ResilientSave = pipz.NewTimeout(APITimeoutID,
	pipz.NewCircuitBreaker(APIBreakerID,
		pipz.NewBackoff(SaveRetryID, SaveUser, 3, 100*time.Millisecond),
		5,              // failures before the breaker opens
		30*time.Second, // wait before attempting recovery
	),
	5*time.Second, // overall timeout
)

// RateLimitedEmail throttles the external email API (10 requests per
// second, burst of 5) in front of the error-handled email sender.
var RateLimitedEmail = pipz.NewRateLimiter(APIRateLimitID, 10, 5, EmailWithErrorHandling)

// ProductionRegistrationPipeline is the fully hardened registration flow.
// NOTE(review): PostRegistrationID is shared with the Concurrent connectors
// of the earlier pipelines; confirm that is intended.
var ProductionRegistrationPipeline = pipz.NewSequence[User](ProductionRegID,
	ValidateUser,
	CheckDuplicate,
	EnrichUser,
	ResilientSave,
	pipz.NewConcurrent(PostRegistrationID, RateLimitedEmail, LogAnalytics),
)
Step 11: Observability with Pipeline Wrapper
Wrap your pipeline for distributed tracing and correlation:
import "github.com/zoobzio/capitan"
// Set up signal handlers before creating pipelines
func init() {
// Monitor circuit breaker state
capitan.Hook(pipz.SignalCircuitBreakerOpened, func(ctx context.Context, e *capitan.Event) {
execID, _ := pipz.ExecutionIDFromContext(ctx)
name, _ := pipz.FieldName.From(e)
log.Printf("[ALERT] Circuit opened: %s (exec=%s)", name, execID)
})
// Track retries
capitan.Hook(pipz.SignalRetryExhausted, func(ctx context.Context, e *capitan.Event) {
execID, _ := pipz.ExecutionIDFromContext(ctx)
name, _ := pipz.FieldName.From(e)
attempts, _ := pipz.FieldMaxAttempts.From(e)
log.Printf("[ERROR] Retry exhausted: %s after %d attempts (exec=%s)", name, attempts, execID)
})
// Monitor rate limiting
capitan.Hook(pipz.SignalRateLimiterDropped, func(ctx context.Context, e *capitan.Event) {
name, _ := pipz.FieldName.From(e)
log.Printf("[WARN] Request dropped by rate limiter: %s", name)
})
}
// ObservableRegistration wraps ProductionRegistrationPipeline in a
// pipz.Pipeline, which supplies the execution context that signal hooks read
// back with ExecutionIDFromContext.
// NOTE(review): it reuses ProductionRegID, the identity of the wrapped
// sequence; confirm that is intended rather than a distinct wrapper identity.
var ObservableRegistration = pipz.NewPipeline(ProductionRegID, ProductionRegistrationPipeline)

// RegisterUserProduction registers user through the observable production
// pipeline. Each call gets its own execution ID for correlating signals and
// logs.
func RegisterUserProduction(ctx context.Context, user User) (User, error) {
	registered, err := ObservableRegistration.Process(ctx, user)
	return registered, err
}
Key Takeaways
- Start Simple: Begin with basic processors and compose them
- Add Robustness Gradually: Layer in retry, timeout, circuit breaker, and rate limiting
- Use the Right Connector: Sequence for ordered steps, Concurrent for parallel work, Switch for routing, Handle for error recovery
- Handle Errors Appropriately: Critical vs non-critical operations
- Wrap for Observability: Use Pipeline wrapper for correlation IDs
- Test Each Component: Processors are independently testable
See Also
- Safety & Reliability - Error recovery patterns
- Testing Pipelines - Comprehensive testing strategies
- Performance - Optimization tips
- Library Resilience - Expose resilience via With* API
- Extensible Vocabulary - Create domain-specific APIs