DiskQueue 1.5.0


DiskQueue

A thread-safe, multi-process(ish) persistent queue, based very heavily on http://ayende.com/blog/3479/rhino-queues-storage-disk

Requirements and Environment

Works on .NET 4+ and Mono 2.10.8+ (3.0.6+ recommended).

Requires access to filesystem storage

Basic Usage

  • PersistentQueue.WaitFor(...) is the main entry point. This will attempt to gain an exclusive lock on the given storage location. On first use, a directory will be created with the required files inside it.
  • This queue object can be shared among threads. Each thread should call OpenSession() to get its own session object.
  • Both IPersistentQueues and IPersistentQueueSessions should be wrapped in using() clauses, or otherwise disposed of properly.

Example

Queue on one thread, consume on another; retry some exceptions.

Note this is one queue being shared between two sessions. You should not open two queue instances for one storage location at once.

IPersistentQueue queue = new PersistentQueue("queue_a");
var t1 = new Thread(() =>
{
	while (HaveWork())
	{
		using (var session = queue.OpenSession())
		{
			session.Enqueue(NextWorkItem());
			session.Flush();
		}
	}
});
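var t2 = new Thread(()=> {
	while (true) {
		using (var session = queue.OpenSession()) {
			var data = session.Dequeue();
			if (data == null) {Thread.Sleep(100); continue;} // queue is empty; wait and poll again
			
			try {
				MaybeDoWork(data);
				session.Flush(); // commit the dequeue
			} catch (RetryException) {
				continue; // no flush: the dequeue is rolled back and the item is retried
			} catch {
				session.Flush(); // any other failure: commit the dequeue so the item is dropped
			}
		}
	}
});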

t1.Start();
t2.Start();

Example

Batch up a load of work and have another thread work through it.

IPersistentQueue queue = new PersistentQueue("batchQueue");
var worker = new Thread(()=> {
	using (var session = queue.OpenSession()) {
		byte[] data;
		while ((data = session.Dequeue()) != null) {
			MaybeDoWork(data);
			session.Flush();
		}
	}
});

using (var session = queue.OpenSession()) {
	foreach (var item in LoadsOfStuff()) {
		session.Enqueue(item);
	}
	session.Flush();
}

worker.IsBackground = true; // anything not complete when we close will be left on the queue for next time.
worker.Start();
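
The examples above pass raw byte arrays through the queue (Dequeue is assigned to a byte[]), so callers need some way to turn work items into bytes and back. The following is a minimal sketch, assuming the work items are plain strings and that UTF-8 is an acceptable encoding. WorkItemCodec is a hypothetical helper name and is not part of DiskQueue.

```csharp
using System;
using System.Text;

// Hypothetical helper, not part of DiskQueue: converts string work items
// to and from the byte arrays that sessions enqueue and dequeue.
public static class WorkItemCodec
{
    // Encode a string payload as UTF-8 bytes, ready to pass to session.Enqueue(...).
    public static byte[] Encode(string item)
    {
        if (item == null) throw new ArgumentNullException(nameof(item));
        return Encoding.UTF8.GetBytes(item);
    }

    // Decode bytes returned by session.Dequeue(); a null input (empty queue) stays null.
    public static string Decode(byte[] data)
    {
        return data == null ? null : Encoding.UTF8.GetString(data);
    }
}
```

With this in place, a producer could call session.Enqueue(WorkItemCodec.Encode(item)) and a consumer could call WorkItemCodec.Decode(session.Dequeue()), treating a null result as "nothing to do yet".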

Transactions

Each session is a transaction. Any Enqueues or Dequeues will be rolled back when the session is disposed unless you call session.Flush(). Data will only be visible between threads once it has been flushed. Each flush incurs a performance penalty. By default, each flush is persisted to disk before continuing. You can get more speed at a safety cost by setting queue.ParanoidFlushing = false;
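
To make the rollback rule concrete, here is a small sketch under stated assumptions: the library's types live in the DiskQueue namespace, and storagePath points at a fresh location that no other queue uses. RollbackSketch and Run are hypothetical names chosen for illustration. Going by the description above, the first enqueue should be discarded because its session is disposed without a flush, so the item read back should be the flushed one.

```csharp
using System;
using System.Text;
using DiskQueue; // assumed namespace of the DiskQueue package

public static class RollbackSketch
{
    // storagePath is a hypothetical, otherwise unused queue location.
    public static string Run(string storagePath)
    {
        using (IPersistentQueue queue = new PersistentQueue(storagePath))
        {
            using (var session = queue.OpenSession())
            {
                session.Enqueue(Encoding.UTF8.GetBytes("never flushed"));
                // No Flush(): disposing the session rolls this enqueue back.
            }

            using (var session = queue.OpenSession())
            {
                session.Enqueue(Encoding.UTF8.GetBytes("flushed"));
                session.Flush(); // commit the enqueue
            }

            using (var session = queue.OpenSession())
            {
                byte[] data = session.Dequeue();
                session.Flush(); // commit the dequeue so the item is not handed out again
                return data == null ? null : Encoding.UTF8.GetString(data);
            }
        }
    }
}
```

The same rule applies to reads: leaving out the final Flush() would put the dequeued item back on the queue when the session is disposed.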

Data loss and transaction truncation

By default, DiskQueue silently discards transaction blocks that have been truncated, and throws an InvalidOperationException when transaction block markers are overwritten. Overwritten markers occur when more than one process uses the queue by mistake, and can also be caused by some kinds of disk corruption. If you construct your queue with throwOnConflict: false, all recoverable transaction errors are silently truncated. Use this only when uptime is more important than data consistency.

using (var queue = new PersistentQueue(path, Constants._32Megabytes, throwOnConflict: false)) {
    . . .
}

Global default settings

Each instance of a PersistentQueue has its own settings for flush levels and corruption behaviour. You can set these individually after creating an instance, or globally with PersistentQueue.DefaultSettings. Default settings are applied to all queue instances in the same process created after the setting is changed.

For example, if performance is more important than crash safety:

PersistentQueue.DefaultSettings.ParanoidFlushing = false;
PersistentQueue.DefaultSettings.TrimTransactionLogOnDispose = false;

Or if up-time is more important than detecting corruption early (often the case for embedded systems):

PersistentQueue.DefaultSettings.AllowTruncatedEntries = true;
PersistentQueue.DefaultSettings.ParanoidFlushing = true;
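
The two levels of configuration can be combined. The sketch below sets a process-wide default and then overrides it on a single instance, using only the ParanoidFlushing setting mentioned in this document. It assumes the types live in the DiskQueue namespace; SettingsSketch, Run and storagePath are hypothetical names for illustration.

```csharp
using DiskQueue; // assumed namespace of the DiskQueue package

public static class SettingsSketch
{
    // storagePath is a hypothetical queue location used only for this sketch.
    public static void Run(string storagePath)
    {
        // Process-wide default: applies to every queue created after this line.
        PersistentQueue.DefaultSettings.ParanoidFlushing = true;

        using (IPersistentQueue queue = new PersistentQueue(storagePath))
        {
            // Per-instance override: only this queue trades safety for speed.
            queue.ParanoidFlushing = false;

            using (var session = queue.OpenSession())
            {
                session.Enqueue(new byte[] { 1, 2, 3 });
                session.Flush();
            }
        }
    }
}
```

Queues created before a default is changed keep the settings they started with, so global defaults are best set once at process start-up.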

Removing or resetting queues

Queues create a directory and set of files for storage. You can remove all files for a queue with the HardDelete method. If you pass true as the reset parameter, the directory and queue files will be created again.

This WILL delete ANY AND ALL files inside the queue directory. You should not call this method in normal use. If you start a queue with the same path as an existing directory, this method will delete the entire directory, not just the queue files.

var subject = new PersistentQueue("queue_a");
subject.HardDelete(true); // wipe any existing data and start again

Multi-Process Usage

Each IPersistentQueue gives exclusive access to the storage until it is disposed. There is a static helper method PersistentQueue.WaitFor("path", TimeSpan...) which will wait to gain access until other processes release the lock or the timeout expires. If each process uses the lock for a short time and waits long enough, they can share a storage location.

E.g.

...
void AddToQueue(byte[] data) {
	Thread.Sleep(150);
	using (var queue = PersistentQueue.WaitFor(SharedStorage, TimeSpan.FromSeconds(30)))
	using (var session = queue.OpenSession()) {
		session.Enqueue(data);
		session.Flush();
	}
}

byte[] ReadQueue() {
	Thread.Sleep(150);
	using (var queue = PersistentQueue.WaitFor(SharedStorage, TimeSpan.FromSeconds(30)))
	using (var session = queue.OpenSession()) {
		var data = session.Dequeue();
		session.Flush();
		return data;
	}
}
...

If you need the transaction semantics of sessions across multiple processes, try a more robust solution like https://github.com/i-e-b/SevenDigital.Messaging


This package has no dependencies.


Version Downloads Last updated
1.7.1 37,627 12/13/2023
1.7.0 11,989 8/24/2023
1.6.8 8,608 4/18/2023
1.6.7 1,913 3/15/2023
1.6.6 246 3/14/2023
1.6.5 8,596 1/13/2023
1.6.4 430 12/19/2022
1.6.3 386 12/7/2022
1.6.2 371 12/5/2022
1.6.1 330 12/5/2022
1.6.0 2,900 7/22/2022
1.5.0 7,305 4/6/2022
1.4.0 2,641 3/16/2022
1.3.2 10,691 7/16/2021
1.3.1 7,079 2/10/2021
1.3.0 2,875 9/14/2020
1.2.0 39,338 8/6/2018
1.1.0 3,498 2/22/2018
1.0.0 21,446 9/3/2015

Rewritten file access layer with multiple retry and timeout points