如何在 Java 中创建一个接受字符串数组的 Spark UDF?

Posted

技术标签:

【中文标题】如何在 Java 中创建一个接受字符串数组的 Spark UDF?【英文标题】:How to create a Spark UDF in Java which accepts array of Strings? 【发布时间】:2019-11-25 06:39:09 【问题描述】:

这个问题已经被问到here Scala,它对我没有帮助,因为我正在使用 Java API。我一直在扔所有东西和厨房水槽,所以这是我的方法:

List<String> sourceClasses = new ArrayList<String>();
//Add elements
List<String> targetClasses = new ArrayList<String>();
//Add elements

dataset = dataset.withColumn("Transformer", callUDF(
    "Transformer",
    lit((String[])sourceClasses.toArray())
        .cast(DataTypes.createArrayType(DataTypes.StringType)),
    lit((String[])targetClasses.toArray())
        .cast(DataTypes.createArrayType(DataTypes.StringType))
));

对于我的 UDF 声明:

public class Transformer implements UDF2<Seq<String>, Seq<String>, String> 


//  @SuppressWarnings("deprecation")
public String call(Seq<String> sourceClasses, Seq<String> targetClasses)
    throws Exception 

当我运行代码时,执行不会通过 UDF 调用,这是意料之中的,因为我无法匹配类型。请在这方面帮助我。

编辑

我尝试了@Oli 建议的解决方案。但是,我遇到了以下异常:

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$261: (array<string>, array<string>) => string)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Seq
at com.esrx.dqm.uuid.UUIDTransformerEngine$1.call(UUIDTransformerEngine.java:1)
at org.apache.spark.sql.UDFRegistration$$anonfun$261.apply(UDFRegistration.scala:774)
... 22 more

这一行似乎特别表明存在问题:

Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to scala.collection.immutable.Seq

【问题讨论】:

您能否补充一下您的问题:stackTrace 和您的数据集的架构? @dnej,我只创建了一个 1 行数据框,引用了这个:***.com/questions/39967194/…。执行只是在 UDF 调用处阻塞。它不会终止,也不会抛出异常。 【参考方案1】:

根据我对您的 UDF 类型的了解,您正在尝试创建一个将两个数组作为输入并返回一个字符串的 UDF。

在 java 中,这有点痛苦,但可以管理。

假设您想连接两个数组并用单词 AND 链接它们。您可以按如下方式定义 UDF:

UDF2 my_udf2 = new UDF2<WrappedArray<String>, WrappedArray<String>, String>() 
    public String call(WrappedArray<String> a1, WrappedArray a2) throws Exception 
        ArrayList<String> l1 = new ArrayList(JavaConverters
            .asJavaCollectionConverter(a1)
            .asJavaCollection());
        ArrayList<String> l2 = new ArrayList(JavaConverters
            .asJavaCollectionConverter(a2)
            .asJavaCollection());
        return l1.stream().collect(Collectors.joining(",")) +
             " AND " +
             l2.stream().collect(Collectors.joining(","));
    
;

请注意,您需要在方法的签名中使用 scala WrappedArray,并在方法的主体中使用 JavaConverters 转换它们,以便能够在 Java 中操作它们。这是以防万一所需的导入。

import scala.collection.mutable.WrappedArray;
import scala.collection.JavaConverters;

然后你就可以注册 UDF 了,它可以和 Spark 一起使用。为了能够使用它,我从“id”列创建了一个示例数据框和两个虚拟数组。请注意,它也可以与 lit 函数一起使用,就像您在问题中尝试做的那样。

spark.udf().register("my_udf2", my_udf2, DataTypes.StringType);

String[] data = "abcd", "efgh", "ijkl";

spark.range(3)
    .withColumn("id", col("id").cast("string"))
    .withColumn("array", functions.array(col("id"), col("id")))
    .withColumn("string_of_arrays",
          functions.callUDF("my_udf2", col("array"), lit(data)))
    .show(false);

产生:

+---+------+----------------------+
|id |array |string_of_arrays      |
+---+------+----------------------+
|0  |[0, 0]|0,0 AND abcd,efgh,ijkl|
|1  |[1, 1]|1,1 AND abcd,efgh,ijkl|
|2  |[2, 2]|2,2 AND abcd,efgh,ijkl|
+---+------+----------------------+

在 Spark >= 2.3 中,您也可以这样做:

UserDefinedFunction my_udf2 = udf(
    (WrappedArray<String> s1, WrappedArray<String> s2) -> "some_string",
    DataTypes.StringType
);

df.select(my_udf2.apply(col("a1"), col("a2")).show(false);

【讨论】:

嗨@Oli,我们可以将实际的Java 数组或列表作为发光参数传递到UDF 中吗?这实际上是我的用例。 我不确定所以我只是试了一下。有用。我用一个例子编辑了答案;) 我尝试了您的解决方案,但得到了我在帖子的编辑中提到的堆栈跟踪异常。 奇怪...代码在 Spark 2.2.1 上对我有用:-/让我用更强大的东西编辑我的帖子。 我使用的是 Spark 2.4.4。

以上是关于如何在 Java 中创建一个接受字符串数组的 Spark UDF?的主要内容,如果未能解决你的问题,请参考以下文章

我不知道如何从字符串、整数或变量中创建一个特定的数组

在 python 中创建字符串数组的最佳方法是啥?

如何在 Java 中创建一个泛型数组?

如何使用多个 for 循环在 php 中创建 sql 语句

如何在Java中创建数组列表

如何在 Typescript 中创建一个空字符串数组?