如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?

Posted

技术标签:

【中文标题】如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?【英文标题】:How to send final kafka-streams aggregation result of a time windowed KTable? 【发布时间】:2016-12-20 12:38:38 【问题描述】:

我想做的是:

    使用数字主题 (Long's) 中的记录 聚合(计数)每个 5 秒窗口的值 将最终聚合结果发送到另一个主题

我的代码如下所示:

KStream<String, Long> longs = builder.stream(
            Serdes.String(), Serdes.Long(), "longs");

// In one ktable, count by key, on a five second tumbling window.
KTable<Windowed<String>, Long> longCounts = 
            longs.countByKey(TimeWindows.of("longCounts", 5000L));

// Finally, sink to the long-avgs topic.
longCounts.toStream((wk, v) -> wk.key())
          .to("long-counts");

看起来一切都按预期工作,但聚合被发送到每个传入记录的目标主题。我的问题是如何只发送每个窗口的最终聚合结果?

【问题讨论】:

【参考方案1】:

在 Kafka Streams 中没有“最终聚合”之类的东西。窗口始终保持打开状态,以处理在窗口结束时间过后到达的乱序记录。但是,窗户不会永远保留。一旦保留时间到期,它们就会被丢弃。窗口何时被丢弃没有特殊的操作。

查看 Confluent 文档了解更多详情:http://docs.confluent.io/current/streams/

因此,对于聚合的每次更新,都会生成一条结果记录(因为 Kafka Streams 也会在乱序记录上更新聚合结果)。您的“最终结果”将是最新的结果记录(在丢弃窗口之前)。根据您的用例,手动重复数据删除将是解决问题的一种方法(使用较低级别的 API,transform()process()

这篇博文也可能有帮助:https://timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html

另一篇不使用标点符号的博客文章:http://blog.inovatrend.com/2018/03/making-of-message-gateway-with-kafka.html

更新

使用KIP-328,添加了KTable#suppress() 运算符,这将允许以严格的方式抑制连续更新并在每个窗口发出单个结果记录;代价是延迟增加。

【讨论】:

另外:Kafka Streams 即将推出的功能将为您提供一个配置选项(您可以配置其大小的缓冲区/缓存)来控制 Kafka Streams 的下游/输出数据速率。如果设置更大的缓冲区大小,将合并更多的下游更新,从而降低下游速率。 这很不幸。 Flink 在每个“循环”结束时给出窗口的输出。 kafka 的实现听起来好像我们需要一个外部计时器进程来在每个窗口完成后转储 KTable。 作为一个(很晚的)跟进:能够为窗口状态通知注册回调将非常有用。创建、删除等。您有 statestore 的回调 - 只需继续趋势! 这绝对是一个黑客... :D 你有这个对我们凡人 Dimitry 可用的实现吗? :)【参考方案2】:

从 Kafka Streams 2.1 版开始,你可以实现这个usingsuppress

上面提到的 apache Kafka Streams 文档中有一个示例,当用户在一小时内发生的事件少于三个时会发送警报:

KGroupedStream<UserId, Event> grouped = ...;
grouped
  .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
  .count()
  .suppress(Suppressed.untilWindowCloses(unbounded()))
  .filter((windowedUserId, count) -> count < 3)
  .toStream()
  .foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count));

正如this 答案的更新中所述,您应该注意权衡。此外,note 那个 suppress() 是基于事件时间的。

【讨论】:

【参考方案3】:

我遇到了这个问题,但我解决了这个问题,在固定窗口之后添加了 grace(0) 并使用了 Suppressed API

public void process(KStream<SensorKeyDTO, SensorDataDTO> stream) 

        buildAggregateMetricsBySensor(stream)
                .to(outputTopic, Produced.with(String(), new SensorAggregateMetricsSerde()));

    

private KStream<String, SensorAggregateMetricsDTO> buildAggregateMetricsBySensor(KStream<SensorKeyDTO, SensorDataDTO> stream) 
        return stream
                .map((key, val) -> new KeyValue<>(val.getId(), val))
                .groupByKey(Grouped.with(String(), new SensorDataSerde()))
                .windowedBy(TimeWindows.of(Duration.ofMinutes(WINDOW_SIZE_IN_MINUTES)).grace(Duration.ofMillis(0)))
                .aggregate(SensorAggregateMetricsDTO::new,
                        (String k, SensorDataDTO v, SensorAggregateMetricsDTO va) -> aggregateData(v, va),
                        buildWindowPersistentStore())
                .suppress(Suppressed.untilWindowCloses(unbounded()))
                .toStream()
                .map((key, value) -> KeyValue.pair(key.key(), value));
    


    private Materialized<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>> buildWindowPersistentStore() 
        return Materialized
                .<String, SensorAggregateMetricsDTO, WindowStore<Bytes, byte[]>>as(WINDOW_STORE_NAME)
                .withKeySerde(String())
                .withValueSerde(new SensorAggregateMetricsSerde());
    

在这里你可以看到结果

【讨论】:

以上是关于如何发送时间窗口 KTable 的最终 kafka-streams 聚合结果?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Streams API:KStream 到 KTable

使用 Kafka 实现 SQL 更新

可以将 Kafka Streams 配置为等待 KTable 加载吗?

Kafka Stream 和 KTable 一对多关系加入

Kafka Streams API:避免在 KTable.mapValues 中添加额外的 stateStore

Kafka Streams KTable 外键连接未按预期工作