如何使用 Java UDF 向 Spark 数据框添加新列

Posted

技术标签:

【中文标题】如何使用 Java UDF 向 Spark 数据框添加新列【英文标题】:How to add new column to Spark dataframe using a Java UDF 【发布时间】:2019-03-25 07:21:15 【问题描述】:

我有一个 Dataset<Row> inputDS,它有 4 列,即 Id, List<long> time, List<String> value, aggregateType 我想使用 map 函数向 Dataset value_new 添加一列,该 map 函数采用列 timevalue 和 @ 987654327@ 将其传递给函数getAggregate(String aggregateType, List<long> time, List<String> value) 并在处理参数时返回一个双精度值。 getAggregate 方法返回的 Double 值将是新列值,即 value_new 的值

数据集输入DS

 +------+---+-----------+---------------------------------------------+---------------+
 |    Id| value         |     time                                   |aggregateType  |
 +------+---------------+---------------------------------------------+---------------+
 |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |
 +------+---------------+---------------------------------------------+---------------+

预期的数据集输出DS

 +------+---------------+---------------------------------------------+---------------+-----------+
 |    Id| value         |     time                                    |aggregateType  | value_new |
 +------+---------------+---------------------------------------------+---------------+-----------+
 |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |   9.4     |
 +------+---------------+---------------------------------------------+---------------+-----------+

我尝试过的代码

 inputDS.withColumn("value_new",functions.lit(inputDS.map(new MapFunction<Row,Double>()

 public double call(Row row)
 String aggregateType = row.getAS("aggregateType");
 List<long> timeList = row.getList("time");
 List<long> valueList= row.getList("value");  

 return  getAggregate(aggregateType ,timeList,valueList);    

 ),Encoders.DOUBLE())));

错误

 Unsupported literal type class org.apache.spark.sql.Dataset [value:double]

注意对不起,如果我错误地使用了map函数,如果有任何解决方法,请建议我。

谢谢!

【问题讨论】:

【参考方案1】:

您收到错误是因为您尝试使用 Dataset.map() 的结果创建函数文字 (lit()),您可以在 docs 中看到它是一个数据集。您可以在Dataset.withColumn() 的 API 中看到,您需要一个列参数。

您似乎需要创建一个用户定义的函数。看看How do I call a UDF on a Spark DataFrame using JAVA?

【讨论】:

以上是关于如何使用 Java UDF 向 Spark 数据框添加新列的主要内容,如果未能解决你的问题,请参考以下文章

Java代码如何向Spark注册无参数UDF

如何使用Java UDF将新列添加到Spark数据帧

在数据框 API 中使用 spark SQL udf

SPARK 数据框错误:在使用 UDF 拆分列中的字符串时无法转换为 scala.Function2

如何创建 Pyspark UDF 以向数据框添加新列

如何使用scala将特定函数转换为apache spark中的udf函数? [复制]