Influx.SDK.Consumer 0.3.0-preview-005

This is a prerelease version of Influx.SDK.Consumer.
.NET CLI:
dotnet add package Influx.SDK.Consumer --version 0.3.0-preview-005

Package Manager:
NuGet\Install-Package Influx.SDK.Consumer -Version 0.3.0-preview-005
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.

PackageReference:
<PackageReference Include="Influx.SDK.Consumer" Version="0.3.0-preview-005" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.

Paket CLI:
paket add Influx.SDK.Consumer --version 0.3.0-preview-005

Script & Interactive:
#r "nuget: Influx.SDK.Consumer, 0.3.0-preview-005"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.

Cake:
// Install Influx.SDK.Consumer as a Cake Addin
#addin nuget:?package=Influx.SDK.Consumer&version=0.3.0-preview-005&prerelease

// Install Influx.SDK.Consumer as a Cake Tool
#tool nuget:?package=Influx.SDK.Consumer&version=0.3.0-preview-005&prerelease

Influx Consumer SDK

This is the Consumer SDK for Influx. It provides a way to consume messages from the platform without having to deal with the underlying gRPC communication.

Note: This project is still in development and should be considered experimental. It is not recommended to use it in a production environment.

Installation

The package is available from NuGet. To install it, run the following command:

dotnet add package Influx.SDK.Consumer --prerelease

Usage

To handle data received from Influx, create a message processor class that implements the Influx.Pulse.IMessageProcessor interface. The only method you need to implement is ExecuteAsync, which is called for each message received from the platform.

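using Influx.Pulse;

public class MyMessageProcessor : IMessageProcessor
{
    // Returning null keeps the default retry behaviour.
    public RetryOptions? Options => null;

    public Task ExecuteAsync( Message message, CancellationToken cancellationToken )
    {
        // your code here

        // Completing without an exception signals success to the SDK.
        return Task.CompletedTask;
    }
}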

Next, register the consumer service along with your processor. This sets up a gRPC client and a background service that automatically connects to Influx and consumes data from it, triggering your processor for each message received.

IServiceCollection services = ...;

services.AddGrpcConsumer<MyMessageProcessor>( options =>
{
    options.Url = "https://influx.example.com";
    options.Group = "default";
    options.Key = "consumer-key";  // required if the host is secured
} );

Handling acknowledgements

The SDK acknowledges messages automatically, based on the outcome of your processor. If the processor does not throw an exception, the message is considered acknowledged and an ack instruction is sent to the platform. If, however, the processor throws an exception, the message is considered not acknowledged and a nack instruction is sent instead.

The processor's retry options property (Options, of type RetryOptions, in the example above) controls how many times a message is resent to the consumer and how long the delay is before each attempt. By default (if the property returns null), the message is resent indefinitely, with a 1 second delay between attempts.
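
To make the ack/nack rule concrete, here is a minimal, self-contained sketch of the behaviour described above. It does not use the SDK: DispatchAsync, DeliverAsync and FlakyHandler are hypothetical stand-ins, and the bounded attempt count and short delay are assumptions chosen so the example finishes quickly (the documented default is unlimited retries with a 1 second delay).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the dispatch step: run the handler and translate
// its outcome into the instruction that would be sent back to the platform.
async Task<string> DispatchAsync( Func<string, CancellationToken, Task> handler, string payload )
{
    try
    {
        await handler( payload, CancellationToken.None );
        return "ack";   // the handler completed without throwing
    }
    catch ( Exception )
    {
        return "nack";  // any exception results in a nack
    }
}

// Hypothetical redelivery loop: redeliver after each nack, up to maxAttempts
// deliveries, waiting for the given delay between attempts.
async Task<int> DeliverAsync( Func<string, CancellationToken, Task> handler, string payload, int maxAttempts, TimeSpan delay )
{
    for ( var attempt = 1; attempt <= maxAttempts; attempt++ )
    {
        if ( await DispatchAsync( handler, payload ) == "ack" )
        {
            return attempt;
        }

        await Task.Delay( delay );
    }

    return -1; // never acknowledged within the attempt budget
}

// A handler that fails twice before succeeding.
var calls = 0;
Task FlakyHandler( string payload, CancellationToken cancellationToken )
{
    calls++;
    if ( calls < 3 )
    {
        throw new InvalidOperationException( "transient failure" );
    }

    return Task.CompletedTask;
}

var attempts = await DeliverAsync( FlakyHandler, "hello", maxAttempts: 5, delay: TimeSpan.FromMilliseconds( 10 ) );
Console.WriteLine( $"acknowledged after {attempts} attempt(s)" );
```

In this sketch the first two deliveries end in a nack and the third in an ack, so the message is acknowledged on the third attempt.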

Timeouts

If the processor takes too long to process a message and a (n)ack instruction isn't sent back in time, the platform will assume a nack and resend the message. At the time of writing, this timeout defaults to 30 seconds. If you need to change this value, you can do so when setting up the consumer.

services.AddGrpcConsumer<MyMessageProcessor>( options =>
{
    // ...

    // let's set a longer timeout
    options.Timeout = TimeSpan.FromSeconds( 60 );
} );
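
The timeout rule can be pictured as a race between the processing task and a timer. The sketch below is a simulation only and does not reflect how the platform is implemented: AwaitInstructionAsync is a hypothetical helper, and the millisecond values are assumptions chosen to keep the example fast.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical platform-side wait: whichever finishes first, the processing
// task or the timeout timer, decides the instruction.
async Task<string> AwaitInstructionAsync( Task processing, TimeSpan timeout )
{
    var finished = await Task.WhenAny( processing, Task.Delay( timeout ) );

    if ( finished != processing )
    {
        return "nack"; // no instruction arrived in time, so a nack is assumed
    }

    return processing.IsCompletedSuccessfully ? "ack" : "nack";
}

// Processing that finishes well within the timeout.
var fast = await AwaitInstructionAsync( Task.Delay( 10 ), TimeSpan.FromMilliseconds( 500 ) );

// Processing that outlives the timeout.
var slow = await AwaitInstructionAsync( Task.Delay( 2000 ), TimeSpan.FromMilliseconds( 50 ) );

Console.WriteLine( $"fast: {fast}, slow: {slow}" );
```

If your processor regularly runs close to the configured timeout, messages may be redelivered while the first attempt is still in progress, which is one reason to raise options.Timeout for slow workloads.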

Asynchronous partitions

When a consumer is assigned multiple partitions, the default behaviour of the platform is to sequentially go through each partition and send a subset of messages from each one. This is sufficient for most use cases, since the number of consumers can be scaled to match a higher load.

However, it is possible to instruct the platform to send messages from different partitions in parallel. This might be suitable for consumers handling a large number of partitions or when the processing of a message is particularly slow. If you think this might be the case, you can enable this feature by setting the AsyncPartitions property to true when setting up the consumer.

services.AddGrpcConsumer<MyMessageProcessor>( options =>
{
    // ...

    // let's enable asynchronous partitions
    options.AsyncPartitions = true;
} );

With this property set to true, the SDK also processes messages from different partitions on different threads. This can lead to higher throughput, particularly when processing a message is slow, but it can also lead to higher resource usage, so it should be used with caution.

Note: Although messages from different partitions will be processed in parallel, the order of the messages for each partition will still be preserved. Influx will not send new messages from the same partition until the current one is (n)acknowledged.
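
The ordering guarantee can be illustrated with a small simulation: one worker per partition runs concurrently with the others, while each worker handles its own messages strictly in sequence. This is a sketch under stated assumptions, not the SDK's implementation; the (Partition, Sequence) tuples, ProcessPartitionAsync and the use of tasks rather than dedicated threads are all hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical messages, tagged with their partition and per-partition sequence.
var messages = new (int Partition, int Sequence)[]
{
    (0, 1), (1, 1), (0, 2), (1, 2), (0, 3),
};

// Records the order in which messages were actually processed.
var log = new ConcurrentQueue<(int Partition, int Sequence)>();

// Processes one partition's messages strictly one after another.
async Task ProcessPartitionAsync( int partition, IReadOnlyList<int> sequences )
{
    foreach ( var sequence in sequences )
    {
        await Task.Delay( 10 ); // simulated slow processing
        log.Enqueue( (partition, sequence) );
    }
}

// Start one worker per partition; the workers overlap in time.
var workers = messages
    .GroupBy( m => m.Partition )
    .Select( g => ProcessPartitionAsync( g.Key, g.Select( m => m.Sequence ).ToList() ) )
    .ToArray();

await Task.WhenAll( workers );

// The interleaving across partitions varies between runs, but the order
// inside each partition does not.
foreach ( var group in log.GroupBy( e => e.Partition ).OrderBy( g => g.Key ) )
{
    Console.WriteLine( $"partition {group.Key}: {string.Join( ",", group.Select( e => e.Sequence ) )}" );
}
```

Because each worker awaits one message before starting the next, a slow message only delays its own partition, which mirrors the note above about per-partition ordering.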

Compatible and additional computed target framework versions

.NET: net8.0 is compatible. Computed: net8.0-android, net8.0-browser, net8.0-ios, net8.0-maccatalyst, net8.0-macos, net8.0-tvos, net8.0-windows.

NuGet packages

This package is not used by any NuGet packages.

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
0.3.0-preview-005 66 3/4/2024