将值插入行类型的 Pyspark 中的 Hive 表
Posted
技术标签:
【中文标题】将值插入行类型的 Pyspark 中的 Hive 表【英文标题】:Insert values to Hive table in Pyspark of Row type 【发布时间】:2018-10-10 13:00:21 【问题描述】:我刚开始使用 Pyspark。我有一个函数可以计算查询的最大值并插入 Row 类型的最大值以及其他两个值日期和产品名称。
def findCount(query, prod_date, prod_name):
count = query.agg("count": "max").collect()[0] (returns Row(max(count)=Decimal('1.0000000000')))
reopen = hc.sql('insert into details values(row_date, row_name, count)')
print(=count)
这是调用函数的代码:
for row in aggs_list:
prod_date= row.date
prod_name = row.product_name
query = prod_load.filter((col("date") == prod_date) & (col("prod_name") == row_name))
findCount(query, prod_date, prod_name)
这是我尝试过的,但不起作用。有没有更有效的方法来做到这一点?
【问题讨论】:
【参考方案1】:您可能应该远离 Row 类型,这通常意味着您已将所有数据收集到驱动程序。如果是这样,则没有理由使用 spark,因为您没有利用并行计算环境。
您或许可以使用 spark sql 完成以下任务:
max_data = spark.sql("SELECT product_name, max(count), product_date FROM table")
就插入数据库而言(我猜你正在使用来自hc
的 Hive,大多数人会每天运行该作业并将结果写入日期分区表,如下所示:
首先注册临时配置单元表
max_data.registerTempTable("md")
然后覆盖分区
spark.sql("INSERT OVERWRITE new_table PARTITION(dt=product_date) SELECT * FROM md")
【讨论】:
以上是关于将值插入行类型的 Pyspark 中的 Hive 表的主要内容,如果未能解决你的问题,请参考以下文章
Hive:通过 Hue 插入表格产生的文件数量与 pyspark 不同