在 PLINQ 中绑定源线程
Posted
技术标签:
【中文标题】在 PLINQ 中绑定源线程【英文标题】:Binding source thread in PLINQ 【发布时间】:2014-11-05 23:09:35 【问题描述】:我有一个使用 PLINQ 并行化的计算,如下所示:
Source IEnumerable<T> source
正在提供从
文件。
我有一个重量级计算 HeavyComputation
我需要做
每个T
,我希望这些跨线程分流,所以我
使用 PLINQ,例如:AsParallel().Select(HeavyComputation)
这里是有趣的地方:由于文件的限制
提供source
的阅读器类型,我需要source
在初始线程上枚举,而不是在并行工作者上。我需要
source
的完整评估将 绑定 到主
线。然而,似乎来源实际上是在工人身上列举的
线程。
我的问题是:有没有一种直接的方法可以修改此代码以
将source
的枚举绑定到初始线程,而
将繁重的工作分给并行工人?请记住,
只是在AsParallel()
之前做一个急切的.ToList()
不是这里的选择,
因为来自文件的数据流很大。
下面是一些示例代码,可以演示我看到的问题:
using System.Threading;
using System.Collections.Generic;
using System.Linq;
using System;
public class PlinqTest
private static string FormatItems<T>(IEnumerable<T> source)
return String.Format("[0]", String.Join(";", source));
public static void Main()
var expectedThreadIds = new[] Thread.CurrentThread.ManagedThreadId ;
var threadIds = Enumerable.Range(1, 1000)
.Select(x => Thread.CurrentThread.ManagedThreadId) // (1)
.AsParallel()
.WithDegreeOfParallelism(8)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.AsOrdered()
.Select(x => x) // (2)
.ToArray();
// In the computation above, the lambda in (1) is a
// stand in for the file-reading operation that we
// want to be bound to the main thread, while the
// lambda in (2) is a stand-in for the "expensive
// computation" that we want to be farmed out to the
// parallel worker threads. In fact, (1) is being
// executed on all threads, as can be seen from the
// output.
Console.WriteLine("Expected thread IDs: 0",
FormatItems(expectedThreadIds));
Console.WriteLine("Found thread IDs: 0",
FormatItems(threadIds.Distinct()));
我得到的示例输出是:
Expected thread IDs: [1]
Found thread IDs: [7;4;8;6;11;5;10;9]
【问题讨论】:
您可以遍历 (1) 中的可枚举并在循环中生成工作任务。在循环结束时等待所有任务。 谢谢@Asad,我不确定这是否能让我控制并行度。会吗?或许你能提供更多细节。 【参考方案1】:如果您放弃 PLINQ 并明确使用任务并行库,这相当简单(尽管可能不那么简洁):
// Limits the parallelism of the "expensive task"
var semaphore = new SemaphoreSlim(8);
var tasks = Enumerable.Range(1, 1000)
.Select(x => Thread.CurrentThread.ManagedThreadId)
.Select(async x =>
await semaphore.WaitAsync();
var result = await Task.Run(() => Tuple.Create(x, Thread.CurrentThread.ManagedThreadId));
semaphore.Release();
return result;
);
return Task.WhenAll(tasks).Result;
请注意,我使用Tuple.Create
来记录来自主线程的线程 ID 和来自衍生任务的线程 ID。根据我的测试,对于每个元组,前者总是相同的,而后者则各不相同,这是应该的。
信号量确保并行度永远不会超过 8(尽管创建元组的廉价任务无论如何都不太可能)。如果达到 8,任何新任务都将等到信号量上有可用点。
【讨论】:
这真是太好了。我特别喜欢你在这里使用信号量。我绝对 不 与 PLINQ 结婚......让确保这明天在我的代码中有效,然后我会接受答案 我在这里看到的唯一潜在障碍是输出是非流式的。不一定是一个大问题——这本质上是一个数据缩减问题,所以输出要小得多,可以一次保存在内存中。只是好奇是否有办法恢复流媒体行为。我正在考虑。 @DavidAlexander “流式传输”是指您想要完成的结果吗? 通过流式传输,我的意思是我希望能够在计算整个输出之前从输出中获得答案。 (但是,请注意,我想保留原始顺序;所以我想等待第一个答案,然后是下一个答案......)。上面的解决方案使用WhenAll,它一直等到所有任务或完成。 在这种情况下,删除WhenAll
,并使用foreach(var t in tasks) var result = await t;
。我应该明确一点,这不会影响任务实际完成的顺序,据我所知,没有办法控制这一点。【参考方案2】:
您可以使用下面的OffloadQueryEnumeration
方法,该方法确保源序列的枚举将发生在枚举结果IEnumerable<TResult>
的同一线程上。 querySelector
是将源序列的代理转换为ParallelQuery<T>
的委托。此查询在ThreadPool
线程上进行内部枚举,但输出值会返回到当前线程。
/// <summary>Enumerates the source sequence on the current thread, and enumerates
/// the projected query on a ThreadPool thread.</summary>
public static IEnumerable<TResult> OffloadQueryEnumeration<TSource, TResult>(
this IEnumerable<TSource> source,
Func<IEnumerable<TSource>, IEnumerable<TResult>> querySelector)
// Arguments validation omitted
var locker = new object();
(TSource Value, bool HasValue) input = default; bool inputCompleted = false;
(TResult Value, bool HasValue) output = default; bool outputCompleted = false;
using var sourceEnumerator = source.GetEnumerator();
IEnumerable<TSource> GetSourceProxy()
while (true)
TSource item;
lock (locker)
if (!input.HasValue)
if (inputCompleted) yield break;
Monitor.Wait(locker); continue;
item = input.Value; input = default;
Monitor.PulseAll(locker);
yield return item;
var query = querySelector(GetSourceProxy());
var outputReaderTask = Task.Run(() =>
try
foreach (var result in query)
lock (locker)
while (output.HasValue) Monitor.Wait(locker);
output = (result, true);
Monitor.PulseAll(locker);
finally
lock (locker) outputCompleted = true; Monitor.PulseAll(locker);
);
List<Exception> exceptions = new();
while (true)
TResult result;
lock (locker)
if (output.HasValue)
result = output.Value; output = default;
Monitor.PulseAll(locker);
goto yieldResult;
if (outputCompleted)
if (!inputCompleted)
inputCompleted = true; Monitor.PulseAll(locker);
break;
if (input.HasValue || inputCompleted)
Monitor.Wait(locker); continue;
try
if (sourceEnumerator.MoveNext())
input = (sourceEnumerator.Current, true);
else
inputCompleted = true;
catch (Exception ex)
exceptions.Add(ex);
inputCompleted = true;
Monitor.PulseAll(locker); continue;
yieldResult:
yield return result;
// Propagate possible exceptions
try outputReaderTask.GetAwaiter().GetResult();
catch (OperationCanceledException) throw;
catch (AggregateException aex) exceptions.AddRange(aex.InnerExceptions);
catch (Exception ex) exceptions.Add(ex); // Fallback (should never happen)
if (exceptions.Count > 0) throw new AggregateException(exceptions);
此方法使用Monitor.Wait
/Monitor.Pulse
机制 (tutorial),以同步从一个线程到另一个线程的值传输。
使用示例:
int[] threadIds = Enumerable
.Range(1, 1000)
.Select(x => Thread.CurrentThread.ManagedThreadId)
.OffloadQueryEnumeration(proxy => proxy
.AsParallel()
.WithDegreeOfParallelism(8)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.AsOrdered()
.Select(x => x)
)
.ToArray();
【讨论】:
以上是关于在 PLINQ 中绑定源线程的主要内容,如果未能解决你的问题,请参考以下文章