将值插入行类型的 Pyspark 中的 Hive 表

Posted

技术标签:

【中文标题】将值插入行类型的 Pyspark 中的 Hive 表【英文标题】:Insert values to Hive table in Pyspark of Row type 【发布时间】:2018-10-10 13:00:21 【问题描述】:

我刚开始使用 Pyspark。我有一个函数可以计算查询的最大值并插入 Row 类型的最大值以及其他两个值日期和产品名称。

def findCount(query, prod_date, prod_name):
        count = query.agg("count": "max").collect()[0] (returns Row(max(count)=Decimal('1.0000000000')))
        reopen = hc.sql('insert into details values(row_date, row_name, count)')
        print(=count)

这是调用函数的代码:

for row in aggs_list:
        prod_date= row.date
        prod_name = row.product_name
        query = prod_load.filter((col("date") == prod_date) & (col("prod_name") == row_name))
        findCount(query, prod_date, prod_name)

这是我尝试过的,但不起作用。有没有更有效的方法来做到这一点?

【问题讨论】:

【参考方案1】:

您可能应该远离 Row 类型,这通常意味着您已将所有数据收集到驱动程序。如果是这样,则没有理由使用 spark,因为您没有利用并行计算环境。

您或许可以使用 spark sql 完成以下任务:

max_data = spark.sql("SELECT product_name, max(count), product_date FROM table")

就插入数据库而言(我猜你正在使用来自hc 的 Hive,大多数人会每天运行该作业并将结果写入日期分区表,如下所示:

首先注册临时配置单元表 max_data.registerTempTable("md")

然后覆盖分区 spark.sql("INSERT OVERWRITE new_table PARTITION(dt=product_date) SELECT * FROM md")

【讨论】:

以上是关于将值插入行类型的 Pyspark 中的 Hive 表的主要内容,如果未能解决你的问题,请参考以下文章

Hive:通过 Hue 插入表格产生的文件数量与 pyspark 不同

将行列表保存到 pyspark 中的 Hive 表

在 Hive-S3 表的情况下,pyspark 命令行中的错误

在 Pyspark/Hive 中处理不断变化的数据类型

如何使用 pyspark 并行插入 Hive

如何在 Spark 2.4.0 中使用 PySpark API 将表插入 Hive