如何将复杂的外部变量(例如映射值)从 Spark 与 Java 中的驱动程序传递给 UDF?
Posted
技术标签:
【中文标题】如何将复杂的外部变量(例如映射值)从 Spark 与 Java 中的驱动程序传递给 UDF?【英文标题】:How can I pass a complex external variable, say a map's value to an UDF from the driver program in Spark with Java? 【发布时间】:2020-03-05 11:50:46 【问题描述】:当我需要将 Java 哈希映射传递给 UDF 时,我遇到了一个大问题,UDF 本身被定义为一个单独的类,而不是一些内联 lambda 函数,它可以访问定义为广播变量的封闭范围的变量。我也出于这个目的在这里提出了这个问题:
How do I pass Spark broadcast variable to a UDF in Java?
没有提供令人满意的答案,因为人们只向我提供了包含简单 UDF 的答案,这些 UDF 可以定义为小 lambda,因此可以从驱动程序访问广播变量。
然后我开始研究 typedlits,正如我在另一个问题中所详述的那样,在我看来,这似乎是前进的方向,但在 Java 中这种方法几乎没有任何文档形式存在,尽管存在示例和教程在 Scala 中也是如此。因此,我的问题是如何使用 typedlit 将复杂变量的值传递给 UDF?
【问题讨论】:
【参考方案1】:我通过漫长而艰难的道路找到了这个问题的答案,并将其发布在这里,以帮助可能面临同样问题的其他人。
官方的 Spark Javadocs here 给出 typedLit 方法定义如下:
typedLit(T literal, scala.reflect.api.TypeTags.TypeTag<T> evidence$1)
在Java中如何使用这种方法几乎无处可寻,最后,我偶然发现了这个问题:
How to get the TypeTag for a class in Java
在这里,我们了解了如何为我们想要发送到 UDF 的所需类创建自定义 Scala 对象。使用答案,我为 Scala Map 创建了自定义 Scala 对象:
import scala.reflect.runtime.universe._
import scala.collection.convert._
object TypeTags
val MapString = typeTag[scala.collection.Map[String, String]]
为了在我的 Java Maven 项目中使用这个对象,我遵循了这个博客给出的结构:
https://dzone.com/articles/scala-in-java-maven-project
我必须在我的 pom 中包含的依赖项如下:
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.7</version>
</dependency>
但是,pom 中存在的 Scala 生命周期标签并没有为我编译。这是最初的 pom sn-p:
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
然后我发现这个问题包含一组单独的生命周期标签:
My mixed Scala/Java Maven project doesn't compile
另外,我下载了以下链接给出的混合 Java/Scala 项目:
https://github.com/danyaljj/sampleMixedScalaJavaMavenProject/blob/master/pom.xml
这个项目的 pom 终于为我工作了,由于生命周期标签,我可以提前解决编译问题。新的pom sn -p如下:
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
在我尝试使用我在主类中定义的 TypeTag 时出现了很多编译错误。最后,我将这个问题的答案用于我的目的:
Convert Java Map to Scala Map
首先,我必须在我的主类中手动导入我在文件 TypeTags.scala 中定义的 Scala 对象:
import com.esrx.dqm.datasync.TypeTags$;
我定义了一个虚拟地图发送到我的 UDF:
Map<String, String> testMap = new HashMap<>();
testMap.put("1", "One");
然后我将 hashmap 转换为 Scala map:
List<Tuple2<String, String>> tuples = testMap.entrySet().stream()
.map(e -> Tuple2.apply(e.getKey(), e.getValue()))
.collect(Collectors.toList());
scala.collection.Map scalaMap = scala.collection.Map$.MODULE$.apply(JavaConversions.asScalaBuffer(tuples).toSeq());
然后我将地图发送到我之前定义的 UDF:
TypeTags$ type = TypeTags$.MODULE$;
data = data.withColumn("broadcast", functions.callUDF("TestUDF", functions.typedLit(scalaMap, type.MapString())));
我无法将 MapString val 发送到 UDF,因为编译器总是抱怨它在 TypeDefs 中具有私有访问权限。从链接here 中,我发现在Java 中,val 是通过getter 之类的方法调用来访问的,而不是直接访问val 本身。
我定义的TestUDF如下:
public class TestUDF implements UDF1<scala.collection.immutable.Map<String, String>,String>
@Override
public String call(scala.collection.immutable.Map<String, String> t1) throws Exception
// TODO Auto-generated method stub
System.out.println(t1);
AsJava<Map<String, String>> asJavaMap = JavaConverters.mapAsJavaMapConverter(t1);
Map<String, String> javaMap = asJavaMap.asJava();
System.out.println("Value of 1: " + javaMap.get("1"));
return null;
这终于奏效了,我可以从我的 UDF 访问地图。
【讨论】:
这对我帮助很大,谢谢。但是我遇到了 scala 版本不兼容 Map coz 的问题。 UDF 收到immutable
映射,但定义的 scalaMap
是 mutable
,所以我认为将来人们会遇到问题,请考虑场景以上是关于如何将复杂的外部变量(例如映射值)从 Spark 与 Java 中的驱动程序传递给 UDF?的主要内容,如果未能解决你的问题,请参考以下文章
Spark - 不可序列化的任务:如何使用调用外部类/对象的复杂地图闭包?
如何将变量从外部 JavaScript 传递到 HTML 表单