Spark结构化流式kafka在没有模式的情况下转换JSON(推断模式)

Posted

技术标签:

【中文标题】Spark结构化流式kafka在没有模式的情况下转换JSON(推断模式)【英文标题】:Spark structured streaming kafka convert JSON without schema (infer schema) 【发布时间】:2018-06-29 21:06:51 【问题描述】:

我读过 Spark Structured Streaming 不支持将 Kafka 消息读取为 JSON 的模式推断。有没有办法像 Spark Streaming 一样检索架构:

val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printschema 

【问题讨论】:

您能解释一下上下文吗?听起来像XY problem。 我正在尝试从 Kafka 消费到我们的柱状数据库。我们的主题没有架构,我想使用可选列查询主题。是否可以使用可选列构建架构列,结构化流式传输? 可选是什么意思?不存在 int 记录?只需定义一个带有可为空字段的静态架构。 【参考方案1】:

这是一种可能的方法:

    在开始流式传输之前,从 Kafka 获取一小批数据

    从小批量推断架构

    使用提取的架构开始流式传输数据。

下面的伪代码说明了这种方法。

第 1 步:

从 Kafka 中提取一小批(两条记录),

val smallBatch = spark.read.format("kafka")
                           .option("kafka.bootstrap.servers", "node:9092")
                           .option("subscribe", "topicName")
                           .option("startingOffsets", "earliest")
                           .option("endingOffsets", """"topicName":"0":2""")
                           .load()
                           .selectExpr("CAST(value AS STRING) as STRING").as[String].toDF()

第 2 步: 将小批量写入文件:

smallBatch.write.mode("overwrite").format("text").save("/batch")

此命令将小批量写入 hdfs 目录 /batch。它创建的文件的名称是 part-xyz*。因此,您首先需要使用 hadoop FileSystem 命令重命名文件(参见 org.apache.hadoop.fs._ 和 org.apache.hadoop.conf.Configuration,这是一个示例 https://***.com/a/41990859),然后将文件读取为 json:

val smallBatchSchema = spark.read.json("/batch/batchName.txt").schema

这里,batchName.txt 是文件的新名称,smallBatchSchema 包含从小批量推断的架构。

最后,您可以按如下方式流式传输数据(第 3 步):

val inputDf = spark.readStream.format("kafka")
                             .option("kafka.bootstrap.servers", "node:9092")
                             .option("subscribe", "topicName")
                             .option("startingOffsets", "earliest")
                             .load()

val dataDf = inputDf.selectExpr("CAST(value AS STRING) as json")
                    .select( from_json($"json", schema=smallBatchSchema).as("data"))
                    .select("data.*")

希望这会有所帮助!

【讨论】:

不写文件还有其他方法吗?【参考方案2】:

可以使用这个结构:

myStream = spark.readStream.schema(spark.read.json("my_sample_json_file_as_schema.json").schema).json("my_json_file")..

这怎么可能?好吧,由于 spark.read.json("..").schema 准确地返回了一个想要的推断模式,你可以使用这个返回的模式作为 spark.readStream 的强制模式参数的参数

我所做的是指定一个单行示例 json 作为推断架构内容的输入,这样就不会不必要地占用内存。如果您的数据发生变化,只需更新您的 sample-json。

我花了一些时间才弄明白(手动构建 StructTypes 和 StructFields 很痛苦..),因此我会为所有的赞成票感到高兴:-)

【讨论】:

不写文件还有其他方法吗?【参考方案3】:

这是不可能的。 Spark Streaming 在开发中支持有限的模式推断,spark.sql.streaming.schemaInference 设置为true

默认情况下,来自基于文件的源的结构化流式处理要求您指定架构,而不是依靠 Spark 自动推断它。此限制可确保将一致的模式用于流式查询,即使在失败的情况下也是如此。对于临时用例,您可以通过将 spark.sql.streaming.schemaInference 设置为 true 来重新启用架构推断。

但它不能用于从 Kafka 消息中提取 JSON,并且 DataFrameReader.json 不支持将 Datasets 流式传输作为参数。

您必须手动提供架构How to read records in JSON format from Kafka using Structured Streaming?

【讨论】:

今天我们通过 Spark Streaming 消费,我们没有架构。我使用的是 DateFrame.schema 是否可以检索架构。val df = stream.selectExpr("cast (value as string) as json") 从 df 中检索它? Thx,关于使用带有 Spark 结构化流的 Kafka 并在没有模式的情况下转换 JSON 的任何想法???【参考方案4】:

可以将 JSON 转换为 DataFrame,而无需手动输入架构,如果这是您要问的。

最近我遇到了一种情况,我通过 Kafka 接收大量长的嵌套 JSON 数据包,手动输入架构既麻烦又容易出错。

使用少量数据样本和一些技巧,您可以向 Spark2+ 提供架构,如下所示:

val jsonstr = """ copy paste a representative sample of data here"""
val jsondf = spark.read.json(Seq(jsonstr).toDS) //jsondf.schema has the nested json structure we need

val event = spark.readStream.format..option...load() //configure your source

val eventWithSchema = event.select($"value" cast "string" as "json").select(from_json($"json", jsondf.schema) as "data").select("data.*")

现在您可以像使用 Direct Streaming 一样使用此 val 做任何您想做的事情。创建临时视图,运行 SQL 查询,等等。

【讨论】:

【参考方案5】:

将Arnon's 解决方案带到下一步(因为它在 spark 的较新版本中已被弃用,并且需要迭代整个数据框以进行类型转换)

spark.read.json(df.as[String])

无论如何,就目前而言,它仍处于试验阶段。

【讨论】:

究竟什么是弃用的? github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/…

以上是关于Spark结构化流式kafka在没有模式的情况下转换JSON(推断模式)的主要内容,如果未能解决你的问题,请参考以下文章

Spark从Kafka获取数据写入MySQL的实现(流式数据)

从多个 Kafka 主题读取的 Spark 结构化流式应用程序

Spark:如何使用 RowEncoder 创建流式数据集?

基于kafka分区的结构化流式读取

Spark 结构化流式传输 - 为不同的 GroupBy 键使用不同的 Windows

如何在 Spark 结构化流中手动设置 group.id 并提交 kafka 偏移量?