了解RabbitMQ
Posted 时光与我恰经过
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了了解RabbitMQ相关的知识,希望对你有一定的参考价值。
简介:RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。
RabbitMQ的安装与配置百度都有,我就不介绍了,毕竟这里写起来也是比较麻烦的。
什么时候需要用到RabbitMQ呢?我从一个简单的例子来引入。
假设有这么一个功能,我们要对接收到的压缩包进行解压并处理,那么我们第一步想到的是否类似于这样:
public static void main(){ public void Do(){ while(true){ String guid = Receive(); Deal(guid); } } public String Receive(){ //接收文件并返回文件标识 return guid; } public void Deal(String guid){ //解压缩并处理 } }
这样我们就能把基本的功能实现了,先接收文件然后处理文件,咋一看好像没什么问题,但是仔细想想,这个程序是不是有点浪费,因为接收和处理两个方法每次执行只有一个方法在运行。那我们继续改一下:
public static void main(){ public void Do(){ while(true){ String guid = Receive(); } while(true){ Deal(guid); } } public String Receive(){ //接收文件并返回文件标识 return guid; } public void Deal(String guid){ //解压缩并处理 } }
这样问题又来了,怎么把guid传给deal呢,我们继续改造,加入一种数据结构:
public static void main(){ ArrayList list = new ArrayList(); public void Do(){ while(true){ String guid = Receive(); list.add(guid); } while(true){ if(list.size()>0){ String guid = list.get(0); Deal(guid); list.remove(0); } } } public String Receive(){ //接收文件并返回文件标识 return guid; } public void Deal(String guid){ //解压缩并处理 } }
我们实际操作中会有这样的情况,可能接收需要1秒钟,处理需要2秒钟,我们用两个线程分别执行,那很有可能有两个线程同时去判断list.size()>0,然后处理同一个对象,这是我们不愿意看到的。
想想大学时候学的队列,是不是很符合要求,先进先出,所以我们稍微改下,把ArrayList换成QueueList:
public static void main(){ QueueList list = new QueueList(); public void Do(){ while(true){ String guid = Receive(); list.enqueue(guid); } while(true){ String guid = list.dequeue(); Deal(guid); } } public String Receive(){ //接收文件并返回文件标识 return guid; } public void Deal(String guid){ //解压缩并处理 } }
那可能随着数据量越来越庞大,一个进程无法满足,我们就要创建多个进程,甚至多台机器。那么问题又来了,队列如何共享呢?
既然需要全局的队列,那么我们是否需要满足下面几点:
① 多个程序像连接数据库一个可以访问同一个队列
② 程序既可以enqueue又可以dequeue
③ 如果有新的数据入库,可以反过来通知程序去接收处理,而不是我们程序主动扫描
④ 程序有容错功能,宕机、停电或重启等操作,能让数据保留下来(持久化)
⑤ 一个数据,不能被两个程序同时访问,得有锁定功能
⑥ 既然是一个独立软件,就不能只管理一个队列,应该可以管理多个
想想我们自己来实现,是不是挺恐怖的!
这时候消息队列的概念就被引入了,我们已知的有微软的MessageQueue,开源的RabbitMQ,还有Apache的ActiveMQ
当然,如果需要在代码中引入的话,第一步肯定是引入jar包。我们来写一个简单的加入队列和取出队列功能
public static void main() throws IOException,TimeoutException{ ConnectionFactory connect = new ConnectionFactory(); //rabbitMQ IP connect.setHost("192.168.215.331"); //端口号 connect.setPort(5672); //用户名 connect.setUsername("lsd"); //密码 connect.setPassword("123456"); String queueName = "TESTMQ"; Connection connection = connect.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName,true,false,false,null); String msg = "Hello"; channel.basicPublish("",queueName,null,msg.getBytes("UTF-8")); channel.close(); connection.close(); }
我们在上面的程序里面新建了一个叫做TESTMQ的队列,并且往队列里面放入了一个字符串Hello
public static void main() throws IOException,TimeoutException{ ConnectionFactory connect = new ConnectionFactory(); //rabbitMQ IP connect.setHost("192.168.215.331"); //端口号 connect.setPort(5672); //用户名 connect.setUsername("lsd"); //密码 connect.setPassword("123456"); String queueName = "TESTMQ"; Connection connection = connect.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName,true,false,false,null); channel.basicQos(); QueueingConsumer consumer = new Queueingconsumer(channel); channel.basicConsume(queuqName,false,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
byte[] byte = delivery.getBody();
System.out.println(new String(byte));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
} }
我们在上面的程序里面从一个叫做TESTMQ的队列里面不停的取值(因为是死循环),很容易理解吧,第一个main函数可以不断执行,第二个main函数都能及时取到值,先讲到这儿,入门应该是够了
以上是关于了解RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章