pyspark 在列上应用函数

Posted

技术标签:

【中文标题】pyspark 在列上应用函数【英文标题】:pyspark apply function on column 【发布时间】:2018-08-24 09:16:22 【问题描述】:

我想在数据框列上运行自定义函数。该列有一个长字符串,其中包含一些 Opened 或 Clicked 信息。字符串格式在 Path 列中是这样的:

+---------------------------------------------------------------+               
|                                                           Path|
+---------------------------------------------------------------+
|/utility/tracking/opened/50DD3254-BA1D-4D0B-ADB5-6529E9C90982/0|
|/utility/tracking/tracking/ClickedUrl                          |
+---------------------------------------------------------------+

源数据框看起来像这样

enter image description here

现在我正在运行 pyspark UDF 以应用于“路径”列,该列查找列中是否“打开”或“单击”,并为我提供具有值 10 或 20 的“路径”列的新数据框,否则为空,具体取决于打开点击或其他条件

def clickopen(x):
    if 'opened' in x.lower().split('/'):
        print(10)
    elif 'clickedurl' in x.lower().split('/'):
        print(20)
    else:
        print('null')
hunter = udf(clickopen)  
new_df = new_df.withColumn("Path", hunter("Path"))
new_df.show(n=20)

并且结果在“路径”列中显示为空,而不是值 10 或 20

enter image description here

在获取整数值方面需要一点帮助,如果我做错了什么,请提出建议,我们将不胜感激。在此先感谢

【问题讨论】:

需要返回值形式的函数不打印。 【参考方案1】:

正如其他人所提到的,您的主要问题是您正在打印值而不是返回它。但是,即使在修复之后,在此处使用 udf 效率也非常低。

相反,您可以使用pyspark.sql.functions.when()pyspark.sql.Column.like()

试试:

import pyspark.sql.functions as f

new_df = new_df.withColumn(
    "Path",
    f.when(
        f.lower(f.col("Path")).like(r"%opened%"),
        f.lit(10)
    ).when(
        f.lower(f.col("Path")).like(r"%clickedurl"),
        f.lit(20)
    )
)

默认情况下,如果条件都不匹配,when 将返回 null

【讨论】:

【参考方案2】:

您需要从函数返回值而不是打印它们。像这样——

def clickopen(x):
    if 'opened' in x.lower().split('/'):
        return 10
    elif 'clickedurl' in x.lower().split('/'):
        return 20
    else:
        return None

我不确定如何在 pySpark 中返回 null。

【讨论】:

以上是关于pyspark 在列上应用函数的主要内容,如果未能解决你的问题,请参考以下文章

在 Pyspark 中的多个列上使用相同的函数重复调用 withColumn()

在 PySpark 中的多个列上应用 MinMaxScaler

Pyspark - 一次聚合数据框的所有列[重复]

Pyspark:在列和索引上排名()?

基于另一列中的值的一列上的pyspark滞后函数

在 pyspark 中的特定列上应用过滤器描述