使用并发集合

Posted ccaa

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用并发集合相关的知识,希望对你有一定的参考价值。

带着问题去思考!大家好

并发集合(线程安全),既然是并发集合。那就要知道什么是并发。

并发:同一时间间隔对资源的共享。

ConcurrentDictionary  线程安全字典集合,对于读操作无需使用锁,写操作则需要锁。该并发使用多个锁。
ConcurrentQueue 使用了原子的比较和交换,使用SpanWait来保证线程安全,实现了FIFO.可以调用Enqueue方法向队列中加入元素。TryDequequ方法试图取出队列中的第一个元素,TryPeek方法则试图得到第一个元素但并不从队列中删除元素
ConcurrentStack

 实际中没有任何锁,采用CAS操作,LIFO集合,可以用push,pushRange方法添加元素,使用tryPop和TryPopRange方法获取元素,使用TryPeek方法检查

ConcurrentBag  支持重复元素无序集合,针对这样以下情况进行了优化,及多个线程以这样的方式工作,每个线程产生和消费自己的任务,极少与其他线程的任务交互,Add添加,TryPeek方法,获取元素用TryTask方法
BlockingCollection  是对IprodicerConsumerCollection泛型接口的实现的一个高级封装。支持如下功能,分块,调整内部集合容量,取消集合操作。从多块中获取元素

其中ConcurrentQueue,ConcurrentStack,ConcurrentBag避免使用上面提及的集合的Count属性,实现这些集合使用的是链表。Count时间复杂度为O(N).检查集合是否为空,使用IsEmpty属性,时间复杂度为O(1).

这里我们基本介绍下功能:

ConcurrentDictionary

单线程环境中的字典集合与使用并发字典的性能。

技术图片
const string Item = "Dictionary item";
        public static string CurrentItem;
        /// <summary>
        /// ConcurrentDictionary写操作比使用锁的通常的字典要慢得多。而读操作则要快些。
        /// 因此如果对字典需要大量的线程安全读操作,concurrentDictionary是最好的选择。
        /// </summary>
        /// <param name="args"></param>
        static void Main(string[] args)
        {
            var concurrentDictionary = new ConcurrentDictionary<int, string>(); //并发集合
            var dictionary = new Dictionary<int, string>(); //正常集合

            var sw = new Stopwatch();
            sw.Start();
            for (int i = 0; i < 100000; i++)
            {
                //锁机制向标准的字典中添加元素,并测量完成100万次迭代的时间。
                lock(dictionary)
                {
                    dictionary[i] = Item;
                }
            }
            sw.Stop();
            Console.WriteLine("Writing to dictionary with a lock :{0}", sw.Elapsed);
            sw.Restart();
            
            for (int i = 0; i < 100000; i++)
            {
                //比较两个集合中获取值的性能
                concurrentDictionary[i] = Item;
            }
            sw.Stop();

            Console.WriteLine("Writing to a concurrent dictionary:{0}",sw.Elapsed);
            sw.Restart();
            for (int i = 0; i < 100000; i++)
            {
                lock(dictionary)
                {
                    CurrentItem = dictionary[i];
                }
            }
            sw.Stop();
            Console.WriteLine("Reading from dictionary with a lock {0}",sw.Elapsed);
            sw.Restart();
            for (int i = 0; i < 100000; i++)
            {
                CurrentItem = concurrentDictionary[i];
            }
            sw.Stop();
            Console.WriteLine("Reading from concurrent  dictionary  {0}", sw.Elapsed);
        }
View Code

创建两个集合,其中一个是标准的字典集合,另一个是新的并发字典集合。采用锁的机制想标准的字典中添加元素。比较两者之间。我们发现ConcurrentDictionary写操作比使用锁的通常的字典要慢的多,而读操作则要快些。因此如果对字典需要大量的线程安全的操作。ConcurrentDictionary是最好的选择。

ConcurrentDictionary的实现使用了细粒度锁技术,在多线程写入方面比使用锁的通常的字典的可伸缩性更好。在本例中,当只用一个线程时,并发字典非常慢。但是扩展到5-6个线程,并发字典的性能会更好

如果你对字典只需要多线程访问只读元素,则没必要执行线程安全的读操作。在此场景中最好只使用通常的字典或者ReadOnlyDictionary集合。

技术图片

 ConcurrentQueue

创建能被多个工作者异步处理的一组任务的例子

技术图片
static async Task RunProgram()
        {
            var taskQueue = new ConcurrentQueue<CustomerTask>();//任务队列
            var cts = new CancellationTokenSource(); //取消标志
            var taskSource = Task.Run(() => TaskProducer(taskQueue));
            Task[] processors = new Task[4];
            for (int i = 0; i < 4; i++)
            {
                string processorId = i.ToString();
                processors[i - 1] = Task.Run(() => TaskProcessor(taskQueue, "Processor" + processorId, cts.Token));
                await taskSource;
                cts.CancelAfter(TimeSpan.FromSeconds(2));
                await Task.WhenAll(processors);
            }
        }

        private static async Task TaskProducer(ConcurrentQueue<CustomerTask> taskQueue)
        {
            for (int i = 0; i <= 20; i++)
            {
                await Task.Delay(50);
                var workItem = new CustomerTask { Id = i };
                taskQueue.Enqueue(workItem);
                Console.WriteLine("Task {0} has been posted", workItem.Id);
            }
        }
        private static async Task TaskProcessor(ConcurrentQueue<CustomerTask> queue, string name, CancellationToken token)
        {
            CustomerTask customerTask;
            bool dequeueSuccesful = false;
            await GetRandomDelay();
            do
            {
                dequeueSuccesful = queue.TryDequeue(out customerTask);
                if (dequeueSuccesful)
                {
                    Console.WriteLine("Task {0} has been processed by {1}", customerTask.Id, name);
                }
                await GetRandomDelay();
            } while (!token.IsCancellationRequested);
        }
        static Task GetRandomDelay()
        {
            int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
            return Task.Delay(delay);
        }

        public class CustomerTask
        {
            public int Id { get; set; }
        }
        static void Main(string[] args)
        {
            Task t = RunProgram();
            t.Wait();
        }
View Code

技术图片

 

我们使用ConcurrentQueue集合实例创建了一个任务队列,然后一个取消标志,用来在我们将任务放入队列后停止工作的。接下来启动了一个单独的工作线程来将任务放入任务队列中。现在定义该程序中消费任务的部分。我们创建了四个工作者,它们会随时等待一段时间,然后从任务中获取一个任务,处理该任务,一直重复整个过程直到我们发出取消标志信号。

ConcurrentStack异步处理

创建了被多个工作者异步处理的一组任务。

技术图片
     static async Task RunProgram()
        {
            var tasks = new ConcurrentStack<CustomerTask>();//任务
            var cts = new CancellationTokenSource(); //取消标志
            var taskSource = Task.Run(() => TaskProducer(tasks));
            Task[] processors = new Task[4];
            for (int i = 0; i < 4; i++)
            {
                string processorId = i.ToString();
                processors[i - 1] = Task.Run(() => TaskProcessor(tasks, "Processor" + processorId, cts.Token));
                await taskSource;
                cts.CancelAfter(TimeSpan.FromSeconds(2));
                await Task.WhenAll(processors);
            }
        }

        private static async Task TaskProducer(ConcurrentStack<CustomerTask> tasks)
        {
            for (int i = 0; i <= 20; i++)
            {
                await Task.Delay(50);
                var workItem = new CustomerTask { Id = i };
                tasks.Push(workItem);
                Console.WriteLine("Task {0} has been posted", workItem.Id);
            }
        }
        private static async Task TaskProcessor(ConcurrentStack<CustomerTask> queue, string name, CancellationToken token)
        {
            CustomerTask customerTask;
            bool dequeueSuccesful = false;
            await GetRandomDelay();
            do
            {
                dequeueSuccesful = queue.TryPop(out customerTask);
                if (dequeueSuccesful)
                {
                    Console.WriteLine("Task {0} has been processed by {1}", customerTask.Id, name);
                }
                await GetRandomDelay();
            } while (!token.IsCancellationRequested);
        }
        static Task GetRandomDelay()
        {
            int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
            return Task.Delay(delay);
        }

        public class CustomerTask
        {
            public int Id { get; set; }
        }
        static void Main(string[] args)
        {
            Task t = RunProgram();
            t.Wait();
        }
View Code

技术图片

 

 与之前的代码几乎一样。唯一不同之处是我们对并发堆栈使用Push和TryPop方法。而对并发队列使用Enqueue和TryDequeue方法。

处理的顺序被改变了了、堆栈是一个LIFO集合,工作者先处理最近的任务。在并发队列中,任务被处理的顺序与被添加的顺序几乎一致。在堆栈中,早先创建的任务具有较低的优先级。而且直到生产者停止向堆栈中放入更多的任务后,该任务才有可能停止。

ConcurrentBag

多个独立的既可以生产工作又可消费工作的工作者如果扩展工作量。
具体可以借鉴https://www.cnblogs.com/InCerry/p/9497729.html

 

以上是关于使用并发集合的主要内容,如果未能解决你的问题,请参考以下文章

Swift新async/await并发中利用Task防止指定代码片段执行的数据竞争(Data Race)问题

代码片段 - Golang 实现集合操作

laravel特殊功能代码片段集合

Alfred常见使用

如何从设置中获取数据并发送到此片段

golang代码片段(摘抄)