pyspark:从 pyspark 调用自定义 java 函数。我需要 Java_Gateway 吗?

Posted

技术标签:

【中文标题】pyspark:从 pyspark 调用自定义 java 函数。我需要 Java_Gateway 吗?【英文标题】:pyspark: call a custom java function from pyspark. Do I need Java_Gateway? 【发布时间】:2016-06-11 18:45:24 【问题描述】:

我编写了以下 MyPythonGateway.java 以便可以从 Python 调用我的自定义 java 类:

public class MyPythonGateway 

    public String findMyNum(String input) 
        return MyUtiltity.parse(input).getMyNum(); 
    

    public static void main(String[] args) 
        GatewayServer server = new GatewayServer(new MyPythonGateway());
        server.start();
    

这是我在 Python 代码中使用它的方式:

def main():

    gateway = JavaGateway()                   # connect to the JVM
    myObj = gateway.entry_point.findMyNum("1234 GOOD DAY")
    print(myObj)


if __name__ == '__main__':
    main()

现在我想使用 PySpark 中的 MyPythonGateway.findMyNum() 函数,而不仅仅是一个独立的 python 脚本。我做了以下事情:

myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")
print(myNum)

但是,我收到以下错误:

... line 43, in main:
myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")
  File "/home/edamameQ/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 726, in __getattr__
py4j.protocol.Py4JError: Trying to call a package.

那么我在这里错过了什么?我不知道在使用 pyspark 时是否应该运行 MyPythonGateway 的单独 JavaApplication 来启动网关服务器。请指教。谢谢!


以下正是我需要的:

input.map(f)

def f(row):
   // call MyUtility.java 
   // x = MyUtility.parse(row).getMyNum()
   // return x

解决此问题的最佳方法是什么?谢谢!

【问题讨论】:

对于input.map(f)input 是什么?如果它不是 RDD/Dataset/Dataframe,那么 Kuttan 下面的内容就可以了。但是如果是RDD,那么他的解决方案就行不通了。 【参考方案1】:

首先,您看到的错误通常意味着您尝试使用的类不可访问。所以很可能是CLASSPATH 问题。

关于总体思路,有两个重要问题:

您无法在操作或转换中访问SparkContext,因此无法使用 PySpark 网关(有关详细信息,请参阅 How to use Java/Scala function from an action or a transformation?)。如果您想从 worker 中使用 Py4J,您必须在每台 worker 机器上启动一个单独的网关。 您真的不想以这种方式在 Python 和 JVM 之间传递数据。 Py4J 不是为数据密集型任务而设计的。

【讨论】:

谢谢!基本上,MyUtitlity.java 有点复杂,我们真的不想在 python 中重新编码。有没有办法从 pyspark 作业中调用 MyUtility.java?如果有其他选择,我们不一定需要使用 Py4J ... 嗯,很大程度上取决于您的架构和代码。可能最简单且相对有效的解决方案是将pipe 数据写入Java 代码并读取输出。或者,您可以通过磁盘传递数据(这基本上是 PySpark 驱动程序过去处理事情的方式,尽管我认为现在不再如此。或者也许是这样)。最复杂的解决方案是拥有处理请求的持久(或临时,例如在执行程序的生命周期内)Java 进程。 如何在驱动程序和工作程序中正确注册 jars?然后将 Python 包装器制作成 jar,以便在驱动程序上正确调用? @AlexanderMyltsev 对于您需要driver-class-path 或类似解决方案的驱动程序。对于工作人员,--jars--packages 是一种选择方法,但手动分发和添加到类路径也应该可以。 > 如果你想从 worker 中使用 Py4J,你必须在每台 worker 机器上启动一个单独的网关如何?【参考方案2】:

在开始调用方法之前在 PySpark 中 -

myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")

您必须按如下方式导入 MyPythonGateway java 类

java_import(sparkContext._jvm, "myPackage.MyPythonGateway")
myPythonGateway  = spark.sparkContext._jvm.MyPythonGateway()
myPythonGateway.findMyNum("1234 GOOD DAY")

spark-submit

中使用 --jars 选项指定包含 myPackage.MyPythonGateway 的 jar

【讨论】:

【参考方案3】:

例如,如果input.map(f) 有输入作为 RDD,这可能会起作用,因为您无法访问执行器内部的 JVM 变量(附加到 spark 上下文)以获取 RDD 的映射函数(据我所知,那里有与 pyspark 中的 @transient lazy val 不等效)。

def pythonGatewayIterator(iterator):
    results = []
    jvm = py4j.java_gateway.JavaGateway().jvm
    mygw = jvm.myPackage.MyPythonGateway()
    for value in iterator:
        results.append(mygw.findMyNum(value))
    return results


inputs.mapPartitions(pythonGatewayIterator)

【讨论】:

【参考方案4】:

您需要做的就是编译 jar 并使用 --jars 或 --driver-class-path spark 提交选项添加到 pyspark 类路径。然后使用以下代码访问类和方法-

sc._jvm.com.company.MyClass.func1()

哪里 sc - 火花上下文

使用 Spark 2.3 测试。请记住,您只能从驱动程序而不是执行程序调用 JVM 类方法。

【讨论】:

以上是关于pyspark:从 pyspark 调用自定义 java 函数。我需要 Java_Gateway 吗?的主要内容,如果未能解决你的问题,请参考以下文章

Glue PySpark 作业:调用 o100.pyWriteDynamicFrame 时出错

使用 udf 传递列作为参数将自定义列添加到 pyspark 数据帧

Pyspark 使用自定义函数

在 pyspark 中编写自定义 UDAF

PySpark 函数基于多列数据框创建自定义输出

pyspark 数据框中的自定义排序