基于需要在执行时间内接收更多项目的列表执行 Parallel.ForEach
Posted
技术标签:
【中文标题】基于需要在执行时间内接收更多项目的列表执行 Parallel.ForEach【英文标题】:Execute Parallel.ForEach based in a List that needs to receive more items in execution time 【发布时间】:2021-12-19 19:13:12 【问题描述】:我在这里遇到了一个挑战,找到解决方案让我很头疼。
我有一个List
的东西,我根据它执行Parallel.ForEach
:
List<Customer> customers = GetNotProcessedCostumer();
Parallel.ForEach(customers, new ParallelOptions MaxDegreeOfParallelism = 2,
cust=>
ExecuteSomething(cust);
);
这里的问题是我需要再次调用GetNotProcessedCostumer
以检查数据库中是否有新的未处理项目可用,而此并行仍在运行。
再次调用该方法是可以的,但是,如何在并行已经使用的List
中插入新项目?
换句话说,List<Customer>
是活着的,我需要一直在上面插入项目,并尝试使用现有Parallel
中的可用线程。看看:
List<Customer> customers = GetNotProcessCustomer // get not processed customers from database
Parallel.ForEach(customers) // ...... Start the parallel ...
customer.Add(GetNotProcessCustomer()) // Read database again..
“嘿 Parallel,你有可用的线程吗?”如果是,请使用它。
我可以接受其他人的方法和想法,比如Threads
、ThreadPool
......
有人可以帮帮我吗?
【问题讨论】:
听起来像是你想要一个队列的生产者/消费者的东西。您定期使用来自 GetNotProcessedCostumer 的数据填充队列,另一个线程检查队列中是否有新项目以及有多少线程已准备好运行并启动 ExecuteSomething 的异步版本。 我同意。查找生产者/消费者队列。 除了我喜欢客户错字 是的,你是对的。队列是解决这个问题的最佳方法。谢谢大家,我找到了使用队列的方法! Gabriel 您可以考虑将您的解决方案发布到as an answer,以便其他人可以从中受益。 【参考方案1】:可能有比Parallel
类更好的方法来完成这项工作,TPL Dataflow 库中的ActionBlock<Customer>
是最有希望的候选者。但是,如果您想使用已有的知识来完成您的工作,您可以使用延迟的IEnumerable<Customer>
序列而不是具体化的List<Customer>
来提供并行循环。这个序列将查询数据库并在一个永无止境的循环中产生未处理的客户。在组合中添加Task.Delay
可能是个好主意,以确保不会比每 X 秒更频繁地查询数据库。
IEnumerable<Customer> GetNotProcessedCustomersNonStop(
CancellationToken cancellationToken = default)
while (true)
var delayTask = Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
foreach (var customer in GetNotProcessedCustomers())
yield return customer;
delayTask.GetAwaiter().GetResult();
在混合中添加CancellationToken
可能也是一个好主意,因为最终您想停止循环,不是吗?
如果你对延迟可枚举序列和yield
语句不熟悉,可以看看这个文档:Iterators
最后一个重要的细节是告诉Parallel
类你不希望它做一些花哨的事情,比如贪婪地枚举可枚举对象并缓存它的项目。您希望它仅在准备好处理它时才能抓住下一个客户。您可以通过在混合中添加Partitioner.Create
来做到这一点。把所有东西放在一起:
var cts = new CancellationTokenSource();
var source = Partitioner.Create(GetNotProcessedCustomersNonStop(cts.Token),
EnumerablePartitionerOptions.NoBuffering);
var parallelOptions = new ParallelOptions()
MaxDegreeOfParallelism = 2,
CancellationToken = cts.Token,
;
Parallel.ForEach(source, parallelOptions, customer =>
ProcessCustomer(customer);
);
//cts.Cancel(); // eventually...
【讨论】:
以上是关于基于需要在执行时间内接收更多项目的列表执行 Parallel.ForEach的主要内容,如果未能解决你的问题,请参考以下文章
ioctlsocket 或 recv 在 Windows 套接字编程中需要更多时间来执行?