Larva.Messaging
1.0.0
The owner has unlisted this package.
This could mean that the package is deprecated, has security vulnerabilities or shouldn't be used anymore.
dotnet add package Larva.Messaging --version 1.0.0
NuGet\Install-Package Larva.Messaging -Version 1.0.0
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Larva.Messaging" Version="1.0.0" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add Larva.Messaging --version 1.0.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
#r "nuget: Larva.Messaging, 1.0.0"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install Larva.Messaging as a Cake Addin
#addin nuget:?package=Larva.Messaging&version=1.0.0
// Install Larva.Messaging as a Cake Tool
#tool nuget:?package=Larva.Messaging&version=1.0.0
The NuGet Team does not provide support for this client. Please contact its maintainers for support.
消息类库
基于消息队列的消息库
- 支持消息路由、顺序消费;
- 支持部署多台消费端程序(相同Exchange下,一个队列仅允许被一个消费端程序消费;但一个消费端程序可以同时消费多个队列);
- 采用Envelope<T>进行消息封装,通过实现接口IMessageHandler<T>进行消息处理;
- 消息发送,默认发送到mq失败时会不断重试,也可以通过初始化TopicSender时显式指定重试次数;
- 同一个队列,如果消息处理失败,则会不断重试,直到显式标记为丢弃消息(通过IMessageTransportationContext.DropMessage);
- 支持专题模式(Topic)和发布订阅模式(Pubsub);
- 发送端和消息端使用的消息类之间解耦,可以通过MessageTypeAttribute对消息类进行消息类型声明,默认为带名字空间的类名;
- 消费端反序列化失败,或者找不到对应的消息处理器时,利用rabbitmq的DLX特性,丢到一个DLX Exchange(使用默认Exchange)中,DLX Exchange发送的队列的命名=Exchange名 + "-dlx",发布订阅模式需要手工建;
调用示例
- 定义消息
// 可以定义消息类型名,不设置,默认取带名字空间的类名
//[MessageType("Tests.Messages.1")]
public class Message01
{
public Guid HostingFilialeId { get; set; }
public Message01() { }
public Message01(Guid hostingFilialeId)
{
HostingFilialeId = hostingFilialeId;
}
}
- 消息发送
// 创建 Rabbitmq 连接(此为全局变量,一个应用共享一个连接即可)
var conn = new Connection(new ConnectionConfig(new Uri("amqp://user:pwd@host:port/virtual_host")));
// 创建Topic发送者(此为全局变量,一个对象实例对应使用一个rabbitmq的exchange)
ITopicSender publisher = new TopicSender(conn, exchangeName, queueCount);
//如果采用发布订阅方式,则如下声明
//IPubsubSender publisher = new PubsubSender(conn, exchangeName);
// 发送消息
var message = new Message01(new Guid("58437EDC-87B7-4995-A5C0-BB5FD0FE49E0"));
publisher.SendMessage(Envelope.Create(message, message.HostingFilialeId.ToString()));
// 释放
publisher.Dispose();
- 消息消费
// 定义消息处理器
// 注意:一个处理器类,可以定义为多个消息类型的处理器;但针对同一个Exchange,一个消息类型仅允许对应一个消息处理器
public class MessageHandler01 : IMessageHandler<Message01>, IMessageHandler<Message02>
{
public void Handle(Message01 message, IMessageTransportationContext context)
{
if (context.RetryCount > 3)
{
// 执行次数超过3次,则记录日志
context.DropMessage();// 丢弃消息
return;
}
// 处理消息 Message01
}
public void Handle(Message02 message, IMessageTransportationContext context)
{
if (context.RetryCount > 3)
{
// 执行次数超过3次,则记录日志
context.DropMessage();// 丢弃消息
return;
}
// 处理消息 Message02
}
}
// 创建 Rabbitmq 连接(此为全局变量,一个应用共享一个连接即可)
var conn = new Connection(new ConnectionConfig(new Uri("amqp://user:pwd@host:port/virtual_host")));
// 创建接受者
ITopicReceiver receiver = new TopicReceiver(conn, exchangeName, queueCount);
//如果采用发布订阅方式,则如下声明
//IPubsubReceiver receiver = new PubsubReceiver(conn, queueName);
// 注册消息处理器
receiver.RegisterMessageHandler<MessageHandler01>();
//如果按指定程序集下注册所有的消息处理器,可以如下调用
//receiver.RegisterMessageHandlerByAssembly(typeof(MessageHandler01).Assembly);
// 订阅所有队列
receiver.SubscribeAll();
// 如果采用分布式部署多台消费端后端程序,则通过指定队列序号来实现
// 注意:同一个队列,仅允许一个后端程序;但一个后端程序可以消费多个队列
// receiver.Subscribe(0);
//如果采用发布订阅方式,则如下调用
//receiver.Subscribe();
更新历史
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
.NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
.NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen40 was computed. tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.
-
.NETStandard 2.0
- log4net (>= 2.0.8)
- Newtonsoft.Json (>= 11.0.2)
- RabbitMQ.Client (>= 5.0.1)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|