在 Apache Flink 中注册聚合 UDF

Posted

技术标签:

【中文标题】在 Apache Flink 中注册聚合 UDF【英文标题】:Registering an Aggregate UDF in Apache Flink 【发布时间】:2018-03-15 21:15:06 【问题描述】:

我正在尝试按照here 的步骤创建一个基本的 Flink Aggregate UDF。我已经添加了依赖项()并实现了

public class MyAggregate extends AggregateFunction<Long, TestAgg> ..

我已经实现了强制方法以及其他一些方法:accumulate, merge, etc。所有这些构建都没有错误。现在根据文档,我应该可以将其注册为

    StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment sTableEnv = StreamTableEnvironment.getTableEnvironment(sEnv);
    sTableEnv.registerFunction("MyMin", new MyAggregate());

但是,registerFucntion 似乎只需要 ScalarFunction 作为输入。我收到不兼容的类型错误:The method registerFunction(String, ScalarFunction) in the type TableEnvironment is not applicable for the arguments (String, MyAggregate)

任何帮助都会很棒。

【问题讨论】:

【参考方案1】:

您需要为您选择的语言导入StreamTableEnvironment,在您的情况下为org.apache.flink.table.api.java.StreamTableEnvironment

org.apache.flink.table.api.StreamTableEnvironmentStreamTableEnvironment 的 Java 和 Scala 变体的通用抽象类。我们注意到这部分 API 让用户感到困惑,我们会在未来对其进行改进。

【讨论】:

以上是关于在 Apache Flink 中注册聚合 UDF的主要内容,如果未能解决你的问题,请参考以下文章

spark的udf和udaf的注册

Apache-Flink 1.11 无法在 Java Flink Streamming Job 中通过 SQL Function DDL 使用 Python UDF

案例说明flink的udf

Flink UDF

95-910-144-源码-FlinkSQL-Flink的UDF

flink 有状态udf 引起血案一