Rig.TUnit.Messaging.Kafka
0.1.0-beta.2
dotnet add package Rig.TUnit.Messaging.Kafka --version 0.1.0-beta.2
NuGet\Install-Package Rig.TUnit.Messaging.Kafka -Version 0.1.0-beta.2
<PackageReference Include="Rig.TUnit.Messaging.Kafka" Version="0.1.0-beta.2" />
<PackageVersion Include="Rig.TUnit.Messaging.Kafka" Version="0.1.0-beta.2" />
<PackageReference Include="Rig.TUnit.Messaging.Kafka" />
paket add Rig.TUnit.Messaging.Kafka --version 0.1.0-beta.2
#r "nuget: Rig.TUnit.Messaging.Kafka, 0.1.0-beta.2"
#:package Rig.TUnit.Messaging.Kafka@0.1.0-beta.2
#addin nuget:?package=Rig.TUnit.Messaging.Kafka&version=0.1.0-beta.2&prerelease
#tool nuget:?package=Rig.TUnit.Messaging.Kafka&version=0.1.0-beta.2&prerelease
Rig.TUnit.Messaging.Kafka
Testcontainers-backed Apache Kafka fixture with
KafkaListener/KafkaEventSender, multi-partitionConfluent.Kafkaadmin-driven topology, and partition-orderedOrderingAssert.
What this package is
The Rig.TUnit Kafka provider. KafkaFixture spins
confluentinc/cp-kafka in single-broker KRaft mode (no separate
Zookeeper since Kafka 3.3). Ships:
KafkaEventSender— sender with aSendContextoverload mappingPartitionKey(orSessionKeyas a fallback) to the KafkaMessage.Key. The legacycorrelationId → Message.Keyconflation was decoupled in Feature 007 T020.KafkaListener— wraps aConfluent.Kafkaconsumer;Subscribefor group-managed assignment orAssign(int partition)to pin a listener to a specific partition. Pre-creates the topic before subscribing (so a fast publisher never races a not-yet-existing topic).KafkaTopologyBuilder+IKafkaTopologyBuilder/IKafkaTopicConfig— multi-partition + custom configs (retention, cleanup policy) via theConfluent.Kafka.AdminClient. Wired to the rig viaKafkaRigBuilder.WithTopology(…).KafkaFixtureOptions.DefaultPartitions— global default for topics created by the listener.
When to use it
- Integration tests for Kafka consumers / producers.
- Asserting partition ordering is preserved across a producer / consumer
pair (multi-partition
OrderingAssert.PerKeyMonotonicper key). - Verifying consumer-group rebalance behaviour (two listeners in the same group).
- Declaring topics with custom partition counts, retention, or log-compaction at runtime from test code.
- Not for: pure unit tests of message-handler logic — mock the consumer there.
Prerequisites
- .NET 10 SDK
- Docker Desktop / Colima (Kafka image ~500 MB)
Confluent.Kafka(transitive)
Quick start
using Rig.TUnit.Messaging.Helpers;
using Rig.TUnit.Messaging.Kafka.Fixtures;
using Rig.TUnit.Messaging.Kafka.Helpers;
await using var fx = new KafkaFixture();
await fx.InitializeAsync();
// Per-key ordered send (hashed to a partition by Confluent.Kafka).
await using var sender = new KafkaEventSender(fx.ConnectionString, topic: "orders");
await using var listener = new KafkaListener(fx.ConnectionString, "orders", groupId: "shipping");
await listener.StartAsync(ct);
await sender.SendAsync(
"{\"orderId\":1}",
context: new SendContext(PartitionKey: "customer-42"),
ct: ct);
Multi-partition topology via the WithTopology rig hook:
services.AddRigTUnit(rig =>
rig.UseKafka(RigConnect.FromValue(fx.ConnectionString), k =>
k.WithTopology(t =>
t.Topic("orders", c => c
.WithPartitions(6)
.WithReplicationFactor(1)
.WithConfig("cleanup.policy", "compact")
.WithConfig("retention.ms", "86400000")))));
Options
| Property | Type | Default | Description |
|---|---|---|---|
Image |
string |
"confluentinc/cp-kafka:7.6.1" |
Broker image |
StartupTimeoutSeconds |
int |
120 |
Kafka boot |
NumPartitions |
int |
3 |
Default topic partition count |
ReplicationFactor |
short |
1 |
Single-broker dev cluster |
DefaultPartitions |
int |
1 |
Per-listener default when no topology declares the topic (range 1–200) |
Fixture + helper APIs
Rig.TUnit.Messaging.Kafka.Fixtures.KafkaFixtureRig.TUnit.Messaging.Kafka.Options.KafkaFixtureOptions—DefaultPartitionscontrols the auto-create count.Rig.TUnit.Messaging.Kafka.Builder.KafkaRigBuilder— shipsWithTopology(Action<IKafkaTopologyBuilder>).Rig.TUnit.Messaging.Kafka.Helpers.KafkaListener—SubscribeorAssign(int partition)for pinned-partition tests.Rig.TUnit.Messaging.Kafka.Helpers.KafkaEventSender— shipsSendAsync(string, SendContext, …)overload.Rig.TUnit.Messaging.Kafka.Topology.IKafkaTopologyBuilderRig.TUnit.Messaging.Kafka.Topology.IKafkaTopicConfig—WithPartitions,WithReplicationFactor,WithConfig(key, value).Rig.TUnit.Messaging.Kafka.Topology.KafkaTopologyBuilder
Per-test isolation
Per-test topic orders_{IsolationKey:short}. Teardown deletes the
topic. Consumer groups also carry the IsolationKey suffix so parallel
tests do not join the same rebalance group.
Parallelism + performance
- First-run pull: ~60 s.
- Warm startup: ~10–15 s.
- Per-op send: ~3–5 ms.
- Bind-port contention — each Kafka broker binds a single host port;
multiple fixtures run in parallel because Testcontainers allocates
ephemeral ports, but
KafkaFixtureOptions.FixedHostPortbreaks this.
Troubleshooting
Local: Broker transport failure— consumer started before broker finished topic creation.KafkaListener.StartAsyncretries with exponential backoff; do not shortcut the wait.- Consumer-group rebalance loops forever — two listeners use the
same group id; parallel tests must suffix with
IsolationKey. - Message not received but sent — producer's delivery report arrived
but consumer hasn't polled yet; poll with
MessageAssert.Within(…)rather than a fixed delay.
See docs/troubleshooting.md#kafka.
Provider quirks + edge cases
- Kafka's partition-level ordering guarantee is the strongest thing
asserted in
OrderingAssert; global ordering across partitions is explicitly not guaranteed. UseSendContext.PartitionKeyso all messages for a key land in the same partition. - Kafka has no native session concept; if
SendContext.SessionKeyis supplied withoutPartitionKey, the sender folds it to theMessage.Keyautomatically. correlationIdno longer doubles asMessage.Key— the legacy conflation inKafkaEventSenderwas decoupled. Tests that relied on the old behaviour must passPartitionKeyexplicitly.- Tombstones (null value) are required to delete keys in log-compacted
topics — the test harness surfaces this via
KafkaEventSender.SendTombstoneAsync. auto.offset.reset=earliestis set by default so new consumer groups see historical messages; switch tolatestfor replay-free tests.- Topology apply is idempotent:
EnsureTopicExistsAsyncswallowsCreateTopicsExceptionwhen every result isTopicAlreadyExists.
Benchmarks
See KafkaMessagingBenchmarks.cs;
baseline in benchmarks/baseline-005.json.
Related docs
- Architecture diagram
- Glossary
- Troubleshooting
- Provider deep-dive:
docs/providers/kafka.md(multi-partition ordering, topic configs, pinned-partition examples). - Family base:
Rig.TUnit.Messaging - Feature design: Sessions & Partitions · Topology Builder
License
MIT. See LICENSE.
| Product | Versions Compatible and additional computed target framework versions. |
|---|---|
| .NET | net10.0 is compatible. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
-
net10.0
- Bogus (>= 35.6.1)
- Confluent.Kafka (>= 2.6.0)
- Microsoft.Extensions.Configuration (>= 10.0.0)
- Microsoft.Extensions.Configuration.Binder (>= 10.0.0)
- Microsoft.Extensions.DependencyInjection.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Logging.Abstractions (>= 10.0.0)
- Microsoft.Extensions.Options (>= 10.0.0)
- Microsoft.Extensions.Options.DataAnnotations (>= 10.0.0)
- Rig.TUnit.Messaging (>= 0.1.0-beta.2)
- Testcontainers (>= 4.11.0)
- Testcontainers.Kafka (>= 4.11.0)
- TUnit.Core (>= 1.34.5)
NuGet packages (1)
Showing the top 1 NuGet packages that depend on Rig.TUnit.Messaging.Kafka:
| Package | Downloads |
|---|---|
|
Rig.TUnit.All
Meta-package containing every Rig.TUnit.* package. DISCOURAGED — prefer per-feature or per-stack meta-packages (Rig.TUnit, Rig.TUnit.Microservices). |
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 0.1.0-beta.2 | 60 | 4/27/2026 |
| 0.0.0-alpha.0.14 | 52 | 4/26/2026 |