在 Flink 中通过广播进行 join 操作
Posted
技术标签:
【中文标题】在 Flink 中通过广播进行 join 操作【英文标题】:Performing a join operation through broadcast in Flink 【发布时间】:2021-09-08 14:18:05 【问题描述】:我想加入一个大流和一个小得多的流。我想广播较小的流,然后将其连接到较大的流。
但是我不确定如何处理存储广播的模式以及在processElement method
中如何查找匹配的模式然后组合这两个元素。
编辑: 我已经设法使用以下 sn-p 制作了广播加入的原型。我改编了官方培训存储库中的正常连接:https://github.com/apache/flink-training/blob/release-1.13/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala
这似乎有效,但我不确定我的逻辑是否正确。
//The main function has been abbreviated for ease of reading
def main()
val rides = env
.addSource(rideSourceOrTest(new TaxiRideGenerator()))
.filter ride => ride.isStart
// .keyBy ride => ride.rideId
val fares = env
.addSource(fareSourceOrTest(new TaxiFareGenerator()))
val broadcastStateDescriptor = new MapStateDescriptor[Long,TaxiFare]("fares_broadcast",classOf[Long],classOf[TaxiFare])
val faresBroadcast: BroadcastStream[TaxiFare] = fares
.broadcast(broadcastStateDescriptor)
val result: DataStream[(TaxiRide,TaxiFare)] = rides
.connect(faresBroadcast)
.process(new BroadcastJoin())
class BroadcastJoin() extends BroadcastProcessFunction[TaxiRide,TaxiFare,(TaxiRide,TaxiFare)]//IN1, IN2, OUT。 That is, non broadcast stream type, broadcast stream type and output stream type
//Broadcast state descriptor
private lazy val broadcastStateDescriptor = new MapStateDescriptor[Long,TaxiFare]("fares_broadcast",classOf[Long],classOf[TaxiFare])
//Process the broadcast stream element, value is the broadcast stream element passed in, and the modifiable broadcast state can be obtained through CTX
override def processBroadcastElement(value: TaxiFare, ctx: BroadcastProcessFunction[TaxiRide,TaxiFare,(TaxiRide,TaxiFare)]#Context, out: Collector[(TaxiRide,TaxiFare)]): Unit =
val broadcast_status: BroadcastState[Long,TaxiFare] = ctx.getBroadcastState(broadcastStateDescriptor)
if(broadcast_status.contains(value.rideId))
broadcast_status.remove(value.rideId)
broadcast_status.put ( value.rideId , value) // add the broadcast stream element to the broadcast state, which will be saved in local memory
//Handle non broadcast stream elements. Value is the non broadcast stream element passed in. Only read-only broadcast status can be obtained through CTX
override def processElement(value: TaxiRide, ctx: BroadcastProcessFunction[TaxiRide,TaxiFare,(TaxiRide,TaxiFare)]#ReadOnlyContext, out: Collector[(TaxiRide,TaxiFare)]): Unit =
//Read broadcast status
val broadcast_status: ReadOnlyBroadcastState[Long, TaxiFare] = ctx.getBroadcastState(broadcastStateDescriptor)
if(broadcast_status.contains(value.rideId))
val foundMatch = broadcast_status.get(value.rideId)
out.collect((value, foundMatch)) //Send out the desired results
【问题讨论】:
欢迎来到 ***。你试过什么了?你在哪里卡住了? 嗨,彼得!我刚刚用更有意义的信息更新了我的帖子。我可以使用广播方法成功加入两个流,但我想知道我的实现是否有任何问题 【参考方案1】:例如,您的高流量流可能是金融交易,而低流量广播流可能是从各种货币到美元的外汇汇率。您的目标可能是将所有交易标准化为美元。
您需要描述作为从键到值的映射广播的任何数据。
在这种情况下,您可以广播一张地图,其中键是一种货币(例如,欧元),值是该货币对美元的最新汇率。然后在 BroadcastProcessFunction 的 processBroadcastElement 方法中存储这些汇率,并在 processElement 方法中查找来自广播状态的传入交易的相关汇率。
(这可能不是实现这一特定目标的最佳方式——这只是我想到的第一个例子。)
【讨论】:
您好大卫,非常感谢您的意见。我开始使用培训存储库,并修改了 RidesAndFares 练习以使用广播加入而不是普通加入。在我的例子中,我更改了 FareGenerator 以便它只创建 20 个样本。我设法使用上面粘贴的 sn-p 成功广播加入,结果确实有 20 个元组。但是,如果我尝试走另一条路,首先键入主流并改用 KeyedBroadcastProcessFunction(我的实现与我粘贴的非常相似),我总是收到少于 20 个元组。 我看到的一个问题是您忽略了在初始广播之前到达的任何出租车。一种解决方案是添加一个 open() 方法来初始化广播状态,以便总有一些东西可以加入。 我认为你是绝对正确的。我正在观察我丢失数据的行为,我认为它与水印配置或类似的东西有关。您能否具体告诉我如何更具体地在 open 方法中进行此初始化?以上是关于在 Flink 中通过广播进行 join 操作的主要内容,如果未能解决你的问题,请参考以下文章
Apache-Flink 1.11 无法在 Java Flink Streamming Job 中通过 SQL Function DDL 使用 Python UDF
通过 JDBC 在受限环境中通过流式处理或批处理处理整个 SQL 表