如何使用 Spark Structured Streaming 打印 Json 编码的消息

Posted

技术标签:

【中文标题】如何使用 Spark Structured Streaming 打印 Json 编码的消息【英文标题】:How to print Json encoded messages using Spark Structured Streaming 【发布时间】:2017-05-13 21:32:36 【问题描述】:

我有一个DataSet[Row],其中每一行都是 JSON 字符串。我只想打印 JSON 流或计算每批的 JSON 流。

这是我目前的代码

val ds = sparkSession.readStream()
               .format("kafka")
                .option("kafka.bootstrap.servers",bootstrapServers"))
               .option("subscribe", topicName)
               .option("checkpointLocation", hdfsCheckPointDir)
               .load();

val ds1 = ds.select(from_json(col("value").cast("string"), schema) as 'payload)
val ds2 = ds1.select($"payload.info")
val query = ds2.writeStream.outputMode("append").queryName("table").format("memory").start()
query.awaitTermination()
select * from table; --  don't see anything and there are no errors. However when I run my Kafka consumer separately (independent ofSpark I can see the data)

我的问题真的是我需要做什么才能使用结构化流打印从 Kafka 接收的数据? Kafka 中的消息是 JSON 编码的字符串,因此我将 JSON 编码的字符串转换为某种结构,并最终转换为数据集。我正在使用 Spark 2.1.0

【问题讨论】:

您是否也在 Spark 用户邮件列表(与 TD)上讨论它?试图找出这两个用例的不同之处。 嗨!是的,但我们还没有结束转换。我只是想使用结构化流打印数据,但我很难做到:( 我也试过val query = ds.writeStream.outputMode("append").format("console").start(),但也没有用 【参考方案1】:
val ds1 = ds.select(from_json(col("value").cast("string"), schema) as payload).select($"payload.*")

这将在控制台上打印您的数据。

ds1.writeStream.format("console").option("truncate", "false").start().awaitTermination()

在这种情况下,请始终使用awaitTermination()thread.Sleep(time in seconds) 之类的内容。

【讨论】:

以上是关于如何使用 Spark Structured Streaming 打印 Json 编码的消息的主要内容,如果未能解决你的问题,请参考以下文章

Spark Structured Streaming框架之进程管理

混合 Spark Structured Streaming API 和 DStream 写入 Kafka

如何使用Spark Structured Streaming连续监视目录

如何使用 Python 在 Spark Structured Streaming 中查看特定指标

如何使用 Spark Structured Streaming 打印 Json 编码的消息

如何使用 Scala Case Class 在 Spark Structured Streaming 中映射 Kafka 源