在 Spark 2 中使用 DataSet.repartition - 多个任务处理多个分区
Posted
技术标签:
【中文标题】在 Spark 2 中使用 DataSet.repartition - 多个任务处理多个分区【英文标题】:using DataSet.repartition in Spark 2 - several tasks handle more than one partition 【发布时间】:2017-07-31 19:34:16 【问题描述】:我们有一个 spark 流应用程序(spark 2.1 在 Hortonworks 2.6 上运行)并使用 DataSet.repartition
(在从 Kafka 读取的 DataSet<Row>
上)根据给定列重新分区 DataSet<Row>'s
分区(称为block_id)。
我们从包含 50 个分区的 DataSet<Row>
开始,最终(在调用 DataSet.repartition
之后)分区数等于唯一 block_id's 的数量。
问题在于 DataSet.repartition
的行为与我们预期的不同 - 当我们查看运行 repartition
的 spark 作业的事件时间线时,我们看到有几个任务处理 1 block_id em> 和更少的任务处理 2 个 block_id's 甚至 3 或 4 个 block_id's。
似乎DataSet.repartition
确保具有相同block_id 的所有Rows
将位于单个分区内,但并不是每个创建分区的任务将只处理一个block_id。
结果是重新分区作业(在流式应用程序中运行)所花费的时间与其最长的任务(即处理最多 block_id 的任务)一样多。
我们尝试使用分配给流媒体应用的 Vcore 数量 - 从 10 到 25 到 50(我们在从 Kafka 读取的原始 RDD 中有 50 个分区),但结果是相同的 - 总是有一个或多个任务处理多个block_id。
我们甚至尝试增加批处理时间,但同样没有帮助我们实现一个任务处理一个 block_id 的目标。
举个例子 - 下面是描述repartition
spark 作业运行的事件时间线和任务表:
事件时间线 - 红色的两个任务是处理两个block_id的任务:
任务表 - 红色的两个任务与上面的两个相同 - 注意每个任务的持续时间是所有其他任务持续时间的两倍(仅处理一个 block_id )
这对我们来说是个问题,因为流式应用程序由于这些长任务而延迟,我们需要一个解决方案,使我们能够在 DataSet 上执行repartition
,同时让每个任务只处理一个 block_id时间>。
如果这不可能,那么也许在 JavaRDD?
上是可能的,因为在我们的例子中,DataSet<Row>
我们运行 repartition
是从 JavaRDD
创建的。
【问题讨论】:
【参考方案1】:你需要考虑的2个问题:
拥有一个确保数据均匀分布的自定义分区器,1 个 block_id / 分区 调整集群大小,以便您有足够的执行程序来同时运行所有任务 (block_ids)正如您所见,对 DataFrame 进行简单的重新分区并不能保证您将获得均匀分布。当您通过 block_id 重新分区时,它将使用 HashPartitioner,公式为:
Utils.nonNegativeMod(key.hashCode, numPartitions)
见:https://github.com/apache/spark/blob/branch-2.2/core/src/main/scala/org/apache/spark/Partitioner.scala#L80-L88
很有可能 2+ 个键被分配给相同的 partition_id,因为 partition_id 是键的 hashCode 模 numPartitions。
您可以通过使用带有自定义分区器的 RDD 来实现您所需要的。最简单的方法是在重新分区之前提取不同的 block_id 列表。
这是一个简单的例子。假设您可以有 5 个块(2,3,6,8,9)并且您的集群有 8 个执行器(最多可以同时运行 8 个任务),我们被 3 个执行器过度配置:
scala> spark.conf.get("spark.sql.shuffle.partitions")
res0: String = 8
scala> spark.conf.get("spark.default.parallelism")
res1: String = 8
// Basic class to store dummy records
scala> case class MyRec(block_id: Int, other: String)
defined class MyRec
// Sample DS
scala> val ds = List((2,"A"), (3,"X"), (3, "B"), (9, "Y"), (6, "C"), (9, "M"), (6, "Q"), (2, "K"), (2, "O"), (6, "W"), (2, "T"), (8, "T")).toDF("block_id", "other").as[MyRec]
ds: org.apache.spark.sql.Dataset[MyRec] = [block_id: int, other: string]
scala> ds.show
+--------+-----+
|block_id|other|
+--------+-----+
| 2| A|
| 3| X|
| 3| B|
| 9| Y|
| 6| C|
| 9| M|
| 6| Q|
| 2| K|
| 2| O|
| 6| W|
| 2| T|
| 8| T|
+--------+-----+
// Default partitioning gets data distributed as uniformly as possible (record count)
scala> ds.rdd.getNumPartitions
res3: Int = 8
// Print records distribution by partition
scala> ds.rdd.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList))).toDF("partition_id", "block_ids").show
+------------+--------------+
|partition_id| block_ids|
+------------+--------------+
| 0| [[2,A]]|
| 1|[[3,X], [3,B]]|
| 2| [[9,Y]]|
| 3|[[6,C], [9,M]]|
| 4| [[6,Q]]|
| 5|[[2,K], [2,O]]|
| 6| [[6,W]]|
| 7|[[2,T], [8,T]]|
+------------+--------------+
// repartitioning by block_id leaves 4 partitions empty and assigns 2 block_ids (6,9) to same partition (1)
scala> ds.repartition('block_id).rdd.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList))).toDF("partition_id", "block_ids").where(size('block_ids) > 0).show(false)
+------------+-----------------------------------+
|partition_id|block_ids |
+------------+-----------------------------------+
|1 |[[9,Y], [6,C], [9,M], [6,Q], [6,W]]|
|3 |[[3,X], [3,B]] |
|6 |[[2,A], [2,K], [2,O], [2,T]] |
|7 |[[8,T]] |
+------------+-----------------------------------+
// Create a simple mapping for block_id to partition_id to be used by our custom partitioner (logic may be more elaborate or static if the list of block_ids is static):
scala> val mappings = ds.map(_.block_id).dropDuplicates.collect.zipWithIndex.toMap
mappings: scala.collection.immutable.Map[Int,Int] = Map(6 -> 1, 9 -> 0, 2 -> 3, 3 -> 2, 8 -> 4)
//custom partitioner assigns partition_id according to the mapping arg
scala> class CustomPartitioner(mappings: Map[Int,Int]) extends org.apache.spark.Partitioner
| override def numPartitions: Int = mappings.size
| override def getPartition(rec: Any): Int = mappings.getOrElse(rec.asInstanceOf[Int], 0)
|
defined class CustomPartitioner
// Repartition DS using new partitioner
scala> val newDS = ds.rdd.map(r => (r.block_id, r)).partitionBy(new CustomPartitioner(mappings)).toDS
newDS: org.apache.spark.sql.Dataset[(Int, MyRec)] = [_1: int, _2: struct<block_id: int, other: string>]
// Display evenly distributed block_ids
scala> newDS.rdd.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList))).toDF("partition_id", "block_ids").where(size('block_ids) > 0).show(false)
+------------+--------------------------------------------+
|partition_id|block_ids |
+------------+--------------------------------------------+
|0 |[[9,[9,Y]], [9,[9,M]]] |
|1 |[[6,[6,C]], [6,[6,Q]], [6,[6,W]]] |
|2 |[[3,[3,X]], [3,[3,B]]] |
|3 |[[2,[2,A]], [2,[2,K]], [2,[2,O]], [2,[2,T]]]|
|4 |[[8,[8,T]]] |
+------------+--------------------------------------------+
【讨论】:
我们尝试了您的方法(将您编写的 scala 代码翻译成 java)并且大多数情况下它都有效(这意味着 RDD.repartition 作业中的所有任务都是线性的 - 每个都处理相同数量的分区)。但是,有时 RDD.repartition 作业中的一项任务会永远卡住,我猜是无限循环(因为顶部正好显示 100%)。这会导致流媒体卡住,在这种情况下只有重新启动才有帮助。这是 spark2 中的已知错误吗? 分区(块)是否有可能变得太大(大key导致数据倾斜,导致GC)?您能否尝试记录每个块的前 10 条记录,也许运行详细的 gc 并在发生这种情况时进行更深入的挖掘,检查正在运行的任务的线程转储等(databricks.com/blog/2015/05/28/…)。这种方法依赖于正确平衡的数据,否则即使每个执行程序只处理 1 个,一个 1 行的块与一个 1M 行的块仍然会导致处理偏差。 RDD.repartition 作业每次卡住流式应用程序,都是由某些任务卡住造成的。每次发生此错误时,这些任务是唯一比所有其他任务处理更少(!!)记录的任务。例如- 如果重新分区作业有 50 个任务,那么 46 个任务正好处理 1M 条记录,而 4 个卡住的任务处理的记录更少。因此我们可以确定此错误始终是由处理少于所需行数的任务引起的。我们验证GC不是问题,线程转储只显示4个线程处于WAITING状态。 @EladEldor 您找到根本原因和解决方案了吗?谢谢。以上是关于在 Spark 2 中使用 DataSet.repartition - 多个任务处理多个分区的主要内容,如果未能解决你的问题,请参考以下文章
在 Spark 上下文中使用多个同时作业的 Spark 2 作业监控 (JobProgressListener)
如何在Spark提交中使用s3a和Apache spark 2.2(hadoop 2.8)?
在 docker 中使用 spark 2.2 运行 zeppelin
在 spark 版本 2.2.0 中使用 python(pyspark) 从 mqtt 获取数据流