FastCSharp6.RabbitSubscriber
2.1.0
dotnet add package FastCSharp6.RabbitSubscriber --version 2.1.0
NuGet\Install-Package FastCSharp6.RabbitSubscriber -Version 2.1.0
<PackageReference Include="FastCSharp6.RabbitSubscriber" Version="2.1.0" />
paket add FastCSharp6.RabbitSubscriber --version 2.1.0
#r "nuget: FastCSharp6.RabbitSubscriber, 2.1.0"
// Install FastCSharp6.RabbitSubscriber as a Cake Addin #addin nuget:?package=FastCSharp6.RabbitSubscriber&version=2.1.0 // Install FastCSharp6.RabbitSubscriber as a Cake Tool #tool nuget:?package=FastCSharp6.RabbitSubscriber&version=2.1.0
FastCSharp's RabbitMQ Subscriber
RabbitSubscriber provides a simple approach for subscribing to a RabbitMQ queue.
It is a wrapper around the RabbitMQ.Client library.
Usage
All you need to do is create a new subscriber to an existing queue and register a callback.
Minimal Example (Check out BasicSubscriber project at FastCSharp.TestRabbitImpl for more complex project examples)
Create a new minimal console project.
dotnet new console -o BasicSubscriber
cd .\BasicSubscriber\
dotnet add package FastCSharp.RabbitSubscriber
dotnet add package Microsoft.Extensions.Configuration.Json
dotnet add package Microsoft.Extensions.Logging.Console
Open the BasicSubscriber.csproj
file and add the following configuration.
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
Create a json file named appsettings.json
and add the following configuration.
{
"RabbitSubscriberConfig" :
{
"HostName" : "localhost",
"Port" : 5672,
"VirtualHost": "test-vhost",
"UserName" : "guest",
"Password" : "guest",
"HeartbeatTimeout" : "00:00:20",
"Queues" :
{
"DIRECT_QUEUE" :
{
"Name":"test.direct.q",
"PrefetchCount":1,
"PrefetchSize":0
}
}
}
}
Go to the Rabbit Management UI and create a new queue named test.direct.q
. You may bind it to the amq.direct
exchange if you want to publish messages with a Publisher.
Open the Program.cs
file and replace the code with the one below.
using FastCSharp.RabbitSubscriber;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
ILoggerFactory loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("Program");
IConfiguration defaultConfiguration = new ConfigurationBuilder()
.AddJsonFile("rabbitsettings.CLUSTER.json", true, true)
.Build();
var subscriberFactory = new RabbitSubscriberFactory(defaultConfiguration, loggerFactory);
using var directSubscriber = subscriberFactory.NewSubscriber<string>("DIRECT_QUEUE");
directSubscriber.Register(async (message) =>
{
logger.LogInformation($"Received {message}");
return await Task.FromResult(true);
});
logger.LogInformation(" Press [enter] to exit.");
Console.ReadLine();
All set. Run the project:
dotnet run
Now you can go to the Rabbit Management UI and publish a message to the test.direct.q
queue. Remember to enclose the message in double quotes (e.g. "Hello World"
).
Adding a Circuit Breaker
Add the following packages to the project:
dotnet add package FastCSharp.CircuitBreaker
Open the Program.cs
file and replace the code with the one below.
using FastCSharp.CircuitBreaker;
using FastCSharp.RabbitSubscriber;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
ILoggerFactory loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger("Program");
IConfiguration defaultConfiguration = new ConfigurationBuilder()
.AddJsonFile("appsettings.json", true, true)
.Build();
var circuit = new EventDrivenCircuitBreaker(
new ConsecutiveFailuresBreakerStrategy(
5,
new FixedBackoff(new TimeSpan(0, 0, 0, 0, 5))));
var subscriberFactory = new RabbitSubscriberFactory(defaultConfiguration, loggerFactory);
using var directSubscriber = subscriberFactory.NewSubscriber<string>("DIRECT_QUEUE");
circuit.OnOpen += (sender) => directSubscriber.UnSubscribe();
circuit.OnReset += (sender) => directSubscriber.Reset();
directSubscriber.Register(async (message) =>
{
return await circuit.Wrap(async () =>
{
logger.LogInformation($"Received {message}");
return await Task.FromResult(true);
});
});
logger.LogInformation(" Press [enter] to exit.");
Console.ReadLine();
Breaking down subscriber flux control
The subscriber can be stopped by calling:
subscriber.UnSubscribe();
This is a useful callback when a circuit breaker is triggered and the OnOpen
or OnBreak
events are fired.
The subscriber can be reset by calling:
subscriber.Reset();
This is a useful callback when a circuit breaker is triggered and the OnReset
event is fired.
Check the FastCSharp.CircuitBreaker package for more information on circuit breakers.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. |
-
net6.0
- FastCSharp6.Common (>= 0.2.0)
- FastCSharp6.RabbitCommon (>= 1.1.0)
- FastCSharp6.Subscriber (>= 2.1.0)
- Microsoft.Extensions.Configuration (>= 7.0.0)
- Microsoft.Extensions.Configuration.Binder (>= 7.0.4)
- Microsoft.Extensions.Configuration.EnvironmentVariables (>= 7.0.0)
- Microsoft.Extensions.Logging (>= 7.0.0)
- RabbitMQ.Client (>= 6.5.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.