zoobzio December 12, 2025 Edit this page

Recipe: Extensible Application Vocabulary

When you fix pipz.Chainable[T] to a domain type, you create an extensible vocabulary — a set of composable primitives where library code and user code are indistinguishable.

The Pattern

This recipe demonstrates how cogito (an LLM reasoning framework) creates a vocabulary of reasoning primitives that users can extend with their own.

Core Concept

// Library fixes the generic parameter of pipz.Chainable[T] to a domain type
type Thought struct { /* reasoning context */ }

// Library provides primitives — all implement Chainable[*Thought].
// Each constructor takes the key that names the step.
func NewDecide(key, question string) *Decide { ... }
func NewAnalyze[T any](key, prompt string) *Analyze[T] { ... }
func NewCategorize(key, question string, categories []string) *Categorize { ... }

// Users implement custom primitives — same interface, first-class citizen.
// The four methods below are the ones this recipe implements on every
// primitive, library or user-written.
type MyCustomStep struct { ... }
func (m *MyCustomStep) Process(ctx context.Context, t *Thought) (*Thought, error) { ... }
func (m *MyCustomStep) Identity() pipz.Identity { ... }
func (m *MyCustomStep) Schema() pipz.Node { ... }
func (m *MyCustomStep) Close() error { ... }

// Everything composes — library and user code, indistinguishable.
// The user step sits between two library steps in the same Sequence call.
pipeline := cogito.Sequence("my-flow",
    cogito.NewAnalyze[Data]("parse", "extract fields"),
    &MyCustomStep{...},  // User's custom primitive
    cogito.NewDecide("approve", "should we approve?"),
)

Implementation

Define Your Domain Type

// Thought is the reasoning context passed through the pipeline.
// ID and Intent are exported. notes is unexported, so code outside the
// package cannot access the field directly; the steps in this file reach it
// through methods such as AddNote and GetNote.
type Thought struct {
    ID      string
    Intent  string
    notes   []Note  // Accumulated reasoning history
    Session *Session // LLM conversation state
}

// Clone implements Cloner for parallel processing.
//
// The returned Thought carries a freshly generated ID, the same Intent, its
// own copy of the notes slice, and a cloned Session.
// NOTE(review): assumes t.Session is non-nil or that Session.Clone tolerates
// a nil receiver — confirm.
func (t *Thought) Clone() *Thought {
    // Copy into a separate backing array so the clone's slice does not
    // alias the original's.
    history := make([]Note, len(t.notes))
    copy(history, t.notes)

    return &Thought{
        ID:      uuid.New().String(),
        Intent:  t.Intent,
        notes:   history,
        Session: t.Session.Clone(),
    }
}

Create Library Primitives

Each primitive implements pipz.Chainable[*Thought]:

// Decide — binary yes/no decision.
//
// identity is what Identity returns and what Schema embeds; key is the note
// key under which Process records the answer; question is the text NewDecide
// appends to the identity description.
// NOTE(review): Process calls d.synapse.Fire, but no synapse field is
// declared here — presumably elided from this excerpt; confirm.
type Decide struct {
    identity pipz.Identity
    key      string
    question string
}

// NewDecide builds a Decide step. The key serves as both the identity name
// and the note key; the question is stored and also appended to the identity
// description.
func NewDecide(key, question string) *Decide {
    description := "Binary decision: " + question
    step := &Decide{key: key, question: question}
    step.identity = pipz.NewIdentity(key, description)
    return step
}

// Process asks the LLM for a decision based on the Thought's rendered
// context and records the answer as a note under d.key with the "decide"
// tag.
//
// It returns the same *Thought it received. If the synapse call fails, the
// Thought is returned without a new note, together with the error.
func (d *Decide) Process(ctx context.Context, t *Thought) (*Thought, error) {
    // Get context from thought's notes. The local is named "rendered" rather
    // than "context" so it does not shadow the imported context package.
    rendered := t.RenderContext()

    // Make LLM decision
    result, err := d.synapse.Fire(ctx, t.Session, rendered)
    if err != nil {
        return t, err
    }

    // Record decision in thought
    t.AddNote(d.key, result.Answer, "decide")
    return t, nil
}

// Identity reports the identity assigned to this step by NewDecide.
func (d *Decide) Identity() pipz.Identity {
    return d.identity
}

// Schema describes this step as a pipz node of type "decide".
func (d *Decide) Schema() pipz.Node {
    node := pipz.Node{Type: "decide", Identity: d.identity}
    return node
}

// Close is a no-op; it always returns nil.
func (d *Decide) Close() error {
    return nil
}
// Analyze — extract structured data.
//
// T is the type Process extracts into. identity is what Identity returns and
// what Schema embeds; key is the note key under which Process records the
// extracted value; prompt is the text NewAnalyze appends to the identity
// description.
// NOTE(review): Process calls a.synapse.Extract, but no synapse field is
// declared here — presumably elided from this excerpt; confirm.
type Analyze[T any] struct {
    identity pipz.Identity
    key      string
    prompt   string
}

// NewAnalyze builds an Analyze step for target type T. The key serves as
// both the identity name and the note key; the prompt is stored and also
// appended to the identity description.
func NewAnalyze[T any](key, prompt string) *Analyze[T] {
    description := "Extraction: " + prompt
    step := &Analyze[T]{key: key, prompt: prompt}
    step.identity = pipz.NewIdentity(key, description)
    return step
}

// Process extracts a value of type T from the Thought's rendered context and
// records it as a note under a.key with the "analyze" tag.
//
// It returns the same *Thought it received. If extraction fails, the Thought
// is returned without a new note, together with the error.
func (a *Analyze[T]) Process(ctx context.Context, t *Thought) (*Thought, error) {
    // Named "rendered" rather than "context" so the local does not shadow
    // the imported context package.
    rendered := t.RenderContext()

    var result T
    if err := a.synapse.Extract(ctx, t.Session, rendered, &result); err != nil {
        return t, err
    }

    t.AddNote(a.key, result, "analyze")
    return t, nil
}

// Identity reports the identity assigned to this step by NewAnalyze.
func (a *Analyze[T]) Identity() pipz.Identity {
    return a.identity
}

// Schema describes this step as a pipz node of type "analyze".
func (a *Analyze[T]) Schema() pipz.Node {
    node := pipz.Node{Type: "analyze", Identity: a.identity}
    return node
}

// Close is a no-op; it always returns nil.
func (a *Analyze[T]) Close() error {
    return nil
}

Provide Composition Helpers

Wrap pipz connectors with domain-specific names:

// Sequence wraps pipz.NewSequence for *Thought: the given steps execute in
// order under an identity built from name.
func Sequence(name string, steps ...pipz.Chainable[*Thought]) *pipz.Sequence[*Thought] {
    return pipz.NewSequence(
        pipz.NewIdentity(name, "Sequential reasoning chain"),
        steps...,
    )
}

// Converge runs steps in parallel and synthesizes their results. The
// reducer that merges the results is built from synthesisPrompt.
func Converge(name, synthesisPrompt string, steps ...pipz.Chainable[*Thought]) pipz.Chainable[*Thought] {
    fanOut := pipz.NewConcurrent(
        pipz.NewIdentity(name, "Parallel reasoning with synthesis"),
        steps...,
    )
    reducer := synthesize(synthesisPrompt)
    return fanOut.WithReducer(reducer)
}

// Filter wraps pipz.NewFilter for *Thought: step is executed conditionally,
// governed by predicate, under an identity built from name.
func Filter(name string, predicate func(*Thought) bool, step pipz.Chainable[*Thought]) pipz.Chainable[*Thought] {
    return pipz.NewFilter(
        pipz.NewIdentity(name, "Conditional reasoning step"),
        predicate,
        step,
    )
}

// Resilience wrappers

// Retry wraps pipz.NewRetry for *Thought, passing step and attempts through
// under an identity built from name.
func Retry(name string, step pipz.Chainable[*Thought], attempts int) pipz.Chainable[*Thought] {
    return pipz.NewRetry(
        pipz.NewIdentity(name, "Retry reasoning step"),
        step,
        attempts,
    )
}

// Timeout wraps pipz.NewTimeout for *Thought, passing step and duration
// through under an identity built from name.
func Timeout(name string, step pipz.Chainable[*Thought], duration time.Duration) pipz.Chainable[*Thought] {
    return pipz.NewTimeout(
        pipz.NewIdentity(name, "Timeout reasoning step"),
        step,
        duration,
    )
}

Users Extend the Vocabulary

Users create custom primitives that compose with library primitives:

// User's domain-specific reasoning step.
//
// identity is what Identity returns and what Schema embeds; key is the note
// key under which Process records the score; riskEngine is the user-supplied
// engine whose Evaluate method Process calls.
type RiskAssessment struct {
    identity   pipz.Identity
    key        string
    riskEngine RiskEngine
}

// NewRiskAssessment builds a RiskAssessment step backed by engine. The key
// serves as both the identity name and the note key.
func NewRiskAssessment(key string, engine RiskEngine) *RiskAssessment {
    step := &RiskAssessment{key: key, riskEngine: engine}
    step.identity = pipz.NewIdentity(key, "Assesses risk using custom engine")
    return step
}

// Process evaluates the parsed application data with the custom risk engine
// and records the formatted score as a note under r.key with the
// "risk-assessment" tag.
//
// It returns the same *Thought it received. If the engine fails, the Thought
// is returned without a new note, together with the error.
func (r *RiskAssessment) Process(ctx context.Context, t *Thought) (*Thought, error) {
    // Extract data from thought. The key must match the one given to the
    // upstream NewAnalyze step; the pipelines in this recipe register that
    // step as "parse", so reading any other key would find no note.
    data := t.GetNote("parse")

    // Use custom risk engine
    score, err := r.riskEngine.Evaluate(ctx, data)
    if err != nil {
        return t, err
    }

    // Record in thought like any library primitive
    t.AddNote(r.key, fmt.Sprintf("Risk score: %.2f", score), "risk-assessment")
    return t, nil
}

// Identity reports the identity assigned to this step by NewRiskAssessment.
func (r *RiskAssessment) Identity() pipz.Identity {
    return r.identity
}

// Schema describes this step as a pipz node of type "risk-assessment".
func (r *RiskAssessment) Schema() pipz.Node {
    node := pipz.Node{Type: "risk-assessment", Identity: r.identity}
    return node
}

// Close is a no-op; it always returns nil.
func (r *RiskAssessment) Close() error {
    return nil
}

Compose Library and User Code

// Library primitives and user primitives compose identically:
// both are passed as plain arguments to the same Sequence call.
pipeline := cogito.Sequence("loan-approval",
    // Library primitive: extract application data (step key "parse")
    cogito.NewAnalyze[LoanApplication]("parse", "extract loan application fields"),

    // Library primitive: search for similar applications
    cogito.NewSeek("history", "similar loan applications").WithLimit(5),

    // User primitive: custom risk assessment
    NewRiskAssessment("risk", myRiskEngine),

    // Library primitive: make decision (step key "approve")
    cogito.NewDecide("approve", "should we approve this loan?"),

    // Conditional: the nested sequence is gated on the "approve" note
    // equalling "yes"
    cogito.Filter("if-approved",
        func(t *cogito.Thought) bool {
            return t.GetNote("approve") == "yes"
        },
        cogito.Sequence("post-approval",
            cogito.NewAnalyze[Terms]("terms", "generate loan terms"),
            &NotifyApplicant{...}, // Another user primitive
        ),
    ),
)

Real-World Example: cogito

cogito provides 15+ reasoning primitives:

| Category     | Primitives                               |
| ------------ | ---------------------------------------- |
| Decision     | Decide, Categorize, Assess, Prioritize   |
| Extraction   | Analyze                                  |
| Memory       | Seek, Survey, Recall, Reflect            |
| Control Flow | Sift (LLM gate), Discern (LLM router)    |
| Synthesis    | Converge, Amplify                        |
| Session      | Compress, Truncate                       |

Users extend this vocabulary with domain-specific reasoning steps. A legal document analyzer might add:

// Contract review: two user-defined steps sit between library primitives
// in a single Sequence.
pipeline := cogito.Sequence("contract-review",
    cogito.NewAnalyze[Contract]("parse", "extract contract clauses"),
    &ClauseRiskScorer{...},        // User: score each clause
    &RegulatoryChecker{...},       // User: check compliance
    cogito.NewCategorize("type", "contract type", contractTypes),
    cogito.NewDecide("flag", "does this need legal review?"),
)

Benefits

  1. No Privileged Code — Library primitives have no special status; user primitives are first-class
  2. Uniform Interface — Everything is Chainable[*Thought]
  3. Full Composability — Sequential, parallel, conditional, and resilience connectors all work with any primitive
  4. Observable — All primitives emit signals through the same mechanism
  5. Schema Introspection — Pipeline structure is discoverable at runtime

The Key Insight

When you fix Chainable[T] to your domain type, you're not just using pipz — you're creating a domain-specific language for composing operations on that type. The library provides vocabulary; users extend it.

pipz.Chainable[T]  →  fix T to *Thought  →  Reasoning vocabulary
                  →  fix T to *File     →  File processing vocabulary
                  →  fix T to *Request  →  API handling vocabulary

Each vocabulary inherits pipz's composition, resilience, and observability — but speaks in domain terms.

See Also