zoobzio · December 12, 2025

Hooks and Observability

Pipz integrates with capitan to provide type-safe event hooks for observability, monitoring, and debugging. Stateful connectors emit signals at critical state transitions and decision points, allowing you to observe system behavior without modifying your processing logic.

Overview

Hooks enable you to:

  • Monitor circuit breaker state changes and timeout events
  • Track rate limiting behavior and backpressure
  • Observe worker pool saturation, retry exhaustion, and backoff patterns
  • Detect when fallback processors are being used
  • Alert on threshold violations and failure patterns
  • Collect metrics for dashboards
  • Debug pipeline behavior in production

All events are emitted asynchronously via per-signal worker goroutines, ensuring hooks don't impact pipeline performance.

Event Severity

As of capitan v0.0.5, all events include a severity level that indicates their importance:

  • Error: System failures requiring immediate attention (circuit opened, requests rejected/dropped, all retries exhausted, timeouts)
  • Warn: Degraded performance or fallback scenarios (circuit half-open, rate limiting throttled, pool saturated, individual retry failures, using fallback processors, backoff delays)
  • Info: Normal operations (circuit closed, rate limiter allowed, worker acquired/released, retry attempts, using primary processor)
  • Debug: Detailed operational information (currently unused, but available for verbose logging)

Events can be filtered by severity in your hooks using e.Severity().

Available Signals

CircuitBreaker

| Signal | When Emitted | Key Fields |
|---|---|---|
| `circuitbreaker.opened` | Circuit opens after failure threshold reached | `name`, `state`, `failures`, `failure_threshold` |
| `circuitbreaker.closed` | Circuit closes after successful recovery | `name`, `state`, `successes`, `success_threshold` |
| `circuitbreaker.half-open` | Circuit transitions to half-open for testing | `name`, `state`, `generation` |
| `circuitbreaker.rejected` | Request rejected while circuit is open | `name`, `state`, `generation` |

RateLimiter

| Signal | When Emitted | Key Fields |
|---|---|---|
| `ratelimiter.allowed` | Request allowed, token consumed | `name`, `tokens`, `rate`, `burst` |
| `ratelimiter.throttled` | Request waiting for tokens (wait mode) | `name`, `wait_time`, `tokens`, `rate` |
| `ratelimiter.dropped` | Request dropped, no tokens available (drop mode) | `name`, `tokens`, `rate`, `burst`, `mode` |

WorkerPool

| Signal | When Emitted | Key Fields |
|---|---|---|
| `workerpool.saturated` | All worker slots occupied, task will block | `name`, `worker_count`, `active_workers` |
| `workerpool.acquired` | Worker slot acquired, task starting | `name`, `worker_count`, `active_workers` |
| `workerpool.released` | Worker slot released, task completed | `name`, `worker_count`, `active_workers` |

Retry

| Signal | When Emitted | Key Fields |
|---|---|---|
| `retry.attempt-start` | Starting a retry attempt | `name`, `attempt`, `max_attempts` |
| `retry.attempt-fail` | Retry attempt failed | `name`, `attempt`, `max_attempts`, `error` |
| `retry.exhausted` | All retry attempts exhausted | `name`, `max_attempts`, `error` |

Fallback

| Signal | When Emitted | Key Fields |
|---|---|---|
| `fallback.attempt` | Attempting a fallback processor | `name`, `processor_index`, `processor_name` |
| `fallback.failed` | All fallback processors failed | `name`, `error` |

Timeout

| Signal | When Emitted | Key Fields |
|---|---|---|
| `timeout.triggered` | Operation exceeded timeout duration | `name`, `duration` |

Backoff

| Signal | When Emitted | Key Fields |
|---|---|---|
| `backoff.waiting` | Entering exponential backoff delay | `name`, `attempt`, `max_attempts`, `delay`, `next_delay` |

Field Reference

All fields use primitive types for easy integration with monitoring systems:

| Field Key | Type | Description |
|---|---|---|
| `FieldName` | string | Connector instance name |
| `FieldError` | string | Error message |
| `FieldTimestamp` | float64 | Unix timestamp |
| `FieldState` | string | Circuit state: "closed", "open", "half-open" |
| `FieldFailures` | int | Current failure count |
| `FieldSuccesses` | int | Current success count |
| `FieldFailureThreshold` | int | Failures needed to open circuit |
| `FieldSuccessThreshold` | int | Successes needed to close from half-open |
| `FieldResetTimeout` | float64 | Reset timeout in seconds |
| `FieldGeneration` | int | Circuit generation number |
| `FieldLastFailTime` | float64 | Last failure timestamp |
| `FieldRate` | float64 | Requests per second |
| `FieldBurst` | int | Maximum burst capacity |
| `FieldTokens` | float64 | Current available tokens |
| `FieldMode` | string | Rate limiter mode: "wait" or "drop" |
| `FieldWaitTime` | float64 | Wait time in seconds |
| `FieldWorkerCount` | int | Total worker slots |
| `FieldActiveWorkers` | int | Currently active workers |
| `FieldAttempt` | int | Current retry attempt number |
| `FieldMaxAttempts` | int | Maximum retry attempts |
| `FieldProcessorIndex` | int | Fallback processor index |
| `FieldProcessorName` | string | Fallback processor name |
| `FieldDuration` | float64 | Timeout duration in seconds |
| `FieldDelay` | float64 | Current backoff delay in seconds |
| `FieldNextDelay` | float64 | Next backoff delay in seconds |

Usage Examples

Basic Hook Registration

import (
    "context"
    "fmt"

    "github.com/zoobzio/capitan"
    "github.com/zoobzio/pipz"
)

func main() {
    // Configure capitan (optional, before any hooks)
    capitan.Configure(capitan.WithBufferSize(64))

    // Hook circuit breaker signals
    capitan.Hook(pipz.SignalCircuitBreakerOpened, func(ctx context.Context, e *capitan.Event) {
        name, _ := pipz.FieldName.From(e)
        failures, _ := pipz.FieldFailures.From(e)
        threshold, _ := pipz.FieldFailureThreshold.From(e)

        fmt.Printf("ALERT: Circuit %s opened (failures=%d, threshold=%d)\n",
            name, failures, threshold)
    })

    // Your pipeline code...

    // Shutdown capitan to drain pending events
    defer capitan.Shutdown()
}

Metrics Collection

import (
    "context"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/zoobzio/capitan"
    "github.com/zoobzio/pipz"
)

var (
    circuitState = prometheus.NewGaugeVec(
        prometheus.GaugeOpts{
            Name: "pipz_circuit_state",
            Help: "Circuit breaker state (0=closed, 1=half-open, 2=open)",
        },
        []string{"name"},
    )

    rateLimitDropped = prometheus.NewCounterVec(
        prometheus.CounterOpts{
            Name: "pipz_ratelimit_dropped_total",
            Help: "Total requests dropped by rate limiter",
        },
        []string{"name"},
    )
)

func init() {
    prometheus.MustRegister(circuitState, rateLimitDropped)

    // Track circuit state
    capitan.Hook(pipz.SignalCircuitBreakerOpened, func(ctx context.Context, e *capitan.Event) {
        name, _ := pipz.FieldName.From(e)
        circuitState.WithLabelValues(name).Set(2) // open
    })

    capitan.Hook(pipz.SignalCircuitBreakerClosed, func(ctx context.Context, e *capitan.Event) {
        name, _ := pipz.FieldName.From(e)
        circuitState.WithLabelValues(name).Set(0) // closed
    })

    capitan.Hook(pipz.SignalCircuitBreakerHalfOpen, func(ctx context.Context, e *capitan.Event) {
        name, _ := pipz.FieldName.From(e)
        circuitState.WithLabelValues(name).Set(1) // half-open
    })

    // Track dropped requests
    capitan.Hook(pipz.SignalRateLimiterDropped, func(ctx context.Context, e *capitan.Event) {
        name, _ := pipz.FieldName.From(e)
        rateLimitDropped.WithLabelValues(name).Inc()
    })
}

Structured Logging

import (
    "context"
    "log/slog"

    "github.com/zoobzio/capitan"
    "github.com/zoobzio/pipz"
)

func setupHooks() {
    // Log all rate limiter events
    capitan.Hook(pipz.SignalRateLimiterThrottled, func(ctx context.Context, e *capitan.Event) {
        name, _ := pipz.FieldName.From(e)
        waitTime, _ := pipz.FieldWaitTime.From(e)
        tokens, _ := pipz.FieldTokens.From(e)

        slog.WarnContext(ctx, "rate limiter throttled",
            "connector", name,
            "wait_seconds", waitTime,
            "tokens_remaining", tokens,
        )
    })

    // Log worker pool saturation
    capitan.Hook(pipz.SignalWorkerPoolSaturated, func(ctx context.Context, e *capitan.Event) {
        name, _ := pipz.FieldName.From(e)
        workers, _ := pipz.FieldWorkerCount.From(e)

        slog.WarnContext(ctx, "worker pool saturated",
            "connector", name,
            "worker_count", workers,
        )
    })
}

Alerting

func setupAlerts() {
    // Alert when circuit opens
    capitan.Hook(pipz.SignalCircuitBreakerOpened, func(ctx context.Context, e *capitan.Event) {
        name, _ := pipz.FieldName.From(e)
        failures, _ := pipz.FieldFailures.From(e)

        // Send to alerting system
        sendAlert(Alert{
            Severity: "critical",
            Title:    fmt.Sprintf("Circuit Breaker Opened: %s", name),
            Message:  fmt.Sprintf("Failures reached threshold: %d", failures),
        })
    })

    // Alert when rate limiter starts dropping requests
    capitan.Hook(pipz.SignalRateLimiterDropped, func(ctx context.Context, e *capitan.Event) {
        name, _ := pipz.FieldName.From(e)
        rate, _ := pipz.FieldRate.From(e)

        sendAlert(Alert{
            Severity: "warning",
            Title:    fmt.Sprintf("Rate Limiter Dropping: %s", name),
            Message:  fmt.Sprintf("Capacity exceeded (rate=%.1f/s)", rate),
        })
    })
}

Severity-Based Filtering

// Only process error-level events
capitan.Observe(func(ctx context.Context, e *capitan.Event) {
    if e.Severity() != capitan.SeverityError {
        return
    }

    name, _ := pipz.FieldName.From(e)
    log.Printf("ERROR event from %s: %s", name, e.Signal())

    // Send to error tracking system
    sendToErrorTracker(e)
})

// Route events by severity
capitan.Observe(func(ctx context.Context, e *capitan.Event) {
    switch e.Severity() {
    case capitan.SeverityError:
        sendToAlertingSystem(e)
    case capitan.SeverityWarn:
        sendToMonitoringDashboard(e)
    case capitan.SeverityInfo:
        sendToMetricsCollector(e)
    case capitan.SeverityDebug:
        sendToDebugLogs(e)
    }
})

Observer Pattern

Use Observe() to listen to multiple signals with a single handler:

// Observe all circuit breaker events
capitan.Observe(func(ctx context.Context, e *capitan.Event) {
    name, _ := pipz.FieldName.From(e)

    switch e.Signal() {
    case pipz.SignalCircuitBreakerOpened:
        log.Printf("Circuit %s: OPENED", name)
    case pipz.SignalCircuitBreakerClosed:
        log.Printf("Circuit %s: CLOSED", name)
    case pipz.SignalCircuitBreakerHalfOpen:
        log.Printf("Circuit %s: TESTING", name)
    }
},
    pipz.SignalCircuitBreakerOpened,
    pipz.SignalCircuitBreakerClosed,
    pipz.SignalCircuitBreakerHalfOpen,
)

// Or observe ALL signals
capitan.Observe(func(ctx context.Context, e *capitan.Event) {
    // Log everything for debugging
    log.Printf("Event: %s", e.Signal())
})

Performance Considerations

Asynchronous Processing

All events are processed asynchronously in per-signal worker goroutines. This means:

  • ✅ Hooks never block pipeline processing
  • ✅ Slow handlers don't impact throughput
  • ✅ Handler panics are recovered automatically
  • ❌ Events may be buffered if handlers are slow
  • ❌ No guaranteed delivery if process crashes

Buffer Sizing

Configure buffer size based on emission rate:

// Default: 16 events per signal
capitan.Configure(capitan.WithBufferSize(16))

// High-volume: increase buffer
capitan.Configure(capitan.WithBufferSize(128))

// Low-latency: smaller buffer (fails faster if handler is slow)
capitan.Configure(capitan.WithBufferSize(4))

If a signal's buffer fills, Emit() becomes blocking until the handler catches up.

Handler Best Practices

  1. Keep handlers fast - Emit to external queues/channels rather than doing heavy work
  2. Don't block - Avoid synchronous I/O in handlers
  3. Handle panics - Capitan recovers, but you should still be defensive
  4. Use context - Respect cancellation in long-running handlers
// ❌ Bad: Blocking I/O in handler
capitan.Hook(signal, func(ctx context.Context, e *capitan.Event) {
    http.Post("https://alerting.com/api", ...)  // Blocks!
})

// ✅ Good: Queue for async processing
var alertQueue = make(chan Alert, 100)

capitan.Hook(signal, func(ctx context.Context, e *capitan.Event) {
    select {
    case alertQueue <- buildAlert(e):
    default:
        // Queue full, drop (don't block pipeline)
    }
})

Shutdown

Always call Shutdown() to drain pending events:

func main() {
    // Setup hooks...

    // Run application...

    // Drain events before exit
    capitan.Shutdown()
}

Without Shutdown(), buffered events may be lost on process exit.

Integration with Connectors

CircuitBreaker

Emits signals on state transitions:

var apiBreaker = pipz.NewCircuitBreaker(
    pipz.NewIdentity("api-breaker", "Protects API from cascading failures"),
    apiProcessor,
    5,                  // failureThreshold
    30 * time.Second,   // resetTimeout
)

// Hook to track state
capitan.Hook(pipz.SignalCircuitBreakerOpened, trackCircuitState)
capitan.Hook(pipz.SignalCircuitBreakerClosed, trackCircuitState)

See CircuitBreaker reference for details.

RateLimiter

Emits signals for throttling and dropping:

var apiLimiter = pipz.NewRateLimiter[Request](
    pipz.NewIdentity("api-limiter", "Rate limits API requests"),
    100,    // rate per second
    10,     // burst
).SetMode("drop")

// Hook to track dropped requests
capitan.Hook(pipz.SignalRateLimiterDropped, trackDrops)

See RateLimiter reference for details.

WorkerPool

Emits signals for worker acquisition and saturation:

var pool = pipz.NewWorkerPool[Task](
    pipz.NewIdentity("worker-pool", "Limits concurrent task processing"),
    10,  // worker count
    processors...,
)

// Hook to track saturation
capitan.Hook(pipz.SignalWorkerPoolSaturated, alertOnSaturation)

See WorkerPool reference for details.

Retry

Emits signals for retry attempts and exhaustion:

var retryProcessor = pipz.NewRetry(
    pipz.NewIdentity("api-retry", "Retries failed API calls"),
    apiProcessor,
    3,  // maxAttempts
)

// Hook to track retry exhaustion
capitan.Hook(pipz.SignalRetryExhausted, func(ctx context.Context, e *capitan.Event) {
    name, _ := pipz.FieldName.From(e)
    err, _ := pipz.FieldError.From(e)
    log.Printf("ALERT: Retry exhausted for %s: %s", name, err)
})

See Retry reference for details.

Fallback

Emits signals when attempting fallback processors:

var fallbackChain = pipz.NewFallback(
    pipz.NewIdentity("payment-fallback", "Payment processing with fallback"),
    stripeProcessor,
    paypalProcessor,
    squareProcessor,
)

// Hook to track when fallbacks are used
capitan.Hook(pipz.SignalFallbackAttempt, func(ctx context.Context, e *capitan.Event) {
    name, _ := pipz.FieldName.From(e)
    procName, _ := pipz.FieldProcessorName.From(e)
    index, _ := pipz.FieldProcessorIndex.From(e)

    if index > 0 {
        log.Printf("WARNING: Using fallback processor %s[%d]: %s", name, index, procName)
    }
})

See Fallback reference for details.

Timeout

Emits signals when operations exceed timeout duration:

var apiTimeout = pipz.NewTimeout(
    pipz.NewIdentity("api-timeout", "Enforces API call timeout"),
    apiProcessor,
    5 * time.Second,
)

// Hook to track timeout events
capitan.Hook(pipz.SignalTimeoutTriggered, func(ctx context.Context, e *capitan.Event) {
    name, _ := pipz.FieldName.From(e)
    duration, _ := pipz.FieldDuration.From(e)
    log.Printf("ALERT: Operation %s timed out after %.2fs", name, duration)
})

See Timeout reference for details.

Backoff

Emits signals when entering exponential backoff delays:

var backoffProcessor = pipz.NewBackoff(
    pipz.NewIdentity("api-backoff", "API calls with exponential backoff"),
    apiProcessor,
    5,                  // maxAttempts
    1 * time.Second,    // baseDelay
)

// Hook to track backoff behavior
capitan.Hook(pipz.SignalBackoffWaiting, func(ctx context.Context, e *capitan.Event) {
    name, _ := pipz.FieldName.From(e)
    attempt, _ := pipz.FieldAttempt.From(e)
    delay, _ := pipz.FieldDelay.From(e)
    log.Printf("WARNING: %s backing off on attempt %d, waiting %.2fs", name, attempt, delay)
})

See Backoff reference for details.

Testing with Hooks

Sync Mode (v0.0.2+)

Use WithSyncMode() for deterministic testing without timing dependencies:

func TestCircuitBreakerHooks(t *testing.T) {
    // Configure with sync mode before first use
    capitan.Configure(capitan.WithSyncMode())

    var opened bool

    capitan.Hook(pipz.SignalCircuitBreakerOpened, func(ctx context.Context, e *capitan.Event) {
        opened = true
    })

    // Trigger circuit opening...

    // No waiting needed - sync mode processes immediately
    if !opened {
        t.Error("circuit should have opened")
    }

    capitan.Shutdown()
}

Important: Configure() must be called before any other capitan operation. In tests this means each test binary gets one chance to configure: a test that needs different settings (such as sync mode) must run in a fresh process, or the default instance will already be initialized.

Async Mode

For testing async behavior:

func TestCircuitBreakerHooks(t *testing.T) {
    var opened bool
    var mu sync.Mutex

    capitan.Hook(pipz.SignalCircuitBreakerOpened, func(ctx context.Context, e *capitan.Event) {
        mu.Lock()
        opened = true
        mu.Unlock()
    })

    // Trigger circuit opening...

    // Wait for async processing
    time.Sleep(50 * time.Millisecond)

    mu.Lock()
    if !opened {
        t.Error("circuit should have opened")
    }
    mu.Unlock()

    capitan.Shutdown()
}

In production code, use hooks for observability only, never for control flow.

Further Reading