Influx.SDK.Consumer
0.3.0-preview-005
Influx Consumer SDK
This is the Consumer SDK for Influx. It provides a way to consume messages from the platform without having to deal with the underlying gRPC communication.
Note: This project is still in development and should be considered experimental. It is not recommended to use it in a production environment.
Installation
The package is available from NuGet. To install it, run the following command:
dotnet add package Influx.SDK.Consumer --prerelease
Usage
To handle data received from Influx, create a message processor class that implements the Influx.Pulse.IMessageProcessor interface. The only method you need to implement is ExecuteAsync, which is called for each message received from the platform.
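using Influx.Pulse; // namespace of IMessageProcessor

public class MyMessageProcessor : IMessageProcessor
{
    // returning null keeps the default retry behaviour
    public RetryOptions? Options => null;

    public Task ExecuteAsync( Message message, CancellationToken cancellationToken )
    {
        // your code here

        return Task.CompletedTask;
    }
}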
Next you can register the consumer service along with your processor. This will set up a gRPC client and a background service that will automatically connect to Influx and consume data from it, triggering your processor for each message received.
IServiceCollection services = ...;

services.AddGrpcConsumer<MyMessageProcessor>( options =>
{
    options.Url = "https://influx.example.com";
    options.Group = "default";
    options.Key = "consumer-key"; // required if the host is secured
} );
Handling acknowledgements
The SDK acknowledges messages automatically, based on the outcome of your processor. If the processor does not throw an exception, the message is considered acknowledged and an ack instruction is sent to the platform. If the processor throws an exception, the message is considered not acknowledged and a nack instruction is sent instead.
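The rule above can be sketched in a few lines. This is only an illustration of the described behaviour, not the SDK's actual code: DispatchAsync is a hypothetical helper, and treating cancellation as a nack is an assumption of this sketch.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the SDK's dispatch step; not the SDK's real code.
// Returns the instruction that would be sent back to the platform.
static async Task<string> DispatchAsync(
    Func<CancellationToken, Task> processor, CancellationToken cancellationToken )
{
    try
    {
        await processor( cancellationToken );
        return "ack";  // the processor completed without throwing
    }
    catch ( Exception )
    {
        return "nack"; // any exception (in this sketch, cancellation included)
    }
}

var ok = await DispatchAsync( _ => Task.CompletedTask, CancellationToken.None );
var failed = await DispatchAsync(
    _ => throw new InvalidOperationException( "boom" ), CancellationToken.None );

Console.WriteLine( $"{ok}, {failed}" ); // prints "ack, nack"
```

In practice this means a processor signals failure simply by letting the exception propagate; catching and swallowing it would result in an ack.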
By implementing the processor's Options property (of type RetryOptions), you can control how many times a message is resent to the consumer and how long to wait before each resend. By default (if the property returns null), the message is resent indefinitely, with a 1 second delay between each attempt.
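To make the redelivery behaviour concrete, here is a minimal simulation. The members of RetryOptions are not documented here, so the maxAttempts and delay parameters below are hypothetical names, and rethrowing once the attempts are exhausted is an assumption of the sketch rather than documented platform behaviour.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical redelivery loop. maxAttempts and delay are illustrative names,
// not members of the SDK's RetryOptions type. A null maxAttempts means "retry forever".
static async Task<int> DeliverAsync( Func<int, Task> processor, int? maxAttempts, TimeSpan delay )
{
    for ( var attempt = 1; ; attempt++ )
    {
        try
        {
            await processor( attempt );
            return attempt; // acked on this attempt
        }
        catch ( Exception ) when ( maxAttempts is null || attempt < maxAttempts )
        {
            await Task.Delay( delay ); // nacked: wait, then redeliver
        }
    }
}

// A processor that fails twice and succeeds on the third delivery.
var attempts = await DeliverAsync(
    attempt => attempt < 3
        ? Task.FromException( new InvalidOperationException() )
        : Task.CompletedTask,
    maxAttempts: null,
    delay: TimeSpan.FromMilliseconds( 10 ) );

Console.WriteLine( attempts ); // prints 3
```

Because the default is to retry indefinitely, a message that always fails will keep coming back, so processors should be safe to run more than once for the same message.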
Timeouts
If the processor takes too long to process a message and a (n)ack instruction isn't sent back in time, the platform assumes a nack and resends the message. At the time of writing, this timeout defaults to 30 seconds. If you need to change this value, you can do so when setting up the consumer.
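The timeout rule can be pictured from the platform's side as a race between the consumer's reply and a timer. The sketch below is an assumption about the general shape of that logic, not the platform's implementation; AwaitInstructionAsync and SlowAckAsync are hypothetical helpers, and the durations are shortened for illustration.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical platform-side view: if no instruction arrives within the
// timeout, assume a nack.
static async Task<string> AwaitInstructionAsync( Task<string> reply, TimeSpan timeout )
{
    var first = await Task.WhenAny( reply, Task.Delay( timeout ) );
    return first == reply ? await reply : "nack";
}

// A slow processor whose ack arrives after 200 ms.
static async Task<string> SlowAckAsync()
{
    await Task.Delay( 200 );
    return "ack";
}

var tooShort = await AwaitInstructionAsync( SlowAckAsync(), TimeSpan.FromMilliseconds( 20 ) );
var longEnough = await AwaitInstructionAsync( SlowAckAsync(), TimeSpan.FromSeconds( 5 ) );

Console.WriteLine( $"{tooShort}, {longEnough}" ); // prints "nack, ack"
```

Note that in the first case the processor still finishes its work even though the message is treated as nacked, which is one more reason to keep processing safe to repeat.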
services.AddGrpcConsumer<MyMessageProcessor>( options =>
{
    // ...

    // let's set a longer timeout
    options.Timeout = TimeSpan.FromSeconds( 60 );
} );
Asynchronous partitions
When a consumer is assigned multiple partitions, the default behaviour of the platform is to sequentially go through each partition and send a subset of messages from each one. This is sufficient for most use cases, since the number of consumers can be scaled to match a higher load.
However, it is possible to instruct the platform to send messages from different partitions in parallel. This might be suitable for consumers handling a large number of partitions or when the processing of a message is particularly slow. If you think this might be the case, you can enable this feature by setting the AsyncPartitions property to true when setting up the consumer.
services.AddGrpcConsumer<MyMessageProcessor>( options =>
{
    // ...

    // let's enable asynchronous partitions
    options.AsyncPartitions = true;
} );
Setting this property to true instructs the platform to send messages from different partitions in parallel. In the same way, the SDK processes messages from different partitions on different threads. This can increase throughput, particularly when processing a message is slow, but it can also increase resource usage, so it should be used with caution.
Note: Although messages from different partitions will be processed in parallel, the order of the messages for each partition will still be preserved. Influx will not send new messages from the same partition until the current one is (n)acknowledged.
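The ordering guarantee described above can be illustrated with a small simulation: one worker per partition, with partitions running concurrently while each worker handles its own messages strictly one at a time. The message shape and the worker layout are assumptions made for this sketch and do not reflect the SDK's internals.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical messages as (partition, sequence) pairs, interleaved on arrival.
var messages = new[]
{
    (P: 0, Seq: 1), (P: 1, Seq: 1), (P: 0, Seq: 2), (P: 1, Seq: 2), (P: 0, Seq: 3)
};
var processed = new ConcurrentDictionary<int, ConcurrentQueue<int>>();

// One worker per partition: partitions run concurrently, but each worker
// finishes one message before taking the next from the same partition.
var workers = messages
    .GroupBy( m => m.P )
    .Select( async group =>
    {
        foreach ( var message in group )
        {
            await Task.Delay( 10 ); // simulated slow processing
            processed
                .GetOrAdd( message.P, _ => new ConcurrentQueue<int>() )
                .Enqueue( message.Seq );
        }
    } );

await Task.WhenAll( workers );

foreach ( var partition in processed.OrderBy( p => p.Key ) )
{
    var sequence = string.Join( ", ", partition.Value );
    Console.WriteLine( $"partition {partition.Key}: {sequence}" );
}
// prints:
// partition 0: 1, 2, 3
// partition 1: 1, 2
```

Since processors may now run concurrently, any state shared between invocations needs to be thread-safe, as the concurrent collections are here.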
Product | Compatible and additional computed target framework versions |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
Dependencies for net8.0:
- Faactory.Channels (>= 0.9.1)
- Faactory.Channels.Core (>= 0.9.1)
- Faactory.Channels.Parcel (>= 0.4.0)
- Google.Protobuf (>= 3.25.3)
- Grpc.Net.Client (>= 2.61.0)
- Microsoft.Extensions.Hosting.Abstractions (>= 8.0.0)
Version | Downloads | Last updated |
---|---|---|
0.3.0-preview-005 | 66 | 3/4/2024 |