数据框上的 Pyspark UDF 列
Posted
技术标签:
【中文标题】数据框上的 Pyspark UDF 列【英文标题】:Pyspark UDF column on Dataframe 【发布时间】:2018-09-26 16:08:32 【问题描述】:我正在尝试根据某些列的值在数据框上创建一个新列。它在所有情况下都返回 null。有人知道这个简单的例子出了什么问题吗?
df = pd.DataFrame([[0,1,0],[1,0,0],[1,1,1]],columns = ['Foo','Bar','Baz'])
spark_df = spark.createDataFrame(df)
def get_profile():
if 'Foo'==1:
return 'Foo'
elif 'Bar' == 1:
return 'Bar'
elif 'Baz' ==1 :
return 'Baz'
spark_df = spark_df.withColumn('get_profile', lit(get_profile()))
spark_df.show()
Foo Bar Baz get_profile
0 1 0 None
1 0 0 None
1 1 1 None
我希望所有行都填写 get_profile 列。
我也试过这个:
spark_udf = udf(get_profile,StringType())
spark_df = spark_df.withColumn('get_profile', spark_udf())
print(spark_df.toPandas())
同样的效果。
【问题讨论】:
您正在将字符串与数字进行比较。'Foo' != 1
,其他情况同理。这就是为什么你一无所获。 UDF 需要列作为参数,而 get_profile
的参数为零。
使用 when/otherwise 内置函数而不是 udf 函数
我会这样做:spark_df.withColumn("get_profile", coalesce(*[when(col(c)==1, lit(c)) for c in spark_df.columns]))
谢谢 - 实际上 when/otherwise 函数并不实用,因为要进行更多比较,这只是一个简化示例。
【参考方案1】:
udf
不知道列名是什么。因此,它会检查您的if
/elif
块中的每个条件,并且所有条件都评估为False
。因此function will return None
。
您必须重写您的 udf
以获取您要检查的列:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
def get_profile(foo, bar, baz):
if foo == 1:
return 'Foo'
elif bar == 1:
return 'Bar'
elif baz == 1 :
return 'Baz'
spark_udf = udf(get_profile, StringType())
spark_df = spark_df.withColumn('get_profile',spark_udf('Foo', 'Bar', 'Baz'))
spark_df.show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#| 0| 1| 0| Bar|
#| 1| 0| 0| Foo|
#| 1| 1| 1| Foo|
#+---+---+---+-----------+
如果您有很多列并且想要将它们全部传递(按顺序):
spark_df = spark_df.withColumn('get_profile', spark_udf(*spark_df.columns))
更一般地说,您可以解压缩任何有序的列列表:
cols_to_pass_to_udf = ['Foo', 'Bar', 'Baz']
spark_df = spark_df.withColumn('get_profile', spark_udf(*cols_to_pass_to_udf ))
但此特定操作不需要udf
。我会这样做:
from pyspark.sql.functions import coalesce, when, col, lit
spark_df.withColumn(
"get_profile",
coalesce(*[when(col(c)==1, lit(c)) for c in spark_df.columns])
).show()
#+---+---+---+-----------+
#|Foo|Bar|Baz|get_profile|
#+---+---+---+-----------+
#| 0| 1| 0| Bar|
#| 1| 0| 0| Foo|
#| 1| 1| 1| Foo|
#+---+---+---+-----------+
这是因为pyspark.sql.functions.when()
将默认返回null
如果条件计算为False
并且没有指定otherwise
。那么pyspark.sql.functions.coalesce
的列表推导将返回第一个非空列。
请注意,仅当列的顺序与 get_profile
函数中评估的顺序相同时,这才等效于 udf
。更明确地说,你应该这样做:
spark_df.withColumn(
"get_profile",
coalesce(*[when(col(c)==1, lit(c)) for c in ['Foo', 'Bar', 'Baz'])
).show()
【讨论】:
知道了 - 非常感谢。所以这是实际功能的简化版本。实际上,分配值的列/条件要多得多,并且嵌套的 when 结构不实用。有没有办法将所有列作为参数输入 UDF? @flyingmeatball 你也可以spark_udf(*spark_df.columns)
,但你必须确保列的顺序与你的udf参数的顺序相同。
另外,即使它很复杂,使用嵌套的when
也可能是faster than using a udf
。
明白了——它更多的是一种可读性/编程能力。我愿意牺牲一点速度来证明我的计算是正确的。以上是关于数据框上的 Pyspark UDF 列的主要内容,如果未能解决你的问题,请参考以下文章