// Identities used by the order pipeline example. They are declared once and
// passed to NewPipeline and NewSequence below.
// NOTE(review): the two string arguments look like a name followed by a
// human-readable description — confirm against the pipz.NewIdentity docs.
var (
OrderPipelineID = pipz.NewIdentity("order-processing", "Main order flow")
InternalSeqID = pipz.NewIdentity("order-steps", "Processing sequence")
)
// Wrap any chainable with execution context.
// Here the wrapped chainable is a sequence of validate, enrich and save,
// registered under InternalSeqID; the outer pipeline is registered under
// OrderPipelineID.
pipeline := pipz.NewPipeline(OrderPipelineID,
pipz.NewSequence(InternalSeqID, validate, enrich, save),
)
// Extract IDs in processors or signal handlers.
// myProcessor is a skeleton: the processing logic and the return statement
// are elided ("// ...").
// NOTE(review): the second return value of both lookups is discarded here;
// presumably it reports whether the ID was present in ctx — check it before
// relying on execID or pipeID in real code.
func myProcessor(ctx context.Context, data T) (T, error) {
execID, _ := pipz.ExecutionIDFromContext(ctx) // Unique per call
pipeID, _ := pipz.PipelineIDFromContext(ctx) // Stable per pipeline
// ...
}
Clone Implementation
// Data is the example payload for the Clone implementation below. Both fields
// are reference types (a slice and a map), so assigning a Data value copies
// only the headers; the underlying storage stays shared unless it is copied
// explicitly.
type Data struct {
Items []Item
Meta map[string]string
}
// Clone returns a copy of d whose Items slice and Meta map have their own
// backing storage, so appending to, overwriting, or deleting entries in the
// copy leaves d untouched. Individual Item values are copied by assignment.
// The returned Items and Meta are always non-nil, even when d's are nil.
func (d Data) Clone() Data {
	dup := Data{
		// Fresh backing array sized exactly to the source, then filled in one append.
		Items: append(make([]Item, 0, len(d.Items)), d.Items...),
		// Fresh map pre-sized to the number of source entries.
		Meta: make(map[string]string, len(d.Meta)),
	}
	for key, val := range d.Meta {
		dup.Meta[key] = val
	}
	return dup
}
Error Handling
Access Error Details
// Run the pipeline and, on failure, inspect the structured error.
// NOTE(review): result is not used in this fragment; real code must consume
// it (or assign to _) for the snippet to compile.
result, err := pipeline.Process(ctx, data)
if err != nil {
// errors.As walks err's wrap chain looking for a *pipz.Error[T]; the body
// is skipped when no error of that type is found.
// NOTE(review): T stands for the pipeline's data type and must be in scope.
var pipeErr *pipz.Error[T]
if errors.As(err, &pipeErr) {
fmt.Printf("Failed at: %v\n", pipeErr.Path)
fmt.Printf("Cause: %v\n", pipeErr.Err)
fmt.Printf("Data state: %+v\n", pipeErr.InputData)
fmt.Printf("Duration: %v\n", pipeErr.Duration)
if pipeErr.Timeout {
fmt.Println("Operation timed out")
}
}
}
Handle Specific Errors
// Identities for the error-handling example: RecoverErrorsID is passed to
// NewHandle and HandleErrorID to the Effect that serves as its handler.
var (
RecoverErrorsID = pipz.NewIdentity("recover", "Recover from specific errors")
HandleErrorID = pipz.NewIdentity("handle", "Handle temporary errors")
)
// Attach an error handler to pipeline. The handler is an Effect that receives
// the *pipz.Error[T] and logs it when its cause matches ErrTemporary.
// NOTE(review): the value returned by NewHandle is not assigned in this
// fragment; presumably it should be stored and used in place of pipeline for
// the handler to take effect — confirm against the pipz docs.
pipz.NewHandle(RecoverErrorsID, pipeline,
pipz.Effect(HandleErrorID, func(ctx context.Context, pipeErr *pipz.Error[T]) error {
// errors.Is matches ErrTemporary anywhere in pipeErr.Err's wrap chain.
if errors.Is(pipeErr.Err, ErrTemporary) {
log.Printf("Temporary error at %v: %v", pipeErr.Path, pipeErr.Err)
// Perform cleanup or notification
}
return nil // Handler errors don't affect flow
}),
)
// In a test: assert which stage the pipeline failed at.
// Both preconditions fail the test explicitly, so the final assertion can
// never be skipped silently when Process returns nil or an unexpected error
// type.
_, err := pipeline.Process(ctx, data)
var pipeErr *pipz.Error[T]
if !errors.As(err, &pipeErr) {
	t.Fatalf("expected a *pipz.Error, got %v", err)
}
// Guard the index below: an empty Path would otherwise panic.
if len(pipeErr.Path) == 0 {
	t.Fatal("pipz.Error has an empty Path")
}
// Path contains the full chain of identities, last element is where failure occurred
lastID := pipeErr.Path[len(pipeErr.Path)-1]
assert.Equal(t, "expected-stage", lastID.Name)
Gotchas & Tips
❌ Common Mistakes
Creating rate limiters per request
// WRONG - New instance each time
// The limiter is constructed inside handle, so every call gets a separate
// instance. NOTE(review): presumably no limiter state is shared between
// requests this way — create the limiter once and reuse it instead.
// The "..." stands for the constructor arguments, which are elided here.
func handle(req Request) {
limiter := pipz.NewRateLimiter(...) // ❌
}