如何将行传递到pyspark udf

Posted

技术标签:

【中文标题】如何将行传递到pyspark udf【英文标题】:How to pass rows into pyspark udf 【发布时间】:2020-07-07 17:13:52 【问题描述】:

我有一个包含列(id、id2、vec、vec2)的表,其中 id 是整数,向量是 pyspark SparseVeectors。我想写一个 udf 来获取 vec 和 vec2 的点积,如下所示:

def dot_product(vec, vec2): #or do the row and I can access them later as row.vec, row.vec2?
    return vec.dot(vec2)
udf = udf(dot_product, FloatType())
dot_product = df.withColumn('dot_product', udf('vec', 'vec2'))

但这会将字符串 'vec' 和 'vec2' 作为变量传递。我试过 df.vec 和 df.vec 但它通过了整个专栏。我怎样才能做到这一点作为一个行操作?我不能使用 selectExpr 因为 SQL 没有点函数。如果有 SQL 方法,我也会对此持开放态度

【问题讨论】:

请尝试对udf、传递列和向量函数进行一些研究。堆栈溢出中已经有很多关于相同主题的答案。 @Raghu 我可以传递该列,但它说“列”对象不可调用。我也意识到当我为问题重写代码时,我忘记了代码中的一行 在答案中,列必须包含在 col() 中。我认为即使您不能使用 .dot 功能,因为列没有该属性。 @Raghu 是的,我也试过了。我完全按原样尝试了下面提供的答案,但这也不起作用。我需要做一个点积,你有什么其他建议吗? 您必须在 UDF 中实现点积的逻辑 - 类似于 ***.com/questions/35363542/… 【参考方案1】:

使用

将您的函数转换为 pyspark udf
import pyspark.sql.functions as F
dot_prod_udf = F.udf(dotproduct,<returnType>)
df.select(*,dot_prod_udf(F.col('vec'),F.col('vec2')))

【讨论】:

这就是我一直在做的事情,并且有一个错误说“列”对象不可调用。抱歉,我在这里重新输入时忘记了“udf = ...”行

以上是关于如何将行传递到pyspark udf的主要内容,如果未能解决你的问题,请参考以下文章

如何将行转换为pyspark中的字典列表?

将行列表保存到 pyspark 中的 Hive 表

如何将行值从 iframe 弹出窗口传递到父页面文本框

PySpark如何读取具有多种编码的字符串的文件

pyspark 将行转换为列

Pyspark 将行数据转换为键值对