using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; namespace MPF.Data { public class ProcessingQueue : IDisposable { /// /// Internal queue to hold data to process /// private readonly ConcurrentQueue InternalQueue; /// /// Custom processing step for dequeued data /// private readonly Action CustomProcessing; /// /// Internal processing task for dequeueing /// private readonly Task ProcessingTask; /// /// Cancellation method for the processing task /// private readonly CancellationTokenSource TokenSource; public ProcessingQueue(Action customProcessing) { this.InternalQueue = new ConcurrentQueue(); this.CustomProcessing = customProcessing; this.TokenSource = new CancellationTokenSource(); this.ProcessingTask = Task.Run(() => ProcessQueue(), this.TokenSource.Token); } /// /// Dispose the current instance /// public void Dispose() { this.TokenSource.Cancel(); while (!this.ProcessingTask.IsCompleted) ; this.ProcessingTask.Dispose(); } /// /// Enqueue a new item for processing /// /// public void Enqueue(T item) { this.InternalQueue.Enqueue(item); } /// /// Process /// private void ProcessQueue() { while (true) { // If cancellation was requested, just do it if (this.TokenSource.IsCancellationRequested) break; // Nothing in the queue means we get to idle if (this.InternalQueue.Count == 0) continue; // Get the next item from the queue if (!this.InternalQueue.TryDequeue(out T nextItem)) continue; // Invoke the lambda, if possible this.CustomProcessing?.Invoke(nextItem); } } } }