如何从 Spark Structured Streaming 刷新 Hive/Impala 表?

Posted

技术标签:

【中文标题】如何从 Spark Structured Streaming 刷新 Hive/Impala 表?【英文标题】:How can I refresh a Hive/Impala table from Spark Structured Streaming? 【发布时间】:2018-05-18 13:59:12 【问题描述】:

目前我的 Spark Structured Streaming 是这样的(仅显示 Sink 部分):

    //Output aggregation query to Parquet in append mode
    aggregationQuery.writeStream
      .format("parquet")
      .trigger(Trigger.ProcessingTime("15 seconds"))
      .partitionBy("date", "hour")
      .option("path", "hdfs://<myip>:8020/user/myuser/spark/proyecto3")
      .option("checkpointLocation", "hdfs://<myip>:8020/user/myuser/spark/checkpointfolder3")
      .outputMode("append")
      .start()

以上代码在path定义的目录下生成.parquet文件。

我在外部定义了一个从该路径读取的 Impala 表,但我需要在每次追加 parquet 文件后更新或刷新该表。

如何做到这一点?

【问题讨论】:

spark.sql("REFRESH TABLE some_name") 我应该在哪里尝试那行代码?在生成每个新的 parquet 文件后它会起作用吗? @messenjah00,你成功了吗?愿意分享吗? 抱歉,我做不到 【参考方案1】:

您需要在文件接收器之后更新表的分区。

    import spark.sql
    val query1 = "ALTER TABLE proyecto3 ADD IF NOT EXISTS PARTITION (date='20200803') LOCATION '/your/location/proyecto3/date=20200803'"
    sql(s"$query1")

    import spark.sql
    val query2 = "ALTER TABLE proyecto3 ADD IF NOT EXISTS PARTITION (hour='104700') LOCATION '/your/location/proyecto3/date=20200803/hour=104700'"
    sql(s"$query2")

【讨论】:

以上是关于如何从 Spark Structured Streaming 刷新 Hive/Impala 表?的主要内容,如果未能解决你的问题,请参考以下文章

打通实时流处理log4j-flume-kafka-structured-streaming

Spark Structured Streaming - 此查询不支持从检查点位置恢复

Spark Structured Streaming - 新批次上的空字典

如何使用Spark Structured Streaming连续监视目录

如何使用 Python 在 Spark Structured Streaming 中查看特定指标

如何使用 Spark Structured Streaming 打印 Json 编码的消息