C#中定义自己的消费队列(下)

Posted Hello 寻梦者!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C#中定义自己的消费队列(下)相关的知识,希望对你有一定的参考价值。

一 背景

上一篇中我们介绍了一个关于使用C#中的Queue来定义自己的消费队列,这篇文章我将再次使用Queue来定义另外一种消费队列,这个队列中会使用到System.Threading.Timer来定义一个10ms的Interval,和上一篇中产生数据一个个消费不同这篇文章中介绍的消费队列中消费定时器时间间隔内的所有待消费项,前面我们还是一样会通过源码来一步步讲述其内部原理,最后会通过几个单元测试来验证对应的使用。

二 源码

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace Pangea.Common.Utility.Buffer

    public class CustomConsumeQueue<T> : IDisposable
    
        #region Fields

        public const int INTERVAL_CONSUMING = 10;//单位:ms

        // used for dispose
        private bool _isDisposed = false;

        protected const bool FINALIZE_DISPOSING = false;
        protected const bool EXPLICIT_DISPOSING = true;

        private Timer _timer;
        private Queue<T> _queue;
        private readonly object _lockObj = new object();
        private readonly Action<IList<T>> _consumerQueueItemsAction;

#if DEBUG

        // the counter for working thread numbers in current;
        public int _threadCounter = 0;

#endif

        #endregion

        #region Ctor

        public CustomConsumeQueue(Action<IList<T>> consumerQueueItemsAction)
        
            _queue = new Queue<T>();
            _timer = new Timer(new TimerCallback(BeginTakeQueueItems));
            _timer.Change(Timeout.Infinite, Timeout.Infinite);
            _consumerQueueItemsAction = consumerQueueItemsAction;
        

        ~CustomConsumeQueue()
        
            Dispose(FINALIZE_DISPOSING);
        

        #endregion

        #region Methods

        #region Public

        public bool Add(T item)
        
            if (_isDisposed) return false;

            lock (_lockObj)
            
                _queue.Enqueue(item);
                _timer.Change(INTERVAL_CONSUMING, Timeout.Infinite);
            
            return true;
        

        public int PendingToConsumeCount()
        
            lock (_lockObj)
            
                return _queue.Count;
            
        

        public void Dispose()
        
            Dispose(EXPLICIT_DISPOSING);
            GC.SuppressFinalize(this);
        

        public void Dispose(bool disposingMode)
        
            if (_isDisposed) return;

            if (disposingMode == EXPLICIT_DISPOSING)
            
                // release managed resource whne dispose by explicit
            

            _timer?.Dispose();
            _timer = null;

            _isDisposed = true;
        

        #endregion

        #region Private

        private void BeginTakeQueueItems(object state)
        
            ThreadPool.QueueUserWorkItem(state1 =>
            
#if DEBUG
                Interlocked.Increment(ref _threadCounter);
#endif
                try
                
                    T[] itemsArray = null;
                    lock (_lockObj)
                    
                        itemsArray = new T[_queue.Count];
                        _queue.CopyTo(itemsArray, 0);
                        _queue.Clear();

                        if (_isDisposed == false)
                        
                            // may throw a disposed exception
                            ((Timer)state).Change(Timeout.Infinite, Timeout.Infinite);
                        
                    
                    Trace.WriteLine($"[DateTime.Now:HH-mm-ss fff] Begin into consume procedure,QueueCount:itemsArray.Length,Time:DateTime.Now:yyyy-MM-dd HH:mm:ss.fff");

                    //begin consumer queue items
                    _consumerQueueItemsAction.Invoke(itemsArray);
                
                catch (Exception ex)
                
                    // TODO Log exception, terminate current thread
                
                finally
                
#if DEBUG
                    Interlocked.Decrement(ref _threadCounter);
#endif
                
            );
        

        #endregion

        #endregion
    

2.1 原理讲解

  1. Add 的过程
    在Add的时候除了将待消费项添加到默认的_queue对象中以外就开始设置定时器触发的Interval为INTERVAL_CONSUMING,这个值默认设置为10ms,也就是当前定时器的触发间隔在添加消费项的时候会默认开启。
  2. 定时器消费过程
    定时器消费的时候会一次性将10ms的间隔内所有消费项一次性取到,然后一次性消费掉,另外需要注意的是,在定时器触发时间回调的过程中需要重新设置定时器的IntervalTimeout.Infinite,这样就能够将原来的定时器停掉,在下一次再次添加的时候重新设置默认的Interval,这样就能够一次往复交替进行,这个是消费过程的主要逻辑。
  3. 实现IDispose模式
    当前的CustomConsumeQueue显式实现IDispose的接口,代码中也都实现了对_isDisposed字段的判断,这个在使用的过程中需要特别注意。

三 测试

单元测试用例如下:

using NUnit.Framework;
using Pangea.Common.Utility.Buffer;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

namespace ACM.Framework.Test.Modules.Utils

    public class CustomConsumeQueueTest
    
        CustomConsumeQueue<string> _consumingQueue = null;

        [SetUp]
        public void Setup()
        
            CustomConsumeQueue<string> consumeQueue = new CustomConsumeQueue<string>(StartConsuming);
            _consumingQueue = consumeQueue;
        

        [Test]
        public void GeneralWritingTest()
        
            int consumedCount = 0;
            CustomConsumeQueue<string> consumeQueue = new CustomConsumeQueue<string>(queueItems =>
            
                int currentConsumedCount = queueItems.Count;
                Console.WriteLine($"[DateTime.Now:HH-mm-ss fff] currentConsumedCount items has been consumed.");
                consumedCount += currentConsumedCount;
            );

            int producedCount = new Random(Guid.NewGuid().GetHashCode()).Next(1, 100);
            AddItem(consumeQueue, producedCount);

            int loopTime = 1000;
            while (consumedCount != producedCount)
            
                Thread.Sleep(100);
                loopTime -= 100;
            

            Assert.IsTrue(consumeQueue.PendingToConsumeCount() == 0);
            Assert.IsTrue(consumeQueue._threadCounter == 0);
            Assert.IsTrue(consumedCount == producedCount);
        

        [Test]
        public void TimerWritingTest()
        
            int times = 20;
            int paralleCount = 5;
            int completedThreadCount = 0;
            for (int i = 0; i < paralleCount; i++)
            
                int localIndex = i;
                ThreadPool.QueueUserWorkItem(obj =>
                
                    while (times-- > 0)
                    
                        AddItem(_consumingQueue, new Random(Guid.NewGuid().GetHashCode()).Next(1, 100));
                        Thread.Yield();
                    
                    Interlocked.Increment(ref completedThreadCount);
                    Trace.WriteLine($"***************Begin run into parralle thread:localIndex************************");
                );
            

            while (completedThreadCount != 5)
            
                Thread.Sleep(100);
            

            Thread.Sleep(1000);

            Assert.IsTrue(_consumingQueue.PendingToConsumeCount() == 0);
            Assert.IsTrue(_consumingQueue._threadCounter == 0);
        

        [Test]
        public void AddAfterDisposeTest()
        
            string item1 = GetRandomString();
            bool result1 = _consumingQueue.Add(item1);
            Assert.IsTrue(result1);

            Thread.Sleep(1000); // Console.WriteLine will cost lots of time

            Assert.IsTrue(_consumingQueue.PendingToConsumeCount() == 0);
            Assert.IsTrue(_consumingQueue._threadCounter == 0, $"[DateTime.Now:HH-mm-ss fff] Thread numbers is not zero.");

            _consumingQueue.Dispose();

            string item2 = GetRandomString();
            bool result2 = _consumingQueue.Add(item2);
            Assert.IsFalse(result2);
        

        private void StartConsuming(IList<string> queueItems)
        
            Console.WriteLine($"[DateTime.Now:HH-mm-ss fff] queueItems.Count items has been consumed.");
        
        private void AddItem(CustomConsumeQueue<string> queue, int numbers)
        
            IList<string> items = new List<string>();
            for (int i = 0; i < numbers; i++)
            
                items.Add(GetRandomString());
            
            foreach (var item in items)
            
                queue.Add(item);
                Thread.Sleep(new Random(Guid.NewGuid().GetHashCode()).Next(1, 50));
            
        
        private string GetRandomString()
        
            return Guid.NewGuid().ToString();
        
    

测试的单元用例主要包含下面的几个部分:

  1. GeneralWritingTest()实现单个线程增加PendingItem,然后实现消费的过程。
  2. TimerWritingTest()使用5个独立的线程增加PendingItem,在每个线程中随机产生消费项,然后判断消费过程是否正确。
  3. AddAfterDisposeTest() 主要测试代码实现Dispose的过程,在添加然后消费后中途在显式的Dispose当前的Queue,判断后续是否能够再继续进行添加。

以上是关于C#中定义自己的消费队列(下)的主要内容,如果未能解决你的问题,请参考以下文章

C#阻塞队列BlockingCollection

RibbitMQ 大数据分布式下的消息队列思

C#多线程 一个缓冲队列,一个生产者线程,一个消费者线程,这两个线程同时操作这个队列,必须加互斥锁吗

c#数据流或任务,消费消息并行处理

并发无锁队列

多个生产者,单个消费者