SerialPortRx 3.1.2


SerialPortRx

A Reactive Serial, TCP, and UDP I/O library that exposes incoming data as IObservable streams and accepts writes via simple methods. Ideal for event-driven, message-framed, and polling scenarios.

Features

  • SerialPortRx: Reactive wrapper for System.IO.Ports.SerialPort
  • UdpClientRx and TcpClientRx: Reactive wrappers exposing a common IPortRx interface
  • Observables:
    • DataReceived: IObservable<char> for serial text flow
    • Lines: IObservable<string> of complete lines split by NewLine
    • BytesReceived: IObservable<int> for byte stream emitted when using ReadAsync
    • IsOpenObservable: IObservable<bool> for connection state
    • ErrorReceived: IObservable<Exception> for errors
  • TCP/UDP batched reads:
    • TcpClientRx.DataReceivedBatches: IObservable<byte[]> chunks per read loop
    • UdpClientRx.DataReceivedBatches: IObservable<byte[]> per received datagram
  • Helpers:
    • PortNames(): reactive port enumeration with change notifications
    • BufferUntil(): message framing between start and end delimiters with timeout
    • WhileIsOpen(): periodic observable that fires only while a port is open
  • Cross-targeted: netstandard2.0, net8.0, net9.0, and Windows-specific TFMs

Installation

  • dotnet add package SerialPortRx

Supported target frameworks

  • netstandard2.0
  • net8.0, net9.0
  • net8.0-windows10.0.19041.0, net9.0-windows10.0.19041.0 (adds Windows-only APIs guarded by HasWindows)

Quick start (Serial)

using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using CP.IO.Ports;
using ReactiveMarbles.Extensions;

var disposables = new CompositeDisposable();
var port = new SerialPortRx("COM3", 115200) { ReadTimeout = -1, WriteTimeout = -1 };

// Observe connection state and errors
port.IsOpenObservable.Subscribe(isOpen => Console.WriteLine($"Open: {isOpen}")).DisposeWith(disposables);
port.ErrorReceived.Subscribe(ex => Console.WriteLine($"Error: {ex.Message}")).DisposeWith(disposables);

// Raw character stream
port.DataReceived.Subscribe(ch => Console.Write(ch)).DisposeWith(disposables);

await port.Open();
port.WriteLine("AT");

// Close when done
port.Close();
disposables.Dispose();

Discovering serial ports

// Emits the list of available port names whenever it changes
SerialPortRx.PortNames(pollInterval: 500)
    .Subscribe(names => Console.WriteLine(string.Join(", ", names)));

To auto-connect when a specific COM port appears:

var target = "COM3";
var comDisposables = new CompositeDisposable();

SerialPortRx.PortNames()
    .Do(names =>
    {
        if (comDisposables.Count == 0 && Array.Exists(names, n => string.Equals(n, target, StringComparison.OrdinalIgnoreCase)))
        {
            var port = new SerialPortRx(target, 115200);
            port.DisposeWith(comDisposables);

            port.ErrorReceived.Subscribe(Console.WriteLine).DisposeWith(comDisposables);
            port.IsOpenObservable.Subscribe(open => Console.WriteLine($"{target}: {(open ? "Open" : "Closed")}"))
                .DisposeWith(comDisposables);

            port.Open();
        }
        else if (!Array.Exists(names, n => string.Equals(n, target, StringComparison.OrdinalIgnoreCase)))
        {
            comDisposables.Clear(); // dispose the port and its subscriptions if the device is removed; Clear() keeps the container reusable for a reconnect
        }
    })
    .ForEach()
    .Subscribe();
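
The auto-connect logic above checks each emitted snapshot for one target port. When several devices matter, it can help to know which names appeared and which disappeared between two snapshots. The following is a minimal sketch using only LINQ; DiffPorts is a hypothetical helper, not part of SerialPortRx, and the port names are made-up sample data.

```csharp
using System;
using System.Linq;

// Hypothetical helper (not part of SerialPortRx): compares two snapshots of port names
// and reports which names were added and which were removed, ignoring case.
static (string[] Added, string[] Removed) DiffPorts(string[] previous, string[] current)
{
    var comparer = StringComparer.OrdinalIgnoreCase;
    var newNames = current.Except(previous, comparer).ToArray();
    var goneNames = previous.Except(current, comparer).ToArray();
    return (newNames, goneNames);
}

// Sample snapshots standing in for two consecutive PortNames() emissions.
var before = new[] { "COM1", "COM3" };
var after = new[] { "COM1", "COM4" };
var (added, removed) = DiffPorts(before, after);

Console.WriteLine("Added: " + string.Join(", ", added));
Console.WriteLine("Removed: " + string.Join(", ", removed));
```

With a live stream, the same helper could be fed from consecutive PortNames() emissions, for example by pairing each snapshot with the previous one.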

Message framing with BufferUntil

BufferUntil helps extract framed messages from the character stream between a start and end delimiter within a timeout.

// Example: messages start with '!' and end with '\n' and must complete within 100ms
var start = 0x21.AsObservable();  // '!'
var end   = 0x0a.AsObservable();  // '\n'

port.DataReceived
    .BufferUntil(start, end, timeOut: 100)
    .Subscribe(msg => Console.WriteLine($"MSG: {msg}"));

A variant returns a default message on timeout:

port.DataReceived
    .BufferUntil(start, end, defaultValue: Observable.Return("<timeout>"), timeOut: 100)
    .Subscribe(msg => Console.WriteLine($"MSG: {msg}"));
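
To make the idea of delimiter framing concrete, here is a minimal sketch in plain C# with no dependencies. It is not the library's implementation: ExtractFrames is a hypothetical helper, it works on an in-memory sequence instead of an observable, it has no timeout, and it keeps both delimiters in the result (whether BufferUntil does the same is not confirmed here).

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper, not the library's implementation: yields each run of characters
// from a start delimiter through the next end delimiter (both included).
// Characters outside a frame are dropped; an unfinished trailing frame is not emitted.
static IEnumerable<string> ExtractFrames(IEnumerable<char> input, char start, char end)
{
    var frame = new StringBuilder();
    var inFrame = false;

    foreach (var ch in input)
    {
        if (!inFrame)
        {
            if (ch != start) continue; // noise between frames is ignored
            inFrame = true;
            frame.Clear();
        }

        frame.Append(ch);

        if (ch == end)
        {
            yield return frame.ToString();
            inFrame = false;
        }
    }
}

// Sample input: two complete frames, some noise, and one frame that never ends.
var frames = new List<string>(ExtractFrames("xx!ABC\nyy!DEF\n!partial", '!', '\n'));
foreach (var f in frames)
{
    var text = f.TrimEnd('\n');
    Console.WriteLine("Frame: " + text);
}
```

The timeout in BufferUntil addresses the case this sketch leaves open: a frame whose end delimiter never arrives.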

Periodic work while the port is open

// Write a heartbeat every 500ms but only while the port remains open
port.WhileIsOpen(TimeSpan.FromMilliseconds(500))
    .Subscribe(_ => port.Write("PING\n"));
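
The behavior described above (ticks only while open) can be approximated with standard System.Reactive operators. The sketch below is an assumption about the semantics, not the library's code: a BehaviorSubject stands in for IsOpenObservable, and a HistoricalScheduler provides virtual time so it runs instantly without hardware.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Virtual-time scheduler so the sketch runs deterministically without real delays.
var scheduler = new HistoricalScheduler();

// Stand-in for port.IsOpenObservable; starts in the closed state.
using var isOpen = new BehaviorSubject<bool>(false);

// Tick every 500 ms while open; switch to an empty stream while closed.
var ticks = isOpen
    .Select(open => open
        ? Observable.Interval(TimeSpan.FromMilliseconds(500), scheduler)
        : Observable.Empty<long>())
    .Switch();

var count = 0;
using var subscription = ticks.Subscribe(_ => count++);

scheduler.AdvanceBy(TimeSpan.FromSeconds(2));
var whileClosed = count;   // no ticks expected before the port opens

isOpen.OnNext(true);
scheduler.AdvanceBy(TimeSpan.FromSeconds(2));
var whileOpen = count;     // ticks accumulate while open

isOpen.OnNext(false);
scheduler.AdvanceBy(TimeSpan.FromSeconds(2));
var afterClose = count;    // ticking stops once closed

Console.WriteLine($"closed: {whileClosed}, open: {whileOpen}, closed again: {afterClose}");
```

Switch disposes the running interval as soon as the state flips to closed, which is why no write would be attempted against a closed port in this model.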

Reading raw bytes with ReadAsync

Use ReadAsync for binary protocols or fixed-length reads. Each byte successfully read is also pushed to BytesReceived.

// Subscribe first so the bytes pushed by the read below are observed
port.BytesReceived.Subscribe(b => Console.WriteLine($"Byte: {b:X2}"));

var buffer = new byte[64];
int read = await port.ReadAsync(buffer, 0, buffer.Length);
Console.WriteLine($"Read {read} bytes");

Notes:

  • DataReceived is a char stream produced from SerialPort.ReadExisting().
  • BytesReceived emits bytes read by your ReadAsync calls (not from ReadExisting()).
  • Concurrent ReadAsync calls are serialized internally for safety.
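
For fixed-length binary frames, a single read may return fewer bytes than requested (this is the case for .NET streams in general; whether it applies to this library's ReadAsync is an assumption). A common pattern is to loop until the frame is full. The sketch below uses a hypothetical ReadExactlyAsync helper and a MemoryStream as a stand-in source so it runs without hardware.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper: keeps calling a ReadAsync-shaped delegate until the buffer is full.
// Returns the number of bytes read, which is less than buffer.Length only if the source ended.
static async Task<int> ReadExactlyAsync(Func<byte[], int, int, Task<int>> readAsync, byte[] buffer)
{
    var total = 0;
    while (total < buffer.Length)
    {
        var n = await readAsync(buffer, total, buffer.Length - total);
        if (n <= 0) break; // source closed or nothing more to read
        total += n;
    }
    return total;
}

// Stand-in source: hands out at most 3 bytes per call to mimic partial reads.
using var source = new MemoryStream(new byte[] { 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08 });
Task<int> ChunkedRead(byte[] buf, int offset, int count) =>
    source.ReadAsync(buf, offset, Math.Min(count, 3));

var frame = new byte[8];
var read = await ReadExactlyAsync(ChunkedRead, frame);
Console.WriteLine($"Read {read} bytes: {BitConverter.ToString(frame, 0, read)}");
```

With a real port, the delegate could be a lambda such as (b, o, c) => port.ReadAsync(b, o, c), assuming that overload returns Task<int> as the example above suggests.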

Reading lines

Use ReadLineAsync to await a single complete line terminated by the configured NewLine. It supports single- and multi-character newline sequences and honors ReadTimeout when it is greater than 0.

port.NewLine = "\r\n"; // optional: default is "\n"
var line = await port.ReadLineAsync();
Console.WriteLine($"Line: {line}");

You can also pass a CancellationToken:

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var line = await port.ReadLineAsync(cts.Token);

Line streaming with Lines

Subscribe to Lines to get a continuous stream of complete lines:

port.NewLine = "\n";
port.Lines.Subscribe(line => Console.WriteLine($"LINE: {line}"));
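
To illustrate what splitting on a multi-character NewLine involves, here is a dependency-free sketch. SplitLines and EndsWith are hypothetical helpers, not the library's implementation; they operate on an in-memory sequence and emit only lines that are terminated.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Returns true when the builder's content ends with the given suffix.
static bool EndsWith(StringBuilder sb, string suffix)
{
    if (sb.Length < suffix.Length) return false;
    for (var i = 0; i < suffix.Length; i++)
    {
        if (sb[sb.Length - suffix.Length + i] != suffix[i]) return false;
    }
    return true;
}

// Hypothetical helper, not the library's implementation: splits a character
// sequence into complete lines using a newline sequence of any length.
static IEnumerable<string> SplitLines(IEnumerable<char> input, string newLine)
{
    var pending = new StringBuilder();
    foreach (var ch in input)
    {
        pending.Append(ch);
        if (EndsWith(pending, newLine))
        {
            pending.Length -= newLine.Length; // drop the terminator
            yield return pending.ToString();
            pending.Clear();
        }
    }
    // Trailing text without a terminator is an incomplete line and is not emitted.
}

// Sample input: two terminated lines followed by an unterminated fragment.
var lines = new List<string>(SplitLines("OK\r\nERROR 7\r\nhalf", "\r\n"));
foreach (var l in lines)
{
    Console.WriteLine("LINE: " + l);
}
```

Checking the tail of the pending buffer after every character is what lets a terminator that arrives split across two reads still be recognized.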

Writing

  • port.Write(string text)
  • port.WriteLine(string text)
  • port.Write(byte[] buffer)
  • port.Write(byte[] buffer, int offset, int count)
  • port.Write(char[] buffer)
  • port.Write(char[] buffer, int offset, int count)

Error handling and state

  • Subscribe to port.ErrorReceived for exceptions and serial errors.
  • Subscribe to port.IsOpenObservable to react to open/close transitions.
  • Call port.Close() or dispose subscriptions (DisposeWith) to release the port.

TCP/UDP variants

The TcpClientRx and UdpClientRx classes implement the same IPortRx interface for a similar reactive experience with sockets.

TCP example:

var tcp = new TcpClientRx("example.com", 80);
await tcp.Open();
var req = System.Text.Encoding.ASCII.GetBytes("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n");
tcp.Write(req, 0, req.Length);
var buf = new byte[1024];
var n = await tcp.ReadAsync(buf, 0, buf.Length);
Console.WriteLine(System.Text.Encoding.ASCII.GetString(buf, 0, n));

UDP example:

var udp = new UdpClientRx(12345);
await udp.Open();
var buf = new byte[16];
var n = await udp.ReadAsync(buf, 0, buf.Length);
Console.WriteLine($"UDP read {n} bytes");

Batched receive (TCP/UDP)

Subscribe to batched byte arrays for throughput-sensitive pipelines:

// TCP batched chunks per read loop
var tcp = new TcpClientRx("example.com", 80);
tcp.DataReceivedBatches
    .Subscribe(chunk => Console.WriteLine($"TCP chunk size: {chunk.Length}"));
await tcp.Open(); // subscribe first, then open, so no chunk is missed

// UDP per-datagram batches
var udp = new UdpClientRx(12345);
udp.DataReceivedBatches
    .Subscribe(datagram => Console.WriteLine($"UDP datagram size: {datagram.Length}"));
await udp.Open();

Threading and scheduling

  • The DataReceived and other streams run on the underlying event threads. Use ObserveOn to marshal to a UI or a dedicated scheduler when needed.
  • ReadAsync uses a lightweight lock and offloads blocking reads, avoiding CPU spin.
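
As an illustration of the ObserveOn advice, the sketch below moves handling onto a dedicated thread using System.Reactive's EventLoopScheduler. A Subject<char> stands in for port.DataReceived so it runs without hardware; for a UI application a synchronization-context or dispatcher scheduler would typically take the place of the event loop.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Stand-in for port.DataReceived so the sketch runs without hardware.
using var source = new Subject<char>();
using var scheduler = new EventLoopScheduler(); // one dedicated consumer thread
using var done = new ManualResetEventSlim();

var producerThread = Environment.CurrentManagedThreadId;
var consumerThread = -1;

using var subscription = source
    .ObserveOn(scheduler) // hand each notification over to the scheduler's thread
    .Subscribe(ch =>
    {
        consumerThread = Environment.CurrentManagedThreadId;
        done.Set();
    });

source.OnNext('A');                  // produced on the current thread
done.Wait(TimeSpan.FromSeconds(5));  // wait until the handler has run

Console.WriteLine($"Produced on thread {producerThread}, handled on thread {consumerThread}");
```

Keeping slow handlers off the thread that raises the data events reduces the risk of stalling reception while a subscriber is busy.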

Tips and best practices

  • Subscribe before calling Open() to ensure you don't miss events.
  • Tune Encoding (default ASCII), BaudRate, Parity, StopBits, and Handshake to match your device.
  • Use BufferUntil for delimited protocols. For binary protocols, use ReadAsync with fixed sizes.
  • Use Lines when dealing with text protocols; use ReadLineAsync when you need a one-shot line.
  • Always dispose subscriptions (DisposeWith) and call Close() when done.

Example program (complete)

using System;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using CP.IO.Ports;
using ReactiveMarbles.Extensions;

internal static class Program
{
    private static async System.Threading.Tasks.Task Main()
    {
        const string comPortName = "COM1";
        const string dataToWrite = "DataToWrite";
        var dis = new CompositeDisposable();

        var startChar = 0x21.AsObservable(); // '!'
        var endChar = 0x0a.AsObservable();   // '\n'

        var comdis = new CompositeDisposable();

        SerialPortRx.PortNames().Do(names =>
        {
            if (comdis.Count == 0 && names.Contains(comPortName))
            {
                var port = new SerialPortRx(comPortName, 9600);
                port.DisposeWith(comdis);

                port.ErrorReceived.Subscribe(Console.WriteLine).DisposeWith(comdis);
                port.IsOpenObservable.Subscribe(open => Console.WriteLine($"{comPortName} {(open ? "Open" : "Closed")}"))
                    .DisposeWith(comdis);

                port.DataReceived
                    .BufferUntil(startChar, endChar, 100)
                    .Subscribe(data => Console.WriteLine($"Data: {data}"))
                    .DisposeWith(comdis);

                port.WhileIsOpen(TimeSpan.FromMilliseconds(500))
                    .Subscribe(_ => port.Write(dataToWrite))
                    .DisposeWith(comdis);

                port.Open().Wait();
            }
            else if (!names.Contains(comPortName))
            {
                comdis.Clear(); // dispose the port and its subscriptions; Clear() keeps the container reusable for a reconnect
                Console.WriteLine($"Port {comPortName} Disposed");
            }
        }).ForEach().Subscribe(Console.WriteLine).DisposeWith(dis);

        Console.ReadLine();
        comdis.Dispose();
        dis.Dispose();
    }
}

License

MIT. See LICENSE.


NuGet packages (2)

Showing the top 2 NuGet packages that depend on SerialPortRx:

  • ModbusRx: A Reactive version of NModbus4
  • MQTTnet.Rx.SerialPort: Reactive extensions for MQTTnet Broker

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last Updated
3.1.2 149 8/18/2025
3.1.1 124 8/17/2025
3.0.3 1,994 11/24/2023
3.0.2 116 11/11/2023
2.3.4 929 2/18/2023
2.3.2 431 1/10/2023
2.3.1 355 1/10/2023
2.2.6 6,751 7/22/2022
2.1.4 509 7/21/2022
2.1.3 501 7/21/2022
2.1.2 516 7/20/2022
2.1.1 764 12/13/2021
1.4.4 828 5/1/2021
1.4.3 975 2/24/2020
1.4.2 734 1/19/2020
1.4.1 703 12/29/2019
1.4.0 724 12/29/2019
1.3.1 1,115 1/8/2019
1.3.0 995 10/17/2018
1.2.0 959 9/19/2018
1.1.0 1,080 8/15/2018
1.0.1 1,366 6/27/2018
1.0.0 1,356 5/28/2018
0.0.0.5 1,365 2/15/2018
0.0.0.4 1,432 9/29/2016
0.0.0.3 1,215 9/27/2016
0.0.0.2 1,180 9/26/2016
0.0.0.1 1,145 9/25/2016

Compatibility with .NET 6/7/8 and netstandard2.0