Spark SQL:如何使用 JAVA 从 DataFrame 操作中调用 UDF
Posted
技术标签:
【中文标题】Spark SQL:如何使用 JAVA 从 DataFrame 操作中调用 UDF【英文标题】:Spark SQL: How to call UDF from DataFrame operation using JAVA 【发布时间】:2015-11-20 14:45:58 【问题描述】:我想知道如何使用 JAVA 从 Spark SQL 中的领域特定语言 (DSL) 函数调用 UDF 函数。
我有UDF函数(仅举例):
UDF2 equals = new UDF2<String, String, Boolean>()
@Override
public Boolean call(String first, String second) throws Exception
return first.equals(second);
;
我已经将它注册到 sqlContext
sqlContext.udf().register("equals", equals, DataTypes.BooleanType);
当我运行以下查询时,我的 UDF 被调用并得到一个结果。
sqlContext.sql("SELECT p0.value FROM values p0 WHERE equals(p0.value, 'someString')");
我会在 Spark SQL 中使用领域特定语言的函数转换此查询,但我不知道该怎么做。
valuesDF.select("value").where(???);
我发现存在 callUDF() 函数,其中一个参数是 Function2 fnctn 但不是 UDF2。 如何使用 DSL 中的 UDF 和函数?
【问题讨论】:
【参考方案1】:我找到了一个我满意的解决方案。 可以将 UDF 称为列条件,例如:
valuesDF.filter("equals(columnName, 'someString')").select("columnName");
但我还是想知道是否可以直接调用UDF。
编辑:
顺便说一句,可以直接调用 udf 例如:
df.where(callUdf("equals", scala.collection.JavaConversions.asScalaBuffer(
Arrays.asList(col("columnName"), col("otherColumnName"))
).seq())).select("columnName");
需要导入org.apache.spark.sql.函数。
【讨论】:
这很有趣,与 scala API 的细微差别!【参考方案2】:查询数据框时,您应该能够使用以下方式执行 UDF:
sourceDf.filter(equals(col("columnName"), "someString")).select("columnName")
其中 col("columnName") 是您要比较的列。
【讨论】:
我认为它应该像你描述的那样工作,但它不起作用。我得到了这个异常 java.lang.RuntimeException: Uncompilable source code - Erroneous tree type: 好的 ^^ 是我的 NetBeans 中的错误...您的解决方案不起作用。这是因为:method equals in class Object cannot be applied to given types; | required: Object | found: Column,String | reason: actual and formal argument lists differ in length
也调用 equals.call(col("columnName"), "someString") 不是解决方案,因为 call() 需要 String,String 作为参数,而 col() 返回 Column。有人对如何处理 UDF 有任何建议吗?
嗯,对不起,我不知道,这很奇怪!这适用于 Scala API,但我也无法让它在 Java API 中运行。【参考方案3】:
这是工作代码示例。它适用于 Spark 1.5.x 和 1.6.x。在管道转换器中调用 UDF 的技巧是使用 DataFrame 上的 sqlContext() 来注册您的 UDF
@Test
public void test()
// https://issues.apache.org/jira/browse/SPARK-12484
logger.info("BEGIN");
DataFrame df = createData();
final String tableName = "myTable";
sqlContext.registerDataFrameAsTable(df, tableName);
logger.info("print schema");
df.printSchema();
logger.info("original data before we applied UDF");
df.show();
MyUDF udf = new MyUDF();
final String udfName = "myUDF";
sqlContext.udf().register(udfName, udf, DataTypes.StringType);
String fmt = "SELECT *, %s(%s) as transformedByUDF FROM %s";
String stmt = String.format(fmt, udfName, tableName+".labelStr", tableName);
logger.info("AEDWIP stmt:", stmt);
DataFrame udfDF = sqlContext.sql(stmt);
Row[] results = udfDF.head(3);
for (Row row : results)
logger.info("row returned by applying UDF ", row);
logger.info("AEDWIP udfDF schema");
udfDF.printSchema();
logger.info("AEDWIP udfDF data");
udfDF.show();
logger.info("END");
DataFrame createData()
Features f1 = new Features(1, category1);
Features f2 = new Features(2, category2);
ArrayList<Features> data = new ArrayList<Features>(2);
data.add(f1);
data.add(f2);
//JavaRDD<Features> rdd = javaSparkContext.parallelize(Arrays.asList(f1, f2));
JavaRDD<Features> rdd = javaSparkContext.parallelize(data);
DataFrame df = sqlContext.createDataFrame(rdd, Features.class);
return df;
class MyUDF implements UDF1<String, String>
private static final long serialVersionUID = 1L;
@Override
public String call(String s) throws Exception
logger.info("AEDWIP s:", s);
String ret = s.equalsIgnoreCase(category1) ? category1 : category3;
return ret;
public class Features implements Serializable
private static final long serialVersionUID = 1L;
int id;
String labelStr;
Features(int id, String l)
this.id = id;
this.labelStr = l;
public int getId()
return id;
public void setId(int id)
this.id = id;
public String getLabelStr()
return labelStr;
public void setLabelStr(String labelStr)
this.labelStr = labelStr;
this is the output
+---+--------+
| id|labelStr|
+---+--------+
| 1| noise|
| 2| ack|
+---+--------+
root
|-- id: integer (nullable = false)
|-- labelStr: string (nullable = true)
|-- transformedByUDF: string (nullable = true)
+---+--------+----------------+
| id|labelStr|transformedByUDF|
+---+--------+----------------+
| 1| noise| noise|
| 2| ack| signal|
+---+--------+----------------+
【讨论】:
以上是关于Spark SQL:如何使用 JAVA 从 DataFrame 操作中调用 UDF的主要内容,如果未能解决你的问题,请参考以下文章
spark之通过sparksql中的SQL语句实现电影点评系统用户行为分析
Java spark无法从spark sql中的本地文件系统加载文件
如何将具有值的列添加到 Spark Java 中的新数据集?