在 PLINQ 中绑定源线程

Posted

技术标签:

【中文标题】在 PLINQ 中绑定源线程【英文标题】:Binding source thread in PLINQ 【发布时间】:2014-11-05 23:09:35 【问题描述】:

我有一个使用 PLINQ 并行化的计算,如下所示:

Source IEnumerable<T> source 正在提供从 文件。

我有一个重量级计算 HeavyComputation 我需要做 每个T,我希望这些跨线程分流,所以我 使用 PLINQ,例如:AsParallel().Select(HeavyComputation)

这里是有趣的地方:由于文件的限制 提供source 的阅读器类型,我需要source 在初始线程上枚举,而不是在并行工作者上。我需要 source 的完整评估将 绑定 到主 线。然而,似乎来源实际上是在工人身上列举的 线程。

我的问题是:有没有一种直接的方法可以修改此代码以 将source 的枚举绑定到初始线程,而 将繁重的工作分给并行工人?请记住, 只是在AsParallel() 之前做一个急切的.ToList() 不是这里的选择, 因为来自文件的数据流很大。

下面是一些示例代码,可以演示我看到的问题:

using System.Threading;
using System.Collections.Generic;
using System.Linq;
using System;

public class PlinqTest

        private static string FormatItems<T>(IEnumerable<T> source)
        
                return String.Format("[0]", String.Join(";", source));
        

        public static void Main()
        
            var expectedThreadIds = new[]  Thread.CurrentThread.ManagedThreadId ;

            var threadIds = Enumerable.Range(1, 1000)
                    .Select(x => Thread.CurrentThread.ManagedThreadId) // (1)
                    .AsParallel()
                    .WithDegreeOfParallelism(8)
                    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
                    .AsOrdered()
                    .Select(x => x)                                    // (2)
                    .ToArray();

            // In the computation above, the lambda in (1) is a
            // stand in for the file-reading operation that we
            // want to be bound to the main thread, while the
            // lambda in (2) is a stand-in for the "expensive
            // computation" that we want to be farmed out to the
            // parallel worker threads.  In fact, (1) is being
            // executed on all threads, as can be seen from the
            // output.

            Console.WriteLine("Expected thread IDs: 0",
                              FormatItems(expectedThreadIds));
            Console.WriteLine("Found thread IDs: 0",
                              FormatItems(threadIds.Distinct()));
        

我得到的示例输出是:

Expected thread IDs: [1]
Found thread IDs: [7;4;8;6;11;5;10;9]

【问题讨论】:

您可以遍历 (1) 中的可枚举并在循环中生成工作任务。在循环结束时等待所有任务。 谢谢@Asad,我不确定这是否能让我控制并行度。会吗?或许你能提供更多细节。 【参考方案1】:

如果您放弃 PLINQ 并明确使用任务并行库,这相当简单(尽管可能不那么简洁):

// Limits the parallelism of the "expensive task"
var semaphore = new SemaphoreSlim(8);

var tasks = Enumerable.Range(1, 1000)
    .Select(x => Thread.CurrentThread.ManagedThreadId)
    .Select(async x =>
    
        await semaphore.WaitAsync();
        var result = await Task.Run(() => Tuple.Create(x, Thread.CurrentThread.ManagedThreadId));
        semaphore.Release();

        return result;
    );

return Task.WhenAll(tasks).Result;

请注意,我使用Tuple.Create 来记录来自主线程的线程 ID 和来自衍生任务的线程 ID。根据我的测试,对于每个元组,前者总是相同的,而后者则各不相同,这是应该的。

信号量确保并行度永远不会超过 8(尽管创建元组的廉价任务无论如何都不太可能)。如果达到 8,任何新任务都将等到信号量上有可用点。

【讨论】:

这真是太好了。我特别喜欢你在这里使用信号量。我绝对 与 PLINQ 结婚......让确保这明天在我的代码中有效,然后我会接受答案 我在这里看到的唯一潜在障碍是输出是非流式的。不一定是一个大问题——这本质上是一个数据缩减问题,所以输出要小得多,可以一次保存在内存中。只是好奇是否有办法恢复流媒体行为。我正在考虑。 @DavidAlexander “流式传输”是指您想要完成的结果吗? 通过流式传输,我的意思是我希望能够在计算整个输出之前从输出中获得答案。 (但是,请注意,我想保留原始顺序;所以我想等待第一个答案,然后是下一个答案......)。上面的解决方案使用WhenAll,它一直等到所有任务或完成。 在这种情况下,删除WhenAll,并使用foreach(var t in tasks) var result = await t;。我应该明确一点,这不会影响任务实际完成的顺序,据我所知,没有办法控制这一点。【参考方案2】:

您可以使用下面的OffloadQueryEnumeration 方法,该方法确保源序列的枚举将发生在枚举结果IEnumerable&lt;TResult&gt; 的同一线程上。 querySelector 是将源序列的代理转换为ParallelQuery&lt;T&gt; 的委托。此查询在ThreadPool 线程上进行内部枚举,但输出值会返回到当前线程。

/// <summary>Enumerates the source sequence on the current thread, and enumerates
/// the projected query on a ThreadPool thread.</summary>
public static IEnumerable<TResult> OffloadQueryEnumeration<TSource, TResult>(
    this IEnumerable<TSource> source,
    Func<IEnumerable<TSource>, IEnumerable<TResult>> querySelector)

    // Arguments validation omitted
    var locker = new object();
    (TSource Value, bool HasValue) input = default; bool inputCompleted = false;
    (TResult Value, bool HasValue) output = default; bool outputCompleted = false;
    using var sourceEnumerator = source.GetEnumerator();

    IEnumerable<TSource> GetSourceProxy()
    
        while (true)
        
            TSource item;
            lock (locker)
            
                if (!input.HasValue)
                
                    if (inputCompleted) yield break;
                    Monitor.Wait(locker); continue;
                
                item = input.Value; input = default;
                Monitor.PulseAll(locker);
            
            yield return item;
        
    

    var query = querySelector(GetSourceProxy());

    var outputReaderTask = Task.Run(() =>
    
        try
        
            foreach (var result in query)
            
                lock (locker)
                
                    while (output.HasValue) Monitor.Wait(locker);
                    output = (result, true);
                    Monitor.PulseAll(locker);
                
            
        
        finally
        
            lock (locker)  outputCompleted = true; Monitor.PulseAll(locker); 
        
    );

    List<Exception> exceptions = new();
    while (true)
    
        TResult result;
        lock (locker)
        
            if (output.HasValue)
            
                result = output.Value; output = default;
                Monitor.PulseAll(locker);
                goto yieldResult;
            
            if (outputCompleted)
            
                if (!inputCompleted)
                
                    inputCompleted = true; Monitor.PulseAll(locker);
                
                break;
            
            if (input.HasValue || inputCompleted)
            
                Monitor.Wait(locker); continue;
            
            try
            
                if (sourceEnumerator.MoveNext())
                    input = (sourceEnumerator.Current, true);
                else
                    inputCompleted = true;
            
            catch (Exception ex)
            
                exceptions.Add(ex);
                inputCompleted = true;
            
            Monitor.PulseAll(locker); continue;
        
    yieldResult:
        yield return result;
    

    // Propagate possible exceptions
    try  outputReaderTask.GetAwaiter().GetResult(); 
    catch (OperationCanceledException)  throw; 
    catch (AggregateException aex)  exceptions.AddRange(aex.InnerExceptions); 
    catch (Exception ex)  exceptions.Add(ex);  // Fallback (should never happen)

    if (exceptions.Count > 0) throw new AggregateException(exceptions);

此方法使用Monitor.Wait/Monitor.Pulse 机制 (tutorial),以同步从一个线程到另一个线程的值传输。

使用示例:

int[] threadIds = Enumerable
    .Range(1, 1000)
    .Select(x => Thread.CurrentThread.ManagedThreadId)
    .OffloadQueryEnumeration(proxy => proxy
        .AsParallel()
        .WithDegreeOfParallelism(8)
        .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
        .AsOrdered()
        .Select(x => x)
    )
    .ToArray();

【讨论】:

以上是关于在 PLINQ 中绑定源线程的主要内容,如果未能解决你的问题,请参考以下文章

使用异步方法在XAML中绑定系统时间

plinq的用法之for的用法

数据绑定

“BindingSource不能是它自己的数据源” - 尝试从另一个类中的方法重置绑定源时出错

并行LINQ PLinq

数据绑定为Binding指定绑定源的几种方法