spark saveAsTextFile 最后一个分区(几乎?)永远不会完成
Posted
技术标签:
【中文标题】spark saveAsTextFile 最后一个分区(几乎?)永远不会完成【英文标题】:spark saveAsTextFile last partition (almost?) never finishes 【发布时间】:2015-08-04 09:50:18 【问题描述】:我有一个非常简单的类似字数统计的程序,它可以生成(长,双)这样的字数:
val lines = sc.textFile(directory)
lines.repartition(600).mapPartitionslineIterator =>
// Generate iterator of (Long,Double) counts
.reduceByKey(new HashPartitioner(30), (v1, v2) => v1 + v2).saveAsTextFile(outDir, classOf[GzipCodec])
我的问题:30 个分区中的最后一个分区永远不会被写入。
这里有一些细节:
我的输入是 5 GB gz 压缩的,我预计大约 1B 的唯一长键。 我在 32 核 1.5TB 机器上运行。输入和输出来自具有 2TB 可用空间的本地磁盘。 Spark 被分配使用所有的 ram,并且很高兴地这样做了。此应用程序占用大约 0.5 TB。我可以观察到以下几点:
对于 29 个分区,reduce 和 repartition(由于 HashPartitioner)大约需要 2 小时。最后一个没有完成,即使过了一天也没有。两到四个线程保持 100%。 日志中未出现错误或警告 Spark 在 /tmp 中占用大约 100GB,这与 UI 报告的随机写入一致。 在 UI 中,我可以看到剩余任务的“随机读取记录”数量增长非常非常缓慢。一天后,距离所有已完成任务的显示还有一个数量级。最后的日志是这样的:
15/08/03 23:26:43 INFO SparkHadoopWriter: attempt_201508031748_0002_m_000020_748: Committed
15/08/03 23:26:43 INFO Executor: Finished task 20.0 in stage 2.0 (TID 748). 865 bytes result sent to driver
15/08/03 23:27:50 INFO FileOutputCommitter: Saved output of task 'attempt_201508031748_0002_m_000009_737' to file:/output-dir/_temporary/0/task_201508031748_0002_m_000009
15/08/03 23:27:50 INFO SparkHadoopWriter: attempt_201508031748_0002_m_000009_737: Committed
15/08/03 23:27:50 INFO Executor: Finished task 9.0 in stage 2.0 (TID 737). 865 bytes result sent to driver
15/08/04 02:44:54 INFO BlockManager: Removing broadcast 3
15/08/04 02:44:54 INFO BlockManager: Removing block broadcast_3_piece0
15/08/04 02:44:54 INFO MemoryStore: Block broadcast_3_piece0 of size 2009 dropped from memory (free 611091153849)
15/08/04 02:44:54 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0
15/08/04 02:44:54 INFO BlockManager: Removing block broadcast_3
15/08/04 02:44:54 INFO MemoryStore: Block broadcast_3 of size 3336 dropped from memory (free 611091157185)
15/08/04 02:44:54 INFO BlockManager: Removing broadcast 4
15/08/04 02:44:54 INFO BlockManager: Removing block broadcast_4_piece0
15/08/04 02:44:54 INFO MemoryStore: Block broadcast_4_piece0 of size 2295 dropped from memory (free 611091159480)
15/08/04 02:44:54 INFO BlockManagerMaster: Updated info of block broadcast_4_piece0
15/08/04 02:44:54 INFO BlockManager: Removing block broadcast_4
15/08/04 02:44:54 INFO MemoryStore: Block broadcast_4 of size 4016 dropped from memory (free 611091163496)
想象一下前五行在两分钟的时间范围内对 28 个其他分区重复。
我已经尝试了几件事:
Spark 1.3.0 和 1.4.0 nio 代替 netty flatMap 代替 mapPartitions 只有 30 个而不是 600 个输入分区不过,我从来没有从火花中得到最后 1/30 的数据。有没有人观察到类似的东西?这两个帖子here 和here 似乎描述了类似的问题,但没有解决方案。
更新
永远不会完成的任务永远是reduceKey+writeToTextFile的第一个任务。我还删除了 HashPartitioner,甚至尝试在具有 400 个内核和 6000 个分区的更大集群上。只有 5999 成功完成,最后一个永远运行。
UI 显示所有任务,例如 随机读取大小/记录:20.0 MB / 1954832 但首先它显示(目前) 随机读取大小/记录:150.1 MB / 711836
数字仍在增长......
【问题讨论】:
【参考方案1】:可能是您的密钥非常倾斜。根据它们的分布方式(或者如果您有一个空键或默认键),大量数据可能会发送到单个执行程序,并且与在本地机器上运行没有什么不同(加上分布式平台的开销)。它甚至可能导致该机器交换到磁盘,变得非常慢。
尝试使用aggregateByKey
而不是reduceByKey
,因为它会尝试将部分总和分配给执行程序,而不是将所有(可能很大的)键值对集洗牌到单个执行程序。并且可能避免将输出分区的数量固定为 30,以防万一。
编辑:“它只是没有完成”很难检测到问题。您可以做的一件事是引入超时:
val result = Await.result(future
// Your normal computation
, timeout)
这样,无论什么任务耗时过长,您都可以检测到它并当场收集一些指标。
【讨论】:
等等,这一切都在这台 32 核机器上,我可以说磁盘和内存不是问题。是的,很可能有一组键(比如几百个)构成最多的元组。但我希望 HashPartitioner 平均分配它们。绝对没有 one' 非常频繁的键。无论如何,我会尝试您的 aggregateByKey 建议! 试一试,但你有一个奇怪的问题。它甚至可能是导致无限循环的特定数据条件?我用另一个选项编辑了我的答案。祝你好运! 尝试了未来的包装-不是很有启发性-但感谢您的帮助!!!还尝试了aggregateByKey ...更新了原始问题。 嗯。您至少应该能够获得违规输入的样本并独立尝试?生成对的代码必须陷入某种无限循环? 但是映射完成了——它是reduce和write永远不会完成。我可以看到 1585510308 条记录(101.7GB)从 map 中随机写入和 1573553370(101.6GB)从 reduce 中随机读取。我不明白怎么可能隐藏大量数据。以上是关于spark saveAsTextFile 最后一个分区(几乎?)永远不会完成的主要内容,如果未能解决你的问题,请参考以下文章
spark中saveAsTextFile如何最终生成一个文件
Spark 'saveAsTextFile' 到 S3:无法控制带有 'coalesce' 的文件数量
Spark&Scala:saveAsTextFile()异常