基于Springboot的RabbitMQ Demo消息推送
Posted by Coder Hulk
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Springboot的RabbitMQ Demo消息推送相关的知识,希望对你有一定的参考价值。
- pom.xml添加依赖
<!-- Spring Boot starter that brings in the AMQP (RabbitMQ) client support -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- yaml链接配置
# RabbitMQ connection settings (nesting restored; without indentation the keys
# below would be unrelated top-level entries and would not bind).
spring:
  rabbitmq:
    addresses: 10.211.55.10
    username: admin
    password: 123456
    virtual-host: /
    connection-timeout: 15000
    # Producer side: enable broker confirms and returned-message callbacks.
    # NOTE(review): newer Spring Boot versions replace `publisher-confirms`
    # with `publisher-confirm-type` - confirm against the Boot version in use.
    publisher-confirms: true
    publisher-returns: true
    template.mandatory: true
    # Consumer-side listener settings
    listener:
      simple.concurrency: 5
      simple.max-concurrency: 15
      # Messages must be acknowledged explicitly by the listener code.
      simple.acknowledge-mode: manual
      simple.prefetch: 1
- java文件
Order
package com.tianjian.ims.rabbitmq.entity;
import java.io.Serializable;
/**
 * Serializable message entity carrying an order id, a text content and the
 * unique identifier of the message it is sent with.
 *
 * @date 2022-11-24 11:38:36
 */
public class Order implements Serializable {

    private static final long serialVersionUID = 1L;

    private long id;
    private String content;
    /** Unique identifier of the sent message. */
    private long messageId;

    /** Creates an empty order; all fields keep their default values. */
    public Order() {
    }

    /**
     * Creates a fully populated order.
     *
     * @param id        order id
     * @param content   order content
     * @param messageId unique identifier of the sent message
     */
    public Order(long id, String content, long messageId) {
        this.id = id;
        this.content = content;
        this.messageId = messageId;
    }

    /** @return the order id */
    public long getId() {
        return id;
    }

    /**
     * Sets the order id.
     *
     * @param id new order id; must not be {@code null} because it is unboxed
     *           into a primitive field (a {@code null} raises
     *           {@link NullPointerException})
     */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the order content, possibly {@code null} */
    public String getContent() {
        return content;
    }

    /** @param content new order content */
    public void setContent(String content) {
        this.content = content;
    }

    /** @return the unique identifier of the sent message */
    public long getMessageId() {
        return messageId;
    }

    /** @param messageId new unique identifier of the sent message */
    public void setMessageId(long messageId) {
        this.messageId = messageId;
    }

    /**
     * @return a debug representation of the form
     *         {@code Order{id=1, content='x', messageId=2}}
     */
    @Override
    public String toString() {
        return "Order{"
                + "id=" + id
                + ", content='" + content + '\''
                + ", messageId=" + messageId
                + '}';
    }
}
RabbitMQSendController
package com.tianjian.ims.rabbitmq;
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.tianjian.common.restful.ResponseResult;
import com.tianjian.common.restful.SuccResponseResult;
import com.tianjian.ims.rabbitmq.entity.Order;
/**
* 消息发送控制器
*
* @date 2022-11-24 11:28:36
*/
@RestController
@RequestMapping("/datame/rabbitmq")
public class RabbitMQSendController
@Resource
private RabbitMQSendService rabbitMQSendService;
@PostMapping("/v10/send")
public ResponseResult<String> sendMessage(@RequestBody Order order)
rabbitMQSendService.sendMessage(order);
return new SuccResponseResult<>("发送成功");
@PostMapping("/v10/send/count")
public ResponseResult<String> batchSendMessages(@PathVariable int count)
rabbitMQSendService.sendMessage(count);
return new SuccResponseResult<>(count+"条消息发送成功");
RabbitMQSendService
package com.tianjian.ims.rabbitmq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.tianjian.ims.rabbitmq.entity.Order;
/**
* 消息发送服务类
*
* @date 2022-11-24 10:41:19
*/
@Service
public class RabbitMQSendService
@Autowired
private OrderProducer orderProducer;
//发送消息
public void sendMessage(Order order)
Order orderNew = new Order();
orderNew.setId(1L);
orderNew.setContent("测试订单1");
orderNew.setMessageId(1l);
try
orderProducer.sendOrder(orderNew);
catch (Exception e)
e.printStackTrace();
//测试批量发送消息
public void sendMessage(int count)
for (int i = 0; i < count; i++)
Order orderNew = new Order();
orderNew.setId(i+0L);
orderNew.setContent("测试订单"+i);
orderNew.setMessageId(i+0L);
try
orderProducer.sendOrder(orderNew);
catch (Exception e)
e.printStackTrace();
OrderProducer
package com.tianjian.ims.rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.tianjian.ims.rabbitmq.entity.Order;
/**
* 消息生产者 topic模式
*
* @date 2022-11-24 11:08:34
*/
@Component
public class OrderProducer
@Autowired
private RabbitTemplate rabbitTemplate;
//回调函数: confirm确认
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback()
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s)
System.out.println("CorrelationData: "+correlationData);
if (ack)
//如果confirm返回成功
System.out.println("发送成功");
else
//失败则进行具体的后续操作; 重试或者补偿等手段
System.out.println("异常处理....");
;
//发送消息
public void sendOrder(Order order) throws Exception
CorrelationData correlationData = new CorrelationData();
correlationData.setId(order.getMessageId()+"");
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.convertAndSend("order-exchange", //exchang 交换机
"order.demo", //routingKey 路由键
order, //消息体内容
correlationData); //correlationData 消息唯一ID
OrderCustomer
package com.tianjian.ims.rabbitmq;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import com.tianjian.ims.rabbitmq.entity.Order;
/**
* 消息消费者
*
* @date 2022-11-24 11:38:36
*/
@Component
public class OrderCustomer
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "order-queue",durable = "true"),
exchange = @Exchange(name = "order-exchange", durable = "true", type = "topic"),
key = "order.*"
))
@RabbitHandler
public void onOrderMessage(@Payload Order order,
@Headers Map<String,Object> headers,
Channel channel) throws Exception
//消费者操作————进行数据库写入、修改、同步等操作
System.out.println("---- 收到消息,开始消费 ----");
System.out.println("订单ID: "+ order.getId());
System.out.println("订单名称: "+ order.getContent());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//手动确认消息,参数1:消息标签,参数2:是否确认多条消息
channel.basicAck(deliveryTag,false);
以上是关于基于Springboot的RabbitMQ Demo消息推送的主要内容,如果未能解决你的问题,请参考以下文章:
Java RabbitMQ配置和使用,基于SpringBoot
自定义springboot组件--基于模板模式对原生springboot的rabbitmq组件进行扩展