using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace MPF.Core.Data { public class ProcessingQueue : IDisposable { /// /// Internal queue to hold data to process /// private readonly ConcurrentQueue InternalQueue; /// /// Custom processing step for dequeued data /// private readonly Action CustomProcessing; /// /// Cancellation method for the processing task /// private readonly CancellationTokenSource TokenSource; public ProcessingQueue(Action customProcessing) { this.InternalQueue = new ConcurrentQueue(); this.CustomProcessing = customProcessing; this.TokenSource = new CancellationTokenSource(); Task.Run(() => ProcessQueue(), this.TokenSource.Token); } /// /// Dispose the current instance /// public void Dispose() => this.TokenSource.Cancel(); /// /// Enqueue a new item for processing /// /// public void Enqueue(T item) { // Only accept new data when not cancelled if (!this.TokenSource.IsCancellationRequested) this.InternalQueue.Enqueue(item); } /// /// Process /// private void ProcessQueue() { while (true) { // Nothing in the queue means we get to idle if (this.InternalQueue.Count == 0) { if (this.TokenSource.IsCancellationRequested) break; Thread.Sleep(10); continue; } // Get the next item from the queue if (!this.InternalQueue.TryDequeue(out T nextItem)) continue; // Invoke the lambda, if possible this.CustomProcessing?.Invoke(nextItem); } } } }