RabbitMQ学习笔记(持续更新ing)

Posted 抠脚的大灰狼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ学习笔记(持续更新ing)相关的知识,希望对你有一定的参考价值。

快速入门(java)

  1. 首先安装rabbitmq(单机版)

    rabbitmq的安装(官网文档)

    在我自己租的云服务器上,直接用docker进行安装(一行命令搞定)

    docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
    

    然后在阿里云的控制台,放开567215672端口

    随后,可以直接登录rabbitmq的管理后台http://127.0.0.1:15672,便能看到rabbitmq的情况

    rabbit会创建一个默认的用户,用户名guest,密码guest

  2. 基于java编写一个简单的生产者和消费者

    rabbitmq的java教程(官网文档)

    创建一个简单的maven项目,引入rabbitmq的java依赖包

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.9.0</version>
    </dependency>
    

    把rabbitmq的相关信息放在一个常量类中

    package com.yogurt.demo.rabbit;
    
    /**
     * @Author yogurtzzz
     * @Date 2021/12/14 9:42
     **/
    public class Constants 
    
    	private Constants()  
    
    	public static final String RABBIT_IP = "127.0.0.1";
    
    	public static final int RABBIT_PORT = 5672;
    
    	public static final String RABBIT_USER = "guest";
    
    	public static final String RABBIT_PASSWORD = "guest";
    
    	public static final String RABBIT_QUEUE_NAME = "hello";
    
    
    

    编写一个生产者,负责推送消息到rabbit

    package com.yogurt.demo.rabbit;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    
    import static com.yogurt.demo.rabbit.Constants.*;
    
    public class Send 
    
    	public static void main(String[] argv) throws Exception 
            // 连接工厂
    		ConnectionFactory factory = new ConnectionFactory();
    		// 设置连接信息, ip, 端口号, 账号, 密码
            factory.setHost(RABBIT_IP);
    		factory.setPort(RABBIT_PORT);
    		factory.setUsername(RABBIT_USER);
    		factory.setPassword(RABBIT_PASSWORD);
    		// 创建连接, 发送消息 (使用try-with-resource)
    		try (Connection connection = factory.newConnection()) 
    				String message = "Hello Rabbit";
    				Channel channel = connection.createChannel();
                    //如果该名称的队列不存在, 则新建一个
    				channel.queueDeclare(RABBIT_QUEUE_NAME, false, false, false, null);
    				// 向该队列发送一条消息	
                channel.basicPublish("", RABBIT_QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
    				System.out.println(" [x] Sent '" + message + "'");
    		
    	
    
    

    跑起来!

    然后我们登录管理页面看看

    可以看到名为hello的队列中,有1条消息,我们可以点击队列的名称,然后点击Get Messages,获取队列中的消息,可以看到这条消息的内容是Hello Rabbit

    说明消息成功发送到rabbitmq当中了

    随后,我们编写一个消费者

    package com.yogurt.demo.rabbit;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.nio.charset.StandardCharsets;
    import static com.yogurt.demo.rabbit.Constants.*;
    
    /**
     * @Author yogurtzzz
     * @Date 2021/12/14 9:42
     **/
    public class Recv 
    
    	public static void main(String[] args) 
    		ConnectionFactory factory = new ConnectionFactory();
    		factory.setHost(RABBIT_IP);
    		factory.setPort(RABBIT_PORT);
    		factory.setUsername(RABBIT_USER);
    		factory.setPassword(RABBIT_PASSWORD);
    
    		// 获取连接
    		Connection connection = null;
    		try 
    			connection = factory.newConnection();
    			Channel channel = connection.createChannel();
    			channel.queueDeclare(RABBIT_QUEUE_NAME, false, false, false, null);
    			System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
    			DeliverCallback deliverCallback = (consumerTag, delivery) -> 
    				String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
    				System.out.println(" [x] Received '" + message + "'");
    			;
    			channel.basicConsume(RABBIT_QUEUE_NAME, true, deliverCallback, consumerTag ->  );
    		 catch (Exception e) 
    			e.printStackTrace();
    		
    	
    
    

    跑起来!

    消费者成功消费到了

上面的示例就是一个最基本的模型,只有一个生产者,一个队列,一个消费者。

下面演示一个生产者,多个消费者的情况

这是一种竞争消费的模式,在一个队列上,绑定了多个消费者,消费者会争抢着消费消息。

生产者

package com.yogurt.demo.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import static com.yogurt.demo.rabbit.Constants.*;

public class Send 


	public static void main(String[] argv) throws Exception 
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(RABBIT_IP);
		factory.setPort(RABBIT_PORT);
		factory.setUsername(RABBIT_USER);
		factory.setPassword(RABBIT_PASSWORD);
		// 获取连接, 发送消息
		try (Connection connection = factory.newConnection()) 
			// 从控制台读入
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
			while (true) 
				String message = reader.readLine();
                // 输入 -1 则表示退出
				if ("-1".equals(message)) return;
				Channel channel = connection.createChannel();
				channel.queueDeclare(RABBIT_QUEUE_NAME, false, false, false, null);
				channel.basicPublish("", RABBIT_QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
				System.out.println(" [x] Sent '" + message + "'");
			
		
	

消费者

package com.yogurt.demo.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import static com.yogurt.demo.rabbit.Constants.*;

/**
 * @Author yogurtzzz
 * @Date 2021/12/14 9:42
 **/
public class Recv implements Runnable

	@Override
	public void run() 
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(RABBIT_IP);
		factory.setPort(RABBIT_PORT);
		factory.setUsername(RABBIT_USER);
		factory.setPassword(RABBIT_PASSWORD);

		long threadId = Thread.currentThread().getId();
		// 获取连接
		Connection connection = null;
		try 
			connection = factory.newConnection();
			Channel channel = connection.createChannel();
			channel.queueDeclare(RABBIT_QUEUE_NAME, false, false, false, null);
			System.out.println("Thread " + threadId + " [*] Waiting for messages. To exit press CTRL+C");

			DeliverCallback deliverCallback = (consumerTag, delivery) -> 
				String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
				System.out.println("Thread " + threadId + " [x] Received '" + message + "'");
			;
			channel.basicConsume(RABBIT_QUEUE_NAME, true, deliverCallback, consumerTag ->  );
		 catch (Exception e) 
			e.printStackTrace();
		
	

	public static void main(String[] args) throws IOException 
		Runnable runnable = new Recv();
		// 启动5个消费者
		for (int i = 0; i < 5; i++) 
			new Thread(runnable).start();
		
		// stuck here
		System.in.read();
	


先启动5个消费者

可以在管理后台看到现在有5个连接

再启动生产者,并在控制台输入一些信息

可以看到发送到rabbitmq的三条消息,成功被消费者消费(5个消费者争抢着消费,一条消息只会被一个消费者消费,此种模式下,rabbitmq会依次将消息推送给消费者,根据下图可以观察到,消费者的启动顺序为15,16,13,14,12。rabbitmq也按照这个顺序(轮询,Round-Robin)依次把消息交给对应的消费者进行消费)

快速入门(springboot)

上面介绍的是基于java的简单教程,但是通常我们开发一个应用,会使用到框架,其中又以springboot为代表。下面介绍rabbitmq整合springboot的基本使用

  1. 创建一个springboot项目

  2. pom.xml中添加如下依赖

    <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  3. application.yml中配置rabbitmq的地址等

    spring:
      application:
        name: rabbitmq-demo
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: yogurt
        password: yogurt
        virtual-host: /test
    
  4. 添加配置类,配置队列,consumer工厂,消息转换器等

    package com.demo.rabbitmq.config;
    
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitMqConfig 
    
        /**
        * 注册一个 MessageConverter, 发送消息时可以直接发送一个POJO
        **/
    	@Bean
    	public MessageConverter messageConverter() 
    		return new Jackson2JsonMessageConverter();
    	
    
    	/**
    	 * 新建一个队列, 队列名为 yogurt
    	 * **/
    	@Bean
    	public Queue yogurt() 
    		return new Queue("yogurt");
    	
    
        /**
        * 配置consumer工厂
        * **/
    	@Bean
    	public SimpleRabbitListenerContainerFactory consumerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
    	                                                            ConnectionFactory connectionFactory) 
    		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    		// consumer 的 prefetch 设置
            factory.setPrefetchCount(30);
    		// 并发配置 - 同时开启5个消费者(5个线程)
    		factory.setConcurrentConsumers(5);
            // 最大并发配置 (当消息堆积时, 会新开线程来处理, 最大能到20个)
            // 有点类似jdk的线程池
    		factory.setMaxConcurrentConsumers(20);
            // 消费者开启 手动ack 机制
    		factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            // 接收消息时, 可以直接将消息反序列化为 POJO
    		factory.setMessageConverter(new Jackson2JsonMessageConverter());
    		configurer.configure(factory, connectionFactory);
    		return factory;
    	
    
    
  5. 定义一个POJO,表示发送到rabbitmq的消息

    public class UserInfo implements Serializable 
    
    	private String name;
    
    	private Integer age;
    
    	private String career;
    
    	private String gender;
    
    	private String hometown;
        
        // 省略了构造函数和 getter/setter
    
    
  6. 创建生产者

    package com.demo.rabbitmq.component;
    
    import com.demo.rabbitmq.data.UserInfo;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Profile;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    /**
     * @Author yogurtzzz
     * @Date 2021/12/15 14:55
     **/
    @Profile("sender")
    @Component
    public class RabbitMqSender 
    
    	private int cnt = 0;
    
        // 由 rabbitmq-starter 自动注册进来的, 其实现目前只有1个  RabbitTemplate 
        // 但为了依赖于接口, 最好用 AmqpTemplate 来接收
    	@Autowired
    	private AmqpTemplate template;
    
        // 这里的 Queue 就是前面配置的名称为 yogurt 的队列
    	@Autowired
    	private Queue queue;
    
    
    	/**
    	 * 每4秒发送一条消息
    	 * */
    	@Scheduled(fixedRate = 5000, initialDelay = 2000)
    	public void send() 
    		cnt++;
    		UserInfo info = new UserInfo("yogurt-" + cnt, 26, "Software Engineer", "Male", "China");
    		// 发送一个 UserInfo 对象到 rabbitmq
            template.convertAndSend(queue.getName(), info);
    		System以上是关于RabbitMQ学习笔记(持续更新ing)的主要内容,如果未能解决你的问题,请参考以下文章

    一些个人笔记,持续更新ing

    flask插件全家桶集成学习---持续更新ing

    物联网使能服务--笔记(持续更新ing)

    Android第一行代码--学习笔记(更新中ing)

    MongoDB配置(持续学习更新ing……)

    Linux 开发工具使用(持续学习更新ing……)