Spring Cloud Streams 没有在消息中设置 kafka 键?

Posted

技术标签:

【中文标题】Spring Cloud Streams 没有在消息中设置 kafka 键?【英文标题】:Spring Cloud Streams not setting the kafka key in the message? 【发布时间】:2017-03-25 00:14:54 【问题描述】:

故事是这样的。我有一个 kafka 代理和一个特定对象(我将其 jsonify 以通过我的主题发送),它有一个我想用作键的 ID。

目前我正在使用“partitionKeyExtractorClass”配置来设置提取ID并将其作为键返回的类。

看起来像这样:

def extractKey(Message<?> message) 
    log.info('Extracting key from message')
    String id = new JsonSlurper().parseText(new String(message.payload)).properties.id
    log.info("Got = $id")

    return id

我的实际问题是,当我浏览有关该主题的消息时,保存我的消息的 ConsumerRecord 说密钥为空...

这是一个错误吗?难道我做错了什么?这方面的文档并没有比这更进一步。

【问题讨论】:

【参考方案1】:

你看,你把partitionkey混在一起了。

目前KafkaMessageChannelBinder 不提供用于确定keyMessage 的选项。

只有您可以强大使用的现有功能是KafkaHeaders.MESSAGE_KEY

    Object messageKey = this.messageKeyExpression != null
            ? this.messageKeyExpression.getValue(this.evaluationContext, message)
            : message.getHeaders().get(KafkaHeaders.MESSAGE_KEY);

因此,在output 消息之前,您应该计算密钥并将其放入该标头中。

【讨论】:

这不是一个可接受的解决方案。我有同样的问题。我使用原始模式输出。我不希望我的消息中有任何标题,但需要设置密钥。我不想切换到 apache camel。 您的问题不清楚。这是当前 Kafka Binder 实现的一个限制,mesageKey 只是一个遗漏。不过,请随意填写问题。如果您非常关心的话,您可以配置 Kafka Binder 现在不映射该标头。而且我完全不明白 Apache Camel 是如何替代 Spring Cloud Stream 的…… 问题是我正在与 Spring Cloud Streams 之外的其他生产者和消费者集成流。消息密钥对于保证 kafka 主题的顺序很重要。拥有 Spring Cloud Streams 控制标头意味着所有其他生产者和消费者必须处理 Spring Cloud Streams 特定的标头。我需要 kafka 消息正是我想要的,而不是 Spring Cloud Streams 想要的。我需要一个简单的键和字符串消息。原始模式给了我这个,但没有维持秩序的关键。 和 Apache Camel 完全一样的能力;读取消息流,以某种方式处理它们,并从处理后的数据中路由/生成新消息。

以上是关于Spring Cloud Streams 没有在消息中设置 kafka 键?的主要内容,如果未能解决你的问题,请参考以下文章

带有Spring Cloud Stream的Kafka Streams进程中的Serd错误

Spring Cloud Stream Kafka Streams Binder KafkaException:无法启动流:“侦听器”不能为空

Spring Cloud Hystrix 仪表板不与 OAuth 一起使用

如何使用 terraform 创建 IBM Cloud Event Streams 触发器?

Spring Boot Reactive Streams

在Spring 5中调试Reactive Streams