zoobzio December 12, 2025 Edit this page

Connector Selection Guide

Quick decisions for choosing the right connector.

Decision Tree

What do you need?
│
├─ Sequential processing? → Sequence
│
├─ Parallel processing?
│   ├─ Need all results? → Concurrent
│   ├─ Bounded parallelism? → WorkerPool
│   ├─ Fire and forget? → Scaffold
│   ├─ Need fastest? → Race
│   └─ Need best match? → Contest
│
├─ Conditional routing? → Switch
│
├─ Error handling?
│   ├─ Have fallback? → Fallback
│   └─ Transient errors? → Retry
│
└─ Resilience?
    ├─ Prevent cascading failures? → CircuitBreaker
    ├─ Control throughput? → RateLimiter
    └─ Bound execution time? → Timeout

Connector Comparison Matrix

┌────────────────┬──────────┬────────────┬──────────┬─────────────────────┐
│   Connector    │ Parallel │ All Run?   │ Returns  │ Primary Use Case    │
├────────────────┼──────────┼────────────┼──────────┼─────────────────────┤
│ Sequence       │    No    │ Until fail │ Last     │ Step-by-step flow   │
│ Concurrent     │   Yes    │    Yes     │ Original │ Side effects        │
│ WorkerPool     │   Yes*   │    Yes     │ Original │ Bounded parallelism │
│ Scaffold       │   Yes    │    Yes     │ Original │ Fire-and-forget     │
│ Race           │   Yes    │ First wins │ First    │ Fastest response    │
│ Contest        │   Yes    │ Until pass │ Matching │ Quality threshold   │
│ Switch         │    No    │ One branch │ Selected │ Conditional routing │
│ Fallback       │    No    │ On failure │ Primary  │ Error recovery      │
│ Retry          │    No    │ Until pass │ Success  │ Transient failures  │
│ CircuitBreaker │    No    │ If closed  │ Result   │ Cascade prevention  │
│ RateLimiter    │    No    │ If allowed │ Result   │ Throughput control  │
│ Timeout        │    No    │ Time bound │ Result   │ Execution limits    │
└────────────────┴──────────┴────────────┴──────────┴─────────────────────┘

Legend:
• Parallel: Whether processors run concurrently (*WorkerPool limits concurrency)
• All Run?: Whether all processors execute or stop early
• Returns: What data is returned to caller
• Primary Use Case: Main scenario for using this connector

Problem-Solution Guide

You need to: Process data through multiple steps in order

Solution: Sequence

// Define identity upfront
var PipelineID = pipz.NewIdentity("pipeline", "Sequential processing pipeline")

pipeline := pipz.NewSequence[T](PipelineID, step1, step2, step3)

When to use:

  • Order matters
  • Each step depends on the output of the previous one
  • Building up state through transformations

Don't use when:

  • Steps are independent (use Concurrent instead)

You need to: Run multiple operations in parallel

Solution: Concurrent

// Define identity upfront
var ParallelID = pipz.NewIdentity("parallel", "Executes operations in parallel")

concurrent := pipz.NewConcurrent[T](ParallelID, proc1, proc2, proc3)

When to use:

  • Operations are independent
  • Running side effects (notifications, logging)
  • Want to parallelize for performance

Requirements:

  • Type T must implement Cloner[T]

Don't use when:

  • Order matters
  • Operations depend on each other

You need to: Run parallel operations with limited resources

Solution: WorkerPool

// Define identity upfront
var LimitedID = pipz.NewIdentity("limited", "Worker pool with max 3 concurrent operations")

pool := pipz.NewWorkerPool[T](LimitedID, 3, proc1, proc2, proc3, proc4, proc5)

When to use:

  • Resource-constrained environments
  • Rate-limited external services
  • Controlled database connections
  • Preventing memory exhaustion
  • Managing CPU-intensive operations

Requirements:

  • Type T must implement Cloner[T]

Don't use when:

  • Need unbounded parallelism (use Concurrent)
  • Operations must complete in order (use Sequence)
  • Fire-and-forget semantics needed (use Scaffold)

You need to: Get the fastest result from multiple sources

Solution: Race

// Define identity upfront
var FastestID = pipz.NewIdentity("fastest", "Returns first successful result")

race := pipz.NewRace[T](FastestID, primary, backup1, backup2)

When to use:

  • Multiple sources for same data
  • Want lowest latency
  • Have fallback options

Requirements:

  • Type T must implement Cloner[T]

Don't use when:

  • Need all results
  • Sources have different costs (every source is invoked in parallel, so you pay for all of them, not just the winner)

You need to: Find first result meeting quality criteria

Solution: Contest

// Define identity upfront
var BestID = pipz.NewIdentity("best", "Finds first result with score > 0.9")

contest := pipz.NewContest[T](BestID,
    func(ctx context.Context, result T) bool {
        return result.Score > 0.9
    },
    model1, model2, model3,
)

When to use:

  • Quality matters more than speed
  • Have multiple approaches
  • Want first acceptable result

Requirements:

  • Type T must implement Cloner[T]

You need to: Route data based on conditions

Solution: Switch

// Define identity upfront
var RouterID = pipz.NewIdentity("router", "Routes data based on premium status")

router := pipz.NewSwitch[T](RouterID,
    func(ctx context.Context, data T) string {
        if data.Premium {
            return "premium"
        }
        return "standard"
    },
).
    AddRoute("premium", premiumPipeline).
    AddRoute("standard", standardPipeline)

When to use:

  • Different processing for different data types
  • Conditional logic
  • A/B testing

Don't use when:

  • All data follows same path

You need to: Recover from errors gracefully

Solution: Fallback

// Define identity upfront
var SafeID = pipz.NewIdentity("safe", "Uses fallback on error")

fallback := pipz.NewFallback[T](SafeID, riskyOperation, safeDefault)

When to use:

  • Have a safe default
  • Want graceful degradation
  • Errors are expected

Don't use when:

  • Errors should stop processing
  • No reasonable fallback exists

You need to: Retry failed operations

Solution: Retry

// Define identity upfront
var ReliableID = pipz.NewIdentity("reliable", "Retries up to 3 times on failure")

retry := pipz.NewRetry[T](ReliableID, processor, 3)

When to use:

  • Transient errors (network, temporary unavailability)
  • External service calls
  • Database operations

Don't use when:

  • Errors are permanent (validation failures)
  • The service needs backoff between attempts (immediate retries can overwhelm a struggling service)

You need to: Prevent cascading failures

Solution: CircuitBreaker

// Define identity upfront
var ProtectedID = pipz.NewIdentity("protected", "Circuit breaker with 5 failure threshold")

breaker := pipz.NewCircuitBreaker[T](ProtectedID, processor,
    pipz.WithCircuitBreakerThreshold(5),
    pipz.WithCircuitBreakerWindow(time.Minute),
)

When to use:

  • Calling external services
  • Protecting downstream systems
  • Failing fast is acceptable

Don't use when:

  • Every request must be attempted
  • Failures are independent per request (e.g. bad input) rather than a sign that the downstream system is unhealthy

You need to: Control processing rate

Solution: RateLimiter

// Define identity upfront
var ThrottledID = pipz.NewIdentity("throttled", "Rate limited to 100 per second")

limiter := pipz.NewRateLimiter[T](ThrottledID, processor,
    pipz.WithRateLimiterRate(100),
    pipz.WithRateLimiterPeriod(time.Second),
)

When to use:

  • API rate limits
  • Resource protection
  • Cost control

Important:

  • Create one shared instance and reuse it. A limiter created per request starts with fresh state every time and never actually limits anything.

You need to: Bound execution time

Solution: Timeout

// Define identity upfront
var BoundedID = pipz.NewIdentity("bounded", "Times out after 5 seconds")

timeout := pipz.NewTimeout[T](BoundedID, processor, 5*time.Second)

When to use:

  • Network operations
  • User-facing APIs
  • SLA requirements

Don't use when:

  • Operations must complete
  • Execution time is inherently unpredictable and no sensible upper bound exists

Quick Comparison

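| Connector      | Parallel? | Can Fail? | Needs Clone? | Stateful? |
|----------------|-----------|-----------|--------------|-----------|
| Sequence       | No        | Yes       | No           | No        |
| Concurrent     | Yes       | Yes       | Yes          | No        |
| WorkerPool     | Limited   | Yes       | Yes          | No        |
| Scaffold       | Yes       | No**      | Yes          | No        |
| Race           | Yes       | Yes       | Yes          | No        |
| Contest        | Yes       | Yes       | Yes          | No        |
| Switch         | No        | Yes       | No           | No        |
| Fallback       | No        | No*       | No           | No        |
| Retry          | No        | Yes       | No           | Yes       |
| CircuitBreaker | No        | Yes       | No           | Yes       |
| RateLimiter    | No        | Yes       | No           | Yes       |
| Timeout        | No        | Yes       | No           | No        |

*Fallback: when the primary fails, the fallback's result is returned instead, so it only errors if the fallback itself also fails.

**Scaffold: processors run fire-and-forget, so their errors are never reported back to the caller.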

Common Combinations

Resilient External API Call

// Define identities upfront
var (
    RateID    = pipz.NewIdentity("rate", "Rate limit external API calls")
    BreakerID = pipz.NewIdentity("breaker", "Circuit breaker for API protection")
    TimeoutID = pipz.NewIdentity("timeout", "5 second timeout for API calls")
    RetryID   = pipz.NewIdentity("retry", "Retry API calls up to 3 times")
)

api := pipz.NewRateLimiter(RateID,
    pipz.NewCircuitBreaker(BreakerID,
        pipz.NewTimeout(TimeoutID,
            pipz.NewRetry(RetryID, apiCall, 3),
            5*time.Second,
        ),
    ),
)

Multi-Source with Fallback

// Define identities upfront
var (
    FetchID   = pipz.NewIdentity("fetch", "Fetch with fallback to static default")
    SourcesID = pipz.NewIdentity("sources", "Race between primary and secondary sources")
)

fetch := pipz.NewFallback(FetchID,
    pipz.NewRace[T](SourcesID, primary, secondary),
    staticDefault,
)

Conditional Parallel Processing

// Define identities upfront
var (
    RouterID     = pipz.NewIdentity("router", "Routes to batch or sequential processing")
    BatchID      = pipz.NewIdentity("batch", "Batch parallel processing")
    SequentialID = pipz.NewIdentity("seq", "Sequential processing")
)

router := pipz.NewSwitch[T](RouterID, routeFunc).
    AddRoute("batch", pipz.NewConcurrent[T](BatchID, processors...)).
    AddRoute("sequential", pipz.NewSequence[T](SequentialID, processors...))

Resource-Constrained Processing

// Define identities upfront
var (
    APILimitedID = pipz.NewIdentity("api-limited", "Worker pool limited to 5 concurrent API calls")
    ServiceAID   = pipz.NewIdentity("service-a", "Calls service A")
    ServiceBID   = pipz.NewIdentity("service-b", "Calls service B")
    ServiceCID   = pipz.NewIdentity("service-c", "Calls service C")
    ServiceDID   = pipz.NewIdentity("service-d", "Calls service D")
    ServiceEID   = pipz.NewIdentity("service-e", "Calls service E")
    ServiceFID   = pipz.NewIdentity("service-f", "Calls service F")
)

// Limit concurrent API calls to avoid rate limits
apiCalls := pipz.NewWorkerPool[T](APILimitedID, 5,
    pipz.Apply(ServiceAID, callServiceA),
    pipz.Apply(ServiceBID, callServiceB),
    pipz.Apply(ServiceCID, callServiceC),
    pipz.Apply(ServiceDID, callServiceD),
    pipz.Apply(ServiceEID, callServiceE),
    pipz.Apply(ServiceFID, callServiceF),
    // Only 5 will run concurrently
)