java - 调用自定义火花UDF时如何解决Java中的NoSuchMethodException
Posted
技术标签:
【中文标题】java - 调用自定义火花UDF时如何解决Java中的NoSuchMethodException【英文标题】:How to resolve NoSuchMethodException in java when calling custom spark UDF 【发布时间】:2020-11-09 06:41:12 【问题描述】:我有一个 java spark 流应用程序(使用 spark 3.0.1),我需要在其中调用流数据的自定义转换。这些自定义转换在类中定义为方法,并作为 jar 文件提供给我们。我们需要将这些方法包装在 UDF 中并在我们的 spark 代码中调用它们。一组转换样本可以定义如下。请记住它是以罐子的形式出现的。
import java.io.Serializable;
public class CustomTransformations implements Serializable
public String f1(String input)
return input + "_1";
public String f2(String input)
return input + "_2";
public String f3(String input)
return input + "_3";
假设在某个地方(例如 json 或配置文件)我们有一个转换映射和相应的方法名称(字符串),因此给定转换,我们可以将相应的方法包装在 UDF 中并调用它。为此,我创建了以下类。
import java.lang.reflect.Method;
import static org.apache.spark.sql.functions.udf;
public class Creator
public static UserDefinedFunction getUDF(CustomTransformations ct, String funcName)
throws NoSuchMethodException
Method method = ct.getClass().getDeclaredMethod(funcName);
return udf(
(UDF1<String, Object>) method::invoke, DataTypes.StringType);
到目前为止没有编译错误。但现在的问题是,如果我从 spark 代码中调用此方法 getUDF
,它会显示 NoSuchMethodException
。例如。我的火花代码如下所示。
public class SampleSparkJob
public static void main(String[] args)
SparkSession.Builder sparkSessionBuilder = SparkSession.builder()
.master("local[2]")
.appName("sample-streaming");
CustomTransformations ct = new CustomTransformations();
try (SparkSession spark = sparkSessionBuilder.getOrCreate())
Dataset<Row> df1 = MyKafkaConnectors.readFromKafka();
// this is where I get the exceptions
Dataset<Row> df2 = df1
.withColumn("value", Creator.getUDF(ct, "f1").apply(col("value")))
.withColumn("value", Creator.getUDF(ct, "f2").apply(col("value")))
.withColumn("value", Creator.getUDF(ct, "f3").apply(col("value")));
StreamingQuery query = MyKafkaConnectors.WriteToKafka(df2);
query.awaitTermination();
catch (TimeoutException | StreamingQueryException | NoSuchMethodException e)
e.printStackTrace();
这是我得到的错误:
java.lang.NoSuchMethodException: <pkgname>.CustomTransformations.f1()
at java.base/java.lang.Class.getDeclaredMethod(Class.java:2475)
at Creator.getUDF(Creator.java:14)
at SampleSparkJob.main(SampleSparkJob.java:29)
包名是正确的。显然,客户端的CustomTransformations
类有一个方法f1
。所以我无法理解为什么它会显示这个错误。任何帮助表示赞赏。
【问题讨论】:
【参考方案1】:您的代码有两个问题,都与 spark 无关。
首先,NoSuchMethodException:CustomTransformations.f1()
告诉你,没有任何方法 f1
没有任何参数确实存在。这是真的。
您需要将参数类型指定为getDeclaredMethod
(本例中为字符串)。
第二,一个方法不能用invoke自己调用,需要传递“owner”或“this”对象来调用。
然后Creator
固定看起来像这样:
public class Creator
public static UserDefinedFunction getUDF(CustomTransformations ct, String funcName)
throws NoSuchMethodException
Method method = ct.getClass().getDeclaredMethod(funcName, String.class);
return udf(
(UDF1<String, Object>) (s -> method.invoke(ct, s)), DataTypes.StringType);
它会工作得很好。
【讨论】:
感谢您指出我的代码中的缺陷。它解决了我试图解决的问题,所以我会接受你的回答。但是供大家参考,这种方式本身是行不通的,因为java.lang.reflect.Method
没有实现java.io.Serializable
。这是与 Spark 相关的问题。如果我能找到解决方案,我会添加一个新的答案。【参考方案2】:
对于@fonkap 给出的准确答案,我只有一点要补充。由于java.lang.reflect.Method
不可序列化,我们需要绕过在getUDF
方法中引用该对象。 Creator
类将类似于以下内容。
import static org.apache.spark.sql.functions.udf;
public class Creator implements Serializable
public static UserDefinedFunction getUDF(CustomTransformation ct, String funcName)
return udf((UDF1<String, Object>) (s -> ct.getClass().getDeclaredMethod(funcName,
String.class).invoke(ct, s)),
DataTypes.StringType);
【讨论】:
以上是关于java - 调用自定义火花UDF时如何解决Java中的NoSuchMethodException的主要内容,如果未能解决你的问题,请参考以下文章