将 Spark DataFrame 写入 Hive 表时的内存分配问题

Posted

技术标签:

【中文标题】将 Spark DataFrame 写入 Hive 表时的内存分配问题【英文标题】:Memory allocation issue in writing Spark DataFrame to Hive table 【发布时间】:2017-05-17 23:31:28 【问题描述】:

我想一个Spark数据帧保存到一个蜂巢表(平面)与.saveAsTable()在pySpark,但要运行内存问题象下面这样: P>

org.apache.hadoop.hive.ql.metadata.HiveException: parquet.hadoop.MemoryManager$1:
New Memory allocation 1034931 bytes is smaller than the minimum allocation size of 1048576 bytes.

在第一个数字(1034931)通常不断变化在不同的运行。我认识的第二个数字(1048576)是1024^2,但我根本不知道那是什么在这里的意思。 P>

我已经使用了其他一些我的项目完全相同的技术(大得多DataFrames),并没有问题任职。在这里,我基本上是复制粘贴的过程和配置,但运行的结构中内存的问题!它必须是一些小事我失踪。 P>

在数据帧火花(让我们称之为sdf)具有结构(10〜列和300K〜行,但也可以是更多的,如果这个运行正确地):

+----------+----------+----------+---------------+---------------+
| col_a_str| col_b_num| col_c_num|partition_d_str|partition_e_str|
+----------+----------+----------+---------------+---------------+
|val_a1_str|val_b1_num|val_c1_num|     val_d1_str|     val_e1_str|
|val_a2_str|val_b2_num|val_c2_num|     val_d2_str|     val_e2_str|
|       ...|       ...|       ...|            ...|            ...|
+----------+----------+----------+---------------+---------------+

蜂房表是这样创建的:

sqlContext.sql("""
                    CREATE TABLE IF NOT EXISTS my_hive_table (
                        col_a_str string,
                        col_b_num double,
                        col_c_num double
                    ) 
                    PARTITIONED BY (partition_d_str string,
                                    partition_e_str string)
                    STORED AS PARQUETFILE
               """)

在插入数据到该表中的尝试是使用下面的命令:

sdf.write \
   .mode('append') \
   .partitionBy('partition_d_str', 'partition_e_str') \
   .saveAsTable('my_hive_table')

在火花/蜂巢结构是这样的:

spark_conf = pyspark.SparkConf()
spark_conf.setAppName('my_project')

spark_conf.set('spark.executor.memory', '16g')
spark_conf.set('spark.python.worker.memory', '8g')
spark_conf.set('spark.yarn.executor.memoryOverhead', '15000')
spark_conf.set('spark.dynamicAllocation.maxExecutors', '64')
spark_conf.set('spark.executor.cores', '4')

sc = pyspark.SparkContext(conf=spark_conf)

sqlContext = pyspark.sql.HiveContext(sc)
sqlContext.setConf('hive.exec.dynamic.partition', 'true')
sqlContext.setConf('hive.exec.max.dynamic.partitions', '5000')
sqlContext.setConf('hive.exec.dynamic.partition.mode', 'nonstrict')
sqlContext.setConf('hive.exec.compress.output', 'true')

我试图改变.partitionBy('partition_d_str', 'partition_e_str').partitionBy(['partition_d_str', 'partition_e_str']),增加内存,分割数据帧,以更小的块,重新创建表和数据帧,但似乎没有任何工作。我不能在网上找到任何解决方案,无论是。什么是导致内存错误(我不完全明白的地方它无论从推出),我怎么可以改变我的代码写入到蜂巢表?谢谢。

【问题讨论】:

最小页大小即最小读/拼花的写单元由属性987654334 @缺省定义@到1048576及其可能您正在试图写入数据下该阈值下降。这就是为什么抛出错误可能是什么?这只是我的猜测...... check this out SPAN> 感谢您的链接。通过与parquet.page.size玩,parquet.block.size配置,以及由刚乘我的数据的大小,但没有运气想你的建议。相同的错误:(跨度> 【参考方案1】:

事实证明,我正在使用一个可空字段进行分区,该字段将.saveAsTable() 关闭。当我将 RDD 转换为 Spark DataFrame 时,我提供的架构是这样生成的:

from pyspark.sql.types import *

# Define schema
my_schema = StructType(
                    [StructField('col_a_str', StringType(), False),
                     StructField('col_b_num', DoubleType(), True),
                     StructField('col_c_num', DoubleType(), True),
                     StructField('partition_d_str', StringType(), False),
                     StructField('partition_e_str', StringType(), True)])

# Convert RDD to Spark DataFrame
sdf = sqlContext.createDataFrame(my_rdd, schema=my_schema)

由于partition_e_str 被声明为nullable=TrueStructField 的第三个参数),它在写入 Hive 表时出现问题,因为它被用作分区字段之一。我把它改成:

# Define schema
my_schema = StructType(
                    [StructField('col_a_str', StringType(), False),
                     StructField('col_b_num', DoubleType(), True),
                     StructField('col_c_num', DoubleType(), True),
                     StructField('partition_d_str', StringType(), False),
                     StructField('partition_e_str', StringType(), False)])

一切又好了!

课程:确保您的分区字段不可为空!

【讨论】:

以上是关于将 Spark DataFrame 写入 Hive 表时的内存分配问题的主要内容,如果未能解决你的问题,请参考以下文章

将 Spark Dataframe 写入 HDP2.6 中的 Hive 可访问表

spark利用sparkSQL将数据写入hive两种通用方式实现及比较

spark利用sparkSQL将数据写入hive两种通用方式实现及比较

将 spark DataFrame 写入表

Spark - 如何将约 20TB 的数据从 DataFrame 写入配置单元表或 hdfs?

如何将数据写入 Hive 表?