java - 调用自定义火花UDF时如何解决Java中的NoSuchMethodException

Posted

技术标签:

【中文标题】java - 调用自定义火花UDF时如何解决Java中的NoSuchMethodException【英文标题】:How to resolve NoSuchMethodException in java when calling custom spark UDF 【发布时间】:2020-11-09 06:41:12 【问题描述】:

我有一个 java spark 流应用程序(使用 spark 3.0.1),我需要在其中调用流数据的自定义转换。这些自定义转换在类中定义为方法,并作为 jar 文件提供给我们。我们需要将这些方法包装在 UDF 中并在我们的 spark 代码中调用它们。一组转换样本可以定义如下。请记住它是以罐子的形式出现的。

import java.io.Serializable;

public class CustomTransformations implements Serializable 
    public String f1(String input) 
        return input + "_1";
    

    public String f2(String input) 
        return input + "_2";
    

    public String f3(String input) 
        return input + "_3";
    

假设在某个地方(例如 json 或配置文件)我们有一个转换映射和相应的方法名称(字符串),因此给定转换,我们可以将相应的方法包装在 UDF 中并调用它。为此,我创建了以下类。

import java.lang.reflect.Method;

import static org.apache.spark.sql.functions.udf;

public class Creator 
    public static UserDefinedFunction getUDF(CustomTransformations ct, String funcName)
            throws NoSuchMethodException 
        Method method = ct.getClass().getDeclaredMethod(funcName);
        return udf(
                (UDF1<String, Object>) method::invoke, DataTypes.StringType);
    

到目前为止没有编译错误。但现在的问题是,如果我从 spark 代码中调用此方法 getUDF,它会显示 NoSuchMethodException。例如。我的火花代码如下所示。

public class SampleSparkJob 
    public static void main(String[] args) 
        SparkSession.Builder sparkSessionBuilder = SparkSession.builder()
                .master("local[2]")
                .appName("sample-streaming");

        CustomTransformations ct = new CustomTransformations();
        try (SparkSession spark = sparkSessionBuilder.getOrCreate()) 
            Dataset<Row> df1 = MyKafkaConnectors.readFromKafka();

            // this is where I get the exceptions
            Dataset<Row> df2 = df1
                    .withColumn("value", Creator.getUDF(ct, "f1").apply(col("value")))
                    .withColumn("value", Creator.getUDF(ct, "f2").apply(col("value")))
                    .withColumn("value", Creator.getUDF(ct, "f3").apply(col("value")));

            StreamingQuery query = MyKafkaConnectors.WriteToKafka(df2);
            query.awaitTermination();
         catch (TimeoutException | StreamingQueryException | NoSuchMethodException e) 
            e.printStackTrace();
        
    

这是我得到的错误:

java.lang.NoSuchMethodException: <pkgname>.CustomTransformations.f1()
    at java.base/java.lang.Class.getDeclaredMethod(Class.java:2475)
    at Creator.getUDF(Creator.java:14)
    at SampleSparkJob.main(SampleSparkJob.java:29)

包名是正确的。显然,客户端的CustomTransformations 类有一个方法f1。所以我无法理解为什么它会显示这个错误。任何帮助表示赞赏。

【问题讨论】:

【参考方案1】:

您的代码有两个问题,都与 spark 无关。

首先,NoSuchMethodException:CustomTransformations.f1() 告诉你,没有任何方法 f1 没有任何参数确实存在。这是真的。

您需要将参数类型指定为getDeclaredMethod(本例中为字符串)。

第二,一个方法不能用invoke自己调用,需要传递“owner”或“this”对象来调用。

然后Creator 固定看起来像这样:

public class Creator 
    public static UserDefinedFunction getUDF(CustomTransformations ct, String funcName)
            throws NoSuchMethodException 
        Method method = ct.getClass().getDeclaredMethod(funcName, String.class);
        return udf(
                (UDF1<String, Object>) (s -> method.invoke(ct, s)), DataTypes.StringType);
    

它会工作得很好。

【讨论】:

感谢您指出我的代码中的缺陷。它解决了我试图解决的问题,所以我会接受你的回答。但是供大家参考,这种方式本身是行不通的,因为java.lang.reflect.Method没有实现java.io.Serializable。这是与 Spark 相关的问题。如果我能找到解决方案,我会添加一个新的答案。【参考方案2】:

对于@fonkap 给出的准确答案,我只有一点要补充。由于java.lang.reflect.Method 不可序列化,我们需要绕过在getUDF 方法中引用该对象。 Creator 类将类似于以下内容。

import static org.apache.spark.sql.functions.udf;

public class Creator implements Serializable 
    public static UserDefinedFunction getUDF(CustomTransformation ct, String funcName) 
        return udf((UDF1<String, Object>) (s -> ct.getClass().getDeclaredMethod(funcName,
                String.class).invoke(ct, s)),
                DataTypes.StringType);
                

【讨论】:

以上是关于java - 调用自定义火花UDF时如何解决Java中的NoSuchMethodException的主要内容,如果未能解决你的问题,请参考以下文章

UDF Scala 火花语法

hive自定义UDF函数,步骤详解

Hive开发自定义函数UDF

描述 hive 自定义 UDF 文档

UDF函数,hive调用java包简单方法

Databricks 火花 UDF 不适用于过滤的数据帧