如何使用 pyspark 并行插入 Hive
Posted
技术标签:
【中文标题】如何使用 pyspark 并行插入 Hive【英文标题】:How to parallel insert into Hive using pyspark 【发布时间】:2020-05-03 11:08:41 【问题描述】:我有一份工作分配给工人,每个工人输出一个需要写入配置单元的数据帧,我无法弄清楚如何在不初始化另一个 sparkcontext 的情况下从工人访问配置单元,所以我尝试收集他们的输出并插入像下面这样一次性完成
result = df.rdd.map(lambda rdd: predict_item_by_model(rdd, columns)).collect()
df_list = sc.parallelize(result).map(lambda df: hiveContext.createDataFrame(df)).collect() #throws error
mergedDF = reduce(DataFrame.union, df_list)
mergedDF.write.mode('overwrite').partitionBy("item_id").saveAsTable("items")
但是现在它抛出了这个错误
_pickle.PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
是否可以直接从工人那里访问蜂巢?如果没有,我怎样才能收集数据并插入一次?
【问题讨论】:
【参考方案1】:.map(lambda df: hiveContext.createDataFrame(df))
这种方法在 Spark 中根本不可能。根本不是它的工作原理。
任何 Spark 驱动程序应用程序的第一步是创建一个包含 Hive 上下文的 SparkContext(如果需要)。仅驱动方面。正如消息所述。
看看这里https://www.waitingforcode.com/apache-spark/serialization-issues-part-1/read 让你自己解决这个序列化问题。
【讨论】:
以上是关于如何使用 pyspark 并行插入 Hive的主要内容,如果未能解决你的问题,请参考以下文章