Spark SQL:如何使用 JAVA 从 DataFrame 操作中调用 UDF

Posted

技术标签:

【中文标题】Spark SQL:如何使用 JAVA 从 DataFrame 操作中调用 UDF【英文标题】:Spark SQL: How to call UDF from DataFrame operation using JAVA 【发布时间】:2015-11-20 14:45:58 【问题描述】:

我想知道如何使用 JAVA 从 Spark SQL 中的领域特定语言 (DSL) 函数调用 UDF 函数。

我有UDF函数(仅举例):

UDF2 equals = new UDF2<String, String, Boolean>() 
   @Override
   public Boolean call(String first, String second) throws Exception 
       return first.equals(second);
   
;

我已经将它注册到 sqlContext

sqlContext.udf().register("equals", equals, DataTypes.BooleanType);

当我运行以下查询时,我的 UDF 被调用并得到一个结果。

sqlContext.sql("SELECT p0.value FROM values p0 WHERE equals(p0.value, 'someString')");

我会在 Spark SQL 中使用领域特定语言的函数转换此查询,但我不知道该怎么做。

valuesDF.select("value").where(???);

我发现存在 callUDF() 函数,其中一个参数是 Function2 fnctn 但不是 UDF2。 如何使用 DSL 中的 UDF 和函数?

【问题讨论】:

【参考方案1】:

我找到了一个我满意的解决方案。 可以将 UDF 称为列条件,例如:

valuesDF.filter("equals(columnName, 'someString')").select("columnName");

但我还是想知道是否可以直接调用UDF。


编辑:

顺便说一句,可以直接调用 udf 例如:

df.where(callUdf("equals", scala.collection.JavaConversions.asScalaBuffer(
                        Arrays.asList(col("columnName"), col("otherColumnName"))
                    ).seq())).select("columnName");

需要导入org.​apache.​spark.​sql.​函数。

【讨论】:

这很有趣,与 scala API 的细微差别!【参考方案2】:

查询数据框时,您应该能够使用以下方式执行 UDF:

sourceDf.filter(equals(col("columnName"), "someString")).select("columnName")

其中 col("columnName") 是您要比较的列。

【讨论】:

我认为它应该像你描述的那样工作,但它不起作用。我得到了这个异常 java.lang.RuntimeException: Uncompilable source code - Erroneous tree type: 好的 ^^ 是我的 NetBeans 中的错误...您的解决方案不起作用。这是因为:method equals in class Object cannot be applied to given types; | required: Object | found: Column,String | reason: actual and formal argument lists differ in length 也调用 equals.call(col("columnName"), "someString") 不是解决方案,因为 call() 需要 String,String 作为参数,而 col() 返回 Column。有人对如何处理 UDF 有任何建议吗? 嗯,对不起,我不知道,这很奇怪!这适用于 Scala API,但我也无法让它在 Java API 中运行。【参考方案3】:

这是工作代码示例。它适用于 Spark 1.5.x 和 1.6.x。在管道转换器中调用 UDF 的技巧是使用 DataFrame 上的 sqlContext() 来注册您的 UDF

@Test
public void test() 
    // https://issues.apache.org/jira/browse/SPARK-12484
    logger.info("BEGIN");

    DataFrame df = createData();        
    final String tableName = "myTable";
    sqlContext.registerDataFrameAsTable(df, tableName);

    logger.info("print schema");
    df.printSchema();
    logger.info("original data before we applied UDF");
    df.show();

    MyUDF udf = new MyUDF();
    final String udfName = "myUDF";
    sqlContext.udf().register(udfName, udf, DataTypes.StringType);

    String fmt = "SELECT *, %s(%s) as transformedByUDF FROM %s";
    String stmt = String.format(fmt, udfName, tableName+".labelStr", tableName); 
    logger.info("AEDWIP stmt:", stmt);
    DataFrame udfDF = sqlContext.sql(stmt);
    Row[] results = udfDF.head(3);
    for (Row row : results) 
        logger.info("row returned by applying UDF ", row);
    

    logger.info("AEDWIP udfDF schema");
    udfDF.printSchema();
    logger.info("AEDWIP udfDF data");
    udfDF.show();


    logger.info("END");


DataFrame createData() 
    Features f1 = new Features(1, category1);
    Features f2 = new Features(2, category2);
    ArrayList<Features> data = new ArrayList<Features>(2);
    data.add(f1);
    data.add(f2);
    //JavaRDD<Features> rdd = javaSparkContext.parallelize(Arrays.asList(f1, f2));
    JavaRDD<Features> rdd = javaSparkContext.parallelize(data);
    DataFrame df = sqlContext.createDataFrame(rdd, Features.class);
    return df;


class MyUDF implements UDF1<String, String> 
    private static final long serialVersionUID = 1L;

    @Override
    public String call(String s) throws Exception 
        logger.info("AEDWIP s:", s);
        String ret = s.equalsIgnoreCase(category1) ?  category1 : category3;
        return ret;
    


public class Features implements Serializable
    private static final long serialVersionUID = 1L;
    int id;
    String labelStr;

    Features(int id, String l) 
        this.id = id;
        this.labelStr = l;
    

    public int getId() 
        return id;
    

    public void setId(int id) 
        this.id = id;
    

    public String getLabelStr() 
        return labelStr;
    

    public void setLabelStr(String labelStr) 
        this.labelStr = labelStr;
    


this is the output

+---+--------+
| id|labelStr|
+---+--------+
|  1|   noise|
|  2|     ack|
+---+--------+

root
 |-- id: integer (nullable = false)
 |-- labelStr: string (nullable = true)
 |-- transformedByUDF: string (nullable = true)

+---+--------+----------------+
| id|labelStr|transformedByUDF|
+---+--------+----------------+
|  1|   noise|           noise|
|  2|     ack|          signal|
+---+--------+----------------+

【讨论】:

以上是关于Spark SQL:如何使用 JAVA 从 DataFrame 操作中调用 UDF的主要内容,如果未能解决你的问题,请参考以下文章

如何将数据从 Spark SQL 导出到 CSV

spark之通过sparksql中的SQL语句实现电影点评系统用户行为分析

Java spark无法从spark sql中的本地文件系统加载文件

如何将具有值的列添加到 Spark Java 中的新数据集?

使用 Java 的 Spark 和 Spark SQL 新手

Spark SQL - 从 oracle 导入时将 oracle 日期数据类型错误转换为时间戳(java.sql)