ZAP C++

RPC System

Capability-based RPC with promise pipelining

ZAP C++ includes a capability-based RPC system with promise pipelining, allowing efficient distributed communication.

Core Concepts

Capabilities

A capability is a reference to a remote object. If you have a capability, you can call methods on it. This is the object-capability security model:

  • No ambient authority - You can only access what you're given
  • Unforgeable references - Capabilities cannot be guessed or fabricated
  • Delegation - You can pass capabilities to others
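  • Revocation - Dropping a capability gives up your own access; a grantor can withdraw access it delegated by handing out a forwarder that it can later disable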
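
The properties above can be modelled in plain C++ without any RPC machinery. The following is a minimal sketch, not ZAP API: `Counter`, `RealCounter`, `RevocableCounter` and `useTwice` are hypothetical names, and the forwarder pattern is an assumption about how revocation is typically built in object-capability systems.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <utility>

// A "capability" here is just a reference to an object: holding the
// reference is the only way to invoke it.
class Counter {
public:
  virtual ~Counter() = default;
  virtual int increment() = 0;
};

class RealCounter final : public Counter {
public:
  int increment() override { return ++value_; }
private:
  int value_ = 0;
};

// Forwarder that the grantor stays in control of. Delegation hands out the
// forwarder; revoke() cuts off everyone who holds it.
class RevocableCounter final : public Counter {
public:
  explicit RevocableCounter(std::shared_ptr<Counter> target)
      : target_(std::move(target)) {
    assert(target_ != nullptr);
  }
  int increment() override {
    if (!target_) throw std::runtime_error("capability revoked");
    return target_->increment();
  }
  void revoke() { target_.reset(); }
private:
  std::shared_ptr<Counter> target_;
};

// No ambient authority: this function can only touch the counter it is given.
int useTwice(Counter& cap) {
  cap.increment();
  return cap.increment();
}
```

A caller would create a `RealCounter`, wrap it in a `RevocableCounter`, pass the wrapper to `useTwice`, and call `revoke()` once the delegate should lose access. The grantor keeps its direct reference and is unaffected.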

Promise Pipelining

When a method returns a capability, you can immediately call methods on that capability without waiting for the first call to complete. The system automatically pipelines requests:

// Pseudocode. Without pipelining: 3 round trips
User user = await session.login(creds);
Profile profile = await user.getProfile();
Avatar avatar = await profile.getAvatar();
 
// With pipelining: 1 round trip
Promise<Avatar> avatar = session.login(creds)
                                .getProfile()
                                .getAvatar();

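In this example the three dependent calls cost one network round trip instead of three, a saving that grows with link latency and with the length of the call chain.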
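
The round-trip arithmetic can be made concrete with a small model. This is a sketch under stated assumptions, not ZAP code: `PlannedCall` and both functions are hypothetical, and the model assumes every call goes to the same server, so a pipelined batch can be sent in one go.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// One call in a batch. `dependsOn` is the index of an earlier call whose
// returned capability this call targets, or -1 for a capability already held.
struct PlannedCall {
  int dependsOn;
};

// Without pipelining, a call can be sent only after the call it depends on
// has returned, so the cost is the length of the longest dependency chain.
std::size_t roundTripsWithoutPipelining(const std::vector<PlannedCall>& calls) {
  std::vector<std::size_t> depth(calls.size(), 0);
  std::size_t longest = 0;
  for (std::size_t i = 0; i < calls.size(); ++i) {
    int dep = calls[i].dependsOn;
    assert(dep < static_cast<int>(i));  // may only depend on earlier calls
    depth[i] = (dep < 0) ? 1 : depth[static_cast<std::size_t>(dep)] + 1;
    longest = std::max(longest, depth[i]);
  }
  return longest;
}

// With pipelining, dependent calls are addressed to the promised result,
// so the whole batch is sent together.
std::size_t roundTripsWithPipelining(const std::vector<PlannedCall>& calls) {
  return calls.empty() ? 0 : 1;
}
```

For the login, getProfile, getAvatar chain the input would be `{{-1}, {0}, {1}}`: each call depends on the one before it, so the unpipelined cost is the full chain length.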

Defining Interfaces

# calculator.zap
@0x85150b117366d14b;
 
interface Calculator {
  add @0 (a :Float64, b :Float64) -> (result :Float64);
  subtract @1 (a :Float64, b :Float64) -> (result :Float64);
  multiply @2 (a :Float64, b :Float64) -> (result :Float64);
  divide @3 (a :Float64, b :Float64) -> (result :Float64);
 
  # Returns a new calculator that adds offset to all results
  withOffset @4 (offset :Float64) -> (calc :Calculator);
}

Implementing Servers

Each interface method becomes a virtual function to override:

#include <zap/rpc-twoparty.h>
#include <kj/async-io.h>
#include <kj/debug.h>  // KJ_FAIL_REQUIRE
#include "calculator.zap.h"
 
class CalculatorImpl final : public Calculator::Server {
public:
  kj::Promise<void> add(AddContext context) override {
    auto params = context.getParams();
    auto result = params.getA() + params.getB();
    context.getResults().setResult(result);
    return kj::READY_NOW;
  }
 
  kj::Promise<void> subtract(SubtractContext context) override {
    auto params = context.getParams();
    auto result = params.getA() - params.getB();
    context.getResults().setResult(result);
    return kj::READY_NOW;
  }
 
  kj::Promise<void> multiply(MultiplyContext context) override {
    auto params = context.getParams();
    auto result = params.getA() * params.getB();
    context.getResults().setResult(result);
    return kj::READY_NOW;
  }
 
  kj::Promise<void> divide(DivideContext context) override {
    auto params = context.getParams();
    if (params.getB() == 0) {
      KJ_FAIL_REQUIRE("Division by zero");
    }
    auto result = params.getA() / params.getB();
    context.getResults().setResult(result);
    return kj::READY_NOW;
  }
 
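  kj::Promise<void> withOffset(WithOffsetContext context) override {
    auto offset = context.getParams().getOffset();
    // OffsetCalculator is a second Calculator::Server implementation
    // (not shown) that adds `offset` to every result.
    context.getResults().setCalc(
        kj::heap<OffsetCalculator>(offset));
    return kj::READY_NOW;
  }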
};
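
One way to keep such a server easy to test is to put the arithmetic in a plain class and have the `Calculator::Server` subclasses delegate to it. The sketch below is an assumption about how that could look, not part of ZAP: `OffsetArithmetic` is a hypothetical name, `std::domain_error` stands in for the KJ failure macro, and accumulating offsets across repeated `withOffset` calls is a design choice the schema does not specify.

```cpp
#include <stdexcept>

// Plain arithmetic with an offset added to every result. No RPC types are
// involved, so this can be unit-tested without a network or event loop.
class OffsetArithmetic {
public:
  explicit OffsetArithmetic(double offset = 0.0) : offset_(offset) {}

  double add(double a, double b) const { return a + b + offset_; }
  double subtract(double a, double b) const { return a - b + offset_; }
  double multiply(double a, double b) const { return a * b + offset_; }

  double divide(double a, double b) const {
    // Mirrors the server-side check before dividing.
    if (b == 0.0) throw std::domain_error("Division by zero");
    return a / b + offset_;
  }

  // Assumption: applying withOffset repeatedly accumulates the offsets.
  OffsetArithmetic withOffset(double extra) const {
    return OffsetArithmetic(offset_ + extra);
  }

private:
  double offset_;
};
```

With an offset of zero the class behaves like the base calculator; `OffsetArithmetic().withOffset(10.0).add(1.0, 2.0)` adds the offset once to the sum.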

Running a Server

Using EzRpcServer

The simplest way to start a server:

#include <zap/ez-rpc.h>
#include <iostream>
#include "calculator.zap.h"
 
int main() {
  // Start RPC server
  zap::EzRpcServer server(
      kj::heap<CalculatorImpl>(),
      "localhost:5000");
 
  // Get the wait scope for blocking
  auto& waitScope = server.getWaitScope();
 
  std::cout << "Listening on localhost:5000..." << std::endl;
 
  // Run forever
  kj::NEVER_DONE.wait(waitScope);
 
  return 0;
}

Manual Setup

For more control:

#include <zap/rpc-twoparty.h>
#include <kj/async-io.h>
 
int main() {
  // Create the event loop
  kj::AsyncIoContext ctx = kj::setupAsyncIo();
 
  // Listen on a port
  auto listener = ctx.provider->getNetwork()
      .parseAddress("*:5000").wait(ctx.waitScope)
      ->listen();
 
  // Create server capability
  Calculator::Client calculator = kj::heap<CalculatorImpl>();
 
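  // Accept connections. acceptConnections is an application-defined helper
  // (not shown) that serves `calculator` on every accepted stream.
  auto acceptLoop = acceptConnections(*listener, calculator, ctx);
  acceptLoop.wait(ctx.waitScope);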
 
  return 0;
}

Creating Clients

Using EzRpcClient

#include <zap/ez-rpc.h>
#include "calculator.zap.h"
 
int main() {
  // Connect to server
  zap::EzRpcClient client("localhost:5000");
  auto& waitScope = client.getWaitScope();
 
  // Get the calculator capability
  Calculator::Client calc = client.getMain<Calculator>();
 
  // Make a request
  auto request = calc.addRequest();
  request.setA(5.0);
  request.setB(3.0);
 
  // Wait for result
  auto response = request.send().wait(waitScope);
  double result = response.getResult();
  // result == 8.0
 
  return 0;
}

Promise Pipelining in Action

// Get a calculator with offset, then use it
auto offsetCalc = calc.withOffsetRequest();
offsetCalc.setOffset(10.0);
auto promise = offsetCalc.send();
 
// Pipeline: call add on the returned calculator
// This doesn't wait for withOffset to complete
auto addRequest = promise.getCalc().addRequest();
addRequest.setA(1.0);
addRequest.setB(2.0);
 
// Only one round-trip for both calls!
auto result = addRequest.send().wait(waitScope);
// result.getResult() == 13.0 (1 + 2 + 10)

Error Handling

try {
  auto request = calc.divideRequest();
  request.setA(10.0);
  request.setB(0.0);
 
  // wait() rethrows the server-side failure as a kj::Exception
  auto response = request.send().wait(waitScope);
} catch (const kj::Exception& e) {
  // e.getDescription() contains error message
  std::cerr << "RPC failed: " << e.getDescription().cStr() << std::endl;
}

Returning Errors

On the server side:

kj::Promise<void> riskyMethod(RiskyMethodContext context) override {
  if (somethingWrong) {
    // This becomes an exception on the client
    KJ_FAIL_REQUIRE("Something went wrong", details);
  }
  // Normal processing...
  return kj::READY_NOW;
}

Cancellation

Dropping the promise returned by send() cancels the request if it is still in flight:

// Start a long operation
auto request = server.longOperationRequest();
auto promise = request.send();
 
// Cancel it by dropping the promise
promise = nullptr;  // Request is cancelled
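
The "drop to cancel" idea is ordinary RAII. The sketch below models it in plain C++ and is not how ZAP implements cancellation: `CancelState`, `OperationHandle` and `runSteps` are hypothetical names, and the model assumes the operation polls a shared flag between units of work.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// State shared between the caller's handle and the running operation.
struct CancelState {
  bool cancelled = false;
};

// Stand-in for the promise: destroying the handle marks the operation
// as cancelled.
class OperationHandle {
public:
  explicit OperationHandle(std::shared_ptr<CancelState> state)
      : state_(std::move(state)) {
    assert(state_ != nullptr);
  }
  OperationHandle(const OperationHandle&) = delete;
  OperationHandle& operator=(const OperationHandle&) = delete;
  ~OperationHandle() { state_->cancelled = true; }

private:
  std::shared_ptr<CancelState> state_;
};

// Stand-in for the long operation: performs up to `steps` units of work and
// checks for cancellation before each one. Returns the units completed.
int runSteps(const CancelState& state, int steps) {
  int done = 0;
  for (int i = 0; i < steps && !state.cancelled; ++i) {
    ++done;
  }
  return done;
}
```

Holding the handle in a `std::unique_ptr<OperationHandle>` and calling `reset()` plays the same role as assigning `nullptr` to the promise: the destructor runs and the worker stops at its next check.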

Streaming

Schema

interface DataStream {
  # Called for each chunk
  write @0 (data :Data) -> stream;
 
  # Called when done
  done @1 () -> ();
}
 
interface DataProducer {
  # Start streaming to the given callback
  stream @0 (callback :DataStream) -> ();
}

Server Implementation

class DataStreamImpl final : public DataStream::Server {
public:
  kj::Promise<void> write(WriteContext context) override {
    auto data = context.getParams().getData();
    processChunk(data);  // application-defined handler (not shown)
    return kj::READY_NOW;
  }
 
  kj::Promise<void> done(DoneContext context) override {
    finalize();  // application-defined cleanup (not shown)
    return kj::READY_NOW;
  }
};
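
On the producing side, a large payload has to be cut into pieces before each piece is passed to `write`. A minimal sketch of that step, in plain C++ with no RPC types: `splitIntoChunks` is a hypothetical helper, and the chunk size is left to the caller because the source does not recommend one.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Split `payload` into pieces of at most `chunkSize` bytes. Each piece would
// be sent as one `write` call, followed by a single `done`.
std::vector<std::vector<std::uint8_t>> splitIntoChunks(
    const std::vector<std::uint8_t>& payload, std::size_t chunkSize) {
  assert(chunkSize > 0);
  std::vector<std::vector<std::uint8_t>> chunks;
  for (std::size_t offset = 0; offset < payload.size(); offset += chunkSize) {
    std::size_t end = std::min(offset + chunkSize, payload.size());
    chunks.emplace_back(
        payload.begin() + static_cast<std::ptrdiff_t>(offset),
        payload.begin() + static_cast<std::ptrdiff_t>(end));
  }
  return chunks;
}
```

The last chunk is allowed to be shorter than `chunkSize`, and an empty payload yields no chunks, so the producer would call only `done`.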

Two-Party Protocol

For direct connections without a central server:

#include <zap/rpc-twoparty.h>
#include <kj/async-io.h>
 
// Server side
void runServer(kj::AsyncIoStream& stream) {
  zap::TwoPartyServer server(kj::heap<MyServiceImpl>());
 
  auto& network = server.accept(stream);
  // Connection is now active
}
 
// Client side
void runClient(kj::AsyncIoStream& stream) {
  zap::TwoPartyClient client(stream);
 
  auto cap = client.bootstrap().castAs<MyService>();
  // Use the capability
}

Bidirectional Communication

Both sides can export capabilities:

interface ChatServer {
  join @0 (username :Text, client :ChatClient) -> (session :ChatSession);
}
 
interface ChatSession {
  send @0 (message :Text) -> ();
  leave @1 () -> ();
}
 
interface ChatClient {
  receive @0 (from :Text, message :Text) -> ();
  userJoined @1 (username :Text) -> ();
  userLeft @2 (username :Text) -> ();
}

The client passes a ChatClient capability to the server, which can then call back to the client.
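
The callback flow can be modelled locally before any network code is written. This sketch is an illustration in plain C++, not generated ZAP code: `ChatClientCallback`, `ChatRoom` and `RecordingClient` are hypothetical names, only the `receive` callback is modelled, and skipping the sender when broadcasting is an assumption.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Mirrors the `receive` method of ChatClient in the schema.
class ChatClientCallback {
public:
  virtual ~ChatClientCallback() = default;
  virtual void receive(const std::string& from, const std::string& message) = 0;
};

class ChatRoom {
public:
  // Mirrors `join`: the server stores the callback supplied by the client.
  void join(std::string username, std::shared_ptr<ChatClientCallback> client) {
    members_.push_back({std::move(username), std::move(client)});
  }

  // Mirrors `send` on a session: the server calls back into every member
  // except the sender (an assumption of this sketch).
  void send(const std::string& from, const std::string& message) {
    for (auto& member : members_) {
      if (member.username != from) {
        member.client->receive(from, message);
      }
    }
  }

private:
  struct Member {
    std::string username;
    std::shared_ptr<ChatClientCallback> client;
  };
  std::vector<Member> members_;
};

// Example client that records what the server sends it.
class RecordingClient final : public ChatClientCallback {
public:
  void receive(const std::string& from, const std::string& message) override {
    inbox.push_back(from + ": " + message);
  }
  std::vector<std::string> inbox;
};
```

In the real system the stored callback would be a capability exported by the client, so each `receive` call travels back over the same connection the client used to join.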

Connection Management

Detecting Disconnection

// Get notified when connection drops
auto disconnectPromise = client.onDisconnect();
 
disconnectPromise.then([]() {
  std::cout << "Disconnected from server" << std::endl;
}).detach([](kj::Exception&& e) {
  std::cerr << "Disconnect error: " << e.getDescription().cStr() << std::endl;
});

Graceful Shutdown

// Server: stop accepting new connections
listener = nullptr;
 
// Wait for existing requests to complete
// or implement a shutdown method in your interface
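
Waiting for existing requests needs some record of what is still in flight. A minimal sketch of such bookkeeping follows; `DrainTracker` is a hypothetical class, not part of ZAP, and it assumes all handlers run on a single event-loop thread, so no locking is used.

```cpp
#include <cassert>
#include <cstddef>

// Tracks in-flight requests so a server can tell when it is safe to exit.
class DrainTracker {
public:
  // Call at the start of a request. Returns false once shutdown has begun,
  // in which case the caller should reject the request.
  bool tryBegin() {
    if (draining_) return false;
    ++inFlight_;
    return true;
  }

  // Call when a request that was admitted by tryBegin() completes.
  void finish() {
    assert(inFlight_ > 0);
    --inFlight_;
  }

  // Call after the listener has been dropped.
  void beginShutdown() { draining_ = true; }

  // True once shutdown has begun and every admitted request has finished.
  bool safeToExit() const { return draining_ && inFlight_ == 0; }

private:
  bool draining_ = false;
  std::size_t inFlight_ = 0;
};
```

Each method implementation would call `tryBegin()` on entry and `finish()` when its work completes; the main loop keeps running until `safeToExit()` returns true.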

Best Practices

  1. Use promise pipelining - Avoid unnecessary round trips by chaining calls
  2. Return capabilities - Instead of IDs, return object references
  3. Cancel unused promises - Free server resources early
  4. Handle errors - Always catch exceptions from RPC calls
  5. Use streaming for large data - Don't send huge messages at once
  6. Keep interfaces focused - Small interfaces are easier to mock and test
  7. Document capability lifecycles - Make it clear who owns what

Performance Considerations

  • Batch requests - Several pipelined calls sent together cost fewer round trips than sequential calls that each wait for a reply
  • Reuse capabilities - Don't re-bootstrap for each operation
  • Consider locality - Pass capabilities to where the data is
  • Monitor message sizes - Large messages can cause latency spikes

Next Steps