是否可以在流式 flink 作业中创建批处理 flink 作业?
Posted
技术标签:
【中文标题】是否可以在流式 flink 作业中创建批处理 flink 作业?【英文标题】:Is it possible to create a batch flink job in streaming flink job? 【发布时间】:2020-12-21 13:18:12 【问题描述】:我有一个使用 Scala 使用 Apache Flink(flink 版本:1.8.1)流式传输的作业。流式作业要求如下: Kafka -> 写入 Hbase -> 使用不同的主题再次发送到 kafka
在向 Hbase 写入过程中,需要从另一个表中检索数据。为保证数据不为空(NULL),作业必须反复(一定时间内)检查数据是否为空。
这可以用 Flink 实现吗?如果是,您能否提供与我的需求相似的情况的示例?
编辑: 我的意思是,对于我在内容中描述的问题,我考虑过在作业流中创建某种作业批处理,但我找不到适合我的案例的正确示例。那么,是否可以在流式 flink 作业中创建批量 flink 作业?如果是,您能否提供与我的需求相似的情况的示例?
【问题讨论】:
我认为广播模式适用于您的用例 (ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/…),尽管您必须更改ruleStateDescriptor
以每 X 分钟更新一次状态。
不清楚您的问题标题(“”)与其余内容的关系。
【参考方案1】:
使用最新版本的 Flink,您可以从 SQL/Table API 对 HBase 进行查找查询(使用可配置的缓存)。您的用例听起来很容易以这种方式实现。请参阅the docs 了解更多信息。
【讨论】:
【参考方案2】:为了澄清我的评论,我将根据The Broadcast State Pattern 发布我试图提出的建议的草图。该链接提供了一个 Java 示例,因此我将遵循它。如果你想在 Scala 中使用它,它应该不会有太大的不同。正如我提到的链接中所解释的那样,您可能必须实现以下代码:
DataStream<String> output = colorPartitionedStream
.connect(ruleBroadcastStream)
.process(
// type arguments in our KeyedBroadcastProcessFunction represent:
// 1. the key of the keyed stream
// 2. the type of elements in the non-broadcast side
// 3. the type of elements in the broadcast side
// 4. the type of the result, here a string
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>()
// my matching logic
);
我建议您可以以固定的时间间隔从数据库或您的商店中收集流ruleBroadcastStream
。而不是得到:
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
.broadcast(ruleStateDescriptor);
就像网页上说的那样。您需要添加一个可以安排它每 X 分钟运行一次的源。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
BroadcastStream<Rule> ruleBroadcastStream = env
.addSource(new YourStreamSource())
.broadcast(ruleStateDescriptor);
public class YourStreamSource extends RichSourceFunction<YourType>
private volatile boolean running = true;
@Override
public void run(SourceContext<YourType> ctx) throws Exception
while (running)
// TODO: yourData = FETCH DATA;
ctx.collect(yourData);
Thread.sleep("sleep for X minutes");
@Override
public void cancel()
this.running = false;
【讨论】:
以上是关于是否可以在流式 flink 作业中创建批处理 flink 作业?的主要内容,如果未能解决你的问题,请参考以下文章
Flink Table API & SQL Planner 演变
Flink Table API & SQL Planner 演变