是否可以使用 Kafka Streams 访问消息头?
Posted
技术标签:
【中文标题】是否可以使用 Kafka Streams 访问消息头?【英文标题】:Is it possible to access message headers with Kafka Streams? 【发布时间】:2018-03-25 23:27:56 【问题描述】:在 Kafka 0.11 的记录(ProducerRecord 和 ConsumerRecord)中添加了 Headers,在使用 Kafka Streams 处理主题时是否可以获得这些标头?当在KStream
上调用map
之类的方法时,它提供了key
和记录的value
的参数,但我看不到访问headers
。如果我们可以在ConsumerRecord
s 上使用map
,那就太好了。
例如
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...
这样的事情会起作用:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) ->
record.headers();
record.key();
record.value();
)
...
【问题讨论】:
【参考方案1】:自 2.0.0 版起可访问记录标头(参见KIP-244 了解详细信息)。
您可以通过处理器 API(即通过transform()
、transformValues()
或process()
),通过给定的“上下文”对象(参见https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context)访问记录元数据。
更新
从 2.7.0 版本开始,处理器 API 得到了改进(参见 KIP-478),添加了一个新的类型安全 api.Processor
类,使用 process(Record)
代替 process(K, V)
方法。对于这种情况,可以通过Record
类访问标头(和记录元数据)。
此新功能在“DSL 的 PAPI 方法中尚不可用(例如,KStream#process()
、KStream#transform()
和兄弟姐妹)。
+++++
在 2.0 之前,上下文仅公开主题、分区、偏移量和时间戳——但不公开在那些旧版本中读取时实际上由 Streams 删除的标头。
但元数据在 DSL 级别不可用。但是,通过KIP-159 扩展 DSL 的工作也在进行中。
【讨论】:
澄清 Matthias 所说的:是的,Kafka Streams 中的处理器 API 允许您访问记录元数据,例如主题名称、分区号、偏移量等。Kafka Streams 中的 DSL 不提供访问权限.但是因为你可以将处理器 API 和 DSL 结合起来,你仍然可以编写一个基于 DSL 的流处理应用程序,通过使用 DSL 的transform()
或 transformValues()
函数来访问记录元数据,它允许你传入一个 Processor/来自处理器 API 的转换器。
感谢各位提供信息,我会留意何时将元数据添加到 DSL 级别,以便更新此答案。
@MatthiasJ.Sax 和@MichaelG.Noll:在cwiki.apache.org/confluence/display/KAFKA/… 中,对于RecordContext
提案,它似乎没有暴露标题。那是要添加的东西吗?
没有计划通过 KIP-159 扩展 RecordContext ——当我们添加标头支持时,它是 TDB 的样子,但我假设我们会添加新方法到 RecordContext 为此。如果您对详细信息感兴趣,Jira 是什么:)
@MatthiasJ.Sax 对我来说仍然不是 100% 清楚:这是否意味着通过 Streams 1.0.1 既无法通过 DSL 也无法通过处理器 API 访问消息的标头?我通过检查 ProcessorContext (kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/…) 来询问我无法找到当前处理的消息的标题。以上是关于是否可以使用 Kafka Streams 访问消息头?的主要内容,如果未能解决你的问题,请参考以下文章
Kafka 消费者 API 上的 Kafka Streams DSL