使用火花流解析事件中心消息上的 JSON

Posted

技术标签:

【中文标题】使用火花流解析事件中心消息上的 JSON【英文标题】:Parsing JSON on Event Hub messages using spark streaming 【发布时间】:2022-01-08 13:39:42 【问题描述】:

我正在尝试解析通过EventHub 流式传输的JSON 文件,我将消息正文转换为string,然后我正在使用from_json,如下所示。我可以将整个 JSON 对象保存为增量表中的单个单元格(当我在下面的代码中将 df4 写入流时会发生这种情况),但是当我使用 body.*col(body.*) 拆分 @987654328 @ 进入多个列我得到一个错误。有关如何处理此问题的任何建议。

// Scala Code //
val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

incomingStream.printSchema()

val outputStream = incomingStream.select($"body".cast(StringType)).alias("body")
                                
val df = outputStream.toDF()
val df4=df.select(from_json(col("body"),jsonSchema))
val df5=df4.select("body.*")

df5.writeStream
  .format("delta")
  .outputMode("append")
  .option("ignoreChanges", "true")
  .option("checkpointLocation", "/mnt/abc/checkpoints/samplestream")
  .start("/mnt/abc/samplestream")

输出

root
 |-- body: binary (nullable = true)
 |-- partition: string (nullable = true)
 |-- offset: string (nullable = true)
 |-- sequenceNumber: long (nullable = true)
 |-- enqueuedTime: timestamp (nullable = true)
 |-- publisher: string (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- systemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

root
 |-- body: string (nullable = true)

AnalysisException: cannot resolve 'body.*' given input columns 'body'
    at org.apache.spark.sql.catalyst.analysis.UnresolvedStarBase.expand(unresolved.scala:416)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.$anonfun$expand$1(Analyzer.scala:2507)
    at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$expand(Analyzer.scala:2506)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.$anonfun$buildExpandedProjectList$1(Analyzer.scala:2526)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.buildExpandedProjectList(Analyzer.scala:2524)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$18.applyOrElse(Analyzer.scala:2238)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$18.applyOrElse(Analyzer.scala:2233)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:137)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:86)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:137)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:340)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:133)

以下链接显示了在控制台上显示的方式,它适用于我,我正在尝试将 json 写入具有多列的 delta 文件。

[https://***.com/questions/57298849/parsing-event-hub-messages-using-spark-streaming]

【问题讨论】:

您收到什么错误?请使用错误和相关的堆栈跟踪更新问题。 【参考方案1】:

您的代码的问题似乎是您使用alias 的方式,因此body 列不再可用。关于您在代码中使用别名的一些观察结果以及您可能会尝试解决此问题的方法:

观察 1

问题:

val outputStream = incomingStream.select($"body".cast(StringType)).alias("body")

您上面的代码是整个数据框的别名。如果您的意图是确保在字符串转换后将 body 列别名为 body,您可以尝试以下操作

建议:

val outputStream = incomingStream.select($"body".cast(StringType).alias("body"))

观察 2

你在哪里

问题:

val df4=df.select(from_json(col("body"),jsonSchema))

您应该使用别名,以便以后可以访问它,因为它现在被另一个名称引用(您可以在调试时使用printSchemashow 自己查看)。

建议:

val df4=df.select(from_json(col("body"),jsonSchema).alias("body"))

【讨论】:

@NgD 太好了!请对此答案投赞成票,然后将其标记为该问题的已接受答案,以帮助遇到类似问题的其他 *** 用户轻松识别对可能解决其问题的问题有效的答案。

以上是关于使用火花流解析事件中心消息上的 JSON的主要内容,如果未能解决你的问题,请参考以下文章

流分析通过事件中心从 Python 反序列化 JSON

“消息驱动事件驱动流 ”基础概念解析

动态解析JSON后如何确定节点是否存在

在javascript中解析json消息包数据

使用Spark解析多个JSON模式

Function Mesh:Serverless 在消息与流数据场景下的火花