springCloud使用stream配置rabbitMq实现延时消息
Posted 好大的月亮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springCloud使用stream配置rabbitMq实现延时消息相关的知识,希望对你有一定的参考价值。
先安装rabbitMq延时插件
参考我另一篇文章
https://blog.csdn.net/weixin_43944305/article/details/120828003
上依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
消息通道
package com.fchan.springcloudstream.service;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface MyMessageChannel
String out = "out";
String in = "in";
@Output(out)
MessageChannel out();
@Input(in)
SubscribableChannel in();
// 发送延迟消息
@PostMapping("/delayed")
public String sendDelayedMessage(@RequestParam("body") String body,
@RequestParam("seconds") Integer seconds)
Map<String,Object> message = new HashMap<>();
message.put("body", body);
myMessageChannel.out().send(
MessageBuilder.withPayload(message)
.setHeader("x-delay", seconds * 1000)
.build()
);
log.info("发送延迟消息成功");
return "SUCCESS";
延时消息接收
package com.fchan.springcloudstream.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
@EnableBinding(MyMessageChannel.class)
public class MyConsumer
Logger log = LoggerFactory.getLogger(MyConsumer.class);
@StreamListener(MyMessageChannel.in)
public void input(Message<Map<String,Object>> message)
log.info("收到消息:", message.getPayload());
yml配置
spring:
rabbitmq:
host: 110.40.181.73
port: 35672
username: root
password: 10086
virtual-host: /fchan
cloud:
stream:
rabbit:
bindings:
# 消费者开启延时队列支持
in:
consumer:
delayed-exchange: true
# 生产者开启延时队列支持
out:
producer:
delayed-exchange: true
bindings:
in:
# 指定消息所属exchange
destination: test
# 指定消费者分组,在多实例的时候必需指定,防止重复消费
group: myIn
out:
destination: test
在启动项目后登陆rabbitmq管理页面可以看到exchange创建成功
localhost:8080/delayed?body=1231&seconds=5
测试成功
以上是关于springCloud使用stream配置rabbitMq实现延时消息的主要内容,如果未能解决你的问题,请参考以下文章