efecan.zone

โœง ๐Ÿš€ BinaryRPC - ultra-low-latency WebSocket RPC framework โ€ข QoS โ€ข Pluggable Middleware โ€ข Header-only Core โœง

BinaryRPC

BinaryRPC, high-throughput RPC framework built on top of uWebSockets.
It is designed for latency-sensitive applications such as multiplayer games, financial tick streams, and IoT dashboards, delivering ultra-low latency and minimal overhead. With a modular architecture and efficient networking, BinaryRPC remains lightweight both in resource usage and developer experience.

โœจ Highlights

CapabilityDescription
โšก WebSocket transportBlazing-fast networking powered by uWebSockets and epoll/kqueue.
๐Ÿ”„ Configurable reliability (QoS)None, AtLeastOnce, ExactlyOnce with retries, ACKs & two-phase commit, plus pluggable back-off strategies & per-session TTL.
๐Ÿงฉ Pluggable layersDrop-in protocols (SimpleText, MsgPack, โ€ฆ), transports, middleware & plugins.
๐Ÿง‘โ€๐Ÿคโ€๐Ÿง‘ Stateful sessionsReconnect-friendly Session objects with automatic expiry & indexed fields.
๐Ÿ›ก๏ธ Middleware chainJWT auth, token-bucket rate-limiter and any custom middleware you write.
๐Ÿ”Œ Header-only coreAlmost all of BinaryRPC lives in headers โ€“ just add include path.

๐Ÿšช Custom handshake & authentication (IHandshakeInspector)

The first thing every WebSocket upgrade hits is an IHandshakeInspector. The default one simply accepts the socket and builds a trivial ClientIdentity from the URL query-string. In production you almost always want to sub-class it.

Below is a condensed version of the CustomHandshakeInspector actually running in the reference chat server:

class CustomHandshakeInspector : public IHandshakeInspector {
        public:
            std::optional<ClientIdentity> extract(uWS::HttpRequest& req) override {
                // 1. Parse query-string
                //    ws://host:9010/?clientId=123&deviceId=456&sessionToken=ABCDโ€ฆ

                std::string q{req.getQuery()};
                auto [clientId, deviceIdStr, tokenHex] = parseQuery(q);

                if (clientId.empty() || deviceIdStr.empty())
                    return std::nullopt;                         // missing IDs

                int deviceId;
                try { deviceId = std::stoi(deviceIdStr); }
                catch (...) { return std::nullopt; }

                if (!sessionFileContains(clientId, deviceIdStr))
                    return std::nullopt;                         // unknown combo

                std::array<std::uint8_t,16> token{};
                if (tokenHex.size() == 32)
                    hexStringToByteArray(tokenHex, token);       // reuse
                else
                    token = sha256_16(clientId+':'+deviceIdStr+':'+epochMs());

                ClientIdentity id{clientId, deviceId, token};
                return id;                                       // success
            }
        };

        // Attach inspector
        ws->setHandshakeInspector(std::make_shared<CustomHandshakeInspector>());
        

Reusing an active session

  1. Attach the new socket to that session.
  2. Replay all QoS-1/2 frames still waiting in its outbox.
  3. Retain all custom fields saved via FrameworkAPI; the client continues seamlessly.

๐Ÿšฆ Reliability & QoS

LevelGuaranteesFrame flow
QoSLevel::NoneAt-most-once. Fire-and-forget โ€“ lowest latency.DATA โ†’ client
QoSLevel::AtLeastOnceAt-least-once. Server retries until client ACKs.DATA โ†” ACK
QoSLevel::ExactlyOnceExactly-once. Two-phase commit eliminates duplicates.PREPARE โ†” PREPARE_ACK โ†” COMMIT โ†” COMPLETE

Why sessionTtlMs matters ๐Ÿค”

ReliableOptions

struct ReliableOptions {
            QoSLevel level = QoSLevel::None;
            std::uint32_t baseRetryMs   = 100;
            std::uint32_t maxBackoffMs  = 2000;
            std::uint16_t maxRetry      = 5;
            std::uint32_t sessionTtlMs  = 30000;
            std::shared_ptr<IBackoffStrategy> backoffStrategy;
        };

Custom back-off policy

class FibonacciBackoff : public IBackoffStrategy {
        public:
            std::chrono::milliseconds nextDelay(std::size_t n) const override {
                if (n < 2) return {100};
                std::size_t a = 0, b = 1;
                for (std::size_t i = 0; i < n; ++i) { std::size_t t = a + b; a = b; b = t; }
                return std::chrono::milliseconds(std::min(b * 100UL, 5000UL));
            }
        };

        ws->setReliable({
            .level = QoSLevel::AtLeastOnce,
            .baseRetryMs = 100,
            .backoffStrategy = std::make_shared<FibonacciBackoff>()
        });
        

๐Ÿ—„๏ธ Session management via FrameworkAPI

FrameworkAPI glues the transport and the lock-free SessionManager. Use it to store, search and push data to any connected (or recently disconnected) client.

using namespace binaryrpc;
        auto& app = App::getInstance();
        FrameworkAPI fw{ &app.getSessionManager(), app.getTransport() };
        

indexed flag โ€“ when to flip it

indexedBehaviourComplexity
false (default)Value kept only in session blob.Lookup โ‡’ O(N)
trueValue also put into a global hash-map.fw.findBy() โ‡’ O(1)

Example: persist state & target users

// 1๏ธโƒฃ Persist state
        fw.setField(ctx.sessionId(), "userId", userId, true);
        fw.setField(ctx.sessionId(), "username", username);
        fw.setField(ctx.sessionId(), "xp", 0);

        // 2๏ธโƒฃ Target users later
        for (auto& s : fw.findBy("userId", std::to_string(userId))) {
            fw.sendToSession(s, app.getProtocol()->serialize("levelUp", {{"lvl", newLvl}}));
        }
        

๐Ÿ—บ๏ธ Architecture Overview


        +-----------------------------+
        | raw bytes --> IProtocol     |
        |               (parse)       |
        +--------------+--------------+
                    |
                    v
        +--------------+--------------+
        |        ParsedRequest        |
        +--------------+--------------+
                    |
                    v
        +--------------+--------------+
        |     MiddlewareChain (*)     |
        +--------------+--------------+
                    |
                    v
        +--------------+--------------+
        | RpcContext & SessionManager |
        +--------------+--------------+
                    |
                    v
        +--------------+--------------+
        |           RPCManager        |
        +--------------+--------------+
                    |
                    v
        +--------------+--------------+
        |    ITransport (send)        |
        |         <--> client         |
        +-----------------------------+

All rectangles are replaceable: implement the interface and plug your own.


๐Ÿ› ๏ธ Customisation Cheat-Sheet

What you want to changeHow
QoS level / retriesWebSocketTransport::setReliable(ReliableOptions)
Back-off curveImplement IBackoffStrategy
SerialisationImplement IProtocol
TransportImplement ITransport
MiddlewareApp::use, useFor
PluginsApp::usePlugin
Session fieldsfw.setField / fw.getField
Logging sinkLogger::inst().setSink(...)

๐Ÿ”„ Middleware Management

1. Global Middleware (use)

app.use([](Session& session, const std::string& method, std::vector<uint8_t>& payload, NextFunc next) {
            LOG_DEBUG("Incoming request: " + method);
            next();
        });

2. Single Method Middleware (useFor)

app.useFor("login", [](Session& session, const std::string& m,
                            std::vector<uint8_t>& payload, NextFunc next) {
            auto req = parseMsgPackPayload(payload);
            if (!validateCredentials(req)) throw ErrorObj("Invalid credentials");
            next();
        });

3. Multi-Method Middleware (useForMulti)

app.useForMulti({"send_message","join_room"}, [](Session& s,const std::string&,std::vector<uint8_t>&,
        NextFunc n){
            if(!s.isAuthenticated()) throw ErrorObj("Auth required");
            n();
        });

๐Ÿ“ž Registering RPCs & Replying

The core of your application is the set of RPC handlers that process client requests. You register them on the App instance.

registerRPC(method, handler)

This method binds a handler to a specific method name. The handler is a function that receives the raw payload and an RpcContext object. The handler is responsible for parsing the incoming payload and serializing the outgoing payload before replying.

The IProtocol interface (MsgPackProtocol, SimpleTextProtocol, etc.) is responsible for framing the data, not for serializing your application-specific data structures. It wraps your payload (std::vector<uint8_t>) with a method name.

Here is a corrected example using nlohmann/json to handle the application-level data serialization.

// main.cpp
#include "binaryrpc/core/app.hpp"
#include "binaryrpc/core/protocol/msgpack_protocol.hpp"
#include <nlohmann/json.hpp> // You'll need a JSON library
#include <iostream>

int main() {
    auto& app = binaryrpc::App::getInstance();
    
    // Use the MsgPack protocol for framing
    auto protocol = std::make_shared<binaryrpc::MsgPackProtocol>();
    app.setProtocol(protocol);

    // Create a FrameworkAPI instance to interact with sessions
    FrameworkAPI api(&app.getSessionManager(), app.getTransport());

    // Register a "login" handler
    app.registerRPC("login", [&](const std::vector<uint8_t>& payload, binaryrpc::RpcContext& ctx) {
        nlohmann::json response_data;
        try {
            // 1. Parse the incoming payload
            auto request = nlohmann::json::parse(payload);
            std::string username = request.value("username", "");
            std::string password = request.value("password", "");

            // 2. Authenticate the user
            if (username == "admin" && password == "password") {
                std::cout << "Login successful for: " << username << std::endl;
                
                // 3. Store state using FrameworkAPI
                api.setField(ctx.session().id(), "isAuthenticated", true, false);
                api.setField(ctx.session().id(), "username", username, true); // indexed

                // 4. Prepare the response payload
                response_data["status"] = "success";
                response_data["ts"] = std::chrono::duration_cast<std::chrono::milliseconds>(
                        std::chrono::system_clock::now().time_since_epoch()).count();

            } else {
                response_data["status"] = "error";
                response_data["reason"] = "Invalid credentials";
            }

        } catch (const std::exception& e) {
            std::cerr << "Error in login handler: " << e.what() << std::endl;
            response_data["status"] = "error";
            response_data["reason"] = "Internal server error";
        }

        // 5. Serialize the response data and reply
        std::string response_str = response_data.dump();
        std::vector<uint8_t> response_bytes(response_str.begin(), response_str.end());
        
        // Use the protocol to frame the response
        ctx.reply(protocol->serialize("login_response", response_bytes));
    });

    // Add an authentication middleware for "get_profile"
    app.useFor("get_profile", [&](Session& session, const std::string& method, std::vector<uint8_t>& payload,
     NextFunc next) {
        auto isAuthenticated = api.getField<bool>(session.id(), "isAuthenticated");

        if (!isAuthenticated.value_or(false)) {
            // User is not authenticated. Send an error response directly and stop the chain.
            nlohmann::json err_data = {
                {"status", "error"},
                {"reason", "Authentication required"}
            };
            std::string err_str = err_data.dump();
            std::vector<uint8_t> err_bytes(err_str.begin(), err_str.end());
            
            // Frame the error response
            auto framed_error = protocol->serialize("auth_error", err_bytes);
            
            // Send it directly to the session and DO NOT call next()
            // Note: This creates a shared_ptr copy of the session to pass to the API.
            api.sendToSession(std::make_shared<Session>(session), framed_error);
            return; 
        }
        
        // User is authenticated, proceed to the RPC handler
        next();
    });

    // Register another handler - now protected by middleware
    app.registerRPC("get_profile", [&](const auto& payload, auto& ctx) {
        // Auth check is now in the middleware, we can proceed directly.
        auto username = api.getField<std::string>(ctx.session().id(), "username");
        
        nlohmann::json profile_data;
        profile_data["username"] = username.value_or("N/A");
        // ... other profile data

        std::string profile_str = profile_data.dump();
        std::vector<uint8_t> profile_bytes(profile_str.begin(), profile_str.end());
        
        ctx.reply(protocol->serialize("profile_data", profile_bytes));
    });

    app.run(9010);
    return 0;
}

The RpcContext

The RpcContext is your gateway to interacting with the client and their session:

Delivery of replies respects the QoS settings of the transport, just like sendToSession() does (including ACKs, retries, and the offline outbox).


๐Ÿ”Œ How clients connect

Server-side expectations

clientId   โ‡’ opaque user string (e.g. email)
        deviceId   โ‡’ integer; each device gets a stable ID
        sessionToken (optional) โ‡’ 32-hex chars to resume previous session
        

JavaScript reference client

import "./binaryrpc.js";

        const rpc = new BinaryRPC("ws://localhost:9010", {
            onConnect() { rpc.call("join"); },
            onError: console.error
        });

        rpc.connect("user-42", "web", null);

        // laterโ€ฆ
        rpc.call("broadcast", { message: "hello" });
        

Note: The documentation is currently in this state. The framework is actively being used in projects at the company Iโ€™m currently working for. Iโ€™ll be publishing it as open-source in the near future. 15.06.2025 last updated