Cleipnir.Flows 4.1.4


Simply making crash-resilient code simple

Cleipnir.NET

Cleipnir Flows is a powerful durable execution .NET framework - ensuring your code will always execute to completion correctly.

  • Makes C#-code behave correctly after a crash, restart or suspension
  • Wait for external events directly inside your code
  • Suspend code execution for minutes, hours, weeks or longer
  • Requires only a database
  • Use with ASP.NET / generic host service
  • Integrates easily with all message-brokers and service-buses
  • Removes need for saga-pattern and outbox-pattern
  • Powerful alternative to job-schedulers (HangFire, Quartz)

Abstractions

Cleipnir.NET provides the following 3 abstractions:

Capture

Remembers the result of arbitrary code:

var transactionId = await Capture("TransactionId", () => Guid.NewGuid());
//or simply
var transactionId = await Capture(Guid.NewGuid);

Messages

Wait for receipt of an external message - without taking up resources:

var fundsReserved = await Message<FundsReserved>(timesOutIn: TimeSpan.FromMinutes(5));

Suspension

Suspends the current execution at-will, resuming after some duration:

await Delay(TimeSpan.FromMinutes(5));

Examples

Message-brokered (source code):

[GenerateFlows]
public class OrderFlow(IBus bus) : Flow<Order>
{
    public override async Task Run(Order order)
    {
        var transactionId = await Capture(Guid.NewGuid); //generated transaction id is fixed after this statement

        await PublishReserveFunds(order, transactionId);
        await Message<FundsReserved>(); //execution is suspended until a funds reserved message is received
        
        await PublishShipProducts(order);
        var trackAndTraceNumber = (await Message<ProductsShipped>()).TrackAndTraceNumber; 
        
        await PublishCaptureFunds(order, transactionId);
        await Message<FundsCaptured>();
        
        await PublishSendOrderConfirmationEmail(order, trackAndTraceNumber);
        await Message<OrderConfirmationEmailSent>();
    }

    private Task PublishReserveFunds(Order order, Guid transactionId) 
        => Capture(async () => await bus.Publish(new ReserveFunds(order.OrderId, order.TotalPrice, transactionId, order.CustomerId)));
}

RPC (source code):

[GenerateFlows]
public class OrderFlow(
    IPaymentProviderClient paymentProviderClient,
    IEmailClient emailClient,
    ILogisticsClient logisticsClient
) : Flow<Order>
{
    public override async Task Run(Order order)
    {
        var transactionId = await Capture(Guid.NewGuid); //generated transaction id is fixed after this statement

        await paymentProviderClient.Reserve(order.CustomerId, transactionId, order.TotalPrice);
        var trackAndTrace = await Capture( 
            () => paymentProviderClient.Capture(transactionId),
            ResiliencyLevel.AtMostOnce
        ); //external calls can also be captured - will never be called multiple times

        await emailClient.SendOrderConfirmation(order.CustomerId, trackAndTrace, order.ProductIds);
    }
}

What is durable execution?

Durable execution is an emerging paradigm for simplifying the implementation of code which can safely resume execution after a process crash or restart (e.g. after a production deployment). It allows the developer to implement such code using ordinary C#-code with loops, conditionals and so on.

Furthermore, durable execution allows suspending code execution for an arbitrary amount of time - thereby saving process resources.

Essentially, durable execution works by saving state at explicitly defined points during the invocation of code, thereby allowing the framework to skip over previously executed parts of the code when/if the code is re-executed. This occurs both after a crash and suspension.
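
The mechanism described above can be illustrated with a minimal sketch. It is not the framework's implementation: the `Capture` helper, the dictionary standing in for the database and the `RunFlow` method are all hypothetical names chosen for this example, and a thrown exception stands in for a process crash.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the database that stores captured results.
var store = new Dictionary<string, string>();
var sideEffects = new List<string>();

// First invocation: "crashes" after the first step has been captured.
try { RunFlow(store, sideEffects, crashAfterReserve: true); }
catch (InvalidOperationException) { Console.WriteLine("Process crashed"); }

// Second invocation: the captured step is skipped, only the remaining step runs.
RunFlow(store, sideEffects, crashAfterReserve: false);

Console.WriteLine(string.Join(", ", sideEffects)); // prints "reserve, ship"

static void RunFlow(Dictionary<string, string> store, List<string> sideEffects, bool crashAfterReserve)
{
    Capture(store, "Reserve", () => { sideEffects.Add("reserve"); return "reserved"; });
    if (crashAfterReserve) throw new InvalidOperationException("simulated crash");
    Capture(store, "Ship", () => { sideEffects.Add("ship"); return "shipped"; });
}

// Runs 'work' only if no result is stored under 'id'; otherwise returns the stored result.
static string Capture(Dictionary<string, string> store, string id, Func<string> work)
{
    if (store.TryGetValue(id, out var saved)) return saved;
    var result = work();
    store[id] = result;
    return result;
}
```

Although the flow body runs twice, each side effect happens once, because the second run finds the stored result for "Reserve" and skips straight to the step that had not completed.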

Why durable execution?

Currently, implementing resilient business flows either entails (1) sagas (e.g. MassTransit, NServiceBus) or (2) job-schedulers (e.g. HangFire).

Both approaches have a unique set of challenges:

  • Saga - becomes difficult to implement for real-world scenarios, as it is realized either by declaratively constructing a state-machine or by implementing a distinct message handler per message type.
  • Job-scheduler - requires one to implement idempotent code by hand (in case of failure). Moreover, it cannot be integrated with message-brokers and does not support programmatically suspending a job in the middle of its execution.

Getting Started

To get started simply perform the following three steps in an ASP.NET or generic-hosted service (sample repo):

Firstly, install the Cleipnir.Flows NuGet package for your persistence layer (Postgres, SqlServer or MariaDB). For example:

Install-Package Cleipnir.Flows.PostgresSql

Secondly, add the following to the setup in Program.cs (source code):

builder.Services.AddFlows(c => c
  .UsePostgresSqlStore(connectionString)  
  .RegisterFlowsAutomatically()
);

RPC Flows

Finally, implement your flow (source code):

[GenerateFlows]
public class OrderFlow(
    IPaymentProviderClient paymentProviderClient,
    IEmailClient emailClient,
    ILogisticsClient logisticsClient
) : Flow<Order>
{
    public override async Task Run(Order order)
    {
        var transactionId = await Capture(Guid.NewGuid);

        await paymentProviderClient.Reserve(order.CustomerId, transactionId, order.TotalPrice);
        var trackAndTrace = await Capture(
            () => paymentProviderClient.Capture(transactionId),
            ResiliencyLevel.AtMostOnce
        );

        await emailClient.SendOrderConfirmation(order.CustomerId, trackAndTrace, order.ProductIds);
    }
}

Message-brokered Flows

Or, if the flow is using a message-broker (source code):

[GenerateFlows]
public class OrderFlow(IBus bus) : Flow<Order>
{
    public override async Task Run(Order order)
    {
        var transactionId = await Capture(Guid.NewGuid);

        await PublishReserveFunds(order, transactionId);
        await Message<FundsReserved>();
        
        await PublishShipProducts(order);
        var trackAndTraceNumber = (await Message<ProductsShipped>()).TrackAndTraceNumber;
        
        await PublishCaptureFunds(order, transactionId);
        await Message<FundsCaptured>();
        
        await PublishSendOrderConfirmationEmail(order, trackAndTraceNumber);
        await Message<OrderConfirmationEmailSent>();
    }

    //Publish helper methods (e.g. PublishReserveFunds shown earlier) omitted for brevity
}

The implemented flow can then be started using the corresponding source generated Flows-type (source code):

[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
    private readonly OrderFlows _orderFlows;

    public OrderController(OrderFlows orderFlows) => _orderFlows = orderFlows;

    [HttpPost]
    public async Task Post(Order order) => await _orderFlows.Run(order.OrderId, order);
}

Service Bus Integrations

It is simple to use Cleipnir with all the popular service bus frameworks. To do so, simply implement an event handler for each flow type, which forwards received events to the flow:

MassTransit Handler

public class SimpleFlowsHandler(SimpleFlows simpleFlows) : IConsumer<MyMessage>
{
    public Task Consume(ConsumeContext<MyMessage> context) 
        => simpleFlows.SendMessage(context.Message.Value, context.Message);
}

NServiceBus Handler

public class SimpleFlowsHandler(SimpleFlows flows) : IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage message, IMessageHandlerContext context)
        => flows.SendMessage(message.Value, message);
}

Rebus Handler

public class SimpleFlowsHandler(SimpleFlows simpleFlows) : IHandleMessages<MyMessage>
{
    public Task Handle(MyMessage msg) => simpleFlows.SendMessage(msg.Value, msg);
}

Wolverine Handler

public class SimpleFlowsHandler(SimpleFlows flows)
{
    public Task Handle(MyMessage myMessage)
        => flows.SendMessage(myMessage.Value, myMessage);
}

Publish multiple messages in batch

await flows.SendMessages(batchedMessages);

Kafka Handler

ConsumeMessages(
  batchSize: 10,
  topic,
  handler: messages =>
    flows.SendMessages(
      messages.Select(msg => new BatchedMessage(msg.Instance, msg)).ToList()
));

More examples

As an example is worth a thousand lines of documentation - various useful examples are presented in the following section:

Avoid re-executing already completed code:

[GenerateFlows]
public class AtLeastOnceFlow : Flow<string, string>
{
  private readonly PuzzleSolverService _puzzleSolverService = new();

  public override async Task<string> Run(string hashCode)
  {
    var solution = await Effect.Capture(
      id: "PuzzleSolution",
      work: () => _puzzleSolverService.SolveCryptographicPuzzle(hashCode)
    );

    return solution;
  }
}

Ensure code is executed at-most-once:

[GenerateFlows]
public class AtMostOnceFlow : Flow<string>
{
    private readonly RocketSender _rocketSender = new();
    
    public override async Task Run(string rocketId)
    {
        await Effect.Capture(
            id: "FireRocket",
            _rocketSender.FireRocket,
            ResiliencyLevel.AtMostOnce
        );
    }
}

Wait for 2 external messages before continuing flow (source code):

[GenerateFlows]
public class WaitForMessagesFlow : Flow<string>
{
  public override async Task Run(string param)
  {
    await Messages
      .OfTypes<FundsReserved, InventoryLocked>()
      .Take(2)
      .Completion(maxWait: TimeSpan.FromSeconds(30));

    System.Console.WriteLine("Complete order-processing");
  }
}

When the max wait duration has passed without both messages arriving, the flow is automatically suspended in order to save resources. Omitting the max wait suspends the flow immediately if not all messages have been received yet:

await Messages
  .OfTypes<FundsReserved, InventoryLocked>()
  .Take(2)
  .Completion();
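
The choice between waiting in memory and suspending can be pictured with the following sketch. It only models the decision and is not how the framework is implemented: `WaitOrSuspend` is a hypothetical helper, the string "suspended" stands in for an actual suspension of the invocation, and a `TaskCompletionSource` stands in for a message arriving.

```csharp
using System;
using System.Threading.Tasks;

// A message that does not arrive within the window leads to suspension.
var neverArrives = new TaskCompletionSource<string>();
Console.WriteLine(await WaitOrSuspend(neverArrives.Task, TimeSpan.FromMilliseconds(50))); // prints "suspended"

// A message that has already arrived is returned immediately.
var arrived = new TaskCompletionSource<string>();
arrived.SetResult("FundsReserved");
Console.WriteLine(await WaitOrSuspend(arrived.Task, TimeSpan.FromMilliseconds(50))); // prints "FundsReserved"

// Waits in memory for at most 'maxWait'; gives up with a "suspended" marker afterwards.
static async Task<string> WaitOrSuspend(Task<string> message, TimeSpan maxWait)
{
    var winner = await Task.WhenAny(message, Task.Delay(maxWait));
    return winner == message ? await message : "suspended";
}
```

Passing `TimeSpan.Zero` as `maxWait` in this sketch corresponds to suspending immediately unless the message is already available.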

Emit a signal to a flow (source code):

var messagesWriter = flows.MessagesWriter(orderId);
await messagesWriter.AppendMessage(new FundsReserved(orderId), idempotencyKey: nameof(FundsReserved));

Restart a failed flow (source code):

var controlPanel = await flows.ControlPanel(flowId);
controlPanel!.Param = "valid parameter";
await controlPanel.ReInvoke();

Postpone a running flow (without taking in-memory resources) (source code):

[GenerateFlows]
public class PostponeFlow : Flow<string>
{
  private readonly ExternalService _externalService = new();

  public override async Task Run(string orderId)
  {
    if (await _externalService.IsOverloaded())
      Postpone(delay: TimeSpan.FromMinutes(10));
        
    //execute rest of the flow
  }
}

Add metrics middleware (source code):

public class MetricsMiddleware : IMiddleware
{
  private Action IncrementCompletedFlowsCounter { get; }
  private Action IncrementFailedFlowsCounter { get; }
  private Action IncrementRestartedFlowsCounter { get; }

  public MetricsMiddleware(Action incrementCompletedFlowsCounter, Action incrementFailedFlowsCounter, Action incrementRestartedFlowsCounter)
  {
    IncrementCompletedFlowsCounter = incrementCompletedFlowsCounter;
    IncrementFailedFlowsCounter = incrementFailedFlowsCounter;
    IncrementRestartedFlowsCounter = incrementRestartedFlowsCounter;
  }

  public async Task<Result<TResult>> Run<TFlow, TParam, TResult>(
    TParam param, 
    Workflow workflow, 
    Next<TFlow, TParam, TResult> next) where TParam : notnull
  {
    var started = workflow.Effect.TryGet<bool>(id: "Started", out _);
    if (started)
      IncrementRestartedFlowsCounter();
    else
      await workflow.Effect.Upsert("Started", true);
        
    var result = await next(param, workflow);
    if (result.Outcome == Outcome.Fail)
      IncrementFailedFlowsCounter();
    else if (result.Outcome == Outcome.Succeed)
      IncrementCompletedFlowsCounter();
        
    return result;
  }
}

Distributed system challenges

When distributed systems need to cooperate in order to fulfill some business process, a system crash or restart may leave the system in an inconsistent state.

Consider the following order-flow:

public async Task ProcessOrder(Order order)
{
  await _paymentProviderClient.Reserve(order.TransactionId, order.CustomerId, order.TotalPrice);
  await _logisticsClient.ShipProducts(order.CustomerId, order.ProductIds);
  await _paymentProviderClient.Capture(order.TransactionId);
  await _emailClient.SendOrderConfirmation(order.CustomerId, order.ProductIds);
}    

Currently, the flow is not resilient against crashes or restarts.

For instance, if the process crashes just before capturing the funds from the payment provider then the ordered products are shipped to the customer but nothing is deducted from the customer’s credit card. Not an ideal situation for the business. No matter how we rearrange the flow, a crash might lead to one of the following situations:

  • products are shipped to the customer without payment being deducted from the customer’s credit card
  • payment is deducted from the customer’s credit card but products are never shipped

Ensuring flow-restart on crashes or restarts:

Thus, to rectify the situation we must ensure that the flow is restarted if it did not complete in a previous execution.

RPC-solution

Consider the following Order-flow:

[GenerateFlows]
public class OrderFlow : Flow<Order>
{
  private readonly IPaymentProviderClient _paymentProviderClient;
  private readonly IEmailClient _emailClient;
  private readonly ILogisticsClient _logisticsClient;

  public OrderFlow(IPaymentProviderClient paymentProviderClient, IEmailClient emailClient, ILogisticsClient logisticsClient)
  {
    _paymentProviderClient = paymentProviderClient;
    _emailClient = emailClient;
    _logisticsClient = logisticsClient;
  }

  public override async Task Run(Order order)
  {
    Log.Logger.ForContext<OrderFlow>().Information($"ORDER_PROCESSOR: Processing of order '{order.OrderId}' started");

    var transactionId = Guid.Empty;
    await _paymentProviderClient.Reserve(order.CustomerId, transactionId, order.TotalPrice);
    await _logisticsClient.ShipProducts(order.CustomerId, order.ProductIds);
    await _paymentProviderClient.Capture(transactionId);
    await _emailClient.SendOrderConfirmation(order.CustomerId, order.ProductIds);

    Log.Logger.ForContext<OrderFlow>().Information($"ORDER_PROCESSOR: Processing of order '{order.OrderId}' completed");
  }       
}

Sometimes simply wrapping a business flow inside the framework is enough.

This would be the case if all the steps in the flow were idempotent. In that situation it is fine to call an endpoint multiple times without causing unintended side-effects.

At-least-once & Idempotency

However, in the order-flow presented here this is not the case.

The payment provider requires the caller to provide a transaction-id. Thus, the same transaction-id must be provided when re-executing the flow.

In Cleipnir this challenge is solved by wrapping non-determinism inside effects.

public async Task ProcessOrder(Order order)
{
  Log.Logger.Information($"ORDER_PROCESSOR: Processing of order '{order.OrderId}' started");

  var transactionId = await Effect.Capture("TransactionId", Guid.NewGuid);
  
  await _paymentProviderClient.Reserve(order.CustomerId, transactionId, order.TotalPrice);
  await _logisticsClient.ShipProducts(order.CustomerId, order.ProductIds);
  await _paymentProviderClient.Capture(transactionId);
  await _emailClient.SendOrderConfirmation(order.CustomerId, order.ProductIds);

  Log.Logger.ForContext<OrderProcessor>().Information($"Processing of order '{order.OrderId}' completed");
}
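
Why a fixed transaction-id matters can be shown with a small sketch. It makes two assumptions that are not from the framework: `CaptureGuid` is a hypothetical helper backed by a dictionary, and the payment provider is modeled as a set that ignores repeated reservations with the same transaction-id.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for persisted effect results, keyed by effect id.
var effects = new Dictionary<string, Guid>();
// A payment provider that is idempotent per transaction id is modeled as a set.
var reservations = new HashSet<Guid>();

// The flow body is executed twice, as it would be after a crash and restart.
for (var attempt = 1; attempt <= 2; attempt++)
{
    var transactionId = CaptureGuid(effects, "TransactionId", Guid.NewGuid);
    reservations.Add(transactionId);
}

Console.WriteLine(reservations.Count); // prints "1"

// Generates the value on the first call and returns the stored value on later calls.
static Guid CaptureGuid(Dictionary<string, Guid> effects, string id, Func<Guid> work)
{
    if (effects.TryGetValue(id, out var stored)) return stored;
    var value = work();
    effects[id] = value;
    return value;
}
```

Without the capture step each attempt would generate a fresh id, and the provider in this model would record two reservations for the same order.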

At-most-once API:

For the sake of presenting the framework’s versatility let us assume that the logistics’ API is not idempotent and that it is out of our control to change that.

Thus, every time a successful call is made to the logistics service the content of the order is shipped to the customer.

As a result the order-flow must fail if it is restarted and:

  • a request was previously sent to logistics-service
  • but no response was received.

This can again be accomplished by using effects:

public async Task ProcessOrder(Order order)
{
  Log.Logger.Information($"ORDER_PROCESSOR: Processing of order '{order.OrderId}' started");

  var transactionId = await Effect.Capture("TransactionId", Guid.NewGuid);
  await _paymentProviderClient.Reserve(order.CustomerId, transactionId, order.TotalPrice);

  await Effect.Capture(
    id: "ShipProducts",
    work: () => _logisticsClient.ShipProducts(order.CustomerId, order.ProductIds),
    ResiliencyLevel.AtMostOnce
  );
  
  await _paymentProviderClient.Capture(transactionId);           
  await _emailClient.SendOrderConfirmation(order.CustomerId, order.ProductIds);

  Log.Logger.ForContext<OrderProcessor>().Information($"Processing of order '{order.OrderId}' completed");
}  
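
The at-most-once behaviour described above can be modeled as follows. This is a simplified sketch under stated assumptions, not the framework's code: `AtMostOnce` is a hypothetical helper, the dictionary stands in for persisted effect state with the made-up statuses "Started" and "Completed", and an exception stands in for a crash before the response is recorded.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical persisted status per effect id: "Started" or "Completed".
var statuses = new Dictionary<string, string>();
var shipments = 0;

// First execution: the request is sent, but the outcome is never recorded.
try
{
    AtMostOnce(statuses, "ShipProducts", () =>
    {
        shipments++;
        throw new TimeoutException("simulated crash before the response was recorded");
    });
}
catch (TimeoutException) { }

// Re-execution: the effect was started but never completed, so the flow fails instead of shipping again.
try
{
    AtMostOnce(statuses, "ShipProducts", () => shipments++);
}
catch (InvalidOperationException e)
{
    Console.WriteLine(e.Message);
}

Console.WriteLine(shipments); // prints "1"

static void AtMostOnce(Dictionary<string, string> statuses, string id, Action work)
{
    if (statuses.TryGetValue(id, out var status))
    {
        if (status == "Completed") return; // already done, skip the work
        throw new InvalidOperationException($"Effect '{id}' was started but its outcome is unknown");
    }
    statuses[id] = "Started";   // recorded before the call is made
    work();
    statuses[id] = "Completed"; // recorded after a successful call
}
```

In this model, removing the "ShipProducts" entry from `statuses` would allow the next execution to perform the call again, which is a deliberate, manual decision.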

A failed flow, i.e. one that threw an exception, is not automatically retried by the framework.

Instead, it must be manually restarted by using the flow's associated control-panel.

Control Panel:

Using the flow’s control panel, both the parameter and the stored effects may be changed before the flow is retried.

For instance, assuming it is determined that the products were not shipped for a certain order, then the following code re-invokes the order with the state changed accordingly.

var controlPanel = await flows.ControlPanel(order.OrderId);
await controlPanel!.Effects.Remove("ShipProducts");
await controlPanel.ReInvoke();

Message-based Solution

Message- or event-driven systems are omnipresent in enterprise architectures today.

They fundamentally differ from RPC-based systems in that:

  • messages related to the same order are not delivered to the same process.

This has huge implications for how a flow must be implemented. As a result, a simple sequential flow, such as the order-flow, becomes:

  • fragmented and hard to reason about
  • inefficient - each time a message is received the entire state must be reestablished
  • inflexible

Cleipnir Flows takes a novel approach by piggy-backing on the features described so far and using event-sourcing and reactive programming together to form a simple and extremely useful abstraction.

As a result the order-flow can be implemented as follows:

public async Task ProcessOrder(Order order)
{
  Log.Logger.Information($"ORDER_PROCESSOR: Processing of order '{order.OrderId}' started");  

  await _bus.Send(new ReserveFunds(order.OrderId, order.TotalPrice, Scrapbook.TransactionId, order.CustomerId));
  await Messages.NextOfType<FundsReserved>();
            
  await _bus.Send(new ShipProducts(order.OrderId, order.CustomerId, order.ProductIds));
  await Messages.NextOfType<ProductsShipped>();
            
  await _bus.Send(new CaptureFunds(order.OrderId, order.CustomerId, Scrapbook.TransactionId));
  await Messages.NextOfType<FundsCaptured>();

  await _bus.Send(new SendOrderConfirmationEmail(order.OrderId, order.CustomerId));
  await Messages.NextOfType<OrderConfirmationEmailSent>();

  Log.Logger.ForContext<OrderProcessor>().Information($"Processing of order '{order.OrderId}' completed");      
}

There is a bit more going on in the example above compared to the previous RPC-example. However, the flow is actually very similar to the RPC-based one. It is sequential and robust. If the flow crashes and is restarted it will continue from the point it got to before the crash.

It is noted that the message broker in the example is just a stand-in - thus not a framework concept - for RabbitMQ, Kafka or some other messaging infrastructure client.

In a real application the message broker would be replaced with the actual way the application broadcasts a message/event to other services.

Furthermore, each flow has an associated private event source called Messages. When events are received from the network they can be placed into the relevant flow's event source - thereby allowing the flow to continue.
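
A per-flow event source can be pictured with the sketch below. It is a simplified model rather than the framework's implementation: the list standing in for the persisted message log, the `Append` and `RunFlow` helpers and the tuple message shape are all hypothetical. The flow is re-run from the top after each append and proceeds as far as the log allows.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical per-flow message log; in a real setup this would be persisted.
var log = new List<(string Type, string OrderId)>();
var seenKeys = new HashSet<string>();

Console.WriteLine(RunFlow(log)); // prints "Suspended: waiting for FundsReserved"

Append(log, seenKeys, ("FundsReserved", "order-1"), idempotencyKey: "FundsReserved");
Append(log, seenKeys, ("FundsReserved", "order-1"), idempotencyKey: "FundsReserved"); // duplicate delivery, ignored

Console.WriteLine(log.Count);    // prints "1"
Console.WriteLine(RunFlow(log)); // prints "Suspended: waiting for ProductsShipped"

Append(log, seenKeys, ("ProductsShipped", "order-1"), idempotencyKey: "ProductsShipped");
Console.WriteLine(RunFlow(log)); // prints "Completed"

// Walks through the expected messages in order and stops at the first one not yet in the log.
static string RunFlow(List<(string Type, string OrderId)> log)
{
    foreach (var expected in new[] { "FundsReserved", "ProductsShipped" })
        if (!log.Any(m => m.Type == expected))
            return $"Suspended: waiting for {expected}";
    return "Completed";
}

// Appends the message unless one with the same idempotency key was appended before.
static void Append(
    List<(string Type, string OrderId)> log,
    HashSet<string> seenKeys,
    (string Type, string OrderId) message,
    string idempotencyKey)
{
    if (seenKeys.Add(idempotencyKey)) log.Add(message);
}
```

Because the messages are stored with the flow rather than delivered to a particular process, any process can pick up the flow and continue from the log.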

Did you know?
The framework allows awaiting events either in-memory or by suspending the invocation until an event has been appended to the event source. This allows the developer to find the sweet-spot per use-case between performance and releasing resources.

Compatible target frameworks
.NET: net9.0 is compatible. Computed as compatible: net9.0-android, net9.0-browser, net9.0-ios, net9.0-maccatalyst, net9.0-macos, net9.0-tvos, net9.0-windows.

NuGet packages (3)

Showing the top 3 NuGet packages that depend on Cleipnir.Flows:

  • Cleipnir.Flows.SqlServer
  • Cleipnir.Flows.PostgresSql
  • Cleipnir.Flows.MariaDB

All three packages share the same description: Cleipnir Flows is a powerful .NET framework designed for ASP.NET services, providing a straightforward workflow-as-code approach. It ensures that your code executes reliably, even in the face of failures, restarts, deployments, versioning, and other challenges. While similar to Azure Durable Functions, Cleipnir Flows is specifically tailored for ASP.NET Core.

Version Downloads Last updated
4.1.5 20 3/25/2025
4.1.4 94 3/21/2025
4.1.3 70 3/15/2025
4.1.2 126 3/9/2025
4.1.1 131 3/9/2025
4.0.12 112 2/14/2025
4.0.11 111 2/8/2025
4.0.10 110 2/3/2025
4.0.9 109 1/30/2025
4.0.8 111 1/26/2025
4.0.7 103 1/25/2025
4.0.6 105 1/22/2025
4.0.5 94 1/22/2025
4.0.4 93 1/16/2025
4.0.3 94 1/16/2025
4.0.2 93 1/12/2025
4.0.1 102 1/11/2025
4.0.0 115 1/5/2025
3.0.3 157 8/30/2024
3.0.2 107 7/25/2024
3.0.1 112 7/25/2024
3.0.0 141 7/14/2024
2.0.1 314 3/16/2024
1.0.16 1,348 9/2/2023
1.0.15 1,273 8/30/2023
1.0.14 1,418 8/13/2023
1.0.13 1,403 8/11/2023
1.0.12 1,547 8/2/2023
1.0.11 1,552 8/1/2023
1.0.9 1,494 7/31/2023
1.0.8 1,558 7/24/2023
1.0.7 1,538 7/19/2023
1.0.6 1,479 7/12/2023