Spark - 为 udf 提供额外的参数

Posted

技术标签:

【中文标题】Spark - 为 udf 提供额外的参数【英文标题】:Spark - provide extra parameter to udf 【发布时间】:2018-03-20 15:56:10 【问题描述】:

我正在尝试创建一个 spark scala udf 来转换以下形状的 MongoDB 对象:

Object:
    "1": 50.3
    "8": 2.4
    "117": 1.0

进入 Spark ml SparseVector。 问题是,为了创建一个 SparseVector,我还需要一个输入参数——它的大小。 在我的应用程序中,我将向量大小保存在单独的 MongoDB 集合中。 所以,我定义了以下 UDF 函数:

val mapToSparseVectorUdf = udf 
  (myMap: Map[String, Double], size: Int) => 
    val vb: VectorBuilder[Double] = new VectorBuilder(length = -1)
    vb.use(myMap.keys.map(key => key.toInt).toArray, myMap.values.toArray, size)
    vb.toSparseVector
  

我试图这样称呼它:

df.withColumn("VecColumn", mapToSparseVectorUdf(col("MapColumn"), vecSize)).drop("MapColumn")

但是,我的 IDE 对那个 udf 调用说“不适用”。 有没有办法制作这种可以带额外参数的UDF?

【问题讨论】:

【参考方案1】:

Udf 函数需要将列作为参数传递,而传递的 columns 将通过序列化解析为原始数据类型反序列化。这就是为什么 udf 函数很昂贵

如果vecSize一个整数常量,那么你可以简单地使用lit 内置函数作为

df.withColumn("VecColumn", mapToSparseVectorUdf(col("MapColumn"), lit(vecSize))).drop("MapColumn")

【讨论】:

谢谢。我应该导入哪个包来使用 lit 功能? IDE 说它无法解决它 啊,找到了:import org.apache.spark.sql.functions.col, udf, lit 很高兴听到你自己找到了它@DaniilAndreyevichBaunov :)【参考方案2】:

这样就可以了:

def mapToSparseVectorUdf(vectorSize: Int) = udf[Vector, Map[String, Double]](
  (myMap: Map[String, Double]) => 
    val elements = myMap.toSeq.map case (index, value) => (index.toInt, value)
    Vectors.sparse(vectorSize, elements)
  
)

用法:

val data = spark.createDataFrame(Seq(
    ("1", Map("1" -> 50.3, "8" -> 2.4)), 
    ("2", Map("2" -> 23.5, "3" -> 41.2))
)).toDF("id", "MapColumn")

data.withColumn("VecColumn", mapToSparseVectorUdf(10)($"MapColumn")).show(false)

注意:

考虑修复您的 MongoDB 架构! ;) 大小是 SparseVector 的成员,我不会将它与它的元素分开。

【讨论】:

.. 然后按如下方式使用:df.withColumn("VecColumn", mapToSparseVectorUdf(vecSize)(col("MapColumn"))) 添加了示例用法! 我正在考虑以这种方式更改我的架构。但是在我的 NodeJS 服务器中使用这样的模式会很不方便。谢谢你的建议

以上是关于Spark - 为 udf 提供额外的参数的主要内容,如果未能解决你的问题,请参考以下文章

如何将一些额外的字符串参数传递给每行的 spark udf?

为 Spark UDF 执行提供上下文

如何使用 withColumn 将额外的参数传递给 UDF

Spark(Hive) SQL中UDF的使用(Python)

Pandas UDF Facebook Prophet / 多个参数

在 Spark 中将可选参数建模为 UDF 的最佳方法是啥?