Spark结构化流式kafka在没有模式的情况下转换JSON(推断模式)
Posted
技术标签:
【中文标题】Spark结构化流式kafka在没有模式的情况下转换JSON(推断模式)【英文标题】:Spark structured streaming kafka convert JSON without schema (infer schema) 【发布时间】:2018-06-29 21:06:51 【问题描述】:我读过 Spark Structured Streaming 不支持将 Kafka 消息读取为 JSON 的模式推断。有没有办法像 Spark Streaming 一样检索架构:
val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printschema
【问题讨论】:
您能解释一下上下文吗?听起来像XY problem。 我正在尝试从 Kafka 消费到我们的柱状数据库。我们的主题没有架构,我想使用可选列查询主题。是否可以使用可选列构建架构列,结构化流式传输? 可选是什么意思?不存在 int 记录?只需定义一个带有可为空字段的静态架构。 【参考方案1】:这是一种可能的方法:
在开始流式传输之前,从 Kafka 获取一小批数据
从小批量推断架构
使用提取的架构开始流式传输数据。
下面的伪代码说明了这种方法。
第 1 步:
从 Kafka 中提取一小批(两条记录),
val smallBatch = spark.read.format("kafka")
.option("kafka.bootstrap.servers", "node:9092")
.option("subscribe", "topicName")
.option("startingOffsets", "earliest")
.option("endingOffsets", """"topicName":"0":2""")
.load()
.selectExpr("CAST(value AS STRING) as STRING").as[String].toDF()
第 2 步: 将小批量写入文件:
smallBatch.write.mode("overwrite").format("text").save("/batch")
此命令将小批量写入 hdfs 目录 /batch。它创建的文件的名称是 part-xyz*。因此,您首先需要使用 hadoop FileSystem 命令重命名文件(参见 org.apache.hadoop.fs._ 和 org.apache.hadoop.conf.Configuration,这是一个示例 https://***.com/a/41990859),然后将文件读取为 json:
val smallBatchSchema = spark.read.json("/batch/batchName.txt").schema
这里,batchName.txt 是文件的新名称,smallBatchSchema 包含从小批量推断的架构。
最后,您可以按如下方式流式传输数据(第 3 步):
val inputDf = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "node:9092")
.option("subscribe", "topicName")
.option("startingOffsets", "earliest")
.load()
val dataDf = inputDf.selectExpr("CAST(value AS STRING) as json")
.select( from_json($"json", schema=smallBatchSchema).as("data"))
.select("data.*")
希望这会有所帮助!
【讨论】:
不写文件还有其他方法吗?【参考方案2】:可以使用这个结构:
myStream = spark.readStream.schema(spark.read.json("my_sample_json_file_as_schema.json").schema).json("my_json_file")..
这怎么可能?好吧,由于 spark.read.json("..").schema 准确地返回了一个想要的推断模式,你可以使用这个返回的模式作为 spark.readStream 的强制模式参数的参数
我所做的是指定一个单行示例 json 作为推断架构内容的输入,这样就不会不必要地占用内存。如果您的数据发生变化,只需更新您的 sample-json。
我花了一些时间才弄明白(手动构建 StructTypes 和 StructFields 很痛苦..),因此我会为所有的赞成票感到高兴:-)
【讨论】:
不写文件还有其他方法吗?【参考方案3】:这是不可能的。 Spark Streaming 在开发中支持有限的模式推断,spark.sql.streaming.schemaInference
设置为true
:
默认情况下,来自基于文件的源的结构化流式处理要求您指定架构,而不是依靠 Spark 自动推断它。此限制可确保将一致的模式用于流式查询,即使在失败的情况下也是如此。对于临时用例,您可以通过将 spark.sql.streaming.schemaInference 设置为 true 来重新启用架构推断。
但它不能用于从 Kafka 消息中提取 JSON,并且 DataFrameReader.json
不支持将 Datasets
流式传输作为参数。
您必须手动提供架构How to read records in JSON format from Kafka using Structured Streaming?
【讨论】:
今天我们通过 Spark Streaming 消费,我们没有架构。我使用的是 DateFrame.schema 是否可以检索架构。val df = stream.selectExpr("cast (value as string) as json") 从 df 中检索它? Thx,关于使用带有 Spark 结构化流的 Kafka 并在没有模式的情况下转换 JSON 的任何想法???【参考方案4】:可以将 JSON 转换为 DataFrame,而无需手动输入架构,如果这是您要问的。
最近我遇到了一种情况,我通过 Kafka 接收大量长的嵌套 JSON 数据包,手动输入架构既麻烦又容易出错。
使用少量数据样本和一些技巧,您可以向 Spark2+ 提供架构,如下所示:
val jsonstr = """ copy paste a representative sample of data here"""
val jsondf = spark.read.json(Seq(jsonstr).toDS) //jsondf.schema has the nested json structure we need
val event = spark.readStream.format..option...load() //configure your source
val eventWithSchema = event.select($"value" cast "string" as "json").select(from_json($"json", jsondf.schema) as "data").select("data.*")
现在您可以像使用 Direct Streaming 一样使用此 val 做任何您想做的事情。创建临时视图,运行 SQL 查询,等等。
【讨论】:
【参考方案5】:将Arnon's 解决方案带到下一步(因为它在 spark 的较新版本中已被弃用,并且需要迭代整个数据框以进行类型转换)
spark.read.json(df.as[String])
无论如何,就目前而言,它仍处于试验阶段。
【讨论】:
究竟什么是弃用的? github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/…以上是关于Spark结构化流式kafka在没有模式的情况下转换JSON(推断模式)的主要内容,如果未能解决你的问题,请参考以下文章
Spark从Kafka获取数据写入MySQL的实现(流式数据)
从多个 Kafka 主题读取的 Spark 结构化流式应用程序
Spark:如何使用 RowEncoder 创建流式数据集?