PgOutput2Json.RabbitMqStreams 1.0.3


PgOutput2Json

PgOutput2Json is a .NET library that uses PostgreSQL logical replication to stream JSON-encoded change events from your database to a .NET application or a message broker like RabbitMQ, Kafka, or Redis.

🚀 Use Cases

Use PgOutput2Json when you need to react to DB changes in near real-time:

  • 🔄 Background tasks triggered by inserts/updates
  • ⚡ Low-latency application cache updates
  • 📝 Asynchronous logging (to file or another DB)
  • 📊 Real-time data aggregation
  • 🛠️ Lightweight ETL for DWH solutions

All with minimal latency: events are dispatched shortly after a transaction is committed, though large transactions may introduce additional delay due to processing and transmission overhead.

🔌 Supported Outputs

  • ✅ .NET application via a simple delegate handler
  • ✅ Kafka
  • ✅ RabbitMQ (Streams + Classic Queues)
  • ✅ Redis (Streams + Pub/Sub Channels)
  • ✅ SQLite (used by PgFreshCache)
  • ✅ MongoDB
  • ✅ Amazon Kinesis
  • ✅ Amazon DynamoDB
  • ✅ Azure Event Hubs

Plug-and-play adapters handle the heavy lifting, or you can handle messages directly in your app for maximum control.

🧠 How It Works

PostgreSQL 10+ ships with a built-in logical decoding plugin called pgoutput, which emits row-level changes (INSERT/UPDATE/DELETE) over a replication slot.

PgOutput2Json:

  1. Connects to a replication slot via pgoutput
  2. Converts change events into clean JSON
  3. Sends them to your application or a supported message broker

No extra plugins are needed, because pgoutput comes with PostgreSQL by default.

The change events JSON format:

{
  "c": "U",             // Change type: I (insert), U (update), D (delete)
  "w": 2485645760,      // WAL end offset
  "t": "schema.table",  // Table name (if enabled in JSON options)
  "k": { ... },         // Key values: included for deletes, and for updates if the key changed,
                        // or old row values, if the table uses REPLICA IDENTITY FULL
  "r": { ... }          // New row values (not present for deletes)
}
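
As an illustration of consuming this format, here is a minimal sketch that reads one change event with System.Text.Json. The ChangeEvent record and its Parse method are hypothetical names introduced for this example and are not part of PgOutput2Json. The sketch assumes the event arrives as a plain JSON string with the field names shown above.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper type for illustration only; not part of PgOutput2Json.
public sealed record ChangeEvent(
    string ChangeType,   // "c": I, U or D
    long WalEnd,         // "w": WAL end offset (exceeds Int32 range, so Int64 is used)
    string? Table,       // "t": only present if enabled in the JSON options
    JsonElement? Key,    // "k": key values or old row values, when present
    JsonElement? Row)    // "r": new row values, absent for deletes
{
    public static ChangeEvent Parse(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;

        return new ChangeEvent(
            root.GetProperty("c").GetString()
                ?? throw new FormatException("Change type \"c\" is null."),
            root.GetProperty("w").GetInt64(),
            root.TryGetProperty("t", out var table) ? table.GetString() : null,
            // Clone() detaches the element so it stays valid after the document is disposed.
            root.TryGetProperty("k", out var key) ? key.Clone() : (JsonElement?)null,
            root.TryGetProperty("r", out var row) ? row.Clone() : (JsonElement?)null);
    }
}
```

Calling ChangeEvent.Parse on a made-up payload such as {"c":"U","w":2485645760,"t":"public.my_table","r":{"id":1}} yields ChangeType "U", a null Key, and a Row whose "id" property can be read with GetProperty("id").GetInt64().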

⚠️ Development Status

Still early days: the library is under active development, but it's fully usable for testing and early integration.

Quick Start

Configure postgresql.conf

To enable logical replication, add the following setting in your postgresql.conf:

wal_level = logical

Other necessary settings usually have appropriate default values for a basic setup.

Note: PostgreSQL must be restarted after modifying this setting.

Create a Replication User

Next, create a user in PostgreSQL that will be used for replication. Run the following SQL command:

CREATE USER pgoutput2json WITH  
    PASSWORD '_your_password_here_'  
    REPLICATION;

If you are connecting from a remote machine, don't forget to modify pg_hba.conf to allow non-local connections for this user. After modifying pg_hba.conf, apply the changes by either:

  • Sending a SIGHUP to the PostgreSQL process
  • Running pg_ctl reload
  • Or executing: SELECT pg_reload_conf();

Create a Publication

Now, connect to the target database and create a publication to specify which tables and actions should be replicated:

CREATE PUBLICATION my_publication  
    FOR TABLE my_table1, my_table2  
    WITH (publish = 'insert, update, delete');

In the example code below, we'll assume the database name is my_database.

Set Up RabbitMQ Streams

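Before using RabbitMQ Streams, ensure that the RabbitMQ stream plugin (rabbitmq_stream) is enabled, for example by running rabbitmq-plugins enable rabbitmq_stream. The stream protocol must be active for the stream to work; by default it listens on port 5552.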

Create a stream in RabbitMQ, for example, a stream named my_stream. You can do this from the RabbitMQ management interface or through the appropriate RabbitMQ commands.

Create .NET Worker Service

Set up a new .NET Worker Service and add the following package reference:

dotnet add package PgOutput2Json.RabbitMqStreams

In your Worker.cs, add the following code to use RabbitMQ Streams:

using System.Net; // IPEndPoint and IPAddress
using PgOutput2Json;
using PgOutput2Json.RabbitMqStreams;

public class Worker : BackgroundService  
{  
    private readonly ILoggerFactory _loggerFactory;  

    public Worker(ILoggerFactory loggerFactory)  
    {  
        _loggerFactory = loggerFactory;  
    }  

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)  
    {  
        // This code assumes PostgreSQL and RabbitMQ are on localhost  
        using var pgOutput2Json = PgOutput2JsonBuilder.Create()  
            .WithLoggerFactory(_loggerFactory)  
            .WithPgConnectionString("server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_")  
            .WithPgPublications("my_publication")  
            .UseRabbitMqStreams(options =>  
            {  
                options.StreamName = "my_stream";  
                options.StreamSystemConfig.UserName = "guest";  
                options.StreamSystemConfig.Password = "guest";  
                options.StreamSystemConfig.VirtualHost = "/";  
                options.StreamSystemConfig.Endpoints =  
                [  
                    new IPEndPoint(IPAddress.Loopback, 5552)  
                ];  
            })  
            .Build();  

        await pgOutput2Json.StartAsync(stoppingToken);  
    }  
}

Once the stream is configured in RabbitMQ and the .NET service is running, changes to the PostgreSQL tables specified in my_publication will be pushed to the my_stream stream in RabbitMQ. No need to manage exchanges or queues: everything flows directly through the stream.

Note: This example uses a temporary replication slot, meaning it won't capture changes made while the worker was stopped.

Working with Permanent Replication Slots

If you want to capture changes made while the worker is stopped, you need a permanent replication slot.

In the context of logical replication, a slot represents a stream of changes that can be replayed to a client in the order they were made on the origin server. Each slot streams a sequence of changes from a single database.

To create a replication slot, call the pg_create_logical_replication_slot function in the same database that holds the tables being tracked:

SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');

The first parameter is the name of the slot, which must be unique across all databases in a PostgreSQL cluster. The second parameter must be pgoutput, which selects the correct logical decoding plugin.

The current position of each slot is persisted only at checkpoint, so in the case of a crash, the slot may return to an earlier log sequence number (LSN), which will then cause recent changes to be sent again when the server restarts.

Note: It is the responsibility of the receiving client to avoid the ill effects from handling the same message more than once.
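
One way to tolerate redelivery is to make the handler idempotent, so that applying the same event twice leaves the same state. The following is a minimal sketch under stated assumptions: RowCache is a hypothetical class written for this example (not part of PgOutput2Json), the tracked table has a single integer key column named "id", and events are replayed in their original order.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical in-memory cache keyed by an assumed "id" column.
public sealed class RowCache
{
    private readonly Dictionary<long, string> _rows = new();

    public int Count => _rows.Count;

    public bool TryGet(long id, out string? rowJson) => _rows.TryGetValue(id, out rowJson);

    // Applies one change event. Inserts and updates overwrite the entry and
    // deletes remove it, so a repeated event does not change the result.
    public void Apply(string changeJson)
    {
        using var doc = JsonDocument.Parse(changeJson);
        var root = doc.RootElement;
        var changeType = root.GetProperty("c").GetString();

        if (changeType == "D")
        {
            // Deletes carry the key values in "k"; removing a missing key is a no-op.
            _rows.Remove(root.GetProperty("k").GetProperty("id").GetInt64());
            return;
        }

        // For updates, "k" is present when the key changed (or with REPLICA IDENTITY FULL),
        // so drop the entry stored under the old key before writing the new row.
        if (changeType == "U"
            && root.TryGetProperty("k", out var oldKey)
            && oldKey.TryGetProperty("id", out var oldId))
        {
            _rows.Remove(oldId.GetInt64());
        }

        var row = root.GetProperty("r");
        _rows[row.GetProperty("id").GetInt64()] = row.GetRawText();
    }
}
```

This design avoids tracking which messages were already seen: every operation is an overwrite or a removal, so a replayed batch converges to the same cache contents. Handlers with side effects that cannot be repeated safely (sending email, incrementing counters) would need a different approach, such as recording processed events in durable storage.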

⚠️ Important: Replication slots persist across crashes and know nothing about the state of their consumer(s). They will prevent removal of required resources even when there is no connection using them. This consumes storage because neither required WAL nor required rows from the system catalogs can be removed by VACUUM as long as they are required by a replication slot.

In extreme cases, this could cause the database to shut down to prevent transaction ID wraparound.

So, if a slot is no longer required, it should be dropped. To drop a replication slot, use:

SELECT * FROM pg_drop_replication_slot('my_slot');

Once the replication slot is created, specify its name in the PgOutput2JsonBuilder to use it:

// ...
using var pgOutput2Json = PgOutput2JsonBuilder.Create()
    .WithLoggerFactory(_loggerFactory)
    .WithPgConnectionString("server=localhost;database=my_database;username=pgoutput2json;password=_your_password_here_")
    .WithPgPublications("my_publication")
    .WithPgReplicationSlot("my_slot")      // <-- slot specified here
//...

Target frameworks

  • Compatible: net8.0, net9.0
  • Computed: net10.0, plus the android, browser, ios, maccatalyst, macos, tvos, and windows variants of net8.0, net9.0, and net10.0


Version Downloads Last Updated
1.2.0 9 8/17/2025
1.1.2 20 8/16/2025
1.1.1 92 8/11/2025
1.1.0 93 8/11/2025
1.0.6 87 8/10/2025
1.0.3 88 8/10/2025
1.0.2 91 8/10/2025
1.0.1 71 8/9/2025
1.0.0 167 8/5/2025
0.9.20 148 5/19/2025
0.9.17 147 5/18/2025
0.9.16 234 5/12/2025
0.9.15 156 5/5/2025
0.9.14 73 5/3/2025
0.9.13 154 5/1/2025
0.9.12 160 4/27/2025
0.9.11 103 4/25/2025
0.9.10 162 4/21/2025
0.9.9 159 4/21/2025
0.9.8 155 4/20/2025
0.9.7 141 4/18/2025
0.9.6 218 4/15/2025
0.9.5 187 4/13/2025