F# 性能:是啥让这段代码如此缓慢?

Posted

技术标签:

【中文标题】F# 性能:是啥让这段代码如此缓慢?【英文标题】:F# Performance: What is making this code so slow?F# 性能:是什么让这段代码如此缓慢? 【发布时间】:2016-06-12 15:59:53 【问题描述】:

此 F# 代码试图解决Project Euler problem #58:

let inc = function
| n -> n + 1
let is_prime = function
| 2 -> true
| n when n < 2 || n%2=0-> false 
| n -> 
       [3..2..(int (sqrt (float n)))] 
       |> List.tryFind (fun i -> n%i=0)
       |> Option.isNone
let spir = Seq.initInfinite (fun i -> 
    let n = i%4
    let a = 2 * (i/4 + 1)
    (a*n) + a + (a-1)*(a-1))
let rec accum se p n = 
   match se with
   | x when p*10 < n && p <> 0 -> 2*(n/4) + 1
   | x when is_prime (Seq.head x) -> accum (Seq.tail x) (inc p) (inc n)
   | x -> accum (Seq.tail x) p (inc n)
   | _ -> 0
printfn "%d" (accum spir 0 1)

我不知道这个程序的运行时间,因为我拒绝等待它完成。相反,我用 C++ 命令式地编写了这段代码:

#include "stdafx.h"
#include "math.h"
#include <iostream>

using namespace std;

int is_prime(int n)

    if (n % 2 == 0) return 0;
    for (int i = 3; i <= sqrt(n); i+=2)
    
        if (n%i == 0)
        
            return 0;
        
    
    return 1;


int spir(int i)

    int n = i % 4;
    int a = 2 * (i / 4 + 1);
    return (a*n) + a + ((a - 1)*(a - 1));


int main()

    int n = 1, p = 0, i = 0;
    cout << "start" << endl;
    while (p*10 >= n || p == 0)
    
        p += is_prime(spir(i));
        n++; i++;
    
    cout << 2*(i/4) + 1;

    return 0;

上面的代码运行不到2秒就得到了正确答案。

是什么让 F# 代码运行如此缓慢?即使使用了an old *** post 中提到的一些分析工具,我仍然无法弄清楚发生了哪些昂贵的操作。

编辑#1

通过 rmunn 的帖子,我能够想出一个不同的实现,它可以在不到 30 秒的时间内得到答案:

let inc = function
| n -> n + 1
let is_prime = function
| 2 -> true
| n when n < 2 || n%2=0-> false 
| n -> 
       [3..2..(int (sqrt (float n)))] 
       |> List.tryFind (fun i -> n%i=0)
       |> Option.isNone
let spir2 = 
    List.unfold (fun state -> 
        let p = fst state
        let i = snd state
        let n = i%4
        let a = 2 * (i/4 + 1)
        let diag = (a*n) + a + (a-1)*(a-1)
        if p*10 < (i+1) && p <> 0 then 
            printfn "%d" (2*((i+1)/4) + 1)
            None
        elif is_prime diag then
            Some(diag, (inc p, inc i))
        else Some(diag, (p, inc i))) (0, 0)

编辑#2

借助 FuleSnabel 的信息丰富的帖子,他的 is_prime 函数使上述代码的运行时间不到十分之一秒,比 C++ 代码更快:

let inc = function
| n -> n + 1
let is_prime = function
  | 1                 -> false
  | 2                 -> true
  | v when v % 2 = 0  -> false
  | v ->
    let stop = v |> float |> sqrt |> int
    let rec loop vv =
      if vv <= stop then
        if (v % vv) <> 0 then
          loop (vv + 2)
        else
          false
      else
        true
    loop 3
let spir2 = 
    List.unfold (fun state -> 
        let p = fst state
        let i = snd state
        let n = i%4
        let a = 2 * (i/4 + 1)
        let diag = (a*n) + a + (a-1)*(a-1)
        if p*10 < (i+1) && p <> 0 then 
            printfn "%d" (2*((i+1)/4) + 1)
            None
        elif i <> 3 && is_prime diag then
            Some(diag, (inc p, inc i))
        else Some(diag, (p, inc i))) (0, 0)

【问题讨论】:

Seq.head, Seq.tail @ildjarn 你能解释一下为什么Seq.headSeq.tail 很贵吗? 【参考方案1】:

核心 F# 库中没有 Seq.tail 函数(更新:有,请参阅 cmets),所以我假设您正在使用来自 FSharpx.Collections 的 Seq.tail 函数。如果您使用Seq.tail 的不同实现,它可能是相似的——而且几乎可以肯定这就是您的问题的原因,因为它不像您认为的那样 O(1)。获取 List 的尾部是 O(1),因为 List 是如何实现的(作为一系列 cons 单元格)。但是得到一个 Seq 的尾部最终会从原始枚举中创建一个全新的 Seq,从中丢弃一个项目,并返回它的其余项目。当您第二次通过 accum 循环时,您在“跳过 1 然后返回”序列上调用 Seq.tail。所以现在你有一个Seq,我将它称为 S2,它向 S1 请求一个 IEnumerable,跳过 S1 的第一项,并返回它的其余部分。 S1,当被问到它的第一个项目时,向 S0(原始 Seq)询问一个可枚举的,跳过它的第一个项目,然后返回它的其余部分。所以 S2 要跳过两个项目,它必须创建两个序列。现在,在下一次运行时,当您请求 S2 的 Seq.tail 时,您创建了 S3,它向 S2 请求一个 IEnumerable,它向 S1 请求一个 IEnumerable,它向 S0 请求一个 IEnumerable……等等。这实际上是 O(N^2),当您认为您正在编写 O(N) 操作时。

恐怕我现在没有时间为您找出解决方案;使用 List.tail 无济于事,因为您需要无限序列。但也许仅仅知道Seq.tail 的陷阱就足以让你开始,所以即使它不完整,我现在也会发布这个答案。

如果您需要更多帮助,请在此答案上发表评论,我会在有时间时再回复它 - 但这可能需要几天时间,因此希望其他人也能回答您的问题。

【讨论】:

感谢您提供的信息丰富的回答!我会看看我是否可以做出不同的实现。 @munn:在 FSharp 核心库中有 Seq.tail。源代码是here on GitHub - 似乎目前还没有 MSDN 文档。我在相关项目中添加了issue。 @AntonSchwaighofer - 我知道 F# 4 有 normalized the APIs 的 List、Seq 和 Array,因此所有集合函数都可用,所以我应该意识到有一个 @987654336 @ 现在。它与FSharpx.Collections 的实现相同,所以谢天谢地,除了第一句话之外,我的回答仍然正确。【参考方案2】:

编写高性能的 F# 是非常可能的,但需要一些模式知识,这些模式在紧密循环中具有较高的相对 CPU 成本。我建议使用 ILSpy 之类的工具来查找隐藏的开销。

例如,可以想象 F# 将这个表达式扩展为一个有效的 for 循环:

[3..2..(int (sqrt (float n)))] 
|> List.tryFind (fun i -> n%i=0)
|> Option.isNone

但目前还没有。相反,它使用内部运算符创建一个跨越范围的List,并将其传递给List.tryFind。与我们喜欢做的实际工作(模运算)相比,这很昂贵。 ILSpy 将上面的代码反编译成这样:

public static bool is_prime(int _arg1)

  switch (_arg1)
  
  case 2:
    return true;
  default:
    return _arg1 >= 2 && _arg1 % 2 != 0 && ListModule.TryFind<int>(new Program.Original.is_prime@10(_arg1), SeqModule.ToList<int>(Operators.CreateSequence<int>(Operators.OperatorIntrinsics.RangeInt32(3, 2, (int)Math.Sqrt((double)_arg1))))) == null;
  

这些运算符没有达到应有的性能(AFAIK,目前正在改进),但无论分配List 然后搜索它的效率如何,它都不会胜过 for 循环。

这意味着is_prime 没有达到应有的效果。相反,人们可以这样做:

let is_prime = function
  | 1                 -> false
  | 2                 -> true
  | v when v % 2 = 0  -> false
  | v ->
    let stop = v |> float |> sqrt |> int
    let rec loop vv =
      if vv <= stop then
        (v % vv) <> 0 && loop (vv + 2)
      else
        true
    loop 3

此版本的is_prime 依赖于 F# 中的尾调用优化,将循环扩展为高效的 for 循环(您可以使用 ILSpy 看到这一点)。 ILSpy 将循环反编译成这样:

while (vv <= stop)

  if (_arg1 % vv == 0)
  
    return false;
  
  int arg_13_0 = _arg1;
  int arg_11_0 = stop;
  vv += 2;
  stop = arg_11_0;
  _arg1 = arg_13_0;

这个循环不分配内存,只是一个相当有效的循环。人们看到一些无意义的任务,但希望 JIT:er 消除这些任务。我确信is_prime 可以进一步改进。

在高性能代码中使用Seq 时,必须记住它是惰性的,并且默认情况下不使用记忆(参见Seq.cache)。因此,人们可能很容易一遍又一遍地做同样的工作(请参阅@rmunn 答案)。

此外,Seq 并不是特别有效,因为 IEnumerable/IEnumerator 的设计方式。更好的选择是例如 Nessos Streams(在 nuget 上可用)。

如果您有兴趣,我做了一个快速实现,它依赖于一个简单的 Push Stream,看起来性能不错:

// Receiver<'T> is a callback that receives a value.
//  Returns true if it wants more values, false otherwise.
type Receiver<'T> = 'T -> bool
// Stream<'T> is function that accepts a Receiver<'T>
//  This means Stream<'T> is a push stream (as opposed to Seq that uses pull)
type Stream<'T>   = Receiver<'T> -> unit

// is_prime returns true if the input is prime, false otherwise
let is_prime = function
  | 1                 -> false
  | 2                 -> true
  | v when v % 2 = 0  -> false
  | v ->
    let stop = v |> float |> sqrt |> int
    let rec loop vv =
      if vv <= stop then
        (v % vv) <> 0 && loop (vv + 2)
      else
        true
    loop 3

// tryFind looks for the first value in the input stream for f v = true.
//  If found tryFind returns Some v, None otherwise
let tryFind f (s : Stream<'T>) : 'T option =
  let res = ref None
  s (fun v -> if f v then res := Some v; false else true)
  !res

// diagonals generates a tuple stream of all diagonal values
//  The first value is the side length, the second value is the diagonal value
let diagonals : Stream<int*int> =
  fun r ->
    let rec loop side v =
      let step  = side - 1
      if r (side, v + 1*step) && r (side, v + 2*step) && r (side, v + 3*step) && r (side, v + 4*step) then
        loop (side + 2) (v + 4*step)
    if r (1, 1) then loop 3 1

// ratio computes the streaming ratio for f v = true
let ratio f (s : Stream<'T>) : Stream<float*'T> =
  fun r ->
    let inc r = r := !r + 1.
    let acc   = ref 0.
    let count = ref 0.
    s (fun v -> (inc count; if f v then inc acc); r (!acc/(!count), v))

let result =
  diagonals
  |> ratio (snd >> is_prime)
  |> tryFind (fun (r, (_, v)) -> v > 1 && r < 0.1)

【讨论】:

感谢您提供非常深入的帖子!我也使用 ILSpy 来查看是否可以找到任何可疑的昂贵的东西。我对 IEnumerator 的性能了解不多。我只是将您的 is_prime 函数放入我的代码中,现在它运行不到一秒钟,比 C++ 代码快!如果List.tryFind 不能编译成简单的迭代循环,我为什么还要使用它? List.tryFind 如果已经有一个需要搜索的列表,则可以使用。这里的问题是生成一个列表然后搜索它。与我们喜欢的实际工作(模运算)相比,这相当昂贵。 更新了答案,以澄清最大的问题是列表创建。

以上是关于F# 性能:是啥让这段代码如此缓慢?的主要内容,如果未能解决你的问题,请参考以下文章

是啥让这两个功能分别为“公共”和“私有”,这是啥意思?

是啥让 ES6 如此特别?

Project loom:是啥让使用虚拟线程时性能更好?

为啥我的 Apps 脚本删除循环运行如此缓慢?我可以提高性能吗?

是啥让这个函数运行得慢得多?

是啥让投票回归者的得分低于其选民的得分?