MapType 列值上的 PySpark 杠杆函数
Posted
技术标签:
【中文标题】MapType 列值上的 PySpark 杠杆函数【英文标题】:PySpark Leverage Function on MapType Column Values 【发布时间】:2021-10-22 14:57:18 【问题描述】:下面是一个代表我想要完成的数据框。不过请注意,我要利用的功能比这个例子要复杂一些。
import pyspark
from pyspark.sql import SparkSession
arrayData = [
('1',1:100,2:200),
('1',1:100,2:None)]
df=spark.createDataFrame(data=arrayData, schema = ['id','value'])
我想做的是利用 withColumn 创建一个新列,其中包含一个已应用函数的新地图类型对象。
假设我想对每个值进行平方。我知道我可以创建一个将值乘以 2 并使用 withColumn 的 udf...但是这似乎不适用于应用于 MapType。
我想要达到的输出是:
arrayData = [
('1',1:200,2:200,1:400,2:400),
('1',1:100,2:None,1:200,2:None)]
df=spark.createDataFrame(data=arrayData, schema = ['id','value','newCol'])
最后,我需要保持并行性并尽量避免爆炸,因为我想将它保持在一行中。这如何实现?
【问题讨论】:
【参考方案1】:不知道为什么它不适合你,但这个 UDF 对我来说很好用
import pyspark.sql.functions as F
def sq(m):
return k:e**2 if e is not None else None for k,e in m.items()
print(sq(1:100,2:200)) # 1: 10000, 2: 40000
print(sq(1:100,2:None)) # 1: 10000, 2: None
(df
.withColumn('newCol', F.udf(sq, T.MapType(T.LongType(), T.LongType()))('value'))
.show(10, False)
)
# Output
# +---+---------------------+------------------------+
# |id |value |newCol |
# +---+---------------------+------------------------+
# |1 |1 -> 100, 2 -> 200 |1 -> 10000, 2 -> 40000|
# |1 |1 -> 100, 2 -> null|1 -> 10000, 2 -> null |
# +---+---------------------+------------------------+
【讨论】:
我对此的担心是列表推导会将数据发送到驱动程序,就像一个 for 循环一样,我会遇到内存不足的问题。我弄错了吗? 它不会发送给驱动程序。工人仍然是 UDF 运行的地方。它不如内置的 Spark 函数高效,但仍然具有并行性。以上是关于MapType 列值上的 PySpark 杠杆函数的主要内容,如果未能解决你的问题,请参考以下文章