如何并行运行 LINQ 'let' 语句?

Posted

技术标签:

【中文标题】如何并行运行 LINQ \'let\' 语句?【英文标题】:How to run LINQ 'let' statements in parallel?如何并行运行 LINQ 'let' 语句? 【发布时间】:2014-12-24 05:47:21 【问题描述】:

我有这样的代码:

var list = new List<int> 1, 2, 3, 4, 5;

var result = from x in list.AsParallel()
             let a = LongRunningCalc1(x)
             let b = LongRunningCalc2(x)
             select new a, b;

假设LongRunningCalc 方法每个需要1 秒。上面的代码运行大约需要 2 秒,因为在并行操作 5 个元素的列表时,let 语句中调用的两个方法是按顺序调用的。

但是,这些方法也可以安全地并行调用。他们显然需要合并回select,但在那之前应该并行运行 - select 应该等待他们。

有没有办法做到这一点?

【问题讨论】:

【参考方案1】:

您将无法使用查询语法或let 操作,但您可以编写一个方法对每个项目并行执行多个操作:

public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TFinal>(
    this ParallelQuery<T> query,
    Func<T, TResult1> selector1,
    Func<T, TResult2> selector2,
    Func<TResult1, TResult2, TFinal> resultAggregator)

    return query.Select(item =>
    
        var result1 = Task.Run(() => selector1(item));
        var result2 = Task.Run(() => selector2(item));
        return resultAggregator(result1.Result, result2.Result);
    );

这将允许您编写:

var query = list.AsParallel()
    .SelectAll(LongRunningCalc1, 
        LongRunningCalc2, 
        (a, b) => new a, b)

您也可以为其他并行操作添加重载:

public static ParallelQuery<TFinal> SelectAll<T, TResult1, TResult2, TResult3, TFinal>
    (this ParallelQuery<T> query,
    Func<T, TResult1> selector1,
    Func<T, TResult2> selector2,
    Func<T, TResult3> selector3,
    Func<TResult1, TResult2, TResult3, TFinal> resultAggregator)

    return query.Select(item =>
    
        var result1 = Task.Run(() => selector1(item));
        var result2 = Task.Run(() => selector2(item));
        var result3 = Task.Run(() => selector3(item));
        return resultAggregator(
            result1.Result,
            result2.Result,
            result3.Result);
    );

可以编写一个版本来处理许多在编译时未知的选择器,但要做到这一点,它们都需要计算相同类型的值:

public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
    this ParallelQuery<T> query,
    IEnumerable<Func<T, TResult>> selectors)

    return query.Select(item => selectors.AsParallel()
            .Select(selector => selector(item))
            .AsEnumerable());

public static ParallelQuery<IEnumerable<TResult>> SelectAll<T, TResult>(
    this ParallelQuery<T> query,
    params Func<T, TResult>[] selectors)

    return SelectAll(query, selectors);

【讨论】:

这不是我的专长但是你不需要等待任务的结果吗? Yeh 调用.Result 等待任务执行。我稍微简化了我的示例代码,但这让我走上了正轨,感谢您的帮助。【参考方案2】:

我会使用 Microsoft 的响应式框架(NuGet 中的“Rx-Main”)来执行此操作。

这里是:

var result =
    from x in list.ToObservable()
    from a in Observable.Start(() => LongRunningCalc1(x))
    from b in Observable.Start(() => LongRunningCalc2(x))
    select new a, b;

好处是您可以访问使用.Subscribe(...) 方法生成的结果:

result.Subscribe(x => /* Do something with x.a and/or x.b */ );

超级简单!

【讨论】:

以上是关于如何并行运行 LINQ 'let' 语句?的主要内容,如果未能解决你的问题,请参考以下文章

c# linq let 语句用视图模型分解成碎片

linq 和并行性 - SQL Server

并行LINQ PLinq

verilog 中啥语句并行运行啥时候顺序运行!搞不懂 请教高手!

c#基础并行Linq

是否可以在.net 的同一事务中并行运行多个 SQL 语句?