如何将行传递到pyspark udf
Posted
技术标签:
【中文标题】如何将行传递到pyspark udf【英文标题】:How to pass rows into pyspark udf 【发布时间】:2020-07-07 17:13:52 【问题描述】:我有一个包含列(id、id2、vec、vec2)的表,其中 id 是整数,向量是 pyspark SparseVeectors。我想写一个 udf 来获取 vec 和 vec2 的点积,如下所示:
def dot_product(vec, vec2): #or do the row and I can access them later as row.vec, row.vec2?
return vec.dot(vec2)
udf = udf(dot_product, FloatType())
dot_product = df.withColumn('dot_product', udf('vec', 'vec2'))
但这会将字符串 'vec' 和 'vec2' 作为变量传递。我试过 df.vec 和 df.vec 但它通过了整个专栏。我怎样才能做到这一点作为一个行操作?我不能使用 selectExpr 因为 SQL 没有点函数。如果有 SQL 方法,我也会对此持开放态度
【问题讨论】:
请尝试对udf、传递列和向量函数进行一些研究。堆栈溢出中已经有很多关于相同主题的答案。 @Raghu 我可以传递该列,但它说“列”对象不可调用。我也意识到当我为问题重写代码时,我忘记了代码中的一行 在答案中,列必须包含在 col() 中。我认为即使您不能使用 .dot 功能,因为列没有该属性。 @Raghu 是的,我也试过了。我完全按原样尝试了下面提供的答案,但这也不起作用。我需要做一个点积,你有什么其他建议吗? 您必须在 UDF 中实现点积的逻辑 - 类似于 ***.com/questions/35363542/… 【参考方案1】:使用
将您的函数转换为 pyspark udfimport pyspark.sql.functions as F
dot_prod_udf = F.udf(dotproduct,<returnType>)
df.select(*,dot_prod_udf(F.col('vec'),F.col('vec2')))
【讨论】:
这就是我一直在做的事情,并且有一个错误说“列”对象不可调用。抱歉,我在这里重新输入时忘记了“udf = ...”行以上是关于如何将行传递到pyspark udf的主要内容,如果未能解决你的问题,请参考以下文章