KJ Library
Modern C++ utility library for async I/O, strings, and more
KJ Library
KJ is a modern C++ utility library included with ZAP C++. It provides async I/O, string utilities, containers, memory management, and more.
Headers Overview
#include <kj/common.h> // Core utilities
#include <kj/string.h> // String types
#include <kj/array.h> // Array and ArrayPtr
#include <kj/vector.h> // Dynamic arrays
#include <kj/memory.h> // Own, Maybe
#include <kj/async.h> // Promises, event loop
#include <kj/async-io.h> // Async networking
#include <kj/exception.h> // Exception handling
#include <kj/debug.h> // Logging and assertions
#include <kj/time.h> // Time utilities
Strings
StringPtr
Non-owning reference to a string (like std::string_view):
#include <kj/string.h>
// Demonstrates the read-only operations available on a kj::StringPtr.
// `str` is a non-owning view; the caller keeps the underlying characters alive.
void process(kj::StringPtr str) {
  // Access characters
  for (char c : str) { /* ... */ }
  // Get C string (null-terminated)
  const char* cstr = str.cStr();
  // Get size
  size_t len = str.size();
  // Comparison
  if (str == "hello") { /* ... */ }
  if (str.startsWith("prefix")) { /* ... */ }
  if (str.endsWith("suffix")) { /* ... */ }
  // Slicing: a suffix is still NUL-terminated, so it remains a StringPtr...
  kj::StringPtr tail = str.slice(len / 2);
  // ...but an arbitrary sub-range is not NUL-terminated, so slice(start, end)
  // yields an ArrayPtr<const char> rather than a StringPtr.
  kj::ArrayPtr<const char> sub = str.slice(0, len / 2);
}
String
Owning string type (like std::string):
#include <kj/string.h>
// Create string using kj::str (concatenates arguments)
kj::String str = kj::str("Hello, ", name, "!");
// From heap allocated
kj::String owned = kj::heapString("hello");
// From StringPtr
kj::String copied = kj::heapString(stringPtr);
// Convert to StringPtr (implicit)
kj::StringPtr ptr = str;
// Convert to std::string
std::string stdStr = str.cStr();
// Move semantics
kj::String str2 = kj::mv(str);
String Formatting
// Concatenate anything
kj::String msg = kj::str("Value: ", value, ", Count: ", count);
// With hex formatting
kj::String hex = kj::str(kj::hex(value));
// With fixed precision
kj::String num = kj::str(kj::fixedDouble(3.14159, 2)); // "3.14"
Arrays
ArrayPtr
Non-owning reference to a contiguous array:
#include <kj/array.h>
// Demonstrates the read-only operations available on a kj::ArrayPtr.
// `arr` is a non-owning view over a contiguous run of ints.
void process(kj::ArrayPtr<const int> arr) {
  // Check if empty first: the element access below requires at least one element
  if (arr.size() == 0) { return; }
  // Iterate
  for (int x : arr) { /* ... */ }
  // Access elements
  int first = arr[0];
  size_t size = arr.size();
  // Slicing: both bounds must lie within the array
  if (size >= 5) {
    auto slice = arr.slice(1, 5);
  }
}
Array
Owning array type:
#include <kj/array.h>
// Create array
kj::Array<int> arr = kj::heapArray<int>(100);
// Initialize
for (size_t i = 0; i < arr.size(); i++) {
arr[i] = i;
}
// Create from initializer
kj::Array<int> small = kj::heapArray<int>({1, 2, 3, 4, 5});
// Move semantics
kj::Array<int> arr2 = kj::mv(arr);
// Convert to ArrayPtr
kj::ArrayPtr<int> ptr = arr2;
Vector
Dynamic array (like std::vector):
#include <kj/vector.h>
kj::Vector<int> vec;
vec.add(1);
vec.add(2);
vec.add(3);
// Access
int first = vec[0];
size_t size = vec.size();
// Convert to Array (moves data)
kj::Array<int> arr = vec.releaseAsArray();
Memory Management
Own
Unique ownership pointer (like std::unique_ptr):
#include <kj/memory.h>
// Create owned object
kj::Own<MyClass> obj = kj::heap<MyClass>(arg1, arg2);
// Access
obj->method();
MyClass& ref = *obj;
// Move
kj::Own<MyClass> obj2 = kj::mv(obj);
// Release raw pointer (transfers ownership)
MyClass* raw = obj2.release();
// Custom destructor
auto custom = kj::Own<Resource>(ptr, [](Resource* r) {
r->cleanup();
delete r;
});
Maybe
Optional value type (like std::optional):
#include <kj/memory.h>
kj::Maybe<int> maybeValue;
// Check if has value
if (maybeValue != kj::none) { /* ... */ }
// Pattern matching with KJ_IF_MAYBE
KJ_IF_MAYBE(value, maybeValue) {
// *value is the int
process(*value);
} else {
// No value present
}
// Get with default
int value = maybeValue.orDefault(42);
// Map transformation
kj::Maybe<kj::String> maybeStr = maybeValue.map([](int v) {
return kj::str(v);
});
Async I/O
Event Loop
#include <kj/async-io.h>
// Entry point: sets up the async I/O context, starts the asynchronous work,
// and blocks on the wait scope until that work has produced its result.
int main() {
  // Set up async I/O context
  auto io = kj::setupAsyncIo();
  auto& waitScope = io.waitScope;
  // Run async code and wait for it to finish
  auto result = doAsyncWork().wait(waitScope);
  return 0;
}
Promises
#include <kj/async.h>
// Create a promise that resolves immediately
kj::Promise<int> promise = kj::Promise<int>(42);
// Chain with then()
auto doubled = promise.then([](int x) {
return x * 2;
});
// Chain multiple operations
auto result = fetchData()
.then([](Data data) { return process(data); })
.then([](Result r) { return format(r); });
// Handle errors
auto safe = riskyOperation()
.catch_([](kj::Exception&& e) {
return kj::str("Error: ", e.getDescription());
});
// Void promises
kj::Promise<void> done = doSomething()
.then([]() {
// No value, just completion
});
Creating Promises
// From a value
kj::Promise<int> immediate = kj::Promise<int>(42);
// Ready now (void)
return kj::READY_NOW;
// Never completes
kj::Promise<void> forever = kj::NEVER_DONE;
// From a fulfiller (deferred resolution)
auto paf = kj::newPromiseAndFulfiller<int>();
// Later: paf.fulfiller->fulfill(42);
// Or: paf.fulfiller->reject(KJ_EXCEPTION(FAILED, "reason"));
return kj::mv(paf.promise);
Forking Promises
Use the result of a promise multiple times:
// Fork a promise to use result multiple times
auto fork = promise.fork();
auto branch1 = fork.addBranch().then([](int x) { return x + 1; });
auto branch2 = fork.addBranch().then([](int x) { return x * 2; });
// Both branches will receive the same value
Joining Promises
// Wait for multiple promises
kj::Vector<kj::Promise<int>> promises;
promises.add(task1());
promises.add(task2());
promises.add(task3());
auto all = kj::joinPromises(promises.releaseAsArray());
auto results = all.wait(waitScope);
// Join two specific promises
auto both = promise1.then([&](int a) {
return promise2.then([a](int b) {
return a + b;
});
});
Exclusive Join
Race promises, cancel the loser:
auto winner = promise1.exclusiveJoin(kj::mv(promise2));
// Only the first to complete survives, the other is cancelled
Networking
TCP Server
#include <kj/async-io.h>
// Accepts connections on `listener` indefinitely. Each accepted stream is
// handed to handleConnection(), which runs detached in the background.
// `listener` is captured by reference, so it must outlive the returned promise.
// Defined before runServer() so that it is declared at its point of use.
kj::Promise<void> acceptLoop(kj::ConnectionReceiver& listener) {
  return listener.accept()
      .then([&listener](kj::Own<kj::AsyncIoStream> stream) {
    // Handle connection in background
    handleConnection(kj::mv(stream)).detach([](kj::Exception&& e) {
      KJ_LOG(ERROR, "Connection error", e);
    });
    // Accept next connection
    return acceptLoop(listener);
  });
}

// Listens on port 8080 (all interfaces) and serves connections via acceptLoop().
kj::Promise<void> runServer(kj::AsyncIoContext& ctx) {
  auto& network = ctx.provider->getNetwork();
  // Resolve the address asynchronously instead of blocking with wait()
  return network.parseAddress("*:8080")
      .then([](kj::Own<kj::NetworkAddress> addr) {
    // Listen on port
    auto listener = addr->listen();
    auto& receiver = *listener;
    // The accept loop only holds a reference to the listener, so attach the
    // owning pointer to the promise to keep it alive while the loop runs.
    return acceptLoop(receiver).attach(kj::mv(listener));
  });
}
TCP Client
// Resolves "localhost:8080", connects to it, and passes the resulting stream
// to useConnection().
kj::Promise<void> connect(kj::AsyncIoContext& ctx) {
  auto& network = ctx.provider->getNetwork();
  return network.parseAddress("localhost:8080")
      .then([](kj::Own<kj::NetworkAddress> addr) {
    // `addr` would otherwise be destroyed when this continuation returns,
    // while the connect is still pending. Attach it to the promise so it
    // stays alive until the connection attempt finishes.
    auto connecting = addr->connect();
    return connecting.attach(kj::mv(addr));
  })
      .then([](kj::Own<kj::AsyncIoStream> stream) {
    return useConnection(kj::mv(stream));
  });
}
Reading and Writing
// Repeatedly reads up to 1024 bytes from `stream` and writes them back.
// `stream` is captured by reference, so it must outlive the returned promise.
kj::Promise<void> echo(kj::AsyncIoStream& stream) {
  auto buffer = kj::heapArray<kj::byte>(1024);
  // Start the read in its own statement so it cannot be reordered against
  // the move of `buffer` into the continuation below.
  auto reading = stream.read(buffer.begin(), 1, buffer.size());
  return reading
      .then([&stream, buffer = kj::mv(buffer)](size_t n) mutable {
    auto writing = stream.write(buffer.begin(), n);
    // The continuation's captures are destroyed once it returns, which is
    // before the write completes. Attach the buffer to the write promise so
    // the bytes remain valid for the duration of the write.
    return writing.attach(kj::mv(buffer));
  })
      .then([&stream]() {
    return echo(stream); // Continue echoing
  });
}
Exception Handling
#include <kj/exception.h>
// Throw with context
KJ_FAIL_REQUIRE("Invalid input", value, "expected positive");
// Assert (throws if false)
KJ_REQUIRE(value > 0, "Value must be positive", value);
// Debug assertions (removed in release builds)
KJ_DASSERT(pointer != nullptr);
// Catch and handle
try {
riskyOperation();
} catch (const kj::Exception& e) {
KJ_LOG(ERROR, "Operation failed", e.getDescription());
}
// Create exception
auto e = KJ_EXCEPTION(FAILED, "Something went wrong", details);
Exception Types
- FAILED - General failure
- OVERLOADED - Too busy, try again later
- DISCONNECTED - Connection lost
- UNIMPLEMENTED - Feature not implemented
Logging
#include <kj/debug.h>
KJ_LOG(INFO, "Starting server", port);
KJ_LOG(WARNING, "Connection timeout", address);
KJ_LOG(ERROR, "Failed to read", filename, errno);
// Debug logging (can be compiled out)
KJ_DBG("Debug value", someVariable);
Time and Timers
#include <kj/async-io.h>
#include <kj/time.h>
// Returns a promise from `timer` that completes after a one-second delay.
kj::Promise<void> delayed(kj::Timer& timer) {
  const auto oneSecond = 1 * kj::SECONDS;
  return timer.afterDelay(oneSecond);
}
// Races `op` against a 30-second timer. If the timer fires first, the
// returned promise fails with "Operation timed out".
kj::Promise<void> timeout(kj::Timer& timer, kj::Promise<void> op) {
  // Build the failing branch first, then join it with the real operation
  auto deadline = timer.afterDelay(30 * kj::SECONDS)
      .then([]() -> kj::Promise<void> {
    KJ_FAIL_REQUIRE("Operation timed out");
  });
  return op.exclusiveJoin(kj::mv(deadline));
}
// Get current time
auto now = kj::systemPreciseMonotonicClock().now();
Thread Safety
KJ async is single-threaded by design. Use executors for multi-threading:
#include <kj/async.h>
#include <kj/thread.h>
// Get current thread's executor
auto& executor = kj::getCurrentThreadExecutor();
// Run on another thread
kj::Thread thread([&]() {
// This runs on another thread
auto promise = executor.executeAsync([&]() {
// This runs back on the main thread
return result;
});
});
Cross-Thread Communication
// Create a cross-thread promise fulfiller
auto paf = kj::newPromiseAndCrossThreadFulfiller<int>();
// In another thread:
paf.fulfiller->fulfill(42);
// In the main thread:
auto result = paf.promise.wait(waitScope);
Common Patterns
RAII with Defer
auto guard = kj::defer([&]() {
cleanup();
});
// cleanup() is called when guard goes out of scope
Error Propagation
// Fetches data and processes it. Invalid data is reported by returning an
// exception from the continuation, which rejects the resulting promise.
kj::Promise<Result> processAsync() {
  auto validateThenProcess = [](Data data) -> kj::Promise<Result> {
    if (data.isValid()) {
      return process(data);
    }
    return KJ_EXCEPTION(FAILED, "Invalid data");
  };
  return fetchData().then(kj::mv(validateThenProcess));
}
Next Steps
- Explore the API Reference
- See Examples
- Learn about RPC