是否可以使用 Kafka Streams 访问消息头?

Posted

技术标签:

【中文标题】是否可以使用 Kafka Streams 访问消息头?【英文标题】:Is it possible to access message headers with Kafka Streams? 【发布时间】:2018-03-25 23:27:56 【问题描述】:

在 Kafka 0.11 的记录(ProducerRecord 和 ConsumerRecord)中添加了 Headers,在使用 Kafka Streams 处理主题时是否可以获得这些标头?当在KStream 上调用map 之类的方法时,它提供了key 和记录的value 的参数,但我看不到访问headers。如果我们可以在ConsumerRecords 上使用map,那就太好了。

例如

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
    ... 

这样的事情会起作用:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
    .map((record) -> 
        record.headers();
        record.key();
        record.value();
    )
    ...

【问题讨论】:

【参考方案1】:

自 2.0.0 版起可访问记录标头(参见KIP-244 了解详细信息)。

您可以通过处理器 API(即通过transform()transformValues()process()),通过给定的“上下文”对象(参见https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context)访问记录元数据。

更新

从 2.7.0 版本开始,处理器 API 得到了改进(参见 KIP-478),添加了一个新的类型安全 api.Processor 类,使用 process(Record) 代替 process(K, V) 方法。对于这种情况,可以通过Record 类访问标头(和记录元数据)。

此新功能在“DSL 的 PAPI 方法中尚不可用(例如,KStream#process()KStream#transform() 和兄弟姐妹)。

+++++

在 2.0 之前,上下文仅公开主题、分区、偏移量和时间戳——但不公开在那些旧版本中读取时实际上由 Streams 删除的标头。

但元数据在 DSL 级别不可用。但是,通过KIP-159 扩展 DSL 的工作也在进行中。

【讨论】:

澄清 Matthias 所说的:是的,Kafka Streams 中的处理器 API 允许您访问记录元数据,例如主题名称、分区号、偏移量等。Kafka Streams 中的 DSL 不提供访问权限.但是因为你可以将处理器 API 和 DSL 结合起来,你仍然可以编写一个基于 DSL 的流处理应用程序,通过使用 DSL 的 transform()transformValues() 函数来访问记录元数据,它允许你传入一个 Processor/来自处理器 API 的转换器。 感谢各位提供信息,我会留意何时将元数据添加到 DSL 级别,以便更新此答案。 @MatthiasJ.Sax 和@MichaelG.Noll:在cwiki.apache.org/confluence/display/KAFKA/… 中,对于RecordContext 提案,它似乎没有暴露标题。那是要添加的东西吗? 没有计划通过 KIP-159 扩展 RecordContext ——当我们添加标头支持时,它是 TDB 的样子,但我假设我们会添加新方法到 RecordContext 为此。如果您对详细信息感兴趣,Jira 是什么:) @MatthiasJ.Sax 对我来说仍然不是 100% 清楚:这是否意味着通过 Streams 1.0.1 既无法通过 DSL 也无法通过处理器 API 访问消息的标头?我通过检查 ProcessorContext (kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/…) 来询问我无法找到当前处理的消息的标题。

以上是关于是否可以使用 Kafka Streams 访问消息头?的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 消费者 API 上的 Kafka Streams DSL

使用 Kafka 的 Streams API 处理错误消息

使用Kafka Streams处理复杂的Avro消息

Spring Cloud Streams 没有在消息中设置 kafka 键?

合并多个相同的 Kafka Streams 主题

Kafka Streams 是不是适合触发记录的批处理?