Flink 的广播状态行为

Posted

技术标签:

【中文标题】Flink 的广播状态行为【英文标题】:Flink's broadcast state behavior 【发布时间】:2018-06-25 09:38:50 【问题描述】:

我正在尝试用一个简单的案例来玩弄 flink 的广播状态。

我只是想将一个整数流乘以另一个整数成为广播流。

我的 Broadcast 的行为是“奇怪的”,如果我在输入流中放的元素太少(比如 10),什么也不会发生,我的 MapState 是空的,但是如果我放更多的元素(比如 100)我有我想要的行为(在这里将整数流乘以 2)。

如果我给的元素太少,为什么广播流不考虑?

如何控制广播流的工作时间?

可选:我只想保留广播流的最后一个元素,.clear() 是个好方法吗?

谢谢!

这是我的BroadcastProcessFunction

import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConversions._

class BroadcastProcess extends BroadcastProcessFunction[Int, Int, Int] 
  override def processElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#ReadOnlyContext, out: Collector[Int]) = 
    val currentBroadcastState = ctx.getBroadcastState(State.mapState).immutableEntries()
    if (currentBroadcastState.isEmpty) 
      out.collect(value)
     else 
      out.collect(currentBroadcastState.last.getValue * value)
    
  

  override def processBroadcastElement(value: Int, ctx: BroadcastProcessFunction[Int, Int, Int]#Context, out: Collector[Int]) = 
    // Keep only last state
    ctx.getBroadcastState(State.mapState).clear()
    // Add state
    ctx.getBroadcastState(State.mapState).put("key", value)
  

还有我的MapState

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.scala._

object State 
  val mapState: MapStateDescriptor[String, Int] =
    new MapStateDescriptor(
      "State",
      createTypeInformation[String],
      createTypeInformation[Int]
    )

还有我的Main

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object Broadcast 
  def main(args: Array[String]): Unit = 
    val numberElements = 100
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val broadcastStream = env.fromElements(2).broadcast(State.mapState)
    val input = (1 to numberElements).toList
    val inputStream = env.fromCollection(input)
    val outputStream = inputStream
      .connect(broadcastStream)
      .process(new BroadcastProcess())
    outputStream.print()
    env.execute()
  

编辑:我使用 Flink 1.5,广播状态文档是 here。

【问题讨论】:

【参考方案1】:

Flink 不会同步流的摄取,即流会尽快生成数据。这适用于常规和广播输入。 BroadcastProcess 在摄取常规输入之前不会等待第一个广播输入到达。

当您将更多数字放入常规输入时,序列化、反序列化和服务输入需要更多时间,以便在第一个常规数字到达时已经存在广播输入。

【讨论】:

感谢您的回答!你知道是否有办法控制(或至少知道)广播输入何时到达? 这是特定于应用程序的。 bc 输入也可能是无限的。您可以在状态(例如 ListState)中缓冲所有常规输入,并在收到所有(或第一个)bc 输入后处理processBroadcastElement() 中记录的事件 嗯,好的,谢谢!在“正常”情况下不会有问题,但我想“单元测试”广播处理。我会检查我是否找到解决方案,或者我是否只是添加一个Thread.sleep 来等待广播元素。 我有同样的语义问题。没有一个 flink 示例涵盖这个主题。即使在放置了一些逻辑来缓冲主流并等待广播元素到达之后,也无法保证它何时会发生。如果主流的吞吐量真的很高,那么广播流什么时候有机会被运营商处理还不清楚。使缓冲变得不可预测。

以上是关于Flink 的广播状态行为的主要内容,如果未能解决你的问题,请参考以下文章

Flink Broadcast State 实战指南

Flink Broadcast State 实战指南

Flink 源码:广播流状态源码解析

Flink KeyedProcessFunction 与广播状态

如何在 flink 中更新 KeyedBroadcastProcessFunction 中的广播状态?

Flink-状态与容错-Broadcast State--flink1.13