Apache-Flink 1.11 无法在 SQL 函数 DDL 中使用 Python UDF

Posted

技术标签:

【中文标题】Apache-Flink 1.11 无法在 SQL 函数 DDL 中使用 Python UDF【英文标题】:Apache-Flink 1.11 Unable to use Python UDF in SQL Function DDL 【发布时间】:2020-07-09 19:39:11 【问题描述】:

根据这个合流页面:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL

python udf 在 Flink 1.11 中可用于 SQL 函数。

我在这里查看了 flink 文档:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html

在终端上试试这个并使用以下参数启动 sql-client.sh

$ sql-client.sh embedded --pyExecutable /Users/jonathanfigueroa/opt/anaconda3/bin/python --pyFiles /Users/jonathanfigueroa/Desktop/pyflink/inference/test1.py

然后:

> Create Temporary System Function func1 as 'test1.func1' Language PYTHON;
[INFO] Function has been created. 

当我尝试时:

> Select func1(str) From (VALUES ("Name1", "Name2", "Name3"));
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Instantiating python function 'test1.func1' failed.

我尝试在每个组合 .zip, .py 中使用:-pyarch,--pyArchives, -pyexec,--pyExecutable, -pyfs,--pyFiles,结果总是相同。

顺便说一句,我的 python 文件如下所示:

def func1(s):
    return s;

我有什么遗漏的吗?

亲切的问候,

乔纳森

【问题讨论】:

【参考方案1】:

python UDF 应该被 pyflink.table.udf 中的“udf”装饰器包裹,像这样:

from pyflink.table.types import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.INT()], result_type=DataTypes.INT())
def add_one(a):
    return a + 1

并且启动sql-client时需要加载flink-python jar,如下:

$ cd $FLINK_HOME/bin
$ ./start-cluster.sh
$ ./sql-client.sh embedded -pyfs xxx.py -j ../opt/flink-python_2.11-1.11.0.jar

另外,需要将taskmanager.memory.task.off-heap.size: 79mb添加到$FLINK_HOME/conf/flink-conf.yaml或其他可用于设置配置的文件(如sql客户端环境文件),否则执行python udf会报错:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key'taskmanager.memory .task.off-heap.size'.

最好的, 魏

【讨论】:

以上是关于Apache-Flink 1.11 无法在 SQL 函数 DDL 中使用 Python UDF的主要内容,如果未能解决你的问题,请参考以下文章

Apache-Flink深度解析-State

识别 Apache-Flink 中哪个对象不可序列化

Apache-Flink深度解析-概述

Apache-Flink深度解析-TableAPI

(1.11)SQL优化——mysql提示(hint)

Flink SQL 1.11新功能详解:Hive 数仓实时化 & Flink SQL + CDC 实践