RabbitMQ整合spring

Posted woms

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ整合spring相关的知识,希望对你有一定的参考价值。

  1 <?xml version="1.0" encoding="UTF-8"?>
  2 <beans xmlns="http://www.springframework.org/schema/beans"
  3     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
  4     xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:jdbc="http://www.springframework.org/schema/jdbc"
  5     xmlns:jee="http://www.springframework.org/schema/jee" xmlns:aop="http://www.springframework.org/schema/aop"
  6     xmlns:tx="http://www.springframework.org/schema/tx" xmlns:task="http://www.springframework.org/schema/task"
  7     xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  8     xsi:schemaLocation="http://www.springframework.org/schema/beans
  9      http://www.springframework.org/schema/beans/spring-beans.xsd
 10       http://www.springframework.org/schema/context
 11        http://www.springframework.org/schema/context/spring-context-4.3.xsd
 12         http://www.springframework.org/schema/mvc
 13          http://www.springframework.org/schema/mvc/spring-mvc.xsd
 14             http://www.springframework.org/schema/tx
 15             http://www.springframework.org/schema/tx/spring-tx.xsd
 16             http://www.springframework.org/schema/aop
 17             http://www.springframework.org/schema/aop/spring-aop.xsd
 18             http://www.springframework.org/schema/task
 19         http://www.springframework.org/schema/task/spring-task.xsd
 20             http://www.springframework.org/schema/rabbit
 21             http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd">
 22 
 23     <description>rabbitmq 连接服务配置</description>
 24     <!-- 不适用【发布确认】连接配置 -->
 25     <rabbit:connection-factory id="rabbitConnectionFactory"
 26         host="172.18.112.102" username="woms" password="woms" port="5672"
 27         virtual-host="lingyi" channel-cache-size="25" cache-mode="CHANNEL" publisher-confirms="true" publisher-returns="true" connection-timeout="200"/>
 28 
 29 
 30 
 31  <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
 32         <property name="backOffPolicy">
 33             <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
 34                 <property name="initialInterval" value="200" />
 35                 <property name="maxInterval" value="30000" />
 36             </bean>
 37         </property>
 38         <property name="retryPolicy">
 39             <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
 40                 <property name="maxAttempts" value="5"/>
 41             </bean>
 42         </property>
 43     </bean>
 44 
 45 
 46 
 47     <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 如果使用多exchange必须配置declared-by="connectAdmin" -->
 48     <rabbit:admin id="connectAdmin" connection-factory="connectionFactory"/>
 49 
 50     <rabbit:template id="ampqTemplate" connection-factory="connectionFactory"
 51         exchange="test-mq-exchange" return-callback="sendReturnCallback"
 52         message-converter="jsonMessageConverter" routing-key="test_queue_key"
 53         mandatory="true" confirm-callback="confirmCallback" retry-template="retryTemplate"/>
 54 
 55 
 56     <bean id="confirmCallback" class="ly.net.rabbitmq.MsgSendConfirmCallBack" />
 57     <bean id="sendReturnCallback" class="ly.net.rabbitmq.MsgSendReturnCallback" />
 58     <!-- 消息对象json转换类 -->
 59     <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
 60 
 61 
 62 
 63 
 64 
 65 
 66 
 67 
 68 
 69 
 70 
 71 
 72 
 73 
 74     <!-- queue配置 -->
 75     <!-- durable:是否持久化 -->
 76     <!-- exclusive: 仅创建者可以使用的私有队列,断开后自动删除 -->
 77     <!-- auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
 78     <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" declared-by="rabbitAdmin" />
 79 
 80 
 81 
 82 
 83     <!-- exchange配置 -->
 84     <!-- rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。 -->
 85     <!-- rabbit:binding:设置消息queue匹配的key -->
 86     <rabbit:direct-exchange name="test-mq-exchange"
 87         durable="true" auto-delete="false" id="test-mq-exchange" declared-by="rabbitAdmin">
 88         <rabbit:bindings>
 89             <rabbit:binding queue="test_queue_key" key="test_queue_key" />
 90         </rabbit:bindings>
 91     </rabbit:direct-exchange>
 92 
 93     <!-- <rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false"> -->
 94     <!-- <rabbit:bindings> -->
 95     <!-- 设置消息Queue匹配的pattern (direct模式为key) -->
 96     <!-- <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/> -->
 97     <!-- </rabbit:bindings> -->
 98     <!-- </rabbit:topic-exchange> -->
 99 
100 
101     <bean id="mqConsumer" class="ly.net.rabbitmq.MQConsumer" />
102     <bean id="mqConsumer1" class="ly.net.rabbitmq.MQConsumerManual" />
103 
104     <!-- listener配置 消费者 自动确认 -->
105     <!-- queues:监听的队列,多个的话用逗号(,)分隔 ref:监听器 -->
106     <rabbit:listener-container
107         connection-factory="connectionFactory" acknowledge="auto"
108         message-converter="jsonMessageConverter">
109         <rabbit:listener queues="test_queue_key" ref="mqConsumer" />
110     </rabbit:listener-container>
111     <!-- 消费者 手动确认 -->
112     <rabbit:listener-container
113         connection-factory="connectionFactory" acknowledge="manual">
114         <rabbit:listener queues="test_queue_key" ref="mqConsumer1" />
115     </rabbit:listener-container>
116 
117 
118 
119 
120 
121 
122 
123 </beans>

 

 1 package ly.net.rabbitmq;
 2 
 3 
 4 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 5 import org.springframework.amqp.rabbit.support.CorrelationData;
 6 public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {
 7 
 8     @Override
 9     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
10         // TODO Auto-generated method stub
11         if (ack) {  
12             System.out.println("消息确认成功");  
13         } else {  
14             //处理丢失的消息  
15             System.out.println("消息确认失败,"+cause);  
16         } 
17     }  
18     
19 } 
package ly.net.rabbitmq;

import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * {@link RabbitTemplate.ReturnCallback} implementation that prints the body of
 * every returned message to standard output.
 */
public class MsgSendReturnCallback implements RabbitTemplate.ReturnCallback {

    /** Template for republishing returned messages; only referenced by the disabled code below. */
    @Autowired
    private RabbitTemplate errorTemplate;

    /**
     * Prints the body of a returned message.
     *
     * @param message    the returned message
     * @param replyCode  reply code supplied with the return; not used here
     * @param replyText  reply text supplied with the return; not used here
     * @param exchange   exchange the message was published to; not used here
     * @param routingKey routing key the message was published with; not used here
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // Decode explicitly as UTF-8 instead of the platform default charset, so that
        // non-ASCII JSON payloads print correctly regardless of the JVM's locale settings.
        String msgJson = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("Returned Message:" + msgJson);

        // Republish (disabled):
//        RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(errorTemplate,"errorExchange", "errorRoutingKey");
//        Throwable cause = new Exception(new Exception("route_fail_and_republish"));
//        recoverer.recover(message,cause);
//        System.out.println("Returned Message:"+replyText);
    }

}
 1 package ly.net.rabbitmq;
 2 
 3 import org.springframework.amqp.core.Message;
 4 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
 5 
 6 import com.rabbitmq.client.Channel;
 7 
 8 public class MQConsumerManual implements ChannelAwareMessageListener {
 9 
10     @Override
11     public void onMessage(Message message, Channel channel) throws Exception {
12         // TODO Auto-generated method stub
13         //手动确认
14         channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
15     }
16 
17 }
/**
 * {@link MQProducer} implementation that publishes through the injected
 * {@link AmqpTemplate}.
 */
@Service
public class MQProducerImpl implements MQProducer {

    private static final Logger LOGGER = Logger.getLogger(MQProducerImpl.class);

    @Autowired
    private AmqpTemplate amqpTemplate;

    /**
     * Converts {@code object} to a message and sends it with {@code queueKey} as the
     * routing key. When {@code queueKey} is {@code null} or empty, the template's
     * default routing key is used instead.
     *
     * <p>Failures are logged with their stack trace and not rethrown.
     *
     * @param queueKey routing key to publish with; may be {@code null} or empty
     * @param object   payload handed to the template for conversion and sending
     */
    @Override
    public void sendDataToQueue(String queueKey, Object object) {
        try {
            if (queueKey == null || queueKey.isEmpty()) {
                // No key supplied: fall back to the template's default routing key.
                amqpTemplate.convertAndSend(object);
            } else {
                amqpTemplate.convertAndSend(queueKey, object);
            }
        } catch (Exception e) {
            // Pass the exception as the throwable argument so the stack trace is logged.
            LOGGER.error("Failed to send message with routing key '" + queueKey + "'", e);
        }
    }
}
/**
 * Contract for components that publish a payload under a queue key.
 */
public interface MQProducer {

    /**
     * Sends a message to the specified queue.
     *
     * @param queueKey key identifying the target queue
     * @param object   payload to send
     */
    void sendDataToQueue(String queueKey, Object object);
}

 

以上是关于RabbitMQ整合spring的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot整合多个RabbitMQ

022 spring与Rabbitmq整合

Spring和SpringBoot整合RabbitMQ

Spring和SpringBoot整合RabbitMQ

Spring Boot 整合 RabbitMQ

Spring Boot 2.X - Spring Boot整合AMQP之RabbitMQ