Fluent.Queue
1.0.1
Known issue: this version has a bug in OnDelay message handling.
See the version list below for details.
dotnet tool install --global Fluent.Queue --version 1.0.1
dotnet new tool-manifest # if you are setting up this repo
dotnet tool install --local Fluent.Queue --version 1.0.1
#tool dotnet:?package=Fluent.Queue&version=1.0.1
nuke :add-package Fluent.Queue --version 1.0.1
What is Fluent Queue?
The Fluent Queue component provides a unified API across a variety of queue services for .NET.
How can I install Fluent Queue?
dotnet add package Fluent.Queue --version 1.0.1
How can I use Fluent Queue?
1. Producing
registration
Register all your queue connections in Program.cs and assign a name to each one, so that individual queues can be pointed at a specific connection. Set defaultConnection to the name of the connection to use when no other connection is chosen for a message.
builder.Services.AddMessageBus(
    connections: new Dictionary<string, BaseBusConnectionDtoAbstract>
    {
        {
            "LocalRabbit",
            new RabbitMqConnectionDto
            {
                HostName = "127.0.0.1",
                Port = 5672,
                UserName = "guest",
                Password = "guest",
                VirtualHost = "/"
            }
        }
    },
    defaultConnection: "LocalRabbit"
);
usage
Inject FluentQueue.Interfaces.Bus.IBus into your class through DI. Then you can publish a message to the queue using the following code example.
Messages must implement FluentQueue.Interfaces.Message.IMessage.
public abstract class MessageBase : IMessage
{
    public MessageBase(object body, IMessageProperties? properties = null)
    {
        Body = body;
        Properties = properties;
    }

    public object Body { get; set; }
    public IMessageProperties? Properties { get; set; }
}

public class TestMessageBody
{
    public required string Key { get; set; }
    public required string Value { get; set; }
}

public class TestProperties : IMessageProperties
{
    public string? CorrelationId { get; set; }
    public DateTime? Expiration { get; set; }
}

public class TestExchange : IExchange
{
    public string Name { get; set; } = "test";
    public string Type { get; set; } = "direct";
}

public class TestMessage : MessageBase, IRabbitMqMessage
{
    public TestMessage(object body, IMessageProperties? properties = null) : base(body, properties)
    {
        Exchange = new TestExchange();
        RoutingKey = null;
    }

    public IExchange? Exchange { get; set; }
    public string? RoutingKey { get; set; }
}
The use of OnQueue, OnConnection and OnDelay is optional.
Warning ⚠️
If you don't call OnQueue, the package will generate a default queue.
_bus.Message(
    message: new TestMessage(
        body: new TestMessageBody
        {
            Key = "key",
            Value = "value"
        },
        properties: new TestProperties
        {
            CorrelationId = "test_correlation_id",
            // Expiration = DateTime.Now.AddDays(1)
        }
    )
).OnQueue(
    queue: new TestQueue()
).OnConnection(
    connection: "LocalRabbit"
).OnDelay(
    availableAt: DateTime.Now.AddSeconds(120)
).Dispatch();
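The publishing call above assumes a `_bus` field that has already been injected. As a minimal sketch of how that could look, the class below receives IBus through its constructor and publishes with only the required calls. `TestPublisher` and its `Publish` method are hypothetical names chosen for illustration; only IBus, Message, Dispatch and the Test* message types come from this README.

```csharp
using FluentQueue.Interfaces.Bus;

// Hypothetical wrapper class, not part of the package.
public class TestPublisher
{
    private readonly IBus _bus;

    // IBus is resolved by the DI container once AddMessageBus has been called.
    public TestPublisher(IBus bus)
    {
        _bus = bus;
    }

    public void Publish(string key, string value)
    {
        // OnQueue, OnConnection and OnDelay are all omitted here, so the
        // default connection and a package-generated default queue are used.
        // Dispatch is called exactly as in the example above; its return
        // type is not documented in this README.
        _bus.Message(
            message: new TestMessage(
                body: new TestMessageBody { Key = key, Value = value }
            )
        ).Dispatch();
    }
}
```

Such a class could then be registered with the standard container call `builder.Services.AddScoped<TestPublisher>();` and injected wherever messages need to be published.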
2. Subscribing
registration
As with producing, first register your queue connections. Then define your subscribers. The connectionName, retry and consumerCount arguments are optional.
builder.Services.AddMessageBus(
    connections: new Dictionary<string, BaseBusConnectionDtoAbstract>
    {
        {
            "LocalRabbit",
            new RabbitMqConnectionDto
            {
                HostName = "127.0.0.1",
                Port = 5672,
                UserName = "guest",
                Password = "guest",
                VirtualHost = "/"
            }
        }
    },
    defaultConnection: "LocalRabbit"
).AddSubscriberBus(
    queues: new List<IQueue>
    {
        new TestQueue()
    },
    connectionName: "LocalRabbit",
    retry: 3,
    consumerCount: 1
);
usage
To consume messages, define jobs that implement FluentQueue.Interfaces.Job.IInvocableJob<TMessage>. Whenever a message in the queue can be deserialized into the job's generic type parameter, that job is invoked.
public class TestJob : IInvocableJob<TestMessageBody>
{
    public Task FailedJob(TestMessageBody message, object? properties)
    {
        throw new NotImplementedException();
    }

    public Task Invoke(TestMessageBody message, string? correlationId)
    {
        throw new NotImplementedException();
    }
}
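The job above only throws NotImplementedException. As a rough sketch of a job with real bodies, the variant below logs the message to the console and returns a completed task. `LoggingTestJob` is a hypothetical name, and the assumption that FailedJob is called when processing fails is inferred from its name; the README does not state exactly when it runs.

```csharp
using System;
using System.Threading.Tasks;
using FluentQueue.Interfaces.Job;

// Hypothetical job; the interface and method signatures are taken from the example above.
public class LoggingTestJob : IInvocableJob<TestMessageBody>
{
    // Runs when a queued message deserializes into TestMessageBody.
    public Task Invoke(TestMessageBody message, string? correlationId)
    {
        Console.WriteLine($"[{correlationId ?? "no-correlation-id"}] {message.Key} = {message.Value}");
        return Task.CompletedTask;
    }

    // Assumed to be the failure hook; here it only reports the failing key.
    public Task FailedJob(TestMessageBody message, object? properties)
    {
        Console.Error.WriteLine($"Job failed for key '{message.Key}'.");
        return Task.CompletedTask;
    }
}
```

Returning Task.CompletedTask keeps the methods synchronous; a job that does I/O would more likely be declared async and await its work.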
What drivers does Fluent Queue support?
More drivers will be added to the following list in the near future.
- RabbitMQ
How to add a custom driver to Fluent Queue?
Custom driver registration has four steps:
- Create a DTO for the connection parameters by inheriting from FluentQueue.Implementation.Connection.BaseBusConnectionDtoAbstract.
- Create a connection by inheriting from FluentQueue.Implementation.Connection.ConnectionBuilderAbstract<TConnection, TConnectionDto>.
- Create a producer by inheriting from FluentQueue.Implementation.Bus.ProducerAbstract<TConnection, TMessage>.
- Create a consumer by inheriting from FluentQueue.Implementation.Bus.SubscriberAbstract<TConnection>.
Note ⚠️
In a custom producer, if no queue is specified, the producer must create a default queue.
Product | Versions (compatible and additional computed target framework versions) |
---|---|
.NET | net7.0 is compatible. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
This package has no dependencies.