ADR 011: Runtime Message Routing
- Date: 2025-01-11
- Status: Accepted
- Applies to: Runtime
- Tags: runtime, routing, handlers, dispatch
Context
The runtime's Router layer dispatches incoming frames to handlers. The existing specs define subject namespaces (ADR-006, ADR-008) and RPC envelope structure (ADR-010), but do not specify:
- How handlers are registered and matched
- Dispatch ordering when multiple handlers match
- Error propagation from handlers
- Per-peer vs global handler scoping
Key design goals:
- Predictable: Deterministic dispatch ordering
- Composable: Works for RPC, events, and custom protocols
- Extensible: Custom prefixes without hacks
- Safe: Hard to accidentally misuse (e.g., frameId reuse)
Decision
1. Router API
The Router manages handler registration and dispatch:
interface Router {
/** Register handler for exact subject match */
route(subject: string, handler: MessageHandler, options?: RouteOptions): Unsubscribe;
/** Register handler for subject prefix */
routePrefix(prefix: string, handler: MessageHandler, options?: RouteOptions): Unsubscribe;
/** Remove all handlers for a subject */
unroute(subject: string): void;
/** Clear all handlers */
clear(): void;
}
interface RouteOptions {
mode?: "exclusive" | "broadcast"; // Default: inferred from prefix
}
type Unsubscribe = () => void;
type MessageHandler = (msg: InboundMessage) => Promise<void> | void;2. InboundMessage (not MessageContext)
The handler receives an InboundMessage with clear separation between general messaging and RPC:
interface InboundMessage {
// Core message data
readonly subject: Subject;
readonly payload: Uint8Array;
readonly peerId: PeerId;
readonly session: Session;
// Low-level frame access (read-only)
readonly frame: Readonly<MessageFrame>;
/** Send a response message (generates fresh frameId) */
send(subject: Subject, data: Uint8Array): Promise<void>;
/** RPC context (only present for rpc/ subjects with valid envelope) */
readonly rpc?: RpcContext;
}
interface RpcContext {
readonly method: string;
readonly params: unknown;
readonly cid: FrameId;
/** Reply with success result */
reply(result?: unknown): Promise<void>;
/** Reply with error */
error(code: number, message: string, data?: unknown): Promise<void>;
}Key design:
send()is always available (generates fresh frameId internally)rpcis only present when envelope decodes successfullyframeis read-only to prevent accidental frameId reuse; exposed for diagnostics and tracing only—application logic SHOULD NOT depend on frame internalsrpc.reply()andrpc.error()handle correlation automatically
3. Subject Validation Policy
Subject prefixes are policy-driven, not type-locked:
interface SubjectPolicy {
/** Required prefixes (default: ["rpc/", "event/", "stream/", "app/"]) */
allowedPrefixes: string[];
/** Prefixes that are reserved/rejected (default: ["stream/"]) */
reservedPrefixes: string[];
/** Custom classifier for dispatch semantics */
classify?(subject: string): SubjectKind;
}
type SubjectKind = "rpc" | "event" | "custom" | "reserved";Validation order:
- Check
reservedPrefixes→ reject withErrorFrame{code: 1003}(UnsupportedFeature) - Check
allowedPrefixes→ reject withErrorFrame{code: 1002}(InvalidFrame) if no match - Run
classify()if provided → determines dispatch semantics
Precedence: reservedPrefixes always takes precedence over allowedPrefixes. If a prefix appears in both, it is reserved.
Classification invariant: If classify() returns "rpc", the message MUST contain a valid RPC envelope; otherwise the message is rejected with ErrorFrame{code: 1002} (InvalidFrame: malformed payload). This prevents semantic spoofing where non-RPC messages bypass envelope validation.
Default validation:
| Prefix | Kind | Default mode | Behavior |
|---|---|---|---|
rpc/ | rpc | exclusive | Single handler, must reply |
event/ | event | broadcast | Fan-out, no reply |
stream/ | reserved | — | Reject with ErrorFrame(1003) |
app/ | custom | broadcast | User-defined semantics |
Future extensibility: Reserved prefixes (e.g., stream/) may be reclassified by negotiators in future protocol versions. V2 negotiators can advertise streaming capability and override the default reserved status.
Extensibility:
// Allow custom prefix
const router = createRouter({
subjectPolicy: {
allowedPrefixes: ["rpc/", "event/", "app/", "debug/", "admin/"],
reservedPrefixes: ["stream/"],
}
});
router.routePrefix("debug/", debugHandler);
router.routePrefix("admin/", adminHandler, { mode: "exclusive" });4. Dispatch Ordering
When multiple handlers match, dispatch follows deterministic rules:
Matching priority:
- Exact match handlers (highest priority)
- Prefix handlers (longest prefix first)
Within each bucket:
- Registration order (first registered, first called)
Invocation:
| Mode | Behavior |
|---|---|
exclusive | First matching handler only; others ignored |
broadcast | All matching handlers invoked sequentially |
Why sequential broadcast? Handlers are awaited sequentially—the next handler is invoked only after the previous one resolves. This ensures deterministic side-effects. Concurrent invocation would introduce race conditions and make debugging harder. If parallel execution is needed, handlers can spawn their own async tasks.
Example:
router.route("event/user.joined", handlerA); // exact
router.routePrefix("event/user.", handlerB); // prefix (longer)
router.routePrefix("event/", handlerC); // prefix (shorter)
// Incoming: "event/user.joined"
// Dispatch order: handlerA (exact), then handlerB (longer prefix), then handlerC
// For broadcast mode: all three called in order
// For exclusive mode: only handlerA called5. RPC Dispatch Rules
For subjects classified as rpc:
- Decode RPC envelope from payload
- If envelope decoding fails → emit
ErrorFrame{code: 1002}(InvalidFrame: malformed payload) - If no handler → reply
RpcError{code: 1101, message: "Method not found"} - If handler doesn't reply within timeout → reply
RpcError{code: 1103, message: "Handler timeout"} - If handler throws → reply
RpcError{code: 2000, message: error.message}(default mapper)
Note: The frame itself is valid; only the payload (envelope) is malformed. This is a non-fatal error per SBP InvalidFrame semantics—the connection remains open; only the specific request fails.
Timeout configuration:
interface RouterConfig {
rpcTimeoutMs: number; // Default: 30000 (30s)
}6. Event Dispatch Rules
For event/ subjects:
- Decode envelope (expect
RpcNotification) - If decode fails → log warning, drop (no response)
- Fan-out to all matching handlers (broadcast mode)
- Handler errors → log warning, continue to next handler
- No reply expected;
msg.rpcis undefined
7. Error Propagation
Error handling is configurable via error mapper:
interface RouterConfig {
/** Map handler errors to RPC error responses */
errorMapper?: (error: Error, msg: InboundMessage) => RpcErrorPayload;
}
interface RpcErrorPayload {
code: number;
message: string;
data?: unknown;
}Default error mapper:
const defaultErrorMapper = (error: Error): RpcErrorPayload => ({
code: 2000, // Application error range
message: error.message,
});Custom error mapper example:
const router = createRouter({
errorMapper: (error, msg) => {
if (error instanceof ValidationError) {
return { code: 2001, message: "Validation failed", data: error.details };
}
if (error instanceof NotFoundError) {
return { code: 2002, message: "Resource not found" };
}
return { code: 2000, message: error.message };
}
});Error code ranges (see docs/protocols/error-codes.md):
| Range | Owner | Examples |
|---|---|---|
| 1000–1099 | SBP (protocol) | InvalidFrame, ProtocolViolation |
| 1100–1199 | RPC (runtime) | UnsupportedMethod (1101), Timeout (1103) |
| 2000+ | Application | User-defined |
8. Per-Session vs Global Handlers
Handlers can be registered at two levels:
// Global: survives session reconnects
runtime.router.route("rpc/getStatus", handler);
// Per-session: auto-cleared on session close
session.router.route("rpc/sessionSpecific", handler);Use cases:
- Global: Service methods that should always be available
- Per-session: Handlers tied to session state (e.g., authenticated user context)
Cleanup:
session.close()→ clears session-scoped handlersruntime.close()→ clears all handlers
Cleanup timing: Session-scoped handlers are cleared after the closed event is emitted. This allows cleanup logic in event handlers to still inspect registered handlers if needed.
9. Preventing frameId Misuse
The API makes it hard to accidentally reuse frameId:
// WRONG: Exposing frame for modification
ctx.frame.frameId // This would allow reuse
// RIGHT: Read-only frame, send() generates new frameId
msg.send(subject, data); // Always fresh frameId
// RIGHT: RPC reply uses cid for correlation, fresh frameId for response
msg.rpc.reply(result); // cid from request, new frameIdImplementation:
async function send(subject: Subject, data: Uint8Array): Promise<void> {
const frame = createMessageFrame(subject, data); // Generates new frameId
await session.transport.send(encodeFrame(frame));
}
async function rpcReply(result: unknown): Promise<void> {
const envelope: RpcSuccess = { t: "R", cid: this.cid, result };
const frame = createMessageFrame(this.subject, encodeRpcEnvelope(envelope));
await session.transport.send(encodeFrame(frame));
}Alternatives Considered
| Alternative | Why Rejected |
|---|---|
Type-locked SubjectPrefix union | Prevents custom prefixes; forces workarounds via app/ |
reply() always available | Confusing for events; RPC-specific methods belong in rpc context |
| Concurrent dispatch for all modes | Unpredictable ordering; sequential is safer default |
| Global handlers only | Forces manual filtering by peerId; per-session is cleaner |
| Glob/regex patterns | Over-engineering for v1; prefix matching covers 95% of cases |
Consequences
- Predictable dispatch: Exact > prefix, registration order, documented rules
- Safe API:
send()always generates fresh frameId;frameis read-only - Extensible prefixes: Policy-driven, not type-locked
- Clear RPC boundary:
msg.rpconly present when appropriate - Flexible error handling: Custom mappers for domain-specific codes
- Two scopes: Global handlers persist; session handlers auto-clean
References
- ADR-006 (RPC envelope, subject namespaces)
- ADR-008 (subject namespace validation)
- ADR-009 (session lifecycle)
- ADR-010 (RPC correlation via
cid) docs/protocols/sbp/errors.md(error code ranges)