Kafka Ktable 还流式传输重复更新
Posted
技术标签:
【中文标题】Kafka Ktable 还流式传输重复更新【英文标题】:Kafka Ktable also streaming duplicate updates 【发布时间】:2020-08-04 05:47:12 【问题描述】:。
我想处理 Ktable(使用 Kstream.reduce() 创建)更改日志流,即 Ktable 中键值的任何更改。但它似乎即使多次向 Ktable 发送相同的键值对,每次都向下游发送。仅当值发生更改时,我才需要在键的值中发送更新。
`
groupByKey(Grouped.with(new Serdes.LongSerde(),new Serdes.LongSerde()))
.reduce(new Reducer<Long>()
@Override
public Long apply(Long t1, Long t2)
return t2;
).toStream().foreach((key, value) -> //for each update in ID, send update to the stream
sendUpdate(key);
);
`
【问题讨论】:
Kafka Streams 实现了“更新时发出”策略,并且不比较值是否实际更改。不过,还有一些工作正在进行中,以部分更改行为,并且仅在值实际更改时才发出:cwiki.apache.org/confluence/display/KAFKA/… 【参考方案1】:这是KTable#toStream()
的默认行为,它将更改日志主题转换为KStream
,因此reduce
的下游运算符在每次上游reduce运算符收到消息时都会更新。
您可以使用Processor API 归档您的期望行为,在这种情况下,我们使用 KStream.transfomerValues()。
首先注册一个 KeyValueStore 来存储你的最新值:
//you don't need to add number_store, if your KTable already materialized to number_store
streamsBuilder
.addStateStore(Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("number_store"), Serdes.Long(), Serdes.Long()));
numberKStream
.transformValues(ExtractIfValueChangedTransformer::new, "number_store")
.filter((key, value) -> value != null)
.foreach((key, value) -> sendUpdate(key));
然后我们创建一个ExtractIfValueChangedTransformer
,如果值改变了只返回新消息的值,如果没有则返回null:
public class ExtractIfValueChangedTransformer implements ValueTransformerWithKey<Long, Long, Long>
KeyValueStore<Long, Long> kvStore;
@Override
public void init(ProcessorContext context)
kvStore = (KeyValueStore<Long, Long>) context.getStateStore("number_store");
@Override
public Long transform(Long key, Long newValue)
Long oldValue = kvStore.get(key);
kvStore.put(key, newValue);
if (oldValue == null) return newValue;
return oldValue.equals(newValue) ? null : newValue;
@Override
public void close()
【讨论】:
【参考方案2】:Kafka Streams 提供 2 种语义:emit-on-update 和 emit-on-window-close。
KIP-557 是关于基于数据的字节数组比较添加emit-on-change语义。它已经在 Kafka Streams 2.6 和 removed due to "potential data loss" 中实现。
不过,我使用 Kafka Streams DSL 开发了一个变化时发出语义的实现。
我们的想法是将具有 emit-on-update 语义的 KStream 转换为具有 emit-on-change 语义的 KStream。您可以在您提供的源 Kstream 上使用此实现来创建 KTable,也可以在应用 .toStream()
后在 KTable 上使用此实现。
这个实现隐式地创建了一个状态存储,其中的值包含 KStream 数据和一个标志,指示是否应该发出更新。此标志在聚合操作中设置,并基于Object#equals
进行比较。但您可以更改实现以使用Comparator
。
这是改变 KStream 语义的withEmitOnChange
方法。您可能必须为 EmitOnChangeState
数据结构指定一个 serde(见下文)。
public static <K, V> KStream<K, V> withEmitOnChange(KStream<K, V> streams)
return streams
.groupByKey()
.aggregate(
() -> (EmitOnChangeState<V>) null,
(k, data, state) ->
if (state == null)
return new EmitOnChangeState<>(data, true);
else
return state.merge(data);
)
.toStream()
.filter((k, state) -> state.shouldEmit)
.mapValues(state -> (V) state.data);
这是存储在状态存储中的数据结构,用于检查是否应该发出更新。
public static class EmitOnChangeState<T>
public final T data;
public final boolean shouldEmit;
public EmitOnChangeState(T data, boolean shouldEmit)
this.data = data;
this.shouldEmit = shouldEmit;
public EmitOnChangeState<T> merge(T newData)
return new EmitOnChangeState<>(newData, Objects.equals(data, newData));
@Override
public boolean equals(Object o)
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
EmitOnChangeState<?> that = (EmitOnChangeState<?>) o;
return shouldEmit == that.shouldEmit && Objects.equals(data, that.data);
@Override
public int hashCode()
return Objects.hash(data, shouldEmit);
用法:
KStream<ProductKey, Product> products = builder.stream("product-topic");
withEmitOnChange(products)
.to("out-product-topic"); // output topic with emit-on-change semantic
【讨论】:
以上是关于Kafka Ktable 还流式传输重复更新的主要内容,如果未能解决你的问题,请参考以下文章