Spark 需要 0.5 秒来平均 100 个数字
Posted
技术标签:
【中文标题】Spark 需要 0.5 秒来平均 100 个数字【英文标题】:Spark takes 0.5 second to average 100 numbers 【发布时间】:2017-08-20 06:42:02 【问题描述】:我有一个包含大约 7000 万行用户位置和日期时间的 CSV 数据集,并编写了以下代码来平均前 100 个用户的点数:
val spark = org.apache.spark.sql.SparkSession.builder
.appName("Test")
.getOrCreate
import spark.implicits._
val watch = new Stopwatch()
watch.start()
val schema = new StructType().add("user_id", StringType).add("datetime", LongType)
val df = spark.read.format("csv").option("header", "true").schema(schema).csv(inputFile)
df.createOrReplaceTempView("paths")
val pathDs = spark.sql("select user_id, min(datetime) as started, max(datetime) as finished, " +
"count(*) as total, max(datetime) - min(datetime) as timeDelta " +
"from paths group by user_id order by total desc limit 100")
pathDs.cache()
pathDs.collect.foreach(println)
println(watch.elapsedTime(TimeUnit.MILLISECONDS))
val avgPoints = pathDs.select(avg("total")).as[Double].head()
println(avgPoints)
println(watch.stop())
这里发生的情况是,我获取数百万/数十亿条记录(最终可能需要 TB),并将它们聚合成 5 列的 100 条记录。问题不在于这部分需要多长时间或我们如何加快速度,而是当我们处理生成的 100 条记录时会发生什么。
还有一种更简单的方法可以通过 SQL 直接执行此操作,但我还需要 pathDS 稍后进行更多处理。代码运行良好,但我注意到 pathDs.select(avg("total")).as[Double].head()
开始做大量工作并最终花费大约半秒,即使 pathDS 仅包含 100 行。
您知道为什么要花这么长时间吗?我怎样才能加快速度,特别是在处理这个只有 100 行的小型数据集时?在进行任何进一步聚合之前,我专门对其进行了 .cache 和 .collect 以在本地调出所有 100 条记录(无论如何我现在正在本地运行它)。
我在本地使用 Scala 2.11 上的 Spark 2.2。
【问题讨论】:
我不在这里处理小数据 - 完整的数据集将以数百 GB/TB 为单位。但我特别想看看为什么这个特定的例子很慢,因为我认为执行 .cache() 应该将我的 100 行数据集保留在内存中,并且结果应该立即出来。 我建议查看 SparkUI 中的一些高级指标,以了解为什么需要这么长时间。我还建议通过此代码运行至少一个数据集,例如 1000 个值,这样您就可以更好地推断恒定开销与实际 (O(n)) 计算时间。 【参考方案1】:Spark 针对大型数据集进行了优化。这意味着通常有一些开销对于大型数据集可以忽略不计,但对于小型数据集却不是那么可忽略。
考虑运行计算 avgPoints 时会发生什么:
-
Spark 计算“转换”,即它定义了需要进行的计算(这是选择、平均等的部分)。
您调用“head”动作,这会导致 spark 采用您创建的表达式树并将其转换为物理计划。这包括优化以及比较几种可能的解决方案。注意,该表达式还包括计算缓存部分的表达式。在实践中,这些步骤将被跳过(您可以在 spark UI 中看到这一点),但它们仍被视为 spark 可能会在某些情况下决定重新计算一些缓存数据(在这种情况下几乎肯定不会)。
Spark 使用全阶段代码生成将物理计划编译成代码,序列化此代码并将其发送给所有相关的执行器。
当 spark 创建计划时,它对数据进行分区(可能是 200 个分区,因为这是 groupby 的默认值)。这意味着您在执行者之间分配了 200 个任务。大多数分区将有 0 或 1 个元素,因此它们执行的任务几乎是即时的,但 spark 必须启动 200 个任务。
Spark 将 200 个任务中的每一个任务的结果发送到缓冲区,然后将它们全部发送到单个执行程序以进行最终聚合。在所有任务完成并发送数据之前,最终的聚合任务不会开始。
最终聚合完成后,会将结果发送回驱动程序。
如您所见,这里有很多阶段,包括网络传输和开始/结束任务(需要管理)。即使没有实际数据,这里的总开销也很容易达到半秒。
如果您改为将限制更改为 1000,即使您处理 10 倍的数据,您也可能会看到总时间的变化很小。
使用 spark 来减少问题的大小是一个常见的用例,即您有大量数据,进行一些聚合并获得较少数量的元素(在您的情况下为 100 个),然后您将它们收集到驱动程序并直接对它们采取行动,而不是使用 spark 来避免开销(例如,在您的情况下,保存收集的结果,而不是使用 println 进行 foreach,只需将它们总结起来)。
您可以做的一件事是在计算 pathD 时使用 coalesce(1)。这意味着您只有一个分区(所有内容的加入将是第一阶段的一部分)。这与使用收集结果没有太大区别,只是如果您想将限制更改为更大的大小,则合并为一个较小但不是 1 的值可能很有用(例如,您可能会限制 10000,然后合并为 4 以仍然得到一些并行性)。
更新
基于 cmets,当前限制的结果是 1 个分区,因此合并不会有帮助(这也意味着没有真正的理由不进行收集,除非您想对结果使用数据框函数)。上面描述的过程仍然是正确的,只是只使用了一个分区而不是多个分区。
【讨论】:
我建议删除最后一段。这是完全没用的,尤其是当您考虑到pathDs
将始终有 1 个分区时。
@zero323 这总是真的吗?分区数不会取决于限制的实际大小和每个分区的键数吗?
所以在我的代码中,我在计算平均值之前专门做了 pathDs.cache pathDs.collect.foreach(println),认为这实际上会降低驱动程序的结果 - 但它没有好像就是这么回事。在这里收集的正确方法是什么?请注意,如果我收集并操作收集到的数据集,结果会快得多,因为我只处理本地 Scala 对象,但如果我收集、打印,然后在 pathDS 上执行其他操作(如在我的代码中),它仍然是 0.5 秒。有没有更好的缓存/收集方法?
@kozyr 你的缓存+收集的效果是物化缓存(你扔掉收集)。当您使用 head 操作时,它会重新计算所有内容,但由于它被缓存,它会跳过缓存的元素。您仍然在 spark 内部进行聚合。相反,如果您将收集放在一个变量中并使用它,您将不会使用 spark。
@AssafMendelson 我猜还有一个问题,那么是否有一种无需重新计算的缓存方式?【参考方案2】:
优化它的一种方法是使用函数collect
将整个数据集放入内存,然后使用规范的 scala 操作可以在 1-2 毫秒内完成它?但这违背了最初使用 Spark 的原因。
Spark 的优势在于跨不同机器上的多个节点高效地执行分布式计算。在不通过 Spark 的情况下,对小型数据集的操作总是会更高效。您进行实验类似于为必须飞行 100m 的 747 计时。现在你想知道为什么 747 这么慢,大家都说飞行让你飞得这么快。
在使用 RDD 在 Spark 中工作的旧方式中,在 1.2 -> 1.6 版本左右,您可以使用 mapPartitionsWithIndex
之类的函数对分区数据执行正常的 scala 操作以避免火花开销。这当然意味着在该函数中,所有数据都已经在 spark 节点级别隔离。使用这种方法,您可以获得两全其美的好处。
【讨论】:
问题是,我确实在我的数据集上运行收集。我还运行 .cache() 另外,我没有在小数据集上操作 - 这将在生产中以 TB 为单位,如果在这里计算平均 100 个数字在某种程度上取决于我的初始数据集,那么这将需要远远超过 0.5 秒,这将是非常麻烦的。 IE。假设这是我的用例 - 获取数 TB 的 GPS 点,获取前 100 条路径,计算其中的一些平均值。【参考方案3】:-
在没有
cache
的情况下进行测量,因为在将数据加载到cache
本身时会浪费时间。可能会更快。
您能否将input data
放入parquet
并将其加载到内存存储中,例如同一集群上的alluxio
?如果是,partition
user_id
。理想情况下,设计架构使得新的输入数据被推送到kafka
,一个structured streaming
作业将其附加到alluxio
或cassandra
,另一个在选定范围内聚合。此外,请尝试使用flink
或batch
或stream
,因为它通常更快。
如果你不能控制输入数据结构,那么重点给2dn阶段,尝试使用typed aggregates
如:
groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K,
T]
您需要为csv
创建数据集:
case class Input(userId: String, time: DateTime)
val ds = spark.read.format("csv").option("header",
"true").schema(schema).csv(inputFile).as[Input]
ds.groupByKey(_.userId).avg(_.time).show
由于类型化性能优势,对于大型数据集肯定会更快,但对于较小的数据集可能不会
【讨论】:
问题是,这里有两个阶段 - 一个加载大量数据(不管需要多长时间),然后将其聚合成 100 条记录。然后第二阶段从这 100 条记录中计算单个列的平均值,这需要 0.5 秒,这非常慢。 我认为这不会给我带来任何性能优势,也不会真正解释我所看到的情况,这只是我遇到问题的小数据集。 由于类型化的性能优势,它对于大型数据集肯定会更快 - 它肯定不会:)以上是关于Spark 需要 0.5 秒来平均 100 个数字的主要内容,如果未能解决你的问题,请参考以下文章
使用 .NET for Spark 对 DataFrame 进行递归计算
假设网络的生产管理系统采用B/S工作方式,经常上网的用户数为100个,每个用户每分钟平均产生11个事务,平均事务量大小为0.06MB,则这个系统需要的传输速率为(34)。