SakontStack.ReactiveStream 1.2.1

dotnet add package SakontStack.ReactiveStream --version 1.2.1
NuGet\Install-Package SakontStack.ReactiveStream -Version 1.2.1
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="SakontStack.ReactiveStream" Version="1.2.1" />
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add SakontStack.ReactiveStream --version 1.2.1
#r "nuget: SakontStack.ReactiveStream, 1.2.1"
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install SakontStack.ReactiveStream as a Cake Addin
#addin nuget:?package=SakontStack.ReactiveStream&version=1.2.1

// Install SakontStack.ReactiveStream as a Cake Tool
#tool nuget:?package=SakontStack.ReactiveStream&version=1.2.1

Reactive Stream

a Stream wrapper that provides Read/Write progress reporting through IProgress<StreamProgress> and IObservable<StreamProgress> with speed throttling for Read/Write streams separately.

Version Downloads
Latest version Downloads

Example Usage:

Report file download progress:

  • progress using IProgress<StreamProgress>
using System.Reactive.Linq;
using SakontStack.ReactiveStream;
using ByteSizeLib;

var fileLink = new Uri(@"https://releases.ubuntu.com/jammy/ubuntu-22.04.3-desktop-amd64.iso");

var client   = new HttpClient();
var response = await client.GetAsync(fileLink, HttpCompletionOption.ResponseHeadersRead);
var length   = long.Parse(response.Content.Headers
                                  .First(h => h.Key.Equals("Content-Length"))
                                  .Value
                                  .First());

var progress = new Progress<ReactiveStream.StreamProgress>(p => Console
                                                              .WriteLine($"Downloaded {ByteSize.FromBytes(p.Bytes).ToBinaryString()}" +
                                                                         $"/{ByteSize.FromBytes(p.TotalBytes ?? 0).ToBinaryString()} " +
                                                                         $"({p.Percentage:N2}%) " +
                                                                         $"{ByteSize.FromBytes(p.BytesPerSecond).ToBinaryString()}/sec"));

var stream = new ReactiveStream(new MemoryStream(), progress: progress,  totalLength: length);
// stream progress will only start reporting progress after you subscribe to it
stream.Subscribe();
await response.Content.CopyToAsync(stream);

NOTE: setting custom report interval while using IProgress<StreamProgress> alone, requires using stream.Sample() operator.

  • progress using IObservable<StreamProgress>
using System.Reactive.Linq;
using SakontStack.ReactiveStream;
using ByteSizeLib;

var fileLink = new Uri(@"https://releases.ubuntu.com/jammy/ubuntu-22.04.3-desktop-amd64.iso");

var client   = new HttpClient();
var response = await client.GetAsync(fileLink, HttpCompletionOption.ResponseHeadersRead);
var length   = long.Parse(response.Content.Headers
                                  .First(h => h.Key.Equals("Content-Length"))
                                  .Value
                                  .First());

var stream = new ReactiveStream(new MemoryStream(), totalLength: length);
stream
  .Sample(TimeSpan.FromSeconds(0.25)) // Only report progress each 0.25 seconds
  .Do(p => Console
            .WriteLine($"Downloaded {ByteSize.FromBytes(p.Bytes).ToBinaryString()}"+
                       $"/{ByteSize.FromBytes(p.TotalBytes ?? 0).ToBinaryString()}"+
                       $"({p.Percentage:N2}%) "+
                       $"{ByteSize.FromBytes(p.BytesPerSecond).ToBinaryString()}/sec"))
  .Subscribe();
await response.Content.CopyToAsync(stream);
  • setting download speed limits:
using System.Reactive.Linq;
using SakontStack.ReactiveStream;
using ByteSizeLib;

var fileLink = new Uri(@"https://releases.ubuntu.com/jammy/ubuntu-22.04.3-desktop-amd64.iso");

var client   = new HttpClient();
var response = await client.GetAsync(fileLink, HttpCompletionOption.ResponseHeadersRead);
var length   = long.Parse(response.Content.Headers
                                  .First(h => h.Key.Equals("Content-Length"))
                                  .Value
                                  .First());

var stream = new ReactiveStream(new MemoryStream(), totalLength: length,
                                configureStream: s =>
                                                 {
                                                     // 1 MB/sec speed limit for write streams;
                                                     s.WriteSpeedLimit = 1024 * 1024;
                                                 });
stream
  .Sample(TimeSpan.FromSeconds(0.25)) // Only report progress each 0.25 seconds
  .Do(p => Console
            .WriteLine($"Downloaded {ByteSize.FromBytes(p.Bytes).ToBinaryString()}"+
                       $"/{ByteSize.FromBytes(p.TotalBytes ?? 0).ToBinaryString()}"+
                       $"({p.Percentage:N2}%) "+
                       $"{ByteSize.FromBytes(p.BytesPerSecond).ToBinaryString()}/sec"))
  .Subscribe();
await response.Content.CopyToAsync(stream);
  • changing speed limits in realtime:
stream.ModifyOptions(x => x.WriteSpeedLimit = null);

ByteSizeLib library is not a dependency, and only used in this example to provide sample codes that will print progress with better formatted byte sizes

example output:

...
Downloaded 4.7 MiB/4.69 GiB (0.10%) 680 KiB/sec
Downloaded 5.03 MiB/4.69 GiB (0.10%) 1016 KiB/sec
Downloaded 5.04 MiB/4.69 GiB (0.10%) 1 MiB/sec
Downloaded 5.36 MiB/4.69 GiB (0.11%) 328 KiB/sec
Downloaded 5.7 MiB/4.69 GiB (0.12%) 680 KiB/sec

API:

the ReactiveStream class implements both Stream & IObservable<ReactiveStream.StreamProgress>

public class ReactiveStream : Stream, IObservable<StreamProgress>
{
    //* ... *//
}
Product Compatible and additional computed target framework versions.
.NET net6.0 is compatible.  net6.0-android was computed.  net6.0-ios was computed.  net6.0-maccatalyst was computed.  net6.0-macos was computed.  net6.0-tvos was computed.  net6.0-windows was computed.  net7.0 was computed.  net7.0-android was computed.  net7.0-ios was computed.  net7.0-maccatalyst was computed.  net7.0-macos was computed.  net7.0-tvos was computed.  net7.0-windows was computed.  net8.0 was computed.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (1)

Showing the top 1 NuGet packages that depend on SakontStack.ReactiveStream:

Package Downloads
QwikHosting.Deno

Host Qwik app using Deno and ASP.NET

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
1.2.1 134 8/19/2023
1.2.0 116 8/19/2023
1.0.1 116 8/19/2023
1.0.0 138 8/19/2023