Spring Cloud Stream Kafka 是不是支持嵌入式标头?
Posted
技术标签:
【中文标题】Spring Cloud Stream Kafka 是不是支持嵌入式标头?【英文标题】:Does Spring Cloud Stream Kafka supports embedded headers?Spring Cloud Stream Kafka 是否支持嵌入式标头? 【发布时间】:2017-11-18 01:14:55 【问题描述】:根据这个话题:Kafka Spring Integration: Headers not coming for kafka consumer - 这不是对 Kafka 的标头支持
但是documentation 说:
spring.cloud.stream.kafka.binder.headers 将由 binder 传输的自定义 headers 列表。
默认值:空。
我无法让它与 spring-cloud-stream-binder-kafka 一起工作:1.2.0.RELEASE
发送日志:
MESSAGE (e23885fd-ffd9-42dc-ebe3-5a78467fee1f) SENT :
GenericMessage [payload=...,
headers=
content-type=application/json,
correlationId=51dd90b1-76e6-4b8d-b667-da25f214f383,
id=e23885fd-ffd9-42dc-ebe3-5a78467fee1f,
contentType=application/json,
timestamp=1497535771673
]
接收日志:
MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 1:
GenericMessage [payload=...,
headers=
kafka_offset=36,
id=448175f5-2b21-9a44-26b9-85f093b33f6b,
kafka_receivedPartitionId=0,
contentType=application/json;charset=UTF-8,
kafka_receivedTopic=new_patient, timestamp=1497535771715
]
MESSAGE (448175f5-2b21-9a44-26b9-85f093b33f6b) RECEIVED BY HANDLER 2 :
GenericMessage [payload=...,
headers=
kafka_offset=36,
id=448175f5-2b21-9a44-26b9-85f093b33f6b,
kafka_receivedPartitionId=0,
contentType=application/json;charset=UTF-8,
kafka_receivedTopic=new_patient, timestamp=1497535771715
]
我希望看到相同的消息 id 并在接收方获得 correlationId。
application.properties:
spring.cloud.stream.kafka.binder.headers=correlationId
spring.cloud.stream.bindings.newTest.destination=new_test
spring.cloud.stream.bindings.newTestCreated.destination=new_test
spring.cloud.stream.default.consumer.headerMode=embeddedHeaders
spring.cloud.stream.default.producer.headerMode=embeddedHeaders
发送消息:
@Publisher(channel = "testChannel")
public Object newTest(Object param)
...
return myObject;
【问题讨论】:
你能举个例子说明你想做什么吗? @MariusBogoevici 请看帖子更新... 【参考方案1】:是的,确实如此:http://docs.spring.io/spring-cloud-stream/docs/Chelsea.SR2/reference/htmlsingle/index.html#_consumer_properties
标题模式
当设置为 raw 时,禁用对输入的标头解析。仅对本机不支持消息标头且需要标头嵌入的消息传递中间件有效。当入站数据来自 Spring Cloud Stream 应用程序之外时很有用。
默认:embeddedHeaders
但这已经是 Spring Cloud Stream 的故事,而不是 Spring Kafka 本身。
【讨论】:
否,但默认情况下仅传输标头子集 - 请参阅 github.com/spring-cloud/spring-cloud-stream/blob/master/…。如果您想传输自己的标头,可以将它们的名称添加到 Artem 提供的属性中。 感谢您的回复!但这实际上是问题所在 - 它应该根据文档工作,但事实并非如此。 Pease,看更新,我已经添加了日志和配置。 查看我对 the other question you commented on 的更新 - 我使用 Dalston.SR1 (1.2.1) 测试了该示例,它工作得很好。 更新的项目在我的sandbox repo。 消息 id 应被视为内部属性(消息可能会在 Spring Integration、Spring Kafka、Spring Cloud Stream 之间进行内部转换)。理想情况下,您应该使用自己的自定义标头来传达消息身份。以上是关于Spring Cloud Stream Kafka 是不是支持嵌入式标头?的主要内容,如果未能解决你的问题,请参考以下文章
Spring cloud kafka stream pitfalls
Spring cloud kafka stream pitfalls
spring-cloud-stream kafka 消费者并发
spring-cloud-stream-kafka 在应用程序启动后仅使用最新消息
多个 @EnableBinding 与 Kafka Spring Cloud Stream
spring.cloud.stream.kafka.bindings.<channelName>.producer.configuration 未应用