结构化流 - 消费每条消息

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了结构化流 - 消费每条消息相关的知识,希望对你有一定的参考价值。

什么是“推荐”的方式来处理每个消息,因为它来自结构化流媒体管道(我在Spark 2.1.1上,源是Kafka 0.10.2.1)?

到目前为止,我正在查看dataframe.mapPartitions(因为我需要连接到HBase,其客户端连接类不可serizalable,因此mapPartitions)。

想法?

答案

你应该能够使用foreach输出接收器:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinkshttps://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

即使客户端不可序列化,您也不必在ForeachWriter构造函数中打开它。只需将其保留为None / null,并在open方法中初始化,该方法在序列化后调用,但每个任务只调用一次。

在伪代码中:

class HBaseForeachWriter extends ForeachWriter[MyType] {
  var client: Option[HBaseClient] = None
  def open(partitionId: Long, version: Long): Boolean = {
    client = Some(... open a client ...)
  }
  def process(record: MyType) = {
    client match {
      case None => throw Exception("shouldn't happen")
      case Some(cl) => {
        ... use cl to write record ...
      }
    }
  }
  def close(errorOrNull: Throwable): Unit = {
    client.foreach(cl => cl.close())
  }
}

以上是关于结构化流 - 消费每条消息的主要内容,如果未能解决你的问题,请参考以下文章

结构化流 - Foreach接收器

Kafka - 如何在使用高级消费者的每条消息后提交偏移量?

嵌入式RTOS生产者和消费者有多种类型的消息[关闭]

每条消息在分区中的位置信息由一个叫offset的数据来表征。

每条消息在分区中的位置信息由一个叫offset的数据来表征第二步

Kafka 实现动态goupId 实现广播消息