Appearance
@actuallab/rpc
The RPC package provides WebSocket-based communication with .NET ActualLab.Rpc servers. It handles connection management, serialization, streaming, reconnection, and the system call protocol.
See ActualLab.Rpc in .NET for the full conceptual overview.
Key Differences from .NET
| Aspect | .NET | TypeScript |
|---|---|---|
| Service definition | Reflection on C# interfaces | Explicit defineRpcService() or @rpcService/@rpcMethod decorators |
| Client creation | DI: fusion.AddClient<T>() | hub.addClient<T>(peer, def) |
| Transport | Pluggable (WebSocket, WebTransport) | WebSocket only (browser + Node.js) |
| Serialization | MemoryPack, MessagePack, System.Text.Json | JSON only (json5np — no polymorphism) |
| Reconnection | RpcClientPeerReconnectDelayer profiles | Exponential backoff: 1s → 60s max |
| Middleware | IRpcMiddleware[] pipeline | Direct dispatch (no middleware) |
| Versioning | API version sets in handshake | No versioning |
Defining Services
There are two ways to define RPC services:
Option 1: defineRpcService (Recommended for Clients)
ts
import { defineRpcService, RpcType } from "@actuallab/rpc";
const SimpleServiceDef = defineRpcService("ISimpleService", {
// Regular method: (message: string) → string
Greet: { args: [""] },
// Streaming method: () → RpcStream<number>
Counter: { args: [], returns: RpcType.stream },
// Fire-and-forget: (message: string) → void
Ping: { args: [""], returns: RpcType.noWait },
});Each method entry specifies:
| Field | Description |
|---|---|
args | Array of example values (used only for args.length to determine argument count) |
returns | Optional: RpcType.stream, RpcType.noWait, or omit for regular |
callTypeId | Optional: custom call type (used by @actuallab/fusion-rpc for compute calls) |
wireArgCount | Optional: override wire argument count (default: args.length + 1 to account for CancellationToken) |
Option 2: Decorators (for Server Implementations)
ts
import { rpcService, rpcMethod, RpcType } from "@actuallab/rpc";
@rpcService("ISimpleService")
abstract class ISimpleService {
@rpcMethod()
greet(message: string): Promise<string> { throw ""; }
@rpcMethod({ returns: RpcType.stream })
counter(): Promise<AsyncIterable<number>> { throw ""; }
@rpcMethod({ returns: RpcType.noWait })
ping(message: string): void { throw ""; }
}TIP
Decorator-based definitions work with both RpcHub.addClient() and RpcHub.addService() — the hub extracts the service definition from decorator metadata automatically.
RpcHub
Central coordinator that manages peers, services, and configuration.
ts
import { RpcHub } from "@actuallab/rpc";
const hub = new RpcHub();
// Register a server-side service implementation
hub.addService(SimpleServiceDef, {
Greet: (message: string) => `Hello, ${message}!`,
Counter: async function*() {
for (let i = 0; ; i++) {
yield i;
await new Promise(r => setTimeout(r, 100));
}
},
Ping: (message: string) => { console.log(`Ping: ${message}`); },
});
// Create a client proxy
const peer = hub.getClientPeer("ws://localhost:5005/rpc/ws");
const client = hub.addClient<ISimpleService>(peer, SimpleServiceDef);
hub.close(); // Close all peers| Member | Description |
|---|---|
.hubId | Auto-generated UUID |
.peers | Map<string, RpcPeer> of all managed peers |
.serviceHost | Dispatches inbound calls to registered service implementations |
.addPeer(peer) | Register a peer |
.getPeer(ref) | Get or create a peer (client or server based on ref prefix) |
.getClientPeer(ref) | Get or create a client peer |
.getServerPeer(ref) | Get or create a server peer |
.addService(def, impl) | Register a service implementation |
.addClient<T>(peer, def) | Create a typed client proxy on a peer |
.close() | Close all peers |
RpcClientPeer
Client-side peer that manages a WebSocket connection with automatic reconnection.
ts
import { RpcClientPeer } from "@actuallab/rpc";
const peer = new RpcClientPeer(hub, "ws://localhost:5005/rpc/ws");
hub.addPeer(peer);
// Start the reconnection loop (runs until peer.close())
void peer.run();
// Events
peer.connected.add(() => console.log("Connected"));
peer.disconnected.add(({ code, reason }) => console.log(`Disconnected: ${reason}`));
peer.peerChanged.add(() => console.log("Server restarted"));
// Connection state
peer.isConnected; // boolean
peer.connectionKind; // Disconnected | Connecting | ConnectedConnection Lifecycle
peer.run()starts the reconnection loop- Opens a WebSocket to the URL + query params (
clientId,f=json5np) - Exchanges handshakes with the server
- Detects server restarts via
RemoteHubIdcomparison (peerChangedevent) - On disconnect, waits (exponential backoff), then reconnects
- Outbound calls made while disconnected are buffered and sent on reconnect
Reconnection
ts
// Default exponential backoff: 1s → 60s
peer.reconnectDelayer;
// Force immediate reconnection
peer.reconnectDelayer.cancelDelays();
// Track when next reconnect will happen
peer.reconnectsAt; // timestamp (ms) or 0
peer.reconnectsAtChanged.add(() => { /* update UI */ });RpcServerPeer
Server-side peer that wraps an accepted connection.
ts
import { RpcServerPeer, RpcWebSocketConnection } from "@actuallab/rpc";
// Accept incoming WebSocket (e.g., in a Node.js server)
const ref = `server://${crypto.randomUUID()}`;
const peer = hub.getServerPeer(ref);
const conn = new RpcWebSocketConnection(ws);
peer.accept(conn);RpcStream<T>
Client-side stream consumer — implements AsyncIterable<T> for for await...of. See RpcStream in .NET for the full conceptual overview.
ts
// Assuming Counter() returns RpcStream<number>
const stream = await client.Counter();
for await (const item of stream) {
console.log(item); // 0, 1, 2, ...
if (item > 10) break;
}Key Characteristics
| Feature | Behavior |
|---|---|
| Enumeration | Can only be iterated once |
| Flow control | Built-in ack-based backpressure (ackPeriod, ackAdvance) |
| Reconnection | Automatically resumes from last received index |
| Nested streams | Stream refs inside returned objects are auto-resolved |
| Cancellation | break from for await sends AckEnd to the server |
Nested Streams
When a method returns an object containing stream fields, the RPC layer automatically resolves stream reference strings into live RpcStream instances:
ts
// .NET service returns Table<int> with an RpcStream<Row<int>> Rows field
const table = await client.GetTable("My Table");
console.log(table.Title);
for await (const row of table.Rows) {
for await (const item of row.Items) {
console.log(item);
}
}Fire-and-Forget (NoWait)
Methods marked with RpcType.noWait send a message without waiting for a response. The call is silently dropped if the peer is disconnected.
ts
// Define
const def = defineRpcService("INotifier", {
Notify: { args: [""], returns: RpcType.noWait },
});
// Use — returns void (synchronous)
client.Notify("something happened");RpcPeerStateMonitor
High-level connection state monitor with JustConnected/JustDisconnected transitions — useful for building connection status UI.
ts
import { RpcPeerStateMonitor, RpcPeerStateKind } from "@actuallab/rpc";
const monitor = new RpcPeerStateMonitor(peer);
monitor.stateChanged.add((state) => {
switch (state.kind) {
case RpcPeerStateKind.Connected:
// Stable connection (after JustConnected grace period)
break;
case RpcPeerStateKind.JustConnected:
// Just connected (within 1.5s of connect)
break;
case RpcPeerStateKind.JustDisconnected:
// Just disconnected (within 3s of disconnect)
break;
case RpcPeerStateKind.Disconnected:
// Fully disconnected
console.log(`Reconnects in: ${state.reconnectsIn}ms`);
break;
}
});
// Clean up
monitor.dispose();| State | Description |
|---|---|
JustConnected | Within 1.5 seconds of connecting |
Connected | Stably connected |
JustDisconnected | Within 3 seconds of disconnecting (hides brief blips) |
Disconnected | Fully disconnected; state.reconnectsIn shows countdown |
The JustConnected and JustDisconnected states provide grace periods so the UI can avoid flashing connection status changes during brief network blips.
Wire Protocol
The TypeScript port uses the same wire format as .NET ActualLab.Rpc:
- Messages are JSON objects with
Method,RelatedId,CallType, andHeadersfields - Arguments follow the envelope, separated by
\x02(ARG_DELIMITER) - Multiple messages per frame, separated by
\x03(FRAME_DELIMITER) - Serialization format:
json5np(System.Text.Json without polymorphic type wrapping)
System calls use $sys.* method names ($sys.Ok, $sys.Error, $sys.Cancel, $sys.KeepAlive, $sys.I, $sys.B, $sys.End, $sys.Ack, $sys.AckEnd).
