Spark Structured Streaming Databricks 事件中心架构定义问题
Posted
技术标签:
【中文标题】Spark Structured Streaming Databricks 事件中心架构定义问题【英文标题】:Spark Structured Streaming Databricks Event Hub Schema Defining issue 【发布时间】:2019-06-26 20:50:38 【问题描述】:我在定义 json 文档的结构时遇到问题。
现在我正在尝试在 streamread 上执行相同的架构。
val jsonSchema = StructType([ StructField("associatedEntities", struct<driver:StringType,truck:StringType>, True),
StructField("heading", StringType, True),
StructField("location", struct<accuracyType:StringType,captureDateTime:StringType,cityStateCode:StringType,description:StringType,latitude:DoubleType,longitude:DoubleType,quality:StringType,transmitDateTime:StringType>, True),
StructField("measurements", array<struct<type:StringType,uom:StringType,value:StringType>>, True),
StructField("source", struct<entityType:StringType,key:StringType,vendor:StringType>, True),
StructField("speed", DoubleType, True)])
val df = spark
.readStream
.format("eventhubs")
//.schema(jsonSchema)
.options(ehConf.toMap)
.load()
当我在笔记本中运行此单元格时“:15:错误:简单表达式的非法开始 val jsonSchema = StructType([ StructField("associatedEntities", struct, True),"
编辑:目标是将数据放入数据框中。我可以从事件中心消息的正文中获取 json 字符串,但如果我无法让架构工作,我不确定从那里做什么。
【问题讨论】:
检查这个 SO 问题***.com/questions/46568435/… 我将如何处理 array由于您的架构定义,您会收到错误消息。架构定义应如下所示:
import org.apache.spark.sql.types._
val jsonSchema = StructType(
Seq(StructField("associatedEntities",
StructType(Seq(
StructField("driver", StringType),
StructField ("truck", StringType)
))),
StructField("heading", StringType),
StructField("measurements", ArrayType(StructType(Seq(StructField ("type", StringType), StructField ("uom", StringType), StructField("value", StringType)))))
)
)
您可以使用以下命令仔细检查架构:
jsonSchema.printTreeString
返回架构:
root
|-- associatedEntities: struct (nullable = true)
| |-- driver: string (nullable = true)
| |-- truck: string (nullable = true)
|-- heading: string (nullable = true)
|-- measurements: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- type: string (nullable = true)
| | |-- uom: string (nullable = true)
| | |-- value: string (nullable = true)
如 cmets 中所述,您将获得二进制数据。所以首先你得到原始数据框:
val rawData = spark.readStream
.format("eventhubs")
.option(...)
.load()
你必须:
将数据转换为字符串 解析嵌套的json 并将其展平用解析后的数据定义数据框:
val parsedData = rawData
.selectExpr("cast (Body as string) as json")
.select(from_json($"json", jsonSchema).as("data"))
.select("data.*")
【讨论】:
我正在研究你的答案,我很好奇 seq 函数是做什么的?以上是关于Spark Structured Streaming Databricks 事件中心架构定义问题的主要内容,如果未能解决你的问题,请参考以下文章
Spark Structured Streaming - 1
删除由 spark-structured-streaming 写入的损坏的 parquet 文件时,我会丢失数据吗?