RabbitMQ+Spring 结合使用
Posted HappyCowboy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ+Spring 结合使用相关的知识,希望对你有一定的参考价值。
1:创建一个Maven工程,pom.xml文件如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.liu</groupId> <artifactId>rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>rabbitmq</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <!-- spring版本号 --> <spring.version>3.2.8.RELEASE</spring.version> <!-- log4j日志文件管理包版本 --> <slf4j.version>1.6.6</slf4j.version> <log4j.version>1.2.12</log4j.version> <!-- junit版本号 --> <junit.version>4.10</junit.version> </properties> <dependencies> <!-- 添加Spring依赖 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aspects</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>${spring.version}</version> </dependency> <!--单元测试依赖 --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>${junit.version}</version> <scope>test</scope> </dependency> <!-- 日志文件管理包 --> <!-- log start --> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> </dependency> <!-- log end --> <!--spring单元测试依赖 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> <scope>test</scope> </dependency> <!--rabbitmq依赖 --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.1.3.RELEASE</version> </dependency> <dependency> <groupId>javax.validation</groupId> <artifactId>validation-api</artifactId> <version>1.1.0.Final</version> </dependency> <dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator</artifactId> <version>5.0.1.Final</version> </dependency> </dependencies> <build> <resources> <resource> <directory>src/main/resources</directory> <targetPath>${basedir}/target/classes</targetPath> <includes> <include>**/*.properties</include> <include>**/*.xml</include> </includes> <filtering>true</filtering> </resource> <resource> <directory>src/main/resources</directory> <targetPath>${basedir}/target/resources</targetPath> <includes> <include>**/*.properties</include> <include>**/*.xml</include> </includes> <filtering>true</filtering> </resource> </resources> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.6</source> <target>1.6</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-war-plugin</artifactId> <version>2.1.1</version> <configuration> <warSourceExcludes>${warExcludes}</warSourceExcludes> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.4.3</version> <configuration> <testFailureIgnore>true</testFailureIgnore> </configuration> </plugin> <plugin> <inherited>true</inherited> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-source-plugin</artifactId> <executions> <execution> <id>attach-sources</id> <goals> <goal>jar</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-resources-plugin</artifactId> <configuration> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>
2:配置文件如下:
log4j.properties:
log4j.rootLogger=debug,Console,Stdout
#Console
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
log4j.logger.java.sql.ResultSet=INFO
log4j.logger.org.apache=INFO
log4j.logger.java.sql.Connection=DEBUG
log4j.logger.java.sql.Statement=DEBUG
log4j.logger.java.sql.PreparedStatement=DEBUG
log4j.appender.Stdout = org.apache.log4j.DailyRollingFileAppender
log4j.appender.Stdout.File = E://logs/log.log
log4j.appender.Stdout.Append = true
log4j.appender.Stdout.Threshold = DEBUG
log4j.appender.Stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.Stdout.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
rabbitMQ.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans default-lazy-init="false" xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.1.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> <!--配置connection-factory,指定连接rabbit server参数 --> <rabbit:connection-factory id="connectionFactory" username="guest" password="guest" host="127.0.0.1" port="5672" /> <!--通过指定下面的admin信息,当前proceducer中的exchange和queue会在rabbitmq服务器上自动生成 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 标准的建立Queue的参数 --> <rabbit:queue-arguments id="amqpQueueArguments"> <!-- 暂时没有 --> </rabbit:queue-arguments> <rabbit:queue queue-arguments="amqpQueueArguments" id="amqpTemplateReplyQueue" name="test"/> <!--定义rabbit template用于数据的接收和发送 --> <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-queue="amqpTemplateReplyQueue"> <rabbit:reply-listener concurrency="2"/> </rabbit:template> <!--定义queue --> <rabbit:queue name="queueTest" id="amqpTemplateRequestQueue" queue-arguments="amqpQueueArguments"/> <!-- 消息接收者 --> <bean id="messageReceiver" class="com.liu.rabbitmq.Consumer"></bean> <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象--> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener queues="amqpTemplateRequestQueue" ref="messageReceiver"/> </rabbit:listener-container> </beans>
application-context.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd"> <import resource="classpath*:rabbitmq.xml" /> <!-- 扫描指定package下所有带有如@controller,@services,@resource,@ods并把所注释的注册为Spring Beans --> <context:component-scan base-package="com.liu.rabbitmq,com.liu.rabbitmq" /> <!-- 激活annotation功能 --> <context:annotation-config /> <!-- 激活annotation功能 --> <context:spring-configured /> </beans>
3:创建消费者和生产者:
Proceducer.java :
package com.liu.rabbitmq; import javax.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; import org.springframework.stereotype.Service; /** * 功能概要:消息产生,提交到队列中去 * * @author liucc * @since 2016年12月1日 */ @Service public class Proceducer { private Logger logger = LoggerFactory.getLogger(Proceducer.class); @Resource private AmqpTemplate amqpTemplate; public void sendMessage(Object message){ //发送消息到消息队列服务器中,并得到回馈内容 Object object=amqpTemplate.convertSendAndReceive("queueTest",message,new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties props = message.getMessageProperties(); //把版本加入消息头中 props.setHeader("header", "1.0.0"); props.setExpiration(String.valueOf(30000)); logger.debug("设置RPC消息的TTL为{}", 30000); return message; } }); System.out.println(object); } }
Consumer.java:
package com.liu.rabbitmq; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import javax.annotation.Resource; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; /** * 功能概要:消费接收 * * @author liucc * @since 2016年12月1日 */ public class Consumer implements MessageListener { //private Logger logger = LoggerFactory.getLogger(Consumer.class); @Resource private AmqpTemplate amqpTemplate; @Override public void onMessage(Message message) { //logger.info("receive message:{}",message); try { //将字节流对象转换成Java对象 Person person=(Person) new ObjectInputStream(new ByteArrayInputStream(message.getBody())).readObject(); System.out.println("年龄:"+person.getAge()); } catch (Exception e) { e.printStackTrace(); } String replyTo = message.getMessageProperties().getReplyTo(); MessageConverter messageConverter=new SimpleMessageConverter(); MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().putAll(message.getMessageProperties().getHeaders()); String response=new String("收到返回消息"); //将Java对象转成Message对象,并作为返回的内容,回送给生产者 Message message2=messageConverter.toMessage(response, messageProperties); amqpTemplate.send(replyTo, message2); } }
4:测试
创建实体类:Person.java
package com.liu.rabbitmq; import java.io.Serializable; public class Person implements Serializable { private static final long serialVersionUID = 1L; private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public Person(String name, int age) { super(); this.name = name; this.age = age; } }
RAbbitmqTest.java:
package com.liu.rabbitmq; import org.springframework.context.support.ClassPathXmlApplicationContext; public class RabbitmqTest { public static void main(String[] args) { ClassPathXmlApplicationContext context=new ClassPathXmlApplicationContext("application-context.xml"); Proceducer proceducer=(Proceducer) context.getBean("proceducer") ; Person person=new Person("liucc",22); System.out.println(person); proceducer.sendMessage(person); } }
注:本demo主要实现了rabbitMQ集合Spring框架的基本使用,生产者向消息队列服务器发送消息,消费者接收消息,并经过简单的处理,将消息返回给生产者,完成一个基本的通信过程。消息的传输对象可以是任何对象,在内部传输时都会别装换成RabbitMQ特有的对象Message(类似报文对象),消息的主体在Message的body部分。
以上是关于RabbitMQ+Spring 结合使用的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ:第一章:6 种工作模式以及消息确认机制(理论与代码相结合)
使用Spring Cache实现广告缓存并基于RabbitMQ实现双写一致
使用Spring Cache实现广告缓存并基于RabbitMQ实现双写一致
使用Spring Cache实现广告缓存并基于RabbitMQ实现双写一致
RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码