是否可以在流式 flink 作业中创建批处理 flink 作业?

Posted

技术标签:

【中文标题】是否可以在流式 flink 作业中创建批处理 flink 作业?【英文标题】:Is it possible to create a batch flink job in streaming flink job? 【发布时间】:2020-12-21 13:18:12 【问题描述】:

我有一个使用 Scala 使用 Apache Flink(flink 版本:1.8.1)流式传输的作业。流式作业要求如下: Kafka -> 写入 Hbase -> 使用不同的主题再次发送到 kafka

在向 Hbase 写入过程中,需要从另一个表中检索数据。为保证数据不为空(NULL),作业必须反复(一定时间内)检查数据是否为空。

这可以用 Flink 实现吗?如果是,您能否提供与我的需求相似的情况的示例?

编辑: 我的意思是,对于我在内容中描述的问题,我考虑过在作业流中创建某种作业批处理,但我找不到适合我的案例的正确示例。那么,是否可以在流式 flink 作业中创建批量 flink 作业?如果是,您能否提供与我的需求相似的情况的示例?

【问题讨论】:

我认为广播模式适用于您的用例 (ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/…),尽管您必须更改 ruleStateDescriptor 以每 X 分钟更新一次状态。 不清楚您的问题标题(“”)与其余内容的关系。 【参考方案1】:

使用最新版本的 Flink,您可以从 SQL/Table API 对 HBase 进行查找查询(使用可配置的缓存)。您的用例听起来很容易以这种方式实现。请参阅the docs 了解更多信息。

【讨论】:

【参考方案2】:

为了澄清我的评论,我将根据The Broadcast State Pattern 发布我试图提出的建议的草图。该链接提供了一个 Java 示例,因此我将遵循它。如果你想在 Scala 中使用它,它应该不会有太大的不同。正如我提到的链接中所解释的那样,您可能必须实现以下代码:

DataStream<String> output = colorPartitionedStream
                 .connect(ruleBroadcastStream)
                 .process(
                     // type arguments in our KeyedBroadcastProcessFunction represent: 
                     //   1. the key of the keyed stream
                     //   2. the type of elements in the non-broadcast side
                     //   3. the type of elements in the broadcast side
                     //   4. the type of the result, here a string
                     new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() 
                         // my matching logic
                     
                 );

我建议您可以以固定的时间间隔从数据库或您的商店中收集流ruleBroadcastStream。而不是得到:

// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
                        .broadcast(ruleStateDescriptor);

就像网页上说的那样。您需要添加一个可以安排它每 X 分钟运行一次的源。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
BroadcastStream<Rule> ruleBroadcastStream = env
             .addSource(new YourStreamSource())
             .broadcast(ruleStateDescriptor);
 
public class YourStreamSource extends RichSourceFunction<YourType> 
    private volatile boolean running = true;
    @Override
    public void run(SourceContext<YourType> ctx) throws Exception 
        while (running) 
            // TODO: yourData = FETCH DATA;
            ctx.collect(yourData);

            Thread.sleep("sleep for X minutes");
        
    
    @Override
    public void cancel() 
        this.running = false;
    

【讨论】:

以上是关于是否可以在流式 flink 作业中创建批处理 flink 作业?的主要内容,如果未能解决你的问题,请参考以下文章

Flink Table API & SQL Planner 演变

今晚直播流式处理新秀Flink原理与实践

Flink Table API & SQL Planner 演变

我可以在同一个 Flink 作业中使用 DataSet API 和 DataStream API 吗?

在 PowerShell 中创建批处理作业

在批处理作业中创建文件名作为时间戳