PySpark: sum fields that are embedded in an array within an array
Posted: 2021-01-26 19:14:18
Question:
I want to sum a field that sits inside an array nested within another array. Here is an example of the structure:
# Two records; adjacent string literals are concatenated into one JSON array.
records = (
    '[{"_main_object":{"parent_array":['
    '{"child_array":[{"amount_set":{"presentment_money":{"amount":"2.73","currency_code":"USD"},"shop_money":{"amount":"2.73","currency_code":"USD"}}}],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},'
    '{"child_array":[{"amount_set":{"presentment_money":{"amount":"2.27","currency_code":"USD"},"shop_money":{"amount":"2.27","currency_code":"USD"}}}],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},'
    '{"child_array":[],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},'
    '{"child_array":[],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""}]}},'
    '{"_main_object":{"parent_array":['
    '{"child_array":[],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},'
    '{"child_array":[{"amount_set":{"presentment_money":{"amount":"2.20","currency_code":"USD"},"shop_money":{"amount":"2.20","currency_code":"USD"}}}],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},'
    '{"child_array":[],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""},'
    '{"child_array":[{"amount_set":{"presentment_money":{"amount":"2.80","currency_code":"USD"},"shop_money":{"amount":"2.80","currency_code":"USD"}}}],"rollup_total_shop_money_amount":"","rollup_total_presentment_money_amount":""}]}}]'
)
df = spark.read.json(sc.parallelize([records]))
df.show()
df.printSchema()
This is the schema:
root
|-- _main_object: struct (nullable = true)
| |-- parent_array: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- child_array: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- amount_set: struct (nullable = true)
| | | | | | |-- presentment_money: struct (nullable = true)
| | | | | | | |-- amount: string (nullable = true)
| | | | | | | |-- currency_code: string (nullable = true)
| | | | | | |-- shop_money: struct (nullable = true)
| | | | | | | |-- amount: string (nullable = true)
| | | | | | | |-- currency_code: string (nullable = true)
| | | |-- rollup_total_presentment_money_amount: string (nullable = true)
| | | |-- rollup_total_shop_money_amount: string (nullable = true)
I am finding it challenging to sum the values in _main_object.parent_array[*].child_array[*].amount_set.presentment_money.amount
and store the result in _main_object.parent_array[*].rollup_total_presentment_money_amount.
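To make the goal concrete, here is a plain-Python sketch (no Spark involved) of the roll-up on one record shaped like the schema above. The helper names `make_child`, `make_parent`, and `rollup_presentment` are hypothetical and exist only for this illustration; the sample amounts are taken from the first record.

```python
# Plain-Python model of the desired roll-up; this is not Spark code.
# make_child, make_parent and rollup_presentment are hypothetical helpers.

def make_child(amount):
    """Build one child_array element with the same shape as the schema."""
    money = {"amount": amount, "currency_code": "USD"}
    return {"amount_set": {"presentment_money": dict(money), "shop_money": dict(money)}}


def make_parent(children):
    """Build one parent_array element with empty roll-up fields."""
    return {
        "child_array": children,
        "rollup_total_shop_money_amount": "",
        "rollup_total_presentment_money_amount": "",
    }


def rollup_presentment(main_object):
    """Return a copy of main_object with each parent's roll-up filled in."""
    parents = []
    for parent in main_object["parent_array"]:
        # Sum the string amounts of this parent's children; 0.0 for an empty array.
        total = sum(
            (float(c["amount_set"]["presentment_money"]["amount"]) for c in parent["child_array"]),
            0.0,
        )
        parents.append({**parent, "rollup_total_presentment_money_amount": total})
    return {"parent_array": parents}


record = {
    "parent_array": [
        make_parent([make_child("2.73")]),
        make_parent([make_child("2.27")]),
        make_parent([]),
        make_parent([]),
    ]
}
result = rollup_presentment(record)
totals = [p["rollup_total_presentment_money_amount"] for p in result["parent_array"]]
print(totals)
```

Each parent gets its own total, and a parent with an empty child_array gets 0.0 rather than an empty string.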
Following @mck's suggestion, I came up with the following:
from pyspark.sql.functions import expr, struct

# Rebuild every parent_array element, replacing the roll-up field with the
# sum of that parent's child amounts (0.0 when child_array is empty).
df4 = df.withColumn("_main_object", struct(
    expr("""transform(_main_object.parent_array, p -> struct(
        p.child_array as child_array,
        p.rollup_total_shop_money_amount as rollup_total_shop_money_amount,
        aggregate(
            transform(p.child_array, c -> double(c.amount_set.presentment_money.amount)),
            double(0),
            (acc, x) -> acc + x
        ) as rollup_total_presentment_money_amount
    ))""").alias("parent_array")
))
df4.printSchema()
df4.select("_main_object.parent_array").show(truncate=False)
The schema looks correct:
root
|-- _main_object: struct (nullable = false)
| |-- parent_array: array (nullable = true)
| | |-- element: struct (containsNull = false)
| | | |-- child_array: array (nullable = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- amount_set: struct (nullable = true)
| | | | | | |-- presentment_money: struct (nullable = true)
| | | | | | | |-- amount: string (nullable = true)
| | | | | | | |-- currency_code: string (nullable = true)
| | | | | | |-- shop_money: struct (nullable = true)
| | | | | | | |-- amount: string (nullable = true)
| | | | | | | |-- currency_code: string (nullable = true)
| | | |-- rollup_total_shop_money_amount: string (nullable = true)
| | | |-- rollup_total_presentment_money_amount: double (nullable = true)
Output:
df4.select("_main_object.parent_array").show(truncate=False)
+--------------------------------------------------------------------------------------------------------------+
|parent_array |
+--------------------------------------------------------------------------------------------------------------+
|[[[[[[2.73, USD], [2.73, USD]]]], , 2.73], [[[[[2.27, USD], [2.27, USD]]]], , 2.27], [[], , 0.0], [[], , 0.0]]|
|[[[], , 0.0], [[[[[2.20, USD], [2.20, USD]]]], , 2.2], [[], , 0.0], [[[[[2.80, USD], [2.80, USD]]]], , 2.8]] |
+--------------------------------------------------------------------------------------------------------------+
Answer 1:
Hopefully you are using Spark >= 2.4, which introduced the aggregate and transform functions:
df2 = df.selectExpr("""
    aggregate(
        flatten(
            transform(
                _main_object.parent_array,
                p -> transform(
                    p.child_array,
                    c -> double(c.amount_set.presentment_money.amount)
                )
            )
        ),
        double(0),
        (acc, x) -> acc + x
    ) total
""")
df2.show()
+-----+
|total|
+-----+
| 5.0|
| 5.0|
+-----+
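For readers unfamiliar with the higher-order functions, the expression above can be modelled in plain Python. This is only a sketch of the same three steps (transform, flatten, aggregate) on ordinary lists, not Spark code; `total_presentment` and `parent` are hypothetical names, and the sample data mirrors the two records from the question.

```python
from functools import reduce
from itertools import chain


def total_presentment(main_object):
    """Model of aggregate(flatten(transform(...))) on plain Python lists."""
    # transform(parent_array, p -> transform(p.child_array, c -> double(...)))
    nested = [
        [float(c["amount_set"]["presentment_money"]["amount"]) for c in p["child_array"]]
        for p in main_object["parent_array"]
    ]
    # flatten(...): turn the list of lists into one flat list
    flat = list(chain.from_iterable(nested))
    # aggregate(flat, double(0), (acc, x) -> acc + x)
    return reduce(lambda acc, x: acc + x, flat, 0.0)


def parent(*amounts):
    """Hypothetical helper: one parent_array element with the given child amounts."""
    return {
        "child_array": [
            {"amount_set": {"presentment_money": {"amount": a, "currency_code": "USD"}}}
            for a in amounts
        ]
    }


rows = [
    {"parent_array": [parent("2.73"), parent("2.27"), parent(), parent()]},
    {"parent_array": [parent(), parent("2.20"), parent(), parent("2.80")]},
]
totals = [total_presentment(r) for r in rows]
print(totals)
```

The sketch does not model Spark's NULL handling: in Spark SQL, adding NULL to the accumulator yields NULL, whereas this Python version would raise an error on a missing amount.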
Comments:
They were introduced in Spark 2.4 :)
Ah, thanks @blackbishop - I must have confused it with something else!
@mck I have added a solution based on your suggestion that populates the rollup_total_presentment_money_amount field in parent_array. If there are other solutions, I would appreciate hearing about them.