Baubit.Mediation
2026.4.1
A lightweight mediator with cache-backed async request/response routing. It smooths out producer backpressure by buffering messages for consumers that process at different rates.
DI extension: Baubit.Mediation.DI
For persisted mediation: Baubit.Caching.LiteDB
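To make the buffering idea concrete, here is a minimal sketch of a fast producer and a slower consumer decoupled by an in-memory queue. It uses only the .NET base class library (BlockingCollection) and is not how Baubit.Mediation or Baubit.Caching is implemented; the BufferingSketch class and its Run method are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Illustrative only: a plain in-memory buffer, not Baubit.Mediation's cache.
public static class BufferingSketch
{
    // Produces `count` messages without waiting, then lets a slower consumer drain them.
    // Returns the messages in the order the consumer processed them.
    public static List<int> Run(int count)
    {
        var buffer = new BlockingCollection<int>(new ConcurrentQueue<int>());
        var seen = new List<int>();

        // Slow consumer: takes one message at a time from the buffer.
        var consumer = Task.Run(() =>
        {
            foreach (var message in buffer.GetConsumingEnumerable())
            {
                Thread.Sleep(1); // simulate slow processing
                seen.Add(message);
            }
        });

        // Fast producer: enqueues everything without waiting for the consumer.
        for (var i = 1; i <= count; i++)
        {
            buffer.Add(i);
        }
        buffer.CompleteAdding(); // lets the consumer loop end once the buffer is empty

        consumer.Wait();
        return seen;
    }
}
```

The producer finishes enqueuing long before the consumer has processed everything, yet no message is lost and FIFO order is preserved, which is the property the buffered mode described on this page relies on.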
Installation
dotnet add package Baubit.Mediation
Quick Start
using Baubit.Mediation;
using Baubit.Caching;
using Baubit.Caching.InMemory;
using Microsoft.Extensions.Logging;
using System.Threading;
// Create dependencies
var loggerFactory = LoggerFactory.Create(b => b.AddConsole());
var configuration = new Baubit.Caching.Configuration();
// Next id = last id + 1, starting at 1 when there is no previous id
Func<long?, long?> nextIdFactory = lastId => (lastId ?? 0) + 1;
var store = new Baubit.Caching.InMemory.Store<long, object>(null, null, nextIdFactory, loggerFactory);
var metadata = new Baubit.Caching.InMemory.Metadata<long>(configuration, loggerFactory);
var cache = new Baubit.Caching.OrderedCache<long, object>(configuration, null, store, metadata, loggerFactory);
// Create mediator
var mediator = new Mediator(cache, loggerFactory);
// Define request/response
public class GetUserRequest : IRequest<GetUserResponse>
{
    public int UserId { get; set; }
}
public class GetUserResponse : IResponse
{
    public string Name { get; set; }
}
// Define handler
public class GetUserHandler : IRequestHandler<GetUserRequest, GetUserResponse>
{
    public GetUserResponse Handle(GetUserRequest request, CancellationToken cancellationToken = default)
    {
        return new GetUserResponse { Name = $"User {request.UserId}" };
    }
}
// Register handler and publish request
using var cts = new CancellationTokenSource();
_ = mediator.SubscribeAsync<GetUserRequest, GetUserResponse>(new GetUserHandler(), true, cts.Token);
var response = await mediator.PublishAsync<GetUserRequest, GetUserResponse>(new GetUserRequest { UserId = 1 });
Console.WriteLine(response.Name); // "User 1"
Features
- Asynchronous request/response handling with optional buffering
- Cache-backed async processing pipeline
- Notification pub/sub with typed subscribers
- Notification and request delivery with buffering control
  - Buffered mode (enableBuffering: true): Messages pass through an ordered cache before delivery. Useful when handlers must process events in order of occurrence and/or the system requires durability or rewind/replay (see Baubit.Caching.LiteDB for persistence).
  - Unbuffered mode (enableBuffering: false): Messages are delivered directly to handlers for low-latency processing.
- Handler registration with cancellation token lifecycle
- Thread-safe concurrent access
- Function-based handler subscriptions
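The rewind/replay property mentioned for buffered mode can be pictured as an append-only log with increasing ids, from which a reader can resume at any earlier position. The sketch below is a toy model under that assumption, not Baubit.Caching's OrderedCache; OrderedLogSketch, Append and ReplayAfter are hypothetical names, and the class is deliberately not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Illustrative only: a toy ordered log, not Baubit.Caching's OrderedCache.
// Not thread-safe; a real implementation would need synchronization.
public sealed class OrderedLogSketch<T>
{
    private readonly List<KeyValuePair<long, T>> entries = new List<KeyValuePair<long, T>>();
    private long lastId;

    // Appends a message and returns its monotonically increasing id.
    public long Append(T message)
    {
        lastId++;
        entries.Add(new KeyValuePair<long, T>(lastId, message));
        return lastId;
    }

    // Replays every message whose id is greater than afterId, in insertion order.
    public IEnumerable<T> ReplayAfter(long afterId)
    {
        foreach (var entry in entries)
        {
            if (entry.Key > afterId)
            {
                yield return entry.Value;
            }
        }
    }
}
```

A reader that remembers the last id it processed can call ReplayAfter with that id to catch up, or pass 0 to replay everything from the start.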
API Reference
IMediator
| Method | Description |
|---|---|
| Publish<T>(notification, cancellationToken) | Publish a notification synchronously to subscribers. Checks cancellation before delivering to each subscriber. |
| PublishAsync<T>(notification, cancellationToken) | Publish a notification asynchronously (fire and forget). Passes cancellation token to Publish. |
| PublishAsync<TRequest, TResponse>(request, cancellationToken) | Publish a request and await response from registered handler. Cancellation token is monitored during processing. |
| SubscribeAsync<T>(subscriber, enableBuffering, cancellationToken) | Subscribe to notifications with ISubscriber<T>. Cancellation token ends subscription. |
| SubscribeAsync<T>(subscriber, enableBuffering, name, cancellationToken) | Subscribe to notifications with named cache enumerator. |
| SubscribeAsync<TRequest, TResponse>(handler, enableBuffering, cancellationToken) | Register request handler with IRequestHandler<TRequest, TResponse>. |
| SubscribeAsync<TRequest, TResponse>(handler, enableBuffering, name, cancellationToken) | Register request handler with named cache enumerator. |
| SubscribeAsync<TRequest, TResponse>(handler, enableBuffering, cancellationToken) | Register async request handler with IAsyncRequestHandler<TRequest, TResponse>. |
| SubscribeAsync<TRequest, TResponse>(handler, enableBuffering, name, cancellationToken) | Register async request handler with named cache enumerator. |
| SubscribeAsync<TNotification>(func, enableBuffering, cancellationToken) | Subscribe to notifications using function handler. Function receives cancellation token. |
| SubscribeAsync<TNotification>(func, enableBuffering, name, cancellationToken) | Subscribe to notifications using function handler with named enumerator. |
| SubscribeAsync<TRequest, TResponse>(func, enableBuffering, cancellationToken) | Register async request handler using function. Function receives cancellation token. |
| SubscribeAsync<TRequest, TResponse>(func, enableBuffering, name, cancellationToken) | Register async request handler using function with named enumerator. |
Handler Interfaces
- IRequestHandler<TRequest, TResponse> - Synchronous handler with Handle(TRequest, CancellationToken) method. Receives subscription's cancellation token.
- IAsyncRequestHandler<TRequest, TResponse> - Asynchronous handler with HandleAsync(TRequest, CancellationToken) method. Receives subscription's cancellation token.
- ISubscriber<T> - Notification subscriber with OnNext(T, CancellationToken) method. Receives subscription's cancellation token.
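The Quick Start shows a synchronous handler. An asynchronous counterpart might look like the sketch below. To keep it compilable on its own, it declares local stand-ins for IRequest, IResponse and IAsyncRequestHandler inside a hypothetical HandlerSketch namespace; the Task<TResponse> return type of HandleAsync is an assumption, so check the package's actual interface definitions before copying this.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

namespace HandlerSketch
{
    // Local stand-ins so the sketch compiles on its own.
    // The real interfaces ship with Baubit.Mediation and may differ in detail.
    public interface IResponse { }
    public interface IRequest<TResponse> { }
    public interface IAsyncRequestHandler<TRequest, TResponse>
    {
        // Assumed return type: Task<TResponse>.
        Task<TResponse> HandleAsync(TRequest request, CancellationToken cancellationToken = default);
    }

    public class GetUserRequest : IRequest<GetUserResponse>
    {
        public int UserId { get; set; }
    }

    public class GetUserResponse : IResponse
    {
        public string Name { get; set; } = string.Empty;
    }

    public class GetUserAsyncHandler : IAsyncRequestHandler<GetUserRequest, GetUserResponse>
    {
        public async Task<GetUserResponse> HandleAsync(GetUserRequest request, CancellationToken cancellationToken = default)
        {
            // Honor the subscription's cancellation token before doing any work.
            cancellationToken.ThrowIfCancellationRequested();
            await Task.Yield(); // stands in for real asynchronous work such as I/O
            return new GetUserResponse { Name = $"User {request.UserId}" };
        }
    }
}
```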
Usage Examples
Notification Aggregation with Caching (Buffering)
When enableBuffering is true (default), notifications are persisted to the cache before delivery. This enables message replay, durability, and distributed pub/sub capabilities backed by Baubit.Caching.
// Define notification type
public class OrderCreated
{
    public int OrderId { get; set; }
    public decimal Amount { get; set; }
}
// Define subscriber
public class OrderNotificationSubscriber : ISubscriber<OrderCreated>
{
    public bool OnNext(OrderCreated notification, CancellationToken cancellationToken = default)
    {
        Console.WriteLine($"Order {notification.OrderId} created: ${notification.Amount}");
        return true;
    }
    public bool OnError(Exception error)
    {
        Console.WriteLine($"Error: {error.Message}");
        return true;
    }
    public bool OnCompleted() => true;
    public void Dispose() { }
}
// Setup mediator
var loggerFactory = LoggerFactory.Create(b => b.AddConsole());
var configuration = new Baubit.Caching.Configuration();
// Next id = last id + 1, starting at 1 when there is no previous id
Func<long?, long?> nextIdFactory = lastId => (lastId ?? 0) + 1;
var store = new Baubit.Caching.InMemory.Store<long, object>(null, null, nextIdFactory, loggerFactory);
var metadata = new Baubit.Caching.InMemory.Metadata<long>(configuration, loggerFactory);
var cache = new Baubit.Caching.OrderedCache<long, object>(configuration, null, store, metadata, loggerFactory);
var mediator = new Mediator(cache, loggerFactory);
using var cts = new CancellationTokenSource();
// Subscribe with buffering enabled (default)
var subscriber = new OrderNotificationSubscriber();
var subscribeTask = mediator.SubscribeAsync(subscriber, enableBuffering: true, cts.Token);
// Publish notification - stored in cache then delivered
mediator.Publish(new OrderCreated { OrderId = 1, Amount = 99.99m });
// Notifications persist in cache for replay or distributed scenarios
Console.WriteLine($"Cached notifications: {cache.Count}");
Notification Aggregation without Caching (Direct Delivery)
When enableBuffering is false, notifications bypass the cache and are delivered directly to subscribers. This provides minimal latency for scenarios where persistence is not required.
// Same notification and subscriber types as above
// Subscribe with buffering disabled
var subscriber = new OrderNotificationSubscriber();
var subscribeTask = mediator.SubscribeAsync(subscriber, enableBuffering: false, cts.Token);
// Publish notification - delivered directly without caching
mediator.Publish(new OrderCreated { OrderId = 2, Amount = 149.99m });
// No caching overhead - immediate delivery
Console.WriteLine($"Cached notifications: {cache.Count}"); // 0
Mixed Buffering Scenarios
Different subscribers can use different buffering strategies for the same notification type:
var bufferedSubscriber = new OrderNotificationSubscriber();
var directSubscriber = new OrderNotificationSubscriber();
// One subscriber with caching, one without
var bufferedTask = mediator.SubscribeAsync(bufferedSubscriber, enableBuffering: true, cts.Token);
var directTask = mediator.SubscribeAsync(directSubscriber, enableBuffering: false, cts.Token);
// Publish once - buffered subscriber gets it from cache, direct subscriber gets immediate delivery
mediator.Publish(new OrderCreated { OrderId = 3, Amount = 199.99m });
// Both subscribers receive the notification via their preferred delivery mechanism
Request/Response Mediation
// Publish request asynchronously and await response
var response = await mediator.PublishAsync<GetUserRequest, GetUserResponse>(
    new GetUserRequest { UserId = 1 }
);
// All request handling is asynchronous
// Buffered mode (enableBuffering: true) tracks request/response through cache
// Unbuffered mode (enableBuffering: false) delivers directly to handler
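One common way to await a response that arrives later on a different code path is to correlate it with the request by id. The sketch below shows that general technique with TaskCompletionSource. It is an assumption about how such tracking can work, not a description of Baubit.Mediation's internals, and CorrelationSketch with its Begin and Complete methods are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Illustrative only: correlates responses to pending requests by id.
public sealed class CorrelationSketch<TResponse>
{
    private readonly ConcurrentDictionary<long, TaskCompletionSource<TResponse>> pending =
        new ConcurrentDictionary<long, TaskCompletionSource<TResponse>>();
    private long lastId;

    // Registers a pending request and returns its id plus the task the caller awaits.
    public (long Id, Task<TResponse> Response) Begin()
    {
        var id = Interlocked.Increment(ref lastId);
        var tcs = new TaskCompletionSource<TResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
        pending[id] = tcs;
        return (id, tcs.Task);
    }

    // Called by the handler side once the response for `id` is ready.
    // Returns false if the id is unknown or was already completed.
    public bool Complete(long id, TResponse response)
    {
        return pending.TryRemove(id, out var tcs) && tcs.TrySetResult(response);
    }
}
```

Because each caller awaits its own task, responses may complete in any order without being mixed up between requests.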
Function-Based Subscriptions
For scenarios where creating a full handler class is unnecessary, use function-based subscriptions:
Notification Handler Functions
// Subscribe to notifications using a function handler
using var cts = new CancellationTokenSource();
var subscribeTask = mediator.SubscribeAsync<OrderCreated>(
    async (notification, ct) =>
    {
        Console.WriteLine($"Order {notification.OrderId} received");
        // ProcessOrderAsync is a placeholder for application-defined processing
        await ProcessOrderAsync(notification, ct);
        return true;
    },
    enableBuffering: true,
    cts.Token
);
// Publish notifications
mediator.Publish(new OrderCreated { OrderId = 1, Amount = 99.99m });
mediator.Publish(new OrderCreated { OrderId = 2, Amount = 149.99m });
// Cancel subscription when done
cts.Cancel();
Async Request Handler Functions
// Subscribe to requests using a function handler
using var cts = new CancellationTokenSource();
var subscribeTask = mediator.SubscribeAsync<GetUserRequest, GetUserResponse>(
    async (request, ct) =>
    {
        // database is a placeholder for an application-defined data access object
        var user = await database.GetUserAsync(request.UserId, ct);
        return new GetUserResponse { Name = user.Name };
    },
    enableBuffering: true,
    cts.Token
);
// Publish async request - function handler processes it
var response = await mediator.PublishAsync<GetUserRequest, GetUserResponse>(
    new GetUserRequest { UserId = 1 },
    CancellationToken.None
);
// Cancel subscription when done
cts.Cancel();
Architecture Notes
MediatR vs Baubit.Mediation:
- MediatR: Offers built-in pipeline behaviors optimized for in-memory processing
- Baubit.Mediation: Expects pipelines to be built outside of its knowledge, focusing on cache-backed durability and distributed messaging
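Since pipelines are expected to live outside the mediator, one simple way to build them is to wrap a handler function in a decorator before subscribing it. The sketch below adds timing around any async handler of the shape (request, ct) => Task<TResponse>, which matches the lambdas in the function-based examples; the exact delegate type that SubscribeAsync accepts is an assumption, and PipelineSketch and WithTiming are hypothetical names.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Illustrative only: a handler decorator built outside the mediator.
public static class PipelineSketch
{
    // Wraps a handler function with timing; `log` receives one line per handled request.
    public static Func<TRequest, CancellationToken, Task<TResponse>> WithTiming<TRequest, TResponse>(
        Func<TRequest, CancellationToken, Task<TResponse>> inner,
        Action<string> log)
    {
        return async (request, ct) =>
        {
            var stopwatch = Stopwatch.StartNew();
            try
            {
                return await inner(request, ct);
            }
            finally
            {
                // Runs whether the inner handler succeeded or threw.
                log($"{typeof(TRequest).Name} handled in {stopwatch.ElapsedMilliseconds} ms");
            }
        };
    }
}
```

Decorators like this compose: wrapping the result of WithTiming in a second decorator (for example, one that validates the request) yields a small pipeline without the mediator needing to know about it.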
Cache-Backed Async Mediation:
Baubit.Mediation is powered by Baubit.Caching, a high-performance hybrid cache. Baubit.Caching is being extended to support distributed systems - once complete, Baubit.Mediation will natively support distributed mediation scenarios.
License
| Product | Versions (compatible and additional computed target framework versions) |
|---|---|
| .NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
| .NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
| .NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
| .NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
| MonoAndroid | monoandroid was computed. |
| MonoMac | monomac was computed. |
| MonoTouch | monotouch was computed. |
| Tizen | tizen40 was computed. tizen60 was computed. |
| Xamarin.iOS | xamarinios was computed. |
| Xamarin.Mac | xamarinmac was computed. |
| Xamarin.TVOS | xamarintvos was computed. |
| Xamarin.WatchOS | xamarinwatchos was computed. |
- .NETStandard 2.0
  - Baubit.Caching (>= 2026.4.1)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on Baubit.Mediation:
| Package | Downloads |
|---|---|
| Baubit.Mediation.DI: DI support for Baubit.Mediation | |
| Version | Downloads | Last Updated |
|---|---|---|
| 2026.4.1 | 144 | 1/25/2026 |
| 2026.3.1-prerelease | 141 | 1/13/2026 |
| 2026.1.2-prerelease | 139 | 1/4/2026 |
| 2026.1.1 | 197 | 12/31/2025 |
| 2026.1.1-prerelease | 97 | 12/31/2025 |
| 2025.51.2 | 273 | 12/19/2025 |
| 2025.51.1 | 320 | 12/19/2025 |
| 2025.49.1 | 741 | 12/1/2025 |
| 2025.48.4 | 131 | 11/28/2025 |
| 2025.48.1 | 196 | 11/27/2025 |