RabbitMQ学习笔记(持续更新ing)
Posted 抠脚的大灰狼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ学习笔记(持续更新ing)相关的知识,希望对你有一定的参考价值。
快速入门(java)
-
首先安装rabbitmq(单机版)
在我自己租的云服务器上,直接用docker进行安装(一行命令搞定)
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
然后在阿里云的控制台,放开
5672
和15672
端口随后,可以直接登录rabbitmq的管理后台
http://127.0.0.1:15672
,便能看到rabbitmq的情况rabbit会创建一个默认的用户,用户名
guest
,密码guest
-
基于java编写一个简单的生产者和消费者
创建一个简单的
maven
项目,引入rabbitmq
的java依赖包<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency>
把rabbitmq的相关信息放在一个常量类中
package com.yogurt.demo.rabbit; /** * @Author yogurtzzz * @Date 2021/12/14 9:42 **/ public class Constants private Constants() public static final String RABBIT_IP = "127.0.0.1"; public static final int RABBIT_PORT = 5672; public static final String RABBIT_USER = "guest"; public static final String RABBIT_PASSWORD = "guest"; public static final String RABBIT_QUEUE_NAME = "hello";
编写一个生产者,负责推送消息到rabbit
package com.yogurt.demo.rabbit; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.BufferedReader; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import static com.yogurt.demo.rabbit.Constants.*; public class Send public static void main(String[] argv) throws Exception // 连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置连接信息, ip, 端口号, 账号, 密码 factory.setHost(RABBIT_IP); factory.setPort(RABBIT_PORT); factory.setUsername(RABBIT_USER); factory.setPassword(RABBIT_PASSWORD); // 创建连接, 发送消息 (使用try-with-resource) try (Connection connection = factory.newConnection()) String message = "Hello Rabbit"; Channel channel = connection.createChannel(); //如果该名称的队列不存在, 则新建一个 channel.queueDeclare(RABBIT_QUEUE_NAME, false, false, false, null); // 向该队列发送一条消息 channel.basicPublish("", RABBIT_QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'");
跑起来!
然后我们登录管理页面看看
可以看到名为
hello
的队列中,有1条消息,我们可以点击队列的名称,然后点击Get Messages
,获取队列中的消息,可以看到这条消息的内容是Hello Rabbit
说明消息成功发送到rabbitmq当中了
随后,我们编写一个消费者
package com.yogurt.demo.rabbit; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; import static com.yogurt.demo.rabbit.Constants.*; /** * @Author yogurtzzz * @Date 2021/12/14 9:42 **/ public class Recv public static void main(String[] args) ConnectionFactory factory = new ConnectionFactory(); factory.setHost(RABBIT_IP); factory.setPort(RABBIT_PORT); factory.setUsername(RABBIT_USER); factory.setPassword(RABBIT_PASSWORD); // 获取连接 Connection connection = null; try connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RABBIT_QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); ; channel.basicConsume(RABBIT_QUEUE_NAME, true, deliverCallback, consumerTag -> ); catch (Exception e) e.printStackTrace();
跑起来!
消费者成功消费到了
上面的示例就是一个最基本的模型,只有一个生产者,一个队列,一个消费者。
下面演示一个生产者,多个消费者的情况
这是一种竞争消费的模式,在一个队列上,绑定了多个消费者,消费者会争抢着消费消息。
生产者
package com.yogurt.demo.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import static com.yogurt.demo.rabbit.Constants.*;
public class Send
public static void main(String[] argv) throws Exception
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RABBIT_IP);
factory.setPort(RABBIT_PORT);
factory.setUsername(RABBIT_USER);
factory.setPassword(RABBIT_PASSWORD);
// 获取连接, 发送消息
try (Connection connection = factory.newConnection())
// 从控制台读入
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while (true)
String message = reader.readLine();
// 输入 -1 则表示退出
if ("-1".equals(message)) return;
Channel channel = connection.createChannel();
channel.queueDeclare(RABBIT_QUEUE_NAME, false, false, false, null);
channel.basicPublish("", RABBIT_QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
消费者
package com.yogurt.demo.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import static com.yogurt.demo.rabbit.Constants.*;
/**
* @Author yogurtzzz
* @Date 2021/12/14 9:42
**/
public class Recv implements Runnable
@Override
public void run()
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(RABBIT_IP);
factory.setPort(RABBIT_PORT);
factory.setUsername(RABBIT_USER);
factory.setPassword(RABBIT_PASSWORD);
long threadId = Thread.currentThread().getId();
// 获取连接
Connection connection = null;
try
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RABBIT_QUEUE_NAME, false, false, false, null);
System.out.println("Thread " + threadId + " [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Thread " + threadId + " [x] Received '" + message + "'");
;
channel.basicConsume(RABBIT_QUEUE_NAME, true, deliverCallback, consumerTag -> );
catch (Exception e)
e.printStackTrace();
public static void main(String[] args) throws IOException
Runnable runnable = new Recv();
// 启动5个消费者
for (int i = 0; i < 5; i++)
new Thread(runnable).start();
// stuck here
System.in.read();
先启动5个消费者
可以在管理后台看到现在有5个连接
再启动生产者,并在控制台输入一些信息
可以看到发送到rabbitmq的三条消息,成功被消费者消费(5个消费者争抢着消费,一条消息只会被一个消费者消费,此种模式下,rabbitmq会依次将消息推送给消费者,根据下图可以观察到,消费者的启动顺序为15,16,13,14,12。rabbitmq也按照这个顺序(轮询,Round-Robin)依次把消息交给对应的消费者进行消费)
快速入门(springboot)
上面介绍的是基于java
的简单教程,但是通常我们开发一个应用,会使用到框架,其中又以springboot为代表。下面介绍rabbitmq整合springboot的基本使用
-
创建一个springboot项目
-
pom.xml
中添加如下依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
在
application.yml
中配置rabbitmq的地址等spring: application: name: rabbitmq-demo rabbitmq: host: 127.0.0.1 port: 5672 username: yogurt password: yogurt virtual-host: /test
-
添加配置类,配置队列,consumer工厂,消息转换器等
package com.demo.rabbitmq.config; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMqConfig /** * 注册一个 MessageConverter, 发送消息时可以直接发送一个POJO **/ @Bean public MessageConverter messageConverter() return new Jackson2JsonMessageConverter(); /** * 新建一个队列, 队列名为 yogurt * **/ @Bean public Queue yogurt() return new Queue("yogurt"); /** * 配置consumer工厂 * **/ @Bean public SimpleRabbitListenerContainerFactory consumerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); // consumer 的 prefetch 设置 factory.setPrefetchCount(30); // 并发配置 - 同时开启5个消费者(5个线程) factory.setConcurrentConsumers(5); // 最大并发配置 (当消息堆积时, 会新开线程来处理, 最大能到20个) // 有点类似jdk的线程池 factory.setMaxConcurrentConsumers(20); // 消费者开启 手动ack 机制 factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 接收消息时, 可以直接将消息反序列化为 POJO factory.setMessageConverter(new Jackson2JsonMessageConverter()); configurer.configure(factory, connectionFactory); return factory;
-
定义一个POJO,表示发送到rabbitmq的消息
public class UserInfo implements Serializable private String name; private Integer age; private String career; private String gender; private String hometown; // 省略了构造函数和 getter/setter
-
创建生产者
package com.demo.rabbitmq.component; import com.demo.rabbitmq.data.UserInfo; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Profile; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * @Author yogurtzzz * @Date 2021/12/15 14:55 **/ @Profile("sender") @Component public class RabbitMqSender private int cnt = 0; // 由 rabbitmq-starter 自动注册进来的, 其实现目前只有1个 RabbitTemplate // 但为了依赖于接口, 最好用 AmqpTemplate 来接收 @Autowired private AmqpTemplate template; // 这里的 Queue 就是前面配置的名称为 yogurt 的队列 @Autowired private Queue queue; /** * 每4秒发送一条消息 * */ @Scheduled(fixedRate = 5000, initialDelay = 2000) public void send() cnt++; UserInfo info = new UserInfo("yogurt-" + cnt, 26, "Software Engineer", "Male", "China"); // 发送一个 UserInfo 对象到 rabbitmq template.convertAndSend(queue.getName(), info); System以上是关于RabbitMQ学习笔记(持续更新ing)的主要内容,如果未能解决你的问题,请参考以下文章