Spark Streaming - 从Kafka读取json并将json写入其他Kafka主题

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Streaming - 从Kafka读取json并将json写入其他Kafka主题相关的知识,希望对你有一定的参考价值。

我正在尝试准备Spark流媒体应用程序(Spark 2.1,Kafka 0.10)

我需要从Kafka主题“输入”中读取数据,找到正确的数据并将结果写入主题“输出”

我可以从Kafka基于KafkaUtils.createDirectStream方法读取数据。

我将RDD转换为json并准备过滤器:

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

val elementDstream = messages.map(v => v.value).foreachRDD { rdd =>

  val PeopleDf=spark.read.schema(schema1).json(rdd)
  import spark.implicits._
  PeopleDf.show()
  val PeopleDfFilter = PeopleDf.filter(($"value1".rlike("1"))||($"value2" === 2))
  PeopleDfFilter.show()
}

我可以从Kafka加载数据并“按原样”写入Kafka使用KafkaProducer:

    messages.foreachRDD( rdd => {
      rdd.foreachPartition( partition => {
        val kafkaTopic = "output"
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        val producer = new KafkaProducer[String, String](props)
        partition.foreach{ record: ConsumerRecord[String, String] => {
        System.out.print("########################" + record.value())
        val messageResult = new ProducerRecord[String, String](kafkaTopic, record.value())
        producer.send(messageResult)
        }}
        producer.close()
      })

    })

但我无法整合这两个动作>在json中找到适当的值并将结果写入Kafka:用JSON格式编写PeopleDfFilter来“输出”Kafka主题。

我在Kafka中有很多输入消息,这就是我想使用foreachPartition创建Kafka生成器的原因。

非常感谢您的任何建议。

答案

这个过程非常简单,为什么不一直使用结构化流媒体呢?

import org.apache.spark.sql.functions.from_json

spark
  // Read the data
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", inservers) 
  .option("subscribe", intopic)
  .load()
  // Transform / filter
  .select(from_json($"value".cast("string"), schema).alias("value"))
  .filter(...)  // Add the condition
  .select(to_json($"value").alias("value")
  // Write back
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", outservers)
  .option("subscribe", outtopic)
  .start()
另一答案

尝试使用Structured Streaming。即使您使用Spark 2.1,您也可以实现自己的Kafka ForeachWriter,如下所示:

Kafka sink

import java.util.Properties
import kafkashaded.org.apache.kafka.clients.producer._
import org.apache.spark.sql.ForeachWriter


 class  KafkaSink(topic:String, servers:String) extends ForeachWriter[(String, String)] {
      val kafkaProperties = new Properties()
      kafkaProperties.put("bootstrap.servers", servers)
      kafkaProperties.put("key.serializer",
        classOf[org.apache.kafka.common.serialization.StringSerializer].toString)
      kafkaProperties.put("value.serializer",
        classOf[org.apache.kafka.common.serialization.StringSerializer].toString)
      val results = new scala.collection.mutable.HashMap[String, String]
      var producer: KafkaProducer[String, String] = _

      def open(partitionId: Long,version: Long): Boolean = {
        producer = new KafkaProducer(kafkaProperties)
        true
      }

      def process(value: (String, String)): Unit = {
          producer.send(new ProducerRecord(topic, value._1 + ":" + value._2))
      }

      def close(errorOrNull: Throwable): Unit = {
        producer.close()
      }
   }

用法:

val topic = "<topic2>"
val brokers = "<server:ip>"

val writer = new KafkaSink(topic, brokers)

val query =
  streamingSelectDF
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(ProcessingTime("25 seconds"))
    .start()

以上是关于Spark Streaming - 从Kafka读取json并将json写入其他Kafka主题的主要内容,如果未能解决你的问题,请参考以下文章

如何将数据从Kafka传递到Spark Streaming?

如何将数据从 Kafka 传递到 Spark Streaming?

Spark Streaming - 从Kafka读取json并将json写入其他Kafka主题

spark streaming从指定offset处消费Kafka数据

Spark Streaming通过JDBC操作数据库

Spark Streaming + Kafka整合(Kafka broker版本0.8.2.1+)