Scala 与 Python 的 Spark 性能

Posted

技术标签:

【中文标题】Scala 与 Python 的 Spark 性能【英文标题】:Spark performance for Scala vs Python 【发布时间】:2015-09-08 17:46:02 【问题描述】:

我更喜欢 Python 而不是 Scala。但是,由于 Spark 本身是用 Scala 编写的,因此出于显而易见的原因,我希望我的代码在 Scala 中比 Python 版本运行得更快。

有了这个假设,我想学习并编写一些非常常见的预处理代码的 Scala 版本,用于处理 1 GB 的数据。数据来自 Kaggle 上的 SpringLeaf 竞赛。只是为了概述数据(它包含 1936 个维度和 145232 行)。数据由各种类型组成,例如整数、浮点数、字符串、布尔值。我使用 8 个内核中的 6 个进行 Spark 处理;这就是我使用minPartitions=6 的原因,以便每个内核都有要处理的内容。

Scala 代码

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex  (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter 
val delim1 = "\001"

def separateCols(line: String): Array[String] = 
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) 
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  
  vals


val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = 
  val vals = line.split(delim1)
  (vals(0), vals(1))


val input4 = input3.map(toKeyVal)

def valsConcat(val1: String, val2: String): String = 
  val1 + "," + val2


val input5 = input4.reduceByKey(valsConcat)

input5.saveAsTextFile("output")

Python 代码

input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'


def drop_first_line(index, itr):
  if index == 0:
    return iter(list(itr)[1:])
  else:
    return itr

input2 = input.mapPartitionsWithIndex(drop_first_line)

def separate_cols(line):
  line = line.replace('true', '1').replace('false', '0')
  vals = line.split(',')
  vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
           for e, val in enumerate(vals)]
  return vals2


input3 = input2.flatMap(separate_cols)

def to_key_val(kv):
  key, val = kv.split(DELIM_1)
  return (key, val)
input4 = input3.map(to_key_val)

def vals_concat(v1, v2):
  return v1 + ',' + v2

input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')

Scala 性能 第 0 阶段(38 分钟),第 1 阶段(18 秒)

Python 性能 第 0 阶段(11 分钟),第 1 阶段(7 秒)

两者都生成不同的 DAG 可视化图(因此,两张图片都显示了 Scala (map) 和 Python (reduceByKey) 的不同阶段 0 函数)

但是,基本上这两个代码都试图将数据转换为(dimension_id,值列表字符串)RDD 并保存到磁盘。输出将用于计算每个维度的各种统计信息。

在性能方面,此类真实数据的 Scala 代码的运行速度似乎比 Python 版本慢 4 倍。 对我来说,好消息是它给了我继续使用 Python 的动力。坏消息是我不太明白为什么?

【问题讨论】:

也许这取决于代码和应用程​​序,因为我得到了另一个结果,即apache spark python is slower than scala, when summing a billion terms of the Leibniz formula for π 有趣的问题!顺便说一句,也可以看看这里:emptypipes.org/2015/01/17/python-vs-scala-vs-spark 你拥有的核心越多,你就越看不到语言之间的差异。 您是否考虑过accepting 现有的答案? 【参考方案1】:

讨论代码的原始答​​案可以在下面找到。


首先,您必须区分不同类型的 API,每种 API 都有自己的性能考虑因素。

RDD API

(纯 Python 结构和基于 JVM 的编排)

这是受 Python 代码性能和 PySpark 实现细节影响最大的组件。虽然 Python 性能不太可能成为问题,但您至少需要考虑几个因素:

JVM 通信的开销。实际上,所有进出 Python 执行程序的数据都必须通过套接字和 JVM 工作程序。虽然这是一种相对有效的本地通信,但它仍然不是免费的。

基于进程的执行器 (Python) 与基于线程的(单个 JVM 多线程)执行器 (Scala)。每个 Python 执行器都在自己的进程中运行。作为副作用,它提供了比其 JVM 对应物更强的隔离性和对执行程序生命周期的一些控制,但可能会显着提高内存使用率:

解释器内存占用 已加载库的足迹 广播效率较低(每个进程都需要自己的广播副本)

Python 代码本身的性能。一般来说,Scala 比 Python 快,但它会因任务而异。此外,您还有多种选择,包括像 Numba 这样的 JIT、C 扩展 (Cython) 或像 Theano 这样的专用库。最后,如果您不使用 ML / MLlib(或简单的 NumPy 堆栈),请考虑使用 PyPy 作为替代解释器。见SPARK-3094。

PySpark 配置提供了spark.python.worker.reuse 选项,可用于在为每个任务派生 Python 进程和重用现有进程之间进行选择。后一种选择似乎有助于避免昂贵的垃圾收集(它更像是一种印象而不是系统测试的结果),而前一种(默认)最适合用于昂贵的广播和导入。 在 CPython 中用作第一行垃圾回收方法的引用计数非常适用于典型的 Spark 工作负载(流式处理,无引用周期),并降低了长时间 GC 暂停的风险。

MLlib

(混合 Python 和 JVM 执行)

基本注意事项与以前几乎相同,但有一些额外的问题。虽然与 MLlib 一起使用的基本结构是纯 Python RDD 对象,但所有算法都直接使用 Scala 执行。

这意味着将 Python 对象转换为 Scala 对象会产生额外成本,反之亦然,会增加内存使用量以及一些我们将在稍后介绍的额外限制。

截至目前 (Spark 2.x),基于 RDD 的 API 处于维护模式,为 scheduled to be removed in Spark 3.0。

DataFrame API 和 Spark ML

(JVM 执行与 Python 代码仅限于驱动程序)

这些可能是标准数据处理任务的最佳选择。由于 Python 代码大多仅限于驱动程序上的高级逻辑操作,因此 Python 和 Scala 之间应该没有性能差异。

一个例外是使用逐行 Python UDF,其效率明显低于其 Scala 等效项。虽然有一些改进的机会(在 Spark 2.0.0 中已经有了实质性的发展),但最大的限制是内部表示 (JVM) 和 Python 解释器之间的完整往返。如果可能,您应该支持内置表达式的组合(example。Spark 2.0.0 中改进了 Python UDF 行为,但与本机执行相比,它仍然不是最理想的。

将来可能会改进随着 vectorized UDFs (SPARK-21190 and further extensions) 的引入而得到显着改进,它使用 Arrow Streaming 进行有效的数据交换和零拷贝反序列化。对于大多数应用程序,它们的次要开销可以忽略不计。

还要确保避免在DataFramesRDDs 之间传递不必要的数据。这需要昂贵的序列化和反序列化,更不用说与 Python 解释器之间的数据传输了。

值得注意的是,Py4J 调用具有相当高的延迟。这包括简单的调用,例如:

from pyspark.sql.functions import col

col("foo")

通常,这无关紧要(开销是恒定的,不取决于数据量),但在软实时应用程序的情况下,您可以考虑缓存/重用 Java 包装器。

GraphX 和 Spark 数据集

目前(Spark 1.6 2.1)都没有提供 PySpark API,所以你可以说 PySpark 比 Scala 差了很多。

图X

实际上,GraphX 开发几乎完全停止,该项目目前处于维护模式,related JIRA tickets closed as won't fix。 GraphFrames 库提供了一个带有 Python 绑定的替代图形处理库。

数据集

主观地说,在 Python 中静态类型的 Datasets 并没有太多的地方,即使当前的 Scala 实现过于简单,也无法提供与 DataFrame 相同的性能优势。

流媒体

就我目前所见,我强烈建议使用 Scala 而不是 Python。如果 PySpark 获得对结构化流的支持,它可能会在未来发生变化,但现在 Scala API 似乎更加健壮、全面和高效。我的经验非常有限。

Spark 2.x 中的结构化流似乎缩小了语言之间的差距,但目前仍处于早期阶段。尽管如此,基于 RDD 的 API 已经在 Databricks Documentation(访问日期 2017-03-03)中被称为“遗留流”,因此期待进一步的统一努力是合理的。

非性能注意事项

功能奇偶校验

并非所有 Spark 功能都通过 PySpark API 公开。请务必检查您需要的部分是否已经实现,并尝试了解可能的限制。

当您使用 MLlib 和类似的混合上下文时,这一点尤其重要(请参阅Calling Java/Scala function from a task)。公平地说,PySpark API 的某些部分,例如 mllib.linalg,提供了比 Scala 更全面的方法集。

API设计

PySpark API 紧密地反映了它的 Scala 对应物,因此并不完全是 Pythonic。这意味着在语言之间进行映射非常容易,但与此同时,Python 代码可能更难理解。

复杂的架构

与纯 JVM 执行相比,PySpark 数据流相对复杂。推理 PySpark 程序或调试要困难得多。此外,至少对 Scala 和 JVM 有基本的了解是必不可少的。

Spark 2.x 及更高版本

Dataset API 的持续转变,以及冻结的 RDD API 为 Python 用户带来了机遇和挑战。虽然 API 的高级部分更容易在 Python 中公开,但更高级的功能几乎不可能直接使用。

此外,原生 Python 函数仍然是 SQL 世界中的二等公民。希望将来通过 Apache Arrow 序列化(current efforts target data collection 但 UDF serde 是 long term goal)会有所改善。

对于强烈依赖 Python 代码库的项目,纯 Python 替代方案(如 Dask 或 Ray)可能是一个有趣的替代方案。

不一定非要一对一

Spark DataFrame (SQL, Dataset) API 提供了一种在 PySpark 应用程序中集成 Scala/Java 代码的优雅方式。您可以使用DataFrames 将数据公开给本机JVM 代码并读回结果。我已经解释了一些选项somewhere else,您可以在How to use a Scala class inside Pyspark 中找到 Python-Scala 往返的工作示例。

它可以通过引入用户定义类型来进一步增强(参见How to define schema for custom type in Spark SQL?)。


问题中提供的代码有什么问题

(免责声明:Pythonista 的观点。很可能我错过了一些 Scala 技巧)

首先,您的代码中有一部分完全没有意义。如果您已经使用zipWithIndexenumerate 创建了(key, value) 对,那么创建字符串只是为了在之后将其拆分有什么意义? flatMap 不能递归工作,因此您可以简单地生成元组并跳过 map 之后的任何内容。

我发现有问题的另一部分是reduceByKey。一般来说,reduceByKey 很有用,如果应用聚合函数可以减少必须洗牌的数据量。由于您只是连接字符串,因此这里没有任何收获。忽略低级的东西,比如引用的数量,你必须传输的数据量与 groupByKey 完全相同。

通常我不会详述这一点,但据我所知,这是您的 Scala 代码中的一个瓶颈。在 JVM 上连接字符串是一项相当昂贵的操作(例如:Is string concatenation in scala as costly as it is in Java?)。这意味着类似_.reduceByKey((v1: String, v2: String) =&gt; v1 + ',' + v2) 在您的代码中相当于input4.reduceByKey(valsConcat) 不是一个好主意。

如果您想避免使用groupByKey,可以尝试将aggregateByKeyStringBuilder 一起使用。与此类似的东西应该可以解决问题:

rdd.aggregateByKey(new StringBuilder)(
  (acc, e) => 
    if(!acc.isEmpty) acc.append(",").append(e)
    else acc.append(e)
  ,
  (acc1, acc2) => 
    if(acc1.isEmpty | acc2.isEmpty)  acc1.addString(acc2)
    else acc1.append(",").addString(acc2)
  
)

但我怀疑这是否值得大惊小怪。

牢记以上内容,我已将您的代码重写如下:

斯卡拉

val input = sc.textFile("train.csv", 6).mapPartitionsWithIndex
  (idx, iter) => if (idx == 0) iter.drop(1) else iter


val pairs = input.flatMap(line => line.split(",").zipWithIndex.map
  case ("true", i) => (i, "1")
  case ("false", i) => (i, "0")
  case p => p.swap
)

val result = pairs.groupByKey.map
  case (k, vals) =>  
    val valsString = vals.mkString(",")
    s"$k,$valsString"
  


result.saveAsTextFile("scalaout")

Python

def drop_first_line(index, itr):
    if index == 0:
        return iter(list(itr)[1:])
    else:
        return itr

def separate_cols(line):
    line = line.replace('true', '1').replace('false', '0')
    vals = line.split(',')
    for (i, x) in enumerate(vals):
        yield (i, x)

input = (sc
    .textFile('train.csv', minPartitions=6)
    .mapPartitionsWithIndex(drop_first_line))

pairs = input.flatMap(separate_cols)

result = (pairs
    .groupByKey()
    .map(lambda kv: "0,1".format(kv[0], ",".join(kv[1]))))

result.saveAsTextFile("pythonout")

结果

local[6] 模式下(Intel(R) Xeon(R) CPU E3-1245 V2 @ 3.40GHz),每个执行程序需要 4GB 内存(n = 3):

Scala - 平均值:250.00s,标准差:12.49 Python - 平均:246.66s,标准差:1.15

我很确定大部分时间都花在了洗牌、序列化、反序列化和其他次要任务上。只是为了好玩,下面是简单的 Python 单线程代码,它可以在不到一分钟的时间内在这台机器上执行相同的任务:

def go():
    with open("train.csv") as fr:
        lines = [
            line.replace('true', '1').replace('false', '0').split(",")
            for line in fr]
    return zip(*lines[1:])

【讨论】:

这是同一个任务吗?最后一个zip是不是很懒,没有保存到文件?【参考方案2】:

上述答案的扩展 -

Scala 在很多方面都证明比 python 更快,但是 python 变得比 scala 更受欢迎有一些正当的原因,让我们看看其中的几个——

用于 Apache Spark 的 Python 非常易于学习和使用。然而,这并不是 Pyspark 比 Scala 更好的选择的唯一原因。还有更多。

用于 Spark 的 Python API 在集群上可能会更慢,但最终,与 Scala 相比,数据科学家可以用它做更多的事情。没有 Scala 的复杂性。界面简洁全面。

谈代码的可读性、维护和熟悉 Python API for Apache Spark 远胜于 Scala。

Python 附带了几个与机器学习和自然语言处理相关的库。这有助于数据分析,并且还具有成熟且经过时间考验的统计数据。例如,numpy、pandas、scikit-learn、seaborn 和 matplotlib。

注意:大多数数据科学家使用混合方法,充分利用这两种 API。

最后,Scala 社区对程序员的帮助往往少得多。这使得 Python 成为一个非常有价值的学习。如果您对任何静态类型的编程语言(如 Java)有足够的经验,您就不必担心完全不使用 Scala。

【讨论】:

来源:analytixlabs.co.in/blog/pyspark-taking-scala

以上是关于Scala 与 Python 的 Spark 性能的主要内容,如果未能解决你的问题,请参考以下文章

Spark常用的算子以及Scala函数总结

scala python哪个用来开发spark更好

如何成为Spark高手

教你如何在Spark Scala/Java应用中调用Python脚本

使用 spark 和 scala 进行连接计数时获得性能的最佳方法

教你如何在Spark Scala/Java应用中调用Python脚本