EnterpriseIntegration.RabbitMQ 1.0.1

There is a newer version of this package available.
See the version list below for details.
dotnet add package EnterpriseIntegration.RabbitMQ --version 1.0.1                
NuGet\Install-Package EnterpriseIntegration.RabbitMQ -Version 1.0.1                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="EnterpriseIntegration.RabbitMQ" Version="1.0.1" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add EnterpriseIntegration.RabbitMQ --version 1.0.1                
#r "nuget: EnterpriseIntegration.RabbitMQ, 1.0.1"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install EnterpriseIntegration.RabbitMQ as a Cake Addin
#addin nuget:?package=EnterpriseIntegration.RabbitMQ&version=1.0.1

// Install EnterpriseIntegration.RabbitMQ as a Cake Tool
#tool nuget:?package=EnterpriseIntegration.RabbitMQ&version=1.0.1                

.NET 6 Implementation of Enterprise Integration Pattern

Following the definition of EIP (Enterprise Integration Pattern) inspired by the Spring Integration.

This Project does not claim complete implementation but best effort - Feel free to help expand the project

Quick start

The Framework is working with Attributes to build the Integration Flow:

Example of a flow. A flow could be split over several classes, the different steps are connected by the name of the channels:

using EnterpriseIntegration.ChannelAttributes;

public class ExampleFlow001
{
    [ServiceActivator(inChannelId: "hello", outChannelId: "world")]
    public string Hello(string prefix)
    {
        return $"{prefix} hello";
    }

    [ServiceActivator(inChannelId: "world", outChannelId: "random")]
    public string World(string data)
    {
        return $"{data} world";
    }

    [Router(inChannelId: "random")]
    public string Randomizer(string data)
    {
        return Random.Shared.NextInt64() % 2 == 0 ? "hello" : "end";
    }

    [Endpoint(inChannelId: "end")]
    public void End(string data)
    {
        ...
    }
}

Starting the flow by sending a message:

using EnterpriseIntegration.Flow;

public class ProductController : Controller
{
    private readonly IMessageGateway _messageGateway;

    // Inject Gateway with Dependency Injection
    public ProductController(IMessageGateway messageGateway)
    {
        _messageGateway = messageGateway;
    }

    public async Task<ActionResult> OrderProduct(Product product)
    {
        // send message to flow for processing
        await _messageGateway.Send("order-product", product);
        return View();
    }
}

Register Enterprise Integration to the ServiceCollection

using EnterpriseIntegration;

public void ConfigureServices(IServiceCollection services)
{
    services
        // Register Flows used (so to leverage Dependency Injection)
        .AddSingleton<ExampleFlow001>()
        // Register Enterprise Integration 
        .UseEnterpriseIntegration();
}

Diagram

EIP Diagram

Feature overview

Feature Status Description
ServiceActivator DONE Allows to define a method which receives and sends a Message.
Router DONE Allows to define the next channel based on Conditions.
Endpoint DONE Allows to define a method which only receives a Message.
Splitter DONE Allows to split a single Message to several Messages (,to be aggregated again).
Aggregator DONE Allows to aggregate several Messages back into one (after being split).
Filter TODO Allows to only continue with a subset of Messages
WireTap DONE (PRE/POSTAction) Allows to consume Messages without being part of the flow
History TODO (POSTAction) Allows to Track the History of an Message
ErrorHandling DONE Exceptions are forwarded to an ErrorChannel
InMemoryChannel DONE Channel for passing messages in the same application
RabbitMQChannel DONE Channel for passing messages via RabbitMQ queues
KafkaChannel TODO Channel for passing messages via Kafka topics

Components

ServiceActivator

The ServiceActivator allows to activate/execute service from within the flow. This is the basic use case to execute code or call other services as part of a flow.

[ServiceActivator(inChannelId: "register-user", outChannelId: "handle-register-user-result")]
public UserRegistrationResult Register(User user)
{
    return UserService.Register(user);
}

The FlowEngine tries to map the provided payload to the parameters of the attributed method. In addition to just expecting the payload, it's also possible to expect the message headers and/or the message itself. Modifications to the message headers will be forwarded to further flow nodes.

[ServiceActivator(inChannelId: "register-user", outChannelId: "handle-register-user-result")]
public UserRegistrationResult Register(IMessage<User> userMessage)
{
    return UserService.Register(userMessage.Payload);
}
[ServiceActivator(inChannelId: "register-user", outChannelId: "handle-register-user-result")]
public user Authenticate(User user, IMessageHeaders headers)
{
    headers.Add("token", UserService.Authenticate(user));
    return user;
}

Router

A Router allows to route messages to different channels by evaluating any condition. The as router defined method, can execute any code and must at the end return a ChannelId, to where the originial message/payload is to be sent to.

[Router(inChannelId: "route-user-exists")]
public ChannelId RouteUserExists(User? user)
{
    return new ChannelId(user != null ? "set-user" : "load-user");
}
Built-In Router

The Framework comes with a predefined Router, which routes to a Channel provided in the headers.

using EnterpriseIntegration.Channels;

[Router(inChannelId: "load-users", outChannelId: EngineChannels.RouteByHeaderChannel)]
public async Task<User> LoadUser(string userId, IMessageHeaders headers)
{
    headers.RouteToChannel = new ChannelId(user != null ? "set-user" : "load-user");
    return await UserService.LoadUser(userId);
}

Endpoint

Endpoints are similar to ServiceActivator but are intended to be placed at the end of the flow, and therefor do not provide a outChannelId.


[Endpoint(inChannelId: "complete-user-creation")]
public async Task CompleteUserCreation(string userId)
{
    await UserService.SetUserActive(userId);
}

Splitter

Splitter allow to generate multiple follow up messages from a single message, these can later be aggregated with an Aggregator to complete the complete flow with a single message.

A Splitter adds meta information to the headers, to be used by an Aggregator to wait for completion of all sent messages.

[Splitter(inChannelId: "process-complete-order", outChannelId: "process-single-item")]
public IEnumerable<OrderItem> ProcessOrder(Oder order)
{
    return order.Items;
}

Aggregator

Aggregator wait for all/enough messages to be arrived, before processing them all. Waiting for all messages requires a MessageStore, where the messages can be stored, while waiting for others to arrive. In a setup with multiple apps, it is important to have a MessageStore which is shared over all instances.

[Aggregator("aggregate-completed-items", "complete-order")]
public CompleteOrder AggregateOrder(IEnumerable<CompleteOrderItem> completeOrderItem)
{
    return OrderService.CompleteOrder(completeOrderItem);
}

WireTap

WireTaps are used to listen to Messages without interrupting the Flow (for testing/debugging).

// to be injected by dependency injection
IWireTapService wireTapService

IMessage result = null;
// first parameter: name of the channel to be tapped
// second parameter: method to be executed, when a message arrives (in this example it stores the message in a variable)
WireTapId id = _wireTapService.CreateWireTap("name_of_channel", async msg => result = msg);

...

// remove the wiretap, when you are finished, to reduce overhead.
_wireTapService.RemoveWireTap(id);

Channels

EIP: Messaging Channels are responsible to transport messages between the different components of the Enterprise Integration Pattern. The default channel used, is an InMemoryChannel invoking other components in the same application. By replacing such a channel with another implementations (e.g. RabbitMQ), distribution of different applications can be achieved.

InMemoryChannel

see: EIP: Point to Point Channel

Is an InMemoryChannel allowing to connect two endpoints with eachother. The channel is One-to-One connection, directly moving the return value of one endpoint to the next endpoint.

This is the default channel type and also the fallback, if a channel is requested (for sending or receiving) and no channel has been previously registered, a new InMemoryChannel will be created.

RabbitMQChannel (AMQP)

Provides a simple channel implementation using RabbitMQ.

Registration of RabbitMQ

using EnterpriseIntegation.RabbitMQ;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.IO;

namespace EnterpriseIntegration.RabbitMQ.Tests;
public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        IConfigurationBuilder configBuilder = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json");
        IConfiguration config = configBuilder.Build();

        services
            .AddSingleton<ServiceActivatorFlow001>()
            // Enable RabbitMQ Messaging based on json config
            .WithRabbitMQMessaging(config)
            // Provide the channel "001_world" via RabbitMQ
            .WithRabbitMQChannel("001_world")
            .UseEnterpriseIntegration();
    }
}

The Registration of a RabbitMQChannel can be configured with additional parameters of the registration method.

Errors

Immediate Error Handling

If an Error/Exception happens immediately after handing a Message is pushed through the MessageGateway the Exception will be thrown to the calling Thread/Method. As soon as the first receiver handled the message, or the message has been pushed to an external channel (Kafka, RabbitMQ...) the flow handles the exception by forwarding it to the error channel.

Flow Error Handling / Error Channel

If an Error/Exception happens during the flow, the FlowEngine catches the exception and forwards it to an error channel. The error channel can be defined via the message headers - if no error channel is defined, the default error channel is used; with the behaviour to log the exception.

IMessageHeaders headers = new MessageHeaders();
headers.WithErrorChannel("custom-error-channel");
IMessage message = new GenericMessage<ExamplePayload>(headers, ExamplePayload.CreateRandom());

await _messageGateway.SendMessage("flow-entry", message);

Common Errors

Common Errors and how to fix them:

Exception How to solve
TooManyPayloadParameters The method used as a flow node receiver has too many "payload" parameters. A method should have only one value parameter, which could be either any type or a parameter of type IMessage<>. In addition it is possible to have an IMessageHeaders injected.
PayloadTransformation The payload of a message did not match the parameter defined in the receiving method. Change the return type of the sending message, or change the parameter of the receiving message

Architecture

Tests

Unit Tests

Integration Tests

Integration Tests are using xUnit Dependencies to setup real applications with proper ServiceCollections to test the flows. Tests for Channels (e.g. RabbitMQ) are using Docker Images (Setup with FluentDocker) for testing.

Load Tests

Load Tests are setup with NBomber to run scenarios and measure their execution time. To make the starting and usage of the console app, the Framework Cocona has been used, to give a nice CLI feeling.

Publish Load Tests

To run the Load tests the must be published

# generating a OS agnostic output
mkdir loadtests
cd loadtests
dotnet publish ..\tests\EnterpriseIntegration.LoadTests --output .

Run Load Tests

To execute the Load tests the published artifact can be started with a scenario parameter.

# running load test for simple scenario
EnterpriseIntegration.LoadTests.exe --scenario Simple

the report is by default generated into the folder reports

Product Compatible and additional computed target framework versions.
.NET net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
1.3.0 252 4/1/2023
1.2.0 445 6/25/2022
1.1.0 427 6/25/2022
1.0.2 425 6/12/2022
1.0.1 412 6/11/2022
1.0.0 431 6/11/2022