带有 UDF 的 PySpark 数据框

Posted

技术标签:

【中文标题】带有 UDF 的 PySpark 数据框【英文标题】:PySpark dataframe with UDF 【发布时间】:2020-11-15 11:35:51 【问题描述】:

我为 PySpark DF 提供了以下代码,通过将以下变量传递给 DF 来进行一些计算和聚合:

number_of_plays = 8
date_from = 2020-01-05
date_to = 2020-03-10

df_1 = df.groupBy('player_1', 'player_2').agg(count("*").alias("no_of_plays")).filter(column('no_of_plays')>number_of_plays).filter(column('game_date').between(date_from, date_to))
df_1.show()

现在我想将它包装到 Spark UDF 中,我可以将 3 个变量 number_of_plays、date_from、date_to 作为参数传递给这个函数 所以函数应该看起来像

def myfn (number_of_plays, date_from, date_to):
   # do the aggregation here and return the result

在我的代码中使用。

任何想法如何使用 Python 3 做到这一点?

【问题讨论】:

【参考方案1】:

这里不需要 UDF - 一个简单的 Python 函数就可以完成这项工作:

def myfn(df, number_of_plays, date_from, date_to):
    return (df.groupBy('player_1', 'player_2')
              .agg(count("*").alias("no_of_plays"))
              .filter(column('no_of_plays') > number_of_plays)
              .filter(column('game_date').between(date_from, date_to))
           )

你可以直接调用它,比如myfn(df, 10, ...)

【讨论】:

以上是关于带有 UDF 的 PySpark 数据框的主要内容,如果未能解决你的问题,请参考以下文章

pyspark:将多个数据框字段传递给 udf

Pyspark:使用带有参数的UDF创建一个新列[重复]

pyspark 数据框 UDF 异常处理

pyspark 在 udf 中使用数据框

所有列的 Pyspark 数据框数据类型由 UDF 更改为 String

PySpark - 将列表作为参数传递给 UDF + 迭代数据框列添加