RabbitMQ基础理解

Posted 键盘诞生真理,鼠标指引方向

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ基础理解相关的知识,希望对你有一定的参考价值。

RabbitMQ消息中间件:

hello world模式:

不使用交换机(或者说使用了默认交换机),消息生产者和消息消费者通过Queue队列关联,生产者将消息放入队列,消费者取出进行消费。

 1 package Queue;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public class Provider {
11 
12     public static void main(String[] args) throws IOException, TimeoutException {
13         ConnectionFactory connectionFactory=new ConnectionFactory();
14 
15         connectionFactory.setHost("1.117.107.150");
16 //        connectionFactory.setConnectionTimeout(5000);
17         connectionFactory.setPort(5672);
18         connectionFactory.setVirtualHost("/test");
19         connectionFactory.setUsername("test");
20         connectionFactory.setPassword("test");
21 
22         Connection connection = connectionFactory.newConnection();
23 
24         Channel channel = connection.createChannel();
25 
26         channel.queueDeclare("test",false,false,false,null);
27 
28         for(int i=1;i<21;i++){
29             channel.basicPublish("","test",null,(i+"hello").getBytes());
30         }
31 
32 
33         channel.close();
34         connection.close();
35     }
36 }
 1 package Queue;
 2 
 3 import com.rabbitmq.client.*;
 4 
 5 import java.io.IOException;
 6 import java.util.concurrent.TimeoutException;
 7 
 8 public class Customer1 {
 9 
10     public static void main(String[] args) throws IOException, TimeoutException {
11         ConnectionFactory connectionFactory=new ConnectionFactory();
12 
13         connectionFactory.setHost("1.117.107.150");
14 //        connectionFactory.setConnectionTimeout(5000);
15         connectionFactory.setPort(5672);
16         connectionFactory.setVirtualHost("/test");
17         connectionFactory.setUsername("test");
18         connectionFactory.setPassword("test");
19 
20         Connection connection = connectionFactory.newConnection();
21 
22         final Channel channel = connection.createChannel();
23 
24         channel.basicQos(1);//每次只接受一条消息进行消费,否则将会把所有消息放入队列,如果消费过程中宕机,消息将会丢失
25         channel.queueDeclare("test",false,false,false,null);//channel.queueDeclare(队列名,是否持久化,是否独占队列,是否自动删除,其他参数)
26 
27         channel.basicConsume("test",false,new DefaultConsumer(channel){//这里的false决定不自动应答,采用手动应答。
28             @Override
29             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
30                 System.out.println("new String = "+new String(body));
36                 channel.basicAck(envelope.getDeliveryTag(),false);//手动应答,告诉交换机,我队列中的消息处理完成。
37             }
38         });
39     }
40 }

 

好处:结构单一,耦合度一般,容易理解。

坏处:吞吐量小,只适合小型项目。

其实就是一个简单的生产者/消费者模式。

工作队列模式:

不使用交换机(或者说使用了默认交换机),消息生产者和消息消费者通过Queue队列关联,生产者将消息放入队列,消费者取出进行消费,但与hello woeld不同的是,消息消费者可以是多个,他们将平均消费队列中的消息。

 1 package Queue;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public class Provider {
11 
12     public static void main(String[] args) throws IOException, TimeoutException {
13         ConnectionFactory connectionFactory=new ConnectionFactory();
14 
15         connectionFactory.setHost("1.117.107.150");
16 //        connectionFactory.setConnectionTimeout(5000);
17         connectionFactory.setPort(5672);
18         connectionFactory.setVirtualHost("/test");
19         connectionFactory.setUsername("test");
20         connectionFactory.setPassword("test");
21 
22         Connection connection = connectionFactory.newConnection();
23 
24         Channel channel = connection.createChannel();
25 
26         channel.queueDeclare("test",false,false,false,null);
27 
28         for(int i=1;i<21;i++){
29             channel.basicPublish("","test",null,(i+"hello").getBytes());
30         }
31 
32 
33         channel.close();
34         connection.close();
35     }
36 }
 1 package Queue;
 2 
 3 import com.rabbitmq.client.*;
 4 
 5 import java.io.IOException;
 6 import java.util.concurrent.TimeoutException;
 7 
 8 public class Customer1 {
 9 
10     public static void main(String[] args) throws IOException, TimeoutException {
11         ConnectionFactory connectionFactory=new ConnectionFactory();
12 
13         connectionFactory.setHost("1.117.107.150");
14 //        connectionFactory.setConnectionTimeout(5000);
15         connectionFactory.setPort(5672);
16         connectionFactory.setVirtualHost("/test");
17         connectionFactory.setUsername("test");
18         connectionFactory.setPassword("test");
19 
20         Connection connection = connectionFactory.newConnection();
21 
22         final Channel channel = connection.createChannel();
23 
24         channel.basicQos(1);
25         channel.queueDeclare("test",false,false,false,null);
26 
27         channel.basicConsume("test",false,new DefaultConsumer(channel){
28             @Override
29             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
30                 try {
31                     Thread.sleep(3000);
32                 } catch (InterruptedException e) {
33                     e.printStackTrace();
34                 }
35                 System.out.println("new String = "+new String(body));
36                 channel.basicAck(envelope.getDeliveryTag(),false);
37             }
38         });
39     }
40 }
package Queue;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer2 {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory=new ConnectionFactory();

        connectionFactory.setHost("1.117.107.150");
//        connectionFactory.setConnectionTimeout(5000);
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setUsername("test");
        connectionFactory.setPassword("test");

        Connection connection = connectionFactory.newConnection();

        final Channel channel = connection.createChannel();

        channel.basicQos(1);
        channel.queueDeclare("test",false,false,false,null);

        channel.basicConsume("test",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
          //加入了睡眠时间模拟不同消费者处理消息的速度,能者多劳,处理快的被分发的消息多,反之就少
try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); }
          //因为2比1少睡眠1秒,所以处理的消息比1多,手动应答后交换机就会给我一条新的消息,忽略控制台打印的时间,同样6秒消费者2处理3条,消费者1处理两条。 System.out.println(
"new String = "+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); } }

 

好处:结构单一,耦合度一般,容易理解,吞吐量较hello world模式有所提升。

坏处:只适合消息吞吐量一般的小型项目。

本质上还是生产者/消费者模式,且多个消费者平均消费。

Publish/Subscribe发布订阅模式:

使用了交换机,消息生产者不在直接关联Queue队列,而是由exchange交换机负责消息分发,消息消费者依旧绑定Queue队列,Queue绑定交换机,多个Queue可以绑定到同一个交换机。生产者不再关心消息的去向,只负责将消息放入交换机。

 1 package Utils;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public class RabbitMQUtils {
11 
12     private static ConnectionFactory connectionFactory;
13 
14     static {
15          connectionFactory = new ConnectionFactory();
16         connectionFactory.setHost("1.117.107.150");
17         connectionFactory.setPort(5672);
18         connectionFactory.setVirtualHost("/test");
19         connectionFactory.setUsername("guest");
20         connectionFactory.setPassword("guest");
21 
22     }
23 
24 
25     public static Connection getConnection(){
26         try {
27             return connectionFactory.newConnection();
28         } catch (IOException e) {
29             e.printStackTrace();
30         } catch (TimeoutException e) {
31             e.printStackTrace();
32         }
33         return null;
34     }
35 
36     public static void close(Connection conn, Channel chnn) throws IOException, TimeoutException {
37         if (chnn!=null) chnn.close();
38         if (conn!=null) conn.close();
39     }
40 
41 }
 1 package Fanout;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import Utils.RabbitMQUtils;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public class Provider {
11 
12     public static void main(String[] args) throws IOException, TimeoutException {
13         Connection connection = RabbitMQUtils.getConnection();
14 
15         Channel channel = connection.createChannel();
16 
17         channel.exchangeDeclare("logs", "Fanout");
18 
19         channel.basicPublish("logs","",null,"fanout message".getBytes());
20 
21         RabbitMQUtils.close(connection,channel);
22     }
23 }
package Fanout;

import com.rabbitmq.client.*;
import Utils.RabbitMQUtils;

import java.io.IOException;

public class Customer2 {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare("logs", "Fanout");

        String queue = channel.queueDeclare().getQueue();

        channel.queueBind(queue,"logs","");

        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        });

    }

}

 

好处:生产者消费者进一步解耦,并且吞吐量有所提升。

坏处:只能进行广播式消息分发,不能将消息特定的发送给某一个队列。

P/S又叫广播模式,只要是订阅了当前交换机的所有队列,均可收到交换机发送的消息。分发消息时会指定一个路由key,但在此模式下,路由key会被忽略。

Routing路由--direct(直连):

使用了交换机,消息分发时需要指定路由key,只有订阅了该交换机的并且路由key完全匹配的Queue队列才能接受消息,否则消息会被Queue队列丢弃。

 1 package Routing.Direct;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import Utils.RabbitMQUtils;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public class Provider {
11 
12     public static void main(String[] args) throws IOException, TimeoutException {
13         Connection connection = RabbitMQUtils.getConnection();
14 
15         Channel channel = connection.createChannel();
16 
17         channel.exchangeDeclare("routing","direct");
18 
19         for(int i=1;i<22;i++){
20             if(i % 2==0){
21                 channel.basicPublish("routing","black",null,(i+": my name is black").getBytes());//black为路由key
22             }else if(i % 3==0){
23                 channel.basicPublish("routing","green",null,(i+": my name is green").getBytes());//green为路由key
24             }
25         }
26         RabbitMQUtils.close(connection,channel);
27 
28     }
29 }
package Routing.Direct;

import com.rabbitmq.client.*;
import Utils.RabbitMQUtils;

import java.io.IOException;

public class CustomerBlack {

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();

        channel.exchangeDeclare("routing","direct");

        String queue = channel.queueDeclare().getQueue();

        channel.queueBind(queue,"routing","black");//队列的路由key

        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        });
    }

}
 1 package Routing.Direct;
 2 
 3 import com.rabbitmq.client.*;
 4 import Utils.RabbitMQUtils;
 5 
 6 import java.io.IOException;
 7 
 8 public class CustomerGreen {
 9 
10     public static void main(String[] args) throws IOException {
11         Connection connection = RabbitMQUtils.getConnection();
12 
13         Channel channel = connection.createChannel();
14 
15         channel.exchangeDeclare("routing","direct");
16 
17         String queue = channel.queueDeclare().getQueue();
18 
19         channel.queueBind(queue,"routing","green");//队列的路由key
20 
21         channel.basicConsume(queue,true,new DefaultConsumer(channel){
22             @Override
23             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
24                 System.out.println(new String(body));
25             }
26         });
27     }
28 
29 }

 

好处:可以根据路由key将消息发送到指定队列,控制消息的去向。

坏处:不够灵活,只能路由key完全匹配的时候才能接收消息。

Routing路由--topic(动态路由):

路由key支持 “*”与“#”两个通配符,路由key由多个单词组成,单词之间使用“.”分隔开,单词可以使用通配符代替,进行动态路由。“*”代表一个单词,“#”代表0个1个或者多个单词。两个“.”之间为一个单词。 例如:one.rabbitmq.two,name.rabbitmq.sex,email.rabbitmq.phone等都可以与【*.rabbitmq.*】相匹配,而face.rabbitmq.one.two则会匹配失败。将路由规则修改为【#.rabbitmq.#】则one.rabbitmq.two,face.rabbitmq.one.two,one.two.rabbitmq.face,one.two.rabbitmq.three.face等都可以匹配成功。

 1 package Routing.Topic;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import Utils.RabbitMQUtils;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public class Provider {
11 
12     public static void main(String[] args) throws IOException, TimeoutException {
13         Connection connection = RabbitMQUtils.getConnection();
14 
15         Channel channel = connection.createChannel();
16 
17         channel.exchangeDeclare("topic","topic");
18 
19         for(int i=1;i<22;i++){
20             if(i % 2==0){
21                 channel.basicPublish("topic","s.topic.t",null,(i+": my name is *").getBytes());
22             }else if(i % 3==0){
23                 channel.basicPublish("topic","topic.one.two",null,(i+": my name is #").getBytes());
24             }
25         }
26         RabbitMQUtils.close(connection,channel);
27     }
28 }
 1 package Routing.Topic;
 2 
 3 import com.rabbitmq.client.*;
 4 import Utils.RabbitMQUtils;
 5 
 6 import java.io.IOException;
 7 
 8 public class Customer1 {
 9 
10     public static void main(String[] args) throws IOException {
11         Connection connection = RabbitMQUtils.getConnection();
12 
13         Channel channel = connection.createChannel();
14 
15         channel.exchangeDeclare("topic","topic");
16 
17         String queue = channel.queueDeclare().getQueue();
18 
19         channel.queueBind(queue,"topic","*.topic.*");//只能接受三个单词且第二个单词为topic的路由key消息
20 
21         channel.basicConsume(queue,true,new DefaultConsumer(channel){
22             @Override
23             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
24                 System.out.println(new String(body));
25             }
26         });
27     }
28 }
 1 package Routing.Topic;
 2 
 3 import com.rabbitmq.client.*;
 4 import Utils.RabbitMQUtils;
 5 
 6 import java.io.IOException;
 7 
 8 public class Customer2 {
 9 
10     public static void main(String[] args) throws IOException {
11         Connection connection = RabbitMQUtils.getConnection();
12 
13         Channel channel = connection.createChannel();
14 
15         channel.exchangeDeclare("topic","topic");
16 
17         String queue = channel.queueDeclare().getQueue();
18 
19         channel.queueBind(queue,"topic","#.topic.#");//可以接收到所有第二个单词为topic的消息
20 
21         channel.basicConsume(queue,true,new DefaultConsumer(channel){
22             @Override
23             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
24                 System.out.println(new String(body));
25             }
26         });
27     }
28 }

 

好处:更加灵活。

 

 

以上是关于RabbitMQ基础理解的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ基础篇

RabbitMQ 基础知识

RabbitMQ 概念

rabbitmq-基础入门

金蝶handler中 collection 代码片段理解

RabbitMQ基础篇(下载安装并基础使用,内含各种坑问题)