# Consensus

Byzantine fault-tolerant consensus for distributed ZAP deployments.
ZAP includes an optional Byzantine Fault Tolerant (BFT) consensus layer for distributed deployments requiring strong consistency guarantees.
## Overview
The consensus layer enables multiple ZAP nodes to agree on the order of operations, even in the presence of faulty or malicious nodes.
### Properties
| Property | Guarantee |
|---|---|
| Safety | Correct nodes never commit conflicting operations |
| Liveness | Client requests eventually complete |
| Fault Tolerance | Tolerates f Byzantine faults with 3f+1 nodes |
| Ordering | Total order of operations |
## When to Use
Enable consensus when you need:
- State machine replication: Multiple nodes maintaining identical state
- Leader election: Automatic failover with consistency
- Distributed transactions: Cross-node atomic operations
- Audit trails: Tamper-evident operation logs
## Quick Start

### Enable Consensus
```go
server := zap.NewServer(
    zap.WithAddress(":9000"),
    zap.WithConsensus(zap.BFT{
        NodeID: "node1",
        Nodes: []string{
            "node1:9000",
            "node2:9000",
            "node3:9000",
            "node4:9000",
        },
        Threshold: 3,
    }),
)
```

### Consensus-Aware Service
```go
type counterService struct {
    count int64
}

// Replicated operation - goes through consensus
func (s *counterService) Increment(ctx context.Context) (int64, error) {
    s.count++
    return s.count, nil
}

// Read-only operation - can bypass consensus
func (s *counterService) Get(ctx context.Context) (int64, error) {
    return s.count, nil
}

// Register with replication annotations
server.Register(&counterService{},
    zap.WithReplicated("Increment"),
    zap.WithReadOnly("Get"),
)
```

## BFT Protocol
### PBFT Overview
ZAP uses a PBFT-based protocol with the following phases:
```
Client          Leader          Replica 1       Replica 2
  │               │                │                │
  │───Request────▶│                │                │
  │               │──Pre-prepare──▶│                │
  │               │──Pre-prepare───────────────────▶│
  │               │◀──Prepare──────│                │
  │               │◀──Prepare───────────────────────│
  │               │───Prepare─────▶│                │
  │               │───Prepare──────────────────────▶│
  │               │◀──Commit───────│                │
  │               │◀──Commit────────────────────────│
  │◀──Response────│                │                │
```

### Phases
1. Pre-prepare: The leader assigns a sequence number and broadcasts the request
2. Prepare: Replicas acknowledge that they received the pre-prepare
3. Commit: Replicas confirm they have 2f+1 matching prepares
4. Execute: Each replica applies the operation after 2f+1 matching commits
## Configuration

### Basic Configuration
```go
zap.WithConsensus(zap.BFT{
    // This node's identifier
    NodeID: "node1",

    // All nodes in the cluster
    Nodes: []string{
        "node1:9000",
        "node2:9000",
        "node3:9000",
        "node4:9000",
    },

    // Minimum nodes for agreement (2f+1 for f faults)
    Threshold: 3,
})
```

### Advanced Configuration
```go
zap.WithConsensus(zap.BFT{
    NodeID:    "node1",
    Nodes:     nodes,
    Threshold: 3,

    // Timeouts
    RoundTimeout:    500 * time.Millisecond,
    ElectionTimeout: 2 * time.Second,

    // Batching for throughput
    BatchSize:    100,
    BatchTimeout: 10 * time.Millisecond,

    // Persistence
    WALPath: "/var/lib/zap/wal",

    // Snapshots
    SnapshotInterval: 10000, // Operations between snapshots
    SnapshotPath:     "/var/lib/zap/snapshots",
})
```

### Cluster Sizing
| Cluster Size | Fault Tolerance | Threshold |
|---|---|---|
| 4 nodes | 1 Byzantine fault | 3 |
| 7 nodes | 2 Byzantine faults | 5 |
| 10 nodes | 3 Byzantine faults | 7 |
| 3f+1 nodes | f Byzantine faults | 2f+1 |
## Leader Election

### Automatic Leader Selection

ZAP automatically elects a leader:
```go
server := zap.NewServer(
    zap.WithConsensus(zap.BFT{
        // ...
        LeaderElection: zap.AutoElection{
            Timeout: 2 * time.Second,
        },
    }),
)
```

### Fixed Leader (Testing)
For testing, specify a fixed leader:
```go
zap.WithConsensus(zap.BFT{
    LeaderElection: zap.FixedLeader{
        NodeID: "node1",
    },
})
```

### Leader Callbacks
```go
server.OnLeaderChange(func(isLeader bool, leaderID string) {
    if isLeader {
        log.Println("This node is now the leader")
    } else {
        log.Printf("Leader changed to: %s", leaderID)
    }
})
```

## State Machine Replication
### Implementing StateMachine
```go
type KVStore struct {
    data map[string][]byte
    mu   sync.RWMutex
}

// Apply implements zap.StateMachine
func (s *KVStore) Apply(op []byte) ([]byte, error) {
    var cmd Command
    if err := capnp.Unmarshal(op, &cmd); err != nil {
        return nil, err
    }

    s.mu.Lock()
    defer s.mu.Unlock()

    switch cmd.Type() {
    case CommandTypePut:
        s.data[cmd.Key()] = cmd.Value()
        return nil, nil
    case CommandTypeDelete:
        delete(s.data, cmd.Key())
        return nil, nil
    default:
        return nil, fmt.Errorf("unknown command: %d", cmd.Type())
    }
}

// Snapshot implements zap.StateMachine
func (s *KVStore) Snapshot() ([]byte, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return json.Marshal(s.data)
}

// Restore implements zap.StateMachine
func (s *KVStore) Restore(snapshot []byte) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    return json.Unmarshal(snapshot, &s.data)
}
```

### Register State Machine
```go
store := &KVStore{data: make(map[string][]byte)}

server := zap.NewServer(
    zap.WithConsensus(zap.BFT{
        StateMachine: store,
        // ...
    }),
)
```

## Read Strategies
### Linearizable Reads

All reads go through consensus (strongest guarantee):
```go
server := zap.NewServer(
    zap.WithConsensus(zap.BFT{
        ReadStrategy: zap.LinearizableReads,
    }),
)
```

### Leader Reads
Reads from leader only (faster, still consistent):
```go
zap.WithConsensus(zap.BFT{
    ReadStrategy: zap.LeaderReads,
})
```

### Local Reads
Read from any node (eventual consistency):
```go
zap.WithConsensus(zap.BFT{
    ReadStrategy: zap.LocalReads,
})
```

### Per-Method Configuration
```go
server.Register(&myService{},
    zap.WithReplicated("Write"),        // Goes through consensus
    zap.WithLinearizable("StrongRead"), // Linearizable read
    zap.WithLocalRead("EventualRead"),  // Local read
)
```

## Persistence
### Write-Ahead Log
```go
zap.WithConsensus(zap.BFT{
    WAL: zap.WALConfig{
        Path:     "/var/lib/zap/wal",
        SyncMode: zap.SyncAlways,        // fsync every write
        MaxSize:  100 * 1024 * 1024,     // 100MB per segment
    },
})
```

### Snapshots
```go
zap.WithConsensus(zap.BFT{
    Snapshot: zap.SnapshotConfig{
        Path:     "/var/lib/zap/snapshots",
        Interval: 10000, // Every 10000 operations
        Retain:   3,     // Keep last 3 snapshots
    },
})
```

## Membership Changes
### Adding a Node
```go
// From any current member
client, _ := zap.Dial("tcp://node1:9000")
admin := zap.NewAdminClient(client)

err := admin.AddNode(ctx, "node5:9000")
```

### Removing a Node
```go
err := admin.RemoveNode(ctx, "node3:9000")
```

### Viewing Membership
```go
members, err := admin.ListMembers(ctx)
for _, m := range members {
    fmt.Printf("%s: %s\n", m.ID, m.Status)
}
```

## Monitoring
### Metrics
```text
# Consensus latency
zap_consensus_latency_seconds{phase="prepare"}
zap_consensus_latency_seconds{phase="commit"}

# Throughput
zap_consensus_operations_total
zap_consensus_operations_per_second

# Health
zap_consensus_leader{node_id="node1"}
zap_consensus_term
zap_consensus_commit_index
```

### Health Checks
```go
health, err := admin.GetHealth(ctx)
fmt.Printf("Leader: %s\n", health.Leader)
fmt.Printf("Term: %d\n", health.Term)
fmt.Printf("Commit Index: %d\n", health.CommitIndex)
fmt.Printf("Nodes: %v\n", health.Nodes)
```

## Best Practices
### 1. Size Clusters as 3f+1

Use 4, 7, or 10 nodes. BFT tolerates f Byzantine faults only with 3f+1 nodes, so intermediate sizes (5 or 6 nodes) add cost without tolerating any additional faults.
### 2. Geographic Distribution

Distribute nodes across failure domains:
```go
// Poor: All in one data center
nodes := []string{"dc1-node1", "dc1-node2", "dc1-node3"}

// Better: Across data centers
nodes := []string{"dc1-node1", "dc2-node1", "dc3-node1"}
```

### 3. Monitor Latency
Consensus adds latency. Monitor and tune timeouts:
```go
zap.WithConsensus(zap.BFT{
    RoundTimeout: 500 * time.Millisecond, // Increase for high-latency networks
})
```

### 4. Use Batching
For high throughput, enable batching:
```go
zap.WithConsensus(zap.BFT{
    BatchSize:    100,
    BatchTimeout: 5 * time.Millisecond,
})
```

### 5. Regular Snapshots
Prevent unbounded log growth:
```go
zap.WithConsensus(zap.BFT{
    Snapshot: zap.SnapshotConfig{
        Interval: 10000,
        Retain:   3,
    },
})
```

## Next Steps
- API Reference - Complete API documentation
- Architecture - System design details
- Transports - Network configuration