Skip to content

@actuallab/rpc

The RPC package provides WebSocket-based communication with .NET ActualLab.Rpc servers. It handles connection management, serialization, streaming, reconnection, and the system call protocol.

See ActualLab.Rpc in .NET for the full conceptual overview.

Key Differences from .NET

Aspect.NETTypeScript
Service definitionReflection on C# interfacesExplicit defineRpcService() or @rpcService/@rpcMethod decorators
Client creationDI: fusion.AddClient<T>()hub.addClient<T>(peer, def)
TransportPluggable (WebSocket, WebTransport)WebSocket only (browser + Node.js)
SerializationMemoryPack, MessagePack, System.Text.JsonJSON only (json5np — no polymorphism)
ReconnectionRpcClientPeerReconnectDelayer profilesExponential backoff: 1s → 60s max
MiddlewareIRpcMiddleware[] pipelineDirect dispatch (no middleware)
VersioningAPI version sets in handshakeNo versioning

Defining Services

There are two ways to define RPC services:

ts
import { defineRpcService, RpcType } from "@actuallab/rpc";

const SimpleServiceDef = defineRpcService("ISimpleService", {
  // Regular method: (message: string) → string
  Greet:   { args: [""] },

  // Streaming method: () → RpcStream<number>
  Counter: { args: [], returns: RpcType.stream },

  // Fire-and-forget: (message: string) → void
  Ping:    { args: [""], returns: RpcType.noWait },
});

Each method entry specifies:

FieldDescription
argsArray of example values (used only for args.length to determine argument count)
returnsOptional: RpcType.stream, RpcType.noWait, or omit for regular
callTypeIdOptional: custom call type (used by @actuallab/fusion-rpc for compute calls)
wireArgCountOptional: override wire argument count (default: args.length + 1 to account for CancellationToken)

Option 2: Decorators (for Server Implementations)

ts
import { rpcService, rpcMethod, RpcType } from "@actuallab/rpc";

@rpcService("ISimpleService")
abstract class ISimpleService {
  @rpcMethod()
  greet(message: string): Promise<string> { throw ""; }

  @rpcMethod({ returns: RpcType.stream })
  counter(): Promise<AsyncIterable<number>> { throw ""; }

  @rpcMethod({ returns: RpcType.noWait })
  ping(message: string): void { throw ""; }
}

TIP

Decorator-based definitions work with both RpcHub.addClient() and RpcHub.addService() — the hub extracts the service definition from decorator metadata automatically.

RpcHub

Central coordinator that manages peers, services, and configuration.

ts
import { RpcHub } from "@actuallab/rpc";

const hub = new RpcHub();

// Register a server-side service implementation
hub.addService(SimpleServiceDef, {
  Greet: (message: string) => `Hello, ${message}!`,
  Counter: async function*() {
    for (let i = 0; ; i++) {
      yield i;
      await new Promise(r => setTimeout(r, 100));
    }
  },
  Ping: (message: string) => { console.log(`Ping: ${message}`); },
});

// Create a client proxy
const peer = hub.getClientPeer("ws://localhost:5005/rpc/ws");
const client = hub.addClient<ISimpleService>(peer, SimpleServiceDef);

hub.close();  // Close all peers
MemberDescription
.hubIdAuto-generated UUID
.peersMap<string, RpcPeer> of all managed peers
.serviceHostDispatches inbound calls to registered service implementations
.addPeer(peer)Register a peer
.getPeer(ref)Get or create a peer (client or server based on ref prefix)
.getClientPeer(ref)Get or create a client peer
.getServerPeer(ref)Get or create a server peer
.addService(def, impl)Register a service implementation
.addClient<T>(peer, def)Create a typed client proxy on a peer
.close()Close all peers

RpcClientPeer

Client-side peer that manages a WebSocket connection with automatic reconnection.

ts
import { RpcClientPeer } from "@actuallab/rpc";

const peer = new RpcClientPeer(hub, "ws://localhost:5005/rpc/ws");
hub.addPeer(peer);

// Start the reconnection loop (runs until peer.close())
void peer.run();

// Events
peer.connected.add(() => console.log("Connected"));
peer.disconnected.add(({ code, reason }) => console.log(`Disconnected: ${reason}`));
peer.peerChanged.add(() => console.log("Server restarted"));

// Connection state
peer.isConnected;       // boolean
peer.connectionKind;    // Disconnected | Connecting | Connected

Connection Lifecycle

  1. peer.run() starts the reconnection loop
  2. Opens a WebSocket to the URL + query params (clientId, f=json5np)
  3. Exchanges handshakes with the server
  4. Detects server restarts via RemoteHubId comparison (peerChanged event)
  5. On disconnect, waits (exponential backoff), then reconnects
  6. Outbound calls made while disconnected are buffered and sent on reconnect

Reconnection

ts
// Default exponential backoff: 1s → 60s
peer.reconnectDelayer;

// Force immediate reconnection
peer.reconnectDelayer.cancelDelays();

// Track when next reconnect will happen
peer.reconnectsAt;  // timestamp (ms) or 0
peer.reconnectsAtChanged.add(() => { /* update UI */ });

RpcServerPeer

Server-side peer that wraps an accepted connection.

ts
import { RpcServerPeer, RpcWebSocketConnection } from "@actuallab/rpc";

// Accept incoming WebSocket (e.g., in a Node.js server)
const ref = `server://${crypto.randomUUID()}`;
const peer = hub.getServerPeer(ref);
const conn = new RpcWebSocketConnection(ws);
peer.accept(conn);

RpcStream<T>

Client-side stream consumer — implements AsyncIterable<T> for for await...of. See RpcStream in .NET for the full conceptual overview.

ts
// Assuming Counter() returns RpcStream<number>
const stream = await client.Counter();

for await (const item of stream) {
  console.log(item);  // 0, 1, 2, ...
  if (item > 10) break;
}

Key Characteristics

FeatureBehavior
EnumerationCan only be iterated once
Flow controlBuilt-in ack-based backpressure (ackPeriod, ackAdvance)
ReconnectionAutomatically resumes from last received index
Nested streamsStream refs inside returned objects are auto-resolved
Cancellationbreak from for await sends AckEnd to the server

Nested Streams

When a method returns an object containing stream fields, the RPC layer automatically resolves stream reference strings into live RpcStream instances:

ts
// .NET service returns Table<int> with an RpcStream<Row<int>> Rows field
const table = await client.GetTable("My Table");
console.log(table.Title);
for await (const row of table.Rows) {
  for await (const item of row.Items) {
    console.log(item);
  }
}

Fire-and-Forget (NoWait)

Methods marked with RpcType.noWait send a message without waiting for a response. The call is silently dropped if the peer is disconnected.

ts
// Define
const def = defineRpcService("INotifier", {
  Notify: { args: [""], returns: RpcType.noWait },
});

// Use — returns void (synchronous)
client.Notify("something happened");

RpcPeerStateMonitor

High-level connection state monitor with JustConnected/JustDisconnected transitions — useful for building connection status UI.

ts
import { RpcPeerStateMonitor, RpcPeerStateKind } from "@actuallab/rpc";

const monitor = new RpcPeerStateMonitor(peer);

monitor.stateChanged.add((state) => {
  switch (state.kind) {
    case RpcPeerStateKind.Connected:
      // Stable connection (after JustConnected grace period)
      break;
    case RpcPeerStateKind.JustConnected:
      // Just connected (within 1.5s of connect)
      break;
    case RpcPeerStateKind.JustDisconnected:
      // Just disconnected (within 3s of disconnect)
      break;
    case RpcPeerStateKind.Disconnected:
      // Fully disconnected
      console.log(`Reconnects in: ${state.reconnectsIn}ms`);
      break;
  }
});

// Clean up
monitor.dispose();
StateDescription
JustConnectedWithin 1.5 seconds of connecting
ConnectedStably connected
JustDisconnectedWithin 3 seconds of disconnecting (hides brief blips)
DisconnectedFully disconnected; state.reconnectsIn shows countdown

The JustConnected and JustDisconnected states provide grace periods so the UI can avoid flashing connection status changes during brief network blips.

Wire Protocol

The TypeScript port uses the same wire format as .NET ActualLab.Rpc:

  • Messages are JSON objects with Method, RelatedId, CallType, and Headers fields
  • Arguments follow the envelope, separated by \x02 (ARG_DELIMITER)
  • Multiple messages per frame, separated by \x03 (FRAME_DELIMITER)
  • Serialization format: json5np (System.Text.Json without polymorphic type wrapping)

System calls use $sys.* method names ($sys.Ok, $sys.Error, $sys.Cancel, $sys.KeepAlive, $sys.I, $sys.B, $sys.End, $sys.Ack, $sys.AckEnd).