PgOutput2Json.RabbitMqStreams
1.0.3
A newer version of this package is available. See the version list below for details.
dotnet add package PgOutput2Json.RabbitMqStreams --version 1.0.3
PgOutput2Json
PgOutput2Json is a .NET library that uses PostgreSQL logical replication to stream JSON-encoded change events from your database to a .NET application or a message broker like RabbitMQ, Kafka, or Redis.
Use Cases
Use PgOutput2Json when you need to react to DB changes in near real-time:
- Background tasks triggered by inserts/updates
- Low-latency application cache updates
- Asynchronous logging (to file or another DB)
- Real-time data aggregation
- Lightweight ETL for DWH solutions
All with minimal latency: events are dispatched shortly after a transaction is committed, though large transactions may introduce additional delay due to processing and transmission overhead.
Supported Outputs
- .NET application via a simple delegate handler
- Kafka
- RabbitMQ (Streams + Classic Queues)
- Redis (Streams + Pub/Sub Channels)
- SQLite (used by PgFreshCache)
- MongoDB
- Amazon Kinesis
- Amazon DynamoDB
- Azure Event Hubs
Plug-and-play adapters handle the heavy lifting, or you can handle messages directly in your app for maximum control.
How It Works
PostgreSQL 10+ ships with a built-in logical decoding plugin called pgoutput, which emits row-level changes (INSERT/UPDATE/DELETE) over a replication slot.
PgOutput2Json:
- Connects to a replication slot via pgoutput
- Converts change events into clean JSON
- Sends them to your application or a supported message broker
No extra plugins needed: pgoutput comes with PostgreSQL by default.
The change events JSON format:
{
  "c": "U",             // Change type: I (insert), U (update), D (delete)
  "w": 2485645760,      // WAL end offset
  "t": "schema.table",  // Table name (if enabled in JSON options)
  "k": { ... },         // Key values: included for deletes, and for updates if the key changed,
                        // or old row values, if the table uses REPLICA IDENTITY FULL
  "r": { ... }          // New row values (not present for deletes)
}
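To make the field layout concrete, here is a minimal sketch of reading one such event with System.Text.Json. The sample payload and its values are made up for illustration, and the helper names `Describe` and `FormatLsn` are hypothetical, not part of the library. The sketch also assumes that "w" is a 64-bit WAL position, so it can be printed in the high/low hexadecimal form PostgreSQL uses for LSNs.

```csharp
using System;
using System.Text.Json;

// Hypothetical sample event: field names follow the format above, values are invented.
string sample = "{\"c\":\"U\",\"w\":2485645760,\"t\":\"public.my_table\",\"r\":{\"id\":1,\"name\":\"Alice\"}}";

Console.WriteLine(Describe(sample));

// Prints a 64-bit WAL position as two hex numbers: high 32 bits, slash, low 32 bits.
static string FormatLsn(ulong wal)
{
    ulong high = wal >> 32;
    ulong low = wal & 0xFFFFFFFFUL;
    return $"{high:X}/{low:X}";
}

// Summarizes a change event, tolerating the optional fields.
static string Describe(string json)
{
    using var doc = JsonDocument.Parse(json);
    var root = doc.RootElement;

    string change = root.GetProperty("c").GetString() switch
    {
        "I" => "insert",
        "U" => "update",
        "D" => "delete",
        var other => "unknown (" + other + ")"
    };

    string lsn = FormatLsn(root.GetProperty("w").GetUInt64());

    // "t" is only present when table names are enabled in the JSON options.
    string table = root.TryGetProperty("t", out var t) ? t.GetString() ?? "?" : "(no table name)";

    // "r" is absent for deletes, so fall back to a placeholder.
    string row = root.TryGetProperty("r", out var r) ? r.GetRawText() : "(no new row)";

    return change + " on " + table + " at " + lsn + ": " + row;
}
```

Using `TryGetProperty` for "t", "k", and "r" keeps the handler from throwing when a field is legitimately missing, for example "r" on a delete.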
Development Status
Still early days: the library is under active development, but it's fully usable for testing and early integration.
Quick Start
Configure postgresql.conf
To enable logical replication, add the following setting to your postgresql.conf:
wal_level = logical
Other necessary settings usually have appropriate default values for a basic setup.
Note: PostgreSQL must be restarted after modifying this setting.
Create a Replication User
Next, create a user in PostgreSQL that will be used for replication. Run the following SQL command:
CREATE USER pgoutput2json WITH
PASSWORD '_your_password_here_'
REPLICATION;
If you are connecting from a remote machine, don't forget to modify pg_hba.conf to allow non-local connections for this user. After modifying pg_hba.conf, apply the changes by either:
- Sending a SIGHUP to the PostgreSQL process
- Running pg_ctl reload
- Or executing:
SELECT pg_reload_conf();
Create a Publication
Now, connect to the target database and create a publication to specify which tables and actions should be replicated:
CREATE PUBLICATION my_publication
FOR TABLE my_table1, my_table2
WITH (publish = 'insert, update, delete');
In the example code below, we'll assume the database name is my_database.
Set Up RabbitMQ Streams
Before using RabbitMQ Streams, ensure that the RabbitMQ Streams plugin is enabled. The stream protocol must be active for the stream to work.
Create a stream in RabbitMQ, for example, a stream named my_stream. You can do this from the RabbitMQ management interface or through the appropriate RabbitMQ commands.
Create .NET Worker Service
Set up a new .NET Worker Service and add the following package reference:
dotnet add package PgOutput2Json.RabbitMqStreams
In your Worker.cs, add the following code to use RabbitMQ Streams:
using System.Net; // IPEndPoint and IPAddress
using PgOutput2Json;
using PgOutput2Json.RabbitMqStreams;
public class Worker : BackgroundService
{
    private readonly ILoggerFactory _loggerFactory;
    public Worker(ILoggerFactory loggerFactory)
    {
        _loggerFactory = loggerFactory;
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // This code assumes PostgreSQL and RabbitMQ are on localhost
        using var pgOutput2Json = PgOutput2JsonBuilder.Create()
            .WithLoggerFactory(_loggerFactory)
            .WithPgConnectionString("server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_")
            .WithPgPublications("my_publication")
            .UseRabbitMqStreams(options =>
            {
                options.StreamName = "my_stream";
                options.StreamSystemConfig.UserName = "guest";
                options.StreamSystemConfig.Password = "guest";
                options.StreamSystemConfig.VirtualHost = "/";
                // 5552 is the default port of the RabbitMQ stream protocol
                options.StreamSystemConfig.Endpoints =
                [
                    new IPEndPoint(IPAddress.Loopback, 5552)
                ];
            })
            .Build();
        await pgOutput2Json.StartAsync(stoppingToken);
    }
}
Once the stream is configured in RabbitMQ and the .NET service is running, changes to the PostgreSQL tables specified in my_publication will be pushed to the my_stream stream in RabbitMQ.
No need to manage exchanges or queues: everything flows directly through the stream.
Note: This example uses a temporary replication slot, meaning it won't capture changes made while the worker is stopped.
Working with Permanent Replication Slots
If you want to capture changes made while the worker is stopped, you need a permanent replication slot.
In the context of logical replication, a slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database.
To create a replication slot, call the pg_create_logical_replication_slot function in the same database that holds the tables being tracked:
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');
The first parameter is the name of the slot, which must be unique across all databases in a PostgreSQL cluster. Make sure you specify pgoutput as the second parameter: it selects the correct logical decoding plugin.
The current position of each slot is persisted only at checkpoint, so in the case of a crash, the slot may return to an earlier log sequence number (LSN), which will then cause recent changes to be sent again when the server restarts.
Note: It is the responsibility of the receiving client to avoid the ill effects from handling the same message more than once.
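One way to meet that responsibility is to make the handler idempotent, so that applying a re-sent event leaves the same end state as applying it once. The sketch below keeps an in-memory cache and treats inserts and updates as upserts and deletes as removals. It is only an illustration under stated assumptions: the table's key column is named "id", table names are enabled in the JSON options, the sample events are invented, and `Apply` and `KeyOf` are hypothetical helpers rather than library APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical events for a table whose key column is "id" (an assumption of this sketch).
string[] events =
{
    "{\"c\":\"I\",\"w\":100,\"t\":\"public.users\",\"r\":{\"id\":1,\"name\":\"Alice\"}}",
    "{\"c\":\"U\",\"w\":200,\"t\":\"public.users\",\"r\":{\"id\":1,\"name\":\"Alicia\"}}",
};

var cache = new Dictionary<string, string>();

// First delivery, followed by a simulated re-delivery of the same events after a crash.
foreach (var e in events) Apply(cache, e);
foreach (var e in events) Apply(cache, e);

Console.WriteLine(cache.Count);
Console.WriteLine(cache["public.users:1"]);

// Applies one change event so that repeating it does not change the end state.
static void Apply(Dictionary<string, string> cache, string json)
{
    using var doc = JsonDocument.Parse(json);
    var root = doc.RootElement;
    string table = root.GetProperty("t").GetString() ?? "";
    string change = root.GetProperty("c").GetString() ?? "";

    if (change == "D")
    {
        // Deletes carry the key in "k"; removing a missing entry is a harmless no-op.
        cache.Remove(KeyOf(table, root.GetProperty("k")));
        return;
    }

    var row = root.GetProperty("r");

    // When "k" is present on an update, it holds the old key (or old row): drop the stale entry.
    if (root.TryGetProperty("k", out var oldKey))
        cache.Remove(KeyOf(table, oldKey));

    // Upsert: inserting or updating twice stores the same row text both times.
    cache[KeyOf(table, row)] = row.GetRawText();
}

// Builds a cache key from the table name and the assumed "id" column.
static string KeyOf(string table, JsonElement values)
{
    return table + ":" + values.GetProperty("id").GetRawText();
}
```

After both passes the cache holds a single entry for the row, so a replay from an earlier LSN converges to the same state. Handlers with side effects that cannot be repeated safely, such as sending notifications, would need a different deduplication strategy.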
Important: Replication slots persist across crashes and know nothing about the state of their consumer(s). They will prevent removal of required resources even when there is no connection using them. This consumes storage because neither required WAL nor required rows from the system catalogs can be removed by VACUUM as long as they are required by a replication slot.
In extreme cases, this could cause the database to shut down to prevent transaction ID wraparound.
So, if a slot is no longer required, it should be dropped. To drop a replication slot, use:
SELECT * FROM pg_drop_replication_slot('my_slot');
Once the replication slot is created, specify its name in the PgOutput2JsonBuilder to use it:
// ...
using var pgOutput2Json = PgOutput2JsonBuilder.Create()
    .WithLoggerFactory(_loggerFactory)
    .WithPgConnectionString("server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_")
    .WithPgPublications("my_publication")
    .WithPgReplicationSlot("my_slot") // <-- slot specified here
// ...
Product | Versions (compatible and additional computed target framework versions) |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net8.0
  - Microsoft.Extensions.Logging.Abstractions (>= 9.0.8)
  - PgOutput2Json (>= 1.0.3)
  - RabbitMQ.Stream.Client (>= 1.9.0)
- net9.0
  - Microsoft.Extensions.Logging.Abstractions (>= 9.0.8)
  - PgOutput2Json (>= 1.0.3)
  - RabbitMQ.Stream.Client (>= 1.9.0)
Version | Downloads | Last Updated |
---|---|---|
1.2.0 | 9 | 8/17/2025 |
1.1.2 | 20 | 8/16/2025 |
1.1.1 | 92 | 8/11/2025 |
1.1.0 | 93 | 8/11/2025 |
1.0.6 | 87 | 8/10/2025 |
1.0.3 | 88 | 8/10/2025 |
1.0.2 | 91 | 8/10/2025 |
1.0.1 | 71 | 8/9/2025 |
1.0.0 | 167 | 8/5/2025 |
0.9.20 | 148 | 5/19/2025 |
0.9.17 | 147 | 5/18/2025 |
0.9.16 | 234 | 5/12/2025 |
0.9.15 | 156 | 5/5/2025 |
0.9.14 | 73 | 5/3/2025 |
0.9.13 | 154 | 5/1/2025 |
0.9.12 | 160 | 4/27/2025 |
0.9.11 | 103 | 4/25/2025 |
0.9.10 | 162 | 4/21/2025 |
0.9.9 | 159 | 4/21/2025 |
0.9.8 | 155 | 4/20/2025 |
0.9.7 | 141 | 4/18/2025 |
0.9.6 | 218 | 4/15/2025 |
0.9.5 | 187 | 4/13/2025 |