MapType 列值上的 PySpark 杠杆函数

Posted

技术标签:

【中文标题】MapType 列值上的 PySpark 杠杆函数【英文标题】:PySpark Leverage Function on MapType Column Values 【发布时间】:2021-10-22 14:57:18 【问题描述】:

下面是一个代表我想要完成的数据框。不过请注意,我要利用的功能比这个例子要复杂一些。

import pyspark
from pyspark.sql import SparkSession

arrayData = [
        ('1',1:100,2:200),
        ('1',1:100,2:None)]

df=spark.createDataFrame(data=arrayData, schema = ['id','value'])

我想做的是利用 withColumn 创建一个新列,其中包含一个已应用函数的新地图类型对象。

假设我想对每个值进行平方。我知道我可以创建一个将值乘以 2 并使用 withColumn 的 udf...但是这似乎不适用于应用于 MapType。

我想要达到的输出是:

arrayData = [
        ('1',1:200,2:200,1:400,2:400),
        ('1',1:100,2:None,1:200,2:None)]

df=spark.createDataFrame(data=arrayData, schema = ['id','value','newCol'])

最后,我需要保持并行性并尽量避免爆炸,因为我想将它保持在一行中。这如何实现?

【问题讨论】:

【参考方案1】:

不知道为什么它不适合你,但这个 UDF 对我来说很好用

import pyspark.sql.functions as F

def sq(m):
    return k:e**2 if e is not None else None for k,e in m.items()

print(sq(1:100,2:200))  # 1: 10000, 2: 40000
print(sq(1:100,2:None)) # 1: 10000, 2: None

(df
    .withColumn('newCol', F.udf(sq, T.MapType(T.LongType(), T.LongType()))('value'))
    .show(10, False)
)

# Output
# +---+---------------------+------------------------+
# |id |value                |newCol                  |
# +---+---------------------+------------------------+
# |1  |1 -> 100, 2 -> 200 |1 -> 10000, 2 -> 40000|
# |1  |1 -> 100, 2 -> null|1 -> 10000, 2 -> null |
# +---+---------------------+------------------------+

【讨论】:

我对此的担心是列表推导会将数据发送到驱动程序,就像一个 for 循环一样,我会遇到内存不足的问题。我弄错了吗? 它不会发送给驱动程序。工人仍然是 UDF 运行的地方。它不如内置的 Spark 函数高效,但仍然具有并行性。

以上是关于MapType 列值上的 PySpark 杠杆函数的主要内容,如果未能解决你的问题,请参考以下文章

如何更新重复列值上的 mySQL 键

如果数据帧基于列值上的过滤器,则从字典中提取行数据

在pyspark中展平Maptype列

pyspark:从现有列创建 MapType 列

如何从 Pyspark 中的 MapType 列获取键和值

ag-grid:在列值上创建过滤器下拉列表?