Rabbitmq消费者保证幂等性
Posted WontCode
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbitmq消费者保证幂等性相关的知识,希望对你有一定的参考价值。
如图:
示例代码:
pom文件添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml配置rabbitmq连接
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /guoVirtualHost
listener:
simple:
retry:
# 开启消费者(出现异常)会进行重试
enabled: true
# 默认重试无限次,所以需指定重试次数
max-attempts: 5
# 重试间隔时间
initial-interval: 3000
# 保证消费者会消费消息,手动确认
acknowledge-mode: manual
#定义交换机、路由、队列名称
guoguo:
order:
exchange: guoguo_order_exchange
routingKey: guoguo.order
queue: guoguo_order_queue
定义RabbitmqConfig配置类,Exchange,Routingkey,Queue
@Component
public class OrderConfig
//交换机名称
@Value("$guoguo.order.exchange")
private String orderExchange;
//路由key
@Value("$guoguo.order.routingKey")
private String orderRoutingKey;
//队列名称
@Value("$guoguo.order.queue")
private String orderQueue;
@Bean
public DirectExchange orderExchange()
return new DirectExchange(orderExchange);
@Bean
public Queue orderQueue()
return new Queue(orderQueue,true,false,false);
@Bean
public Binding orderBinding(Queue orderQueue,DirectExchange orderExchange)
//方式1:Queue orderQueue,DirectExchange orderExchange 会去ioc容器中找到Bean
return BindingBuilder.bind(orderQueue).to(orderExchange).with(orderRoutingKey);
//方式2:直接调用方法
//return BindingBuilder.bind(orderQueue()).to( orderExchange()).with(orderRoutingKey);
定义发送消息工具类:
@Component
@Slf4j
public class SendMessageUtlis
@Autowired
RabbitTemplate amqpTemplate;
@Value("$guoguo.order.exchange")
private String orderExchange;
@Value("$guoguo.order.routingKey")
private String orderRoutingKey;
public void sendMessage(Order order)
amqpTemplate.convertAndSend(orderExchange,orderRoutingKey,order,message ->
return message;
);
发送消息:
@RequestMapping("/addOrder")
public String addOrder()
String quanjuId = System.currentTimeMillis()+ "";
//生产者投递消息时携带全局id发送到mq服务器端
Order order = new Order(1, "mq演示", 66, "男",quanjuId);
log.info("全局id:"+quanjuId);
sendMessage.sendMessage(order );
//返回全局id给客户端
return quanjuId;
消费者监听消息:
@Autowired
OrderDao orderDao;
@RabbitListener(queues = "guoguo_order_queue")
public void process(Order order, Message message,Channel channel) throws IOException
if (StringUtils.isEmpty(order.getQuanjuId()))
return;
//拿到全局id
Order orders = orderDao.getQuanjuId(order.getQuanjuId());
if (order!= null)
log.info("已经被消费");
//如果不为null,则已经被消费过,也需要告诉mq服务器端,将该消息删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
return;
int result = orderDao.saveOrder(order);
log.info("插入成功");
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
以上是关于Rabbitmq消费者保证幂等性的主要内容,如果未能解决你的问题,请参考以下文章
如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?
RabbitMQ消息中间件技术精讲10 高级篇三 幂等性保障不重复消费