如何在 Spark 2.4.0 中使用 PySpark API 将表插入 Hive

Posted

技术标签:

【中文标题】如何在 Spark 2.4.0 中使用 PySpark API 将表插入 Hive【英文标题】:How to insert a table into Hive with PySpark API In Spark 2.4.0 【发布时间】:2020-08-10 09:20:32 【问题描述】:

我需要插入到 Hive 的表中。仅供参考,此表在 Hive 中可用。这是我的代码,

from pyspark.sql import SparkSession as sc, HiveContext as HC
spark = sc.builder.appName('eap').enableHiveSupport().getOrCreate()
sqlContext = HC(spark)
sqlContext.sql("INSERT INTO mydb.my_job_status_table 
SELECT st.tablename, fs.finalhivetable, ss.lastrunid, fs.status, b.id, b.rungroup, ss.starttime, fs.endtime 
FROM batches b 
inner join sourcetables st on b.rungroup = st.rungroup 
inner join stagingstatus ss on b.id = ss.batchid and st.id = ss.tableid 
inner join finalstatus fs on b.id = fs.batchid and st.id = fs.tableid 
WHERE b.rungroup like 'sgb_%'")

在 Spark 中运行代码后,我收到了一个错误,

raise ParseException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.ParseException: u"\nmismatched input '01' expecting <EOF>(line 1, pos 195)\n\n== SQL ==\nINSERT INTO mydb.my_job_status_table...

我做错了什么? SqlContext 和 Spark.sql 有什么区别?

【问题讨论】:

【参考方案1】:

您的问题不是 pyspark 特有的。

不要使用 insert into spark sql。

首先,使用 SELECT 来制作你的数据集:

  dataset = sqlContext.sql(" SELECT st.tablename, fs.finalhivetable, ss.lastrunid, fs.status, b.id, b.rungroup, ss.starttime, fs.endtime 
    FROM batches b 
    inner join sourcetables st on b.rungroup = st.rungroup 
    inner join stagingstatus ss on b.id = ss.batchid and st.id = ss.tableid 
    inner join finalstatus fs on b.id = fs.batchid and st.id = fs.tableid 
    WHERE b.rungroup like 'sgb_%'")

然后使用 insertInto

dataset.insertInto("mydb.my_job_status_table")

pyspark 文档:https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe#pyspark.sql.DataFrameWriter.insertInto

Javadoc:https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/sql/DataFrameWriter.html#insertInto-java.lang.String-

【讨论】:

感谢您的回复。我试过。但我有这个错误,'myuserid' 不是 inode=/data/folder/something/ 的所有者。我想你的方法是正确的。我的用户没有访问权限。 是的,您应该有权访问它。你可以使用 hadoop fs -chown 和 hadoop fs -chmod【参考方案2】:

试试这个

spark = sc.builder.appName('eap').enableHiveSupport().getOrCreate()

spark.sql("INSERT INTO mydb.my_job_status_table " + 
"SELECT st.tablename, fs.finalhivetable, ss.lastrunid, fs.status, b.id, b.rungroup, ss.starttime, fs.endtime " + 
"FROM batches b " + 
"inner join sourcetables st on b.rungroup = st.rungroup " +
"inner join stagingstatus ss on b.id = ss.batchid and st.id = ss.tableid " + 
"inner join finalstatus fs on b.id = fs.batchid and st.id = fs.tableid " + 
"WHERE b.rungroup like 'sgb_%'")

【讨论】:

以上是关于如何在 Spark 2.4.0 中使用 PySpark API 将表插入 Hive的主要内容,如果未能解决你的问题,请参考以下文章

使用 Jupyter Notebook 为 PySpark 内核设置 spark.app.name

YMatrix + PLPython替代Spark实现车联网算法

我可以将 Pyspark RDD 用作 Pandas DataFrame 吗? Pyspark/spark 在数据分析中对 Pandas 的限制?

Spark 2.4.0 Avro Java - 无法解析方法from_avro

CDH-6.3.2内置spark-2.4.0的BUG

Spark 2.4 standalone 部署