Cleipnir.Flows.PostgresSql
4.1.5
Prefix Reserved
dotnet add package Cleipnir.Flows.PostgresSql --version 4.1.5
NuGet\Install-Package Cleipnir.Flows.PostgresSql -Version 4.1.5
<PackageReference Include="Cleipnir.Flows.PostgresSql" Version="4.1.5" />
paket add Cleipnir.Flows.PostgresSql --version 4.1.5
#r "nuget: Cleipnir.Flows.PostgresSql, 4.1.5"
// Install Cleipnir.Flows.PostgresSql as a Cake Addin #addin nuget:?package=Cleipnir.Flows.PostgresSql&version=4.1.5 // Install Cleipnir.Flows.PostgresSql as a Cake Tool #tool nuget:?package=Cleipnir.Flows.PostgresSql&version=4.1.5
<p align="center"> <img src="https://github.com/stidsborg/Cleipnir.Flows/blob/main/Docs/cleipnir.png" alt="logo" /> <br> Simply making <strong>crash-resilient</strong> code <strong>simple</strong> <br> </p>
Cleipnir.NET
Cleipnir Flows is a powerful durable execution .NET framework - ensuring your code will always execute to completetion correctly.
- Makes C#-code behave correctly after a crash, restart or suspension
- Wait for external events directly inside your code
- Suspend code execution for minutes, hours, weeks or longer
- Requires only a database
- Use with ASP.NET / generic host service
- Integrates easily with all message-brokers and service-buses
- Removes need for saga-pattern and outbox-pattern
- Powerful alterrnative to job-schedulers (HangFire, Quartz)
Abstractions
Cleipnir.NET provides the following 3 abstractions:
Capture
Remembers the result of arbitary code:
var transactionId = await Capture("TransactionId", () => Guid.NewGuid());
//or simply
var transactionId = await Capture(Guid.NewGuid);
Messages
Wait for retrival of external message - without taking up resources:
var fundsReserved = await Messages<FundsReserved>(timesOutIn: TimeSpan.FromMinutes(5));
Suspension
Suspends the current execution at-will, resuming after some duration:
await Delay(TimeSpan.FromMinutes(5));
Examples
Message-brokered (source code):
[GenerateFlows]
public class OrderFlow(IBus bus) : Flow<Order>
{
public override async Task Run(Order order)
{
var transactionId = await Capture(Guid.NewGuid); //generated transaction id is fixed after this statement
await PublishReserveFunds(order, transactionId);
await Message<FundsReserved>(); //execution is suspended until a funds reserved message is received
await PublishShipProducts(order);
var trackAndTraceNumber = (await Message<ProductsShipped>()).TrackAndTraceNumber;
await PublishCaptureFunds(order, transactionId);
await Message<FundsCaptured>();
await PublishSendOrderConfirmationEmail(order, trackAndTraceNumber);
await Message<OrderConfirmationEmailSent>();
}
private Task PublishReserveFunds(Order order, Guid transactionId)
=> Capture(async () => await bus.Publish(new ReserveFunds(order.OrderId, order.TotalPrice, transactionId, order.CustomerId)));
}
RPC (source code):
[GenerateFlows]
public class OrderFlow(
IPaymentProviderClient paymentProviderClient,
IEmailClient emailClient,
ILogisticsClient logisticsClient
) : Flow<Order>
{
public override async Task Run(Order order)
{
var transactionId = Capture(() => Guid.NewGuid); //generated transaction id is fixed after this statement
await paymentProviderClient.Reserve(order.CustomerId, transactionId, order.TotalPrice);
var trackAndTrace = await Capture(
() => paymentProviderClient.Capture(transactionId),
ResiliencyLevel.AtMostOnce
); //external calls can also be captured - will never be called multiple times
await emailClient.SendOrderConfirmation(order.CustomerId, trackAndTrace, order.ProductIds);
}
}
What is durable execution?
Durable execution is an emerging paradigm for simplifying the implementation of code which can safely resume execution after a process crash or restart (i.e. after a production deployment). It allows the developer to implement such code using ordinary C#-code with loops, conditionals and so on.
Furthermore, durable execution allows suspending code execution for an arbitraty amount of time - thereby saving process resources.
Essentially, durable execution works by saving state at explicitly defined points during the invocation of code, thereby allowing the framework to skip over previously executed parts of the code when/if the code is re-executed. This occurs both after a crash and suspension.
Why durable execution?
Currently, implementing resilient business flows either entails (1) sagas (i.e. MassTransit, NServiceBus) or (2) job-schedulers (i.e. HangFire).
Both approaches have a unique set of challenges:
- Saga - becomes difficult to implement for real-world scenarios as they are either realized by declaratively constructing a state-machine or implementing a distinct message handler per message type.
- Job-scheduler - requires one to implement idempotent code by hand (in case of failure). Moreover, it cannot be integrated with message-brokers and does not support programmatically suspending a job in the middle of its execution.
Getting Started
To get started simply perform the following three steps in an ASP.NET or generic-hosted service (sample repo):
Firstly, install the Cleipnir.Flows nuget package (using either Postgres, SqlServer or MariaDB as persistence layer). I.e.
Install-Package Cleipnir.Flows.Postgres
Secondly, add the following to the setup in Program.cs
(source code):
builder.Services.AddFlows(c => c
.UsePostgresSqlStore(connectionString)
.RegisterFlowsAutomatically()
);
RPC Flows
Finally, implement your flow (source code):
[GenerateFlows]
public class OrderFlow(
IPaymentProviderClient paymentProviderClient,
IEmailClient emailClient,
ILogisticsClient logisticsClient
) : Flow<Order>
{
public override async Task Run(Order order)
{
var transactionId = Capture(() => Guid.NewGuid);
await paymentProviderClient.Reserve(order.CustomerId, transactionId, order.TotalPrice);
var trackAndTrace = await Capture(
() => paymentProviderClient.Capture(transactionId),
ResiliencyLevel.AtMostOnce
);
await emailClient.SendOrderConfirmation(order.CustomerId, trackAndTrace, order.ProductIds);
}
}
Message-brokered Flows
Or, if the flow is using a message-broker (source code):
[GenerateFlows]
public class OrderFlow(IBus bus) : Flow<Order>
{
public override async Task Run(Order order)
{
var transactionId = await Capture(Guid.NewGuid);
await PublishReserveFunds(order, transactionId);
await Message<FundsReserved>();
await PublishShipProducts(order);
var trackAndTraceNumber = (await Message<ProductsShipped>()).TrackAndTraceNumber;
await PublishCaptureFunds(order, transactionId);
await Message<FundsCaptured>();
await PublishSendOrderConfirmationEmail(order, trackAndTraceNumber);
await Message<OrderConfirmationEmailSent>();
}
The implemented flow can then be started using the corresponding source generated Flows-type (source code):
[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
private readonly OrderFlows _orderFlows;
public OrderController(OrderFlows orderFlows) => _orderFlows = orderFlows;
[HttpPost]
public async Task Post(Order order) => await _orderFlows.Run(order.OrderId, order);
}
Media
Discord
Get live help at the Discord channel:
Service Bus Integrations
It is simple to use Cleipnir with all the popular service bus frameworks. In order to do simply implement an event handler - which forwards received events - for each flow type:
MassTransit Handler
public class SimpleFlowsHandler(SimpleFlows simpleFlows) : IConsumer<MyMessage>
{
public Task Consume(ConsumeContext<MyMessage> context)
=> simpleFlows.SendMessage(context.Message.Value, context.Message);
}
NServiceBus Handler
public class SimpleFlowsHandler(SimpleFlows flows) : IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, IMessageHandlerContext context)
=> flows.SendMessage(message.Value, message);
}
Rebus Handler
public class SimpleFlowsHandler(SimpleFlows simpleFlows) : IHandleMessages<MyMessage>
{
public Task Handle(MyMessage msg) => simpleFlows.SendMessage(msg.Value, msg);
}
Wolverine Handler
public class SimpleFlowsHandler(SimpleFlows flows)
{
public Task Handle(MyMessage myMessage)
=> flows.SendMessage(myMessage.Value, myMessage);
}
Publish multiple messages in batch
await flows.SendMessages(batchedMessages);
Kafka Handler
ConsumeMessages(
batchSize: 10,
topic,
handler: messages =>
flows.SendMessages(
messages.Select(msg => new BatchedMessage(msg.Instance, msg)).ToList()
));
More examples
As an example is worth a thousand lines of documentation - various useful examples are presented in the following section:
Integration-test (source code):
var transactionId = Guid.NewGuid();
var usedTransactionId = default(Guid?);
var serviceProvider = new ServiceCollection()
.AddSingleton(new OrderFlow(
PaymentProviderClientTestStub.Create(reserve: (id, _, _) => { usedTransactionId = id; return Task.CompletedTask; }),
EmailClientStub.Instance,
LogisticsClientStub.Instance)
).BuildServiceProvider();
using var container = FlowsContainer.Create(serviceProvider);
var flows = new OrderFlows(container);
var testOrder = new Order("MK-54321", CustomerId: Guid.NewGuid(), ProductIds: [Guid.NewGuid()], TotalPrice: 120);
await flows.Run(
instanceId: testOrder.OrderId,
testOrder,
new InitialState(Messages: [], Effects: [new InitialEffect("TransactionId", transactionId)])
);
Assert.AreEqual(transactionId, usedTransactionId);
Avoid re-executing already completed code:
[GenerateFlows]
public class AtLeastOnceFlow : Flow<string, string>
{
private readonly PuzzleSolverService _puzzleSolverService = new();
public override async Task<string> Run(string hashCode)
{
var solution = await Effect.Capture(
id: "PuzzleSolution",
work: () => _puzzleSolverService.SolveCryptographicPuzzle(hashCode)
);
return solution;
}
}
Ensure code is executed at-most-once:
[GenerateFlows]
public class AtMostOnceFlow : Flow<string>
{
private readonly RocketSender _rocketSender = new();
public override async Task Run(string rocketId)
{
await Effect.Capture(
id: "FireRocket",
_rocketSender.FireRocket,
ResiliencyLevel.AtMostOnce
);
}
}
Wait for 2 external messages before continuing flow (source code):
[GenerateFlows]
public class WaitForMessagesFlow : Flow<string>
{
public override async Task Run(string param)
{
await Messages
.OfTypes<FundsReserved, InventoryLocked>()
.Take(2)
.Completion(maxWait: TimeSpan.FromSeconds(30));
System.Console.WriteLine("Complete order-processing");
}
}
When the max wait duration has passed the flow is automatically suspended in order to save resources. Thus, the flow can also be suspended immediately when all messages have not been received:
await Messages
.OfTypes<FundsReserved, InventoryLocked>()
.Take(2)
.Completion();
Emit a signal to a flow (source code):
var messagesWriter = flows.MessagesWriter(orderId);
await messagesWriter.AppendMessage(new FundsReserved(orderId), idempotencyKey: nameof(FundsReserved));
Restart a failed flow (source code):
var controlPanel = await flows.ControlPanel(flowId);
controlPanel!.Param = "valid parameter";
await controlPanel.ReInvoke();
Postpone a running flow (without taking in-memory resources) (source code):
[GenerateFlows]
public class PostponeFlow : Flow<string>
{
private readonly ExternalService _externalService = new();
public override async Task Run(string orderId)
{
if (await _externalService.IsOverloaded())
Postpone(delay: TimeSpan.FromMinutes(10));
//execute rest of the flow
}
}
Add metrics middleware (source code):
public class MetricsMiddleware : IMiddleware
{
private Action IncrementCompletedFlowsCounter { get; }
private Action IncrementFailedFlowsCounter { get; }
private Action IncrementRestartedFlowsCounter { get; }
public MetricsMiddleware(Action incrementCompletedFlowsCounter, Action incrementFailedFlowsCounter, Action incrementRestartedFlowsCounter)
{
IncrementCompletedFlowsCounter = incrementCompletedFlowsCounter;
IncrementFailedFlowsCounter = incrementFailedFlowsCounter;
IncrementRestartedFlowsCounter = incrementRestartedFlowsCounter;
}
public async Task<Result<TResult>> Run<TFlow, TParam, TResult>(
TParam param,
Context context,
Next<TFlow, TParam, TResult> next) where TParam : notnull
{
var started = workflow.Effect.TryGet<bool>(id: "Started", out _);
if (started)
IncrementRestartedFlowsCounter();
else
await workflow.Effect.Upsert("Started", true);
var result = await next(param, workflow);
if (result.Outcome == Outcome.Fail)
IncrementFailedFlowsCounter();
else if (result.Outcome == Outcome.Succeed)
IncrementCompletedFlowsCounter();
return result;
}
}
Distributed system challenges
When distributed systems needs to cooperator in order to fulfill some business process a system crash or restart may leave the system in an inconsistent state.
Consider the following order-flow:
public async Task ProcessOrder(Order order)
{
await _paymentProviderClient.Reserve(order.TransactionId, order.CustomerId, order.TotalPrice);
await _logisticsClient.ShipProducts(order.CustomerId, order.ProductIds);
await _paymentProviderClient.Capture(order.TransactionId);
await _emailClient.SendOrderConfirmation(order.CustomerId, order.ProductIds);
}
Currently, the flow is not resilient against crashes or restarts.
For instance, if the process crashes just before capturing the funds from the payment provider then the ordered products are shipped to the customer but nothing is deducted from the customer’s credit card. Not an ideal situation for the business. No matter how we rearrange the flow a crash might lead to either situation:
- products are shipped to the customer without payment being deducted from the customer’s credit card
- payment is deducted from the customer’s credit card but products are never shipped
Ensuring flow-restart on crashes or restarts:
Thus, to rectify the situation we must ensure that the flow is restarted if it did not complete in a previous execution.
RPC-solution
Consider the following Order-flow:
[GenerateFlows]
public class OrderFlow : Flow<Order>
{
private readonly IPaymentProviderClient _paymentProviderClient;
private readonly IEmailClient _emailClient;
private readonly ILogisticsClient _logisticsClient;
public OrderFlow(IPaymentProviderClient paymentProviderClient, IEmailClient emailClient, ILogisticsClient logisticsClient)
{
_paymentProviderClient = paymentProviderClient;
_emailClient = emailClient;
_logisticsClient = logisticsClient;
}
public async Task ProcessOrder(Order order)
{
Log.Logger.ForContext<OrderFlow>().Information($"ORDER_PROCESSOR: Processing of order '{order.OrderId}' started");
var transactionId = Guid.Empty;
await _paymentProviderClient.Reserve(order.CustomerId, transactionId, order.TotalPrice);
await _logisticsClient.ShipProducts(order.CustomerId, order.ProductIds);
await _paymentProviderClient.Capture(transactionId);
await _emailClient.SendOrderConfirmation(order.CustomerId, order.ProductIds);
Log.Logger.ForContext<OrderFlow>().Information($"ORDER_PROCESSOR: Processing of order '{order.OrderId}' completed");
}
}
Sometimes simply wrapping a business flow inside the framework is enough.
This would be the case if all the steps in the flow were idempotent. In that situation it is fine to call an endpoint multiple times without causing unintended side-effects.
At-least-once & Idempotency
However, in the order-flow presented here this is not the case.
The payment provider requires the caller to provide a transaction-id. Thus, the same transaction-id must be provided when re-executing the flow.
In Cleipnir this challenge is solved by wrapping non-determinism inside effects.
public async Task ProcessOrder(Order order)
{
Log.Logger.Information($"ORDER_PROCESSOR: Processing of order '{order.OrderId}' started");
var transactionId = Effect.Capture("TransactionId", Guid.NewGuid);
await _paymentProviderClient.Reserve(transactionId, order.CustomerId, order.TotalPrice);
await _logisticsClient.ShipProducts(order.CustomerId, order.ProductIds);
await _paymentProviderClient.Capture(transactionId);
await _emailClient.SendOrderConfirmation(order.CustomerId, order.ProductIds);
Log.Logger.ForContext<OrderProcessor>().Information($"Processing of order '{order.OrderId}' completed");
}
At-most-once API:
For the sake of presenting the framework’s versatility let us assume that the logistics’ API is not idempotent and that it is out of our control to change that.
Thus, every time a successful call is made to the logistics service the content of the order is shipped to the customer.
As a result the order-flow must fail if it is restarted and:
- a request was previously sent to logistics-service
- but no response was received.
This can again be accomplished by using effects:
public async Task ProcessOrder(Order order)
{
Log.Logger.Information($"ORDER_PROCESSOR: Processing of order '{order.OrderId}' started");
var transactionId = await Effect.Capture("TransactionId", Guid.NewGuid);
await _paymentProviderClient.Reserve(order.CustomerId, transactionId, order.TotalPrice);
await Effect.Capture(
id: "ShipProducts",
work: () => _logisticsClient.ShipProducts(order.CustomerId, order.ProductIds)
);
await _paymentProviderClient.Capture(transactionId);
await _emailClient.SendOrderConfirmation(order.CustomerId, order.ProductIds);
Log.Logger.ForContext<OrderProcessor>().Information($"Processing of order '{order.OrderId}' completed");
}
A failed/exception throwing flow is not automatically retried by the framework.
Instead, it must be manually restarted by using the flow's associated control-panel.
Control Panel:
Using the flow’s control panel both the parameter and scrapbook may be changed before the flow is retried.
For instance, assuming it is determined that the products where not shipped for a certain order, then the following code re-invokes the order with the state changed accordingly.
var controlPanel = await flows.ControlPanel(order.OrderId);
await controlPanel!.Effects.Remove("ShipProducts");
await controlPanel.ReInvoke();
Message-based Solution
Message- or event-driven system are omnipresent in enterprise architectures today.
They fundamentally differ from RPC-based in that:
- messages related to the same order are not delivered to the same process.
This has huge implications in how a flow must be implemented and as a result a simple sequential flow - as in the case of the order-flow:
- becomes fragmented and hard to reason about
- inefficient - each time a message is received the entire state must be reestablished
- inflexible
Cleipnir Flows takes a novel approach by piggy-backing on the features described so far and using event-sourcing and reactive programming together to form a simple and extremely useful abstraction.
As a result the order-flow can be implemented as follows:
public async Task ProcessOrder(Order order)
{
Log.Logger.Information($"ORDER_PROCESSOR: Processing of order '{order.OrderId}' started");
await _bus.Send(new ReserveFunds(order.OrderId, order.TotalPrice, Scrapbook.TransactionId, order.CustomerId));
await Messages.NextOfType<FundsReserved>();
await _bus.Send(new ShipProducts(order.OrderId, order.CustomerId, order.ProductIds));
await Messages.NextOfType<ProductsShipped>();
await _bus.Send(new CaptureFunds(order.OrderId, order.CustomerId, Scrapbook.TransactionId));
await Messages.NextOfType<FundsCaptured>();
await _bus.Send(new SendOrderConfirmationEmail(order.OrderId, order.CustomerId));
await Messages.NextOfType<OrderConfirmationEmailSent>();
Log.Logger.ForContext<OrderProcessor>().Information($"Processing of order '{order.OrderId}' completed");
}
There is a bit more going on in the example above compared to the previous RPC-example. However, the flow is actually very similar to RPC-based. It is sequential and robust. If the flow crashes and is restarted it will continue from the point it got to before the crash.
It is noted that the message broker in the example is just a stand-in - thus not a framework concept - for RabbitMQ, Kafka or some other messaging infrastructure client.
In a real application the message broker would be replaced with the actual way the application broadcasts a message/event to other services.
Furthermore, each flow has an associated private event source called Messages. When events are received from the network they can be placed into the relevant flow's event source - thereby allowing the flow to continue.
Did you know? |
---|
The framework allows awaiting events both in-memory or suspending the invocation until an event has been appended to the event source. </br>Thus, allowing the developer to find the sweet-spot per use-case between performance and releasing resources. |
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. |
-
net9.0
- Cleipnir.Flows (>= 4.1.5)
- Cleipnir.ResilientFunctions.PostgreSQL (>= 4.1.5)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
4.1.5 | 21 | 3/25/2025 |
4.1.4 | 76 | 3/21/2025 |
4.1.3 | 59 | 3/15/2025 |
4.1.2 | 123 | 3/9/2025 |
4.1.1 | 125 | 3/9/2025 |
4.0.12 | 94 | 2/14/2025 |
4.0.11 | 106 | 2/8/2025 |
4.0.10 | 92 | 2/3/2025 |
4.0.9 | 96 | 1/30/2025 |
4.0.8 | 87 | 1/26/2025 |
4.0.7 | 90 | 1/25/2025 |
4.0.6 | 84 | 1/22/2025 |
4.0.5 | 83 | 1/22/2025 |
4.0.4 | 79 | 1/16/2025 |
4.0.3 | 88 | 1/16/2025 |
4.0.2 | 77 | 1/12/2025 |
4.0.1 | 85 | 1/11/2025 |
4.0.0 | 97 | 1/5/2025 |
3.0.3 | 115 | 8/30/2024 |
3.0.2 | 95 | 7/25/2024 |
3.0.1 | 82 | 7/25/2024 |
3.0.0 | 106 | 7/14/2024 |
2.0.1 | 199 | 3/16/2024 |
1.0.16 | 422 | 9/2/2023 |
1.0.15 | 428 | 8/30/2023 |
1.0.14 | 540 | 8/13/2023 |
1.0.13 | 569 | 8/11/2023 |
1.0.12 | 537 | 8/2/2023 |
1.0.11 | 601 | 8/1/2023 |
1.0.9 | 553 | 7/31/2023 |
1.0.8 | 598 | 7/24/2023 |
1.0.7 | 557 | 7/19/2023 |
1.0.6 | 542 | 7/12/2023 |