数据框 Spark scala 爆炸 json 数组

Posted

技术标签:

【中文标题】数据框 Spark scala 爆炸 json 数组【英文标题】:dataframe Spark scala explode json array 【发布时间】:2017-08-08 03:44:51 【问题描述】:

假设我有一个如下所示的数据框:

+--------------------+--------------------+--------------------------------------------------------------+
|                id  |           Name     |                                                       Payment|
+--------------------+--------------------+--------------------------------------------------------------+
|                1   |           James    |[ "@id": 1, "currency":"GBP","@id": 2, "currency": "USD" ]|
+--------------------+--------------------+--------------------------------------------------------------+

架构是:

|-- id: integer (nullable = true)
|-- Name: string (nullable = true)   
|-- Payment: string (nullable = true)

我怎样才能把上面的 JSON 数组分解成下面的:

+--------------------+--------------------+-------------------------------+
|                id  |           Name     |                        Payment|
+--------------------+--------------------+-------------------------------+
|                1   |           James    |   "@id":1, "currency":"GBP" |
+--------------------+--------------------+-------------------------------+
|                1   |           James    |   "@id":2, "currency":"USD" |
+--------------------+--------------------+-------------------------------+

我一直在尝试使用如下所示的分解功能,但它不起作用。它给出了一个关于无法分解字符串类型的错误,并且它需要一个映射或数组。这是有道理的,因为架构表示它是一个字符串,而不是数组/映射,但我不确定如何将其转换为适当的格式。

val newDF = dataframe.withColumn("nestedPayment", explode(dataframe.col("Payment")))

非常感谢任何帮助!

【问题讨论】:

【参考方案1】:

您必须将 JSON 字符串解析为 JSON 的 数组,然后在结果上使用 explode(explode 需要一个数组)。

为此(假设 Spark 2.0.*):

如果您知道所有Payment 值都包含一个表示具有相同大小的数组的json(例如,在本例中为2),您可以硬编码提取第一个和第二个元素,将它们包装在一个数组中并分解:

val newDF = dataframe.withColumn("Payment", explode(array(
  get_json_object($"Payment", "$[0]"),
  get_json_object($"Payment", "$[1]")
)))

如果您不能保证所有记录都有一个带有 2 元素数组的 JSON,但您可以保证这些数组的 最大长度,则可以使用这个将元素解析到最大大小,然后过滤掉生成的 nulls 的技巧:

val maxJsonParts = 3 // whatever that number is...
val jsonElements = (0 until maxJsonParts)
                     .map(i => get_json_object($"Payment", s"$$[$i]"))

val newDF = dataframe
  .withColumn("Payment", explode(array(jsonElements: _*)))
  .where(!isnull($"Payment")) 

【讨论】:

有没有办法用while循环来做到这一点?好像效率更高 通过 while 循环实现的假定性能改进是如此之小,以至于可能无法衡量。这是一个 Spark 应用程序,可以假设运行时由实际的 DataFrame 操作而不是构建它们的驱动程序端代码支配。这种“过早的优化”只会让代码更难阅读。 您好,如果我不知道数组的最大长度。我该怎么做: val jsonElements = (0 until arrayLength) .map(i => get_json_object($"Payment", s"$$[$i]")) ? @TzachZohar 我们如何使用 get_json_object() 计算 json 数组的大小,我试过 get_json_object(col("col_name"), "$.length()"),它没有用并给出 null【参考方案2】:
import org.apache.spark.sql.types._

val newDF = dataframe.withColumn("Payment", 
explode(
from_json(
  get_json_object($"Payment", "$."),ArrayType(StringType)
)))

【讨论】:

请留下评论,而不是仅仅投反对票,以便我知道我的回答有什么问题。这是我的第一篇文章,当您只想提供帮助时,这非常令人沮丧。谢谢。【参考方案3】:

您可以使用 ArrayType 定义 Payment json 数组的架构。

import org.apache.spark.sql.types._

val paymentSchema = ArrayType(StructType(
                  Array(
                        StructField("@id", DataTypes.IntegerType),
                        StructField("currency", DataTypes.StringType)
                  )
))

然后将 from_json 与此架构一起使用后爆炸将返回所需的结果。

val newDF = dataframe.withColumn("Payment", explode(from_json($"Payment", paymentSchema)))

【讨论】:

以上是关于数据框 Spark scala 爆炸 json 数组的主要内容,如果未能解决你的问题,请参考以下文章

Scala Spark - 从简单的数据框创建嵌套的 json 输出

如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件

我如何将平面数据框转换为 spark(scala 或 java)中的嵌套 json

如何将字符串中带有双引号的json文件加载到spark scala中的数据框中

如何使用 Apache Spark 和 Scala 创建嵌套 json

使用具有相同名称的嵌套子属性展平 Spark JSON 数据框