如何使用 pyspark 并行插入 Hive

Posted

技术标签:

【中文标题】如何使用 pyspark 并行插入 Hive【英文标题】:How to parallel insert into Hive using pyspark 【发布时间】:2020-05-03 11:08:41 【问题描述】:

我有一份工作分配给工人,每个工人输出一个需要写入配置单元的数据帧,我无法弄清楚如何在不初始化另一个 sparkcontext 的情况下从工人访问配置单元,所以我尝试收集他们的输出并插入像下面这样一次性完成

result = df.rdd.map(lambda rdd: predict_item_by_model(rdd, columns)).collect()
df_list = sc.parallelize(result).map(lambda df: hiveContext.createDataFrame(df)).collect() #throws error
mergedDF = reduce(DataFrame.union, df_list) 
mergedDF.write.mode('overwrite').partitionBy("item_id").saveAsTable("items")

但是现在它抛出了这个错误

_pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

是否可以直接从工人那里访问蜂巢?如果没有,我怎样才能收集数据并插入一次?

【问题讨论】:

【参考方案1】:
.map(lambda df: hiveContext.createDataFrame(df))

这种方法在 Spark 中根本不可能。根本不是它的工作原理。

任何 Spark 驱动程序应用程序的第一步是创建一个包含 Hive 上下文的 SparkContext(如果需要)。仅驱动方面。正如消息所述。

看看这里https://www.waitingforcode.com/apache-spark/serialization-issues-part-1/read 让你自己解决这个序列化问题。

【讨论】:

以上是关于如何使用 pyspark 并行插入 Hive的主要内容,如果未能解决你的问题,请参考以下文章

如何使 pyspark 作业在多个节点上正确并行化并避免内存问题?

如何获取 HIVE/PySpark 表中每一列的唯一值?

如何在 pyspark 中使用“不存在”的 SQL 条件?

Openacc:如何使插入排序更加并行[关闭]

如何使用 PySpark 检查 Hive 表是不是存在

通过 JDBC 进行并行化 - Pyspark - 并行化如何使用 JDBC 工作?