Apache flink:使用keyBy / connect维护流中的消息输入顺序
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache flink:使用keyBy / connect维护流中的消息输入顺序相关的知识,希望对你有一定的参考价值。
Intro
我正在使用apache flink构建一个相当复杂的数据流网络。这个想法是,用flink实现规则引擎。
作为应用程序的基本描述,这是它应该如何工作:
数据由kafka消费者源接收,并使用多个数据流进行处理,直到最终发送到kafka生产者接收器。传入数据包含具有逻辑键(“object-id”)的对象,并且传入消息可以引用相同的object-id。对于每个给定的object-id,必须在整个应用程序中保留其传入消息的顺序。整体消息的顺序可以是任意的。
这意味着,必须按顺序处理对象1的消息a,b和c,但是对象2的消息x可能在a1 / b1 / c1之间,之前或之后处理,无关紧要。
对于我目前的理解,这意味着我必须keyBy(_.objectID)
,以便按照它们到达的顺序处理同一对象的消息。
Current approach
为了实现实际的规则引擎,创建了一个流网络。这个想法如下:
- 每条规则都有1-n条件
- 对于每个规则的每个条件,使用
.filter(_.matches(rule.condition))
创建原始流的子流 - 通过使用
substream1.connect(substream2).flatMap(new CombineFunction[MyObject](...))
组合对应于相同规则的所有子流 connect
只能加入2个流,因此具有3个条件的规则将导致后续的2个连接- 使用相同条件的规则将重用第二步中创建的相同子流。
这将导致n个连接流,其中n对应于规则的数量。连接的流将附加一个map
函数,用于标记消息,以便我们知道规则匹配。
每个连接/结果流可以独立于其他结果将其结果(“规则xyz匹配”)发布到kafka生成器,因此此时我可以将接收器附加到流。
Connect details
因为两个流的.connect
(“条件” - 子流)必须只传递消息,如果它在两个流上都被接收(^ =两个条件匹配),我需要一个带键控状态的RichCoFlatMapFunction
,它可以处理“只有在另一方已收到“才通过”。
但是,问题是,流是由object-id键入的。那么,如果同一个对象的2条消息通过网络运行并到达.connect().map(new RichCoFlatMapFunction...)
会发生什么?这将导致错误的输出。我需要在进入网络时为每个传入消息分配唯一ID(UUID),因此我可以在.connect().map()..
连接中使用此密钥(而不是object-id)。但与此同时,我需要通过object-id键入流,以便按顺序处理相同对象的消息。该怎么办?
为了解决这个问题,我使用keyBy(_.objectID)
保存了输入流,但是stream-join中的RichCoFlatMapFunction
不再使用keyed-state。相反,我使用一个简单的操作符状态,它保存传递对象的映射,但实现相同的逻辑,只需手动键/值查找。
这似乎有效,但我不知道这是否会引入更多问题。
Visualization
flink GUI将呈现此图像,包含14个规则的列表,总共23个条件(某些规则只有一个条件):
Code
使用以下代码实现网络的创建:
val streamCache = mutable.Map[Int,DataStream[WorkingMemory]]()
val outputNodesCache = ListBuffer[DataStream[WorkingMemory]]()
if (rules.isEmpty)
return
// create partial streams for all conditions (first level)
// cache the sub-stream with the hashcode of its condition as key (for re-use)
for (rule <- rules if rule.checks.nonEmpty ;
cond <- rule.checks if !streamCache.contains(cond.hashCode()))
streamCache += cond.hashCode -> sourceStream.filter(cond.matches _)
// create joined streams for combined conditions (sub-levels)
for (rule <- rules if rule.checks.nonEmpty)
{
val ruleName = rule.ruleID
// for each rule, starting with the rule with the least conditions ...
if (rule.checks.size == 1)
{
// ... create exit node if single-condition rule
// each exit node applies the rule-name to the objects set of matched rules.
outputNodesCache += streamCache(rule.checks.head.hashCode).map(obj => { obj.matchedRule = ListBuffer((ruleName, rule.objectType.mkString(":"), rule.statement)) ; obj })
}
else
{
// ... iterate all conditions, and join nodes into full rule-path (reusing existing intermediate paths)
var sourceStream:DataStream[WorkingMemory] = streamCache(rule.checks.head.hashCode)
var idString = rule.checks.head.idString
for (i <- rule.checks.indices)
{
if (i == rule.checks.size-1)
{
// reached last condition of rule, create exit-node
// each exit node applies the rule-name to the objects set of matched rules.
val rn = ruleName
val objectType = rule.objectType.mkString(":")
val statement = rule.statement
outputNodesCache += sourceStream.map(obj => { obj.matchedRule = ListBuffer((rn, objectType, statement)) ; obj })
}
else
{
// intermediate condition, create normal intermediate node
val there = rule.checks(i+1)
val connectStream = streamCache(there.hashCode)
idString += (":" + there.idString)
// try to re-use existing tree-segments
if (streamCache.contains(idString.hashCode))
sourceStream = streamCache(idString.hashCode)
else
sourceStream = sourceStream.connect(connectStream).flatMap(new StatefulCombineFunction(idString))
}
}
}
}
// connect each output-node to the sink
for (stream <- outputNodesCache)
{
stream.map(wm => RuleEvent.toXml(wm, wm.matchedRule.headOption)).addSink(sink)
}
前面片段中使用的StatefulCombineFunction
:
class StatefulCombineFunction(id:String) extends RichCoFlatMapFunction[WorkingMemory, WorkingMemory, WorkingMemory] with CheckpointedFunction
{
@transient
private var leftState:ListState[(String, WorkingMemory)] = _
private var rightState:ListState[(String, WorkingMemory)] = _
private var bufferedLeft = ListBuffer[(String, WorkingMemory)]()
private var bufferedRight = ListBuffer[(String, WorkingMemory)]()
override def flatMap1(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedLeft, bufferedRight, xmlObject, out, "left")
override def flatMap2(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedRight, bufferedLeft, xmlObject, out, "right")
def combine(leftState: ListBuffer[(String, WorkingMemory)], rightState: ListBuffer[(String, WorkingMemory)], xmlObject:WorkingMemory, out: Collector[WorkingMemory], side:String): Unit =
{
val otherIdx:Int = leftState.indexWhere(_._1 == xmlObject.uuid)
if (otherIdx > -1)
{
out.collect(leftState(otherIdx)._2)
leftState.remove(otherIdx)
}
else
{
rightState += ((xmlObject.uuid, xmlObject))
}
}
override def initializeState(context:FunctionInitializationContext): Unit = ???
override def snapshotState(context:FunctionSnapshotContext):Unit = ???
}
我知道从运营商状态中清除部分匹配是缺失的(生存时间),但它对于当前的开发状态并不重要,并且将在稍后添加。
Background information
该应用程序应使用flink(https://en.wikipedia.org/wiki/Rete_algorithm)实现规则匹配的rete算法。
另一种方法是仅为每个传入消息循环所有规则,并附加结果。我有一个使用flink的这种方法的工作实现,所以请不要建议这个解决方案。
Issues
问题是,应用程序会在对象ID级别上混淆传入消息的顺序。也就是说,它没有达到我在介绍中所要求的。对于每个object-id,传入的消息必须保持顺序。但这种情况并非如此。
我不知道在代码中哪个时候订单搞砸了,或者这些操作是如何在线程之间分配的,所以我不知道如何解决这个问题。
一些评论......
- 我假设你已经检查了Flink的CEP支持,特别是Handling Lateness in Event Time。关键概念是你可以依靠事件时间(不是处理时间)来帮助订购事件,但是你总是必须决定你愿意容忍的最大迟到量(迟到可能是由两者引起的)源,以及工作流程中发生的任何处理)。
- 从您提供的Flink作业图中,您看起来像是通过散列对传入数据进行分区,但每条规则都需要获取每个传入的数据,对吧?那么在这种情况下你需要广播。
以上是关于Apache flink:使用keyBy / connect维护流中的消息输入顺序的主要内容,如果未能解决你的问题,请参考以下文章
flink 是不是可以均匀地使用固定数字 key to keyBy 一个数据流来避免数据倾斜?
Flink AggregateFunction窗口函数,执行步骤流程与实例