PySpark: Sum fields that are embedded in an array within an array

Posted: 2021-01-26 19:14:18

Question:

I want to sum a field that sits inside an array nested within another array. Here is an example of the structure:

records = '[{"_main_object":{"parent_array":[{"child_array":[{"amount_set":{"presentment_money":{"amount":"2.73","currency_code":"USD"},"shop_money":{"amount":"2.73","currency_code":"USD"}}}],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},{"child_array":[{"amount_set":{"presentment_money":{"amount":"2.27","currency_code":"USD"},"shop_money":{"amount":"2.27","currency_code":"USD"}}}],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},{"child_array":[],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},{"child_array":[],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""}]}},{"_main_object":{"parent_array":[{"child_array":[],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},{"child_array":[{"amount_set":{"presentment_money":{"amount":"2.20","currency_code":"USD"},"shop_money":{"amount":"2.20","currency_code":"USD"}}}],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},{"child_array":[],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},{"child_array":[{"amount_set":{"presentment_money":{"amount":"2.80","currency_code":"USD"},"shop_money":{"amount":"2.80","currency_code":"USD"}}}],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""}]}}]'
df = spark.read.json(sc.parallelize([records]))
df.show()
df.printSchema()

This is the schema:

root
 |-- _main_object: struct (nullable = true)
 |    |-- parent_array: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- child_array: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- amount_set: struct (nullable = true)
 |    |    |    |    |    |    |-- presentment_money: struct (nullable = true)
 |    |    |    |    |    |    |    |-- amount: string (nullable = true)
 |    |    |    |    |    |    |    |-- currency_code: string (nullable = true)
 |    |    |    |    |    |    |-- shop_money: struct (nullable = true)
 |    |    |    |    |    |    |    |-- amount: string (nullable = true)
 |    |    |    |    |    |    |    |-- currency_code: string (nullable = true)
 |    |    |    |-- rollup_total_presentment_money_amount: string (nullable = true)
 |    |    |    |-- rollup_total_shop_money_amount: string (nullable = true)

I am finding it challenging to sum the values in _main_object.parent_array[*].child_array[*].amount_set.presentment_money.amount and store the result in _main_object.parent_array[*].rollup_total_presentment_money_amount.

Following @mck's suggestion, I came up with the following:

from pyspark.sql.functions import expr, struct

# Rebuild _main_object so that each parent_array element carries the sum of
# its children's presentment_money amounts.
df4 = df.withColumn("_main_object", struct(
    expr("""
        transform(_main_object.parent_array, p -> struct(
            p.child_array as child_array,
            p.rollup_total_shop_money_amount as rollup_total_shop_money_amount,
            aggregate(
                transform(p.child_array, c -> double(c.amount_set.presentment_money.amount)),
                double(0),
                (acc, x) -> acc + x
            ) as rollup_total_presentment_money_amount
        ))
    """).alias("parent_array")
))
df4.printSchema()
df4.select("_main_object.parent_array").show(truncate=False)

The schema looks correct:

root
 |-- _main_object: struct (nullable = false)
 |    |-- parent_array: array (nullable = true)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- child_array: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- amount_set: struct (nullable = true)
 |    |    |    |    |    |    |-- presentment_money: struct (nullable = true)
 |    |    |    |    |    |    |    |-- amount: string (nullable = true)
 |    |    |    |    |    |    |    |-- currency_code: string (nullable = true)
 |    |    |    |    |    |    |-- shop_money: struct (nullable = true)
 |    |    |    |    |    |    |    |-- amount: string (nullable = true)
 |    |    |    |    |    |    |    |-- currency_code: string (nullable = true)
 |    |    |    |-- rollup_total_shop_money_amount: string (nullable = true)
 |    |    |    |-- rollup_total_presentment_money_amount: double (nullable = true)

Output:

df4.select("_main_object.parent_array").show(truncate=False)

+--------------------------------------------------------------------------------------------------------------+
|parent_array                                                                                                  |
+--------------------------------------------------------------------------------------------------------------+
|[[[[[[2.73, USD], [2.73, USD]]]], , 2.73], [[[[[2.27, USD], [2.27, USD]]]], , 2.27], [[], , 0.0], [[], , 0.0]]|
|[[[], , 0.0], [[[[[2.20, USD], [2.20, USD]]]], , 2.2], [[], , 0.0], [[[[[2.80, USD], [2.80, USD]]]], , 2.8]]  |
+--------------------------------------------------------------------------------------------------------------+
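
The per-parent values in this output can be cross-checked without a Spark session. The following is only a plain-Python sketch of the same rollup, not part of the original post: the helper name presentment_rollups is hypothetical, and the record is a shortened two-parent version of the sample data with the JSON braces written out.

```python
import json

# A shortened sample row: one parent with a single child, one with none.
record = json.loads("""
{"_main_object": {"parent_array": [
  {"child_array": [{"amount_set": {
      "presentment_money": {"amount": "2.73", "currency_code": "USD"},
      "shop_money": {"amount": "2.73", "currency_code": "USD"}}}],
   "rollup_total_shop_money_amount": "",
   "rollup_total_presentment_money_amount": ""},
  {"child_array": [],
   "rollup_total_shop_money_amount": "",
   "rollup_total_presentment_money_amount": ""}
]}}
""")


def presentment_rollups(row):
    """Hypothetical helper: one presentment-money sum per parent_array element."""
    return [
        sum(
            (float(c["amount_set"]["presentment_money"]["amount"])
             for c in parent["child_array"]),
            0.0,  # same starting value as double(0) in the Spark expression
        )
        for parent in row["_main_object"]["parent_array"]
    ]


rollups = presentment_rollups(record)
print(rollups)
```

An empty child_array yields the starting value 0.0, which matches the 0.0 entries in the Spark output above.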

Answer 1:

Hopefully you are using Spark >= 2.4, which introduced the aggregate and transform higher-order functions:

df2 = df.selectExpr("""
    aggregate(
        flatten(
            transform(
                _main_object.parent_array,
                p -> transform(
                    p.child_array,
                    c -> double(c.amount_set.presentment_money.amount)
                )
            )
        ),
    double(0),
    (acc, x) -> acc + x
    ) total
""")

df2.show()
+-----+
|total|
+-----+
|  5.0|
|  5.0|
+-----+
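
To see what each stage of that expression does, here is a rough plain-Python analogue for the first sample row. It is only an illustration of the transform, flatten and aggregate steps under the assumption that the amounts have already been pulled out into nested lists; the variable names are made up for this sketch and nothing here calls Spark.

```python
from functools import reduce
from itertools import chain

# Amount strings per parent element for the first sample row
# (two parents with one child each, two parents with empty child arrays).
amounts_by_parent = [["2.73"], ["2.27"], [], []]

# transform(..., c -> double(...)): convert every amount string to a float.
as_doubles = [[float(a) for a in children] for children in amounts_by_parent]

# flatten(...): collapse the array of arrays into a single array.
flat = list(chain.from_iterable(as_doubles))

# aggregate(..., double(0), (acc, x) -> acc + x): fold with a 0.0 start value.
total = reduce(lambda acc, x: acc + x, flat, 0.0)

print(flat, round(total, 2))
```

Because flatten runs before aggregate, the empty child arrays simply contribute nothing to the fold, and the result is a single total per row rather than one value per parent.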

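Comments:

They were introduced in Spark 2.4 :)

Ah, thanks @blackbishop - I must have confused it with something else!

@mck Based on your suggestion, I have added a solution that populates the rollup_total_presentment_money_amount field in parent_array. Any other solutions would be appreciated.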
