从 Kafka 主题读取数据处理数据并使用 scala 和 spark 写回 Kafka 主题

Posted

技术标签:

【中文标题】从 Kafka 主题读取数据处理数据并使用 scala 和 spark 写回 Kafka 主题【英文标题】:Read from Kafka topic process the data and write back to Kafka topic using scala and spark 【发布时间】:2021-06-11 16:29:43 【问题描述】:

嗨,我正在阅读一个 kafka 主题,我想处理从 kafka 接收到的数据,例如标记化、过滤掉不必要的数据、删除停用词,最后我想写回另一个 Kafka 主题

// read from kafka
val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
      .load()

val df = readStream.selectExpr("CAST(value AS STRING)" )
df.show(false)
val df_json = df.select(from_json(col("value"), mySchema.defineSchema()).alias("parsed_value"))
val df_text = df_json.withColumn("text", col("parsed_value.payload.Text"))

// perform some data processing actions such as tokenization etc and return cleanedDataframe as the final result

// write back to kafka
val writeStream = cleanedDataframe
      .writeStream
      .outputMode("append")
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("topic", "writing.val")
      .start()
    writeStream.awaitTermination()

然后我收到以下错误

线程“主”org.apache.spark.sql.AnalysisException 中的异常: 必须使用流式源执行查询 writeStream.start();;

然后我编辑我的代码如下从kafka读取并写入控制台

// read from kafka
val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
      .load()

// write to console
val df = readStream.selectExpr("CAST(value AS STRING)" )
val query = df.writeStream
      .outputMode("append")
      .format("console")
      .start().awaitTermination();

// then perform the data processing part as mentioned in the first half

使用第二种方法,控制台中不断显示数据,但从未运行过数据处理部分。我能否知道如何从 kafka 主题中读取数据,然后对接收到的数据执行一些操作(标记化、删除停用词)并最终写回新的 kafka 主题?

编辑

堆栈跟踪在错误期间指向上述代码中的 df.show(false)

【问题讨论】:

【参考方案1】:

在您当前的实现中存在两个常见问题:

    在流​​式上下文中应用 show awaitTermination 之后的代码不会被执行

到 1。

show 方法是对数据帧的操作(与转换相反)。当您处理流式数据帧时,这将导致错误,因为流式查询需要以 start 开始(正如 Excpetion 文本告诉您的那样)。

到 2。

awaitTermination方法是阻塞方法,意味着后续代码不会在每个微批处理中执行。

整体解决方案

如果您想读取和写入 Kafka 并在两者之间想通过在控制台中显示数据来了解正在处理的数据,您可以执行以下操作:

// read from kafka
val readStream = existingSparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("subscribe", "my.raw") // Always read from offset 0, for dev/testing purpose
      .load()

// write to console
val df = readStream.selectExpr("CAST(value AS STRING)" )
df.writeStream
      .outputMode("append")
      .format("console")
      .start()

val df_json = df.select(from_json(col("value"), mySchema.defineSchema()).alias("parsed_value"))
val df_text = df_json.withColumn("text", col("parsed_value.payload.Text"))

// perform some data processing actions such as tokenization etc and return cleanedDataframe as the final result

// write back to kafka
// the columns `key` and `value` of the DataFrame `cleanedDataframe` will be used for producing the message into the Kafka topic.
val writeStreamKafka = cleanedDataframe
      .writeStream
      .outputMode("append")
      .format("kafka")
      .option("kafka.bootstrap.servers", hostAddress)
      .option("topic", "writing.val")
      .start()

existingSparkSession.awaitAnyTermination()

请注意代码末尾的existingSparkSession.awaitAnyTermination(),不要在start 之后直接使用awaitTermination。另外,请记住,DataFrame cleanedDataframekeyvalue 列将用于将消息生成到 Kafka 主题中。但是,key 列不是必需的,另请参阅here

此外,如果您使用检查点(推荐),那么您需要设置两个不同的位置:一个用于控制台流,另一个用于 kafka 输出流。请务必记住,这些流式查询是独立运行的。

【讨论】:

以上是关于从 Kafka 主题读取数据处理数据并使用 scala 和 spark 写回 Kafka 主题的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming - 从Kafka读取json并将json写入其他Kafka主题

如何在 Spring Boot 中从 Mongodb 读取集合数据并定期发布到 kafka 主题中

使用 Spark Structured Streaming 从多个 Kafka 主题读取并写入不同接收器的最佳方式是啥?

确保已使用 REST 代理从 Kafka 主题读取所有消息

如何从 kafka 中的两个生产者那里摄取数据并使用 Spark 结构化流加入?

从Kafka主题中读取结构化流