DataflowBuilder 0.0.0

dotnet add package DataflowBuilder --version 0.0.0                
NuGet\Install-Package DataflowBuilder -Version 0.0.0                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="DataflowBuilder" Version="0.0.0" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add DataflowBuilder --version 0.0.0                
#r "nuget: DataflowBuilder, 0.0.0"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install DataflowBuilder as a Cake Addin
#addin nuget:?package=DataflowBuilder&version=0.0.0

// Install DataflowBuilder as a Cake Tool
#tool nuget:?package=DataflowBuilder&version=0.0.0                

dataflow-builder

TPL Dataflow is a nice library to build pipeline to process data in memory. The downside is that it's cumbersome to construct a pipeline that is readable and consistent. This library aims to reduce the friction of constructing the pipeline by providing methods that is type safe and intuitive for consumers to use.

Usage

A simple usage can be like this:

var pipelineBlockOpts = new PipelineBlockOptions<ExecutionDataflowBlockOptions>
{
  BlockOptions = new() { MaxDegreeOfParallelism = 1 },
  LinkOptions = new() { PropagateCompletion = true }
};

// <string> is the type that the pipeline accepts
var pipeline = new Pipeline<string>("test-pipeline");
pipeline
  .AddFirstBlock(int.Parse) // Convert string to int
  .AddBlock(number => number * 2, pipelineBlockOpts) // Double each int
  .AddBlock(number => number.ToString(), pipelineBlockOpts) // Convert int back to string
  .AddManyBlock(str => str.ToArray(), pipelineBlockOpts) // Convert string to char array & each char is sent to next block
  .AddLastBlock(character => Console.WriteLine($"OUTPUT: {character}"), pipelineBlockOpts); // Print to console

var runner = pipeline.Build();
await runner.ExecuteAsync(["10", "20"]);

Async

For async operation, use AddAsyncBlock(). You can mix both async and sync operation to build the pipeline. But you can only have either sync or async first block AND either sync or async last block.

var pipelineBlockOpts = new PipelineBlockOptions<ExecutionDataflowBlockOptions>
{
  BlockOptions = new() { MaxDegreeOfParallelism = 1 },
  LinkOptions = new() { PropagateCompletion = true }
};

// <string> is the type that the pipeline accepts
var pipeline = new Pipeline<string>("test-pipeline");
pipeline
  .AddFirstBlock(int.Parse)
  .AddBlock(number => number * 2, pipelineBlockOpts)
  .AddAsyncBlock(async number =>
  {
    await Task.Delay(1000);
    return number.ToString();
  }, pipelineBlockOpts)
  .AddLastAsyncBlock(async str =>
  {
    await Task.Delay(1000);
    Console.WriteLine($"OUTPUT: {str}");
  }, pipelineBlockOpts);

var runner = pipeline.Build();
await runner.ExecuteAsync(["1", "2"]);

Normally if you have a pipeline like this:

(string -> Task<int>) => (Task<int> -> void)

where the output type of the first block is the type of the next block. However, it can be not as "fluent" when building this pipeline with Task. So this library includes the AddAsyncBlock that "unwraps" the task for you, and allows the type inside the Task to be carried to the next block. For example, the above example now become:

(string -> Task<int>) => (int -> void)

But if you do need to return a Task and the next block does need to accept a Task, then you can use AddBlock with the parameter allowTaskOutput=true.

Fork

This library supports "branching" off a pipeline into multiple pipelines by callling Fork():

var pipelineBlockOpts = new PipelineBlockOptions<ExecutionDataflowBlockOptions>
{
  BlockOptions = new() { MaxDegreeOfParallelism = 1 },
  LinkOptions = new() { PropagateCompletion = true }
};

var evenPipeline = new Pipeline<int>("even");
evenPipeline
  .AddFirstBlock(number => number)
  .AddLastBlock(number => Console.WriteLine($"EVEN: {number}"), pipelineBlockOpts);

var oddPipeline = new Pipeline<int>("odd");
oddPipeline
  .AddFirstBlock(number => number)
  .AddLastBlock(number => Console.WriteLine($"ODD: {number}"), pipelineBlockOpts);

var pipeline = new Pipeline<string>("test");
pipeline
  .AddFirstBlock(int.Parse)
  .Fork()
    // If it's even number, deliver the number to evenPipeline
    .Branch(number => number % 2 == 0, evenPipeline, pipelineBlockOpts.LinkOptions)
    // For other value, deliver to oddPipeline
    .Default(oddPipeline, pipelineBlockOpts.LinkOptions);

var runner = pipeline.Builder();
await runner.ExecuteAsync(["1", "2", "3"]);

Broadcast

This library supports "broadcasting" which is delivering the same value to multiple branch pipelines by callling Broadcast():

var pipelineBlockOpts = new PipelineBlockOptions<ExecutionDataflowBlockOptions>
{
  BlockOptions = new() { MaxDegreeOfParallelism = 1 },
  LinkOptions = new() { PropagateCompletion = true }
};

var broadcastBlockOpts = new PipelineBlockOptions<DataflowBlockOptions>
{
  BlockOptions = new() { MaxDegreeOfParallelism = 1 },
  LinkOptions = new() { PropagateCompletion = true }
};

var fbPipeline = new Pipeline<int>("fb");
fbPipeline
  .AddFirstBlock(number => number)
  .AddLastBlock(number => Console.WriteLine($"FB: {number}"), pipelineBlockOpts);

var twitterPipeline = new Pipeline<int>("twitter");
twitterPipeline
  .AddFirstBlock(number => number)
  .AddLastBlock(number => Console.WriteLine($"TWITTER: {number}"), pipelineBlockOpts);

var pipeline = new Pipeline<string>("test");
pipeline
  .AddFirstBlock(int.Parse)
  .Broadcast(null, broadcastBlockOpts)
    // For every .Branch(), the same value will be deliver to every branch pipeline
    .Branch(fbPipeline, pipelineBlockOpts.LinkOptions)
    .Branch(twitterPipeline, pipelineBlockOpts.LinkOptions);

var runner = pipeline.Builder();
await runner.ExecuteAsync(["1", "2", "3"]);

Export for Visualization

To visualize your pipeline, use the method pipeline.ExportAsync(<PipelineExporter>). You can define your own exporter and pass it to the method ExportAsync. This libary ships with GraphvizExporter to make it easy for you to get started.

var pipelineBlockOpts = new PipelineBlockOptions
{
  BlockOptions = new() { MaxDegreeOfParallelism = 1 },
  LinkOptions = new() { PropagateCompletion = true }
};

var fbEvenPipeline = new Pipeline<int>("fb-even");
fbEvenPipeline
  .AddFirstBlock(number => number)
  .AddLastBlock(number => Console.WriteLine($"fb-even: {number}"), pipelineBlockOpts);

var fbOddPipeline = new Pipeline<int>("fb-odd");
fbOddPipeline
  .AddFirstBlock(number => number)
  .AddLastBlock(number => Console.WriteLine($"fb-odd: {number}"), pipelineBlockOpts);

var fbPipeline = new Pipeline<int>("fb");
fbPipeline
  .AddFirstBlock(number => number)
  .Fork()
    .Branch(number => number % 2 == 0, fbEvenPipeline, pipelineBlockOpts.LinkOptions)
    .Default(fbOddPipeline, pipelineBlockOpts.LinkOptions);

var twitterPipeline = new Pipeline<int>("twitter");
twitterPipeline
  .AddFirstBlock(number => number)
  .AddLastBlock(number => Console.WriteLine($"TWITTER: {number}"), pipelineBlockOpts);

var pipeline = new Pipeline<string>("test");
pipeline
  .AddFirstBlock(int.Parse)
  .Broadcast(null, pipelineBlockOpts)
    .Branch(fbPipeline, pipelineBlockOpts.LinkOptions)
    .Branch(twitterPipeline, pipelineBlockOpts.LinkOptions);

// Copy the output and visualize it in https://magjac.com/graphviz-visual-editor/
System.Console.WriteLine(await pipeline.ExportAsync(new GraphvizExporter()));

This is the visualize of the pipeline above. Image

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 was computed.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
0.0.0 52 1/11/2025