Clone Implementation Guide
Understanding and implementing the Clone() method correctly is critical for safe concurrent processing in pipz pipelines.
Why Clone() Matters
When using concurrent connectors (Concurrent, Race, Contest), pipz creates independent copies of your data for each parallel processor. This isolation prevents data races and ensures predictable behavior. Without proper cloning, concurrent processors can corrupt each other's data, leading to subtle bugs that are difficult to debug.
The Cloner Interface
type Cloner[T any] interface {
Clone() T
}
Your data types must implement this interface to work with concurrent connectors.
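As a rough illustration of how a connector can rely on this interface, the sketch below redeclares `Cloner` locally and uses a hypothetical `fanOut` helper (an assumption for this example, not part of pipz) that hands each worker its own copy. The `var _ Cloner[Config] = Config{}` line is a common Go idiom for verifying at compile time that a type satisfies an interface.

```go
package main

import "fmt"

// Cloner mirrors the interface shown above.
type Cloner[T any] interface {
	Clone() T
}

type Config struct {
	MaxRetries int
}

func (c Config) Clone() Config { return c }

// Compile-time check: the build fails if Config stops implementing Cloner.
var _ Cloner[Config] = Config{}

// fanOut is a hypothetical helper that produces n independent copies,
// roughly what a concurrent connector needs before starting its workers.
func fanOut[T Cloner[T]](in T, n int) []T {
	out := make([]T, n)
	for i := range out {
		out[i] = in.Clone()
	}
	return out
}

func main() {
	copies := fanOut(Config{MaxRetries: 3}, 2)
	copies[0].MaxRetries = 9
	fmt.Println(copies[0].MaxRetries, copies[1].MaxRetries) // prints "9 3"
}
```

Note that the constraint `T Cloner[T]` is satisfied only when `Clone()` is in the method set of `T` itself, which is why the patterns in this guide use value receivers.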
Implementation Patterns
Pattern 1: Simple Value Types
For types containing only value fields (no pointers, slices, or maps):
type Config struct {
MaxRetries int
Timeout time.Duration
Enabled bool
}
// Simple copy is sufficient for value types
func (c Config) Clone() Config {
return c // All fields are copied by value
}
Pattern 2: Types with Slices
Slices share underlying arrays, so they must be deep copied:
❌ WRONG: Shallow copy shares slice memory
func (o Order) Clone() Order {
return Order{
ID: o.ID,
Items: o.Items, // DANGER: Shares underlying array!
}
}
// Concurrent processors will see each other's modifications!
✅ RIGHT: Deep copy creates independent slice
func (o Order) Clone() Order {
// Allocate a new slice of the same length so copy() fills every element
items := make([]Item, len(o.Items))
copy(items, o.Items)
return Order{
ID: o.ID,
Items: items, // Independent copy
}
}
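To make the aliasing concrete, here is a small standalone sketch using plain `[]int` values. The names `shallowCopy` and `deepCopy` are illustrative only. It also shows `slices.Clone` (standard library, Go 1.21+) as a shorter alternative to the `make` plus `copy` pair, assuming your toolchain is recent enough.

```go
package main

import (
	"fmt"
	"slices"
)

// shallowCopy returns the same slice header: both slices share one array.
func shallowCopy(items []int) []int { return items }

// deepCopy allocates a new backing array and copies the elements.
func deepCopy(items []int) []int { return slices.Clone(items) }

func main() {
	original := []int{1, 2, 3}

	shared := shallowCopy(original)
	shared[0] = 99
	fmt.Println(original[0]) // prints "99": the write is visible through both

	// Re-slicing shares memory too: append writes into spare capacity.
	prefix := original[:2]
	prefix = append(prefix, 42)
	fmt.Println(original[2]) // prints "42"

	independent := deepCopy(original)
	independent[0] = -1
	fmt.Println(original[0]) // prints "99": the original is untouched
}
```

Keep in mind that `slices.Clone` is shallow per element, so a slice of structs that themselves hold slices or maps still needs the element-by-element approach.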
Pattern 3: Types with Maps
Maps are reference types and must be copied explicitly:
❌ WRONG: Shallow copy shares map reference
func (r Request) Clone() Request {
return Request{
ID: r.ID,
Headers: r.Headers, // DANGER: Same map instance!
}
}
✅ RIGHT: Deep copy creates independent map
func (r Request) Clone() Request {
// Create a new map, pre-sized for the same number of entries
headers := make(map[string]string, len(r.Headers))
for k, v := range r.Headers {
headers[k] = v
}
return Request{
ID: r.ID,
Headers: headers, // Independent copy
}
}
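On Go 1.21 or later, the manual loop can be replaced with `maps.Clone` from the standard library. The sketch below assumes a `Request` with only the two fields used above. One behavioral difference worth knowing: `maps.Clone` returns nil for a nil map, whereas the `make`-based version always returns an empty, non-nil map.

```go
package main

import (
	"fmt"
	"maps"
)

type Request struct {
	ID      string
	Headers map[string]string
}

// Clone uses maps.Clone in place of the manual copy loop.
func (r Request) Clone() Request {
	return Request{
		ID:      r.ID,
		Headers: maps.Clone(r.Headers), // a nil map stays nil
	}
}

func main() {
	orig := Request{ID: "req-1", Headers: map[string]string{"Accept": "text/plain"}}
	clone := orig.Clone()
	clone.Headers["Accept"] = "application/json"
	fmt.Println(orig.Headers["Accept"]) // prints "text/plain"

	var empty Request
	fmt.Println(empty.Clone().Headers == nil) // prints "true"
}
```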
Pattern 4: Types with Pointers
Pointers require careful consideration: decide whether to share or copy the pointed-to value.
type Document struct {
ID string
Content string
Metadata *Metadata // Pointer field
}
// Option 1: Share the pointed-to value (if immutable)
func (d Document) Clone() Document {
return Document{
ID: d.ID,
Content: d.Content,
Metadata: d.Metadata, // Shares same Metadata instance
}
}
// Option 2: Deep copy the pointed-to value (if mutable)
func (d Document) Clone() Document {
var metadata *Metadata
if d.Metadata != nil {
// Copy the struct itself (reference fields inside Metadata remain shared)
metaCopy := *d.Metadata
metadata = &metaCopy
}
return Document{
ID: d.ID,
Content: d.Content,
Metadata: metadata, // Independent copy
}
}
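Dereferencing the pointer copies only the struct's top level. The sketch below assumes a hypothetical `Metadata` with an `Author` string and a `Labels` slice (the guide does not define its fields) to show that any reference field inside the pointed-to value needs its own copy.

```go
package main

import "fmt"

// Metadata's fields are hypothetical, chosen to include one reference type.
type Metadata struct {
	Author string
	Labels []string
}

type Document struct {
	ID       string
	Metadata *Metadata
}

func (d Document) Clone() Document {
	var meta *Metadata
	if d.Metadata != nil {
		metaCopy := *d.Metadata // copies Author; Labels still aliases the original
		metaCopy.Labels = make([]string, len(d.Metadata.Labels))
		copy(metaCopy.Labels, d.Metadata.Labels)
		meta = &metaCopy
	}
	return Document{ID: d.ID, Metadata: meta}
}

func main() {
	orig := Document{ID: "doc-1", Metadata: &Metadata{Author: "ann", Labels: []string{"draft"}}}
	clone := orig.Clone()
	clone.Metadata.Author = "bob"
	clone.Metadata.Labels[0] = "final"
	fmt.Println(orig.Metadata.Author, orig.Metadata.Labels[0]) // prints "ann draft"
}
```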
Pattern 5: Nested Structures
For complex nested structures, implement Clone() recursively:
type Order struct {
ID string
Customer Customer
Items []OrderItem
Metadata map[string]any
}
type Customer struct {
ID string
Name string
Addresses []Address
}
type OrderItem struct {
ProductID string
Quantity int
Options map[string]string
}
// Comprehensive deep clone
func (o Order) Clone() Order {
// Clone nested struct (if it has reference types)
customer := o.Customer.Clone()
// Clone slice of structs
items := make([]OrderItem, len(o.Items))
for i, item := range o.Items {
items[i] = item.Clone()
}
// Clone map with interface{} values
metadata := make(map[string]any, len(o.Metadata))
for k, v := range o.Metadata {
// Handle different value types
switch val := v.(type) {
case []byte:
// Deep copy byte slices
b := make([]byte, len(val))
copy(b, val)
metadata[k] = b
default:
// Copy other values directly (nested slices, maps, and pointers stay shared)
metadata[k] = val
}
}
return Order{
ID: o.ID,
Customer: customer,
Items: items,
Metadata: metadata,
}
}
func (c Customer) Clone() Customer {
addresses := make([]Address, len(c.Addresses))
copy(addresses, c.Addresses)
return Customer{
ID: c.ID,
Name: c.Name,
Addresses: addresses,
}
}
func (i OrderItem) Clone() OrderItem {
options := make(map[string]string, len(i.Options))
for k, v := range i.Options {
options[k] = v
}
return OrderItem{
ProductID: i.ProductID,
Quantity: i.Quantity,
Options: options,
}
}
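The `default` branch in the `Metadata` loop above copies values as-is, so a nested `map[string]any` or `[]any` stored in the map would still be shared. One way to extend it is a recursive helper. The sketch below uses a hypothetical `cloneAny` function that handles only the container types commonly produced by JSON decoding plus `[]byte`; any other concrete type (for example `[]string` or a pointer) is still returned unchanged.

```go
package main

import "fmt"

// cloneAny is a hypothetical helper: it deep copies map[string]any,
// []any, and []byte, and returns every other value unchanged.
func cloneAny(v any) any {
	switch val := v.(type) {
	case map[string]any:
		out := make(map[string]any, len(val))
		for k, elem := range val {
			out[k] = cloneAny(elem)
		}
		return out
	case []any:
		out := make([]any, len(val))
		for i, elem := range val {
			out[i] = cloneAny(elem)
		}
		return out
	case []byte:
		b := make([]byte, len(val))
		copy(b, val)
		return b
	default:
		return v
	}
}

func main() {
	orig := map[string]any{
		"tags":  []any{"a", "b"},
		"inner": map[string]any{"n": 1},
	}
	clone := cloneAny(orig).(map[string]any)
	clone["tags"].([]any)[0] = "z"
	clone["inner"].(map[string]any)["n"] = 2
	fmt.Println(orig["tags"].([]any)[0], orig["inner"].(map[string]any)["n"]) // prints "a 1"
}
```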
Testing Clone Implementations
Test 1: Independence Test
Verify that modifications to the clone don't affect the original:
func TestCloneIndependence(t *testing.T) {
original := Order{
ID: "order-1",
Items: []OrderItem{{ProductID: "prod-1", Quantity: 1}},
Metadata: map[string]any{
"priority": "high",
},
}
// Create clone
cloned := original.Clone()
// Modify clone
cloned.Items[0].Quantity = 5
cloned.Metadata["priority"] = "low"
// Verify original is unchanged
if original.Items[0].Quantity != 1 {
t.Error("Clone modification affected original slice")
}
if original.Metadata["priority"] != "high" {
t.Error("Clone modification affected original map")
}
}
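A complementary check that does not require mutating anything is to compare element addresses. The sketch below uses a hypothetical `sharesBacking` helper; it only detects slices that start at the same element, so treat it as a quick sanity check rather than a complete aliasing detector.

```go
package main

import "fmt"

// sharesBacking is a hypothetical test helper. It reports whether two
// non-empty slices begin at the same element, which means the "clone"
// still points into the original's backing array.
func sharesBacking[T any](a, b []T) bool {
	return len(a) > 0 && len(b) > 0 && &a[0] == &b[0]
}

func main() {
	orig := []int{1, 2, 3}

	shallow := orig // same backing array
	deep := make([]int, len(orig))
	copy(deep, orig) // new backing array

	fmt.Println(sharesBacking(orig, shallow), sharesBacking(orig, deep)) // prints "true false"
}
```

In a test, this could be used as `if sharesBacking(original.Items, cloned.Items) { t.Error(...) }` alongside the mutation-based checks.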
Test 2: Race Condition Detection
Use Go's race detector to catch sharing issues:
func TestCloneConcurrency(t *testing.T) {
// Run with: go test -race
original := Order{
ID: "order-1",
Items: []OrderItem{{ProductID: "prod-1", Quantity: 1}},
}
// Simulate concurrent processing
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
// Each goroutine gets its own clone
clone := original.Clone()
// Modify clone independently
clone.Items[0].Quantity = n
// Process...
time.Sleep(10 * time.Millisecond)
// Verify our modifications
if clone.Items[0].Quantity != n {
t.Errorf("Unexpected quantity: got %d, want %d",
clone.Items[0].Quantity, n)
}
}(i)
}
wg.Wait()
}
Test 3: Benchmark Clone Performance
Measure the overhead of cloning:
func BenchmarkClone(b *testing.B) {
order := Order{
ID: "order-1",
Items: make([]OrderItem, 100),
Metadata: make(map[string]any, 50),
}
// Initialize test data
for i := range order.Items {
order.Items[i] = OrderItem{
ProductID: fmt.Sprintf("prod-%d", i),
Quantity: i,
}
}
for i := 0; i < 50; i++ {
order.Metadata[fmt.Sprintf("key-%d", i)] = i
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = order.Clone()
}
}
Common Pitfalls
Pitfall 1: Forgetting Nested Slices
type Report struct {
Sections []Section
}
type Section struct {
Title string
Data []byte // Easy to miss!
}
❌ WRONG: Nested slices not cloned
func (r Report) Clone() Report {
sections := make([]Section, len(r.Sections))
copy(sections, r.Sections) // Shallow copy of structs!
return Report{Sections: sections}
}
✅ RIGHT: Clone nested slices too
func (r Report) Clone() Report {
sections := make([]Section, len(r.Sections))
for i, s := range r.Sections {
data := make([]byte, len(s.Data))
copy(data, s.Data)
sections[i] = Section{
Title: s.Title,
Data: data,
}
}
return Report{Sections: sections}
}
Pitfall 2: Shared Channel References
Channels are usually shared rather than cloned, because their purpose is communication between goroutines:
type Worker struct {
ID string
Results chan Result // Channels are for communication
}
func (w Worker) Clone() Worker {
return Worker{
ID: w.ID,
Results: w.Results, // Share the channel - usually correct
}
}
Pitfall 3: Time and Sync Types
Some standard library types have special considerations:
type Task struct {
ID string
StartTime time.Time // Value type, safe to copy
mu sync.Mutex // NEVER copy a mutex!
data []byte
}
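// Pointer receiver: the original's mutex is locked in place, never copied
func (t *Task) Clone() Task {
// Returns a value (not a pointer) whose mutex starts as a fresh zero value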
t.mu.Lock()
defer t.mu.Unlock()
data := make([]byte, len(t.data))
copy(data, t.data)
return Task{
ID: t.ID,
StartTime: t.StartTime,
// mu: zero value (new mutex)
data: data,
}
}
Performance Considerations
Memory Allocation
Deep cloning allocates new memory. Consider the trade-offs:
// Lightweight clone for mostly-immutable data
func (d Document) CloneLightweight() Document {
// Only clone what might be modified
return Document{
ID: d.ID,
Metadata: d.Metadata, // Share if read-only
Content: d.Content, // Share large immutable data
Tags: cloneSlice(d.Tags), // Clone mutable slice
}
}
// Full deep clone for complete isolation
func (d Document) CloneDeep() Document {
// Clone everything for total independence
content := make([]byte, len(d.Content))
copy(content, d.Content)
return Document{
ID: d.ID,
Metadata: cloneMap(d.Metadata),
Content: content,
Tags: cloneSlice(d.Tags),
}
}
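The `cloneSlice` and `cloneMap` helpers used above are not defined in this guide. One possible definition, assuming Go 1.21 or later, simply wraps `slices.Clone` and `maps.Clone` from the standard library. Both are shallow per element, so they suit element types without reference fields of their own.

```go
package main

import (
	"fmt"
	"maps"
	"slices"
)

// cloneSlice is one possible definition of the helper used above.
// It copies the elements into a new backing array.
func cloneSlice[T any](s []T) []T {
	return slices.Clone(s)
}

// cloneMap is one possible definition of the helper used above.
// It copies keys and values into a new map.
func cloneMap[K comparable, V any](m map[K]V) map[K]V {
	return maps.Clone(m)
}

func main() {
	tags := []string{"a", "b"}
	meta := map[string]string{"k": "v"}

	tagsCopy, metaCopy := cloneSlice(tags), cloneMap(meta)
	tagsCopy[0], metaCopy["k"] = "z", "changed"

	fmt.Println(tags[0], meta["k"]) // prints "a v"
}
```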
Clone Pools
For high-frequency cloning, consider object pools:
var orderPool = sync.Pool{
New: func() any {
return &Order{
Items: make([]OrderItem, 0, 10), // Pre-allocate capacity
Metadata: make(map[string]any, 5),
}
},
}
// Returns a pointer so the same object can be handed back to the pool later
func (o Order) CloneWithPool() *Order {
// Get pooled object
clone := orderPool.Get().(*Order)
// Reset and populate
clone.ID = o.ID
clone.Items = append(clone.Items[:0], o.Items...) // Reuse slice backing
// Clear and repopulate map
for k := range clone.Metadata {
delete(clone.Metadata, k)
}
for k, v := range o.Metadata {
clone.Metadata[k] = v
}
return clone
}
// ReleaseOrder puts a clone back in the pool; without a Put the pool never
// reuses anything. Call it only once no goroutine still references the clone.
func ReleaseOrder(o *Order) {
orderPool.Put(o)
}
When Clone() Errors Occur
If you see panics or race conditions with concurrent connectors, check:
- Missing Clone() implementation: Type doesn't implement Cloner interface
- Shallow copies: Slices/maps are being shared between goroutines
- Pointer fields: Pointed-to values are being modified concurrently
- Interface fields: Concrete types in interface{} fields need deep copying
Debugging Clone Issues
Enable race detection during development:
# Run tests with race detector
go test -race ./...
# Run your application with race detector
go run -race main.go
Common race detector output indicating clone issues:
WARNING: DATA RACE
Write at 0x00c000180010 by goroutine 7:
main.processOrder()
/path/to/file.go:45 +0x64
Previous write at 0x00c000180010 by goroutine 6:
main.processOrder()
/path/to/file.go:45 +0x64
Two goroutines writing to the same address means they share memory; the most likely cause is a shallow copy in your Clone().
Best Practices Summary
- Deep copy mutable reference types (slices, maps, pointers) unless sharing is intentional
- Test with -race flag during development
- Benchmark Clone() performance for hot paths
- Document sharing decisions when intentionally sharing data
- Consider immutability to avoid cloning altogether
- Use code generation for complex types (see tools like deepcopy-gen)
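As a sketch of the immutability practice listed above: if every change produces a new value and nothing ever writes to shared slices in place, `Clone()` can be a plain copy. The `Settings` type and `WithTag` method below are hypothetical, and Go does not enforce the convention, so this only works when all code touching the type follows it.

```go
package main

import "fmt"

// Settings is a hypothetical type treated as immutable by convention:
// no code writes to Tags in place after construction.
type Settings struct {
	Name string
	Tags []string
}

// WithTag returns a new Settings with its own Tags slice.
// The caller's value is left unchanged.
func (s Settings) WithTag(tag string) Settings {
	tags := make([]string, len(s.Tags), len(s.Tags)+1)
	copy(tags, s.Tags)
	return Settings{Name: s.Name, Tags: append(tags, tag)}
}

// Clone can share Tags because nothing mutates it in place.
func (s Settings) Clone() Settings { return s }

func main() {
	base := Settings{Name: "base", Tags: []string{"a"}}
	derived := base.Clone().WithTag("b")
	fmt.Println(len(base.Tags), len(derived.Tags)) // prints "1 2"
}
```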
Example: Production-Ready Clone
Here's a complete example following all best practices:
package main
import (
"sync"
"time"
)
type Order struct {
// Immutable fields (safe to share)
ID string
CreatedAt time.Time
// Mutable value fields (copied by value)
Status string
Total float64
// Reference types (need deep copy)
Items []OrderItem
Tags []string
Attributes map[string]string
// Pointer fields (decision needed)
Customer *Customer
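// Never copy while in use. Note that a value-receiver Clone() copies mu on
// each call, which go vet's copylocks check reports; holding the mutex by
// pointer avoids this
mu sync.RWMutex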
}
func (o Order) Clone() Order {
// Deep copy slices
items := make([]OrderItem, len(o.Items))
for i, item := range o.Items {
items[i] = item.Clone()
}
tags := make([]string, len(o.Tags))
copy(tags, o.Tags)
// Deep copy map
attributes := make(map[string]string, len(o.Attributes))
for k, v := range o.Attributes {
attributes[k] = v
}
// Deep copy pointer if needed
var customer *Customer
if o.Customer != nil {
custCopy := o.Customer.Clone()
customer = &custCopy
}
return Order{
// Immutable fields
ID: o.ID,
CreatedAt: o.CreatedAt,
// Value fields
Status: o.Status,
Total: o.Total,
// Deep copied references
Items: items,
Tags: tags,
Attributes: attributes,
Customer: customer,
// mu gets zero value (new mutex)
}
}
// Helper for OrderItem
func (i OrderItem) Clone() OrderItem {
// Implement based on OrderItem structure
return i
}
// Helper for Customer
func (c Customer) Clone() Customer {
// Implement based on Customer structure
return c
}
This implementation isolates concurrent processors from each other, provided the OrderItem and Customer helpers deep copy any reference fields they contain.