如何将复杂的 Java 类对象作为参数传递给 Spark 中的 Scala UDF?

Posted

技术标签:

【中文标题】如何将复杂的 Java 类对象作为参数传递给 Spark 中的 Scala UDF?【英文标题】:How to pass complex Java Class Object as parameter to Scala UDF in Spark? 【发布时间】:2020-09-27 07:26:07 【问题描述】:

我有一个响应 API 调用的 Java 客户端类(用作 spark-shell 的依赖 Jar) - 让我们调用类 SomeAPIRequester

在纯 Java 中,它会使用以下示例代码返回我想要的结果 -

SomeAPIRequester requester = SomeAPIRequester.builder().name("abc").build() // build the class
System.out.println(requester.getSomeItem("id123"))  // result: "id123": "item123"

我想通过存储在 spark 数据帧(在 scala 中)中的 ID 的 RDD 以分布式方式调用此 API -

val inputIdRdd = sc.parallelize(List("id1", "id2", "id3"...))  // sample RDD of IDs i want to call the API for

我将我的 UDF 定义为 -

val test: UserDefinedFunction = udf((id: String, requester: SomeAPIRequester) => 
   requester.getSomeItem(id)
)

并将此 UDF 称为 -

inputIdRdd.toDf("ids").withColumn("apiResult", test(col("ids"), requester)  // requester as built with SomeAPIRequester.builder()....

// or directly with RDD ? udf, or a plain scala function .. 
inputIdRdd.foreach id => test(id, requester) 

当我在结果上运行.show().take() 时,我在请求者java 类上得到NullPointerException

我还尝试发送文字 (lit),并在 scala 中阅读了有关 typedLit 的信息,但我无法将 Java Requester 类转换为 scala 中任何允许的 typedLit 类型。

有没有办法通过 UDF 调用这个 Java 类对象并从 API 中获取结果?

编辑:

我还尝试在 RDD 的 foreach 块中初始化请求者类 -

inputIdRdd.foreach(x =>
  val apiRequester = SomeAPIRequester.builder()...(argPool).build()

  try 
    apiRequester.getSomeItem(x)
   catch 
    case ex: Exception => println(ex.printStackTrace()); ""
  
)

但这不会返回任何响应 - 无法初始化类等。

谢谢!

【问题讨论】:

您可能需要发布代码的整个结构。很难猜测在哪里声明了什么以及何时使用什么。此外,您正在使用带有 RDD 的 UDF,这很奇怪。 无论哪种方式,df.withColumn("newCol", udf(lit(x), requester)) 也会引发与我的rdd.foreachx => udf(x, requester) 类似的错误......对于我定义的 UDF。用更多细节更新我的问题.. 【参考方案1】:

使用与 Spark 一起使用的自定义类需要了解 Spark 在后台的工作原理。不要将您的实例作为参数放在 udf 中。 udfs中的参数是从dataframe的行中提取的,这种情况下空指针异常是可以理解的。您可以尝试以下选项:

    先把实例放到udf的范围内:

    val requester: SomeAPIRequester = ???
    
    val test: UserDefinedFunction = udf((id: String) => 
         requester.getSomeItem(id)
    )
    

此时,如果可能,您需要将您的类标记为 Serializable,否则您将遇到 NotSerializableException。

    如果您的类不是 Seriazable,因为它来自第三方,您可以将您的实例标记为 lazy transient val,如您在 https://mengdong.github.io/2016/08/16/spark-serialization-memo/ 或 https://medium.com/@swapnesh.chaubal/writing-to-logentries-from-apache-spark-35831282f53d 中看到的那样。

    如果您在 RDD 领域工作,您可以使用 mapPartitions 为每个分区创建一个实例。

【讨论】:

感谢这些建议,让我试试这个! 编辑:这就像一个魅力!感谢mapPartitions 的想法!

以上是关于如何将复杂的 Java 类对象作为参数传递给 Spark 中的 Scala UDF?的主要内容,如果未能解决你的问题,请参考以下文章

如何将 unique_ptr 对象作为参数传递给作为库的类

如何将子类作为期望基类的函数的参数传递,然后将该对象传递给指向这些抽象类对象的指针向量?

C++:如何将变量作为参数传递给类构造函数

将 shared_ptr 作为参数传递给接受类类型对象的函数

C ++,将对象作为参数传递给另一个对象构造函数

当我们将对象作为参数传递给方法时,为啥会调用复制构造函数?