如何在并行线程中获取当前对象

Posted

技术标签:

【中文标题】如何在并行线程中获取当前对象【英文标题】:How get current object in operation parallel Thread 【发布时间】:2021-06-12 15:11:41 【问题描述】:

我是发送到 api 的对象列表。我在下面使用了并行线程。代码

  List<object> data  ;//contain data

                result =new  Dictionary<decimal, object>();
                

                var threadCount = 4;
                if (data.Count < threadCount)
                
                    threadCount = data.Count;
                

                var pageSize = threadCount > 0 ? Convert.ToInt32(Math.Ceiling((Convert.ToDecimal(data.Count) / threadCount))) : 0;
                var pageCount = threadCount;

                

                for (int j = 0; j < threadCount; j++)
                

                    var temp = data.Skip(j * pageSize).Take(pageSize).ToList();
                    var tempLength = temp?.Count;
                    Parallel.ForEach(temp, item =>
                    
                        result.Add(item.ID, null);

                       //call Api and get resultApi

                        if (resultApi != null && resultApi.Result != null)
                        
                            
                            result[item.ID] = resultApi.Result.id;
                        
                        else if (resultApi != null && resultApi .Message != null)
                        
                            
                            result[item.ID] = null;
                        
                        else
                        
                            result[item.ID] = null;
                        
                    );
                 

问题 在检查结果的顶部操作中,我看到一些项目与其 ID 无关并且已被移动。如果退出并行模式时没有移位,则所有标识符都设置正确 如何解决问题?

【问题讨论】:

从哪里开始。 Dictionary 不是线程安全的。因此,您需要将其更改为线程安全的数据结构(ConcurrentDictionary 似乎很合适)。也就是说,您如何使用该数据结构也需要是线程安全的,但也不是。最好先确定要在本地输入字典的内容,然后在最后的 1 个操作中将其写入ConcurrentDictionary。很难说虽然不知道到底发生了什么以及你想要实现什么 @Knoop 非常感谢。我想在操作结束时将对象列表作为批量存储在数据库中 //call Api and get resultApi -> 这是异步调用吗? @JohnathanBarclay ,非常感谢,是的。 【参考方案1】:

我的建议是使用PLINQ 而不是Parallel 类。对于入门级多线程,它是一种更安全的工具。 PLINQ 类似于 LINQ,但它以 .AsParallel() 开头。它包括几乎所有熟悉的 LINQ 运算符,如 SelectWhereTakeToList 等。

Dictionary<decimal, object> dictionary = data
    .AsParallel()
    .WithDegreeOfParallelism(4)
    .Cast<Item>()
    .Select(item => (item.ID, CallAPI(item).Result))
    .Where(entry => entry.Result != null)
    .ToDictionary(entry => entry.ID, entry => (object)entry.Result.Message);

CallAPI 方法假定具有此签名:Task&lt;APIResult&gt; CallAPI(Item item);

此 PLINQ 查询将以 4 的并发级别处理您的数据。这意味着 4 个操作将同时进行,当一项完成时,下一项将自动开始。

IDs 应该是唯一的,否则 ToDictionary 运算符将抛出异常。

建议使用这种方法是因为它的简单性,而不是它的效率。 PLINQ 并不是真正用于处理 I/O 绑定的工作负载,并且在这样做时会不必要地阻塞 ThreadPool 线程。您可以查看 here 以了解更有效的方法来限制异步 I/O 绑定操作。

【讨论】:

非常感谢。请检查answer并表达您的意见。【参考方案2】:

我已经找到答案了。如下。代码

List<object> data  ;//contain data

                result =new  Dictionary<decimal, object>();
                

                var threadCount = 4;
                if (data.Count < threadCount)
                
                    threadCount = data.Count;
                

                var pageSize = threadCount > 0 ? Convert.ToInt32(Math.Ceiling((Convert.ToDecimal(data.Count) / threadCount))) : 0;
                var pageCount = threadCount;

                

                for (int j = 0; j < threadCount; j++)
                

                    var temp = data.Skip(j * pageSize).Take(pageSize).ToList();
                    var tempLength = temp?.Count;
                    Parallel.ForEach(temp, item =>
                    
                      lock (result)
                        
                           var temp = callapi(item.ID);
                           result.Add(temp.Item1, temp.Item2);
                        
                    );
                 
     private (decimal, object) callapi(decimal id)
                        //call Api and get resultApi

                        if (resultApi != null && resultApi.Result != null)
                        
                            
                            result[item.ID] = resultApi.Result.id;
                            return (id,result);
                        
                        else if (resultApi != null && resultApi .Message != null)
                        
                            
                            result[item.ID] = null;
                            return (id,result);
                        
                        else
                        
                            result[item.ID] = null;
                            return (id,result);
                        

【讨论】:

通过调用锁内的callapi,您正在序列化调用,实质上是在终止并行性。您应该只使用锁保护result.Add 调用。除此之外,您的解决方案按顺序调用多个Parallel.ForEach 循环,在我看来,这就像尝试实现自定义分区方案。任务并行库包含一个专用的Partitioner 类,因此实现自己的方案似乎不是明智之举。您的自定义分区器不太可能比内置分区器更好。

以上是关于如何在并行线程中获取当前对象的主要内容,如果未能解决你的问题,请参考以下文章

我的多线程-多线程必知的N个常识

如何线程安全地将并行进程中的数据收集到单个对象中?

如何获取当前线程的 std::thread?

CUDA如何获取网格、块、线程大小和并行化非方阵计算

random_number()如何并行工作?

如何设置并行集合的线程号?