Flink KeyedProcessFunction 与广播状态

Posted

技术标签:

【中文标题】Flink KeyedProcessFunction 与广播状态【英文标题】:Flink KeyedProcessFunction vs Broadcast state 【发布时间】:2021-04-16 12:18:13 【问题描述】:

我尝试为我的 flink 应用程序使用广播状态模式,但经过一些研究后我做了以下操作:

case class MyData(field1: String, field2: String, ts: Long, type: String) // type can be DATA or CONFIG

val stream1: DataStream[MyData] = ... // kinesis queue 1. main Data stream
val stream2: DataStream[MyData] = ... // kinesis queue 2. configuration stream

val union = stream1.union(stream2)
  .keyBy(x => s"$x.field1_$x.field2")
  .process(new MyProcessFun)

MyProcessFun() 中,我读取数据,并根据来自stream2 的数据对数据进行一些逻辑处理并发出一些元素。 基本上我使用stream2 就像广播状态模式。我没有专门使用广播,因为没有简单的方法可以访问我从processBroadcastElement 获得的某些状态。 由于我的配置流被用作我在MyProcessFun() 中的清理状态的指标。

流是.keyBy,所以我预计并行性> 1 不会出现问题。这是真的吗?

我的问题是在哪些情况下还需要广播? 什么情况下需要使用广播模式? 因为在许多情况下,此类功能可以在 .union(),.connect() // with no broadcast + CoProcessFunction() 的帮助下解决。

【问题讨论】:

【参考方案1】:

这里要说两件事。 首先,您似乎可以使用标准的KeyedCoProcess 函数来实现您现在使用union 所做的事情。它不会有太大的不同,但您可以为两个流设置单独的类,这样可以提高类型安全性和更好的域隔离。

至于broadcast,主要用例是control 流没有keyBy 的密钥,或者根本不能/不应该被分区。

要考虑的一个例子是,您可能有一些由外部系统生成的事件,并且您想应用规则来过滤掉不符合规则要求的事件。您希望拥有动态规则,以便如果用户定义规则,它将立即用于过滤传入事件。为简单起见,我们假设规则对所有事件类型都适用(例如,如果事件发生在某天下午 5 点之后,那么它应该被过滤,或者如果事件持续超过 5 分钟,我们假设它是无效的)。你不能划分这样的规则,所以解决方案是broadcast

或者,如果您想要一个系统,您可以实时计算送货司机的总收入。您可能有一套额外的奖金(例如,如果司机在一小时内完成 10 次送货,则有 5% 的奖金)。你不会想为每个司机创建一套单独的奖励规则,这样你就可以keyBy它,你会吗?:)

【讨论】:

我使用了.union,因为我实际上在那里合并了几个流。据我了解,如果您使用.connect,这是不可能的。也就是说,在我的实现中,并行度大于 1 没有问题吗?感谢您详细说明何时使用广播。我的理解如下,如果您想向control 流发送不与data 流中的任何实体绑定的一般事件 - 使用广播:) 如果键是唯一的,那么在联合的情况下处理并行性应该不是什么问题。

以上是关于Flink KeyedProcessFunction 与广播状态的主要内容,如果未能解决你的问题,请参考以下文章

flink 读取mysql并使用flink sql

Flink关于Flink:Flink-SortShuffle-实现简介

Flink学习笔记:Flink的最简安装

Flink入门——Flink架构介绍

flink实战教程-集群的部署

Flink 源码解析 —— 深度解析 Flink 序列化机制