Spark:连接 2 个大型 DF 时,大小超过 Integer.MAX_VALUE
Posted
技术标签:
【中文标题】Spark:连接 2 个大型 DF 时,大小超过 Integer.MAX_VALUE【英文标题】:Spark : Size exceeds Integer.MAX_VALUE When Joining 2 Large DFs 【发布时间】:2016-11-25 11:26:48 【问题描述】:伙计们,
我在尝试在 spark 中加入 2 个大型数据帧(每个 100GB +)时遇到了这个问题,每行只有一个键标识符。
我在 EMR 上使用 Spark 1.6,这就是我正在做的事情:
val df1 = sqlContext.read.json("hdfs:///df1/")
val df2 = sqlContext.read.json("hdfs:///df2/")
// clean up and filter steps later
df1.registerTempTable("df1")
df2.registerTempTable("df2")
val df3 = sql("select df1.*, df2.col1 from df1 left join df2 on df1.col3 = df2.col4")
df3.write.json("hdfs:///df3/")
这基本上是我正在做的事情的要点,以及最终加入 df1 和 df2 之间的其他清理和过滤步骤。
我看到的错误是:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
配置和参考: 我使用 13 个节点,每个集群 60GB,执行程序和驱动程序内存相应地设置了开销。我尝试过调整的事情:
spark.sql.broadcastTimeout
spark.sql.shuffle.partitions
我也尝试过使用更大的集群,但没有帮助。 This link 表示如果 Shuffle 分区大小超过 2GB,则会引发此错误。但是我尝试将分区数量增加到一个很高的值,仍然没有运气。
我怀疑这可能与延迟加载有关。当我在 DF 上执行 10 次操作时,它们仅在最后一步执行。我尝试在 DF 的各种存储级别上添加.persist()
,但仍然没有成功。我也尝试过删除临时表,清空所有早期的 DF 进行清理。
但是,如果我将代码分解为 2 个部分,则代码可以工作 - 将最终的临时数据(2 个数据帧)写入磁盘,然后退出。重启只加入两个DF。
我早些时候收到此错误:
Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.sql.execution.joins.BroadcastHashOuterJoin.doExecute(BroadcastHashOuterJoin.scala:113)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at org.apache.spark.sql.DataFrame.toJSON(DataFrame.scala:1724)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
但是当我调整spark.sql.broadcastTimeout
时,我开始收到第一个错误。
在这种情况下将不胜感激。如果需要,我可以添加更多信息。
【问题讨论】:
你应该尝试调整的属性是spark.sql.shuffle.partitions
(注意s
atfer 分区)
没错。更正了错字。谢谢
可能是由于数据倾斜。尝试找出您是否有一个比所有其他连接键都大得多的连接键。
@LiMuBei 每一行都是不同的,并且在两个 DF 中都有唯一的 ID。加入也会导致 1 行输出(1 对 1)。
嗯...您是否尝试过在加入之前手动重新分区数据帧?
【参考方案1】:
在 spark 中你不能有大于 2GB 的 shuffle 块。这是因为, Spark 将 shuffle 块存储为 ByteBuffer。分配方式如下:
ByteBuffer.allocate(int capacity)
因为,ByteBuffer 受 Integer.MAX_SIZE (2GB) 的限制,所以 shuffle 块也是如此!!解决方案是通过在 SparkSQL 中使用 spark.sql.shuffle.partitions
或在 rdd 中使用 rdd.partition() or rdd.colease()
来增加分区数量,以使每个分区大小
您提到您尝试增加分区数,但仍然失败。您能否检查分区大小是否> 2GB。只需确保指定的分区数量足以使每个块大小
【讨论】:
以上是关于Spark:连接 2 个大型 DF 时,大小超过 Integer.MAX_VALUE的主要内容,如果未能解决你的问题,请参考以下文章
超过 `spark.driver.maxResultSize` 而没有给驱动程序带来任何数据