结构化流 - Foreach接收器

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了结构化流 - Foreach接收器相关的知识,希望对你有一定的参考价值。

我基本上是从Kafka源代码中读取,并将每条消息转发到我的foreach处理器(感谢Jacek的简单示例页面)。

如果这实际上有效,我将在process方法中实际执行一些业务逻辑,但是,这不起作用。我相信println因为在执行程序上运行而无法工作,并且无法将这些日志恢复到驱动程序。但是,这个insert into临时表至少应该工作,并告诉我消息实际上被消耗并处理到接收器。

我在这里错过了什么?

真的在寻找第二组眼睛来检查我的努力:

 val stream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker")) 
      .option("subscribe", src_topic) 
      .load()

    val rec = stream.selectExpr("CAST(value AS STRING) as txnJson").as[(String)]

    val df = stream.selectExpr("cast (value as string) as json")

    val writer = new ForeachWriter[Row] {
      val scon = new SConConnection
      override def open(partitionId: Long, version: Long) = {
        true
      }
      override def process(value: Row) = {
        println("++++++++++++++++++++++++++++++++++++" + value.get(0))
        scon.executeUpdate("insert into rs_kafka10(miscCol) values("+value.get(0)+")")
      }
      override def close(errorOrNull: Throwable) = {
        scon.closeConnection
      }
    }


    val yy = df.writeStream
      .queryName("ForEachQuery")
      .foreach(writer)
      .outputMode("append")
      .start()

    yy.awaitTermination()
答案

感谢Harald和其他人的评论,我发现了一些事情,这使我实现了正常的处理行为 -

  1. 用本地模式测试代码,纱线不是调试的最大帮助
  2. 由于某种原因,foreach接收器的处理方法不允许调用其他方法。当我将我的业务逻辑直接放在那里时,它就可以了。

希望它能帮助别人。

以上是关于结构化流 - Foreach接收器的主要内容,如果未能解决你的问题,请参考以下文章

Spark结构化流多个WriteStream到同一个接收器

结构化流 - 消费每条消息

带有自定义接收器的 Spark 结构化流中的输入行数

Pyspark 结构化流处理

使用 SSIS Foreach Loop 容器 – Foreach Item Enumerator

将 Spark SQL 批处理源转换为结构化流接收器