MemoryMQ 1.2.0

There is a newer version of this package available.
See the version list below for details.

MemoryMQ


Introduction

An in-memory message queue library built on System.Threading.Channels. It is primarily designed for simple standalone projects that do not want to introduce dependencies such as RabbitMQ but still need a message queue with the following features:

  1. Retry on failure (fixed interval, incremental interval, exponential interval)
  2. Message persistence
  3. Customizable concurrency, persistence, retry count, and other settings for each consumer
  4. Message compression
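
The three retry modes in item 1 differ in how the wait between attempts grows. The README does not give MemoryMQ's exact formulas, so the following is only a minimal sketch under assumed ones: fixed keeps the base interval, incremental multiplies it by the attempt number, and exponential doubles it on each attempt. `SketchRetryMode` and `RetryDelaySketch` are hypothetical names and are not part of the library.

```csharp
using System;

// Hypothetical stand-in for the library's RetryMode enum; not MemoryMQ's own type.
public enum SketchRetryMode { Fixed, Incremental, Exponential }

public static class RetryDelaySketch
{
    // attempt is 1-based: 1 for the first retry, 2 for the second, and so on.
    // The formulas are assumptions for illustration, not MemoryMQ's confirmed behavior.
    public static TimeSpan DelayFor(SketchRetryMode mode, TimeSpan baseInterval, int attempt)
    {
        if (attempt < 1 || attempt > 30)
        {
            throw new ArgumentOutOfRangeException(nameof(attempt));
        }

        return mode switch
        {
            // Same wait before every retry.
            SketchRetryMode.Fixed => baseInterval,
            // Wait grows linearly: base, 2 * base, 3 * base, ...
            SketchRetryMode.Incremental => TimeSpan.FromTicks(baseInterval.Ticks * attempt),
            // Wait doubles each time: base, 2 * base, 4 * base, ...
            SketchRetryMode.Exponential => TimeSpan.FromTicks(baseInterval.Ticks * (1L << (attempt - 1))),
            _ => throw new ArgumentOutOfRangeException(nameof(mode))
        };
    }
}
```

With a 5-second base interval, the third retry under these assumed formulas would wait 5 seconds (fixed), 15 seconds (incremental), or 20 seconds (exponential).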

Usage

MemoryMQ supports projects targeting .NET 6 and above. To use it:

  1. Add the NuGet package
dotnet add package MemoryMQ
  2. Register services and consumers
builder.Services.AddMemoryMQ(it =>
{
    // enable persistence (SQLite is the only supported store)
    it.EnablePersistence = true;

    // use incremental retry intervals
    it.RetryMode = RetryMode.Incremental;

    // base interval between retries
    it.RetryInterval = TimeSpan.FromSeconds(5);
});

// add consumers, use Scoped lifetime
builder.Services.AddScoped<ConsumerA>();
builder.Services.AddScoped<ConsumerB>();
  3. Configure consumers

// implement IMessageConsumer interface
public class ConsumerA : IMessageConsumer
{
    private readonly ILogger<ConsumerA> _logger;

    public ConsumerA(ILogger<ConsumerA> logger)
    {
        _logger = logger;
    }

    public MessageOptions GetMessageConfig()
    {
        return new MessageOptions()
        {
            Topic = "topic-a",
            ParallelNum = 5,
            RetryCount = 3
        };
    }

    public Task ReceivedAsync(IMessage message, CancellationToken cancellationToken)
    {
        _logger.LogInformation("received {MessageBody} {Now}", message.Body, DateTime.Now);
        return Task.CompletedTask;
    }

    public Task FailureRetryAsync(IMessage message, CancellationToken cancellationToken)
    {
        _logger.LogInformation("retry max times {RetryTimes} {MessageBody} {Now}", message.GetRetryCount(), message.Body, DateTime.Now);
        return Task.CompletedTask;
    }
}
  4. Send a message

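// GetRequiredService throws if the publisher is not registered, instead of returning null
var publisher = serviceProvider.GetRequiredService<IMessagePublisher>();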

var message = new Message()
{
    Body = "hello world",
    Topic = "topic-a"
};
await publisher.SendAsync(message);

// or this way
await publisher.SendAsync("topic-a", "hello world");

Message Options

MemoryMQ supports global configuration as well as individual configuration for each consumer. Global configuration is set in the AddMemoryMQ method, and the configuration for each consumer is returned from that consumer's GetMessageConfig method. The global configuration is as follows:

/// <summary>
/// MemoryMQ options
/// </summary>
public class MemoryMQOptions
{
    /// <summary>
    /// consumer assemblies
    /// </summary>
    public Assembly[] ConsumerAssemblies { get; set; } = { Assembly.GetEntryAssembly()! };
    
    /// <summary>
    /// global channel max size
    /// </summary>
    public int GlobalMaxChannelSize { get; set; } = 10000;

    /// <summary>
    /// behavior when channel is full, default is wait
    /// </summary>
    public BoundedChannelFullMode GlobalBoundedChannelFullMode { get; set; } = BoundedChannelFullMode.Wait;
    
    /// <summary>
    /// enable message compression, default is true
    /// </summary>
    public bool EnableCompression { get; set; } = true;

    /// <summary>
    /// interval to poll message from queue
    /// </summary>
    public TimeSpan PollingInterval { get; set; } = TimeSpan.FromMilliseconds(500);

    /// <summary>
    /// enable persistent (only support sqlite), default is true
    /// </summary>
    public bool EnablePersistence { get; set; } = true;

    /// <summary>
    /// database connection string (now only support sqlite), default is 'data source=memorymq.db'
    /// </summary>
    public string DbConnectionString { get; set; } = "data source=memorymq.db";


    /// <summary>
    /// global retry count, null or 0 means no retry
    /// </summary>
    public uint? GlobalRetryCount { get; set; } = 3; 
    
    /// <summary>
    /// retry interval, default is 10 seconds
    /// </summary>
    public TimeSpan RetryInterval { get; set; } = TimeSpan.FromSeconds(10);

    /// <summary>
    /// retry mode, default is fixed
    /// <see cref="RetryMode"/>
    /// </summary>
    public RetryMode RetryMode { get; set; } = RetryMode.Fixed;
}
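
GlobalMaxChannelSize and GlobalBoundedChannelFullMode use the vocabulary of System.Threading.Channels. Assuming they are passed through to a bounded channel (the README does not confirm how MemoryMQ wires them up), the sketch below shows what the full mode changes when a writer exceeds the capacity. `BoundedChannelSketch` and `FillBeyondCapacity` are hypothetical names used only for illustration.

```csharp
using System.Threading.Channels;

public static class BoundedChannelSketch
{
    // Creates a bounded channel and attempts capacity + 1 non-blocking writes.
    // Returns how many TryWrite calls reported success.
    public static int FillBeyondCapacity(int capacity, BoundedChannelFullMode fullMode)
    {
        var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(capacity)
        {
            FullMode = fullMode
        });

        var accepted = 0;
        for (var i = 0; i <= capacity; i++)
        {
            // In Wait mode TryWrite returns false once the channel is full;
            // in the Drop* modes it returns true and an item is discarded instead.
            if (channel.Writer.TryWrite($"message-{i}"))
            {
                accepted++;
            }
        }

        return accepted;
    }
}
```

With a capacity of 2, `BoundedChannelFullMode.Wait` accepts two of the three writes, while `BoundedChannelFullMode.DropOldest` accepts all three and discards the oldest item. A producer that awaits `WriteAsync` in Wait mode would be held back until a consumer frees a slot, which is the back-pressure behavior the default setting suggests.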

The configuration for each consumer is as follows:

/// <summary>
/// consume message options
/// </summary>
public class MessageOptions
{
    /// <summary>
    /// message topic, required and unique
    /// </summary>
    public string Topic { get; init; } = null!;

    /// <summary>
    /// parallel num, default is 1
    /// </summary>
    public uint ParallelNum { get; init; } = 1;
    
    /// <summary>
    /// retry count, if null will use global setting, if 0 means no retry
    /// </summary>
    public uint? RetryCount { get; init; } 
    
    /// <summary>
    /// Enable Persistence (only support sqlite), default is null (will use global setting)
    /// </summary>
    public bool? EnablePersistence { get; set; }
    
    /// <summary>
    /// behavior when channel is full, default is null (will use global setting)
    /// </summary>
    public BoundedChannelFullMode? BoundedChannelFullMode { get; set; }
    
    /// <summary>
    /// max channel size (default is null, will use global setting)
    /// </summary>
    public int? MaxChannelSize { get; set; }
    
    /// <summary>
    /// enable compression, default is null (will use global setting)
    /// </summary>
    public bool? EnableCompression { get; set; }
}

Consumers need to configure at least the Topic field. All other fields are optional and fall back to the global configuration when not set.
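
The fallback from per-consumer settings to global settings can be pictured as a chain of null checks. This is a minimal sketch of that idea, not MemoryMQ's actual resolution code: the record types and the `Resolve` method are hypothetical, and only a few of the options listed above are modeled.

```csharp
// Hypothetical, trimmed-down mirrors of the global and per-consumer options.
public sealed record GlobalSketch(uint? RetryCount, bool EnablePersistence, int MaxChannelSize);

public sealed record ConsumerSketch(
    string Topic,
    uint? RetryCount = null,
    bool? EnablePersistence = null,
    int? MaxChannelSize = null);

// The values a consumer would effectively run with after fallback.
public sealed record EffectiveSketch(string Topic, uint RetryCount, bool EnablePersistence, int MaxChannelSize);

public static class OptionsFallbackSketch
{
    public static EffectiveSketch Resolve(GlobalSketch globalOptions, ConsumerSketch consumer) =>
        new(consumer.Topic,
            // A consumer value wins; otherwise use the global value; a null global means no retry.
            consumer.RetryCount ?? globalOptions.RetryCount ?? 0u,
            consumer.EnablePersistence ?? globalOptions.EnablePersistence,
            consumer.MaxChannelSize ?? globalOptions.MaxChannelSize);
}
```

For example, a consumer that sets only Topic and RetryCount = 0 would, under this sketch, run with no retries while still inheriting the global persistence and channel size settings. An explicit 0 overrides the global value because only null triggers the fallback.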

Benchmark

Data size: 53.2 KB per message; message count: 100

// * Summary *

BenchmarkDotNet v0.13.6, Windows 10 (10.0.19045.3208/22H2/2022Update)
AMD Ryzen 7 PRO 4750U with Radeon Graphics, 1 CPU, 16 logical and 8 physical cores
.NET SDK 6.0.406
[Host]   : .NET 6.0.15 (6.0.1523.11507), X64 RyuJIT AVX2
.NET 6.0 : .NET 6.0.15 (6.0.1523.11507), X64 RyuJIT AVX2

Job=.NET 6.0  Runtime=.NET 6.0

|                               Method |       Mean |     Error |     StdDev |     Median |
|------------------------------------- |-----------:|----------:|-----------:|-----------:|
|                   PublishWithPersist | 104.733 ms | 3.2088 ms |  8.8917 ms | 106.186 ms |
|                PublishWithoutPersist |   3.145 ms | 0.3250 ms |  0.9584 ms |   3.937 ms |
| PublishWithoutPersistAndWithCompress |  36.384 ms | 3.9532 ms | 11.6560 ms |  30.872 ms |
|    PublishWithPersistAndWithCompress |  57.261 ms | 3.6179 ms | 10.4962 ms |  56.048 ms |

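| Version | Downloads | Last updated |
|-------- |----------:|------------- |
|   1.4.1 |       234 | 11/24/2023   |
|   1.4.0 |       192 | 8/9/2023     |
|   1.3.0 |       166 | 8/9/2023     |
|   1.2.1 |       115 | 11/24/2023   |
|   1.2.0 |       150 | 7/29/2023    |
|   1.1.4 |       172 | 7/24/2023    |
|   1.1.1 |       176 | 7/21/2023    |
|   1.1.0 |       154 | 7/14/2023    |
|   1.0.1 |       138 | 7/13/2023    |
|   1.0.0 |       140 | 7/10/2023    |
|   0.0.2 |       171 | 7/9/2023     |
|   0.0.1 |       142 | 7/8/2023     |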