关于 C# ConcurrentQueue<T>

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关于 C# ConcurrentQueue<T>相关的知识,希望对你有一定的参考价值。

不太明白ConCurrentQueue这个类是做什么用的,求讲解
我尝试写了如下
public async Task<bool> Check(string termCode, string crn)

return await somemethod();


public async Task ConcurrentRequest(Dictionary<string, List<string>> CoursesbyTerm)


ConcurrentQueue<Task> ProgressQueue = new ConcurrentQueue<Task>();
var task = new Task(
() =>

//Console.WriteLine("Enqueue elements...");
foreach (string key in CoursesbyTerm.Keys)

foreach (string crn in CoursesbyTerm[key])

ProgressQueue.Enqueue(Check(key,crn));

//Thread.Sleep(100);


Task item;

);
task.Start();

这代表了并行运行Check方法么? 如果是, 我要如何在不阻塞队列进程的情况下随时查看Check的结果并根据结果来TryDequeue 对应的Check呢,也就是说Check返回是false就要一遍一遍的运行Check,是true的时候从队列里去除
如果不是,怎么来用ConcurrentQueue这个类来实现并行运行Check?
分不多。。求大神 解决了大大提高分数,谢谢各位

参考技术A http://www.cnblogs.com/quark/archive/2012/03/19/2406024.html
看些理论性的东西吧。追问

然后您有什么高见?

生产消费模式:多线程读写队列ConcurrentQueue

需求:现需要将多个数据源的数据导入到目标数据库,这是一个经典的生产消费应用的例子。

直接上代码,看下实现:

            // 初始化列队缓冲区 队列大小为100
            IDataCollection<List<T>> queue = new QueueCollection<List<T>>(100);

            //开启X个后台任务,读取RabbitMQ队列信息, 把列队信息插入缓冲区队列
            var count = 1;
            for (int i = 0; i < count; i++)
            {
                Task.Factory.StartNew(() => new Producer<List<T>>(queue).Start(new RabbitSource<List<T>>().Get));
            }

            //开启X个后台任务,主动获取数据库数据,作为数据生产者,插入到缓冲区队列,
            for (int i = 0; i < count; i++)
            {
                Task.Factory.StartNew(() => new Producer<List<T>>(queue).Start(new DatabaseSource<List<T>>().Get));
            }

            //开启X个后台任务,主动获取读取缓冲区列队,作为数据消息者,把数据插入到ES库,
            for (int i = 0; i < count; i++)
            {
                Task.Factory.StartNew(() => new Customer<List<T>>(queue).Start(new Elastic().Insert));
            }

队列我们采用线程安全的ConcurrentQueue队列:

/// <summary>
    /// 缓冲区队列
    /// ConcurrentQueue线程安全,不用考虑锁的问题
    /// </summary>
    public class QueueCollection<T> :IDataCollection<T>
    {
        //队列最大值
        private int _maxSize;

        /// <summary>
        /// 线程安全的队列
        /// </summary>
        private ConcurrentQueue<T> _queue;

        public QueueCollection(int maxSize)
        {
            this._maxSize = maxSize;
            _queue = new ConcurrentQueue<T>();
        }
        
        public bool isPopWaiting()
        {
            return !_queue.Any();
        }

        public bool isPushWaiting()
        {
            return this._maxSize == _queue.Count;
        }
        
        public T Pop()
        {
            T _obj = default(T);
            if (!_queue.IsEmpty)
                _queue.TryDequeue(out _obj);

            return _obj;
        }

        public void Push(T t)
        {
            if (this._maxSize > _queue.Count)
            {
                _queue.Enqueue(t);
            }
        }
    }

如果我们不使用这个队列,只要满足IDataCollection接口,也可以进行替换:

public interface IDataCollection<T>
    {
        /// <summary>
        /// 插入数据 
        /// </summary>
        /// <param name="t"></param>
        void Push(T t);

        /// <summary>
        /// 取出数据
        /// </summary>
        /// <returns></returns>
        T Pop();

        /// <summary>
        /// 是否插入数据等待
        /// </summary>
        /// <returns></returns>
        bool isPushWaiting();

        /// <summary>
        /// 是否取出数据等待
        /// </summary>
        /// <returns></returns>
        bool isPopWaiting();
        
    }

生产者:

 public class Producer<T> : ITransientDependency
    {
        private int sleep;

        private IDataCollection<T> bufferQueue;

        public Producer(IDataCollection<T> queue)
        {
            sleep = 3000;
            bufferQueue = queue;
        }

        public void Start(Action<Action<T>> methodCall)
        {
            //入队
            methodCall((bills) => 
            {
                this.Enqueue(bills);
            });
        }

        private void Enqueue(T t)
        {
            var isWaiting = true;

            while (isWaiting)
            {
                if (!bufferQueue.isPushWaiting())
                {
                    this.bufferQueue.Push(t);
                    isWaiting = false;
                }
                else
                {
                    //生产者等待时间
                    Thread.Sleep(sleep);
                }
            }
        }
    }

消费者:

/// <summary>
    /// 消费者
    /// </summary>
    public class Customer<T>
    {
        //产品缓存队列
        private IDataCollection<T> _queue;
        
        //消费者等待时间
        private int Spead = 5000;//消费者等待时间

        public Customer(IDataCollection<T> queue)
        {
            this._queue = queue;
        }

        public void Start(Action<T> method)
        {
            while (true)
            {
                if (!_queue.isPopWaiting())
                {
                    T box = this._queue.Pop();

                    method(box);
                }
                else
                {
                    Thread.Sleep(Spead);
                }
            }
        }
    }

方法委托,也写了个基类,其实意义并不大,只是为了规范, 防止方法命名随意起。

    public interface IDataSource<T>
    {
        void Get(Action<T> func);
    }

最后,在DataSource的get方法中,调用 func即可。

 

以上是关于关于 C# ConcurrentQueue<T>的主要内容,如果未能解决你的问题,请参考以下文章

C#多线程开发-并发集合中的ConcurrentQueue

C#线程安全队列ConcurrentQueue

在托管 c++/.net 4.0 中创建对象的 ConcurrentQueue

ConcurrentQueue队列

ConcurrentQueue表示线程安全的先进先出 (FIFO) 集合。

生产消费模式:多线程读写队列ConcurrentQueue