Skip to content

Operations Framework: Multi-Host Invalidation, CQRS, and Reliable Command Processing

The Operations Framework (OF) provides a robust foundation for building distributed systems with Fusion. It solves several critical challenges that arise when running multiple instances of an application:

  • Multi-host cache invalidation: When data changes on one server, all other servers must invalidate their cached computed values
  • Reliable command processing: Commands must be executed exactly once, even in the face of failures
  • Event-driven architecture: Commands can produce events that are processed asynchronously with guaranteed delivery

Why Do You Need Operations Framework?

Consider a typical multi-server deployment:

When a user on Server A updates their profile:

  1. Server A writes to the database and invalidates its local cache
  2. Servers B and C still have stale data in their caches
  3. Users connected to B and C see outdated information

Without Operations Framework, you'd have to implement:

  • A message queue or pub/sub system for cross-server notifications
  • Retry logic for failed operations
  • Deduplication to prevent processing the same operation twice
  • Transaction handling to ensure atomicity

With Operations Framework, all of this is handled automatically.

Required Packages

PackagePurpose
ActualLab.FusionCore OF abstractions and in-memory implementation
ActualLab.Fusion.EntityFrameworkEF Core implementation: DbOperationScope, operation logging, DbContext integration
ActualLab.Fusion.EntityFramework.NpgsqlPostgreSQL: NpgsqlOperationLogWatcher for LISTEN/NOTIFY
ActualLab.Fusion.EntityFramework.RedisRedis: RedisOperationLogWatcher for pub/sub notifications

TIP

The base ActualLab.Fusion.EntityFramework package includes FileSystemOperationLogWatcher which works with any database but requires shared filesystem access. For production multi-host deployments, use database-specific watchers (Npgsql) or Redis.

The Outbox Pattern

Operations Framework implements the Transactional Outbox Pattern – a well-known solution for reliable messaging in distributed systems.

The Problem

In distributed systems, you often need to:

  1. Update your database
  2. Publish a message/event to notify other services

But what if step 2 fails after step 1 succeeds? You have inconsistent state.

The Solution: Outbox Pattern

Instead of publishing directly, write the message to an "outbox" table in the same transaction as your business data:

This guarantees at-least-once delivery: if the transaction commits, the operation will eventually be processed. If it fails, nothing is written.

How OF Implements It

  1. DbOperationScope wraps your command in a database transaction
  2. DbOperation entity stores the operation in the same transaction
  3. DbOperationLogReader (background service) watches for new operations
  4. Operation Log Watchers provide instant notifications (PostgreSQL NOTIFY, Redis Pub/Sub, etc.)
  5. OperationCompletionNotifier triggers invalidation on all hosts

Core Concepts

Operation

An Operation represents an action that can be logged and replayed. Currently, only commands act as operations, but the framework is designed to support other types in the future.

Key properties:

  • Uuid – Unique identifier
  • HostId – The server that executed the operation
  • Command – The command that was executed
  • Items – Data passed between execution and invalidation phases
  • NestedOperations – Child operations executed during this operation
  • Events – Events produced by this operation

Operation Scope

An Operation Scope provides the context for operation execution:

  • DbOperationScope: Persistent operations stored in database (default for database commands)
  • InMemoryOperationScope: Transient operations that don't persist (for in-memory commands)

Invalidation Mode

When an operation is "replayed" on other hosts, it runs in invalidation mode:

  • The command handler's main logic is skipped
  • Only the invalidation block executes
  • This ensures all hosts invalidate the same computed values

Quick Start

1. Add DbSet for Operations

cs
public DbSet<DbOperation> Operations { get; protected set; } = null!;
public DbSet<DbEvent> Events { get; protected set; } = null!;

2. Configure Services

cs
public static void ConfigureServices(IServiceCollection services, IHostEnvironment Env)
{
    services.AddDbContextServices<AppDbContext>(db => {
        // Uncomment if you'll be using AddRedisOperationLogWatcher
        // db.AddRedisDb("localhost", "FusionDocumentation.PartO");

        db.AddOperations(operations => {
            // This call enabled Operations Framework (OF) for AppDbContext.
            operations.ConfigureOperationLogReader(_ => new() {
                // We use AddFileSystemOperationLogWatcher, so unconditional wake up period
                // can be arbitrary long – all depends on the reliability of Notifier-Monitor chain.
                // See what .ToRandom does – most of timeouts in Fusion settings are RandomTimeSpan-s,
                // but you can provide a normal one too – there is an implicit conversion from it.
                CheckPeriod = TimeSpan.FromSeconds(Env.IsDevelopment() ? 60 : 5).ToRandom(0.05),
            });
            // Optionally enable file-based operation log watcher
            operations.AddFileSystemOperationLogWatcher();

            // Or, if you use PostgreSQL, use this instead of above line
            // operations.AddNpgsqlOperationLogWatcher();

            // Or, if you use Redis, use this instead of above line
            // operations.AddRedisOperationLogWatcher();
        });
    });
}

Note: OF works solely on the server side, so you don't need similar configuration in your Blazor WebAssembly client.

3. Create Command and Handler

cs
public record PostMessageCommand(Session Session, string Text) : ICommand<ChatMessage>;
cs
[CommandHandler]
public virtual async Task<ChatMessage> PostMessage(
    PostMessageCommand command, CancellationToken cancellationToken = default)
{
    if (Invalidation.IsActive) {
        _ = PseudoGetAnyChatTail();
        return default!;
    }

    await using var dbContext = await DbHub.CreateOperationDbContext(cancellationToken);
    // Actual code...
    var message = await PostMessageImpl(dbContext, command, cancellationToken);
    return message;
}

Command Handler Structure

A command handler with Operations Framework follows this pattern:

cs
[CommandHandler]
public virtual async Task<TResult> HandleCommand(
    TCommand command, CancellationToken cancellationToken = default)
{
    // 1. INVALIDATION BLOCK - runs on ALL hosts after successful execution
    if (Invalidation.IsActive) {
        // Invalidate computed values that depend on the data being changed
        _ = GetSomeData(command.Id, default);
        _ = GetRelatedData(command.RelatedId, default);
        return default!;  // Return value is ignored in invalidation mode
    }

    // 2. MAIN LOGIC - runs only on the originating host
    await using var dbContext = await DbHub.CreateOperationDbContext(cancellationToken);

    // Perform your business logic
    var result = await DoWork(dbContext, command, cancellationToken);

    await dbContext.SaveChangesAsync(cancellationToken);
    return result;
}

Key Points

  1. virtual modifier – Required for Fusion's proxy generation
  2. [CommandHandler] attribute – Registers this method as a command handler
  3. Invalidation.IsActive check – First thing in the method
  4. CreateOperationDbContext – Creates a DbContext that participates in the operation scope

Passing Data to Invalidation Block

The invalidation block runs on all hosts, but the main logic only runs on the originating host. To pass data from main logic to invalidation, use Operation.Items:

cs
public virtual async Task SignOut(
    SignOutCommand command, CancellationToken cancellationToken = default)
{
    // ...
    var context = CommandContext.GetCurrent();
    if (Invalidation.IsActive) {
        // Fetch operation item
        var invSessionInfo = context.Operation.Items.KeylessGet<SessionInfo>();
        if (invSessionInfo is not null) {
            // Use it
            _ = GetUser(invSessionInfo.UserId, default);
            _ = GetUserSessions(invSessionInfo.UserId, default);
        }
        return;
    }

    await using var dbContext = await DbHub.CreateOperationDbContext(cancellationToken).ConfigureAwait(false);

    var dbSessionInfo = await Sessions.FindOrCreate(dbContext, command.Session, cancellationToken).ConfigureAwait(false);
    var sessionInfo = dbSessionInfo.ToModel();
    if (sessionInfo.IsSignOutForced)
        return;

    // Store operation item for invalidation logic
    context.Operation.Items.KeylessSet(sessionInfo);
    // ...
}

How It Works

  1. During execution: Store data with context.Operation.Items.KeylessSet(value)
  2. During invalidation: Retrieve data with context.Operation.Items.KeylessGet<T>()
  3. Serialization: Items are JSON-serialized and stored with the operation in the database

Note: Operation.Items differs from CommandContext.Items:

  • CommandContext.Items exists only during command execution on the originating host
  • Operation.Items is persisted and available on all hosts during invalidation

Nested Commands

When one command calls another, the nested command is automatically logged and its invalidation logic runs on all hosts:

cs
[CommandHandler]
public virtual async Task<Order> CreateOrder(
    CreateOrderCommand command, CancellationToken cancellationToken = default)
{
    if (Invalidation.IsActive) {
        _ = GetOrder(command.OrderId, default);
        return default!;
    }

    await using var dbContext = await DbHub.CreateOperationDbContext(cancellationToken);

    var order = new Order { /* ... */ };
    dbContext.Orders.Add(order);
    await dbContext.SaveChangesAsync(cancellationToken);

    // This nested command is automatically logged
    await Commander.Call(new SendOrderConfirmationCommand(order.Id), cancellationToken);

    return order;
}

The nested command's Operation.Items are captured independently, so there's no collision with the parent command's items.

Command Pipeline

Operations Framework adds several filtering handlers to the command pipeline:

PriorityHandlerPurpose
11,000NestedOperationLoggerLogs nested commands and their items
10,000InMemoryOperationScopeProviderProvides transient scope, runs completion
1,000DbOperationScopeProvider<T>Provides database scope for each DbContext type
100InvalidatingCommandCompletionHandlerRuns invalidation for completed operations

Backend Commands

Commands that should only execute on the server should implement IBackendCommand:

cs
public record DeleteUserCommand(UserId UserId) : ICommand<Unit>, IBackendCommand;

This ensures:

  • The command can only be processed by backend servers
  • Client-side proxies won't attempt to handle it
  • RPC layer enforces server-side execution

Further Reading

Learning More

To explore OF's internals, check out:

HostId

HostId identifies each process in your cluster. It includes:

  • Machine name
  • Unique process ID
  • Unique ID per IoC container (useful for testing)

This allows OF to determine if an operation originated locally or from a peer.

InvalidatingCommandCompletionHandler

The logic that determines whether a command requires invalidation is in InvalidatingCommandCompletionHandler.IsRequired(). It returns true for any command with a final handler whose service implements IComputeService, but not for compute service clients (when RpcServiceMode.Client is set).

Getting Help

If you run into issues, join Fusion Place and ask questions. The author (Alex Y.) is active and happy to help.