F# PSeq.iter 似乎并未使用所有内核

Posted

技术标签:

【中文标题】F# PSeq.iter 似乎并未使用所有内核【英文标题】:F# PSeq.iter does not seem to be using all cores 【发布时间】:2012-04-17 04:22:46 【问题描述】:

我一直在用 F# 做一些计算密集型的工作。像 Array.Parallel.map 这样使用 .Net 任务并行库的函数以指数级的速度加快了我的代码速度,而工作量非常小。

但是,由于内存问题,我重新编写了一段代码,以便可以在序列表达式中懒惰地评估它(这意味着我必须存储和传递更少的信息)。到了评估的时候,我使用了:

// processor and memory intensive task, results are not stored
let calculations : seq<Calculation> =  seq  ...yield one thing at a time... 

// extract results from calculations for summary data
PSeq.iter someFuncToExtractResults results

代替:

// processor and memory intensive task, storing these results is an unnecessary task
let calculations : Calculation[] = ...do all the things...

// extract results from calculations for summary data
Array.Parallel.map someFuncToExtractResults calculations 

当使用任何 Array.Parallel 函数时,我可以清楚地看到我计算机上的所有内核都在运行(CPU 使用率约为 100%)。然而,所需的额外内存意味着程序永远不会完成。

当我运行程序时使用 PSeq.iter 版本,CPU 使用率只有 8% 左右(并且 RAM 使用率最低)。

那么:PSeq 版本运行这么慢有什么原因吗?是因为懒惰的评价吗?我是否缺少一些神奇的“平行”东西?

谢谢,

其他资源,两者的源代码实现(它们似乎在 .NET 中使用不同的并行库):

https://github.com/fsharp/fsharp/blob/master/src/fsharp/FSharp.Core/array.fs

https://github.com/fsharp/powerpack/blob/master/src/FSharp.PowerPack.Parallel.Seq/pseq.fs

编辑:为代码示例和细节添加了更多细节

代码:

序列

// processor and memory intensive task, results are not stored
let calculations : seq<Calculation> =  
    seq  
        for index in 0..data.length-1 do
            yield calculationFunc data.[index]
    

// extract results from calculations for summary data (different module)
PSeq.iter someFuncToExtractResults results

数组

// processor and memory intensive task, storing these results is an unnecessary task
let calculations : Calculation[] =
    Array.Parallel.map calculationFunc data

// extract results from calculations for summary data (different module)
Array.Parallel.map someFuncToExtractResults calculations 

详情:

存储中间阵列版本在 10 分钟内快速运行(就崩溃前而言),但在崩溃前使用约 70GB RAM(64GB 物理,其余分页) seq 版本需要 34 分钟,并使用一小部分 RAM(仅约 30GB) 我正在计算大约十亿个值。因此,十亿双倍(每个 64 位)= 7.4505806GB。有更复杂的数据形式......我正在清理一些不必要的副本,因此当前使用了大量的 RAM。 是的,架构不是很好,懒惰的评估是我尝试优化程序和/或将数据分批成更小的块的第一部分 使用较小的数据集,两个代码块输出相同的结果。 @pad,我尝试了你的建议,当输入 Calculation[] 时,PSeq.iter 似乎工作正常(所有内核都处于活动状态),但仍然存在 RAM 问题(最终崩溃) 代码的摘要部分和计算部分都是 CPU 密集型的(主要是因为大数据集) 对于 Seq 版本,我的目标只是并行化一次

【问题讨论】:

延迟评估在并行执行中表现不佳。公平地说,将相同的Calculation[] 传递给PSeq.iterArray.Parallel.map。如果没有CalculationsomeFuncToExtractResults 的更多详细信息,就不可能说出原因。 感谢您的建议,我试过了,当给定数组而不是惰性序列时,PSeq 表现良好......但是它不能解决 RAM 问题 【参考方案1】:

根据您更新的信息,我将我的答案缩短到相关部分。你只需要这个而不是你目前拥有的:

let result = data |> PSeq.map (calculationFunc >> someFuncToExtractResults)

无论您使用PSeq.map 还是Array.Parallel.map,这都一样。

但是,您的真正问题不会得到解决。这个问题可以表述为:当达到期望的并行工作程度以达到 100% 的 CPU 使用率时,没有足够的内存来支持进程。

你能看出这将如何解决吗?您可以按顺序处理事物(CPU 效率较低,但内存效率较低),也可以并行处理事物(CPU 效率更高,但内存不足)。

那么选项是:

    将这些函数使用的并行度更改为不会破坏您的记忆的东西:

    let result = data 
                 |> PSeq.withDegreeOfParallelism 2 
                 |> PSeq.map (calculationFunc >> someFuncToExtractResults)
    

    更改calculationFunc &gt;&gt; someFuncToExtractResults 的底层逻辑,使其成为更高效的单一函数并将数据流式传输到结果。在不了解更多细节的情况下,要了解如何做到这一点并不容易。但在内部,当然可以进行一些延迟加载。

【讨论】:

两者都很密集,我不确定您的第二点是什么意思,请您详细说明一下吗? @AnthonyTruskinger:根据您提供的额外信息,我做了一些重要的更新。请注意,如果您不希望更改算法,则必须在某处进行权衡(如果不更改算法,您将无法获得 100% 的 CPU 和高效内存)。如果您可以更改算法,那么,请参阅我的答案。【参考方案2】:

Array.Parallel.map 在底层使用Parallel.For,而PSeqPLINQ 的薄包装。但是它们在这里表现不同的原因是当seq&lt;Calculation&gt; 是连续的并且产生新结果的速度太慢时,PSeq.iter 的工作负载不足。

我不知道使用中间序列或数组。假设data 是输入数组,将所有计算移动到一个地方是要走的路:

// Should use PSeq.map to match with Array.Parallel.map
PSeq.map (calculationFunc >> someFuncToExtractResults) data

Array.Parallel.map (calculationFunc >> someFuncToExtractResults) data

您可以避免消耗过多内存并在一个地方进行密集计算,从而提高并行执行的效率。

【讨论】:

【参考方案3】:

我遇到了与您类似的问题,并通过将以下内容添加到解决方案的 App.config 文件中来解决它:

<runtime> 
    <gcServer enabled="true" />
    <gcConcurrent enabled="true"/>
</runtime>

在 Process Lasso 上使用 5'49'' 并显示大约 22% 的 CPU 使用率的计算需要 1'36'' 显示大约 80% 的 CPU 使用率。

另一个可能影响并行代码速度的因素是 Bios 中是否启用了超线程 (Intel) 或 SMT (AMD)。我见过禁用导致更快执行的情况。

【讨论】:

以上是关于F# PSeq.iter 似乎并未使用所有内核的主要内容,如果未能解决你的问题,请参考以下文章

移交协议未传递线程所有权-但它带来了“成功”响应-似乎是错误

EMR Hadoop 并未利用所有集群节点

Array.every 函数并未在 Object 的所有元素上运行

我的 Go 程序如何让所有 CPU 内核保持忙碌状态?

确保通过 Python 中的 UDP 套接字接收所有数据

如何获取所有 SQL Server 默认值和规则的名称