如何使用 Spark Structured Streaming 打印 Json 编码的消息
Posted
技术标签:
【中文标题】如何使用 Spark Structured Streaming 打印 Json 编码的消息【英文标题】:How to print Json encoded messages using Spark Structured Streaming 【发布时间】:2017-05-13 21:32:36 【问题描述】:我有一个DataSet[Row]
,其中每一行都是 JSON 字符串。我只想打印 JSON 流或计算每批的 JSON 流。
这是我目前的代码
val ds = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers",bootstrapServers"))
.option("subscribe", topicName)
.option("checkpointLocation", hdfsCheckPointDir)
.load();
val ds1 = ds.select(from_json(col("value").cast("string"), schema) as 'payload)
val ds2 = ds1.select($"payload.info")
val query = ds2.writeStream.outputMode("append").queryName("table").format("memory").start()
query.awaitTermination()
select * from table; -- don't see anything and there are no errors. However when I run my Kafka consumer separately (independent ofSpark I can see the data)
我的问题真的是我需要做什么才能使用结构化流打印从 Kafka 接收的数据? Kafka 中的消息是 JSON 编码的字符串,因此我将 JSON 编码的字符串转换为某种结构,并最终转换为数据集。我正在使用 Spark 2.1.0
【问题讨论】:
您是否也在 Spark 用户邮件列表(与 TD)上讨论它?试图找出这两个用例的不同之处。 嗨!是的,但我们还没有结束转换。我只是想使用结构化流打印数据,但我很难做到:( 我也试过val query = ds.writeStream.outputMode("append").format("console").start()
,但也没有用
【参考方案1】:
val ds1 = ds.select(from_json(col("value").cast("string"), schema) as payload).select($"payload.*")
这将在控制台上打印您的数据。
ds1.writeStream.format("console").option("truncate", "false").start().awaitTermination()
在这种情况下,请始终使用awaitTermination()
或thread.Sleep(time in seconds)
之类的内容。
【讨论】:
以上是关于如何使用 Spark Structured Streaming 打印 Json 编码的消息的主要内容,如果未能解决你的问题,请参考以下文章
Spark Structured Streaming框架之进程管理
混合 Spark Structured Streaming API 和 DStream 写入 Kafka
如何使用Spark Structured Streaming连续监视目录
如何使用 Python 在 Spark Structured Streaming 中查看特定指标
如何使用 Spark Structured Streaming 打印 Json 编码的消息
如何使用 Scala Case Class 在 Spark Structured Streaming 中映射 Kafka 源