Spark - 为 udf 提供额外的参数
Posted
技术标签:
【中文标题】Spark - 为 udf 提供额外的参数【英文标题】:Spark - provide extra parameter to udf 【发布时间】:2018-03-20 15:56:10 【问题描述】:我正在尝试创建一个 spark scala udf 来转换以下形状的 MongoDB 对象:
Object:
"1": 50.3
"8": 2.4
"117": 1.0
进入 Spark ml SparseVector。 问题是,为了创建一个 SparseVector,我还需要一个输入参数——它的大小。 在我的应用程序中,我将向量大小保存在单独的 MongoDB 集合中。 所以,我定义了以下 UDF 函数:
val mapToSparseVectorUdf = udf
(myMap: Map[String, Double], size: Int) =>
val vb: VectorBuilder[Double] = new VectorBuilder(length = -1)
vb.use(myMap.keys.map(key => key.toInt).toArray, myMap.values.toArray, size)
vb.toSparseVector
我试图这样称呼它:
df.withColumn("VecColumn", mapToSparseVectorUdf(col("MapColumn"), vecSize)).drop("MapColumn")
但是,我的 IDE 对那个 udf 调用说“不适用”。 有没有办法制作这种可以带额外参数的UDF?
【问题讨论】:
【参考方案1】:Udf 函数需要将列作为参数传递,而传递的 columns
将通过序列化解析为原始数据类型和反序列化。这就是为什么 udf 函数很昂贵
如果vecSize
是一个整数常量,那么你可以简单地使用lit
内置函数作为
df.withColumn("VecColumn", mapToSparseVectorUdf(col("MapColumn"), lit(vecSize))).drop("MapColumn")
【讨论】:
谢谢。我应该导入哪个包来使用 lit 功能? IDE 说它无法解决它 啊,找到了:import org.apache.spark.sql.functions.col, udf, lit 很高兴听到你自己找到了它@DaniilAndreyevichBaunov :)【参考方案2】:这样就可以了:
def mapToSparseVectorUdf(vectorSize: Int) = udf[Vector, Map[String, Double]](
(myMap: Map[String, Double]) =>
val elements = myMap.toSeq.map case (index, value) => (index.toInt, value)
Vectors.sparse(vectorSize, elements)
)
用法:
val data = spark.createDataFrame(Seq(
("1", Map("1" -> 50.3, "8" -> 2.4)),
("2", Map("2" -> 23.5, "3" -> 41.2))
)).toDF("id", "MapColumn")
data.withColumn("VecColumn", mapToSparseVectorUdf(10)($"MapColumn")).show(false)
注意:
考虑修复您的 MongoDB 架构! ;) 大小是 SparseVector 的成员,我不会将它与它的元素分开。
【讨论】:
.. 然后按如下方式使用:df.withColumn("VecColumn", mapToSparseVectorUdf(vecSize)(col("MapColumn")))
添加了示例用法!
我正在考虑以这种方式更改我的架构。但是在我的 NodeJS 服务器中使用这样的模式会很不方便。谢谢你的建议以上是关于Spark - 为 udf 提供额外的参数的主要内容,如果未能解决你的问题,请参考以下文章
如何将一些额外的字符串参数传递给每行的 spark udf?
Spark(Hive) SQL中UDF的使用(Python)