如何在使用消息时访问 Kafka 标头?

Posted

技术标签:

【中文标题】如何在使用消息时访问 Kafka 标头?【英文标题】:How to access Kafka headers while consuming a message? 【发布时间】:2015-10-06 07:24:53 【问题描述】:

下面是我的配置

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
            kafka-consumer-context-ref="consumerContext"
            auto-startup="true"
            channel="inputFromKafka">
        <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
    </int-kafka:inbound-channel-adapter>

inputFromKafka下面经过改造

public Message<?> transform(final Message<?> message) 

System.out.println( "KAFKA Message Headers " + message.getHeaders());

final Map<String, Map<Integer, List<Object>>> origData =  (Map<String, Map<Integer, List<Object>>>) message.getPayload();

        // some code to figure-out the nonPartitionedData
        return MessageBuilder.withPayload(nonPartitionedData).build();
    

上面的打印语句只打印两个一致的标题

KAFKA Message Headers id=9c8f09e6-4b28-5aa1-c74c-ebfa53c01ae4, timestamp=1437066957272

在发送 Kafka 消息时,传递了一些标头,包括 KafkaHeaders.MESSAGE_KEY,但我也没有回复,想知道​​是否有办法完成此操作?

【问题讨论】:

【参考方案1】:

不幸的是,它不能那样工作......

Producer 部分 (KafkaProducerMessageHandler) 如下所示:

this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());

如您所见,我们不会向 Kafka topic 发送任何 messageHeaders。仅payload 并且恰好在 Kafka 协议指定的 messageKey 之下。

从另一端Consumer 端 (KafkaHighLevelConsumerMessageSource) 执行此逻辑:

if (!payloadMap.containsKey(messageAndMetadata.partition())) 
    final List<Object> payload = new ArrayList<Object>();
    payload.add(messageAndMetadata.message());
    payloadMap.put(messageAndMetadata.partition(), payload);

如您所见,我们在这里并不关心messageKey

KafkaMessageDrivenChannelAdapter (&lt;int-kafka:message-driven-channel-adapter&gt;) 适合您!它在将消息发送到频道之前执行此操作:

KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(generateMessageId, generateTimestamp);

Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
rawHeaders.put(KafkaHeaders.MESSAGE_KEY, key);
rawHeaders.put(KafkaHeaders.TOPIC, metadata.getPartition().getTopic());
rawHeaders.put(KafkaHeaders.PARTITION_ID, metadata.getPartition().getId());
rawHeaders.put(KafkaHeaders.OFFSET, metadata.getOffset());
rawHeaders.put(KafkaHeaders.NEXT_OFFSET, metadata.getNextOffset());

if (!this.autoCommitOffset) 
    rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);

【讨论】:

...some headers were passed including... Kafka 没有标头的概念;如果你想传输自定义信息,它必须在有效载荷中。 它究竟在哪里设置这些标题?我在这里找不到:github.com/spring-projects/spring-integration-kafka/blob/master/… 因为那里也改变了匹配。请先认识 Spring Kafka 2.1:docs.spring.io/spring-kafka/docs/2.1.2.RELEASE/reference/html【参考方案2】:

如前所述,Kafka 中没有消息头的概念。因为我过去曾遇到过同样的问题,所以我编写了一个small library 来帮助解决这个问题。它可能会派上用场。

【讨论】:

以上是关于如何在使用消息时访问 Kafka 标头?的主要内容,如果未能解决你的问题,请参考以下文章

如何在kubernetes上构建kafka集群后公开kafka以进行外部访问?

如何在 Kafka Connect S3 中解析记录头?

Key为nulll时Kafka如何选择分区(Partition)

如何在 JmsTemplate 中发送标头消息?

如何在 Spring Boot 应用程序中向 STOMP CREATED 消息添加自定义标头?

Kafka实战:如何把Kafka消息时延秒降10倍(上)