去重 KStream-KStream 的中间结果加入 Kafka Streams

Posted

技术标签:

【中文标题】去重 KStream-KStream 的中间结果加入 Kafka Streams【英文标题】:Deduplicate intermediate results of KStream-KStream joins in Kafka Streams 【发布时间】:2019-09-28 01:41:48 【问题描述】:

我有以下场景:

    表 A 和表 B 使用 FK 连接。 事务性插入/更新到 A 和 B。 Debezium 为表 A 发出一个事件 a,为表 B 发出一个事件 b。 Kafka Streams 为表 A 和 B 创建 KStream。 Kafka Streams 应用程序leftJoin KStreams A 和 B。(假设ab 记录具有相同的键并落在连接窗口中)。 输出记录将为[a, null], [a, b]

你如何丢弃[a, null]

一个选项是执行innerJoin,但在update查询的情况下这仍然是一个问题。

我们尝试使用事件时间戳进行过滤(即使用最新的时间戳保留事件),但不能保证时间戳的唯一性。

即。最终目标是能够识别最新的聚合,以便我们可以在查询时过滤掉中间结果(在 Athena/Presto 或某些 RDBMS 中)。

【问题讨论】:

加入后可以filter()吗?另请注意,支持外键连接的是 WIP atm:cwiki.apache.org/confluence/display/KAFKA/… 我可以使用filter,但不想依赖插入始终是事务性的实现细节。考虑两个顺序插入,这将是两个事件,对于 KStream-KStream 连接,它们将输出 2 条记录,而不仅仅是我想要的 1 条。 @MatthiasJ.Sax 特别是我想弄清楚在使用 S3 接收器之类的东西时如何识别下游每个键的最新消息。我目前为eventCreatedAt 添加了一个字段,但对于同一事务中的事件,这显然是相同的(并且不保证会增加)。 非常类似于***.com/questions/47495299/…。 【参考方案1】:

目前,我发现的最佳工作方法是利用输出记录中的 Kafka 偏移量。

方法可以概括为:

    执行您想要执行的所有逻辑,不必担心同一个键有多个记录。 将结果写入中间主题,保留时间非常短(1 小时等) 使用处理器读取中间主题,并在处理器内使用context.offset() 使用 Kafka 偏移量丰富消息。 将消息写入输出主题。

现在,您的输出主题包含相同键的多条消息,但每条消息的偏移量不同。

现在在查询期间,您可以使用子查询为每个键选择最大偏移量。

TransformerSupplier 示例如下所示

/**
 * @param <K> key type
 * @param <V> value type
 */
public class OutputTransformSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<String, String>> 
  @Override
  public Transformer<K, V, KeyValue<String, String>> get() 
    return new OutputTransformer<>();
  

  private class OutputTransformer<K, V> implements Transformer<K, V, KeyValue<String, String>> 
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) 
      this.context = context;
    

    /**
     * @param key   the key for the record
     * @param value the value for the record
     */
    @Override
    public KeyValue<String, String> transform(K key, V value) 
      if (value != null) 
        value.setKafkaOffset(context.offset());
      
      return new KeyValue<>(key, value);
    

    @Override
    public KeyValue<String, String> punctuate(long timestamp) 
      return null;
    

    @Override
    public void close() 
      // nothing to close
    
  

【讨论】:

以上是关于去重 KStream-KStream 的中间结果加入 Kafka Streams的主要内容,如果未能解决你的问题,请参考以下文章

爬虫最后一天,爬取到的数据存到mysql中,爬虫和下载中间件加代理cookieheaderselenium随机生成uersagent去重规则源码分析(布隆过滤器)scrapy-redis实现分布式爬虫

Linux 文件去重所遇到的bug

KStream-KStream 内连接抛出 java.lang.ClassCastException

SQL 查询结果加引号

当进行数据的去重查询时,使用group by效率更高。

数据库中的distinct关键字(去重)