ZAP Protocol

Consensus

Byzantine fault-tolerant consensus for distributed ZAP deployments

ZAP includes an optional Byzantine Fault Tolerant (BFT) consensus layer for distributed deployments requiring strong consistency guarantees.

Overview

The consensus layer enables multiple ZAP nodes to agree on the order of operations, even in the presence of faulty or malicious nodes.

Properties

Property         Guarantee
Safety           Correct nodes never disagree
Liveness         Requests eventually complete
Fault Tolerance  Tolerates f faults with 3f+1 nodes
Ordering         Total order of operations

When to Use

Enable consensus when you need:

  • State machine replication: Multiple nodes maintaining identical state
  • Leader election: Automatic failover with consistency
  • Distributed transactions: Cross-node atomic operations
  • Audit trails: Tamper-evident operation logs

Quick Start

Enable Consensus

server := zap.NewServer(
    zap.WithAddress(":9000"),
    zap.WithConsensus(zap.BFT{
        NodeID: "node1",
        Nodes: []string{
            "node1:9000",
            "node2:9000",
            "node3:9000",
            "node4:9000",
        },
        Threshold: 3,
    }),
)

Consensus-Aware Service

type counterService struct {
    count int64
}

// Replicated operation - goes through consensus
func (s *counterService) Increment(ctx context.Context) (int64, error) {
    s.count++
    return s.count, nil
}

// Read-only operation - can bypass consensus
func (s *counterService) Get(ctx context.Context) (int64, error) {
    return s.count, nil
}

// Register with replication annotations
server.Register(&counterService{},
    zap.WithReplicated("Increment"),
    zap.WithReadOnly("Get"),
)

BFT Protocol

PBFT Overview

ZAP uses a PBFT-based protocol with the following phases:

┌─────────────────────────────────────────────────────────────┐
│                                                             │
│   Client          Leader         Replica 1      Replica 2   │
│      │              │               │               │       │
│      │──Request────▶│               │               │       │
│      │              │               │               │       │
│      │              │──Pre-prepare─▶│               │       │
│      │              │──Pre-prepare──────────────────▶       │
│      │              │               │               │       │
│      │              │◀──Prepare─────│               │       │
│      │              │◀──Prepare─────────────────────│       │
│      │              │               │               │       │
│      │              │──Prepare─────▶│               │       │
│      │              │──Prepare──────────────────────▶       │
│      │              │               │               │       │
│      │              │◀──Commit──────│               │       │
│      │              │◀──Commit──────────────────────│       │
│      │              │               │               │       │
│      │◀──Response───│               │               │       │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Phases

  1. Pre-prepare: The leader assigns the request a sequence number and broadcasts it
  2. Prepare: Replicas broadcast a prepare to acknowledge the pre-prepare
  3. Commit: Nodes broadcast a commit once they hold 2f+1 matching prepares
  4. Execute: Nodes apply the operation after collecting 2f+1 commits
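The quorum counting behind phases 2–4 can be sketched in plain Go. `phaseQuorum` is an illustrative helper, not part of the zap API; it records distinct voters for one phase of one sequence number and reports when the 2f+1 threshold is met:

```go
package main

import "fmt"

// phaseQuorum tracks votes for one sequence number in one phase.
type phaseQuorum struct {
	votes    map[string]bool // node ID -> has voted
	required int             // the 2f+1 threshold
}

func newPhaseQuorum(required int) *phaseQuorum {
	return &phaseQuorum{votes: make(map[string]bool), required: required}
}

// add records a vote and reports whether the quorum is reached.
// Duplicate votes from the same node are counted only once, so a
// single Byzantine node cannot inflate the tally.
func (q *phaseQuorum) add(nodeID string) bool {
	q.votes[nodeID] = true
	return len(q.votes) >= q.required
}

func main() {
	// 4-node cluster: f = 1, quorum = 3.
	prepare := newPhaseQuorum(3)
	prepare.add("node1")
	prepare.add("node1") // duplicate, ignored
	prepare.add("node2")
	fmt.Println(prepare.add("node3")) // quorum reached: true
}
```

The same structure is reused per phase: an operation advances from prepared to committed only when its prepare quorum and then its commit quorum both fill.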

Configuration

Basic Configuration

zap.WithConsensus(zap.BFT{
    // This node's identifier
    NodeID: "node1",

    // All nodes in the cluster
    Nodes: []string{
        "node1:9000",
        "node2:9000",
        "node3:9000",
        "node4:9000",
    },

    // Minimum nodes for agreement (2f+1 for f faults)
    Threshold: 3,
})

Advanced Configuration

zap.WithConsensus(zap.BFT{
    NodeID:    "node1",
    Nodes:     nodes,
    Threshold: 3,

    // Timeouts
    RoundTimeout:    500 * time.Millisecond,
    ElectionTimeout: 2 * time.Second,

    // Batching for throughput
    BatchSize:    100,
    BatchTimeout: 10 * time.Millisecond,

    // Persistence
    WALPath: "/var/lib/zap/wal",

    // Snapshots
    SnapshotInterval: 10000,  // Operations between snapshots
    SnapshotPath:     "/var/lib/zap/snapshots",
})

Cluster Sizing

Cluster Size  Fault Tolerance      Threshold
4 nodes       1 Byzantine fault    3
7 nodes       2 Byzantine faults   5
10 nodes      3 Byzantine faults   7
3f+1 nodes    f Byzantine faults   2f+1
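The 3f+1 relationship in the table can be computed directly. `maxByzantineFaults` and `quorumThreshold` are illustrative helpers, not part of the zap API:

```go
package main

import "fmt"

// maxByzantineFaults returns f, the number of Byzantine faults an
// n-node cluster can tolerate under the 3f+1 requirement.
func maxByzantineFaults(n int) int {
	return (n - 1) / 3
}

// quorumThreshold returns the 2f+1 agreement threshold for n nodes,
// which is the value to pass as Threshold in the BFT config.
func quorumThreshold(n int) int {
	return 2*maxByzantineFaults(n) + 1
}

func main() {
	for _, n := range []int{4, 7, 10} {
		fmt.Printf("%d nodes: tolerates %d fault(s), threshold %d\n",
			n, maxByzantineFaults(n), quorumThreshold(n))
	}
}
```

Note that sizes between the 3f+1 steps buy nothing: a 5- or 6-node cluster still tolerates only one Byzantine fault.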

Leader Election

Automatic Leader Selection

ZAP automatically elects a leader:

server := zap.NewServer(
    zap.WithConsensus(zap.BFT{
        // ...
        LeaderElection: zap.AutoElection{
            Timeout: 2 * time.Second,
        },
    }),
)

Fixed Leader (Testing)

For testing, specify a fixed leader:

zap.WithConsensus(zap.BFT{
    LeaderElection: zap.FixedLeader{
        NodeID: "node1",
    },
})

Leader Callbacks

server.OnLeaderChange(func(isLeader bool, leaderID string) {
    if isLeader {
        log.Println("This node is now the leader")
    } else {
        log.Printf("Leader changed to: %s", leaderID)
    }
})

State Machine Replication

Implementing StateMachine

type KVStore struct {
    data map[string][]byte
    mu   sync.RWMutex
}

// Apply implements zap.StateMachine
func (s *KVStore) Apply(op []byte) ([]byte, error) {
    var cmd Command
    if err := capnp.Unmarshal(op, &cmd); err != nil {
        return nil, err
    }

    s.mu.Lock()
    defer s.mu.Unlock()

    switch cmd.Type() {
    case CommandTypePut:
        s.data[cmd.Key()] = cmd.Value()
        return nil, nil
    case CommandTypeDelete:
        delete(s.data, cmd.Key())
        return nil, nil
    default:
        return nil, fmt.Errorf("unknown command: %d", cmd.Type())
    }
}

// Snapshot implements zap.StateMachine
func (s *KVStore) Snapshot() ([]byte, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return json.Marshal(s.data)
}

// Restore implements zap.StateMachine
func (s *KVStore) Restore(snapshot []byte) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    return json.Unmarshal(snapshot, &s.data)
}

Register State Machine

store := &KVStore{data: make(map[string][]byte)}

server := zap.NewServer(
    zap.WithConsensus(zap.BFT{
        StateMachine: store,
        // ...
    }),
)

Read Strategies

Linearizable Reads

All reads go through consensus (strongest guarantee):

server := zap.NewServer(
    zap.WithConsensus(zap.BFT{
        ReadStrategy: zap.LinearizableReads,
    }),
)

Leader Reads

Reads from leader only (faster, still consistent):

zap.WithConsensus(zap.BFT{
    ReadStrategy: zap.LeaderReads,
})

Local Reads

Read from any node (eventual consistency):

zap.WithConsensus(zap.BFT{
    ReadStrategy: zap.LocalReads,
})

Per-Method Configuration

server.Register(&myService{},
    zap.WithReplicated("Write"),           // Goes through consensus
    zap.WithLinearizable("StrongRead"),    // Linearizable read
    zap.WithLocalRead("EventualRead"),     // Local read
)

Persistence

Write-Ahead Log

zap.WithConsensus(zap.BFT{
    WAL: zap.WALConfig{
        Path:      "/var/lib/zap/wal",
        SyncMode:  zap.SyncAlways,  // fsync every write
        MaxSize:   100 * 1024 * 1024,  // 100MB per segment
    },
})

Snapshots

zap.WithConsensus(zap.BFT{
    Snapshot: zap.SnapshotConfig{
        Path:     "/var/lib/zap/snapshots",
        Interval: 10000,  // Every 10000 operations
        Retain:   3,      // Keep last 3 snapshots
    },
})

Membership Changes

Adding a Node

// From any current member
client, _ := zap.Dial("tcp://node1:9000")
admin := zap.NewAdminClient(client)

err := admin.AddNode(ctx, "node5:9000")

Removing a Node

err := admin.RemoveNode(ctx, "node3:9000")

Viewing Membership

members, err := admin.ListMembers(ctx)
for _, m := range members {
    fmt.Printf("%s: %s\n", m.ID, m.Status)
}

Monitoring

Metrics

# Consensus latency
zap_consensus_latency_seconds{phase="prepare"}
zap_consensus_latency_seconds{phase="commit"}

# Throughput
zap_consensus_operations_total
zap_consensus_operations_per_second

# Health
zap_consensus_leader{node_id="node1"}
zap_consensus_term
zap_consensus_commit_index

Health Checks

health, err := admin.GetHealth(ctx)
fmt.Printf("Leader: %s\n", health.Leader)
fmt.Printf("Term: %d\n", health.Term)
fmt.Printf("Commit Index: %d\n", health.CommitIndex)
fmt.Printf("Nodes: %v\n", health.Nodes)

Best Practices

1. Size Clusters as 3f+1

Use 4, 7, or 10 nodes: BFT consensus tolerates f Byzantine faults only with at least 3f+1 nodes, so intermediate sizes add cost without adding fault tolerance.

2. Geographic Distribution

Distribute nodes across failure domains:

// Poor: All in one data center
nodes := []string{"dc1-node1", "dc1-node2", "dc1-node3"}

// Better: Across data centers
nodes := []string{"dc1-node1", "dc2-node1", "dc3-node1"}

3. Monitor Latency

Consensus adds latency. Monitor and tune timeouts:

zap.WithConsensus(zap.BFT{
    RoundTimeout: 500 * time.Millisecond,  // Increase for high-latency networks
})

4. Use Batching

For high throughput, enable batching:

zap.WithConsensus(zap.BFT{
    BatchSize:    100,
    BatchTimeout: 5 * time.Millisecond,
})

5. Regular Snapshots

Prevent unbounded log growth:

zap.WithConsensus(zap.BFT{
    Snapshot: zap.SnapshotConfig{
        Interval: 10000,
        Retain:   3,
    },
})
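Snapshots bound recovery time because log entries already covered by a snapshot can be discarded: on restart a node restores the snapshot and replays only the entries after it. A minimal sketch of that compaction step, with hypothetical `logEntry` and `compactLog` names:

```go
package main

import "fmt"

// logEntry is a simplified WAL record: a sequence index plus the
// encoded operation.
type logEntry struct {
	Index uint64
	Op    []byte
}

// compactLog drops entries whose effects are captured by a snapshot
// taken at snapshotIndex, bounding replay work on restart.
func compactLog(log []logEntry, snapshotIndex uint64) []logEntry {
	i := 0
	for i < len(log) && log[i].Index <= snapshotIndex {
		i++
	}
	return log[i:]
}

func main() {
	wal := []logEntry{{1, nil}, {2, nil}, {3, nil}, {4, nil}}
	kept := compactLog(wal, 2) // snapshot covers indexes 1 and 2
	fmt.Println(len(kept))     // 2 entries left to replay
}
```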
