去重 KStream-KStream 的中间结果加入 Kafka Streams
Posted
技术标签:
【中文标题】去重 KStream-KStream 的中间结果加入 Kafka Streams【英文标题】:Deduplicate intermediate results of KStream-KStream joins in Kafka Streams 【发布时间】:2019-09-28 01:41:48 【问题描述】:我有以下场景:
-
表 A 和表 B 使用 FK 连接。
事务性插入/更新到 A 和 B。
Debezium 为表 A 发出一个事件
a
,为表 B 发出一个事件 b
。
Kafka Streams 为表 A 和 B 创建 KStream。
Kafka Streams 应用程序leftJoin
KStreams A 和 B。(假设a
和b
记录具有相同的键并落在连接窗口中)。
输出记录将为[a, null], [a, b]
。
你如何丢弃[a, null]
?
一个选项是执行innerJoin
,但在update
查询的情况下这仍然是一个问题。
我们尝试使用事件时间戳进行过滤(即使用最新的时间戳保留事件),但不能保证时间戳的唯一性。
即。最终目标是能够识别最新的聚合,以便我们可以在查询时过滤掉中间结果(在 Athena/Presto 或某些 RDBMS 中)。
【问题讨论】:
加入后可以filter()
吗?另请注意,支持外键连接的是 WIP atm:cwiki.apache.org/confluence/display/KAFKA/…
我可以使用filter
,但不想依赖插入始终是事务性的实现细节。考虑两个顺序插入,这将是两个事件,对于 KStream-KStream 连接,它们将输出 2 条记录,而不仅仅是我想要的 1 条。
@MatthiasJ.Sax 特别是我想弄清楚在使用 S3 接收器之类的东西时如何识别下游每个键的最新消息。我目前为eventCreatedAt
添加了一个字段,但对于同一事务中的事件,这显然是相同的(并且不保证会增加)。
非常类似于***.com/questions/47495299/…。
【参考方案1】:
目前,我发现的最佳工作方法是利用输出记录中的 Kafka 偏移量。
方法可以概括为:
-
执行您想要执行的所有逻辑,不必担心同一个键有多个记录。
将结果写入中间主题,保留时间非常短(1 小时等)
使用处理器读取中间主题,并在处理器内使用
context.offset()
使用 Kafka 偏移量丰富消息。
将消息写入输出主题。
现在,您的输出主题包含相同键的多条消息,但每条消息的偏移量不同。
现在在查询期间,您可以使用子查询为每个键选择最大偏移量。
TransformerSupplier 示例如下所示
/**
* @param <K> key type
* @param <V> value type
*/
public class OutputTransformSupplier<K, V> implements TransformerSupplier<K, V, KeyValue<String, String>>
@Override
public Transformer<K, V, KeyValue<String, String>> get()
return new OutputTransformer<>();
private class OutputTransformer<K, V> implements Transformer<K, V, KeyValue<String, String>>
private ProcessorContext context;
@Override
public void init(ProcessorContext context)
this.context = context;
/**
* @param key the key for the record
* @param value the value for the record
*/
@Override
public KeyValue<String, String> transform(K key, V value)
if (value != null)
value.setKafkaOffset(context.offset());
return new KeyValue<>(key, value);
@Override
public KeyValue<String, String> punctuate(long timestamp)
return null;
@Override
public void close()
// nothing to close
【讨论】:
以上是关于去重 KStream-KStream 的中间结果加入 Kafka Streams的主要内容,如果未能解决你的问题,请参考以下文章
爬虫最后一天,爬取到的数据存到mysql中,爬虫和下载中间件加代理cookieheaderselenium随机生成uersagent去重规则源码分析(布隆过滤器)scrapy-redis实现分布式爬虫