Apache Kafka 根据消息的值对窗口消息进行排序
Posted
技术标签:
【中文标题】Apache Kafka 根据消息的值对窗口消息进行排序【英文标题】:Apache Kafka order windowed messages based on their value 【发布时间】:2017-10-11 21:07:11 【问题描述】:我正在尝试找到一种方法来重新排序主题分区中的消息并将已排序的消息发送到新主题。
我有发送以下格式的字符串消息的 Kafka 发布者:
system_timestamp-event_name?parameters
例如:
1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3
此外,我们为每条消息添加一些消息键,将它们发送到相应的分区。
我想做的是根据消息的 system-timestamp 部分并在 1 分钟的窗口内重新排序事件,因为我们的发布者不保证消息会在符合 system-timestamp 值。
例如,我们可以先向主题传递 system-timestamp 值较大的消息。
我研究了 Kafka Stream API 并找到了一些关于消息窗口和聚合的示例:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream("events");
KGroupedStream<String>, String> groupedStream = stream.groupByKey();//grouped events within partion.
/* commented since I think that I don't need any aggregation, but I guess without aggregation I can't use time windowing.
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
() -> "", // initial value
(aggKey, value, aggregate) -> aggregate + "", // aggregating value
TimeWindows.of(1000), // intervals in milliseconds
Serdes.String(), // serde for aggregated value
"test-store"
);*/
但是接下来我应该如何处理这个分组流?我没有看到任何 'sort() (e1,e2) -> e1.compareTo(e2)' 方法可用,windows 也可以应用于像 aggregation() 这样的方法em>, reduce() ,count() ,但我认为我不需要任何消息数据操作。
如何在 1 分钟窗口中重新排序消息并将它们发送到另一个主题?
【问题讨论】:
【参考方案1】:这是一个大纲:
创建一个处理器实现:
在 process() 方法中,对于每条消息:
从消息值中读取时间戳 使用 (timestamp, message-key) 对作为键,将消息-值作为值插入 KeyValueStore。注意这也提供了重复数据删除。您需要提供一个自定义 Serde 来序列化密钥,以便按字节排列时间戳在前,以便范围查询首先按时间戳排序。在 punctuate() 方法中:
使用从 0 到时间戳 - 60'000(=1 分钟)的范围提取来读取存储 使用 context.forward() 按顺序发送获取的消息并将它们从存储中删除这种方法的问题是,如果没有新的消息到达以提前“流时间”,则不会触发 punctuate()。如果在您的情况下这是一个风险,您可以创建一个外部调度程序,向您的主题的每个(!)分区发送定期“tick”消息,您的处理器应该忽略这些消息,但它们会导致标点符号在缺席时触发“真实”的消息。 KIP-138 将通过添加对系统时间标点符号的明确支持来解决此限制: https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics
【讨论】:
再想一想,当生产者在同一毫秒内发出多个具有相同密钥的消息时,这将无法正常工作。因此,键应该是 (timestamp, some-unique-key) 对,或者值应该是一个集合。 假设生产者已经提供了正确的消息顺序,这应该没有问题,因为消息的消费顺序与它们在针对同一分区时产生的顺序相同——这我们知道拥有相同的键 按照这个很棒的说明画了一个例子。 github.com/confluentinc/kafka-streams-examples/issues/179【参考方案2】:这是我在项目中订购流的方式。
-
已创建具有源、处理器和接收器的拓扑。
在处理器中
-
process(key, value) -> 将每条记录添加到 List(实例变量)。
Init() -> schedule(WINDOW_BUFFER_TIME, WALL_CLOCK_TIME) -> punctuate (timestamp) 对List(实例变量)中窗口缓冲时间项的排序列表并迭代转发。清除列表(实例变量)。
这个逻辑对我来说很好。
【讨论】:
明智的做法是使用 StateStore 支持该集合,以便在应用程序崩溃的情况下不会丢失缓冲的事件。这是一个示例,您可能会丢失缓冲区***.com/a/62677079/2256618。我已经在这里提交了如何使用状态存储对事件进行排序 -> github.com/vinodhinic/kstream-sorting以上是关于Apache Kafka 根据消息的值对窗口消息进行排序的主要内容,如果未能解决你的问题,请参考以下文章