FluentWorkflow.Core 1.3.2

There is a newer version of this package available.
See the version list below for details.
dotnet add package FluentWorkflow.Core --version 1.3.2                
NuGet\Install-Package FluentWorkflow.Core -Version 1.3.2                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="FluentWorkflow.Core" Version="1.3.2" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add FluentWorkflow.Core --version 1.3.2                
#r "nuget: FluentWorkflow.Core, 1.3.2"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install FluentWorkflow.Core as a Cake Addin
#addin nuget:?package=FluentWorkflow.Core&version=1.3.2

// Install FluentWorkflow.Core as a Cake Tool
#tool nuget:?package=FluentWorkflow.Core&version=1.3.2                

FluentWorkflow

A message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。

1. Intro

基于消息驱动的分布式异步工作流程处理框架,使用 SourceGenerator 简化开发中的重复工作。

使用场景

  • 典型的消息驱动处理流程

sample_message_driven_work_flow

在典型的消息驱动处理流程中,阶段的开始消息结束消息、各个消息的触发都需要手动定义,这些多数属于重复工作,FluentWorkflow是为了减少这些重复劳动而诞生的

BreakChanges

  • 已在包 FluentWorkflow.RabbitMQ 启用 RabbitMQ.Client7.0 版本支持,6.* 版本支持使用包 FluentWorkflow.RabbitMQ.Legacy

2. Features

  • 基础代码自动生成,开发时只需要关注业务;
  • 跨实例、跨服务工作流程驱动;
  • 灵活的子工作流程等待/工作流程嵌套;
  • 灵活的拓展性(partial/继承);
  • Diagnostic支持;
  • 目标框架 net7.0+;
  • *针对单个消息类型的Qos;

NOTE:

  • 更新包时应当尽可能全链路更新,避免导致的未知问题;
  • WorkflowContext 核心为 字符串字典 其属性在赋值时进行序列化存放,对象后续的修改不会反应到上下文中;
  • Workflow 中重写各个阶段的触发事件方法时,方法内不能往外抛出异常,会导致该阶段消息重新进入队列,再次执行;
  • 默认分发器 FluentWorkflow.RabbitMQ 依赖 交换机队列 进行消息分发,当存在多套环境需要隔离时,确保 交换机队列 都不相同,否则将会出现消息重复消费;
  • 默认分发器 FluentWorkflow.RabbitMQ绑定信息交换机队列)变更时不能完全自动调整,需要人工修正,如手动移除队列错误的交换机绑定RoutingKey绑定,否则将会出现消息重复消费;
  • 框架暂时没有保证消息可靠性,即在消息队列中间件异常的情况下可能会出现流程中断、重复消费等情况;

3. 开始使用

3.1 引用 FluentWorkflow.Core

<ItemGroup>
  <PackageReference Include="FluentWorkflow.Core" Version="1.3.1" />
</ItemGroup>

3.2 定义工作流程

声明一个工作流程类
public partial class SampleWorkflow : IWorkflow
{}
  • 声明类型为 partial;
  • 继承接口 IWorkflow;

此时代码生成器会自动为其继承基类,手动实现基类并定义工作流程
public partial class SampleWorkflow : IWorkflow
{
    public SampleWorkflow(SampleWorkflowContext context, IServiceProvider serviceProvider) : base(context, serviceProvider)
    {
    }

    protected override void BuildStages(ISampleWorkflowStageBuilder stageBuilder)
    {
        stageBuilder.Begin()
                    .Then("SampleStage1")
                    .Then("SampleStage2")
                    .Then("SampleStage3")
                    .Completion();
    }
}

到此一个工作流程就声明完成了,该工作流程名为SampleWorkflow,包含三个阶段 SampleStage1SampleStage2SampleStage3

  • 工作流程在BuildStages方法中使用参数stageBuilder定义,必须链式调用,由Begin()开始Completion()结束,使用Then("StageName")声明每个阶段,声明顺序即为阶段顺序,阶段名称必须满足C#标识符命名规则和约定
  • 代码生成器已为工作流程生成了必要的工作代码:
    • 工作流程上下文 SampleWorkflowContext (模板:{WorkflowName}Context)
    • 工作流程消息 - 每个阶段的开始完成消息等 (模板:{WorkflowName}{StageName}(Stage|StageCompleted)Message)
    • 阶段处理器基类 SampleWorkflowSampleStage1StageHandlerBaseSampleWorkflowSampleStage2StageHandlerBaseSampleWorkflowSampleStage3StageHandlerBase (模板:{WorkflowName}{StageName}StageHandlerBase)
    • *其它相关支撑类型

3.3 实现阶段处理器

继承对应的阶段处理器基类,并实现各个阶段处理逻辑
// SampleStage2 与 SampleStage3 同理
public class SampleWorkflowSampleStage1StageHandler : SampleWorkflowSampleStage1StageHandlerBase
{
    public SampleWorkflowSampleStage1StageHandler(IServiceProvider serviceProvider) : base(serviceProvider)
    {
    }

    protected override Task ProcessAsync(ProcessContext processContext, SampleWorkflowSampleStage1StageMessage stageMessage, CancellationToken cancellationToken)
    {
        //TODO 阶段业务逻辑
        return Task.CompletedTask;
    }
}

3.4 配置服务

配置控制服务
services.AddFluentWorkflow()
        .AddSampleWorkflowScheduler()   //添加工作流程调度器
        .AddSampleWorkflowResultObserver()          //添加结果观察器
        .UseInMemoryWorkflowMessageDispatcher();    //配置使用的消息分发器,这里使用基于内存的分发器来示范

配置阶段处理服务
services.AddFluentWorkflow()
        .AddSampleWorkflowSampleStage1StageHandler<SampleWorkflowSampleStage1StageHandler>()    //添加对应阶段的处理器, SampleStage2 与 SampleStage3 同理
        .UseInMemoryWorkflowMessageDispatcher();    //配置使用的消息分发器,这里使用基于内存的分发器来示范

FluentWorkflow正常工作的必要条件:

  • 流程中的所有服务使用同一套消息分发器;
  • 有且仅配置了一个(单个服务,可多实例)工作流程调度器 - WorkflowScheduler;
  • 所有阶段的阶段处理器 - StageHandler,各个阶段的阶段处理器有且仅有一个(单个服务,可多实例);
  • *需要等待子工作流程时必须配置子工作流程结果观察器 - ResultObserver;
  • *需要单次等待多个子工作流程时,必须使用支持等待多个子工作流程IWorkflowAwaitProcessor; (默认实现了基于redis的多流程等待处理器,配置时使用UseRedisWorkflowAwaitProcessor方法以启用)

3.5 启动工作流程

//从DI容器中获取工作流程构建器
var workflowBuilder = ServiceProvider.GetRequiredService<IWorkflowBuilder<SampleWorkflow>>();
//创建工作流程上下文
var context = new SampleWorkflowContext();
//构建工作流程
var workflow = workflowBuilder.Build(context);
//启动工作流程,框架会自动触发各个阶段处理器后完成
await workflow.StartAsync(default);

4. 注意事项

  • 启动工作流程的服务可以不是配置工作流程调度器 - WorkflowScheduler的服务,但需要接入消息分发器并在配置时使用 Add****Workflow() 添加对应的工作流程构造器;
  • 源代码生成器生成的绝大部分类型都是partial的,可以声明partial类进行拓展,不可使用partial类拓展的功能基本上都可以继承后重写,在配置服务时替换默认实现即可;
  • 定义的 Workflow 类会添加生命周期各个阶段的触发事件方法,可以继承后重写其逻辑以在各个阶段执行相关的逻辑(注意每次触发可能不在同一个服务实例中。重写后应当捕获并处理所有异常,不要抛出);
  • WorkflowContext 核心为字符串字典,对其修改理论上只对后续可见并在整个执行周期可用,可以将执行参数、结果、中间值等存放其中;
  • 消息的分发、重试等逻辑由具体使用的消息分发器IWorkflowMessageDispatcher控制(默认提供了基于CAPAbp以及基础的FluentWorkflow.RabbitMQ可选);
  • 默认情况下 StageHandler 出现异常则认为工作流程失败,不会将异常抛给上层 IWorkflowMessageDispatcher(消息分发的重试不会触发),可以重写 StageHandlerOnException 方法来将异常向上抛出;
  • 更改既有工作流程时,如果修改/删除了既有的阶段定义,会导致还在处理过程中工作流程无法正常运行(但添加不会影响);

5. 其它

5.1 生成拓展功能代码

部分功能为源码接入的方式,默认不生成,在项目中指定需要的功能后自动生成

<PropertyGroup>
  <FluentWorkflowGeneratorAdditional>AbpFoundation,CAPFoundation,AbpMessageDispatcher,CAPMessageDispatcher,RedisAwaitProcessor</FluentWorkflowGeneratorAdditional>
</PropertyGroup>
名称 功能
AbpFoundation Abp的基础功能支持
CAPFoundation CAP的基础功能支持
AbpMessageDispatcher Abp的消息分发器
CAPMessageDispatcher CAP的消息分发器
RedisAwaitProcessor 基于StackExchange.Redis的子流程等待处理器
  • 生成的可能冲突的类型会放到命名空间 FluentWorkflow.GenericExtension.{工作流程命名空间} 下,如配置拓展方法等;

5.2 使用默认分发器 FluentWorkflow.RabbitMQ

引用 FluentWorkflow.RabbitMQ
<ItemGroup>
    <PackageReference Include="FluentWorkflow.RabbitMQ" Version="1.3.1" />
</ItemGroup>
配置
services.AddFluentWorkflow()
        .UseRabbitMQMessageDispatcher(options =>
        {
            //配置RabbitMQ
        });

*控制指定消息的消费速率

配置单个消息的消费速率,其它消息不受限

services.Configure<RabbitMQOptions>(options =>
{
    //配置阶段Stage1的消息 - SampleWorkflowSampleStage1StageMessage 的消费速率,即当前服务实例同时只会有一个阶段Stage1在处理
    options.Option<SampleWorkflowSampleStage1StageMessage>(static handleOptions =>
    {
        handleOptions.Qos = 1;
    });
});
*消息确认超时

RabbitMQ消息的消费ack超时时间默认为30分钟,进行长时间处理时可能会出现意外情况,可参照 acknowledgement-timeout 进行调整

  • 框架已默认尝试设置队列参数 x-consumer-timeout 为 1 小时(如果RabbitMQ版本支持的话);
  • 可使用 RabbitMQOptions.QueueArgumentsSetup 对队列的 x-consumer-timeout 参数进行调整;

5.3 子工作流程等待

在阶段处理器中实现子工作流程等待逻辑

internal class SampleWorkflowSampleStage1StageHandler : SampleWorkflowSampleStage1StageHandlerBase
{
    public SampleWorkflowSampleStage1StageHandler(IServiceProvider serviceProvider) : base(serviceProvider)
    {
    }

    protected async override Task ProcessAsync(ProcessContext processContext, SampleWorkflowSampleStage1StageMessage stageMessage, CancellationToken cancellationToken)
    {
        //构建子工作流程
        var workflowBuilder = ServiceProvider.GetRequiredService<IWorkflowBuilder<SampleWorkflow>>();
        var workflow = workflowBuilder.Build(new SampleWorkflowContext());
        //Other workflow setting

        //将未启动的子工作流程传递给当前阶段处理上下文,并命名为 - 'taskName'
        processContext.AwaitChildWorkflow("taskName", workflow);

        // Other logic

        //当前阶段将等待子工作流程处理完成后再完成
    }

    protected override async Task OnAwaitFinishedAsync(SampleWorkflowContext context, IReadOnlyDictionary<string, IWorkflowContext?> childWorkflowContexts, CancellationToken cancellationToken)
    {
        //从等待的子工作流程上下文字典中取出 - 'taskName'
        var workflowContext = (SampleWorkflowContext)childWorkflowContexts["taskName"];

        //处理子工作流程结果,如将 workflowContext 内的结果赋值给 context,以便在当前工作流程的后续阶段中使用等

        await base.OnAwaitFinishedAsync(context, childWorkflowContexts, cancellationToken);

        //当前阶段将完成
    }
}

5.4 启用Diagnostic支持

services.AddFluentWorkFlow().EnableDiagnostic();

6 流程的中止、挂起与恢复

6.1 中止流程

WorkFlowOn*StageAsyncOn*StageCompletedAsync 中不执行参数委托 fireMessage,则后续流程不再执行

6.2 流程挂起

WorkFlowOn*StageAsyncOn*StageCompletedAsync 中不执行参数委托 fireMessage,中止流程,在此基础上调用 SerializeContext 方法将上下文序列化后存放

// 存放 contextData 以用于流程恢复
var contextData = SerializeContext(message.Context);

6.3 流程恢复

调用具体 WorkFlow 的静态方法 ResumeAsync 使用挂起的流程数据进行恢复执行

// contextData 为序列化的上下文数据
await XXXXWorkflow.ResumeAsync(contextData, serviceProvider, cancellationToken)
注意:

恢复流程将会再次调用序列化上下文时的方法,需要注意,小心再次被挂起


更多信息参见源码内的测试代码

Product Compatible and additional computed target framework versions.
.NET net7.0 is compatible.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (2)

Showing the top 2 NuGet packages that depend on FluentWorkflow.Core:

Package Downloads
FluentWorkflow.RabbitMQ

A message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。

FluentWorkflow.RabbitMQ.Legacy

A message driven distributed asynchronous workflow framework. 消息驱动的分布式异步工作流程处理框架。

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
1.4.3 435 12/17/2024
1.4.2 93 12/16/2024
1.4.1 347 12/5/2024
1.4.0 274 12/4/2024
1.3.5 147 12/2/2024
1.3.4 243 11/23/2024
1.3.3 112 11/22/2024
1.3.2 114 11/17/2024
1.3.1 113 11/16/2024
1.3.0 128 11/12/2024
1.2.0 148 7/18/2024
1.1.6 504 7/1/2024
1.1.5 156 6/16/2024
1.1.4 148 6/13/2024
1.1.3 134 6/2/2024
1.1.2 832 5/24/2024
1.1.1 123 5/24/2024
1.1.0 129 5/19/2024
1.0.0 150 3/22/2024
1.0.0-preview-007 119 2/26/2024
1.0.0-preview-006 240 11/15/2023
1.0.0-preview-005 161 10/23/2023
1.0.0-preview-004 333 10/17/2023
1.0.0-preview-003 568 7/2/2023
1.0.0-preview-002 158 6/13/2023