如何在并行线程中获取当前对象
Posted
技术标签:
【中文标题】如何在并行线程中获取当前对象【英文标题】:How get current object in operation parallel Thread 【发布时间】:2021-06-12 15:11:41 【问题描述】:我是发送到 api 的对象列表。我在下面使用了并行线程。代码
List<object> data ;//contain data
result =new Dictionary<decimal, object>();
var threadCount = 4;
if (data.Count < threadCount)
threadCount = data.Count;
var pageSize = threadCount > 0 ? Convert.ToInt32(Math.Ceiling((Convert.ToDecimal(data.Count) / threadCount))) : 0;
var pageCount = threadCount;
for (int j = 0; j < threadCount; j++)
var temp = data.Skip(j * pageSize).Take(pageSize).ToList();
var tempLength = temp?.Count;
Parallel.ForEach(temp, item =>
result.Add(item.ID, null);
//call Api and get resultApi
if (resultApi != null && resultApi.Result != null)
result[item.ID] = resultApi.Result.id;
else if (resultApi != null && resultApi .Message != null)
result[item.ID] = null;
else
result[item.ID] = null;
);
问题 在检查结果的顶部操作中,我看到一些项目与其 ID 无关并且已被移动。如果退出并行模式时没有移位,则所有标识符都设置正确 如何解决问题?
【问题讨论】:
从哪里开始。Dictionary
不是线程安全的。因此,您需要将其更改为线程安全的数据结构(ConcurrentDictionary
似乎很合适)。也就是说,您如何使用该数据结构也需要是线程安全的,但也不是。最好先确定要在本地输入字典的内容,然后在最后的 1 个操作中将其写入ConcurrentDictionary
。很难说虽然不知道到底发生了什么以及你想要实现什么
@Knoop 非常感谢。我想在操作结束时将对象列表作为批量存储在数据库中
//call Api and get resultApi
-> 这是异步调用吗?
@JohnathanBarclay ,非常感谢,是的。
【参考方案1】:
我的建议是使用PLINQ 而不是Parallel
类。对于入门级多线程,它是一种更安全的工具。 PLINQ 类似于 LINQ,但它以 .AsParallel()
开头。它包括几乎所有熟悉的 LINQ 运算符,如 Select
、Where
、Take
、ToList
等。
Dictionary<decimal, object> dictionary = data
.AsParallel()
.WithDegreeOfParallelism(4)
.Cast<Item>()
.Select(item => (item.ID, CallAPI(item).Result))
.Where(entry => entry.Result != null)
.ToDictionary(entry => entry.ID, entry => (object)entry.Result.Message);
CallAPI
方法假定具有此签名:Task<APIResult> CallAPI(Item item);
此 PLINQ 查询将以 4 的并发级别处理您的数据。这意味着 4 个操作将同时进行,当一项完成时,下一项将自动开始。
ID
s 应该是唯一的,否则 ToDictionary
运算符将抛出异常。
建议使用这种方法是因为它的简单性,而不是它的效率。 PLINQ 并不是真正用于处理 I/O 绑定的工作负载,并且在这样做时会不必要地阻塞 ThreadPool
线程。您可以查看 here 以了解更有效的方法来限制异步 I/O 绑定操作。
【讨论】:
非常感谢。请检查answer并表达您的意见。【参考方案2】:我已经找到答案了。如下。代码
List<object> data ;//contain data
result =new Dictionary<decimal, object>();
var threadCount = 4;
if (data.Count < threadCount)
threadCount = data.Count;
var pageSize = threadCount > 0 ? Convert.ToInt32(Math.Ceiling((Convert.ToDecimal(data.Count) / threadCount))) : 0;
var pageCount = threadCount;
for (int j = 0; j < threadCount; j++)
var temp = data.Skip(j * pageSize).Take(pageSize).ToList();
var tempLength = temp?.Count;
Parallel.ForEach(temp, item =>
lock (result)
var temp = callapi(item.ID);
result.Add(temp.Item1, temp.Item2);
);
private (decimal, object) callapi(decimal id)
//call Api and get resultApi
if (resultApi != null && resultApi.Result != null)
result[item.ID] = resultApi.Result.id;
return (id,result);
else if (resultApi != null && resultApi .Message != null)
result[item.ID] = null;
return (id,result);
else
result[item.ID] = null;
return (id,result);
【讨论】:
通过调用锁内的callapi
,您正在序列化调用,实质上是在终止并行性。您应该只使用锁保护result.Add
调用。除此之外,您的解决方案按顺序调用多个Parallel.ForEach
循环,在我看来,这就像尝试实现自定义分区方案。任务并行库包含一个专用的Partitioner
类,因此实现自己的方案似乎不是明智之举。您的自定义分区器不太可能比内置分区器更好。以上是关于如何在并行线程中获取当前对象的主要内容,如果未能解决你的问题,请参考以下文章