Open.ChannelExtensions.Kafka
1.0.2
dotnet add package Open.ChannelExtensions.Kafka --version 1.0.2
Open.ChannelExtensions.Kafka
Bridges Confluent.Kafka with System.Threading.Channels and Open.ChannelExtensions, giving you resilient, back-pressure-aware Kafka producers and consumers with minimal ceremony.
Features
- Consumer → `ChannelReader` — pipe Kafka messages into a bounded channel with built-in back-pressure.
- Consumer → `IAsyncEnumerable` — iterate consumed messages with optional buffering. Unbuffered mode is LINQ-compatible.
- Resilient consumption — `ConsumeUntilCancelled` automatically reconnects and resubscribes after transient failures, with built-in back-off delays.
- Managed producer singleton — thread-safe, lazily built `IProducer` with automatic rebuild on fatal errors and exponential back-off.
- Dependency injection — register a Kafka producer as a singleton service in one call.
- Structured logging — optional `ILogger` integration throughout.
- Broad target support — `netstandard2.0`, `netstandard2.1`, `net8.0`, `net9.0`.
Installation

```shell
dotnet add package Open.ChannelExtensions.Kafka
```

Or via the Package Manager Console:

```powershell
Install-Package Open.ChannelExtensions.Kafka
```
Quick Start
Consuming
As a ChannelReader
Turn any IConsumer<TKey, TValue> into a bounded ChannelReader that fills in the background:
```csharp
using System.Threading.Channels;
using Confluent.Kafka;
using Open.ChannelExtensions.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "my-group",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var cts = new CancellationTokenSource();
using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("my-topic");

// Bounded channel with capacity of 100 messages.
ChannelReader<ConsumeResult<string, string>> reader =
    consumer.ToChannelReader(capacity: 100, cancellationToken: cts.Token);

// Now use Open.ChannelExtensions to process:
await reader
    .ReadAllAsync(cts.Token)
    .ForEachAsync(result =>
    {
        Console.WriteLine($"{result.Message.Key}: {result.Message.Value}");
    });
```
The `singleReader` parameter (default `false`) can be set to `true` when only one thread will read from the channel — this enables internal optimizations in the channel implementation.
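For a pipeline with a single reading loop, the opt-in can look like this (a sketch reusing the Quick Start setup above; only the `singleReader` argument is new):

```csharp
// Only one loop will read from this channel, so the single-reader
// optimization is safe to enable.
ChannelReader<ConsumeResult<string, string>> reader =
    consumer.ToChannelReader(capacity: 100, singleReader: true, cancellationToken: cts.Token);
```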
As an IAsyncEnumerable
Buffered — reads ahead into an internal channel for throughput:
```csharp
await foreach (var result in consumer.ToBufferedAsyncEnumerable(bufferSize: 50, cancellationToken: cts.Token))
{
    Console.WriteLine(result.Message.Value);
}
```
Note: Because the buffer continues to fill independently, this is not LINQ-compatible.
Unbuffered — yields one at a time. Safe for LINQ composition:
```csharp
await foreach (var result in consumer.AsAsyncEnumerable(logger: null, cancellationToken: cts.Token))
{
    Console.WriteLine(result.Message.Value);
}
```
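Because the unbuffered enumerable pulls one message per iteration, async-LINQ operators compose over it without silently consuming extra messages. A sketch, assuming the `System.Linq.Async` package supplies the operators:

```csharp
using System.Linq;

// No read-ahead: messages are consumed only as the query iterates.
var keyed = consumer.AsAsyncEnumerable(logger: null, cancellationToken: cts.Token)
    .Where(r => r.Message.Key is not null)
    .Take(100);

await foreach (var result in keyed)
{
    Console.WriteLine($"{result.Message.Key}: {result.Message.Value}");
}
```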
Resilient consumption with ConsumeUntilCancelled
For long-lived consumers that must survive broker restarts, network blips, and transient errors:
```csharp
Func<ConsumerBuilder<string, string>> builderFactory =
    () => new ConsumerBuilder<string, string>(config);

await foreach (var result in builderFactory.ConsumeUntilCancelled("my-topic", logger, cts.Token))
{
    Process(result);
}
```
This method:
- Rebuilds the consumer if construction or subscription fails (retries after 30 seconds).
- Restarts after a consume error (retries after 5 seconds).
- Stops cleanly when the `CancellationToken` is cancelled.
You can also pass multiple topics:
```csharp
builderFactory.ConsumeUntilCancelled(["topic-a", "topic-b"], logger, cts.Token);
```
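In a generic-host or ASP.NET Core app, the resilient loop fits naturally inside a `BackgroundService`. A hedged sketch under that assumption — `OrderConsumerService` and `ProcessAsync` are illustrative placeholders, not part of the library:

```csharp
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using Open.ChannelExtensions.Kafka;

public sealed class OrderConsumerService(ConsumerConfig config) : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        Func<ConsumerBuilder<string, string>> builderFactory =
            () => new ConsumerBuilder<string, string>(config);

        // Survives broker restarts and transient errors;
        // exits cleanly when the host shuts down.
        await foreach (var result in builderFactory.ConsumeUntilCancelled("my-topic", logger: null, stoppingToken))
        {
            await ProcessAsync(result, stoppingToken);
        }
    }

    // Placeholder for application-specific handling.
    private static Task ProcessAsync(ConsumeResult<string, string> result, CancellationToken ct)
        => Task.CompletedTask;
}
```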
Producing
Registering via Dependency Injection
Register a managed producer singleton in your DI container:
```csharp
// Using a ProducerConfig:
services.AddKafkaProducer<string, string>(sp =>
    new ProducerConfig { BootstrapServers = "localhost:9092" });

// Or using a ProducerBuilder factory for full control:
services.AddKafkaProducer<string, string>(sp =>
    () => new ProducerBuilder<string, string>(
        new ProducerConfig { BootstrapServers = "localhost:9092" }));
```
Then inject `IKafkaProducerProvider<string, string>` and get the producer:
```csharp
public class MyService(IKafkaProducerProvider<string, string> producerProvider)
{
    public async Task PublishAsync(string key, string value, CancellationToken ct)
    {
        var producer = await producerProvider.GetProducerAsync();
        await producer.ProduceAsync("my-topic", new Message<string, string>
        {
            Key = key,
            Value = value
        }, ct);
    }
}
```
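Confluent.Kafka surfaces delivery failures as `ProduceException<TKey, TValue>`. An illustrative handling pattern — the retry policy is application-specific, and fatal errors can be left to the provider, which rebuilds the producer:

```csharp
try
{
    var producer = await producerProvider.GetProducerAsync();
    var delivery = await producer.ProduceAsync("my-topic",
        new Message<string, string> { Key = key, Value = value }, ct);
    Console.WriteLine($"Delivered to {delivery.TopicPartitionOffset}");
}
catch (ProduceException<string, string> ex) when (!ex.Error.IsFatal)
{
    // Transient failure (e.g. broker unavailable): safe to log and retry.
    Console.WriteLine($"Delivery failed: {ex.Error.Reason}");
}
```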
Using KafkaProducerSingleton directly
If you're not using DI, you can create the singleton yourself:
```csharp
using var producerSingleton = new KafkaProducerSingleton<string, string>(
    new ProducerConfig { BootstrapServers = "localhost:9092" },
    logger: null);

var producer = await producerSingleton.GetProducerAsync();
await producer.ProduceAsync("my-topic", new Message<string, string>
{
    Key = "key",
    Value = "hello"
});
```
The singleton:
- Lazily builds the `IProducer` on first access.
- Automatically disposes and rebuilds the producer when a fatal Kafka error is reported.
- Applies exponential back-off (5 → 10 → 20 → 40 seconds) between rebuild attempts, resetting after 5 minutes of stability.
- Is fully thread-safe — concurrent callers share the same producer instance.
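The stated schedule is a doubling delay capped at 40 seconds. As arithmetic, it can be sketched like this (an illustration of the documented sequence, not the library's internal code):

```csharp
using System;

// Doubling back-off: attempt 0 → 5 s, 1 → 10 s, 2 → 20 s, 3+ → 40 s.
static TimeSpan RebuildDelay(int attempt) =>
    TimeSpan.FromSeconds(5 * (1 << Math.Min(attempt, 3)));

Console.WriteLine(RebuildDelay(1)); // 00:00:10
```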
API Reference
KafkaConsumerExtensions
| Method | Returns | Description |
|---|---|---|
| `ToChannelReader(capacity, ...)` | `ChannelReader<ConsumeResult<TKey, TValue>>` | Bounded channel reader fed by the consumer in the background. |
| `ToBufferedAsyncEnumerable(bufferSize, ...)` | `IAsyncEnumerable<ConsumeResult<TKey, TValue>>` | Buffered async enumerable (not LINQ-safe). |
| `AsAsyncEnumerable(logger, ...)` | `IAsyncEnumerable<ConsumeResult<TKey, TValue>>` | Unbuffered async enumerable (LINQ-safe). |
| `ConsumeUntilCancelled(topics, ...)` | `IAsyncEnumerable<ConsumeResult<TKey, TValue>>` | Resilient loop that rebuilds the consumer on failure. |
KafkaConfigExtensions
| Method | Description |
|---|---|
| `AddKafkaProducer<TKey, TValue>(configFactory)` | Registers a `KafkaProducerSingleton` using a `ProducerConfig`. |
| `AddKafkaProducer<TKey, TValue>(builderFactory)` | Registers a `KafkaProducerSingleton` using a `ProducerBuilder` factory. |
KafkaProducerSingleton<TKey, TValue>
| Member | Description |
|---|---|
| `GetProducerAsync()` | Returns the managed `IProducer`, building it on first call. |
| `Dispose()` | Cancels pending builds and disposes the producer. |
Requirements
- .NET Standard 2.0+, .NET 8.0+, or .NET 9.0+
- Confluent.Kafka ≥ 2.13.2
- Open.ChannelExtensions ≥ 9.3.0
License
Compatible target frameworks
| Product | Compatible frameworks |
|---|---|
| .NET | net8.0, net9.0 (later versions computed as compatible) |
| .NET Standard | netstandard2.0, netstandard2.1 |
Dependencies (identical for all targets: netstandard2.0, netstandard2.1, net8.0, net9.0)
- Confluent.Kafka (>= 2.13.2)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.5)
- Open.ChannelExtensions (>= 9.3.0)
Version History
| Version | Downloads | Last Updated |
|---|---|---|
| 1.0.2 | 1,572 | 3/19/2026 |
| 1.0.1 | 216 | 3/18/2026 |
| 1.0.0 | 87 | 3/18/2026 |
| 1.0.0-beta02 | 231 | 3/27/2025 |
| 1.0.0-beta01 | 193 | 3/27/2025 |