BinaryRPC
BinaryRPC, high-throughput RPC framework built on top of uWebSockets.
It is designed for latency-sensitive applications such as multiplayer games, financial tick streams, and IoT dashboards, delivering ultra-low latency and minimal overhead. With a modular architecture and efficient networking, BinaryRPC remains lightweight both in resource usage and developer experience.
โจ Highlights
Capability | Description |
---|---|
โก WebSocket transport | Blazing-fast networking powered by uWebSockets and epoll /kqueue . |
๐ Configurable reliability (QoS) | None , AtLeastOnce , ExactlyOnce with retries, ACKs & two-phase commit, plus pluggable back-off strategies & per-session TTL. |
๐งฉ Pluggable layers | Drop-in protocols (SimpleText , MsgPack, โฆ), transports, middleware & plugins. |
๐งโ๐คโ๐ง Stateful sessions | Reconnect-friendly Session objects with automatic expiry & indexed fields. |
๐ก๏ธ Middleware chain | JWT auth, token-bucket rate-limiter and any custom middleware you write. |
๐ Header-only core | Almost all of BinaryRPC lives in headers โ just add include path. |
๐ช Custom handshake & authentication (IHandshakeInspector
)
The first thing every WebSocket upgrade hits is an IHandshakeInspector
. The default one simply accepts the socket and builds a trivial ClientIdentity
from the URL query-string. In production you almost always want to sub-class it.
Below is a condensed version of the CustomHandshakeInspector
actually running in the reference chat server:
class CustomHandshakeInspector : public IHandshakeInspector {
public:
std::optional<ClientIdentity> extract(uWS::HttpRequest& req) override {
// 1. Parse query-string
// ws://host:9010/?clientId=123&deviceId=456&sessionToken=ABCDโฆ
std::string q{req.getQuery()};
auto [clientId, deviceIdStr, tokenHex] = parseQuery(q);
if (clientId.empty() || deviceIdStr.empty())
return std::nullopt; // missing IDs
int deviceId;
try { deviceId = std::stoi(deviceIdStr); }
catch (...) { return std::nullopt; }
if (!sessionFileContains(clientId, deviceIdStr))
return std::nullopt; // unknown combo
std::array<std::uint8_t,16> token{};
if (tokenHex.size() == 32)
hexStringToByteArray(tokenHex, token); // reuse
else
token = sha256_16(clientId+':'+deviceIdStr+':'+epochMs());
ClientIdentity id{clientId, deviceId, token};
return id; // success
}
};
// Attach inspector
ws->setHandshakeInspector(std::make_shared<CustomHandshakeInspector>());
Reusing an active session
- Attach the new socket to that session.
- Replay all QoS-1/2 frames still waiting in its outbox.
- Retain all custom fields saved via
FrameworkAPI
; the client continues seamlessly.
๐ฆ Reliability & QoS
Level | Guarantees | Frame flow |
---|---|---|
QoSLevel::None | At-most-once. Fire-and-forget โ lowest latency. | DATA โ client |
QoSLevel::AtLeastOnce | At-least-once. Server retries until client ACKs. | DATA โ ACK |
QoSLevel::ExactlyOnce | Exactly-once. Two-phase commit eliminates duplicates. | PREPARE โ PREPARE_ACK โ COMMIT โ COMPLETE |
Why sessionTtlMs
matters ๐ค
- Seamless reconnects โ brief drops are invisible if the user returns within TTL.
- Reliable offline push โ queued QoS-1/2 frames flush automatically.
- Duplicate shielding โ old retries are ignored after ExactlyOnce commit.
ReliableOptions
struct ReliableOptions {
QoSLevel level = QoSLevel::None;
std::uint32_t baseRetryMs = 100;
std::uint32_t maxBackoffMs = 2000;
std::uint16_t maxRetry = 5;
std::uint32_t sessionTtlMs = 30000;
std::shared_ptr<IBackoffStrategy> backoffStrategy;
};
Custom back-off policy
class FibonacciBackoff : public IBackoffStrategy {
public:
std::chrono::milliseconds nextDelay(std::size_t n) const override {
if (n < 2) return {100};
std::size_t a = 0, b = 1;
for (std::size_t i = 0; i < n; ++i) { std::size_t t = a + b; a = b; b = t; }
return std::chrono::milliseconds(std::min(b * 100UL, 5000UL));
}
};
ws->setReliable({
.level = QoSLevel::AtLeastOnce,
.baseRetryMs = 100,
.backoffStrategy = std::make_shared<FibonacciBackoff>()
});
๐๏ธ Session management via FrameworkAPI
FrameworkAPI
glues the transport and the lock-free SessionManager
. Use it to store, search and push data to any connected (or recently disconnected) client.
using namespace binaryrpc;
auto& app = App::getInstance();
FrameworkAPI fw{ &app.getSessionManager(), app.getTransport() };
indexed
flag โ when to flip it
indexed | Behaviour | Complexity |
---|---|---|
false (default) | Value kept only in session blob. | Lookup โ O(N) |
true | Value also put into a global hash-map. | fw.findBy() โ O(1) |
Example: persist state & target users
// 1๏ธโฃ Persist state
fw.setField(ctx.sessionId(), "userId", userId, true);
fw.setField(ctx.sessionId(), "username", username);
fw.setField(ctx.sessionId(), "xp", 0);
// 2๏ธโฃ Target users later
for (auto& s : fw.findBy("userId", std::to_string(userId))) {
fw.sendToSession(s, app.getProtocol()->serialize("levelUp", {{"lvl", newLvl}}));
}
๐บ๏ธ Architecture Overview
+-----------------------------+
| raw bytes --> IProtocol |
| (parse) |
+--------------+--------------+
|
v
+--------------+--------------+
| ParsedRequest |
+--------------+--------------+
|
v
+--------------+--------------+
| MiddlewareChain (*) |
+--------------+--------------+
|
v
+--------------+--------------+
| RpcContext & SessionManager |
+--------------+--------------+
|
v
+--------------+--------------+
| RPCManager |
+--------------+--------------+
|
v
+--------------+--------------+
| ITransport (send) |
| <--> client |
+-----------------------------+
All rectangles are replaceable: implement the interface and plug your own.
๐ ๏ธ Customisation Cheat-Sheet
What you want to change | How |
---|---|
QoS level / retries | WebSocketTransport::setReliable(ReliableOptions) |
Back-off curve | Implement IBackoffStrategy |
Serialisation | Implement IProtocol |
Transport | Implement ITransport |
Middleware | App::use , useFor |
Plugins | App::usePlugin |
Session fields | fw.setField / fw.getField |
Logging sink | Logger::inst().setSink(...) |
๐ Middleware Management
1. Global Middleware (use
)
app.use([](Session& session, const std::string& method, std::vector<uint8_t>& payload, NextFunc next) {
LOG_DEBUG("Incoming request: " + method);
next();
});
2. Single Method Middleware (useFor
)
app.useFor("login", [](Session& session, const std::string& m,
std::vector<uint8_t>& payload, NextFunc next) {
auto req = parseMsgPackPayload(payload);
if (!validateCredentials(req)) throw ErrorObj("Invalid credentials");
next();
});
3. Multi-Method Middleware (useForMulti
)
app.useForMulti({"send_message","join_room"}, [](Session& s,const std::string&,std::vector<uint8_t>&,
NextFunc n){
if(!s.isAuthenticated()) throw ErrorObj("Auth required");
n();
});
๐ Registering RPCs & Replying
The core of your application is the set of RPC handlers that process client requests. You register them on the App
instance.
registerRPC(method, handler)
This method binds a handler to a specific method name. The handler is a function that receives the raw payload and an RpcContext
object. The handler is responsible for parsing the incoming payload and serializing the outgoing payload before replying.
The IProtocol
interface (MsgPackProtocol
, SimpleTextProtocol
, etc.) is responsible for framing the data, not for serializing your application-specific data structures. It wraps your payload (std::vector<uint8_t>
) with a method name.
Here is a corrected example using nlohmann/json
to handle the application-level data serialization.
// main.cpp
#include "binaryrpc/core/app.hpp"
#include "binaryrpc/core/protocol/msgpack_protocol.hpp"
#include <nlohmann/json.hpp> // You'll need a JSON library
#include <iostream>
int main() {
auto& app = binaryrpc::App::getInstance();
// Use the MsgPack protocol for framing
auto protocol = std::make_shared<binaryrpc::MsgPackProtocol>();
app.setProtocol(protocol);
// Create a FrameworkAPI instance to interact with sessions
FrameworkAPI api(&app.getSessionManager(), app.getTransport());
// Register a "login" handler
app.registerRPC("login", [&](const std::vector<uint8_t>& payload, binaryrpc::RpcContext& ctx) {
nlohmann::json response_data;
try {
// 1. Parse the incoming payload
auto request = nlohmann::json::parse(payload);
std::string username = request.value("username", "");
std::string password = request.value("password", "");
// 2. Authenticate the user
if (username == "admin" && password == "password") {
std::cout << "Login successful for: " << username << std::endl;
// 3. Store state using FrameworkAPI
api.setField(ctx.session().id(), "isAuthenticated", true, false);
api.setField(ctx.session().id(), "username", username, true); // indexed
// 4. Prepare the response payload
response_data["status"] = "success";
response_data["ts"] = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()).count();
} else {
response_data["status"] = "error";
response_data["reason"] = "Invalid credentials";
}
} catch (const std::exception& e) {
std::cerr << "Error in login handler: " << e.what() << std::endl;
response_data["status"] = "error";
response_data["reason"] = "Internal server error";
}
// 5. Serialize the response data and reply
std::string response_str = response_data.dump();
std::vector<uint8_t> response_bytes(response_str.begin(), response_str.end());
// Use the protocol to frame the response
ctx.reply(protocol->serialize("login_response", response_bytes));
});
// Add an authentication middleware for "get_profile"
app.useFor("get_profile", [&](Session& session, const std::string& method, std::vector<uint8_t>& payload,
NextFunc next) {
auto isAuthenticated = api.getField<bool>(session.id(), "isAuthenticated");
if (!isAuthenticated.value_or(false)) {
// User is not authenticated. Send an error response directly and stop the chain.
nlohmann::json err_data = {
{"status", "error"},
{"reason", "Authentication required"}
};
std::string err_str = err_data.dump();
std::vector<uint8_t> err_bytes(err_str.begin(), err_str.end());
// Frame the error response
auto framed_error = protocol->serialize("auth_error", err_bytes);
// Send it directly to the session and DO NOT call next()
// Note: This creates a shared_ptr copy of the session to pass to the API.
api.sendToSession(std::make_shared<Session>(session), framed_error);
return;
}
// User is authenticated, proceed to the RPC handler
next();
});
// Register another handler - now protected by middleware
app.registerRPC("get_profile", [&](const auto& payload, auto& ctx) {
// Auth check is now in the middleware, we can proceed directly.
auto username = api.getField<std::string>(ctx.session().id(), "username");
nlohmann::json profile_data;
profile_data["username"] = username.value_or("N/A");
// ... other profile data
std::string profile_str = profile_data.dump();
std::vector<uint8_t> profile_bytes(profile_str.begin(), profile_str.end());
ctx.reply(protocol->serialize("profile_data", profile_bytes));
});
app.run(9010);
return 0;
}
The RpcContext
The RpcContext
is your gateway to interacting with the client and their session:
ctx.reply(payload)
: Sends a message back to the originating client.ctx.broadcast(payload)
: Sends a message to all connected clients.ctx.session()
: Returns a reference to theSession
object for the current client. You can use this to store and retrieve connection-specific state usingsetField
andgetField
. (But I prefer framework api to use)ctx.disconnect()
: Closes the connection.
Delivery of replies respects the QoS settings of the transport, just like sendToSession()
does (including ACKs, retries, and the offline outbox).
๐ How clients connect
Server-side expectations
clientId โ opaque user string (e.g. email)
deviceId โ integer; each device gets a stable ID
sessionToken (optional) โ 32-hex chars to resume previous session
JavaScript reference client
import "./binaryrpc.js";
const rpc = new BinaryRPC("ws://localhost:9010", {
onConnect() { rpc.call("join"); },
onError: console.error
});
rpc.connect("user-42", "web", null);
// laterโฆ
rpc.call("broadcast", { message: "hello" });
Note: The documentation is currently in this state. The framework is actively being used in projects at the company Iโm currently working for. Iโll be publishing it as open-source in the near future. 15.06.2025 last updated