using System;
#if NET20 || NET35
using System.Collections.Generic;
#else
using System.Collections.Concurrent;
#endif
using System.Threading;
using System.Threading.Tasks;

namespace MPF.Core.Data
{
    /// <summary>
    /// Queue that hands every enqueued item to a caller-supplied callback on a
    /// background task started by the constructor.
    /// </summary>
    /// <typeparam name="T">Type of the items being queued and processed</typeparam>
    public sealed class ProcessingQueue<T> : IDisposable
    {
        /// <summary>
        /// How long the worker sleeps, in milliseconds, when the queue is empty
        /// </summary>
        private const int IdleDelayMilliseconds = 10;

        /// <summary>
        /// Internal queue to hold data to process
        /// </summary>
        /// <remarks>
        /// On NET20/NET35 this is a plain <see cref="Queue{T}"/>, which is not
        /// thread-safe, so every access to it is made under a lock on the instance.
        /// </remarks>
#if NET20 || NET35
        private readonly Queue<T> InternalQueue;
#else
        private readonly ConcurrentQueue<T> InternalQueue;
#endif

        /// <summary>
        /// Custom processing step for dequeued data
        /// </summary>
        private readonly Action<T> CustomProcessing;

        /// <summary>
        /// Cancellation method for the processing task
        /// </summary>
        private readonly CancellationTokenSource TokenSource;

        /// <summary>
        /// Create the queue and immediately start the background processing task
        /// </summary>
        /// <param name="customProcessing">
        /// Callback invoked once per dequeued item; a null callback is skipped
        /// </param>
        public ProcessingQueue(Action<T> customProcessing)
        {
#if NET20 || NET35
            this.InternalQueue = new Queue<T>();
#else
            this.InternalQueue = new ConcurrentQueue<T>();
#endif
            this.CustomProcessing = customProcessing;
            this.TokenSource = new CancellationTokenSource();

            // Only the newest targets pass the token to the task itself; the
            // worker loop checks the token source directly in every case.
#if NET20 || NET35
            Task.Run(() => ProcessQueue());
#elif NET40
            Task.Factory.StartNew(() => ProcessQueue());
#else
            Task.Run(() => ProcessQueue(), this.TokenSource.Token);
#endif
        }

        /// <summary>
        /// Dispose the current instance
        /// </summary>
        /// <remarks>
        /// This only requests cancellation. The token source is cancelled rather
        /// than disposed because the worker loop may still be reading it while it
        /// drains items that are already queued.
        /// </remarks>
        public void Dispose() => this.TokenSource.Cancel();

        /// <summary>
        /// Enqueue a new item for processing
        /// </summary>
        /// <param name="item">
        /// Item to queue; null items and items offered after cancellation are ignored
        /// </param>
        public void Enqueue(T? item)
        {
            // Only accept new data when not cancelled
            if (item == null || this.TokenSource.IsCancellationRequested)
                return;

#if NET20 || NET35
            // Queue<T> is not thread-safe and the worker reads it concurrently
            lock (this.InternalQueue)
            {
                this.InternalQueue.Enqueue(item);
            }
#else
            this.InternalQueue.Enqueue(item);
#endif
        }

        /// <summary>
        /// Worker loop: dequeue items one at a time and pass them to the callback,
        /// idling while the queue is empty and exiting once it is empty and
        /// cancellation has been requested
        /// </summary>
        private void ProcessQueue()
        {
            while (true)
            {
#if NET20 || NET35
                bool isEmpty;
                lock (this.InternalQueue)
                {
                    isEmpty = this.InternalQueue.Count == 0;
                }
#else
                bool isEmpty = this.InternalQueue.IsEmpty;
#endif

                // Nothing in the queue means we get to idle
                if (isEmpty)
                {
                    // Cancellation is only honored on an empty queue, so items
                    // queued before Dispose are still processed
                    if (this.TokenSource.IsCancellationRequested)
                        break;

                    Thread.Sleep(IdleDelayMilliseconds);
                    continue;
                }

#if NET20 || NET35
                // Get the next item from the queue; this loop is the only
                // consumer, so the queue cannot have emptied since the check above
                T nextItem;
                lock (this.InternalQueue)
                {
                    nextItem = this.InternalQueue.Dequeue();
                }

                // Invoke the lambda outside the lock so producers are not blocked
                this.CustomProcessing?.Invoke(nextItem);
#else
                // Get the next item from the queue
                if (!this.InternalQueue.TryDequeue(out var nextItem))
                    continue;

                // Invoke the lambda, if possible
                this.CustomProcessing?.Invoke(nextItem);
#endif
            }
        }
    }
}