来自 Java Runnable 的 Py4J 回调

Posted

技术标签:

【中文标题】来自 Java Runnable 的 Py4J 回调【英文标题】:Py4J callback from Java Runnable 【发布时间】:2018-05-16 09:21:09 【问题描述】:

我目前正在尝试使用 Py4J 执行以下操作:

在 Python 中定义一个调用 JVM 方法的方法(“执行程序”) 定义一个实现 JVM 接口的 Python(“回调”)对象 给定这个回调对象构造一个 JVM 对象 在此对象上调用一个方法,该方法将在 Java 中生成一个新线程,在回调对象上调用回调,这将(在 Python 端)执行“执行程序”方法

这是我对 Java 方面的看法:

package javabridge.test;
public interface PythonCallback 
    Object notify(Object source);

package javabridge.test;
public class ScheduledRunnable implements Runnable 
    private PythonCallback callback;
    public ScheduledRunnable(PythonCallback callback) 
        this.callback = callback;
    
    @Override
    public void run() 
        System.out.println("[ScheduledRunnable] run -> notify");
        this.callback.notify(this);
    

package javabridge.test;
import py4j.GatewayServer;
public class Test 
    private PythonCallback callback;
    public Test(PythonCallback callback) 
        this.callback = callback;
    
    public void runSynchronous() 
        System.out.println("[runSynchronous] run -> notify");
        this.callback.notify(this);
    
    public void runAsynchronous() 
        System.out.println("[runAsynchronous] run -> spawn thread");
        ScheduledRunnable runnable = new ScheduledRunnable(callback);
        Thread t = new Thread(runnable);
        t.start();
    
    public static void main(String[] args) 
        GatewayServer server = new GatewayServer();
        server.start(true);
       

在 Python 方面,我有以下脚本:

from py4j.java_gateway import JavaGateway, java_import, get_field, CallbackServerParameters
from py4j.clientserver import ClientServer, JavaParameters, PythonParameters

gateway = JavaGateway(callback_server_parameters=CallbackServerParameters())
#gateway = ClientServer(java_parameters=JavaParameters(), python_parameters=PythonParameters())

java_import(gateway.jvm, 'javabridge.test.*')

class PythonCallbackImpl(object):
    def __init__(self, execfunc):
        self.execfunc = execfunc
    def notify(self, obj):
        print('[PythonCallbackImpl] notified from Java')
        self.execfunc()
        return 'dummy return value'
    class Java:
        implements = ["javabridge.test.PythonCallback"]

def simple_fun():
    print('[simple_fun] called')
    gateway.jvm.System.out.println("[simple_fun] Hello from python!")

# Test 1: Without threading
input('Ready to begin test 1')
python_callback = PythonCallbackImpl(simple_fun)
nothread_executor = gateway.jvm.Test(python_callback)
nothread_executor.runSynchronous()

# Test 2: With threading
input('Ready to begin test 2')
python_callback = PythonCallbackImpl(simple_fun)
nothread_executor = gateway.jvm.Test(python_callback)
nothread_executor.runAsynchronous()

gateway.shutdown()

这是尝试执行此脚本时发生的情况。首先,使用gateway = ClientServer(java_parameters=JavaParameters(), python_parameters=PythonParameters()),两个测试都失败了:

Test 1:

py4j.protocol.Py4JJavaError: An error occurred while calling o0.runSynchronous.
: py4j.Py4JException: Command Part is Empty or is the End of Command Part
        at py4j.Protocol.getObject(Protocol.java:277)
        at py4j.Protocol.getReturnValue(Protocol.java:458)

Test 2:

Exception in thread "Thread-4" py4j.Py4JException: Error while obtaining a new communication channel
        at py4j.CallbackClient.getConnectionLock(CallbackClient.java:218)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:337)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:316)

但是,如果我注释掉 self.execfunc() 行,测试 1 可以正常工作而不会引发错误。然而,测试 2 仍然失败:

Exception in thread "Thread-5" py4j.Py4JException: Error while sending a command.
        at py4j.CallbackClient.sendCommand(CallbackClient.java:357)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:316)

现在切换到gateway = JavaGateway(callback_server_parameters=CallbackServerParameters())。当我将 self.execfunc() 注释掉时,测试 2 在这里仍然失败:

Exception in thread "Thread-5" py4j.Py4JException: Error while sending a command.
        at py4j.CallbackClient.sendCommand(CallbackClient.java:357)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:316)

但至少测试 1 在启用 self.execfunc() 的情况下确实有效。

我的问题是:如何在 self.execfunc() 调用中使用线程方法? Py4J 可以做到这一点吗?

编辑:为了让事情变得更加棘手,self.execfunc() 调用的 Java 命令应该在调用 .notify() 的同一 Java 线程中运行。

【问题讨论】:

【参考方案1】:

解决了。结果很简单:

    在 Python 端和 Java 端使用 ClientServer! 不要调用 gateway.shutdown(),因为这会在收到回调之前断开 Python 的连接(呃!)

Java 将巧妙地遵循预期的线程模型,即接收 Python 回调调用的 Java 命令在执行回调的同一 Java 线程中执行。

通过一个简单的 Python 函数,可以添加一个 shutdown_when_done 方法,该方法会等到所有回调都返回后再退出。

【讨论】:

有机会,您是否使用此方法连接到 Apache Spark 执行程序? (pySpark) 不,尽管大多数帖子和问题确实出现在 pySpark 的上下文中,这似乎是 Py4J 的主要用户。

以上是关于来自 Java Runnable 的 Py4J 回调的主要内容,如果未能解决你的问题,请参考以下文章

py4j.protocol.Py4JNetworkError : 尝试连接到 Java 服务器时出错

如何停止 Python 运行 Py4J ClientServer

Java InputStream 到 Python (PY4J)

py4j:如何从 Python 启动 java 网关

如何使用 PY4J 从 python 调用 java

使用 py4j 将矩阵作为 int[][] 数组从 Python 发送到 Java