如何从 Spark Structured Streaming 刷新 Hive/Impala 表?
Posted
技术标签:
【中文标题】如何从 Spark Structured Streaming 刷新 Hive/Impala 表?【英文标题】:How can I refresh a Hive/Impala table from Spark Structured Streaming? 【发布时间】:2018-05-18 13:59:12 【问题描述】:目前我的 Spark Structured Streaming 是这样的(仅显示 Sink 部分):
//Output aggregation query to Parquet in append mode
aggregationQuery.writeStream
.format("parquet")
.trigger(Trigger.ProcessingTime("15 seconds"))
.partitionBy("date", "hour")
.option("path", "hdfs://<myip>:8020/user/myuser/spark/proyecto3")
.option("checkpointLocation", "hdfs://<myip>:8020/user/myuser/spark/checkpointfolder3")
.outputMode("append")
.start()
以上代码在path定义的目录下生成.parquet
文件。
我在外部定义了一个从该路径读取的 Impala 表,但我需要在每次追加 parquet
文件后更新或刷新该表。
如何做到这一点?
【问题讨论】:
spark.sql("REFRESH TABLE some_name")
我应该在哪里尝试那行代码?在生成每个新的 parquet 文件后它会起作用吗?
@messenjah00,你成功了吗?愿意分享吗?
抱歉,我做不到
【参考方案1】:
您需要在文件接收器之后更新表的分区。
import spark.sql
val query1 = "ALTER TABLE proyecto3 ADD IF NOT EXISTS PARTITION (date='20200803') LOCATION '/your/location/proyecto3/date=20200803'"
sql(s"$query1")
import spark.sql
val query2 = "ALTER TABLE proyecto3 ADD IF NOT EXISTS PARTITION (hour='104700') LOCATION '/your/location/proyecto3/date=20200803/hour=104700'"
sql(s"$query2")
【讨论】:
以上是关于如何从 Spark Structured Streaming 刷新 Hive/Impala 表?的主要内容,如果未能解决你的问题,请参考以下文章
打通实时流处理log4j-flume-kafka-structured-streaming
Spark Structured Streaming - 此查询不支持从检查点位置恢复
Spark Structured Streaming - 新批次上的空字典
如何使用Spark Structured Streaming连续监视目录