在 pyspark 中为过滤后的数据帧调用函数

Posted

技术标签:

【中文标题】在 pyspark 中为过滤后的数据帧调用函数【英文标题】:Call a function for a filtered dataframe in pyspark 【发布时间】:2016-08-30 23:14:01 【问题描述】:

我有这样的数据,

数据

ID          filter
1             A
2             A
3             A
4             A
5             B
6             B
7             B
8             B

我想为数据框应用一个函数,

def add(x):
    y = x+1
    return(y)

from pyspark.sql.functions import *                 
from pyspark.sql.functions import udf

ol_val = udf(add, StringType())

data = data.withColumn("sum",ol_val(data.ID))

这给出了一个输出,

数据

ID          filter        sum
1             A            2
2             A            3
3             A            4
4             A            5
5             B            6
6             B            7
7             B            8
8             B            9

我只想在 filter = A其余部分我希望它为 NULL 时应用此功能。我想要的输出是,

数据

ID          filter        sum
1             A            2
2             A            3
3             A            4
4             A            5
5             B            NULL
6             B            NULL
7             B            NULL
8             B            NULL

这里的值为NULL,因为它不满足条件filter = A。我希望该函数仅在filter = A时应用。

谁能帮我更改代码以便在 pyspark 中获得此输出?

【问题讨论】:

【参考方案1】:

您需要使用when 和otherwise。顺便说一句,您不必创建 UDF

df = sc.parallelize([
    (1, "a"),
    (1, "b"),
    (3, "c")
  ]).toDF(["id", "filter"])

df.select("*", when(col("filter") == lit("a"), col("id") + 1).otherwise(None).alias("result")).show()

如果你真的需要调用那个函数,你可以简单地将col("id") + 1替换为yourUDF(col("id"))

【讨论】:

不是加一个,你能帮我修改代码,当filter = a时用col('id')调用函数add吗?我尝试更改此代码。但面临一些错误。如果您可以修改此代码以调用该函数会很有帮助

以上是关于在 pyspark 中为过滤后的数据帧调用函数的主要内容,如果未能解决你的问题,请参考以下文章

Elasticsearch:使用 rescore 来为过滤后的搜索结果重新打分

调用地图后的pyspark EOFError

在 pyspark 中使用 UDF 和简单数据帧

将 udf 调用移动到新函数后的 azure pyspark udf 属性 nonetype

熊猫数据帧的 PySpark rdd

Pyspark 错误:“Py4JJavaError:调用 o655.count 时出错。”在数据帧上调用 count() 方法时