RabbitMQ简单使用
Posted 穿越在未来
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ简单使用相关的知识,希望对你有一定的参考价值。
这篇文章通过一个最简单的例子,让初学者能了解RabbitMQ如何完成生产消息和消息的。
所有的程序员在学习一门新技术的时候,都是从 Hello World 进入到Colorful World的,本节也将按照惯例,从HelloWorld开始,演示RabbitMQ的Produce和Consumer的简单使用。本RabbitMQ系列的演示代码默认都是使用Java语言。
设置账号
在开始HelloWorld之前,需要注意的是,RabbitMQ默认的账号是guest / guest,这个账号有限制,默认只能通过本地网络(localhost)访问,远程访问受限制,所以在实际发送和消费消息之前,需要设置新的账号和设置权限。具体账号和权限的内容敬请关注后面的更新。
添加账号
我们为HelloWorld创建一个新的用户为root,并设置密码为root,后续Java客户端代码中使用这个root账号发送和消费消息。
[root@hidden -]# rabbitmqct1 add user root root
Creating user "root"
设置权限
在创建好账号之后就要为这个账号创建权限了。
[root@hidden - ]# rabbitmqct1 set_permissions -p / root ".*" ".*" ".*"
Setting permissions for user "root" in vhost "/"
设置角色
最后需要为这个账号添加角色,这里我们添加管理员角色
[root@hidden - ]# rabbitmqct1 set user_tags root administrator
Setting tags for user "root" to [administrator]
经过上面的步骤,root账号就已经创建成功,也可以通过客户端链接rabbitmq的broker,如果遇到下面的问题,说明是账号出现的问题,参考上面的步骤设置,或者检查账号是否正确。
Exception in thread "main" com.rabbitmq.c1ient.AuthenticationFai1ureException :
ACCESS REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker 1ogfi1e.
添加Maven依赖
RabbitMQ的java版客户端的maven以来如下,可以根据自己的环境选择具体的版本即可。
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupld>com.rabbitmq</groupld>
<artifactld>amqp-client</artifactld>
<version>$rabbitmq.version</version>
</dependency>
Producer案例
Producer就是用来向MQ发送消息,下面就是一个Producer的HelloWorld。
import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerTest
@Test
public void helloWorld() throws IOException, TimeoutException
// 交换器名字
String exchangeName = "helloworld_exchange";
// 路由键
String routingKey = "helloworld_routing_key";
// 队列名字
String queueName = "helloworld_queue";
// 创建连接工厂,用来创建具体的连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("root");
connectionFactory.setPassword("root");
// 根据上面的设置信息,创建具体的连接
Connection connection = connectionFactory.newConnection();
// 在创建的连接上,创建一个通道
Channel channel = connection.createChannel();
// 在通道上声明交换器
channel.exchangeDeclare(exchangeName, "direct", true, false, null);
// 在通道上声明队列
channel.queueDeclare(queueName, true, false, false, null);
// 声明交换器和队列的绑定关系
channel.queueBind(queueName, exchangeName, routingKey);
String message = "hello world";
// 往通道上发送消息,消息通过绑定键发送到指定的队列,也就是上面申明的绑定关系的队列
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
// 关闭通过和连接
channel.close();
connection.close();
Consumer案例
消息发送到MQ,Consumer就可以订阅队列,并开始消费消息,下面就是一个Consumer的HelloWorld。
import com.rabbitmq.client.*;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ConsumerTest
@Test
public void helloWorld() throws IOException, TimeoutException, InterruptedException
// 订阅的队列
String queueName = "helloworld_queue";
// 订阅队列所在的broker的地址信息,这里演示另一种创建连接的方式
Address[] addresses = new Address("localhost", 5672);
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection(addresses);
Channel channel = connection.createChannel();
// consumer端一般需要设置的值,表示一次消费的消息数量的最大值
channel.basicQos(64);
// 创建默认的Consumer,并实现回调函数,在回调中确认消息,发送ack
Consumer consumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println(Thread.currentThread().getName() + " -> consumer : " + consumerTag + " , receive message : " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
;
// 消费消息,异步执行
String consumeTag = channel.basicConsume(queueName, consumer);
System.out.println(consumeTag);
// 等到消费消息完成,并返回确认给broker
TimeUnit.SECONDS.sleep(10);
// 关闭资源
channel.close();
connection.close();
以上就是通过一个HelloWorld,了解下RabbitMQ的简单使用,后续会不定期更新RabbitMQ的内容,感兴趣的小伙伴敬请关注哦。
以上是关于RabbitMQ简单使用的主要内容,如果未能解决你的问题,请参考以下文章
学相伴狂神说 RabbitMQ笔记(简单使用RabbitMQ)
学相伴狂神说 RabbitMQ笔记(简单使用RabbitMQ)