在 PySpark 中对 DataFrame 进行逐行操作

Posted

技术标签:

【中文标题】在 PySpark 中对 DataFrame 进行逐行操作【英文标题】:Rowwise manipulation of a DataFrame in PySpark 【发布时间】:2017-08-22 12:46:40 【问题描述】:

如果有一个 DataFrame 并且想要根据行的值对函数中的数据进行一些操作。

my_udf(row):
    threshold = 10
        if row.val_x > threshold
        row.val_x = another_function(row.val_x)
        row.val_y = another_function(row.val_y)
        return row
    else:
        return row

有人知道如何将我的 udf 应用到 DataFrame 吗?

【问题讨论】:

【参考方案1】:

据我了解,udf 参数是列名。您的示例可能会这样重写:

from pyspark.sql.functions import udf, array
from pyspark.sql.types import IntegerType

def change_val_x(val_x):
    threshold = 10
    if val_x > threshold:
        return another_function(val_x)
    else:
        return val_x

def change_val_y(arr):
    threshold = 10
    # arr[0] -> val_x, arr[0] -> val_y 
    if arr[0] > threshold:
        return another_function(arr[1])
    else:
        return val_y

change_val_x_udf = udf(change_val_x, IntegerType())
change_val_y_udf = udf(change_val_y, IntegerType())

# apply these functions to your dataframe
df = df.withColumn('val_y', change_val_y_udf(array('val_x', 'val_y')))\
       .withColumn('val_x', change_val_x_udf('val_x'))

要修改 val_x 列,一个简单的 udf 就足够了,但对于 val_y,您需要 val_y 和 val_x 列值,解决方案是使用 array。请注意,此代码未经测试...

请参阅this question 以在多个列上应用 udf。

【讨论】:

.withColumn('val_y', change_val_y_udf(array('val_x', 'val_y'))) 不是.withColumn('val_y', change_val_x_udf(array('val_x', 'val_y'))) 另外,您可能在change_val_y_udf 中使用之前更改了val_x 的值。【参考方案2】:

如果您可以使用 pyspark 函数,最好不要使用 UDF,如果您无法将 another_function 转换为 pyspark 函数,您可以这样做:

from pyspark.sql.types import *
import pyspark.sql.functions as psf

def another_function(val):
    ...

another_function_udf = psf.udf(another_function, [outputType()])

其中outputType()是与another_function的输出对应的pyspark类型(IntegerType()StringType()...)

def apply_another_function(val):
    return psf.when(df.val_x > threshold, another_function_udf(val)).otherwise(val)

df = df.withColumn('val_y', apply_another_function(df.val_y))\
       .withColumn('val_x', apply_another_function(df.val_x))

【讨论】:

以上是关于在 PySpark 中对 DataFrame 进行逐行操作的主要内容,如果未能解决你的问题,请参考以下文章

如何在pyspark中对数组中的标签进行编码

我可以将 Pyspark RDD 用作 Pandas DataFrame 吗? Pyspark/spark 在数据分析中对 Pandas 的限制?

在 pyspark 中对大量列进行累积求和的优化方法

如何在pyspark中对数组中的标签进行编码

在pyspark中使用整数对列进行编码

在pyspark中用整数编码一列