Apache Flink-在不使用广播状态的情况下更新操作员中的配置

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Flink-在不使用广播状态的情况下更新操作员中的配置相关的知识,希望对你有一定的参考价值。

我们正在使用flink对每个事件进行http调用,这需要将某些数据存储在数据库中。该数据大约每周更新一次。此更新必须交给操作员。

在我们试图保持体系结构中流的数量少并且还因为数据更改不频繁的情况下,是否有任何方法可以在不使用广播流的情况下在操作员内部更新此数据?

答案

可能的选项:

A)您可以简单地将ProcessFunction与计时器配合使用,并每隔X分钟拉动更改。

B)如果状态很小并且重新启动不是很关键:如果不更新数据,则服务器请求很可能失败(403?)。然后,您可以将数据加载到open中,并且在收到403并恢复时使操作员失败。

编辑:

A)如何工作的示例。假设您有

源(记录)-> MyAsyncFunc(输出)->接收器(输出)

我要添加另一个功能

Source(Record)-> ConfFetcher(Tuple2(Record,Conf))-> MyAsyncFunc(Output)-> Sink(Output)

edit2:

正如您在评论中指出的那样,Flink计时器绑定到键控状态。但是,对于此用例,我们根本不需要使用任何Flink计时器,而只需使用Java计时器。

private static class PullConfig<T> extends RichMapFunction<T, Tuple2<T, Conf>> {
    private transient ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
    private transient volatile Conf conf;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        service.scheduleWithFixedDelay(this::pullConfig, 0, 1, TimeUnit.HOURS);
    }

    void pullConfig() {
        conf = ...
    }


    @Override
    public Tuple2<T, Conf> map(T value) throws Exception {
        return new Tuple2(value, conf);
    }
    ...
}

以上是关于Apache Flink-在不使用广播状态的情况下更新操作员中的配置的主要内容,如果未能解决你的问题,请参考以下文章

Flink Broadcast State 实战指南

在不使用定价服务作为推送器的情况下创建 Laravel 广播的最简单方法是啥?

Flink KeyedProcessFunction 与广播状态

如何在 flink 中更新 KeyedBroadcastProcessFunction 中的广播状态?

Python Pandas:如何在不编写辅助函数的情况下使用 apply 广播操作

Flink 的广播状态行为