using System; #if NET20 || NET35 using System.Collections.Generic; #else using System.Collections.Concurrent; #endif using System.Threading; using System.Threading.Tasks; namespace MPF.Frontend { public sealed class ProcessingQueue : IDisposable { /// /// Internal queue to hold data to process /// #if NET20 || NET35 private readonly Queue _internalQueue; #else private readonly ConcurrentQueue _internalQueue; #endif /// /// Custom processing step for dequeued data /// private readonly Action _customProcessing; /// /// Cancellation method for the processing task /// private readonly CancellationTokenSource _tokenSource; public ProcessingQueue(Action customProcessing) { _internalQueue = []; _customProcessing = customProcessing; _tokenSource = new CancellationTokenSource(); #if NET20 || NET35 Task.Run(() => ProcessQueue()); #elif NET40 Task.Factory.StartNew(() => ProcessQueue()); #else Task.Run(() => ProcessQueue(), _tokenSource.Token); #endif } /// /// Dispose the current instance /// public void Dispose() => _tokenSource.Cancel(); /// /// Enqueue a new item for processing /// /// public void Enqueue(T? item) { // Only accept new data when not cancelled if (item != null && !_tokenSource.IsCancellationRequested) _internalQueue.Enqueue(item); } /// /// Process /// private void ProcessQueue() { while (true) { // Nothing in the queue means we get to idle if (_internalQueue.Count == 0) { if (_tokenSource.IsCancellationRequested) break; Thread.Sleep(1); continue; } #if NET20 || NET35 // Get the next item from the queue and invoke the lambda, if possible _customProcessing?.Invoke(_internalQueue.Dequeue()); #else // Get the next item from the queue if (!_internalQueue.TryDequeue(out var nextItem)) continue; // Invoke the lambda, if possible _customProcessing?.Invoke(nextItem); #endif } } } }