Spark - 如何将约 20TB 的数据从 DataFrame 写入配置单元表或 hdfs?

Posted

技术标签:

【中文标题】Spark - 如何将约 20TB 的数据从 DataFrame 写入配置单元表或 hdfs?【英文标题】:Spark - how to write ~20TB of data from a DataFrame to a hive table or hdfs? 【发布时间】:2018-06-17 04:27:02 【问题描述】:

我正在使用 Spark 处理超过 20TB 的数据量。 我正在尝试使用以下命令将数据写入 Hive 表:

df.registerTempTable('temporary_table')
sqlContext.sql("INSERT OVERWRITE TABLE my_table SELECT * FROM temporary_table")

df 是 Spark DataFrame。不幸的是,它没有我可以划分的任何日期。当我运行上面的代码时,我遇到了错误信息:

py4j.protocol.Py4JJavaError:调用 z:org.apache.spark.sql.execution.EvaluatePython.takeAndServe 时出错。 : org.apache.spark.SparkException: 作业因阶段故障而中止:95561 个任务的序列化结果的总大小 (1024.0 MB) 大于 spark.driver.maxResultSize (1024.0 MB)

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply$mcI$sp(python.scala:126)
at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply(python.scala:124)
at org.apache.spark.sql.execution.EvaluatePython$$anonfun$takeAndServe$1.apply(python.scala:124)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
at org.apache.spark.sql.execution.EvaluatePython$.takeAndServe(python.scala:124)
at org.apache.spark.sql.execution.EvaluatePython.takeAndServe(python.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)

错误消息似乎也取决于数据量。数据稍小,遇到如下错误提示

地图输出状态为 395624469 字节,超过了 spark.akka.frameSize(134217728 字节)。

有什么更实用的方法来实现这一点(如果任务可行的话)?我正在使用 Spark 1.6。

以下是提交 spark 作业时的配置变量: spark-submit --deploy-mode cluster --master yarn --executor-memory 20G --num-executors 500 --driver-memory 64g --driver-cores 8 --files 'my_script.py'

顺便说一句,我天真地想象当写操作发生时,Spark 会将数据从执行程序写入 hdfs。但是错误信息似乎暗示执行者和驱动程序之间存在一些数据传输?

我对 Spark 的了解很浅,所以请原谅我的愚蠢问题!

【问题讨论】:

完整的堆栈跟踪可以更好地了解问题 @cwl 你可能根本没有足够的内存来处理你的任务。请也提供您的内存设置。 谢谢。为上下文添加了更多详细信息。 【参考方案1】:

检查以下配置并根据需要进行修改,默认值为 1 g

由 SparkConf 设置:conf.set("spark.driver.maxResultSize", "10g") 由 spark-defaults.conf 设置:spark.driver.maxResultSize 10g

调用 spark-submit 时设置:--conf spark.driver.maxResultSize=10g

https://spark.apache.org/docs/latest/configuration.html

【讨论】:

这个答案没有解决主要问题。问题是(从原文复制):“顺便说一句,我天真地想象当写操作发生时,Spark 会将数据从执行程序写入 hdfs。但错误消息似乎暗示执行程序之间存在一些数据传输和司机?”

以上是关于Spark - 如何将约 20TB 的数据从 DataFrame 写入配置单元表或 hdfs?的主要内容,如果未能解决你的问题,请参考以下文章

怎样利用Spark Streaming和Hadoop实现近实时的会话连接

使用 Apache Spark 提供实时 Web 服务查询

在 Spark 中高效地连接一个大表(1TB)和另一个小表(250GB)

从 S3 将嵌套文本文件读入 spark 时出现内存错误

如何在 Spark 中对嵌套的 Dataframe 进行平面映射

Spark SQL在100TB上的自适应执行实践(转载)