Recipe: Extensible Application Vocabulary
When you fix pipz.Chainable[T] to a domain type, you create an extensible vocabulary — a set of composable primitives where library code and user code are indistinguishable.
The Pattern
This recipe demonstrates how cogito (an LLM reasoning framework) creates a vocabulary of reasoning primitives that users can extend with their own.
Core Concept
// Library fixes the generic to a domain type
type Thought struct { /* reasoning context */ }
// Library provides primitives — all implement Chainable[*Thought]
func NewDecide(key, question string) *Decide { ... }
func NewAnalyze[T any](key, prompt string) *Analyze[T] { ... }
func NewCategorize(key, question string, categories []string) *Categorize { ... }
// Users implement custom primitives — same interface, first-class citizen
type MyCustomStep struct { ... }
func (m *MyCustomStep) Process(ctx context.Context, t *Thought) (*Thought, error) { ... }
func (m *MyCustomStep) Identity() pipz.Identity { ... }
func (m *MyCustomStep) Schema() pipz.Node { ... }
func (m *MyCustomStep) Close() error { ... }
// Everything composes — library and user code, indistinguishable
pipeline := cogito.Sequence("my-flow",
cogito.NewAnalyze[Data]("parse", "extract fields"),
&MyCustomStep{...}, // User's custom primitive
cogito.NewDecide("approve", "should we approve?"),
)
Implementation
Define Your Domain Type
// Thought is the reasoning context passed through the pipeline
type Thought struct {
ID string
Intent string
notes []Note // Accumulated reasoning history
Session *Session // LLM conversation state
}
// Implement Cloner for parallel processing
func (t *Thought) Clone() *Thought {
clone := &Thought{
ID: uuid.New().String(),
Intent: t.Intent,
notes: make([]Note, len(t.notes)),
Session: t.Session.Clone(),
}
copy(clone.notes, t.notes)
return clone
}
Create Library Primitives
Each primitive implements pipz.Chainable[*Thought]:
// Decide — binary yes/no decision
type Decide struct {
identity pipz.Identity
key string
question string
}
func NewDecide(key, question string) *Decide {
return &Decide{
identity: pipz.NewIdentity(key, "Binary decision: "+question),
key: key,
question: question,
}
}
func (d *Decide) Process(ctx context.Context, t *Thought) (*Thought, error) {
// Get context from thought's notes
context := t.RenderContext()
// Make LLM decision
result, err := d.synapse.Fire(ctx, t.Session, context)
if err != nil {
return t, err
}
// Record decision in thought
t.AddNote(d.key, result.Answer, "decide")
return t, nil
}
func (d *Decide) Identity() pipz.Identity { return d.identity }
func (d *Decide) Schema() pipz.Node {
return pipz.Node{Identity: d.identity, Type: "decide"}
}
func (d *Decide) Close() error { return nil }
// Analyze — extract structured data
type Analyze[T any] struct {
identity pipz.Identity
key string
prompt string
}
func NewAnalyze[T any](key, prompt string) *Analyze[T] {
return &Analyze[T]{
identity: pipz.NewIdentity(key, "Extraction: "+prompt),
key: key,
prompt: prompt,
}
}
func (a *Analyze[T]) Process(ctx context.Context, t *Thought) (*Thought, error) {
context := t.RenderContext()
var result T
if err := a.synapse.Extract(ctx, t.Session, context, &result); err != nil {
return t, err
}
t.AddNote(a.key, result, "analyze")
return t, nil
}
func (a *Analyze[T]) Identity() pipz.Identity { return a.identity }
func (a *Analyze[T]) Schema() pipz.Node {
return pipz.Node{Identity: a.identity, Type: "analyze"}
}
func (a *Analyze[T]) Close() error { return nil }
Provide Composition Helpers
Wrap pipz connectors with domain-specific names:
// Sequence executes steps in order
func Sequence(name string, steps ...pipz.Chainable[*Thought]) *pipz.Sequence[*Thought] {
id := pipz.NewIdentity(name, "Sequential reasoning chain")
return pipz.NewSequence(id, steps...)
}
// Converge runs steps in parallel and synthesizes results
func Converge(name, synthesisPrompt string, steps ...pipz.Chainable[*Thought]) pipz.Chainable[*Thought] {
id := pipz.NewIdentity(name, "Parallel reasoning with synthesis")
return pipz.NewConcurrent(id, steps...).WithReducer(synthesize(synthesisPrompt))
}
// Filter conditionally executes a step
func Filter(name string, predicate func(*Thought) bool, step pipz.Chainable[*Thought]) pipz.Chainable[*Thought] {
id := pipz.NewIdentity(name, "Conditional reasoning step")
return pipz.NewFilter(id, predicate, step)
}
// Resilience wrappers
func Retry(name string, step pipz.Chainable[*Thought], attempts int) pipz.Chainable[*Thought] {
id := pipz.NewIdentity(name, "Retry reasoning step")
return pipz.NewRetry(id, step, attempts)
}
func Timeout(name string, step pipz.Chainable[*Thought], duration time.Duration) pipz.Chainable[*Thought] {
id := pipz.NewIdentity(name, "Timeout reasoning step")
return pipz.NewTimeout(id, step, duration)
}
Users Extend the Vocabulary
Users create custom primitives that compose with library primitives:
// User's domain-specific reasoning step
type RiskAssessment struct {
identity pipz.Identity
key string
riskEngine RiskEngine
}
func NewRiskAssessment(key string, engine RiskEngine) *RiskAssessment {
return &RiskAssessment{
identity: pipz.NewIdentity(key, "Assesses risk using custom engine"),
key: key,
riskEngine: engine,
}
}
func (r *RiskAssessment) Process(ctx context.Context, t *Thought) (*Thought, error) {
// Extract data from thought
data := t.GetNote("parsed-data")
// Use custom risk engine
score, err := r.riskEngine.Evaluate(ctx, data)
if err != nil {
return t, err
}
// Record in thought like any library primitive
t.AddNote(r.key, fmt.Sprintf("Risk score: %.2f", score), "risk-assessment")
return t, nil
}
func (r *RiskAssessment) Identity() pipz.Identity { return r.identity }
func (r *RiskAssessment) Schema() pipz.Node {
return pipz.Node{Identity: r.identity, Type: "risk-assessment"}
}
func (r *RiskAssessment) Close() error { return nil }
Compose Library and User Code
// Library primitives and user primitives compose identically
pipeline := cogito.Sequence("loan-approval",
// Library primitive: extract application data
cogito.NewAnalyze[LoanApplication]("parse", "extract loan application fields"),
// Library primitive: search for similar applications
cogito.NewSeek("history", "similar loan applications").WithLimit(5),
// User primitive: custom risk assessment
NewRiskAssessment("risk", myRiskEngine),
// Library primitive: make decision
cogito.NewDecide("approve", "should we approve this loan?"),
// Conditional: only if approved
cogito.Filter("if-approved",
func(t *cogito.Thought) bool {
return t.GetNote("approve") == "yes"
},
cogito.Sequence("post-approval",
cogito.NewAnalyze[Terms]("terms", "generate loan terms"),
&NotifyApplicant{...}, // Another user primitive
),
),
)
Real-World Example: cogito
cogito provides 15+ reasoning primitives:
| Category | Primitives |
|---|---|
| Decision | Decide, Categorize, Assess, Prioritize |
| Extraction | Analyze |
| Memory | Seek, Survey, Recall, Reflect |
| Control Flow | Sift (LLM gate), Discern (LLM router) |
| Synthesis | Converge, Amplify |
| Session | Compress, Truncate |
Users extend this vocabulary with domain-specific reasoning steps. A legal document analyzer might add:
pipeline := cogito.Sequence("contract-review",
cogito.NewAnalyze[Contract]("parse", "extract contract clauses"),
&ClauseRiskScorer{...}, // User: score each clause
&RegulatoryChecker{...}, // User: check compliance
cogito.NewCategorize("type", "contract type", contractTypes),
cogito.NewDecide("flag", "does this need legal review?"),
)
Benefits
- No Privileged Code — Library primitives have no special status; user primitives are first-class
- Uniform Interface — Everything is
Chainable[*Thought] - Full Composability — Sequence, parallel, conditional, resilience — all work with any primitive
- Observable — All primitives emit signals through the same mechanism
- Schema Introspection — Pipeline structure is discoverable at runtime
The Key Insight
When you fix Chainable[T] to your domain type, you're not just using pipz — you're creating a domain-specific language for composing operations on that type. The library provides vocabulary; users extend it.
pipz.Chainable[T] → fix T to *Thought → Reasoning vocabulary
→ fix T to *File → File processing vocabulary
→ fix T to *Request → API handling vocabulary
Each vocabulary inherits pipz's composition, resilience, and observability — but speaks in domain terms.
See Also
- cogito source — Complete implementation of this pattern
- Building Pipelines — Application-level pipeline construction
- Library Resilience — Expose resilience via With* API