在 Spark Structured Streaming 中反序列化自引用 protobuf
Posted
技术标签:
【中文标题】在 Spark Structured Streaming 中反序列化自引用 protobuf【英文标题】:Deserialize self-referencing protobuf in Spark Structured Streaming 【发布时间】:2018-07-18 15:22:52 【问题描述】:我有一个自引用的 protobuf 架构:
message A
uint64 timestamp = 1;
repeated A fields = 2;
我正在使用scalaPB
生成相应的 Scala 类,然后按照以下步骤尝试解码从 Kafka 流中使用的消息:
def main(args : Array[String])
val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","student").load()
val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
This is a related question here on ***。
但是,Spark Structured Streaming 在此行会引发循环引用错误。
val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))
我知道这是因为递归引用只能在驱动程序中处理(基本上是RDD
或Dataset
级别)。有没有人想出一个解决方法,例如通过 UDF 启用递归调用?
【问题讨论】:
【参考方案1】:事实证明,这是由于 Spark 架构的制作方式受到限制。为了处理大量数据,代码连同一部分数据分布在所有从节点上,结果通过主节点进行协调。现在,由于工作节点上没有任何东西可以跟踪堆栈,因此工作节点不允许递归,而只能在驱动程序级别。
简而言之,目前的 spark 构建是不可能进行这种递归解析的。最好的选择是迁移到具有类似库并轻松解析递归 protobuf 文件的 java。
【讨论】:
以上是关于在 Spark Structured Streaming 中反序列化自引用 protobuf的主要内容,如果未能解决你的问题,请参考以下文章
一文读懂 超简单的 structured stream 源码解读
无法使用Spark Structured Streaming在Parquet文件中写入数据
如何使用Spark Structured Streaming连续监视目录
在 Spark Structured Streaming 中处理二进制数据