初识rabbitMQ

Posted hxz-nl

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了初识rabbitMQ相关的知识,希望对你有一定的参考价值。

19/5/29  对于rabbitMQ ,我已经研究了几天。 之前完全的没有接触过,所以有很多的概念,很多的坑要踩

首先是安装 rabbitmq 这个就不记录了。

1、引入 Maven

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.3.RELEASE</version>
</dependency>



2、配置 ,写配置文件
<!--步骤1、配置链接工厂-->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="host" value="${mq.address}"/>
<property name="port" value="${mq.port}"/>
<property name="password" value="${mq.pwd}"/>
<property name="username" value="${mq.user}"/>
<property name="publisherConfirms" value="true"/>
<property name="publisherReturns" value="true"/>
<property name="virtualHost" value="${mq.vhost}"/>
<property name="requestedHeartBeat" value="50"/>
</bean>
<!--步骤2、创建rabbitTemplate 消息模板-->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<!--构造方法需要链接信息-->
<constructor-arg ref="connectionFactory"/>
<!--配置交换机-->
<property name="exchange" value="${mq.exchange}"/>
<!--配置路由键-->
<property name="routingKey" value="${mq.routingKey}"/>
<!--配置队列-->
<property name="queue" value="${mq.queue}"/>
<!--配置消息转换-->
<property name="messageConverter" ref="serializerMessageConverter"/>
<property name="confirmCallback" ref="rabbitTemplateConfig" />
<property name="returnCallback" ref="rabbitTemplateConfig" />
<property name="mandatory" value="true" />
</bean>
<bean id="rabbitTemplateConfig" class="mq.RabbitTemplateConfig"/>
<!--注入消息转换器-->
<bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"/>
<!--引入元素文件-->
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="properties">
<bean class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="locations">
<list>
<value>classpath:conf/value.properties</value>
</list>
</property>
<property name="fileEncoding" value="UTF-8"/>
</bean>
</property>
</bean>
<!--申明消费者-->
<bean id="rmqConsumer" class="mq.RmqConsumer" />
<bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="rmqConsumer" />
<property name="defaultListenerMethod" value="rmqConsumeMessage"/>
<property name="messageConverter" ref="serializerMessageConverter"/>
</bean>
<!--注册监听-->
<bean id="listener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="queueNames" value="ceshiQueues,ceshi1,ceshi2"/>
<property name="messageListener" ref="messageListenerAdapter"/>
<property name="acknowledgeMode" value="MANUAL"/>
</bean>

这个是我关于rabbitMQ 所用的配置,下面记录一下具体的作用。
(1、)配置链接
  通过配置链接工厂从而链接到rabbitMQ
<!--步骤1、配置链接工厂-->
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="host" value="${mq.address}"/>//链接的地址 127.0.0.1
<property name="port" value="${mq.port}"/>//端口号 5627
<property name="password" value="${mq.pwd}"/> //密码
<property name="username" value="${mq.user}"/> //用户名
<property name="publisherConfirms" value="true"/> //是否开启提交到交换机的回调
<property name="publisherReturns" value="true"/> //是否开启发送到队列的错误回调
<property name="virtualHost" value="${mq.vhost}"/>// 虚拟机
<property name="requestedHeartBeat" value="50"/>//心跳时间(这个可删除,我不知道有什么用,以后有领悟再记录)
</bean>

属性文件中的内容
mq.address=127.0.0.1
mq.exchange=ceshi
mq.routingKey=ceshiRouting
mq.queue=ceshiQueues
mq.port=5672
mq.user=***
mq.pwd=t**an****
mq.timeout=5000
mq.vhost=testMQ

关于开启 ConfirmReturn 的回调 还需要在模板 rabbitTemplate 中进行设置

<!--步骤2、创建rabbitTemplate 消息模板-->
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<!--构造方法需要链接信息-->
<constructor-arg ref="connectionFactory"/>
<!--配置交换机-->
<property name="exchange" value="${mq.exchange}"/>
<!--配置路由键-->
<property name="routingKey" value="${mq.routingKey}"/>
<!--配置队列-->
<property name="queue" value="${mq.queue}"/>
<!--配置消息转换-->
<property name="messageConverter" ref="serializerMessageConverter"/>
<property name="confirmCallback" ref="rabbitTemplateConfig" />
<property name="returnCallback" ref="rabbitTemplateConfig" />
<property name="mandatory" value="true" />
</bean>
注册模板类的bean 类 org.springframework.amqp.rabbit.core.RabbitTemplate  
在其构造方法中传入链接工厂的引用, 如上 代码  重点看 下面这几行配置 
    <property name="confirmCallback" ref="rabbitTemplateConfig" />
<property name="returnCallback" ref="rabbitTemplateConfig" />
<property name="mandatory" value="true" />
这个就是上面提到的 回调,<property name="mandatory" value="true" />  这个是一定要的 ,删除了会导致 returnCallback 不起效 ,下面贴上实现类代码 

package mq;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;

/**
* @author tia
* @date 2019/5/2910:45
*/
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

/**
* 是否成功发送到交换器
* 成功、失败都会回调
* @param correlationData
* @param b
* @param s
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("消息唯一标识:"+correlationData);
System.out.println("确认结果:"+b);
System.out.println("失败原因:"+s);
}

/**
* 是否成功发送到队列(需要设置mandatory 为true)
* 失败回调
* @param message
* @param i
* @param s
* @param s1
* @param s2
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("消息主体:"+message);
System.out.println("消息主体:"+i);
System.out.println("描述:"+s);
System.out.println("交换器:"+s1);
System.out.println("路由键:"+s2);
}
}
偷了个懒,把两个回调放在了一起 implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback  这两个实现是一定要的。
这两个方法的作用,就是对消息进行重新发送,或是记录没有发送出去的消息,等等,看个人安排了。

在我的配置中是没有关于 队列的创建,交换器的创建,虚拟机的创建、绑定等的内容, 这些都在RabbitMQ 的后台完成了 图个简单。

到这里,就可以向mq发送消息了。我写的一个例子:
package mq;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import po.Message;

import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
import java.util.Date;

@RestController
@RequestMapping(value = "/mq",produces = "text/html;charset=UTF-8")
public class RmqProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(RmqProducer.class);
@Resource
private RabbitTemplate rabbitTemplate;

/**
*
发送信息
*/
public void sendMessage(String queueKey,Object msg) {
try {
// 发送信息
rabbitTemplate.convertAndSend(queueKey,"1");
} catch (Exception e) {
LOGGER.error("rmq消费者任务处理出现异常", e);
}
}
@RequestMapping("/sendMessage")
public void sendActiveCount(String activeMap) throws UnsupportedEncodingException {
Message message=new Message();
message.setFrom(1234566l);
message.setTo(754964641l);
message.setText("你妹妹好漂亮");
message.setDate(new Date());
message.setFromName("你妹妹");
String s = JSON.toJSONString(message);
for (int i = 0; i <100 ; i++) {
rabbitTemplate.convertAndSend("ceshi","ceshi1Routing",s,new CorrelationData("你妹妹"+i));
}

}
}
主要的内容就是这个方法 rabbitTemplate.convertAndSend("ceshi","ceshi1Routing",s,new CorrelationData("你妹妹"+i)); 哪儿都能发送。

再看看 消费者 怎么弄,可是花了我大量的时间 去弄这个。

<!--申明消费者-->
<bean id="rmqConsumer" class="mq.RmqConsumer" />
<bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="rmqConsumer" />
<property name="defaultListenerMethod" value="rmqConsumeMessage"/>
<property name="messageConverter" ref="serializerMessageConverter"/>
</bean>
<!--注册监听-->
<bean id="listener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="queueNames" value="ceshiQueues,ceshi1,ceshi2"/>
<property name="messageListener" ref="messageListenerAdapter"/>
<property name="acknowledgeMode" value="MANUAL"/>
</bean>
这个监听是一定要有的,或许你可以使用注解来干掉他。
看到这个了吗? <property name="defaultListenerMethod" value="rmqConsumeMessage"/> 这个东西就是说默认去执行你 <constructor-arg ref="rmqConsumer" /> 这个类的 这个 方法的。不过也有其他的弊端就是 通道的问题
还有就是 如果实现了 implements ChannelAwareMessageListener 就不起效了。
看代码:
package mq;


import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;;


public class RmqConsumer implements ChannelAwareMessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(RmqConsumer.class);
int i=0;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try{
Object ddd=null;
JSONObject jsonObject=JSONObject.parseObject(new String(message.getBody(),"utf-8"));
po.Message message1 = JSON.toJavaObject(jsonObject, po.Message.class);
System.out.println(message1.toString());
if(i++%10==0)
System.out.println(ddd.toString());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
channel.basicPublish("ceshi","ceshi1Routing",true,true,null ,message.getBody());
System.out.println(e.getMessage());
}
}
}

这个里面没有配置里提到的方法,他被我吃了。因为他不生效了。
再说这个通道的问题 channel ,我这儿 消费者方法是不能抛出错误的,会停掉,所以只能处理, <property name="acknowledgeMode" value="MANUAL"/> 者个配置是在配置是否手动确认的。
MANUAL 手动确认 AUTO 自动确认(默认值) 如果开启自动确认,那么 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 
将会 报错,也就是说,他不需要手动确认的代码存在。它会默认所有的方法都进行 成功确认,这个真的很无奈。
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 成功确认 他有两个参数 ,消息Tag 与是否批量确认。如果true 则批量确认 tag值小于该值的所有信息将被成功确认。
message.getMessageProperties().getDeliveryTag() 消息的Tag
如果你开启了手动确认,但并没有确认,那么你的消息就会处于未确认状态,就像这样 Unacked 100 ,Total 100, 发送100条消息,都没有确认。那rabbitMQ不会把它删除,一直堆积在内存中,后果,就看你怎么处理了.....
技术图片
 
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); 失败确认 这个方法有三个参数。 消息Tag 、是否批量确认、和 是否重新回到队列。前两个参数跟 成功确认相同, 最后一个如果为true 将重新回到队列顶端
注意 是队列顶端,下一次消费者就会调用返回队列的消息。如果这条消息有错误,那就意味着,程序会一直进行 失败确认 返回队列 ,死循环 。
所以 看这个 channel.basicPublish("ceshi","ceshi1Routing",true,true,null ,message.getBody()); 发送消息,它会把消息发送到队列的末尾,这样最后执行,就可以避免不消费其他正确的消息了。
 





以上是关于初识rabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

初识RabbitMQ

RabbitMQ 初识RabbitMQ

RabbitMQ 初识RabbitMQ

RabbitMQ 初识

RabbitMQ 初识

RabbitMQ——初识RabbitMQ & 安装步骤