Spring Cloud Stream Kafka 是不是支持嵌入式标头?

Posted

技术标签:

【中文标题】Spring Cloud Stream Kafka 是不是支持嵌入式标头?【英文标题】:Does Spring Cloud Stream Kafka supports embedded headers?Spring Cloud Stream Kafka 是否支持嵌入式标头? 【发布时间】:2017-11-18 01:14:55 【问题描述】:

根据这个话题:Kafka Spring Integration: Headers not coming for kafka consumer - 这不是对 Kafka 的标头支持

但是documentation 说:

spring.cloud.stream.kafka.binder.headers 将由 binder 传输的自定义 headers 列表。

默认值:空。

无法让它与 spring-cloud-stream-binder-kafka 一起工作:1.2.0.RELEASE

发送日志:

MESSAGE (e23885fd-ffd9-42dc-ebe3-5a78467fee1f) SENT : 
GenericMessage [payload=..., 
headers=
   content-type=application/json, 
   correlationId=51dd90b1-76e6-4b8d-b667-da25f214f383, 
   id=e23885fd-ffd9-42dc-ebe3-5a78467fee1f, 
   contentType=application/json, 
   timestamp=1497535771673
]

接收日志:

MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 1: 
GenericMessage [payload=..., 
headers=
    kafka_offset=36, 
    id=448175f5-2b21-9a44-26b9-85f093b33f6b, 
    kafka_receivedPartitionId=0, 
    contentType=application/json;charset=UTF-8, 
    kafka_receivedTopic=new_patient, timestamp=1497535771715
]

MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 2 :
GenericMessage [payload=..., 
headers=
    kafka_offset=36, 
    id=448175f5-2b21-9a44-26b9-85f093b33f6b, 
    kafka_receivedPartitionId=0, 
    contentType=application/json;charset=UTF-8, 
    kafka_receivedTopic=new_patient, timestamp=1497535771715
]

我希望看到相同的消息 id 并在接收方获得 correlationId

application.properties:

spring.cloud.stream.kafka.binder.headers=correlationId
spring.cloud.stream.bindings.newTest.destination=new_test
spring.cloud.stream.bindings.newTestCreated.destination=new_test
spring.cloud.stream.default.consumer.headerMode=embeddedHeaders
spring.cloud.stream.default.producer.headerMode=embeddedHeaders

发送消息:

@Publisher(channel = "testChannel")
public Object newTest(Object param) 
    ...
    return myObject;

【问题讨论】:

你能举个例子说明你想做什么吗? @MariusBogoevici 请看帖子更新... 【参考方案1】:

是的,确实如此:http://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_consumer_properties

标题模式

当设置为 raw 时,禁用对输入的标头解析。仅对本机不支持消息标头且需要标头嵌入的消息传递中间件有效。当入站数据来自 Spring Cloud Stream 应用程序之外时很有用。

默认:embeddedHeaders

但这已经是 Spring Cloud Stream 的故事,而不是 Spring Kafka 本身。

【讨论】:

否,但默认情况下仅传输标头子集 - 请参阅 github.com/spring-cloud/spring-cloud-stream/blob/master/…。如果您想传输自己的标头,可以将它们的名称添加到 Artem 提供的属性中。 感谢您的回复!但这实际上是问题所在 - 它应该根据文档工作,但事实并非如此。 Pease,看更新,我已经添加了日志和配置。 查看我对 the other question you commented on 的更新 - 我使用 Dalston.SR1 (1.2.1) 测试了该示例,它工作得很好。 更新的项目在我的sandbox repo。 消息 id 应被视为内部属性(消息可能会在 Spring Integration、Spring Kafka、Spring Cloud Stream 之间进行内部转换)。理想情况下,您应该使用自己的自定义标头来传达消息身份。

以上是关于Spring Cloud Stream Kafka 是不是支持嵌入式标头?的主要内容,如果未能解决你的问题,请参考以下文章

Spring cloud kafka stream pitfalls

Spring cloud kafka stream pitfalls

spring-cloud-stream kafka 消费者并发

spring-cloud-stream-kafka 在应用程序启动后仅使用最新消息

多个 @EnableBinding 与 Kafka Spring Cloud Stream

spring.cloud.stream.kafka.bindings.<channelName>.producer.configuration 未应用