如何使用Java UDF将新列添加到Spark数据帧

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Java UDF将新列添加到Spark数据帧相关的知识,希望对你有一定的参考价值。

我有一个Dataset<Row> inputDS有4列,即Id, List<long> time, List<String> value, aggregateType我想使用map函数向Dataset value_new添加一个列,该map函数采用列timevalueaggregateType将其传递给函数getAggregate(String aggregateType, List<long> time, List<String> value)并返回处理参数的double值。方法Double返回的getAggregate值将是新的列值,即value_new的值

数据集输入DS

 +------+---+-----------+---------------------------------------------+---------------+
 |    Id| value         |     time                                   |aggregateType  |
 +------+---------------+---------------------------------------------+---------------+
 |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |
 +------+---------------+---------------------------------------------+---------------+

预期的数据集输出DS

 +------+---------------+---------------------------------------------+---------------+-----------+
 |    Id| value         |     time                                    |aggregateType  | value_new |
 +------+---------------+---------------------------------------------+---------------+-----------+
 |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |   9.4     |
 +------+---------------+---------------------------------------------+---------------+-----------+

我试过的代码。

 inputDS.withColumn("value_new",functions.lit(inputDS.map(new MapFunction<Row,Double>(){

 public double call(Row row){
 String aggregateType = row.getAS("aggregateType");
 List<long> timeList = row.getList("time");
 List<long> valueList= row.getList("value");  

 return  getAggregate(aggregateType ,timeList,valueList);    

 }}),Encoders.DOUBLE())));

错误

 Unsupported literal type class org.apache.spark.sql.Dataset [value:double]

注意抱歉,如果我错误地使用了map功能,如果有任何解决方法,请建议我。

谢谢。!

答案

你得到错误,因为你试图使用lit()的结果创建一个函数文字(Dataset.map()),你可以在docs中看到它是一个数据集。你可以在API中看到Dataset.withColumn()你需要一个列的参数。

您似乎需要创建用户定义的函数。看看How do I call a UDF on a Spark DataFrame using JAVA?

以上是关于如何使用Java UDF将新列添加到Spark数据帧的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark 如何将新列从列表/数组附加到 Spark 数据帧

如何将新列和相应的行特定值添加到火花数据帧?

在 UDF 之后将新列附加到现有 PySpark 数据帧

如何将新列动态添加到 bigquery 中已存在的表..?

如何将新列添加到 Android SQLite 数据库?

spark scala - UDF 用于创建新列