RabbitMQ基础理解
Posted 键盘诞生真理,鼠标指引方向
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ基础理解相关的知识,希望对你有一定的参考价值。
RabbitMQ消息中间件:
hello world模式:
不使用交换机(或者说使用了默认交换机),消息生产者和消息消费者通过Queue队列关联,生产者将消息放入队列,消费者取出进行消费。
1 package Queue; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class Provider { 11 12 public static void main(String[] args) throws IOException, TimeoutException { 13 ConnectionFactory connectionFactory=new ConnectionFactory(); 14 15 connectionFactory.setHost("1.117.107.150"); 16 // connectionFactory.setConnectionTimeout(5000); 17 connectionFactory.setPort(5672); 18 connectionFactory.setVirtualHost("/test"); 19 connectionFactory.setUsername("test"); 20 connectionFactory.setPassword("test"); 21 22 Connection connection = connectionFactory.newConnection(); 23 24 Channel channel = connection.createChannel(); 25 26 channel.queueDeclare("test",false,false,false,null); 27 28 for(int i=1;i<21;i++){ 29 channel.basicPublish("","test",null,(i+"hello").getBytes()); 30 } 31 32 33 channel.close(); 34 connection.close(); 35 } 36 }
1 package Queue; 2 3 import com.rabbitmq.client.*; 4 5 import java.io.IOException; 6 import java.util.concurrent.TimeoutException; 7 8 public class Customer1 { 9 10 public static void main(String[] args) throws IOException, TimeoutException { 11 ConnectionFactory connectionFactory=new ConnectionFactory(); 12 13 connectionFactory.setHost("1.117.107.150"); 14 // connectionFactory.setConnectionTimeout(5000); 15 connectionFactory.setPort(5672); 16 connectionFactory.setVirtualHost("/test"); 17 connectionFactory.setUsername("test"); 18 connectionFactory.setPassword("test"); 19 20 Connection connection = connectionFactory.newConnection(); 21 22 final Channel channel = connection.createChannel(); 23 24 channel.basicQos(1);//每次只接受一条消息进行消费,否则将会把所有消息放入队列,如果消费过程中宕机,消息将会丢失 25 channel.queueDeclare("test",false,false,false,null);//channel.queueDeclare(队列名,是否持久化,是否独占队列,是否自动删除,其他参数) 26 27 channel.basicConsume("test",false,new DefaultConsumer(channel){//这里的false决定不自动应答,采用手动应答。 28 @Override 29 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 30 System.out.println("new String = "+new String(body)); 36 channel.basicAck(envelope.getDeliveryTag(),false);//手动应答,告诉交换机,我队列中的消息处理完成。 37 } 38 }); 39 } 40 }
好处:结构单一,耦合度一般,容易理解。
坏处:吞吐量小,只适合小型项目。
其实就是一个简单的生产者/消费者模式。
工作队列模式:
不使用交换机(或者说使用了默认交换机),消息生产者和消息消费者通过Queue队列关联,生产者将消息放入队列,消费者取出进行消费,但与hello woeld不同的是,消息消费者可以是多个,他们将平均消费队列中的消息。
1 package Queue; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class Provider { 11 12 public static void main(String[] args) throws IOException, TimeoutException { 13 ConnectionFactory connectionFactory=new ConnectionFactory(); 14 15 connectionFactory.setHost("1.117.107.150"); 16 // connectionFactory.setConnectionTimeout(5000); 17 connectionFactory.setPort(5672); 18 connectionFactory.setVirtualHost("/test"); 19 connectionFactory.setUsername("test"); 20 connectionFactory.setPassword("test"); 21 22 Connection connection = connectionFactory.newConnection(); 23 24 Channel channel = connection.createChannel(); 25 26 channel.queueDeclare("test",false,false,false,null); 27 28 for(int i=1;i<21;i++){ 29 channel.basicPublish("","test",null,(i+"hello").getBytes()); 30 } 31 32 33 channel.close(); 34 connection.close(); 35 } 36 }
1 package Queue; 2 3 import com.rabbitmq.client.*; 4 5 import java.io.IOException; 6 import java.util.concurrent.TimeoutException; 7 8 public class Customer1 { 9 10 public static void main(String[] args) throws IOException, TimeoutException { 11 ConnectionFactory connectionFactory=new ConnectionFactory(); 12 13 connectionFactory.setHost("1.117.107.150"); 14 // connectionFactory.setConnectionTimeout(5000); 15 connectionFactory.setPort(5672); 16 connectionFactory.setVirtualHost("/test"); 17 connectionFactory.setUsername("test"); 18 connectionFactory.setPassword("test"); 19 20 Connection connection = connectionFactory.newConnection(); 21 22 final Channel channel = connection.createChannel(); 23 24 channel.basicQos(1); 25 channel.queueDeclare("test",false,false,false,null); 26 27 channel.basicConsume("test",false,new DefaultConsumer(channel){ 28 @Override 29 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 30 try { 31 Thread.sleep(3000); 32 } catch (InterruptedException e) { 33 e.printStackTrace(); 34 } 35 System.out.println("new String = "+new String(body)); 36 channel.basicAck(envelope.getDeliveryTag(),false); 37 } 38 }); 39 } 40 }
package Queue; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Customer2 { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("1.117.107.150"); // connectionFactory.setConnectionTimeout(5000); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/test"); connectionFactory.setUsername("test"); connectionFactory.setPassword("test"); Connection connection = connectionFactory.newConnection(); final Channel channel = connection.createChannel(); channel.basicQos(1); channel.queueDeclare("test",false,false,false,null); channel.basicConsume("test",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//加入了睡眠时间模拟不同消费者处理消息的速度,能者多劳,处理快的被分发的消息多,反之就少 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
//因为2比1少睡眠1秒,所以处理的消息比1多,手动应答后交换机就会给我一条新的消息,忽略控制台打印的时间,同样6秒消费者2处理3条,消费者1处理两条。 System.out.println("new String = "+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); } }
好处:结构单一,耦合度一般,容易理解,吞吐量较hello world模式有所提升。
坏处:只适合消息吞吐量一般的小型项目。
本质上还是生产者/消费者模式,且多个消费者平均消费。
Publish/Subscribe发布订阅模式:
使用了交换机,消息生产者不在直接关联Queue队列,而是由exchange交换机负责消息分发,消息消费者依旧绑定Queue队列,Queue绑定交换机,多个Queue可以绑定到同一个交换机。生产者不再关心消息的去向,只负责将消息放入交换机。
1 package Utils; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class RabbitMQUtils { 11 12 private static ConnectionFactory connectionFactory; 13 14 static { 15 connectionFactory = new ConnectionFactory(); 16 connectionFactory.setHost("1.117.107.150"); 17 connectionFactory.setPort(5672); 18 connectionFactory.setVirtualHost("/test"); 19 connectionFactory.setUsername("guest"); 20 connectionFactory.setPassword("guest"); 21 22 } 23 24 25 public static Connection getConnection(){ 26 try { 27 return connectionFactory.newConnection(); 28 } catch (IOException e) { 29 e.printStackTrace(); 30 } catch (TimeoutException e) { 31 e.printStackTrace(); 32 } 33 return null; 34 } 35 36 public static void close(Connection conn, Channel chnn) throws IOException, TimeoutException { 37 if (chnn!=null) chnn.close(); 38 if (conn!=null) conn.close(); 39 } 40 41 }
1 package Fanout; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import Utils.RabbitMQUtils; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class Provider { 11 12 public static void main(String[] args) throws IOException, TimeoutException { 13 Connection connection = RabbitMQUtils.getConnection(); 14 15 Channel channel = connection.createChannel(); 16 17 channel.exchangeDeclare("logs", "Fanout"); 18 19 channel.basicPublish("logs","",null,"fanout message".getBytes()); 20 21 RabbitMQUtils.close(connection,channel); 22 } 23 }
package Fanout; import com.rabbitmq.client.*; import Utils.RabbitMQUtils; import java.io.IOException; public class Customer2 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs", "Fanout"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,"logs",""); channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } }
好处:生产者消费者进一步解耦,并且吞吐量有所提升。
坏处:只能进行广播式消息分发,不能将消息特定的发送给某一个队列。
P/S又叫广播模式,只要是订阅了当前交换机的所有队列,均可收到交换机发送的消息。分发消息时会指定一个路由key,但在此模式下,路由key会被忽略。
Routing路由--direct(直连):
使用了交换机,消息分发时需要指定路由key,只有订阅了该交换机的并且路由key完全匹配的Queue队列才能接受消息,否则消息会被Queue队列丢弃。
1 package Routing.Direct; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import Utils.RabbitMQUtils; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class Provider { 11 12 public static void main(String[] args) throws IOException, TimeoutException { 13 Connection connection = RabbitMQUtils.getConnection(); 14 15 Channel channel = connection.createChannel(); 16 17 channel.exchangeDeclare("routing","direct"); 18 19 for(int i=1;i<22;i++){ 20 if(i % 2==0){ 21 channel.basicPublish("routing","black",null,(i+": my name is black").getBytes());//black为路由key 22 }else if(i % 3==0){ 23 channel.basicPublish("routing","green",null,(i+": my name is green").getBytes());//green为路由key 24 } 25 } 26 RabbitMQUtils.close(connection,channel); 27 28 } 29 }
package Routing.Direct; import com.rabbitmq.client.*; import Utils.RabbitMQUtils; import java.io.IOException; public class CustomerBlack { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("routing","direct"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,"routing","black");//队列的路由key channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } }); } }
1 package Routing.Direct; 2 3 import com.rabbitmq.client.*; 4 import Utils.RabbitMQUtils; 5 6 import java.io.IOException; 7 8 public class CustomerGreen { 9 10 public static void main(String[] args) throws IOException { 11 Connection connection = RabbitMQUtils.getConnection(); 12 13 Channel channel = connection.createChannel(); 14 15 channel.exchangeDeclare("routing","direct"); 16 17 String queue = channel.queueDeclare().getQueue(); 18 19 channel.queueBind(queue,"routing","green");//队列的路由key 20 21 channel.basicConsume(queue,true,new DefaultConsumer(channel){ 22 @Override 23 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 24 System.out.println(new String(body)); 25 } 26 }); 27 } 28 29 }
好处:可以根据路由key将消息发送到指定队列,控制消息的去向。
坏处:不够灵活,只能路由key完全匹配的时候才能接收消息。
Routing路由--topic(动态路由):
路由key支持 “*”与“#”两个通配符,路由key由多个单词组成,单词之间使用“.”分隔开,单词可以使用通配符代替,进行动态路由。“*”代表一个单词,“#”代表0个1个或者多个单词。两个“.”之间为一个单词。 例如:one.rabbitmq.two,name.rabbitmq.sex,email.rabbitmq.phone等都可以与【*.rabbitmq.*】相匹配,而face.rabbitmq.one.two则会匹配失败。将路由规则修改为【#.rabbitmq.#】则one.rabbitmq.two,face.rabbitmq.one.two,one.two.rabbitmq.face,one.two.rabbitmq.three.face等都可以匹配成功。
1 package Routing.Topic; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import Utils.RabbitMQUtils; 6 7 import java.io.IOException; 8 import java.util.concurrent.TimeoutException; 9 10 public class Provider { 11 12 public static void main(String[] args) throws IOException, TimeoutException { 13 Connection connection = RabbitMQUtils.getConnection(); 14 15 Channel channel = connection.createChannel(); 16 17 channel.exchangeDeclare("topic","topic"); 18 19 for(int i=1;i<22;i++){ 20 if(i % 2==0){ 21 channel.basicPublish("topic","s.topic.t",null,(i+": my name is *").getBytes()); 22 }else if(i % 3==0){ 23 channel.basicPublish("topic","topic.one.two",null,(i+": my name is #").getBytes()); 24 } 25 } 26 RabbitMQUtils.close(connection,channel); 27 } 28 }
1 package Routing.Topic; 2 3 import com.rabbitmq.client.*; 4 import Utils.RabbitMQUtils; 5 6 import java.io.IOException; 7 8 public class Customer1 { 9 10 public static void main(String[] args) throws IOException { 11 Connection connection = RabbitMQUtils.getConnection(); 12 13 Channel channel = connection.createChannel(); 14 15 channel.exchangeDeclare("topic","topic"); 16 17 String queue = channel.queueDeclare().getQueue(); 18 19 channel.queueBind(queue,"topic","*.topic.*");//只能接受三个单词且第二个单词为topic的路由key消息 20 21 channel.basicConsume(queue,true,new DefaultConsumer(channel){ 22 @Override 23 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 24 System.out.println(new String(body)); 25 } 26 }); 27 } 28 }
1 package Routing.Topic; 2 3 import com.rabbitmq.client.*; 4 import Utils.RabbitMQUtils; 5 6 import java.io.IOException; 7 8 public class Customer2 { 9 10 public static void main(String[] args) throws IOException { 11 Connection connection = RabbitMQUtils.getConnection(); 12 13 Channel channel = connection.createChannel(); 14 15 channel.exchangeDeclare("topic","topic"); 16 17 String queue = channel.queueDeclare().getQueue(); 18 19 channel.queueBind(queue,"topic","#.topic.#");//可以接收到所有第二个单词为topic的消息 20 21 channel.basicConsume(queue,true,new DefaultConsumer(channel){ 22 @Override 23 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 24 System.out.println(new String(body)); 25 } 26 }); 27 } 28 }
好处:更加灵活。
以上是关于RabbitMQ基础理解的主要内容,如果未能解决你的问题,请参考以下文章