SerialPortRx 3.1.2
dotnet add package SerialPortRx --version 3.1.2
NuGet\Install-Package SerialPortRx -Version 3.1.2
<PackageReference Include="SerialPortRx" Version="3.1.2" />
<PackageVersion Include="SerialPortRx" Version="3.1.2" />
<PackageReference Include="SerialPortRx" />
paket add SerialPortRx --version 3.1.2
#r "nuget: SerialPortRx, 3.1.2"
#:package SerialPortRx@3.1.2
#addin nuget:?package=SerialPortRx&version=3.1.2
#tool nuget:?package=SerialPortRx&version=3.1.2
SerialPortRx
A Reactive Serial, TCP, and UDP I/O library that exposes incoming data as IObservable streams and accepts writes via simple methods. Ideal for event-driven, message-framed, and polling scenarios.
Features
- SerialPortRx: Reactive wrapper for System.IO.Ports.SerialPort
- UdpClientRx and TcpClientRx: Reactive wrappers exposing a common IPortRx interface
- Observables:
- DataReceived: IObservable<char> for serial text flow
- Lines: IObservable<string> of complete lines split by NewLine
- BytesReceived: IObservable<int> for byte stream emitted when using ReadAsync
- IsOpenObservable: IObservable<bool> for connection state
- ErrorReceived: IObservable<Exception> for errors
- TCP/UDP batched reads:
- TcpClientRx.DataReceivedBatches: IObservable<byte[]> chunks per read loop
- UdpClientRx.DataReceivedBatches: IObservable<byte[]> per received datagram
- Helpers:
- PortNames(): reactive port enumeration with change notifications
- BufferUntil(): message framing between start and end delimiters with timeout
- WhileIsOpen(): periodic observable that fires only while a port is open
- Cross-targeted: netstandard2.0, net8.0, net9.0, and Windows-specific TFMs
Installation
- dotnet add package SerialPortRx
Supported target frameworks
- netstandard2.0
- net8.0, net9.0
- net8.0-windows10.0.19041.0, net9.0-windows10.0.19041.0 (adds Windows-only APIs guarded by HasWindows)
Quick start (Serial)
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using CP.IO.Ports;
using ReactiveMarbles.Extensions;
var disposables = new CompositeDisposable();
var port = new SerialPortRx("COM3", 115200) { ReadTimeout = -1, WriteTimeout = -1 };
// Observe line/state/errors
port.IsOpenObservable.Subscribe(isOpen => Console.WriteLine($"Open: {isOpen}")).DisposeWith(disposables);
port.ErrorReceived.Subscribe(ex => Console.WriteLine($"Error: {ex.Message}")).DisposeWith(disposables);
// Raw character stream
port.DataReceived.Subscribe(ch => Console.Write(ch)).DisposeWith(disposables);
await port.Open();
port.WriteLine("AT");
// Close when done
port.Close();
disposables.Dispose();
Discovering serial ports
// Emits the list of available port names whenever it changes
SerialPortRx.PortNames(pollInterval: 500)
.Subscribe(names => Console.WriteLine(string.Join(", ", names)));
To auto-connect when a specific COM port appears:
var target = "COM3";
var comDisposables = new CompositeDisposable();
SerialPortRx.PortNames()
.Do(names =>
{
if (comDisposables.Count == 0 && Array.Exists(names, n => string.Equals(n, target, StringComparison.OrdinalIgnoreCase)))
{
var port = new SerialPortRx(target, 115200);
port.DisposeWith(comDisposables);
port.ErrorReceived.Subscribe(Console.WriteLine).DisposeWith(comDisposables);
port.IsOpenObservable.Subscribe(open => Console.WriteLine($"{target}: {(open ? "Open" : "Closed")}"))
.DisposeWith(comDisposables);
port.Open();
}
else if (!Array.Exists(names, n => string.Equals(n, target, StringComparison.OrdinalIgnoreCase)))
{
comDisposables.Dispose(); // auto-cleanup if device removed
}
})
.ForEach()
.Subscribe();
Message framing with BufferUntil
BufferUntil helps extract framed messages from the character stream between a start and end delimiter within a timeout.
// Example: messages start with '!' and end with '\n' and must complete within 100ms
var start = 0x21.AsObservable(); // '!'
var end = 0x0a.AsObservable(); // '\n'
port.DataReceived
.BufferUntil(start, end, timeOut: 100)
.Subscribe(msg => Console.WriteLine($"MSG: {msg}"));
A variant returns a default message on timeout:
port.DataReceived
.BufferUntil(start, end, defaultValue: Observable.Return("<timeout>"), timeOut: 100)
.Subscribe(msg => Console.WriteLine($"MSG: {msg}"));
Periodic work while the port is open
// Write a heartbeat every 500ms but only while the port remains open
port.WhileIsOpen(TimeSpan.FromMilliseconds(500))
.Subscribe(_ => port.Write("PING\n"));
Reading raw bytes with ReadAsync
Use ReadAsync for binary protocols or fixed-length reads. Each byte successfully read is also pushed to BytesReceived.
var buffer = new byte[64];
int read = await port.ReadAsync(buffer, 0, buffer.Length);
Console.WriteLine($"Read {read} bytes");
port.BytesReceived.Subscribe(b => Console.WriteLine($"Byte: {b:X2}"));
Notes:
- DataReceived is a char stream produced from SerialPort.ReadExisting().
- BytesReceived emits bytes read by your ReadAsync calls (not from ReadExisting()).
- Concurrent ReadAsync calls are serialized internally for safety.
Reading lines
Use ReadLineAsync to await a single complete line split by the configured NewLine. Supports single- and multi-character newline sequences and respects ReadTimeout (> 0).
port.NewLine = "\r\n"; // optional: default is "\n"
var line = await port.ReadLineAsync();
Console.WriteLine($"Line: {line}");
You can also pass a CancellationToken:
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var line = await port.ReadLineAsync(cts.Token);
Line streaming with Lines
Subscribe to Lines to get a continuous stream of complete lines:
port.NewLine = "\n";
port.Lines.Subscribe(line => Console.WriteLine($"LINE: {line}"));
Writing
- port.Write(string text)
- port.WriteLine(string text)
- port.Write(byte[] buffer)
- port.Write(byte[] buffer, int offset, int count)
- port.Write(char[] buffer)
- port.Write(char[] buffer, int offset, int count)
Error handling and state
- Subscribe to port.ErrorReceived for exceptions and serial errors.
- Subscribe to port.IsOpenObservable to react to open/close transitions.
- Call port.Close() or dispose subscriptions (DisposeWith) to release the port.
TCP/UDP variants
The TcpClientRx and UdpClientRx classes implement the same IPortRx interface for a similar reactive experience with sockets.
TCP example:
var tcp = new TcpClientRx("example.com", 80);
await tcp.Open();
var req = System.Text.Encoding.ASCII.GetBytes("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n");
tcp.Write(req, 0, req.Length);
var buf = new byte[1024];
var n = await tcp.ReadAsync(buf, 0, buf.Length);
Console.WriteLine(System.Text.Encoding.ASCII.GetString(buf, 0, n));
UDP example:
var udp = new UdpClientRx(12345);
await udp.Open();
var buf = new byte[16];
var n = await udp.ReadAsync(buf, 0, buf.Length);
Console.WriteLine($"UDP read {n} bytes");
Batched receive (TCP/UDP)
Subscribe to batched byte arrays for throughput-sensitive pipelines:
// TCP batched chunks per read loop
new TcpClientRx("example.com", 80).DataReceivedBatches
.Subscribe(chunk => Console.WriteLine($"TCP chunk size: {chunk.Length}"));
// UDP per-datagram batches
new UdpClientRx(12345).DataReceivedBatches
.Subscribe(datagram => Console.WriteLine($"UDP datagram size: {datagram.Length}"));
Threading and scheduling
- The DataReceived and other streams run on the underlying event threads. Use ObserveOn to marshal to a UI or a dedicated scheduler when needed.
- ReadAsync uses a lightweight lock and offloads blocking reads, avoiding CPU spin.
Tips and best practices
- Subscribe before calling Open() to ensure you don�t miss events.
- Tune Encoding (default ASCII), BaudRate, Parity, StopBits, and Handshake to match your device.
- Use BufferUntil for delimited protocols. For binary protocols, use ReadAsync with fixed sizes.
- Use Lines when dealing with text protocols; use ReadLineAsync when you need a one-shot line.
- Always dispose subscriptions (DisposeWith) and call Close() when done.
Example program (complete)
using System;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using CP.IO.Ports;
using ReactiveMarbles.Extensions;
internal static class Program
{
private static async System.Threading.Tasks.Task Main()
{
const string comPortName = "COM1";
const string dataToWrite = "DataToWrite";
var dis = new CompositeDisposable();
var startChar = 0x21.AsObservable(); // '!'
var endChar = 0x0a.AsObservable(); // '\n'
var comdis = new CompositeDisposable();
SerialPortRx.PortNames().Do(names =>
{
if (comdis.Count == 0 && names.Contains(comPortName))
{
var port = new SerialPortRx(comPortName, 9600);
port.DisposeWith(comdis);
port.ErrorReceived.Subscribe(Console.WriteLine).DisposeWith(comdis);
port.IsOpenObservable.Subscribe(open => Console.WriteLine($"{comPortName} {(open ? "Open" : "Closed")}"))
.DisposeWith(comdis);
port.DataReceived
.BufferUntil(startChar, endChar, 100)
.Subscribe(data => Console.WriteLine($"Data: {data}"))
.DisposeWith(comdis);
port.WhileIsOpen(TimeSpan.FromMilliseconds(500))
.Subscribe(_ => port.Write(dataToWrite))
.DisposeWith(comdis);
port.Open().Wait();
}
else if (!names.Contains(comPortName))
{
comdis.Dispose();
Console.WriteLine($"Port {comPortName} Disposed");
}
}).ForEach().Subscribe(Console.WriteLine).DisposeWith(dis);
Console.ReadLine();
comdis.Dispose();
dis.Dispose();
}
}
License
MIT. See LICENSE.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net8.0-windows10.0.19041 is compatible. net9.0 is compatible. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net9.0-windows10.0.19041 is compatible. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
.NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
.NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen40 was computed. tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
-
.NETStandard 2.0
- ReactiveMarbles.Extensions (>= 1.1.13)
- System.IO.Ports (>= 9.0.8)
- System.Reactive (>= 6.0.1)
-
net8.0
- ReactiveMarbles.Extensions (>= 1.1.13)
- System.IO.Ports (>= 9.0.8)
- System.Reactive (>= 6.0.1)
-
net8.0-windows10.0.19041
- ReactiveMarbles.Extensions (>= 1.1.13)
- System.IO.Ports (>= 9.0.8)
- System.Reactive (>= 6.0.1)
-
net9.0
- ReactiveMarbles.Extensions (>= 1.1.13)
- System.IO.Ports (>= 9.0.8)
- System.Reactive (>= 6.0.1)
-
net9.0-windows10.0.19041
- ReactiveMarbles.Extensions (>= 1.1.13)
- System.IO.Ports (>= 9.0.8)
- System.Reactive (>= 6.0.1)
NuGet packages (2)
Showing the top 2 NuGet packages that depend on SerialPortRx:
Package | Downloads |
---|---|
ModbusRx
An Reactive version of NModbus4 |
|
MQTTnet.Rx.SerialPort
Reactive extensions for MQTTnet Broker |
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last Updated |
---|---|---|
3.1.2 | 149 | 8/18/2025 |
3.1.1 | 124 | 8/17/2025 |
3.0.3 | 1,994 | 11/24/2023 |
3.0.2 | 116 | 11/11/2023 |
2.3.4 | 929 | 2/18/2023 |
2.3.2 | 431 | 1/10/2023 |
2.3.1 | 355 | 1/10/2023 |
2.2.6 | 6,751 | 7/22/2022 |
2.1.4 | 509 | 7/21/2022 |
2.1.3 | 501 | 7/21/2022 |
2.1.2 | 516 | 7/20/2022 |
2.1.1 | 764 | 12/13/2021 |
1.4.4 | 828 | 5/1/2021 |
1.4.3 | 975 | 2/24/2020 |
1.4.2 | 734 | 1/19/2020 |
1.4.1 | 703 | 12/29/2019 |
1.4.0 | 724 | 12/29/2019 |
1.3.1 | 1,115 | 1/8/2019 |
1.3.0 | 995 | 10/17/2018 |
1.2.0 | 959 | 9/19/2018 |
1.1.0 | 1,080 | 8/15/2018 |
1.0.1 | 1,366 | 6/27/2018 |
1.0.0 | 1,356 | 5/28/2018 |
0.0.0.5 | 1,365 | 2/15/2018 |
0.0.0.4 | 1,432 | 9/29/2016 |
0.0.0.3 | 1,215 | 9/27/2016 |
0.0.0.2 | 1,180 | 9/26/2016 |
0.0.0.1 | 1,145 | 9/25/2016 |
Compatability with Net 6/ 7/ 8 and netstandard2.0