Appearance
Streaming with RpcStream
RpcStream<T> enables efficient streaming of data over RPC connections. Unlike traditional RPC where you return a complete list, streaming allows you to:
- Send data incrementally as it becomes available
- Handle large datasets without loading everything into memory
- Build real-time data feeds
- Nest streams within other data structures
Why RpcStream Instead of IAsyncEnumerable?
You might wonder why ActualLab.Rpc uses a dedicated RpcStream<T> type instead of the standard IAsyncEnumerable<T>. The key reasons are:
Serialization control: ActualLab.Rpc supports multiple serialization formats – System.Text.Json, Newtonsoft.Json, MemoryPack, and MessagePack. Each has different serialization behavior, and
RpcStream<T>provides custom converters for each format to ensure streams are serialized correctly as lightweight references rather than materialized collections.Bidirectional streaming: Unlike
IAsyncEnumerable<T>which is read-only,RpcStream<T>supports both server-to-client and client-to-server streaming. The same type works in both directions.Embedding in data structures:
RpcStream<T>can be embedded as a property in records and classes. When these objects are serialized, the stream is serialized as a reference ID – the actual stream data flows through a separate channel. This enables nested streams (streams containing objects that contain their own streams).Flow control:
RpcStream<T>has built-in acknowledgment-based backpressure (AckPeriodandAckAdvanceproperties) to prevent producers from overwhelming consumers.Reconnection handling:
RpcStream<T>integrates with ActualLab.Rpc's reconnection mechanism, allowing streams to resume after network interruptions.
RpcStream API Overview
RpcStream<T> has a simple API:
| Member | Description |
|---|---|
RpcStream.New<T>(IAsyncEnumerable<T>) | Creates a stream from an async enumerable (server side) |
RpcStream.New<T>(IEnumerable<T>) | Creates a stream from a synchronous enumerable |
GetAsyncEnumerator() | Implements IAsyncEnumerable<T> for consumption |
AckPeriod | How often the consumer sends acknowledgments (default: 30 items) |
AckAdvance | How many items the producer can send ahead (default: 61 items) |
BatchSize | How many items are batched together for transmission (default: 64, max: 1024) |
IsReconnectable | Whether the stream can resume after disconnection (default: true) |
Single Enumeration
Remote streams can only be enumerated once. Attempting to enumerate a remote RpcStream<T> multiple times will throw an exception.
Creating an RpcStream (Server Side)
Use RpcStream.New() to create a stream from an IAsyncEnumerable<T> or IEnumerable<T>:
cs
public interface ISimpleService : IRpcService
{
// Returns a table with streaming rows
Task<Table<int>> GetTable(string title, CancellationToken cancellationToken = default);
}
// A record containing an RpcStream
[DataContract, MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record Table<T>(
[property: DataMember(Order = 0), MemoryPackOrder(0)] string Title,
[property: DataMember(Order = 1), MemoryPackOrder(1)] RpcStream<Row<T>> Rows);
[DataContract, MemoryPackable(GenerateType.VersionTolerant)]
public sealed partial record Row<T>(
[property: DataMember(Order = 0), MemoryPackOrder(0)] int Index,
[property: DataMember(Order = 1), MemoryPackOrder(1)] RpcStream<T> Items);The implementation creates streams from async enumerables:
cs
public class SimpleService : ISimpleService
{
public Task<Table<int>> GetTable(string title, CancellationToken cancellationToken = default)
{
// Create the table with a stream of rows
var table = new Table<int>(title, RpcStream.New(GetRows(CancellationToken.None)));
return Task.FromResult(table);
}
private async IAsyncEnumerable<Row<int>> GetRows(
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
for (var i = 0; ; i++) {
await Task.Delay(100, cancellationToken); // Simulate work
// Each row contains its own stream of items
yield return new Row<int>(i, RpcStream.New(GetItems(i, CancellationToken.None)));
}
}
private async IAsyncEnumerable<int> GetItems(
int index, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
for (var i = 0; ; i++) {
await Task.Delay(50, cancellationToken); // Simulate work
yield return index * i;
}
}
}Consuming an RpcStream (Client Side)
RpcStream<T> implements IAsyncEnumerable<T>, so you can use await foreach:
cs
var table = await simpleService.GetTable("My Table", cancellationToken);
Console.WriteLine($"Table: {table.Title}");
await foreach (var row in table.Rows.WithCancellation(cancellationToken)) {
Console.WriteLine($"Row {row.Index}:");
await foreach (var item in row.Items.WithCancellation(cancellationToken)) {
Console.WriteLine($" Item: {item}");
// Break early if you've seen enough
if (item > 100)
break;
}
}Sending Streams to the Server
You can also send streams from the client to the server:
cs
public interface ISimpleService : IRpcService
{
// Server receives a stream from the client and computes the sum
Task<int> Sum(RpcStream<int> stream, CancellationToken cancellationToken = default);
}Server implementation:
cs
public Task<int> Sum(RpcStream<int> stream, CancellationToken cancellationToken = default)
=> stream.SumAsync(cancellationToken).AsTask();Client usage:
cs
// Create a stream from local data
var numbers = new[] { 1, 2, 3, 4, 5 };
var stream = RpcStream.New(numbers);
// Send to server for processing
var sum = await simpleService.Sum(stream, cancellationToken);
Console.WriteLine($"Sum: {sum}"); // Output: Sum: 15RpcStream Serialization
RpcStream<T> can be embedded in any serializable record or class. Use the standard serialization attributes:
cs
[DataContract, MemoryPackable(GenerateType.VersionTolerant), MessagePackObject(true)]
public sealed partial record Table<T>(
[property: DataMember(Order = 0), MemoryPackOrder(0)] string Title,
[property: DataMember(Order = 1), MemoryPackOrder(1)] RpcStream<Row<T>> Rows);Nested Streams
RpcStream supports nesting – you can have streams of records that contain their own streams. This is useful for hierarchical data like tables with rows, where each row has its own item stream.
Key Characteristics
| Feature | Behavior |
|---|---|
| Direction | Bidirectional – server-to-client and client-to-server |
| Enumeration | Remote streams can only be enumerated once |
| Backpressure | Built-in acknowledgment mechanism (configurable via AckPeriod and AckAdvance) |
| Cancellation | Streams can be cancelled from either end |
| Nesting | Streams can be nested within other data structures |
| Reconnection | Streams handle reconnection gracefully (configurable via IsReconnectable) |
Configuration Options
RpcStream<T> has configurable properties for flow control and reconnection behavior:
| Property | Default | Description |
|---|---|---|
AckPeriod | 30 | How often the client sends acknowledgments (every N items) |
AckAdvance | 61 | How many items the server can send ahead before waiting for acks |
BatchSize | 64 | How many items are batched together for transmission (max: 1024) |
IsReconnectable | true | Whether the stream can resume after a connection disruption |
These defaults work well for most scenarios. Adjust them if you need different throughput/latency tradeoffs.
BatchSize
BatchSize controls how many items are grouped together in a single network message. Larger batches reduce network overhead but increase latency for the first items. Unlike AckPeriod and AckAdvance, BatchSize is not serialized – it's a local configuration that only affects the sending side.
Reconnection Behavior
By default, RpcStream<T> handles network disconnections gracefully. When a connection is temporarily lost, the stream automatically resumes from where it left off once the connection is re-established.
IsReconnectable Property
The IsReconnectable property controls whether a stream can resume after a disconnection:
cs
// Default: stream can reconnect after network disruption
var reconnectableStream = RpcStream.New(GetItems());
// Disable reconnection: stream will fail if connection is lost
var nonReconnectableStream = new RpcStream<int>(GetItems()) { IsReconnectable = false };When IsReconnectable is false:
- The stream will throw
RpcStreamNotFoundExceptionif a disconnection occurs - Use this for streams where resuming from an intermediate position doesn't make sense
- Examples: real-time event streams, live data feeds where missed items are unrecoverable
Non-Serializable
IsReconnectable is not serialized – it's a local configuration on the server side that controls how the shared stream handles reconnection attempts.
RpcStreamNotFoundException
RpcStreamNotFoundException is thrown when a stream cannot be found or has been disconnected:
| Scenario | Description |
|---|---|
| Non-reconnectable stream | A stream with IsReconnectable = false throws this exception when the client attempts to reconnect after a disconnection |
| Stream not found | The server no longer has the stream registered (e.g., it was disposed or timed out) |
| Host mismatch | After reconnection, the client connects to a different server instance that doesn't have this stream |
Handle this exception when consuming streams that might be non-reconnectable or long-lived:
cs
try {
await foreach (var item in stream.WithCancellation(cancellationToken)) {
Process(item);
}
}
catch (RpcStreamNotFoundException) {
// Stream was disconnected and cannot be resumed
// Consider re-fetching the stream from the server
}Complete Example
See the TodoApp RpcExamplePage for a complete working example demonstrating:
- Server-to-client streaming with nested streams
- Client-to-server streaming for computation
- Real-time UI updates as stream data arrives
The example streams a table where:
- Each row arrives incrementally
- Each row contains its own stream of items
- The client can compute sums by streaming data back to the server
