如何将复杂的 Java 类对象作为参数传递给 Spark 中的 Scala UDF?
Posted
技术标签:
【中文标题】如何将复杂的 Java 类对象作为参数传递给 Spark 中的 Scala UDF?【英文标题】:How to pass complex Java Class Object as parameter to Scala UDF in Spark? 【发布时间】:2020-09-27 07:26:07 【问题描述】:我有一个响应 API 调用的 Java 客户端类(用作 spark-shell
的依赖 Jar) - 让我们调用类 SomeAPIRequester
。
在纯 Java 中,它会使用以下示例代码返回我想要的结果 -
SomeAPIRequester requester = SomeAPIRequester.builder().name("abc").build() // build the class
System.out.println(requester.getSomeItem("id123")) // result: "id123": "item123"
我想通过存储在 spark 数据帧(在 scala 中)中的 ID 的 RDD 以分布式方式调用此 API -
val inputIdRdd = sc.parallelize(List("id1", "id2", "id3"...)) // sample RDD of IDs i want to call the API for
我将我的 UDF 定义为 -
val test: UserDefinedFunction = udf((id: String, requester: SomeAPIRequester) =>
requester.getSomeItem(id)
)
并将此 UDF 称为 -
inputIdRdd.toDf("ids").withColumn("apiResult", test(col("ids"), requester) // requester as built with SomeAPIRequester.builder()....
// or directly with RDD ? udf, or a plain scala function ..
inputIdRdd.foreach id => test(id, requester)
当我在结果上运行.show()
或.take()
时,我在请求者java 类上得到NullPointerException
。
我还尝试发送文字 (lit
),并在 scala 中阅读了有关 typedLit
的信息,但我无法将 Java Requester
类转换为 scala 中任何允许的 typedLit
类型。
有没有办法通过 UDF 调用这个 Java 类对象并从 API 中获取结果?
编辑:
我还尝试在 RDD 的 foreach 块中初始化请求者类 -
inputIdRdd.foreach(x =>
val apiRequester = SomeAPIRequester.builder()...(argPool).build()
try
apiRequester.getSomeItem(x)
catch
case ex: Exception => println(ex.printStackTrace()); ""
)
但这不会返回任何响应 - 无法初始化类等。
谢谢!
【问题讨论】:
您可能需要发布代码的整个结构。很难猜测在哪里声明了什么以及何时使用什么。此外,您正在使用带有 RDD 的 UDF,这很奇怪。 无论哪种方式,df.withColumn("newCol", udf(lit(x), requester))
也会引发与我的rdd.foreachx => udf(x, requester)
类似的错误......对于我定义的 UDF。用更多细节更新我的问题..
【参考方案1】:
使用与 Spark 一起使用的自定义类需要了解 Spark 在后台的工作原理。不要将您的实例作为参数放在 udf 中。 udfs中的参数是从dataframe的行中提取的,这种情况下空指针异常是可以理解的。您可以尝试以下选项:
先把实例放到udf的范围内:
val requester: SomeAPIRequester = ???
val test: UserDefinedFunction = udf((id: String) =>
requester.getSomeItem(id)
)
此时,如果可能,您需要将您的类标记为 Serializable,否则您将遇到 NotSerializableException。
如果您的类不是 Seriazable,因为它来自第三方,您可以将您的实例标记为 lazy transient val,如您在 https://mengdong.github.io/2016/08/16/spark-serialization-memo/ 或 https://medium.com/@swapnesh.chaubal/writing-to-logentries-from-apache-spark-35831282f53d 中看到的那样。
如果您在 RDD 领域工作,您可以使用 mapPartitions 为每个分区创建一个实例。
【讨论】:
感谢这些建议,让我试试这个! 编辑:这就像一个魅力!感谢mapPartitions
的想法!以上是关于如何将复杂的 Java 类对象作为参数传递给 Spark 中的 Scala UDF?的主要内容,如果未能解决你的问题,请参考以下文章
如何将子类作为期望基类的函数的参数传递,然后将该对象传递给指向这些抽象类对象的指针向量?