How to save each partition of a Dataframe/Dataset in parallel with partitionBy or InsertInto Hive

【Posted】2016-11-03 13:51:36 【Question】:

I am currently using Spark 2.0.1 and I am trying to save my Dataset either into a partitioned Hive table with insertInto(), or into S3 storage with partitionBy("col"), expecting the work to run concurrently (in parallel). With both methods, however, each partition of my Dataset is written sequentially, one after the other, which is very, very slow. I already know that I have to use either insertInto() or partitionBy(), one at a time. I assume that a Spark 2.0.1 Dataframe is a Resilient Distributed Dataset underneath. My current code:

df.write.mode(SaveMode.Append).partitionBy("col").save("s3://bucket/diroutput")

or

df.write.mode(SaveMode.Append).insertInto("TableHivealreadypartitioned")

So I tried something like this with df.foreachPartition:

df.foreachPartition(datasetpartition => datasetpartition.foreach(row => row.sometransformation))
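
Spelled out, the pattern I tried looks roughly like the sketch below; writeRow is only a hypothetical stand-in for the real per-row logic (sometransformation above):

import org.apache.spark.sql.Row

// Hypothetical stand-in for the real per-row logic ("sometransformation").
def writeRow(row: Row): Unit = {
  // e.g. send the row to an external sink
}

df.foreachPartition { partition =>
  // One task handles one Spark partition; the rows inside that
  // partition are still processed sequentially by that task.
  partition.foreach(writeRow)
}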

Below is an extract of the logs. The first example is an insertInto("tablehivealreadypartitioned") into Hive; we can see that all the Spark "partitions" are written one by one. The second example is a partitionBy().save() writing directly to S3; again, all the Spark "partitions" are written one by one. The Dataframe we process has only one "partition" and its size is about 200 MB uncompressed (in memory). With the local[4] option, the job takes 120 s to 170 s to save the data.

[INFO] 2016-11-03 00:10:33,255 org.apache.spark.SparkContext logInfo - Created broadcast 2330 from broadcast at TorExitLookup.scala:43
[INFO] 2016-11-03 00:10:35,302 org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer logInfo - Sorting complete. Writing out partition files one at a time.
[INFO] 2016-11-03 00:10:35,363 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/db/.hive-staging_hive_2016-11-03_00-10-29_426_1749488585639143697-1/-ext-10000/tsbucket=2016-11-02 09%3A00%3A00/part-00001
[INFO] 2016-11-03 00:10:35,380 org.apache.spark.mapred.SparkHadoopMapRedUtil logInfo - No need to commit output of task because needsTaskCommit=false: attempt_201611030010_0948_m_000001_0
[INFO] 2016-11-03 00:10:35,380 org.apache.spark.executor.Executor logInfo - Finished task 1.0 in stage 948.0 (TID 1385). 2652 bytes result sent to driver
[INFO] 2016-11-03 00:10:35,381 org.apache.spark.scheduler.TaskSetManager logInfo - Finished task 1.0 in stage 948.0 (TID 1385) in 5718 ms on localhost (1/2)
[INFO] 2016-11-03 00:11:23,033 org.apache.spark.storage.BlockManagerInfo logInfo - Removed broadcast_2330_piece0 on 10.0.193.149:34016 in memory (size: 6.9 KB, free: 414.4 MB)
[INFO] 2016-11-03 00:11:58,194 org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer logInfo - Sorting complete. Writing out partition files one at a time.
[INFO] 2016-11-03 00:12:00,210 org.apache.spark.storage.BlockManagerInfo logInfo - Removed broadcast_2329_piece0 on 10.0.193.149:34016 in memory (size: 6.9 KB, free: 414.4 MB)
[INFO] 2016-11-03 00:12:05,295 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/db/.hive-staging_hive_2016-11-03_00-10-29_426_1749488585639143697-1/-ext-10000/tsbucket=2016-11-02 09%3A00%3A00/part-00000
[INFO] 2016-11-03 00:12:05,831 org.apache.spark.mapred.SparkHadoopMapRedUtil logInfo - No need to commit output of task because needsTaskCommit=false: attempt_201611030010_0948_m_000000_0
[INFO] 2016-11-03 00:12:05,835 org.apache.spark.executor.Executor logInfo - Finished task 0.0 in stage 948.0 (TID 1384). 2652 bytes result sent to driver
[INFO] 2016-11-03 00:12:05,835 org.apache.spark.scheduler.TaskSetManager logInfo - Finished task 0.0 in stage 948.0 (TID 1384) in 96173 ms on localhost (2/2)
[INFO] 2016-11-03 00:12:05,835 org.apache.spark.scheduler.DAGScheduler logInfo - ResultStage 948 (insertInto at ImportHive.scala:24) finished in 96,173 s
[INFO] 2016-11-03 00:12:05,835 org.apache.spark.scheduler.TaskSchedulerImpl logInfo - Removed TaskSet 948.0, whose tasks have all completed, from pool
[INFO] 2016-11-03 00:12:05,836 org.apache.spark.scheduler.DAGScheduler logInfo - Job 948 finished: insertInto at ImportHive.scala:24, took 96,188035 s


[INFO] 2016-11-03 00:12:17,171 org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer logInfo - Sorting complete. Writing out partition files one at a time.
[INFO] 2016-11-03 00:12:17,296 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/rescue/tsbucket=2016-11-02 09%3A00%3A00/part-r-00001-f433a41e-1b59-49af-b232-cf701e0c6df9.zlib.orc
[INFO] 2016-11-03 00:12:17,388 org.apache.spark.mapred.SparkHadoopMapRedUtil logInfo - No need to commit output of task because needsTaskCommit=false: attempt_201611030012_0949_m_000001_0
[INFO] 2016-11-03 00:12:17,388 org.apache.spark.executor.Executor logInfo - Finished task 1.0 in stage 949.0 (TID 1387). 2652 bytes result sent to driver
[INFO] 2016-11-03 00:12:17,389 org.apache.spark.scheduler.TaskSetManager logInfo - Finished task 1.0 in stage 949.0 (TID 1387) in 6892 ms on localhost (1/2)
[INFO] 2016-11-03 00:12:57,467 org.apache.spark.storage.BlockManagerInfo logInfo - Removed broadcast_2333_piece0 on 10.0.193.149:34016 in memory (size: 6.9 KB, free: 414.4 MB)
[INFO] 2016-11-03 00:13:36,195 org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer logInfo - Sorting complete. Writing out partition files one at a time.
[INFO] 2016-11-03 00:13:43,689 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/rescue/tsbucket=2016-11-02 09%3A00%3A00/part-r-00000-f433a41e-1b59-49af-b232-cf701e0c6df9.zlib.orc
[INFO] 2016-11-03 00:13:44,258 org.apache.spark.mapred.SparkHadoopMapRedUtil logInfo - No need to commit output of task because needsTaskCommit=false: attempt_201611030012_0949_m_000000_0
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.executor.Executor logInfo - Finished task 0.0 in stage 949.0 (TID 1386). 2652 bytes result sent to driver
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.scheduler.TaskSetManager logInfo - Finished task 0.0 in stage 949.0 (TID 1386) in 93762 ms on localhost (2/2)
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.scheduler.DAGScheduler logInfo - ResultStage 949 (save at ImportHive.scala:30) finished in 93,762 s
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.scheduler.TaskSchedulerImpl logInfo - Removed TaskSet 949.0, whose tasks have all completed, from pool
[INFO] 2016-11-03 00:13:44,259 org.apache.spark.scheduler.DAGScheduler logInfo - Job 949 finished: save at ImportHive.scala:30, took 93,772483 s
[INFO] 2016-11-03 00:13:44,260 org.apache.hadoop.mapreduce.lib.output.DirectFileOutputCommitter cleanupJob - Nothing to clean up since no temporary files were written.
[INFO] 2016-11-03 00:13:44,260 com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream close - close closed:false s3://BUCKETS3/rescue/_SUCCESS
[INFO] 2016-11-03 00:13:44,275 org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer logInfo - Job job_201611030012_0000 committed.

Unfortunately, I still have not found a way to write/save each Spark partition of the Dataset in parallel.

Has anyone already done this?

Can you tell me how to proceed?

Is this a wrong direction to take? Thanks for your help.


【Answer 1】:

"The Dataframe we process has only one "partition" and its size is about 200 MB uncompressed (in memory)"

That is your problem: Spark distributes the work across executors based on the partitions.

To get parallel work, your df needs to have more than one partition. You can use:

df.repartition(number)

Also make sure you are setting:

hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version","2")

when writing to S3.
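
Putting the two together, a minimal sketch might look like this (the partition count and source table are placeholders; the column name and output path are the ones from the question's example):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

// v2 of the file output committer avoids the slow rename-based commit step on S3.
spark.sparkContext.hadoopConfiguration
  .set("mapreduce.fileoutputcommitter.algorithm.version", "2")

val df = spark.table("source_table")   // placeholder source

df.repartition(8)                      // placeholder count: 8 partitions => up to 8 parallel write tasks
  .write
  .mode(SaveMode.Append)
  .partitionBy("col")
  .save("s3://bucket/diroutput")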

