CommandR: CQRS and Beyond
ActualLab.CommandR is a CQRS-style command handling library that powers Fusion's distributed command execution and multi-host invalidation.
Why does CommandR exist? The primary reason is the Operations Framework described in Part 5. Operations Framework requires a command execution pipeline to implement multi-host invalidation, operation logging, and other features that make Fusion work reliably in distributed scenarios. CommandR provides exactly the extensible pipeline needed for this.
Key Features
- Unified handler pipeline: Any handler can act as a filter (middleware) or final handler
- CommandContext: An
HttpContext-like type for accessing state during command execution - Convention-based handlers: Use
[CommandHandler]attribute instead of implementing interfaces - Command services with interceptors: Methods marked with
[CommandHandler]can only be invoked through Commander - RPC integration: Seamless distributed command execution with ActualLab.Rpc
If you're familiar with MediatR, see MediatR Comparison for a detailed mapping of concepts.
Required Packages
| Package | Purpose |
|---|---|
| ActualLab.CommandR | Core CommandR: commands, handlers, pipeline, ICommander |
| ActualLab.Rpc | (Optional) Needed only for distributed command execution over RPC |
TIP
If you're using Fusion, ActualLab.Fusion already includes CommandR. You only need to reference ActualLab.CommandR directly if you're using it standalone without Fusion.
Getting Started
1. Define a Command
Commands are simple classes or records implementing ICommand<TResult>:
public class PrintCommand : ICommand<Unit>
{
public string Message { get; set; } = "";
}
// Interface-based command handler
public class PrintCommandHandler : ICommandHandler<PrintCommand>, IDisposable
{
public PrintCommandHandler() => WriteLine("Creating PrintCommandHandler.");
public void Dispose() => WriteLine("Disposing PrintCommandHandler");
public async Task OnCommand(PrintCommand command, CommandContext context, CancellationToken cancellationToken)
{
WriteLine(command.Message);
WriteLine("Sir, yes, sir!");
}
}2. Register and Execute
// Building IoC container
var serviceBuilder = new ServiceCollection()
.AddScoped<PrintCommandHandler>(); // Try changing this to AddSingleton
var rpc = serviceBuilder.AddRpc();
var commanderBuilder = serviceBuilder.AddCommander()
.AddHandlers<PrintCommandHandler>();
var services = serviceBuilder.BuildServiceProvider();
var commander = services.Commander(); // Same as .GetRequiredService<ICommander>()
await commander.Call(new PrintCommand() { Message = "Are you operational?" });
await commander.Call(new PrintCommand() { Message = "Are you operational?" });The output:
Creating PrintCommandHandler.
Are you operational?
Sir, yes, sir!
Disposing PrintCommandHandler
Creating PrintCommandHandler.
Are you operational?
Sir, yes, sir!
Disposing PrintCommandHandlerKey points:
- CommandR doesn't auto-register handler services – you must register them separately
Callcreates a newIServiceScopefor each command invocation- Try changing
AddScopedtoAddSingletonto see the difference
Convention-Based Handlers and CommandContext
You don't need to implement ICommandHandler<T>. Any method with [CommandHandler] works:
public class RecSumCommand : ICommand<long>
{
public long[] Numbers { get; set; } = Array.Empty<long>();
}// Building IoC container
var serviceBuilder = new ServiceCollection()
.AddScoped<RecSumCommandHandler>();
var rpc = serviceBuilder.AddRpc();
var commanderBuilder = serviceBuilder.AddCommander()
.AddHandlers<RecSumCommandHandler>();
var services = serviceBuilder.BuildServiceProvider();
var commander = services.Commander(); // Same as .GetRequiredService<ICommander>()
WriteLine(await commander.Call(new RecSumCommand() { Numbers = new [] { 1L, 2, 3 }}));Convention-based handlers are flexible with arguments:
- First argument: the command
- Last argument:
CancellationToken CommandContextarguments are resolved viaCommandContext.GetCurrent()- Everything else is resolved via the scoped
IServiceProvider
CommandContext
CommandContext provides access to:
- The currently running command
- Its result (usually set automatically)
IServiceScopefor the commandItems– a dictionary for storing data during executionOuterContextandOutermostContextfor nested commands
When commands call other commands, each gets its own CommandContext, but they share the same ServiceScope (unless you explicitly isolate them).
Ways to Run a Command
ICommander provides several methods via CommanderExt:
| Method | Returns | Behavior |
|---|---|---|
Call | Task<TResult> | Invokes command and returns result. Throws on error. |
Run | Task<CommandContext> | Invokes command and returns context. Never throws. |
Start | CommandContext | Fire-and-forget. Returns context immediately. |
All methods accept optional parameters:
bool isolate = false– if true, runs in a newExecutionContextwith no async localsCancellationToken cancellationToken = default
Command Services
The most powerful way to define handlers is via Command Services.
Note:
IComputeServiceextendsICommandService, so all compute services are automatically command services too. This means you can add[CommandHandler]methods to any compute service without additional setup.
public class RecSumCommandService : ICommandService
{
[CommandHandler] // Note that ICommandHandler<RecSumCommand, long> support isn't needed
public virtual async Task<long> RecSum( // Notice "public virtual"!
RecSumCommand command,
// You can't have any extra arguments here
CancellationToken cancellationToken = default)
{
if (command.Numbers.Length == 0)
return 0;
var head = command.Numbers[0];
var tail = command.Numbers[1..];
var context = CommandContext.GetCurrent();
var tailSum = await context.Commander.Call( // Invoke nested command through Commander
new RecSumCommand() { Numbers = tail },
cancellationToken);
return head + tailSum;
}
// This handler is associated with ANY command (ICommand)
// Priority = 10 means it runs earlier than any handler with the default priority 0
// IsFilter tells it triggers other handlers via InvokeRemainingHandlers
[CommandHandler(Priority = 10, IsFilter = true)]
protected virtual async Task DepthTracker(ICommand command, CancellationToken cancellationToken)
{
var context = CommandContext.GetCurrent();
var depth = 1 + (int) (context.Items["Depth"] ?? 0);
context.Items["Depth"] = depth;
WriteLine($"Depth via context.Items: {depth}");
await context.InvokeRemainingHandlers(cancellationToken).ConfigureAwait(false);
}
// Another filter for RecSumCommand
[CommandHandler(Priority = 9, IsFilter = true)]
protected virtual Task ArgumentWriter(RecSumCommand command, CancellationToken cancellationToken)
{
WriteLine($"Numbers: {command.Numbers.ToDelimitedString()}");
var context = CommandContext.GetCurrent();
return context.InvokeRemainingHandlers(cancellationToken);
}
}Register command services with AddService:
// Building IoC container
var serviceBuilder = new ServiceCollection();
var rpc = serviceBuilder.AddRpc();
var commanderBuilder = serviceBuilder.AddCommander()
.AddService<RecSumCommandService>(); // Such services are auto-registered as singletons
var services = serviceBuilder.BuildServiceProvider();
var commander = services.Commander();
var recSumService = services.GetRequiredService<RecSumCommandService>();
WriteLine(recSumService.GetType());
WriteLine(await commander.Call(new RecSumCommand() { Numbers = new [] { 1L, 2 }}));
WriteLine(await commander.Call(new RecSumCommand() { Numbers = new [] { 3L, 4 }}));Output:
ActualLabProxies.RecSumCommandServiceProxy
Depth via context.Items: 1
Numbers: 1, 2
Depth via context.Items: 1
Numbers: 2
Depth via context.Items: 1
Numbers:
3
...The proxy type prevents direct invocation of command handler methods. If you try to call a [CommandHandler] method directly, it throws NotSupportedException. All command handler methods must be invoked through ICommander.Call() – this ensures the full pipeline (filters, context, scoping) always runs.
Direct Calls Throw
// This throws NotSupportedException!
await recSumService.RecSum(new RecSumCommand { Numbers = [1, 2, 3] }, default);
// This works - always use Commander
await commander.Call(new RecSumCommand { Numbers = [1, 2, 3] });Note that each CommandContext has its own Items. To share data across nested calls, use context.OutermostContext.Items.
Filter Handlers
Filter handlers wrap subsequent handlers, like middleware:
[CommandHandler(Priority = 10, IsFilter = true)]
protected virtual async Task MyFilter(ICommand command, CancellationToken ct)
{
// Before
try {
await context.InvokeRemainingHandlers(ct);
}
finally {
// After
}
}Key points:
- Higher
Priority= runs earlier IsFilter = trueindicates this is a filter- Must call
InvokeRemainingHandlersto continue the pipeline - Can handle specific command types or
ICommandfor all commands
Learn More
- Command Interfaces – All command interfaces and tagging interfaces
- Built-in Handlers – Complete list of built-in handlers and their priorities
- MediatR Comparison – Mapping from MediatR concepts
- Cheat Sheet – Quick reference
Next Steps
CommandR is the foundation for Fusion's Operations Framework, which adds:
- Multi-host invalidation
- Operation logging and replay
- Transaction support with Entity Framework
