在 Flink 中通过广播进行 join 操作

Posted

技术标签:

【中文标题】在 Flink 中通过广播进行 join 操作【英文标题】:Performing a join operation through broadcast in Flink 【发布时间】:2021-09-08 14:18:05 【问题描述】:

我想加入一个大流和一个小得多的流。我想广播较小的流,然后将其连接到较大的流。

但是我不确定如何处理存储广播的模式以及在processElement method 中如何查找匹配的模式然后组合这两个元素。

编辑: 我已经设法使用以下 sn-p 制作了广播加入的原型。我改编了官方培训存储库中的正常连接:https://github.com/apache/flink-training/blob/release-1.13/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala

这似乎有效,但我不确定我的逻辑是否正确。

//The main function has been abbreviated for ease of reading
def main()
    val rides = env
      .addSource(rideSourceOrTest(new TaxiRideGenerator()))
      .filter  ride => ride.isStart 
//      .keyBy  ride => ride.rideId 

    val fares = env
      .addSource(fareSourceOrTest(new TaxiFareGenerator()))

    val broadcastStateDescriptor = new MapStateDescriptor[Long,TaxiFare]("fares_broadcast",classOf[Long],classOf[TaxiFare])
    val faresBroadcast: BroadcastStream[TaxiFare] = fares
      .broadcast(broadcastStateDescriptor)

    val result: DataStream[(TaxiRide,TaxiFare)] = rides
      .connect(faresBroadcast)
      .process(new BroadcastJoin())


class BroadcastJoin() extends BroadcastProcessFunction[TaxiRide,TaxiFare,(TaxiRide,TaxiFare)]//IN1, IN2, OUT。 That is, non broadcast stream type, broadcast stream type and output stream type
    //Broadcast state descriptor
    private lazy val broadcastStateDescriptor =  new MapStateDescriptor[Long,TaxiFare]("fares_broadcast",classOf[Long],classOf[TaxiFare])

    //Process the broadcast stream element, value is the broadcast stream element passed in, and the modifiable broadcast state can be obtained through CTX
    override def processBroadcastElement(value: TaxiFare, ctx: BroadcastProcessFunction[TaxiRide,TaxiFare,(TaxiRide,TaxiFare)]#Context, out: Collector[(TaxiRide,TaxiFare)]): Unit = 
      val broadcast_status: BroadcastState[Long,TaxiFare] = ctx.getBroadcastState(broadcastStateDescriptor)
      if(broadcast_status.contains(value.rideId))
        broadcast_status.remove(value.rideId)
      
      broadcast_status.put ( value.rideId , value) // add the broadcast stream element to the broadcast state, which will be saved in local memory
    

    //Handle non broadcast stream elements. Value is the non broadcast stream element passed in. Only read-only broadcast status can be obtained through CTX
    override def processElement(value: TaxiRide, ctx: BroadcastProcessFunction[TaxiRide,TaxiFare,(TaxiRide,TaxiFare)]#ReadOnlyContext, out: Collector[(TaxiRide,TaxiFare)]): Unit = 
      //Read broadcast status
      val broadcast_status: ReadOnlyBroadcastState[Long, TaxiFare] = ctx.getBroadcastState(broadcastStateDescriptor)
      if(broadcast_status.contains(value.rideId)) 
        val foundMatch = broadcast_status.get(value.rideId)
        out.collect((value, foundMatch)) //Send out the desired results
      
    
  

【问题讨论】:

欢迎来到 ***。你试过什么了?你在哪里卡住了? 嗨,彼得!我刚刚用更有意义的信息更新了我的帖子。我可以使用广播方法成功加入两个流,但我想知道我的实现是否有任何问题 【参考方案1】:

例如,您的高流量流可能是金融交易,而低流量广播流可能是从各种货币到美元的外汇汇率。您的目标可能是将所有交易标准化为美元。

您需要描述作为从键到值的映射广播的任何数据。

在这种情况下,您可以广播一张地图,其中键是一种货币(例如,欧元),值是该货币对美元的最新汇率。然后在 BroadcastProcessFunction 的 processBroadcastElement 方法中存储这些汇率,并在 processElement 方法中查找来自广播状态的传入交易的相关汇率。

(这可能不是实现这一特定目标的最佳方式——这只是我想到的第一个例子。)

【讨论】:

您好大卫,非常感谢您的意见。我开始使用培训存储库,并修改了 RidesAndFares 练习以使用广播加入而不是普通加入。在我的例子中,我更改了 FareGenerator 以便它只创建 20 个样本。我设法使用上面粘贴的 sn-p 成功广播加入,结果确实有 20 个元组。但是,如果我尝试走另一条路,首先键入主流并改用 KeyedBroadcastProcessFunction(我的实现与我粘贴的非常相似),我总是收到少于 20 个元组。 我看到的一个问题是您忽略了在初始广播之前到达的任何出租车。一种解决方案是添加一个 open() 方法来初始化广播状态,以便总有一些东西可以加入。 我认为你是绝对正确的。我正在观察我丢失数据的行为,我认为它与水印配置或类似的东西有关。您能否具体告诉我如何更具体地在 open 方法中进行此初始化?

以上是关于在 Flink 中通过广播进行 join 操作的主要内容,如果未能解决你的问题,请参考以下文章

Apache-Flink 1.11 无法在 Java Flink Streamming Job 中通过 SQL Function DDL 使用 Python UDF

通过 JDBC 在受限环境中通过流式处理或批处理处理整个 SQL 表

在 Kotlin 中通过发送广播启动新 Activity

Flink通过异步IO实现redis维表join

命令行中通过adb shell am broadcast发送带参数广播

在Android中通过音频信号传输和提取消息