如何将复杂的外部变量(例如映射值)从 Spark 与 Java 中的驱动程序传递给 UDF?

Posted

技术标签:

【中文标题】如何将复杂的外部变量(例如映射值)从 Spark 与 Java 中的驱动程序传递给 UDF?【英文标题】:How can I pass a complex external variable, say a map's value to an UDF from the driver program in Spark with Java? 【发布时间】:2020-03-05 11:50:46 【问题描述】:

当我需要将 Java 哈希映射传递给 UDF 时,我遇到了一个大问题,UDF 本身被定义为一个单独的类,而不是一些内联 lambda 函数,它可以访问定义为广播变量的封闭范围的变量。我也出于这个目的在这里提出了这个问题:

How do I pass Spark broadcast variable to a UDF in Java?

没有提供令人满意的答案,因为人们只向我提供了包含简单 UDF 的答案,这些 UDF 可以定义为小 lambda,因此可以从驱动程序访问广播变量。

然后我开始研究 typedlits,正如我在另一个问题中所详述的那样,在我看来,这似乎是前进的方向,但在 Java 中这种方法几乎没有任何文档形式存在,尽管存在示例和教程在 Scala 中也是如此。因此,我的问题是如何使用 typedlit 将复杂变量的值传递给 UDF?

【问题讨论】:

【参考方案1】:

我通过漫长而艰难的道路找到了这个问题的答案,并将其发布在这里,以帮助可能面临同样问题的其他人。

官方的 Spark Javadocs here 给出 typedLit 方法定义如下:

typedLit(T literal, scala.reflect.api.TypeTags.TypeTag<T> evidence$1)

在Java中如何使用这种方法几乎无处可寻,最后,我偶然发现了这个问题:

How to get the TypeTag for a class in Java

在这里,我们了解了如何为我们想要发送到 UDF 的所需类创建自定义 Scala 对象。使用答案,我为 Scala Map 创建了自定义 Scala 对象:

import scala.reflect.runtime.universe._
import scala.collection.convert._
object TypeTags 
  val MapString = typeTag[scala.collection.Map[String, String]]

为了在我的 Java Maven 项目中使用这个对象,我遵循了这个博客给出的结构:

https://dzone.com/articles/scala-in-java-maven-project

我必须在我的 pom 中包含的依赖项如下:

<dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.7</version>
    </dependency>

但是,pom 中存在的 Scala 生命周期标签并没有为我编译。这是最初的 pom sn-p:

<plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <executions>
            <execution>
                <id>scala-compile-first</id>
                <phase>process-resources</phase>
                <goals>
                    <goal>add-source</goal>
                    <goal>compile</goal>
                </goals>
            </execution>
            <execution>
                <id>scala-test-compile</id>
                <phase>process-test-resources</phase>
                <goals>
                    <goal>testCompile</goal>
                </goals>
            </execution>
        </executions>
    </plugin>

然后我发现这个问题包含一组单独的生命周期标签:

My mixed Scala/Java Maven project doesn't compile

另外,我下载了以下链接给出的混合 Java/Scala 项目:

https://github.com/danyaljj/sampleMixedScalaJavaMavenProject/blob/master/pom.xml

这个项目的 pom 终于为我工作了,由于生命周期标签,我可以提前解决编译问题。新的pom sn -p如下:

      <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <id>compile</id>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                    <phase>compile</phase>
                </execution>
                <execution>
                    <id>test-compile</id>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                    <phase>test-compile</phase>
                </execution>
                <execution>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

在我尝试使用我在主类中定义的 TypeTag 时出现了很多编译错误。最后,我将这个问题的答案用于我的目的:

Convert Java Map to Scala Map

首先,我必须在我的主类中手动导入我在文件 TypeTags.scala 中定义的 Scala 对象:

import com.esrx.dqm.datasync.TypeTags$;

我定义了一个虚拟地图发送到我的 UDF:

 Map<String, String> testMap = new HashMap<>();
 testMap.put("1", "One");

然后我将 hashmap 转换为 Scala map:

List<Tuple2<String, String>> tuples = testMap.entrySet().stream()
            .map(e -> Tuple2.apply(e.getKey(), e.getValue()))
            .collect(Collectors.toList());

scala.collection.Map scalaMap = scala.collection.Map$.MODULE$.apply(JavaConversions.asScalaBuffer(tuples).toSeq());

然后我将地图发送到我之前定义的 UDF:

TypeTags$ type = TypeTags$.MODULE$;
data = data.withColumn("broadcast", functions.callUDF("TestUDF", functions.typedLit(scalaMap, type.MapString())));

我无法将 MapString val 发送到 UDF,因为编译器总是抱怨它在 TypeDefs 中具有私有访问权限。从链接here 中,我发现在Java 中,val 是通过getter 之类的方法调用来访问的,而不是直接访问val 本身。

我定义的TestUDF如下:

public class TestUDF implements UDF1<scala.collection.immutable.Map<String, String>,String> 

@Override
public String call(scala.collection.immutable.Map<String, String> t1) throws Exception 
    // TODO Auto-generated method stub
    System.out.println(t1);
    AsJava<Map<String, String>> asJavaMap = JavaConverters.mapAsJavaMapConverter(t1);
    Map<String, String> javaMap = asJavaMap.asJava();
    System.out.println("Value of 1: " + javaMap.get("1"));      
    return null;

这终于奏效了,我可以从我的 UDF 访问地图。

【讨论】:

这对我帮助很大,谢谢。但是我遇到了 scala 版本不兼容 Map coz 的问题。 UDF 收到 immutable 映射,但定义的 scalaMapmutable,所以我认为将来人们会遇到问题,请考虑场景

以上是关于如何将复杂的外部变量(例如映射值)从 Spark 与 Java 中的驱动程序传递给 UDF?的主要内容,如果未能解决你的问题,请参考以下文章

Spark - 不可序列化的任务:如何使用调用外部类/对象的复杂地图闭包?

如何将变量从外部 JavaScript 传递到 HTML 表单

在 Spark 结构化流中,我如何将完整的聚合输出到外部源,如 REST 服务

spark之从外部文件获取广播变量

spark之从外部文件获取广播变量

使用Spark中的复杂条件和滞后自引用创建新列