在 Spark Structured Streaming 中处理二进制数据

Posted

技术标签:

【中文标题】在 Spark Structured Streaming 中处理二进制数据【英文标题】:Process binary data in Spark Structured Streaming 【发布时间】:2017-02-24 07:30:21 【问题描述】:

我正在使用 Kafka 和 Spark 结构化流。我收到以下格式的 kafka 消息。

"deviceId":"001","sNo":1,"data":"aaaaa"
"deviceId":"002","sNo":1,"data":"bbbbb"
"deviceId":"001","sNo":2,"data":"ccccc"
"deviceId":"002","sNo":2,"data":"ddddd"

我正在像下面这样阅读它。

Dataset<String> data = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option(subscribeType, topics)
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as(Encoders.STRING());
Dataset<DeviceData> ds = data.as(ExpressionEncoder.javaBean(DeviceData.class)).orderBy("deviceId","sNo"); 
ds.foreach(event -> 
      processData(event.getDeviceId(),event.getSNo(),event.getData().getBytes())
);

private void processData(String deviceId,int SNo, byte[] data) 

  //How to check previous processed Dataset???
 

在我的 json 消息中,“数据”是字节 [] 的字符串形式。我有一个要求,我需要按“sNo”的顺序处理给定“deviceId”的二进制“数据”。所以对于“deviceId”=“001”,我必须处理“sNo”=1 的二进制数据,然后是“sNo”=2 等等。如何检查结构化流中先前处理的数据集的状态?

【问题讨论】:

到目前为止你尝试了什么? 我已经更新了我的代码。请检查。我正在做 orderBy 然后 forEach 来处理数据。我被困在 processData 方法如何处理流接收的 Dataset 中的当前和以前的数据。 【参考方案1】:

如果您正在寻找像 DStream.mapWithState 这样的状态管理,那么结构化流中尚不支持它。工作正在进行中。请检查 https://issues.apache.org/jira/browse/SPARK-19067.

【讨论】:

以上是关于在 Spark Structured Streaming 中处理二进制数据的主要内容,如果未能解决你的问题,请参考以下文章

一文读懂 超简单的 structured stream 源码解读

无法使用Spark Structured Streaming在Parquet文件中写入数据

如何使用Spark Structured Streaming连续监视目录

在 Spark Structured Streaming 中处理二进制数据

如何使用 Python 在 Spark Structured Streaming 中查看特定指标

Spark Structured Streaming - groupByKey 按分区单独