Apache-Flink 1.11 无法在 Java Flink Streamming Job 中通过 SQL Function DDL 使用 Python UDF

Posted

技术标签:

【中文标题】Apache-Flink 1.11 无法在 Java Flink Streamming Job 中通过 SQL Function DDL 使用 Python UDF【英文标题】:Apache-Flink 1.11 Unable to use Python UDF through SQL Function DDL in Java Flink Streamming Job 【发布时间】:2020-08-03 21:49:42 【问题描述】:

Flip-106 中有一个示例,说明如何通过 SQL Function DDL 在批处理作业 java 应用程序中调用用户定义的 python 函数...

BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");

tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();

我一直在尝试在流式作业 java 应用程序中重现相同的示例,这是我的代码:

final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(EnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "/Users/jf/Desktop/flink/fca/test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "/Users/jf/opt/anaconda3/bin/python");

fsTableEnv.sqlUpdate("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
Table table = fsTableEnv.fromValues("1", "2", "3").as("str").select("func1(str)");
/* Missing line */

对于批处理作业中的这一特定行:

tEnv.toDataSet(table, String.class).collect();

我还没有找到流式作业的等效项

1.你能帮我把这个翻转 106 的例子从批处理映射到流吗?

我最终想要的是使用 flink 1.11 在流式作业 java flink 应用程序中调用 python 函数,如下所示:

final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(EnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "/Users/jf/Desktop/flink/fca/test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "/Users/jf/opt/anaconda3/bin/python");

fsTableEnv.sqlUpdate("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
final Table table = fsTableEnv.fromDataStream(stream_filtered.map(x->x.idsUmid)).select("func1(f0)").as("umid");
System.out.println("Result --> " + table.select($("umid")) + " --> End of Result");

并使用该 udf 的结果进行进一步处理(不一定在控制台中打印)

我已经编辑了test.py 文件,以查看是否至少不管未命名的表在 python 中是否正在执行某些操作。

from pyflink.table.types import DataTypes
from pyflink.table.udf import udf
from os import getcwd

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(line):
    print(line)
    print(getcwd())
    with open("test.txt", "a") as myfile:
        myfile.write(line)
    return line

没有打印任何内容,没有创建 test.txt 文件,也没有将值返回到流式作业。所以基本上这个python函数没有被调用。

2。我在这里缺少什么?

感谢 David、Wei 和 Xingbo 迄今为止的支持,因为建议的每一个细节都对我有用。

最好的问候,

乔纳森

【问题讨论】:

【参考方案1】:

你可以试试这个:

final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(EnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "/Users/jf/Desktop/flink/fca/test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "/Users/jf/opt/anaconda3/bin/python");

// You need to specify the python interpreter used to run the python udf on cluster.
// I assume this is a local program so it is the same as the "python.client.executable".
fsTableEnv.getConfig().getConfiguration().setString("python.executable", "/Users/jf/opt/anaconda3/bin/python");

fsTableEnv.sqlUpdate("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
final Table table = fsTableEnv.fromDataStream(stream_filtered.map(x->x.idsUmid)).select("func1(f0)").as("umid");

// 'table.select($("umid"))' will not trigger job execution. You need to call the "execute()" method explicitly.
table.execute().print();

【讨论】:

先生。钟……谢谢你的回复……很有帮助!让我问你点别的。如果我们将这段代码放入一个方法中,因为所有这些都是 java flink 应用程序的一部分,而不是表 API java flink 应用程序的一部分,所以数据是从源头消费的,但没有任何反应。另一方面,如果我们使用 tableEnv.fromValues() 函数并在每次插入一行时调用 table.execute() 就会创建一个新作业。一种解决方案不起作用,另一种解决方案取决于为每个新的 umid 创建一个新工作。结合 DataStream 和 Table API 的最佳方式是什么?

以上是关于Apache-Flink 1.11 无法在 Java Flink Streamming Job 中通过 SQL Function DDL 使用 Python UDF的主要内容,如果未能解决你的问题,请参考以下文章

Apache-Flink深度解析-State

识别 Apache-Flink 中哪个对象不可序列化

Apache-Flink深度解析-概述

Apache-Flink深度解析-TableAPI

Django 1.11 - 无法在管理面板中编辑我的模型

Django 1.11 使用命令makemigrations命令无法执行表修改动作