Pyspark:访问 UDF 中行内的列

Posted

技术标签:

【中文标题】Pyspark:访问 UDF 中行内的列【英文标题】:Pyspark: Accessing a column within row in a UDF 【发布时间】:2019-08-16 08:17:27 【问题描述】:

pyspark 的初学者试图理解 UDF:

我有一个 PySpark 数据框 p_b,我通过传递数据框的所有行来调用 UDF。我想从行访问列debit。出于某种原因,这没有发生。请在下面找到 sn-ps。

p_b has 4 columns, id, credit, debit,sum

功能:

def test(row): return('123'+row['debit'])

转换为 UDF

test_udf=udf(test,IntegerType())

在数据帧 p_b 上调用 UDF

vals=test_udf(struct([p_b[x] for x in p_b.columns])) print(type(vals)) print(vals)

输出

Column<b'test(named_struct(id, credit,debit,sum))'>

【问题讨论】:

您似乎正在尝试将“123”添加到数据框的每一行。不是吗? 您必须使用 with 列为您的数据框调用 udf,数据框列值必须作为参数传递。像这样定义你的函数。 def user_func(row): 返回行+123 my_func = udf(user_func, IntegerType()) newdf = df.withColumn('new_column',my_func(df.value)) 查看详情。 ***.com/questions/57517381/… 感谢 cmets。我试图将“123”添加到“借方”列的所有行 【参考方案1】:

让我们先制作一个示例数据框:

from pyspark.sql.functions import *
from pyspark.sql.types import *  
schema = StructType([StructField("id", StringType(), True),\
                               StructField("credit", IntegerType(), True),\
                       StructField("debit", IntegerType(), True),\
                     StructField("sum", IntegerType(), True)])
df = spark.createDataFrame([("user_10",100, 10,110),("user_11",200, 20,220),("user_12",300, 30,330) ], schema)
df.show()

导致:

+-------+------+-----+---+
|     id|credit|debit|sum|
+-------+------+-----+---+
|user_10|   100|   10|110|
|user_11|   200|   20|220|
|user_12|   300|   30|330|
+-------+------+-----+---+

现在,让我们定义将 123 添加到传递给它的值的 udf:

def test(x):
    return(123+x)
test_udf=udf(test,IntegerType())

让我们看看如何使用 UDF:

df2 = df.withColumn( 'debit' , test_udf(col('debit')) )
df2.show()

导致:

+-------+------+-----+---+
|     id|credit|debit|sum|
+-------+------+-----+---+
|user_10|   100|  133|110|
|user_11|   200|  143|220|
|user_12|   300|  153|330|
+-------+------+-----+---+

请注意,现在您可能需要重新计算“sum”列:

df2 = df2.withColumn( 'sum' ,  col('debit')+col('credit') )
df2.show()

导致:

+-------+------+-----+---+
|     id|credit|debit|sum|
+-------+------+-----+---+
|user_10|   100|  133|233|
|user_11|   200|  143|343|
|user_12|   300|  153|453|
+-------+------+-----+---+

【讨论】:

以上是关于Pyspark:访问 UDF 中行内的列的主要内容,如果未能解决你的问题,请参考以下文章

将 pyspark pandas_udf 与 AWS EMR 一起使用时出现“没有名为‘pandas’的模块”错误

如何在不使用 StandardScaler 的情况下标准化 PySpark 中的列?

如何在pyspark的列中找到列表的平均值?

pyspark 中的 UDF 能否返回与列不同的对象?

如何在 pyspark 中使用 pandas UDF 并在 StructType 中返回结果

pyspark 在 udf 中获取结构数据类型的字段名称