Java Spark-如何以多列作为参数调用UDF
Posted
技术标签:
【中文标题】Java Spark-如何以多列作为参数调用UDF【英文标题】:Java Spark- How to call UDF with multiple column as argument 【发布时间】:2020-04-02 05:53:20 【问题描述】:我正在尝试执行以下代码
SparkSession sparkSession = SparkSession
.builder()
.appName("test")
.master("local")
//.enableHiveSupport()
.getOrCreate();
StructField[] structFields = new StructField[]
new StructField("FIRST", DataTypes.StringType, true, Metadata.empty()),
new StructField("SECOND", DataTypes.StringType, true, Metadata.empty())
;
StructType structType = new StructType(structFields);
List<Row> rows = new ArrayList<>();
rows.add(RowFactory.create("1","2"));
rows.add(RowFactory.create("2","3"));
Dataset<Row> dataDs = sparkSession.createDataFrame(rows, structType);
UDF1 mode = new UDF1<String,String, String>()
public String call(String a,String b) throws Exception
return a+b;
;
sparkSession.udf().register("mode",mode,DataTypes.StringType);
dataDs.withColumn("newCol",callUDF("mode",col("FIRST"),col("SECOND")));
dataDs.show();
但是,我似乎无法像使用多个参数那样声明 UDF
UDF1 mode = new UDF1<String,String, String>()
public String call(String a,String b) throws Exception
return a+b;
;
我想将我的两列作为 UDF 的输入传递。
我可以在第一个参数中传递一个对象,比如 Integer 或其他东西,但我想传递多个列。我该怎么做?有人可以帮我吗?我一直在互联网上搜索这个但无法找到解决方案。我是新来的火花。
【问题讨论】:
函数接口根据参数的数量不同命名。所以你需要做到UDF2 mode = new UDF2<String, String, String>() ...
哦。我懂了。我不知道。我是新来的火花。谢谢。我会试试看。
请注意:您可以使用 lambda 表达式缩短代码:UDF2<String, String, String> mode = (a, b) -> a + b;
(如果您使用的是 Java 8+)
谢谢。我实际上想在这个 UDF 中调用一个复杂的代码,所以不要认为 lambda 会对此有所帮助。
【参考方案1】:
我完全同意@emest_k 的 cmets 并想再添加一件事。因为 dataFrame 是不可变的,所以您需要使用新的 dataFrame 来使用 show() 查看输出。这是一个示例。
...
UDF2 mode = new UDF2<String, String, String>()
public String call(String a, String b) throws Exception
return a + b;
;
sparkSession.udf().register("mode", mode, DataTypes.StringType);
Dataset<Row> newDataFrame = dataDs.withColumn("newCol", callUDF("mode", col("FIRST"), col("SECOND")));
newDataFrame.show();
【讨论】:
以上是关于Java Spark-如何以多列作为参数调用UDF的主要内容,如果未能解决你的问题,请参考以下文章
如何将复杂的 Java 类对象作为参数传递给 Spark 中的 Scala UDF?
如何在不指定每一列的情况下将整行作为参数传递给 Spark(Java)中的 UDF?
java,如何在spark 1.4.1中调用UDF [重复]