Excalibur.Dispatch.Transport.RabbitMQ
3.0.0-alpha.26
RabbitMQ transport implementation for the Excalibur framework, providing reliable message queuing with advanced features including dead letter handling, CloudEvents support, and automatic recovery.
Overview
This package provides RabbitMQ integration for Excalibur.Dispatch, enabling:
- Message Publishing & Consuming: Full support for exchanges, queues, and routing
- CloudEvents Support: DoD-compliant structured and binary mode CloudEvents
- Reliability Features: Dead letter queues, publisher confirms, automatic recovery
- Batching: Configurable batch processing for high-throughput scenarios
- Encryption: Optional message-level encryption support
Installation
dotnet add package Excalibur.Dispatch.Transport.RabbitMQ --prerelease
Configuration
Connection Options
Using Connection String
services.AddRabbitMqMessageBus(options =>
{
    options.ConnectionString = "amqp://user:password@localhost:5672/vhost";
});
Using Individual Properties
services.Configure<RabbitMqOptions>(options =>
{
    options.ConnectionString = "amqp://localhost";
    options.Exchange = "dispatch.events";
    options.QueueName = "my-service-queue";
    options.RoutingKey = "orders.*";
});
Environment Variables
Configure via environment variables for containerized deployments:
RABBITMQ__CONNECTIONSTRING=amqp://user:password@rabbitmq:5672/
RABBITMQ__EXCHANGE=dispatch.events
RABBITMQ__QUEUENAME=my-service-queue
services.Configure<RabbitMqOptions>(configuration.GetSection("RabbitMQ"));
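The double underscore in these variable names is the standard .NET configuration separator for nested keys. The following is a minimal sketch of how that mapping behaves, assuming the Microsoft.Extensions.Configuration.EnvironmentVariables package is referenced; the variables are set in-process only so the example is self-contained.

```csharp
using System;
using Microsoft.Extensions.Configuration;

// Set in-process purely for demonstration; in a container these come from the environment.
Environment.SetEnvironmentVariable("RABBITMQ__EXCHANGE", "dispatch.events");
Environment.SetEnvironmentVariable("RABBITMQ__QUEUENAME", "my-service-queue");

IConfiguration configuration = new ConfigurationBuilder()
    .AddEnvironmentVariables()
    .Build();

// "__" is translated to ":" and configuration keys are case-insensitive,
// so RABBITMQ__EXCHANGE is visible as RabbitMQ:Exchange.
IConfigurationSection section = configuration.GetSection("RabbitMQ");
string? exchange = section["Exchange"];
string? queueName = section["QueueName"];

Console.WriteLine($"{exchange} / {queueName}");
```

Because the lookup is case-insensitive, the upper-case variable names bind to the mixed-case option properties without extra mapping.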
Authentication
Username/Password (Connection String)
options.ConnectionString = "amqp://username:password@hostname:5672/vhost";
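Credentials that contain URI-reserved characters such as @ or / need to be percent-encoded before they are placed in the connection string. A small sketch using only System.Uri; the user name and password below are hypothetical.

```csharp
using System;

// Hypothetical credentials containing characters that are reserved in URIs.
string user = "order-service";
string password = "p@ss/word";

string connectionString =
    $"amqp://{Uri.EscapeDataString(user)}:{Uri.EscapeDataString(password)}@hostname:5672/vhost";

// Round-trip through System.Uri to confirm the parts are read back as intended.
var uri = new Uri(connectionString);
string[] userInfo = uri.UserInfo.Split(':', 2);
string decodedPassword = Uri.UnescapeDataString(userInfo[1]);

Console.WriteLine(connectionString);
Console.WriteLine($"{uri.Host}:{uri.Port}{uri.AbsolutePath}");
```

Without the encoding step, the first @ in the password would be taken as the end of the user-info part and the host would be parsed incorrectly.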
TLS/SSL Configuration
For production environments, enable TLS:
options.ConnectionString = "amqps://user:password@hostname:5671/";
Certificate-Based Authentication
When using client certificates, configure the connection factory directly through the RabbitMQ.Client library before registering services.
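As an illustration, a RabbitMQ.Client connection factory set up for client-certificate (EXTERNAL) authentication might look like the sketch below. The host name, certificate path and environment variable name are hypothetical, the broker must have the rabbitmq_auth_mechanism_ssl plugin enabled for EXTERNAL to work, and how the factory is handed to this transport is not covered here.

```csharp
using System;
using System.Collections.Generic;
using RabbitMQ.Client;

// Hypothetical host name and certificate location; replace with your own.
var factory = new ConnectionFactory
{
    HostName = "rabbitmq.example.internal",
    Port = 5671,
    VirtualHost = "/",
    // EXTERNAL tells the broker to take the identity from the client certificate.
    AuthMechanisms = new List<IAuthMechanismFactory> { new ExternalMechanismFactory() },
    Ssl = new SslOption
    {
        Enabled = true,
        ServerName = "rabbitmq.example.internal", // must match the broker certificate
        CertPath = "/etc/ssl/client/client.pfx",
        CertPassphrase = Environment.GetEnvironmentVariable("RABBITMQ_CLIENT_CERT_PASSPHRASE")
    }
};

Console.WriteLine($"TLS enabled: {factory.Ssl.Enabled}, port: {factory.Port}");
```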
Message Configuration
Exchange and Queue Settings
services.Configure<RabbitMqOptions>(options =>
{
    // Exchange configuration
    options.Exchange = "dispatch.events";

    // Queue configuration
    options.QueueName = "order-processor";
    options.QueueDurable = true;     // Survive broker restart (default: true)
    options.QueueExclusive = false;  // Allow multiple consumers (default: false)
    options.QueueAutoDelete = false; // Keep queue when consumers disconnect (default: false)

    // Routing
    options.RoutingKey = "orders.#"; // Wildcard routing pattern
});
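The two wildcard forms used in this README, orders.* and orders.#, behave differently on a topic exchange: * stands for exactly one dot-separated word, while # stands for zero or more. The sketch below re-implements that matching rule purely for illustration (TopicMatches and Match are hypothetical helpers; the broker does the real matching).

```csharp
using System;

// Illustrative re-implementation of AMQP topic matching; the broker performs this itself.
static bool TopicMatches(string pattern, string routingKey) =>
    Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);

static bool Match(string[] pattern, int p, string[] key, int k)
{
    if (p == pattern.Length)
        return k == key.Length;            // pattern exhausted: key must be too

    if (pattern[p] == "#")
    {
        // '#' absorbs zero or more words, so try every possible split point.
        for (int next = k; next <= key.Length; next++)
        {
            if (Match(pattern, p + 1, key, next))
                return true;
        }
        return false;
    }

    if (k == key.Length)
        return false;                      // words left in pattern, none in key

    // '*' stands for exactly one word; anything else must match literally.
    return (pattern[p] == "*" || pattern[p] == key[k])
        && Match(pattern, p + 1, key, k + 1);
}

Console.WriteLine(TopicMatches("orders.*", "orders.created"));     // True
Console.WriteLine(TopicMatches("orders.*", "orders.eu.created"));  // False
Console.WriteLine(TopicMatches("orders.#", "orders.eu.created"));  // True
Console.WriteLine(TopicMatches("orders.#", "orders"));             // True
```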
Consumer Settings
services.Configure<RabbitMqOptions>(options =>
{
    // Prefetch (QoS)
    options.PrefetchCount = 100;    // Messages to prefetch (default: 100)
    options.PrefetchGlobal = false; // Per-consumer prefetch (default: false)

    // Acknowledgment
    options.AutoAck = false;        // Manual acknowledgment (default: false)
    options.RequeueOnReject = true; // Requeue rejected messages (default: true)

    // Batching
    options.MaxBatchSize = 50;      // Max messages per batch (default: 50)
    options.MaxBatchWaitMs = 500;   // Max wait for batch (default: 500ms)

    // Consumer identification
    options.ConsumerTag = "order-service-1";
});
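One plausible reading of MaxBatchSize and MaxBatchWaitMs is "flush when the batch is full or when the wait elapses, whichever comes first". The sketch below shows that policy over a System.Threading.Channels reader. It is an illustration of the idea only, not the transport's actual implementation, and ReadBatchAsync is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Collects up to maxBatchSize items, or returns early once maxWait has elapsed.
static async Task<List<T>> ReadBatchAsync<T>(
    ChannelReader<T> reader, int maxBatchSize, TimeSpan maxWait, CancellationToken ct = default)
{
    var batch = new List<T>(maxBatchSize);
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    cts.CancelAfter(maxWait);

    try
    {
        while (batch.Count < maxBatchSize && await reader.WaitToReadAsync(cts.Token))
        {
            while (batch.Count < maxBatchSize && reader.TryRead(out var item))
                batch.Add(item);
        }
    }
    catch (OperationCanceledException) when (!ct.IsCancellationRequested)
    {
        // The wait elapsed: hand back whatever has been collected so far.
    }

    return batch;
}

var channel = Channel.CreateUnbounded<int>();
for (int i = 1; i <= 3; i++)
    channel.Writer.TryWrite(i);

// Three items are queued; the first batch fills up, the second times out with one item.
var first = await ReadBatchAsync(channel.Reader, 2, TimeSpan.FromMilliseconds(500));
var second = await ReadBatchAsync(channel.Reader, 2, TimeSpan.FromMilliseconds(100));
Console.WriteLine($"{first.Count} then {second.Count}");
```

Under this policy, a shorter wait lowers latency for sparse traffic, while a larger batch size helps throughput when the queue is busy.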
CloudEvents Support
Enable CloudEvents for interoperable event-driven architectures:
services.AddRabbitMqMessageBus(options =>
{
    options.ConnectionString = "amqp://localhost";
    options.EnableCloudEvents = true; // Default: true

    // CloudEvents-specific settings
    options.ExchangeType = ExchangeType.Topic;
    options.Persistence = MessagePersistence.Persistent;
    options.RoutingStrategy = RoutingStrategy.EventType;
});
For DoD-compliant validation:
services.AddRabbitMqCloudEventValidation(enableDoDCompliance: true);
Encryption
Enable message-level encryption for sensitive data:
services.Configure<RabbitMqOptions>(options =>
{
    options.EnableEncryption = true;
});
Retry Policies
Dead Letter Queue Configuration
services.Configure<RabbitMqOptions>(options =>
{
    // Enable dead letter handling
    options.EnableDeadLetterExchange = true;
    options.DeadLetterExchange = "dispatch.dlx";
    options.DeadLetterRoutingKey = "failed.orders";
});
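On the broker side, dead-lettering is driven by two optional queue arguments, x-dead-letter-exchange and x-dead-letter-routing-key. Whether the transport sets these from the options above is not stated here, so the sketch below only shows what a queue configured this way would carry; DeadLetterArguments is a hypothetical helper.

```csharp
using System;
using System.Collections.Generic;

// Builds the standard RabbitMQ queue arguments that enable dead-lettering.
static Dictionary<string, object> DeadLetterArguments(string exchange, string routingKey) => new()
{
    ["x-dead-letter-exchange"] = exchange,
    ["x-dead-letter-routing-key"] = routingKey
};

var arguments = DeadLetterArguments("dispatch.dlx", "failed.orders");

foreach (var (key, value) in arguments)
    Console.WriteLine($"{key} = {value}");
```

Messages that are rejected without requeue, that expire, or that overflow a length limit are then republished by the broker to the named exchange with the given routing key.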
Connection Recovery
services.Configure<RabbitMqOptions>(options =>
{
    // Connection resilience
    options.ConnectionTimeoutSeconds = 30;       // Connection timeout (default: 30)
    options.AutomaticRecoveryEnabled = true;     // Auto-reconnect (default: true)
    options.NetworkRecoveryIntervalSeconds = 10; // Recovery interval (default: 10)
});
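For orientation, the underlying RabbitMQ.Client connection factory exposes similarly named settings. The sketch below shows those client-level properties directly; that the options above map onto them one-to-one is an assumption, not something this README confirms.

```csharp
using System;
using RabbitMQ.Client;

var factory = new ConnectionFactory
{
    Uri = new Uri("amqp://localhost"),
    RequestedConnectionTimeout = TimeSpan.FromSeconds(30), // cf. ConnectionTimeoutSeconds
    AutomaticRecoveryEnabled = true,                       // reconnect after network failures
    NetworkRecoveryInterval = TimeSpan.FromSeconds(10),    // delay between recovery attempts
    TopologyRecoveryEnabled = true                         // re-declare exchanges, queues, bindings
};

Console.WriteLine($"Recovery every {factory.NetworkRecoveryInterval.TotalSeconds}s");
```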
Health Checks
Registration
The transport implements ITransportHealthChecker for integration with ASP.NET Core health checks:
services.AddHealthChecks()
    .AddCheck<RabbitMqHealthCheck>("rabbitmq", tags: new[] { "ready", "messaging" });
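The tags passed at registration can be used to expose a readiness endpoint that runs only the tagged checks. A minimal ASP.NET Core sketch follows; the /health/ready path is an arbitrary choice, and the inline "self" check is a stand-in so the example runs without a broker.

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;

var builder = WebApplication.CreateBuilder(args);

// Stand-in check; in a real service this is where the RabbitMQ check is registered.
builder.Services.AddHealthChecks()
    .AddCheck("self", () => HealthCheckResult.Healthy(), tags: new[] { "ready" });

var app = builder.Build();

// Only checks tagged "ready" are evaluated for this endpoint.
app.MapHealthChecks("/health/ready", new HealthCheckOptions
{
    Predicate = registration => registration.Tags.Contains("ready")
});

app.Run();
```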
Configuration
Configure health check behavior:
services.Configure<RabbitMqHealthCheckOptions>(options =>
{
    options.Timeout = TimeSpan.FromSeconds(5);
    options.IncludeQueueMetrics = true;
});
Custom Health Check Implementation
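using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Diagnostics.HealthChecks;

// Adapts the transport's ITransportHealthChecker to ASP.NET Core's IHealthCheck.
public class RabbitMqHealthCheck : IHealthCheck
{
    private readonly ITransportHealthChecker _healthChecker;

    // The checker is supplied by dependency injection.
    public RabbitMqHealthCheck(ITransportHealthChecker healthChecker)
    {
        _healthChecker = healthChecker ?? throw new ArgumentNullException(nameof(healthChecker));
    }

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        var result = await _healthChecker.CheckQuickHealthAsync(cancellationToken);
        return result.Status switch
        {
            TransportHealthStatus.Healthy => HealthCheckResult.Healthy(),
            TransportHealthStatus.Degraded => HealthCheckResult.Degraded(result.Description),
            _ => HealthCheckResult.Unhealthy(result.Description)
        };
    }
}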
Production Considerations
Scaling
Horizontal Scaling
- Use the competing consumers pattern with a shared queue name
- Set QueueExclusive = false to allow multiple consumers
- Adjust PrefetchCount based on processing time (lower for slow consumers)
High Availability
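- Deploy RabbitMQ in cluster mode with replicated queues (quorum queues on current releases; classic mirrored queues were removed in RabbitMQ 4.0)
- Use QueueDurable = true so the queue survives broker restarts (messages must also be published as persistent to survive)
- Enable AutomaticRecoveryEnabled for automatic reconnection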
Performance Tuning
services.Configure<RabbitMqOptions>(options =>
{
    // High-throughput configuration
    options.PrefetchCount = 250;  // Increase for fast processors
    options.MaxBatchSize = 100;   // Larger batches
    options.MaxBatchWaitMs = 100; // Shorter wait times
    options.AutoAck = false;      // Keep manual ack for reliability
});
Monitoring and Alerting
Key metrics to monitor:
| Metric | Description | Alert Threshold |
|---|---|---|
| Queue Depth | Messages waiting | > 10,000 |
| Consumer Count | Active consumers on the queue | < 1 |
| Message Rate | Messages/second | Baseline deviation |
| Unacked Messages | Pending acknowledgments | > PrefetchCount × 2 |
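The fixed thresholds in the table can be expressed as a simple check. The sketch below assumes the metric values have already been obtained from some monitoring source (for example the management API); EvaluateQueue and its parameter names are hypothetical, and the baseline-deviation rule for message rate is left out because it needs historical data.

```csharp
using System;
using System.Collections.Generic;

// Applies the fixed alert thresholds from the table to one queue's metrics.
static List<string> EvaluateQueue(long queueDepth, int consumers, long unacked, int prefetchCount)
{
    var raised = new List<string>();

    if (queueDepth > 10_000)
        raised.Add("Queue depth above 10,000");
    if (consumers < 1)
        raised.Add("No active consumers");
    if (unacked > prefetchCount * 2L)
        raised.Add("Unacked messages above PrefetchCount x 2");

    return raised;
}

// Example values: a deep queue with no consumers, but unacked messages within limits.
var alerts = EvaluateQueue(queueDepth: 12_500, consumers: 0, unacked: 150, prefetchCount: 100);

foreach (var alert in alerts)
    Console.WriteLine(alert);
```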
Security Best Practices
- Use TLS (amqps://) in production
- Rotate credentials regularly using environment variables
- Limit permissions per virtual host and user
- Enable encryption for sensitive payloads
- Use separate virtual hosts for different environments
Troubleshooting
Common Issues
Connection Refused
RabbitMQ.Client.Exceptions.BrokerUnreachableException: None of the specified endpoints were reachable
Solutions:
- Verify RabbitMQ is running: rabbitmqctl status
- Check hostname/port in connection string
- Verify firewall allows port 5672 (or 5671 for TLS)
- Confirm credentials are correct
Authentication Failed
RabbitMQ.Client.Exceptions.AuthenticationFailureException: ACCESS_REFUSED
Solutions:
- Verify username/password
- Check virtual host permissions: rabbitmqctl list_permissions -p /vhost
- Ensure user has access to the virtual host
Queue Not Found
RabbitMQ.Client.Exceptions.OperationInterruptedException: NOT_FOUND - no queue
Solutions:
- Queue may not be declared; enable auto-declaration
- Check queue name spelling
- Verify the queue exists: rabbitmqctl list_queues
Message Redelivery Loop
Messages continuously redelivered without processing.
Solutions:
- Check for exceptions in message handler
- Verify RequeueOnReject setting matches desired behavior
- Configure dead letter queue to capture failed messages
- Review PrefetchCount to avoid overwhelming consumers
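One common way to break a redelivery loop is to cap the number of attempts and stop requeueing once the cap is reached, so the message is dead-lettered instead. The sketch below assumes a quorum queue, which records prior delivery attempts in the x-delivery-count header; ShouldRequeue is a hypothetical helper and is not part of this package.

```csharp
using System;
using System.Collections.Generic;

// Decides whether a failed message should go back to the queue.
// Assumes the broker records earlier attempts in "x-delivery-count" (quorum queues).
static bool ShouldRequeue(IDictionary<string, object?>? headers, int maxAttempts)
{
    long priorDeliveries = 0;

    if (headers is not null
        && headers.TryGetValue("x-delivery-count", out var raw)
        && raw is not null)
    {
        priorDeliveries = Convert.ToInt64(raw);
    }

    // The current attempt is priorDeliveries + 1; requeue only while under the cap.
    return priorDeliveries + 1 < maxAttempts;
}

var headers = new Dictionary<string, object?> { ["x-delivery-count"] = 2L };

Console.WriteLine(ShouldRequeue(null, maxAttempts: 3));    // True
Console.WriteLine(ShouldRequeue(headers, maxAttempts: 3)); // False
```

On classic queues the header is absent, so a cap like this would need another source for the attempt count.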
Logging Configuration
Enable detailed logging for troubleshooting:
{
  "Logging": {
    "LogLevel": {
      "Excalibur.Dispatch.Transport.RabbitMQ": "Debug",
      "RabbitMQ.Client": "Warning"
    }
  }
}
Debug Tips
- Enable RabbitMQ Management Plugin: Access web UI at http://localhost:15672
- Monitor connections: rabbitmqctl list_connections
- Check channel status: rabbitmqctl list_channels
- View queue bindings: rabbitmqctl list_bindings
- Trace messages: Enable RabbitMQ Firehose tracer for message inspection
Complete Configuration Reference
services.Configure<RabbitMqOptions>(options =>
{
    // Connection
    options.ConnectionString = "amqp://user:pass@localhost:5672/";
    options.ConnectionTimeoutSeconds = 30;
    options.AutomaticRecoveryEnabled = true;
    options.NetworkRecoveryIntervalSeconds = 10;

    // Exchange
    options.Exchange = "dispatch.events";

    // Queue
    options.QueueName = "my-service";
    options.QueueDurable = true;
    options.QueueExclusive = false;
    options.QueueAutoDelete = false;
    options.QueueArguments = new Dictionary<string, object>
    {
        ["x-message-ttl"] = 86400000, // 24 hours
        ["x-max-length"] = 100000
    };

    // Routing
    options.RoutingKey = "orders.#";

    // Consumer
    options.PrefetchCount = 100;
    options.PrefetchGlobal = false;
    options.AutoAck = false;
    options.RequeueOnReject = true;
    options.ConsumerTag = "order-processor-1";

    // Batching
    options.MaxBatchSize = 50;
    options.MaxBatchWaitMs = 500;

    // Dead Letter
    options.EnableDeadLetterExchange = true;
    options.DeadLetterExchange = "dispatch.dlx";
    options.DeadLetterRoutingKey = "failed";

    // Security
    options.EnableEncryption = false;
});
| Product | Compatible and additional computed target framework versions |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
net10.0 dependencies:
- CloudNative.CloudEvents (>= 2.8.0)
- CloudNative.CloudEvents.SystemTextJson (>= 2.8.0)
- Consul (>= 1.7.14.9)
- Cronos (>= 0.11.1)
- Excalibur.Dispatch (>= 3.0.0-alpha.26)
- Excalibur.Dispatch.Abstractions (>= 3.0.0-alpha.26)
- Excalibur.Dispatch.Transport.Abstractions (>= 3.0.0-alpha.26)
- Grpc.Net.Client (>= 2.71.0)
- KubernetesClient (>= 17.0.14)
- Medo.Uuid7 (>= 1.4.0)
- MemoryPack (>= 1.21.4)
- Microsoft.ApplicationInsights (>= 2.23.0)
- Microsoft.AspNetCore.Authorization (>= 9.0.9)
- Microsoft.Extensions.Configuration (>= 10.0.0)
- Microsoft.Extensions.Configuration.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Configuration.Binder (>= 10.0.0)
- Microsoft.Extensions.DependencyInjection (>= 10.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Diagnostics.HealthChecks (>= 10.0.0)
- Microsoft.Extensions.Diagnostics.HealthChecks.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Http (>= 10.0.0)
- Microsoft.Extensions.Logging (>= 10.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.0)
- Microsoft.Extensions.ObjectPool (>= 10.0.0)
- Microsoft.Extensions.Options (>= 10.0.0)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 10.0.0)
- Microsoft.Extensions.Options.DataAnnotations (>= 10.0.0)
- OpenTelemetry.Api (>= 1.13.0)
- Polly (>= 8.6.4)
- RabbitMQ.Client (>= 7.2.0)
- System.Threading.RateLimiting (>= 10.0.0)
- YamlDotNet (>= 16.3.0)
net8.0 dependencies:
- CloudNative.CloudEvents (>= 2.8.0)
- CloudNative.CloudEvents.SystemTextJson (>= 2.8.0)
- Consul (>= 1.7.14.9)
- Cronos (>= 0.11.1)
- Excalibur.Dispatch (>= 3.0.0-alpha.26)
- Excalibur.Dispatch.Abstractions (>= 3.0.0-alpha.26)
- Excalibur.Dispatch.Transport.Abstractions (>= 3.0.0-alpha.26)
- Grpc.Net.Client (>= 2.71.0)
- KubernetesClient (>= 17.0.14)
- Medo.Uuid7 (>= 1.4.0)
- MemoryPack (>= 1.21.4)
- Microsoft.ApplicationInsights (>= 2.23.0)
- Microsoft.AspNetCore.Authorization (>= 9.0.9)
- Microsoft.Extensions.Configuration (>= 10.0.0)
- Microsoft.Extensions.Configuration.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Configuration.Binder (>= 10.0.0)
- Microsoft.Extensions.DependencyInjection (>= 10.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Diagnostics.HealthChecks (>= 10.0.0)
- Microsoft.Extensions.Diagnostics.HealthChecks.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Http (>= 10.0.0)
- Microsoft.Extensions.Logging (>= 10.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.0)
- Microsoft.Extensions.ObjectPool (>= 10.0.0)
- Microsoft.Extensions.Options (>= 10.0.0)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 10.0.0)
- Microsoft.Extensions.Options.DataAnnotations (>= 10.0.0)
- OpenTelemetry.Api (>= 1.13.0)
- Polly (>= 8.6.4)
- RabbitMQ.Client (>= 7.2.0)
- System.Diagnostics.DiagnosticSource (>= 10.0.0)
- System.IO.Pipelines (>= 10.0.0)
- System.Text.Json (>= 10.0.0)
- System.Threading.Channels (>= 10.0.0)
- System.Threading.RateLimiting (>= 10.0.0)
- YamlDotNet (>= 16.3.0)
net9.0 dependencies:
- CloudNative.CloudEvents (>= 2.8.0)
- CloudNative.CloudEvents.SystemTextJson (>= 2.8.0)
- Consul (>= 1.7.14.9)
- Cronos (>= 0.11.1)
- Excalibur.Dispatch (>= 3.0.0-alpha.26)
- Excalibur.Dispatch.Abstractions (>= 3.0.0-alpha.26)
- Excalibur.Dispatch.Transport.Abstractions (>= 3.0.0-alpha.26)
- Grpc.Net.Client (>= 2.71.0)
- KubernetesClient (>= 17.0.14)
- Medo.Uuid7 (>= 1.4.0)
- MemoryPack (>= 1.21.4)
- Microsoft.ApplicationInsights (>= 2.23.0)
- Microsoft.AspNetCore.Authorization (>= 9.0.9)
- Microsoft.Extensions.Configuration (>= 10.0.0)
- Microsoft.Extensions.Configuration.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Configuration.Binder (>= 10.0.0)
- Microsoft.Extensions.DependencyInjection (>= 10.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Diagnostics.HealthChecks (>= 10.0.0)
- Microsoft.Extensions.Diagnostics.HealthChecks.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Hosting.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Http (>= 10.0.0)
- Microsoft.Extensions.Logging (>= 10.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.0)
- Microsoft.Extensions.ObjectPool (>= 10.0.0)
- Microsoft.Extensions.Options (>= 10.0.0)
- Microsoft.Extensions.Options.ConfigurationExtensions (>= 10.0.0)
- Microsoft.Extensions.Options.DataAnnotations (>= 10.0.0)
- OpenTelemetry.Api (>= 1.13.0)
- Polly (>= 8.6.4)
- RabbitMQ.Client (>= 7.2.0)
- System.Diagnostics.DiagnosticSource (>= 10.0.0)
- System.IO.Pipelines (>= 10.0.0)
- System.Text.Json (>= 10.0.0)
- System.Threading.Channels (>= 10.0.0)
- System.Threading.RateLimiting (>= 10.0.0)
- YamlDotNet (>= 16.3.0)
| Version | Downloads | Last Updated |
|---|---|---|
| 3.0.0-alpha.26 | 31 | 3/5/2026 |
| 3.0.0-alpha.19 | 46 | 2/26/2026 |