java+rabbbitmq
Posted timelessmemoryli
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java+rabbbitmq相关的知识,希望对你有一定的参考价值。
首先安装rabbitmq,安装之后运行rabbitmq-plugins.bat enable rabbitmq_management(开启该插件)和
net stop RabbitMQ && net start RabbitMQ;
项目中引入amqp-client包;编写简单示例
ConnectionUtil.java
public class ConnectionUtil
private static final String RABBIT_HOST = "localhost";
private static final int RABBIT_PORT = 5672;
private static final String RABBIT_USERNAME = "guest";
private static final String RABBIT_PASSWORD = "guest";
private static Connection connection = null;
public static Connection getConnection()
if (connection == null)
try
ConnectionFactory connectionFactory = getConnectionFactory();
connection = connectionFactory.newConnection();
catch (Exception e)
throw new RuntimeException("获取connection连接失败");
return connection;
private static ConnectionFactory getConnectionFactory()
ConnectionFactory connectionFactory = new ConnectionFactory();
// 配置连接信息
connectionFactory.setHost(RABBIT_HOST);
connectionFactory.setPort(RABBIT_PORT);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername(RABBIT_USERNAME);
connectionFactory.setPassword(RABBIT_PASSWORD);
// 网络异常自动连接恢复
connectionFactory.setAutomaticRecoveryEnabled(true);
// 每10秒尝试重试连接一次
connectionFactory.setNetworkRecoveryInterval(10000);
return connectionFactory;
生产者Producer.java
public class Producer
private static final String QUEUE_NAME = "wav_queue";
public static void main(String[] args) throws Exception
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel(1);
/*
* 参数1:队列名称
* 参数2:为true时server重启队列不会消失
* 参数3:队列是否是独占的,如果为true只能被一个connection使用,其他连接建立时会抛出异常
* 参数4:队列不再使用时是否自动删除(没有连接,并且没有未处理的消息)
* 参数5:建立队列时的其他参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World!";
for (int i = 1; i < 21; i++)
message = message + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("生产者 send :" + message);
Thread.sleep(1000);
channel.close();
connection.close();
消费者Consumer.java
public class Consumer
private static final String QUEUE_NAME = "wav_queue";
public static void main(String[] args) throws IOException
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel(1);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel)
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
try
Thread.sleep(2000);
catch (InterruptedException e)
e.printStackTrace();
channel.basicAck(envelope.getDeliveryTag(), false);
// super.handleDelivery(consumerTag, envelope, properties, body);
System.out.println("消费者 consumer " + new String(body, "UTF-8"));
;
//监听队列,当b为true时,为自动提交(只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费),
// 当b为false时,为手动提交(消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,
// 如果消费者一直没有反馈,那么该消息将一直处于不可用状态。
//如果选用自动确认,在消费者拿走消息执行过程中出现宕机时,消息可能就会丢失!!)
//使用channel.basicAck(envelope.getDeliveryTag(),false);进行消息确认
channel.basicConsume(QUEUE_NAME, true, consumer);
spring整合rabbitmq,添加jar包
amqp-client、spring-rabbit、spring-amqp、spring-retry;
配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"
default-autowire="byName" default-lazy-init="false">
<context:property-placeholder location="classpath*:/config/database.properties" ignore-unresolvable="true"/>
<context:component-scan base-package="com.xxi" />
<!-- 连接配置 -->
<rabbit:connection-factory id="rabbit-connectionFactory" host="$mq.host" username="$mq.username" password="$mq.password" port="$mq.port" virtual-host="$mq.vhost"/>
<rabbit:admin connection-factory="rabbit-connectionFactory"/>
<!-- 声明一个消息队列(
durable:是否持久化
exclusive: 仅创建者可以使用的私有队列,断开后自动删除
auto_delete: 当所有消费客户端连接断开后,是否自动删除队列) -->
<rabbit:queue id="wav_queue" name="wav_queue" durable="true" auto-delete="false" exclusive="false" />
<!-- 定义交换机
rabbit:direct-exchange:定义exchange模式为direct,意思就是消息与一个特定的路由键完全匹配,才会转发。
rabbit:binding:设置消息queue匹配的key
-->
<rabbit:direct-exchange name="wav_queue.exchange" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="wav_queue" key="wav_queue.key" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 消息对象json转换类 -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
<!-- spring template声明,注入到类中,用于将消息发送到指定队列-->
<rabbit:template exchange="wav_queue.exchange" id="amqpTemplate" connection-factory="rabbit-connectionFactory" message-converter="jsonMessageConverter" />
<!-- 消费者配置 -->
<!-- 队列监听类 -->
<bean id="wavqueueListener" class="com.biu.xx.xx.xxx.WavQueueListener"/>
<!-- 监听容器配置 -->
<rabbit:listener-container connection-factory="rabbit-connectionFactory" acknowledge="manual">
<rabbit:listener queues="wav_queue" ref="wavqueueListener" method="onMessage"/>
</rabbit:listener-container>
</beans>
WavQueueListener.java
public class WavQueueListener implements ChannelAwareMessageListener
private static Log logger = LogFactory.getLog(WavQueueListener.class);
@Override
public void onMessage(Message message, Channel channel) throws Exception
try
String ackMessage = new String(message.getBody(), "utf-8");
logger.debug("接收到:" + ackMessage);
catch (Exception e)
logger.error(e.getMessage());
MQProducer.java
@Component
public class MQProducer
@Autowired
private AmqpTemplate amqpTemplate;
public void sendDataToQueue(String queueKey, Object object)
if (queueKey == null)
queueKey = "wav_queue.key";
System.out.println("--"+amqpTemplate);
try
amqpTemplate.convertAndSend(queueKey, object);
System.out.println("------消息发送成功------");
catch (Exception e)
System.out.println(e);
以上是关于java+rabbbitmq的主要内容,如果未能解决你的问题,请参考以下文章
SpringBoot RabbitMq解析框中的json消息自动
如果有的话,在 java 中声明给定类的空对象字段时会发生哪些特定于类的操作?
java中,当实例化子类时会递归调用父类中的构造方法。这个说法对么?为啥