Spark - 在数据集的几列上应用 UDF 并形成新列
Posted
技术标签:
【中文标题】Spark - 在数据集的几列上应用 UDF 并形成新列【英文标题】:Spark - Apply UDF on few columns of a Dataset and form new columns 【发布时间】:2017-10-25 06:27:24 【问题描述】:我有一个字符串类型的数据集,我想在该数据集的某些列上应用一个函数,并根据该列将它们转换为 Long 或 Double 或 Int 等,并附加新列(甚至是其中的一个元组)列)到相同的数据集。有人可以建议这样做的正确方法吗?
更新:
以下失败:
ds.withColumn("newCol", Vectors.dense(strDoubleUDF(ds("col10")) + str2DoubleUDF(ds("col12")))
有错误
<console>:253: error: overloaded method value dense with alternatives:
(values: Array[Double])org.apache.spark.mllib.linalg.Vector <and>
(firstValue: Double,otherValues: Double*)org.apache.spark.mllib.linalg.Vector
cannot be applied to (org.apache.spark.sql.Column, org.apache.spark.sql.Column)
Vectors.dense(str2DoubleUDF(ds("col10")),
【问题讨论】:
如果没有一些数据和更详细的说明您想要实现的目标,这听起来有点太复杂了。也许还可以添加预期的输出。 听起来像“cast”功能:) @Shaido Dataset 只是一列字符串.. 我只想将几列转换为 Double 并在最后添加这些 double 值的向量。以最简单的形式,如果我有一个数据集,其中一行和 3 列分别为“1”、“2”、“3”,我想用 Vector(1.0, 2.0, 3.0) 添加一个新列。字符串有时不是数值,因此在我的代码中我实际上必须应用 UDF,但最终需要将 Double 值的向量添加到数据集中。 @T.Gawęda 我已经通过了 UDF 的“转换”部分,因为转换涉及逻辑。失败的是向量化超过了这一步。用确切的错误更新了帖子。 @S.K.好的,现在看来需要UDF。请看我的回答:) 【参考方案1】:没有内置支持制作向量,所以你应该使用 UDF:
val vectorUDF = udf ((col1 : Seq[Double], col2 : Seq[Double]) =>
Vectors.dense(col1 + col2)
);
ds.withColumn("newCol", vectorUDF(strDoubleUDF(ds("col10")), str2DoubleUDF(ds("col12")))
【讨论】:
VectorUDF 在这里是采用一两个参数吗?它的调用方式,似乎我们只传递了一个但定义似乎表明有两个参数 @S.K 两个参数 - 见编辑。复制粘贴太多:)【参考方案2】:这里有一个如何实现的示例:
val ds: Dataset[(String, String)] = Seq(
("1.0","1"),
("2.0","2"),
("3.0","3"),
("4.0","4")
).toDS()
val newDs: Dataset[(String, String, (Double, Int))] = ds
.mapcase (doubleStr,intStr) =>
(doubleStr,
intStr,
(doubleStr.toDouble,intStr.toInt) // new struct/tuple column
)
【讨论】:
感谢您的建议,这听起来很有希望。但是,我有大约 30 多列并且枚举以防万一可能很麻烦。有没有办法像我们对 Dataframes 做的那样……就像我们可以用额外的列和所有旧列定义新模式一样。 val newSc = StructType(Array(StructField(colName, DoubleType, false)) ++ df.schema.fields) 然后执行 df.sqlContext.createDataFrame( df.rdd.zipWithIndex.map(ln => Row.fromSeq(Seq( ln._2 + offset) ++ ln._1.toSeq ++ Seq())), newSc) .. 类似这样的东西。谢谢 @S.K 如果您坚持使用 Dataframe API,这很容易,只需将withColumn
与 UDF 结合使用或仅进行强制转换以上是关于Spark - 在数据集的几列上应用 UDF 并形成新列的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Spark(Java)在数据集的所有列上并行应用相同的函数
Pandas UDF Facebook Prophet / 多个参数