Rivulet.Hosting 1.3.0

dotnet add package Rivulet.Hosting --version 1.3.0
                    
NuGet\Install-Package Rivulet.Hosting -Version 1.3.0
                    
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="Rivulet.Hosting" Version="1.3.0" />
                    
For projects that support PackageReference, copy this XML node into the project file to reference the package.
<PackageVersion Include="Rivulet.Hosting" Version="1.3.0" />
                    
Directory.Packages.props
<PackageReference Include="Rivulet.Hosting" />
                    
Project file
For projects that support Central Package Management (CPM), copy this XML node into the solution Directory.Packages.props file to version the package.
paket add Rivulet.Hosting --version 1.3.0
                    
#r "nuget: Rivulet.Hosting, 1.3.0"
                    
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
#:package Rivulet.Hosting@1.3.0
                    
#:package directive can be used in C# file-based apps starting in .NET 10 preview 4. Copy this into a .cs file before any lines of code to reference the package.
#addin nuget:?package=Rivulet.Hosting&version=1.3.0
                    
Install as a Cake Addin
#tool nuget:?package=Rivulet.Hosting&version=1.3.0
                    
Install as a Cake Tool

Rivulet.Hosting

Integration package for using Rivulet with Microsoft.Extensions.Hosting, ASP.NET Core, and the .NET Generic Host.

Features

  • Dependency Injection integration
  • Configuration binding for ParallelOptionsRivulet
  • Base classes for parallel background services
  • Health checks for monitoring parallel operations
  • Support for ASP.NET Core and Worker Services

Installation

dotnet add package Rivulet.Hosting

Quick Start

1. Configure Services

using Rivulet.Hosting;

var builder = WebApplication.CreateBuilder(args);

// Register Rivulet with configuration from appsettings.json
builder.Services.AddRivulet(builder.Configuration);

// Or configure manually
builder.Services.AddRivulet(options =>
{
    options.MaxDegreeOfParallelism = 10;
    options.RetryPolicy = new RetryPolicyOptions
    {
        MaxRetries = 3,
        BackoffType = BackoffType.Exponential
    };
});

var app = builder.Build();
app.Run();

2. Configuration Binding (appsettings.json)

{
  "Rivulet": {
    "MaxDegreeOfParallelism": 10,
    "MaxRetries": 3,
    "BaseDelay": "00:00:00.100",
    "BackoffStrategy": "ExponentialJitter",
    "PerItemTimeout": "00:00:30",
    "ErrorMode": "CollectAndContinue",
    "CircuitBreaker": {
      "FailureThreshold": 5,
      "SuccessThreshold": 2,
      "OpenTimeout": "00:00:30",
      "SamplingDuration": "00:01:00"
    },
    "RateLimit": {
      "TokensPerSecond": 100,
      "BurstCapacity": 200
    },
    "AdaptiveConcurrency": {
      "MinConcurrency": 1,
      "MaxConcurrency": 100,
      "TargetLatency": "00:00:00.100",
      "MinSuccessRate": 0.95
    }
  }
}

3. Named Configurations

Register multiple configurations for different use cases:

// Register named configurations
builder.Services.AddRivulet("HighThroughput", builder.Configuration);
builder.Services.AddRivulet("LowLatency", builder.Configuration);

// In appsettings.json
{
  "Rivulet": {
    "HighThroughput": {
      "MaxDegreeOfParallelism": 50,
      "RateLimit": {
        "TokensPerSecond": 500,
        "BurstCapacity": 1000
      }
    },
    "LowLatency": {
      "MaxDegreeOfParallelism": 5,
      "AdaptiveConcurrency": {
        "MinConcurrency": 1,
        "MaxConcurrency": 10,
        "TargetLatency": "00:00:00.100"
      }
    }
  }
}

// Use named options
public class MyService
{
    private readonly ParallelOptionsRivulet _options;

    public MyService(IOptionsSnapshot<ParallelOptionsRivulet> options)
    {
        _options = options.Get("HighThroughput");
    }
}

Background Services

ParallelBackgroundService

Simple background service for processing items one at a time:

public class DataProcessorService : ParallelBackgroundService<DataItem>
{
    private readonly IDataRepository _repository;

    public DataProcessorService(
        ILogger<DataProcessorService> logger,
        IDataRepository repository,
        IOptions<ParallelOptionsRivulet> options)
        : base(logger, options.Value)
    {
        _repository = repository;
    }

    protected override async IAsyncEnumerable<DataItem> GetItemsAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        await foreach (var item in _repository.GetPendingItemsAsync(cancellationToken))
        {
            yield return item;
        }
    }

    protected override async Task ProcessItemAsync(DataItem item, CancellationToken cancellationToken)
    {
        // Process single item
        await _repository.ProcessAsync(item, cancellationToken);
    }
}

// Register the service
builder.Services.AddHostedService<DataProcessorService>();

ParallelWorkerService

Advanced background service with parallel processing and result handling:

public class ImageProcessingWorker : ParallelWorkerService<ImageJob, ProcessedImage>
{
    private readonly IImageService _imageService;
    private readonly IStorageService _storage;

    public ImageProcessingWorker(
        ILogger<ImageProcessingWorker> logger,
        IImageService imageService,
        IStorageService storage,
        IOptions<ParallelOptionsRivulet> options)
        : base(logger, options.Value)
    {
        _imageService = imageService;
        _storage = storage;
    }

    protected override async IAsyncEnumerable<ImageJob> GetSourceItems(
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        // Poll for new jobs every 5 seconds
        while (!cancellationToken.IsCancellationRequested)
        {
            var jobs = await _imageService.GetPendingJobsAsync(cancellationToken);

            foreach (var job in jobs)
            {
                yield return job;
            }

            await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
        }
    }

    protected override async Task<ProcessedImage> ProcessAsync(
        ImageJob job,
        CancellationToken cancellationToken)
    {
        // Download and process image
        var imageData = await _storage.DownloadAsync(job.ImageUrl, cancellationToken);
        var processed = await _imageService.ProcessImageAsync(imageData, job.Options, cancellationToken);

        return new ProcessedImage
        {
            JobId = job.Id,
            Data = processed,
            ProcessedAt = DateTime.UtcNow
        };
    }

    protected override async Task OnResultAsync(
        ProcessedImage result,
        CancellationToken cancellationToken)
    {
        // Save processed image
        await _storage.UploadAsync(result.Data, $"processed/{result.JobId}", cancellationToken);
        await _imageService.MarkCompletedAsync(result.JobId, cancellationToken);
    }
}

// Register the service
builder.Services.AddHostedService<ImageProcessingWorker>();

Health Checks

Monitor your parallel operations with built-in health checks:

using Rivulet.Diagnostics;

// Register PrometheusExporter and health checks
builder.Services.AddSingleton<PrometheusExporter>();
builder.Services.AddHealthChecks()
    .AddCheck<RivuletHealthCheck>(
        "rivulet",
        tags: new[] { "ready" });

// Configure health check options
builder.Services.Configure<RivuletHealthCheckOptions>(options =>
{
    options.ErrorRateThreshold = 0.1;      // 10% error rate triggers degraded status
    options.FailureCountThreshold = 100;   // 100 failures triggers unhealthy status
});

// Health check automatically monitors metrics from Rivulet operations
// No manual recording needed - metrics are captured via EventCounters

// Expose health check endpoint
app.MapHealthChecks("/health");

// Example output:
// Healthy: "Rivulet operations healthy: 950/1000 completed, 50 failures"
// Degraded: "Error rate (15.00%) exceeds threshold (10.00%)"
// Unhealthy: "Failure count (150) exceeds threshold (100)"

ASP.NET Core Integration

Use Rivulet in your ASP.NET Core controllers and minimal APIs:

// In a controller
[ApiController]
[Route("api/[controller]")]
public class DataController : ControllerBase
{
    private readonly ParallelOptionsRivulet _options;

    public DataController(IOptions<ParallelOptionsRivulet> options)
    {
        _options = options.Value;
    }

    [HttpPost("process")]
    public async Task<IActionResult> ProcessItems([FromBody] List<DataItem> items)
    {
        var results = await items
            .ToAsyncEnumerable()
            .SelectParallelStreamAsync(
                async (item, ct) => await ProcessItemAsync(item, ct),
                _options,
                HttpContext.RequestAborted)
            .ToListAsync(HttpContext.RequestAborted);

        return Ok(results);
    }
}

// In minimal APIs
app.MapPost("/api/batch", async (
    List<DataItem> items,
    IOptions<ParallelOptionsRivulet> options,
    CancellationToken ct) =>
{
    var results = await items
        .ToAsyncEnumerable()
        .SelectParallelStreamAsync(
            async (item, token) => await ProcessItemAsync(item, token),
            options.Value,
            ct)
        .ToListAsync(ct);

    return Results.Ok(results);
});

Worker Service Example

Complete example of a .NET Worker Service:

// Program.cs
using Rivulet.Hosting;

var builder = Host.CreateApplicationBuilder(args);

// Configure Rivulet
builder.Services.AddRivulet(builder.Configuration);

// Register background services
builder.Services.AddHostedService<DataSyncWorker>();
builder.Services.AddHostedService<NotificationWorker>();

// Add health checks
builder.Services.AddHealthChecks()
    .AddCheck<RivuletOperationHealthCheck>("rivulet");

var host = builder.Build();
host.Run();

// DataSyncWorker.cs
public class DataSyncWorker : ParallelWorkerService<SyncJob, SyncResult>
{
    private readonly IDataService _dataService;

    public DataSyncWorker(
        ILogger<DataSyncWorker> logger,
        IDataService dataService,
        IOptions<ParallelOptionsRivulet> options)
        : base(logger, options.Value)
    {
        _dataService = dataService;
    }

    protected override async IAsyncEnumerable<SyncJob> GetSourceItems(
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var jobs = await _dataService.GetPendingSyncJobsAsync(cancellationToken);

            foreach (var job in jobs)
            {
                yield return job;
            }

            if (jobs.Count == 0)
            {
                await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
            }
        }
    }

    protected override async Task<SyncResult> ProcessAsync(
        SyncJob job,
        CancellationToken cancellationToken)
    {
        var data = await _dataService.FetchDataAsync(job.Source, cancellationToken);
        await _dataService.SyncToDestinationAsync(job.Destination, data, cancellationToken);

        return new SyncResult
        {
            JobId = job.Id,
            RecordsSynced = data.Count,
            CompletedAt = DateTime.UtcNow
        };
    }

    protected override async Task OnResultAsync(
        SyncResult result,
        CancellationToken cancellationToken)
    {
        await _dataService.UpdateJobStatusAsync(result.JobId, "Completed", cancellationToken);
    }
}

Best Practices

1. Use Dependency Injection

Always inject IOptions<ParallelOptionsRivulet> to access configuration:

public class MyService
{
    private readonly ParallelOptionsRivulet _options;

    public MyService(IOptions<ParallelOptionsRivulet> options)
    {
        _options = options.Value;
    }
}

2. Graceful Shutdown

Background services automatically handle cancellation. Always respect the CancellationToken:

protected override async Task ProcessItemAsync(DataItem item, CancellationToken cancellationToken)
{
    // Check cancellation frequently
    cancellationToken.ThrowIfCancellationRequested();

    await LongRunningOperationAsync(item, cancellationToken);
}

3. Error Handling

Use health checks and logging for monitoring:

protected override async Task<Result> ProcessAsync(Job job, CancellationToken cancellationToken)
{
    try
    {
        var result = await ProcessJobAsync(job, cancellationToken);
        _healthCheck.RecordSuccess();
        return result;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Failed to process job {JobId}", job.Id);
        _healthCheck.RecordFailure();
        throw;
    }
}

4. Configuration Management

Use different configurations for different environments:

{
  "Rivulet": {
    "MaxDegreeOfParallelism": 10,
    "MaxRetries": 3,
    "BaseDelay": "00:00:00.100"
  }
}

// appsettings.Production.json
{
  "Rivulet": {
    "MaxDegreeOfParallelism": 50,
    "MaxRetries": 5,
    "BaseDelay": "00:00:00.100",
    "RateLimit": {
      "TokensPerSecond": 1000,
      "BurstCapacity": 2000
    }
  }
}

5. Resource Management

Configure appropriate parallelism based on workload:

  • CPU-bound: MaxDegreeOfParallelism = Environment.ProcessorCount
  • I/O-bound: MaxDegreeOfParallelism = Environment.ProcessorCount * 2 or higher
  • Rate-limited: Use RateLimit options to respect external API limits

API Reference

ServiceCollectionExtensions

  • AddRivulet(IConfiguration) - Register from configuration
  • AddRivulet(Action<ParallelOptionsRivulet>) - Register with action
  • AddRivulet(string, IConfiguration) - Register named configuration

ParallelBackgroundService<T>

  • GetItemsAsync(CancellationToken) - Override to provide data source
  • ProcessItemAsync(T, CancellationToken) - Override to process items

ParallelWorkerService<TSource, TResult>

  • GetSourceItems(CancellationToken) - Override to provide source stream
  • ProcessAsync(TSource, CancellationToken) - Override to process items
  • OnResultAsync(TResult, CancellationToken) - Override to handle results

RivuletHealthCheck (from Rivulet.Diagnostics)

  • CheckHealthAsync(HealthCheckContext, CancellationToken) - Check health status based on metrics
  • Automatically monitors Rivulet operations via EventCounters
  • Requires PrometheusExporter dependency for metric collection

Contributing

Contributions are welcome! Please see the main Rivulet repository for guidelines.

License

MIT License - see LICENSE file for details


Made with ❤️ by Jeffeek | NuGet | GitHub

Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 is compatible.  net9.0-android was computed.  net9.0-browser was computed.  net9.0-ios was computed.  net9.0-maccatalyst was computed.  net9.0-macos was computed.  net9.0-tvos was computed.  net9.0-windows was computed.  net10.0 was computed.  net10.0-android was computed.  net10.0-browser was computed.  net10.0-ios was computed.  net10.0-maccatalyst was computed.  net10.0-macos was computed.  net10.0-tvos was computed.  net10.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
1.3.0 170 12/13/2025
1.3.0-beta 419 12/8/2025
1.3.0-alpha 307 12/8/2025
1.2.0 411 11/19/2025