在 Spark Structured Streaming 中反序列化自引用 protobuf

Posted

技术标签:

【中文标题】在 Spark Structured Streaming 中反序列化自引用 protobuf【英文标题】:Deserialize self-referencing protobuf in Spark Structured Streaming 【发布时间】:2018-07-18 15:22:52 【问题描述】:

我有一个自引用的 protobuf 架构:

message A  
 uint64 timestamp = 1; 
 repeated A fields = 2; 

我正在使用scalaPB 生成相应的 Scala 类,然后按照以下步骤尝试解码从 Kafka 流中使用的消息:

def main(args : Array[String]) 

  val spark = SparkSession.builder.
    master("local")
    .appName("spark session example")
    .getOrCreate()

  import spark.implicits._

  val ds1 = spark.readStream.format("kafka").
      option("kafka.bootstrap.servers","localhost:9092").
      option("subscribe","student").load()

  val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))

  val query = ds2.writeStream
      .outputMode("append")
      .format("console")
      .start()

  query.awaitTermination()


This is a related question here on ***。

但是,Spark Structured Streaming 在此行会引发循环引用错误。

val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))

我知道这是因为递归引用只能在驱动程序中处理(基本上是RDDDataset 级别)。有没有人想出一个解决方法,例如通过 UDF 启用递归调用?

【问题讨论】:

【参考方案1】:

事实证明,这是由于 Spark 架构的制作方式受到限制。为了处理大量数据,代码连同一部分数据分布在所有从节点上,结果通过主节点进行协调。现在,由于工作节点上没有任何东西可以跟踪堆栈,因此工作节点不允许递归,而只能在驱动程序级别。

简而言之,目前的 spark 构建是不可能进行这种递归解析的。最好的选择是迁移到具有类似库并轻松解析递归 protobuf 文件的 java。

【讨论】:

以上是关于在 Spark Structured Streaming 中反序列化自引用 protobuf的主要内容,如果未能解决你的问题,请参考以下文章

一文读懂 超简单的 structured stream 源码解读

无法使用Spark Structured Streaming在Parquet文件中写入数据

如何使用Spark Structured Streaming连续监视目录

在 Spark Structured Streaming 中处理二进制数据

如何使用 Python 在 Spark Structured Streaming 中查看特定指标

Spark Structured Streaming - groupByKey 按分区单独