PgOutput2Json 1.1.0
PgOutput2Json
PgOutput2Json is a .NET library that uses PostgreSQL logical replication to stream JSON-encoded change events from your database to a .NET application or a message broker like RabbitMQ, Kafka, or Redis.
Use Cases
Use PgOutput2Json when you need to react to DB changes in near real-time:
- Background tasks triggered by inserts/updates
- Low-latency application cache updates
- Asynchronous logging (to file or another DB)
- Real-time data aggregation
- Lightweight ETL for DWH solutions
All with minimal latency: events are dispatched shortly after a transaction is committed, though large transactions may introduce additional delay due to processing and transmission overhead.
Supported Outputs
- .NET application via a simple delegate handler
- Kafka
- RabbitMQ (Streams + Classic Queues)
- Redis (Streams + Pub/Sub Channels)
- SQLite (used by PgFreshCache)
- MongoDB
- Amazon Kinesis
- Amazon DynamoDB
Plug-and-play adapters handle the heavy lifting, or you can handle messages directly in your app for maximum control.
How It Works
PostgreSQL 10+ ships with a built-in logical decoding plugin called pgoutput, which emits row-level changes (INSERT/UPDATE/DELETE) over a replication slot.
PgOutput2Json:
- Connects to a replication slot via pgoutput
- Converts change events into clean JSON
- Sends them to your application or a supported message broker
No extra plugins needed: pgoutput comes with PostgreSQL by default.
The change events JSON format:
{
  "c": "U",             // Change type: I (insert), U (update), D (delete)
  "w": 2485645760,      // WAL end offset
  "t": "schema.table",  // Table name (if enabled in JSON options)
  "k": { ... },         // Key values: included for deletes, and for updates if the key changed,
                        // or old row values, if the table uses REPLICA IDENTITY FULL
  "r": { ... }          // New row values (not present for deletes)
}
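To make the format concrete, here is a minimal sketch that reads one change event with System.Text.Json. The sample event, the `public.my_table1` table name, the row columns, and the `ChangeEventReader` helper are all made up for illustration and are not part of PgOutput2Json. The sketch treats "t", "k" and "r" as optional, as the field descriptions above suggest.

```csharp
using System;
using System.Text.Json;

// Hypothetical sample event; the values are invented for illustration.
const string sample = """
{"c":"U","w":2485645760,"t":"public.my_table1","r":{"id":1,"name":"new name"}}
""";

Console.WriteLine(ChangeEventReader.Describe(sample));

// Hypothetical helper, not part of the PgOutput2Json API.
public static class ChangeEventReader
{
    public static string Describe(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;

        // "c" holds the change type as a single letter.
        var change = root.GetProperty("c").GetString() switch
        {
            "I" => "insert",
            "U" => "update",
            "D" => "delete",
            var other => $"unknown ({other})"
        };

        // The sample offset above does not fit in an Int32, so read it as UInt64.
        var wal = root.GetProperty("w").GetUInt64();

        // "t", "k" and "r" may be absent, so probe for them instead of assuming.
        var table = root.TryGetProperty("t", out var t) ? t.GetString() : "(table not included)";
        var hasKey = root.TryGetProperty("k", out _);
        var hasRow = root.TryGetProperty("r", out _);

        return $"{change} on {table} at WAL {wal} (key: {hasKey}, row: {hasRow})";
    }
}
```

Probing with TryGetProperty rather than GetProperty keeps the reader from throwing on deletes (no "r") or on events without a table name.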
Quick Start
Configure postgresql.conf
To enable logical replication, add the following setting to your postgresql.conf:
wal_level = logical
Other necessary settings usually have appropriate default values for a basic setup.
Note: PostgreSQL must be restarted after modifying this setting.
Create a Replication User
Next, create a user in PostgreSQL that will be used for replication. Run the following SQL command:
CREATE USER pgoutput2json WITH
PASSWORD '_your_password_here_'
REPLICATION;
If you are connecting from a remote machine, don't forget to modify pg_hba.conf to allow non-local connections for this user. After modifying pg_hba.conf, apply the changes in one of these ways:
- Send a SIGHUP to the PostgreSQL process
- Run pg_ctl reload
- Execute SELECT pg_reload_conf();
Create a Publication
Now, connect to the target database and create a publication to specify which tables and actions should be replicated:
CREATE PUBLICATION my_publication
FOR TABLE my_table1, my_table2
WITH (publish = 'insert, update, delete');
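Before wiring up the library, it can help to confirm that the server-side setup took effect. The following is a minimal sketch using Npgsql directly (not a PgOutput2Json feature). It assumes the database is named my_database, reuses the user and publication names from the steps above, and needs a reachable PostgreSQL server to run.

```csharp
using System;
using Npgsql;

// Assumed connection details; adjust to your environment.
const string connectionString =
    "server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_";

await using var conn = new NpgsqlConnection(connectionString);
await conn.OpenAsync();

// wal_level should report "logical" once postgresql.conf is changed and the server restarted.
await using (var cmd = new NpgsqlCommand("SHOW wal_level", conn))
{
    var walLevel = (string?)await cmd.ExecuteScalarAsync();
    Console.WriteLine($"wal_level = {walLevel}");
}

// pg_publication_tables lists the tables each publication covers.
await using (var cmd = new NpgsqlCommand(
    "SELECT schemaname::text, tablename::text FROM pg_publication_tables WHERE pubname = @pub",
    conn))
{
    cmd.Parameters.AddWithValue("pub", "my_publication");
    await using var reader = await cmd.ExecuteReaderAsync();
    while (await reader.ReadAsync())
    {
        Console.WriteLine($"published: {reader.GetString(0)}.{reader.GetString(1)}");
    }
}
```

If wal_level is not logical, or a table is missing from the list, the worker would likely start but receive no events for that table.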
In the example code below, we'll assume the database name is my_database.
Create a .NET Worker Service
Set up a new .NET Worker Service and add the PgOutput2Json package:
dotnet add package PgOutput2Json
In your Worker.cs, add the following:
using PgOutput2Json;

public class Worker : BackgroundService
{
    private readonly ILoggerFactory _loggerFactory;

    public Worker(ILoggerFactory loggerFactory)
    {
        _loggerFactory = loggerFactory;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        using var pgOutput2Json = PgOutput2JsonBuilder.Create()
            .WithLoggerFactory(_loggerFactory)
            .WithPgConnectionString("server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_")
            .WithPgPublications("my_publication")
            .WithMessageHandler((json, table, key, partition) =>
            {
                // Print each change event as it arrives
                Console.WriteLine($"{table}: {json}");
                return Task.FromResult(true);
            })
            .Build();

        // Runs until the host signals shutdown through stoppingToken
        await pgOutput2Json.StartAsync(stoppingToken);
    }
}
Note: This example uses a temporary replication slot, meaning it won't capture changes made while the worker is stopped.
Working with Permanent Replication Slots
If you want to capture changes made while the worker is stopped, you need a permanent replication slot.
In the context of logical replication, a slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database.
To create a replication slot, call the pg_create_logical_replication_slot function in the same database that holds the tables being tracked:
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');
The first parameter is the name of the slot, which must be unique across all databases in a PostgreSQL cluster. Make sure you specify pgoutput as the second parameter, since it selects the correct logical decoding plugin.
The current position of each slot is persisted only at checkpoint, so in the case of a crash, the slot may return to an earlier log sequence number (LSN), which will then cause recent changes to be sent again when the server restarts.
Note: It is the responsibility of the receiving client to avoid the ill effects from handling the same message more than once.
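One common way to tolerate redelivery is to make the handler idempotent, so that applying the same event twice leaves the same state. The sketch below illustrates this with an in-memory row cache. The `RowCache` class and the `id` key column are hypothetical names chosen for this example, and the sample events are invented. It relies only on the "c", "k" and "r" fields described earlier.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text.Json;

var cache = new RowCache("id");
var events = new[]
{
    """{"c":"I","w":1,"r":{"id":1,"name":"a"}}""",
    """{"c":"U","w":2,"r":{"id":1,"name":"b"}}"""
};

foreach (var e in events) cache.Apply(e);
foreach (var e in events) cache.Apply(e); // simulated redelivery after a crash

Console.WriteLine(cache.Count); // still a single cached row

// Hypothetical helper, not part of the PgOutput2Json API.
public sealed class RowCache
{
    private readonly ConcurrentDictionary<string, string> _rows = new();
    private readonly string _keyColumn;

    public RowCache(string keyColumn) => _keyColumn = keyColumn;

    public int Count => _rows.Count;

    public string? Get(string key) => _rows.TryGetValue(key, out var row) ? row : null;

    public void Apply(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;

        // "k" carries the old key (deletes, key-changing updates), so drop that entry first.
        if (root.TryGetProperty("k", out var old) &&
            old.TryGetProperty(_keyColumn, out var oldKey))
        {
            _rows.TryRemove(oldKey.GetRawText(), out _);
        }

        // Inserts and updates are stored as upserts, so repeating them is harmless.
        if (root.GetProperty("c").GetString() != "D")
        {
            var row = root.GetProperty("r");
            _rows[row.GetProperty(_keyColumn).GetRawText()] = row.GetRawText();
        }
    }
}
```

Because every operation is an upsert or a remove, replaying a run of events ends in the same final state as applying it once. Handlers with non-repeatable side effects (sending emails, incrementing counters) would need a different strategy.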
Important: Replication slots persist across crashes and know nothing about the state of their consumer(s). They will prevent removal of required resources even when there is no connection using them. This consumes storage because neither required WAL nor required rows from the system catalogs can be removed by VACUUM as long as they are required by a replication slot.
In extreme cases, this could cause the database to shut down to prevent transaction ID wraparound.
So, if a slot is no longer required, it should be dropped. To drop a replication slot, use:
SELECT * FROM pg_drop_replication_slot('my_slot');
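To notice an abandoned slot before it fills the disk, you can periodically check how much WAL each slot is holding back. This is a minimal sketch using Npgsql and the standard pg_replication_slots view; it is not a PgOutput2Json feature. The connection string is an assumption, and the code needs a live primary server to run.

```csharp
using System;
using Npgsql;

// Assumed connection details; adjust to your environment.
const string connectionString =
    "server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_";

// Bytes of WAL between the current write position and each slot's restart_lsn.
const string sql = """
    SELECT slot_name::text,
           active,
           pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)::bigint AS retained_bytes
    FROM pg_replication_slots
    WHERE slot_type = 'logical'
    """;

await using var conn = new NpgsqlConnection(connectionString);
await conn.OpenAsync();
await using var cmd = new NpgsqlCommand(sql, conn);
await using var reader = await cmd.ExecuteReaderAsync();

while (await reader.ReadAsync())
{
    var name = reader.GetString(0);
    var active = reader.GetBoolean(1);
    // restart_lsn can be NULL, in which case the difference is NULL too.
    var retained = reader.IsDBNull(2) ? "unknown" : $"{reader.GetInt64(2):N0} bytes";
    Console.WriteLine($"{name}: active={active}, retained WAL={retained}");
}
```

A slot that is inactive and whose retained WAL keeps growing is a likely candidate for dropping.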
Once the replication slot is created, simply specify its name in the PgOutput2JsonBuilder:
// ...
using var pgOutput2Json = PgOutput2JsonBuilder.Create()
    .WithLoggerFactory(_loggerFactory)
    .WithPgConnectionString("server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_")
    .WithPgPublications("my_publication")
    .WithPgReplicationSlot("my_slot") // <-- slot specified here
// ...
Product | Compatible and additional computed target framework versions |
---|---|
.NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net8.0
  - Microsoft.Extensions.Logging.Abstractions (>= 9.0.8)
  - Npgsql (>= 9.0.3)
- net9.0
  - Microsoft.Extensions.Logging.Abstractions (>= 9.0.8)
  - Npgsql (>= 9.0.3)
NuGet packages (10)
Showing the top 5 NuGet packages that depend on PgOutput2Json:
Package | Description |
---|---|
PgOutput2Json.RabbitMq | PgOutput2Json.RabbitMQ library uses PostgreSQL logical replication to push changes, as JSON messages, from database tables to RabbitMQ |
PgOutput2Json.Redis | PgOutput2Json.Redis library uses PostgreSQL logical replication to push changes, as JSON messages, from database tables to Redis |
PgOutput2Json.RabbitMqStreams | PgOutput2Json.RabbitMqStreams library uses PostgreSQL logical replication to push changes, as JSON messages, from database tables to RabbitMQ Streams |
PgOutput2Json.Kafka | PgOutput2Json.Kafka library uses PostgreSQL logical replication to push changes, as JSON messages, from database tables to Kafka |
PgOutput2Json.Sqlite | PgOutput2Json.Sqlite library uses logical replication to copy rows from PostgreSQL to SQLite |
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last Updated |
---|---|---|
1.2.0 | 146 | 8/17/2025 |
1.1.2 | 88 | 8/16/2025 |
1.1.1 | 193 | 8/11/2025 |
1.1.0 | 163 | 8/11/2025 |
1.0.6 | 173 | 8/10/2025 |
1.0.5 | 126 | 8/10/2025 |
1.0.4 | 127 | 8/10/2025 |
1.0.3 | 163 | 8/10/2025 |
1.0.2 | 167 | 8/10/2025 |
1.0.1 | 147 | 8/9/2025 |
1.0.0 | 281 | 8/5/2025 |
0.9.20 | 414 | 5/19/2025 |
0.9.17 | 238 | 5/18/2025 |
0.9.16 | 321 | 5/12/2025 |
0.9.15 | 245 | 5/5/2025 |
0.9.14 | 139 | 5/3/2025 |
0.9.13 | 207 | 5/1/2025 |
0.9.12 | 225 | 4/27/2025 |
0.9.11 | 149 | 4/25/2025 |
0.9.10 | 252 | 4/21/2025 |
0.9.9 | 215 | 4/21/2025 |
0.9.8 | 207 | 4/20/2025 |
0.9.7 | 186 | 4/18/2025 |
0.9.6 | 296 | 4/15/2025 |
0.9.5 | 251 | 4/13/2025 |
0.9.4 | 306 | 3/12/2025 |
0.9.3 | 220 | 3/12/2025 |
0.9.2 | 226 | 3/10/2025 |
0.9.1 | 190 | 3/9/2025 |
0.9.0 | 224 | 2/23/2025 |
0.8.0 | 2,743 | 4/3/2024 |
0.2.1 | 2,401 | 4/8/2023 |
0.2.0 | 237 | 4/8/2023 |
0.1.10 | 878 | 12/21/2021 |
0.1.9 | 492 | 12/19/2021 |
0.1.8 | 460 | 12/11/2021 |
0.1.7 | 694 | 12/11/2021 |
0.1.6 | 482 | 12/10/2021 |
0.1.5 | 440 | 12/10/2021 |
0.1.4 | 455 | 12/10/2021 |
0.1.3 | 457 | 12/9/2021 |
0.1.2 | 465 | 12/8/2021 |