基于Springboot的RabbitMQ Demo消息推送

Posted Coder Hulk

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于Springboot的RabbitMQ Demo消息推送相关的知识,希望对你有一定的参考价值。

  1. pom.xml添加依赖
<!-- Spring Boot AMQP starter; the Java classes in this article import Spring AMQP / RabbitMQ client types -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  2. yaml连接配置
# RabbitMQ connection, publisher and listener settings for the demo.
spring:
  rabbitmq:
    addresses: 10.211.55.10
    username: admin
    password: 123456
    virtual-host: /
    connection-timeout: 15000
    # NOTE(review): newer Spring Boot versions (2.2+) are believed to replace this
    # flag with `publisher-confirm-type: correlated` - confirm against the Boot version in use.
    publisher-confirms: true
    publisher-returns: true
    template.mandatory: true
    
    # Consumer-side (listener) settings for the RabbitMQ integration
    listener:
      simple.concurrency: 5
      simple.max-concurrency: 15
      # Manual mode: the consumer in this demo acknowledges each message itself via basicAck
      simple.acknowledge-mode: manual
      simple.prefetch: 1
  3. java文件

Order

package com.tianjian.ims.rabbitmq.entity;
import java.io.Serializable;
 
/**
 * Message entity carried by the demo's order messages.
 *
 * <p>A plain mutable bean with a no-argument constructor, an all-fields
 * constructor, and getters/setters for each field.
 *
 * @date 2022-11-24 11:38:36
 */
public class Order implements Serializable {
    private static final long serialVersionUID = 1L;

    private long id;
    private String content;
    /** Unique identifier of the sent message. */
    private long messageId;

    /** Creates an empty order; numeric fields are 0 and {@code content} is null. */
    public Order() {
    }

    /**
     * Creates a fully populated order.
     *
     * @param id        order id
     * @param content   order content
     * @param messageId unique identifier of the sent message
     */
    public Order(long id, String content, long messageId) {
        this.id = id;
        this.content = content;
        this.messageId = messageId;
    }

    /** @return the order id */
    public long getId() {
        return id;
    }

    /**
     * Sets the order id.
     *
     * @param id the new id; the parameter is a boxed {@code Long} (kept for
     *           compatibility with existing callers), so passing {@code null}
     *           fails with a {@link NullPointerException} when it is unboxed
     */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the order content, possibly null */
    public String getContent() {
        return content;
    }

    /** @param content the new order content */
    public void setContent(String content) {
        this.content = content;
    }

    /** @return the unique identifier of the sent message */
    public long getMessageId() {
        return messageId;
    }

    /** @param messageId the new unique message identifier */
    public void setMessageId(long messageId) {
        this.messageId = messageId;
    }

    /**
     * Renders the order as {@code Order{id=..., content='...', messageId=...}}.
     *
     * @return a human-readable representation of all three fields
     */
    @Override
    public String toString() {
        return "Order{" +
                "id=" + id +
                ", content='" + content + '\'' +
                ", messageId=" + messageId +
                '}';
    }
}

RabbitMQSendController

package com.tianjian.ims.rabbitmq;

import javax.annotation.Resource;

import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.tianjian.common.restful.ResponseResult;
import com.tianjian.common.restful.SuccResponseResult;
import com.tianjian.ims.rabbitmq.entity.Order;
/**
 * 消息发送控制器
 *
 * @date 2022-11-24 11:28:36
 */
@RestController
@RequestMapping("/datame/rabbitmq")
public class RabbitMQSendController 
	@Resource
	private RabbitMQSendService rabbitMQSendService;
	
	@PostMapping("/v10/send")
	public ResponseResult<String> sendMessage(@RequestBody Order order) 
		rabbitMQSendService.sendMessage(order);
		return new SuccResponseResult<>("发送成功");
	

	@PostMapping("/v10/send/count")
	public ResponseResult<String> batchSendMessages(@PathVariable int count) 
		rabbitMQSendService.sendMessage(count);
		return new SuccResponseResult<>(count+"条消息发送成功");
	
	


RabbitMQSendService

package com.tianjian.ims.rabbitmq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.tianjian.ims.rabbitmq.entity.Order;

/**
 * 消息发送服务类
 * 
 * @date 2022-11-24 10:41:19
 */
@Service
public class RabbitMQSendService 
	@Autowired
	private OrderProducer orderProducer;
	
	//发送消息
	public void sendMessage(Order order) 
    	Order orderNew = new Order();
    	orderNew.setId(1L);
    	orderNew.setContent("测试订单1");
    	orderNew.setMessageId(1l);
    	try 
			orderProducer.sendOrder(orderNew);
    	 catch (Exception e) 
			e.printStackTrace();
    	
	

	//测试批量发送消息
	public void sendMessage(int count) 
		for (int i = 0; i < count; i++) 
			Order orderNew = new Order();
			orderNew.setId(i+0L);
			orderNew.setContent("测试订单"+i);
			orderNew.setMessageId(i+0L);
			try 
				orderProducer.sendOrder(orderNew);
			 catch (Exception e) 
				e.printStackTrace();
			
		
	

OrderProducer

package com.tianjian.ims.rabbitmq;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.tianjian.ims.rabbitmq.entity.Order;
/**
 * 消息生产者 topic模式
 *
 * @date 2022-11-24 11:08:34
 */
@Component
public class OrderProducer 
    @Autowired
    private RabbitTemplate rabbitTemplate;
 
    //回调函数: confirm确认
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() 
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String s) 
            System.out.println("CorrelationData: "+correlationData);
            if (ack)
                //如果confirm返回成功
                System.out.println("发送成功");
            else
                //失败则进行具体的后续操作; 重试或者补偿等手段
                System.out.println("异常处理....");
            
        
    ;
 
    //发送消息
    public void sendOrder(Order order) throws Exception
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(order.getMessageId()+"");
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.convertAndSend("order-exchange",   //exchang 交换机
                "order.demo",   //routingKey  路由键
                order,      //消息体内容
                correlationData);       //correlationData 消息唯一ID
 
    

OrderCustomer

package com.tianjian.ims.rabbitmq;
import java.util.Map;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;
import com.tianjian.ims.rabbitmq.entity.Order;
 
/**
 * 消息消费者
 *
 * @date 2022-11-24 11:38:36
 */
@Component
public class OrderCustomer 
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "order-queue",durable = "true"),
            exchange = @Exchange(name = "order-exchange", durable = "true", type = "topic"),
            key = "order.*"
    ))
    
    @RabbitHandler
    public void onOrderMessage(@Payload Order order,
                               @Headers Map<String,Object> headers,
                               Channel channel) throws Exception
        //消费者操作————进行数据库写入、修改、同步等操作
        System.out.println("---- 收到消息,开始消费 ----");
        System.out.println("订单ID: "+ order.getId());
        System.out.println("订单名称: "+ order.getContent());
 
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
       
        //手动确认消息,参数1:消息标签,参数2:是否确认多条消息
        channel.basicAck(deliveryTag,false);
    

以上是关于基于Springboot的RabbitMQ Demo消息推送的主要内容,如果未能解决你的问题,请参考以下文章

Java RabbitMQ配置和使用,基于SpringBoot

自定义springboot组件--基于模板模式对原生springboot的rabbitmq组件进行扩展

基于SpringBoot+RabbitMQ+Redis开发的秒杀系统(异步下单热点数据缓存解决超卖)

一文带你彻底搞懂SpringBoot-RabbitMQ

springboot整合消息队列——RabbitMQ

带着新人学springboot的应用05(springboot+RabbitMQ 上)