pyspark:从 pyspark 调用自定义 java 函数。我需要 Java_Gateway 吗?
Posted
技术标签:
【中文标题】pyspark:从 pyspark 调用自定义 java 函数。我需要 Java_Gateway 吗?【英文标题】:pyspark: call a custom java function from pyspark. Do I need Java_Gateway? 【发布时间】:2016-06-11 18:45:24 【问题描述】:我编写了以下 MyPythonGateway.java 以便可以从 Python 调用我的自定义 java 类:
public class MyPythonGateway
public String findMyNum(String input)
return MyUtiltity.parse(input).getMyNum();
public static void main(String[] args)
GatewayServer server = new GatewayServer(new MyPythonGateway());
server.start();
这是我在 Python 代码中使用它的方式:
def main():
gateway = JavaGateway() # connect to the JVM
myObj = gateway.entry_point.findMyNum("1234 GOOD DAY")
print(myObj)
if __name__ == '__main__':
main()
现在我想使用 PySpark 中的 MyPythonGateway.findMyNum()
函数,而不仅仅是一个独立的 python 脚本。我做了以下事情:
myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")
print(myNum)
但是,我收到以下错误:
... line 43, in main:
myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")
File "/home/edamameQ/spark-1.5.2/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 726, in __getattr__
py4j.protocol.Py4JError: Trying to call a package.
那么我在这里错过了什么?我不知道在使用 pyspark 时是否应该运行 MyPythonGateway 的单独 JavaApplication 来启动网关服务器。请指教。谢谢!
以下正是我需要的:
input.map(f)
def f(row):
// call MyUtility.java
// x = MyUtility.parse(row).getMyNum()
// return x
解决此问题的最佳方法是什么?谢谢!
【问题讨论】:
对于input.map(f)
,input
是什么?如果它不是 RDD/Dataset/Dataframe,那么 Kuttan 下面的内容就可以了。但是如果是RDD,那么他的解决方案就行不通了。
【参考方案1】:
首先,您看到的错误通常意味着您尝试使用的类不可访问。所以很可能是CLASSPATH
问题。
关于总体思路,有两个重要问题:
您无法在操作或转换中访问SparkContext
,因此无法使用 PySpark 网关(有关详细信息,请参阅 How to use Java/Scala function from an action or a transformation?)。如果您想从 worker 中使用 Py4J,您必须在每台 worker 机器上启动一个单独的网关。
您真的不想以这种方式在 Python 和 JVM 之间传递数据。 Py4J 不是为数据密集型任务而设计的。
【讨论】:
谢谢!基本上,MyUtitlity.java 有点复杂,我们真的不想在 python 中重新编码。有没有办法从 pyspark 作业中调用 MyUtility.java?如果有其他选择,我们不一定需要使用 Py4J ... 嗯,很大程度上取决于您的架构和代码。可能最简单且相对有效的解决方案是将pipe
数据写入Java 代码并读取输出。或者,您可以通过磁盘传递数据(这基本上是 PySpark 驱动程序过去处理事情的方式,尽管我认为现在不再如此。或者也许是这样)。最复杂的解决方案是拥有处理请求的持久(或临时,例如在执行程序的生命周期内)Java 进程。
如何在驱动程序和工作程序中正确注册 jars?然后将 Python 包装器制作成 jar,以便在驱动程序上正确调用?
@AlexanderMyltsev 对于您需要driver-class-path
或类似解决方案的驱动程序。对于工作人员,--jars
或 --packages
是一种选择方法,但手动分发和添加到类路径也应该可以。
> 如果你想从 worker 中使用 Py4J,你必须在每台 worker 机器上启动一个单独的网关如何?【参考方案2】:
在开始调用方法之前在 PySpark 中 -
myNum = sparkcontext._jvm.myPackage.MyPythonGateway.findMyNum("1234 GOOD DAY")
您必须按如下方式导入 MyPythonGateway java 类
java_import(sparkContext._jvm, "myPackage.MyPythonGateway")
myPythonGateway = spark.sparkContext._jvm.MyPythonGateway()
myPythonGateway.findMyNum("1234 GOOD DAY")
在 spark-submit
中使用 --jars 选项指定包含 myPackage.MyPythonGateway 的 jar【讨论】:
【参考方案3】:例如,如果input.map(f)
有输入作为 RDD,这可能会起作用,因为您无法访问执行器内部的 JVM 变量(附加到 spark 上下文)以获取 RDD 的映射函数(据我所知,那里有与 pyspark 中的 @transient lazy val
不等效)。
def pythonGatewayIterator(iterator):
results = []
jvm = py4j.java_gateway.JavaGateway().jvm
mygw = jvm.myPackage.MyPythonGateway()
for value in iterator:
results.append(mygw.findMyNum(value))
return results
inputs.mapPartitions(pythonGatewayIterator)
【讨论】:
【参考方案4】:您需要做的就是编译 jar 并使用 --jars 或 --driver-class-path spark 提交选项添加到 pyspark 类路径。然后使用以下代码访问类和方法-
sc._jvm.com.company.MyClass.func1()
哪里 sc - 火花上下文
使用 Spark 2.3 测试。请记住,您只能从驱动程序而不是执行程序调用 JVM 类方法。
【讨论】:
以上是关于pyspark:从 pyspark 调用自定义 java 函数。我需要 Java_Gateway 吗?的主要内容,如果未能解决你的问题,请参考以下文章
Glue PySpark 作业:调用 o100.pyWriteDynamicFrame 时出错