RabbitMQ自学入门
Posted 走路带_风
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ自学入门相关的知识,希望对你有一定的参考价值。
// 连接RabbitMQ 简单工具类
public static Connection getConnection() throws Exception
//1. 定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 获取服务地址
factory.setHost("127.0.0.1");
//3. AMQP 5672
factory.setPort(5672);
//4. vhost
factory.setVirtualHost("/test");
//5. 用户名
factory.setUsername("test");
//6. 密码
factory.setPassword("test");
return factory.newConnection();
// 获取连接,生成通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
1)simple - 简单队列
涉及3个对象: 生产者 队列 RabbitMQ 消费者
生产者:
// 申明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 推送信息
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
消费者:
// 申明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
DefaultConsumer consumer = new DefaultConsumer(channel)
//获取到达的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
String msg = new String(body, "utf-8");
// 消费消息
System.out.println("new api recv: " + msg);
;
//4. 监听队列
channel.basicConsume(QUEUE_NAME,true,consumer);
缺点:耦合性高,生产者一 一对应消费者。
2)work queues - 工作队列
channel.basicQos(1); //保证每次只分发一个
boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
autoAck true 代表自动应答,
false代表需要如下手动应答:
channel.basicAck(envelope.getDeliveryTag(),false);
轮询分发:自动应答,任务消息总是均匀分配
公平分发:手动应答,能者多劳
3)消息持久化
Boolean durable = true;// 持久化,注意在已存在的队列上修改是不行的,可以删除原有队列或重新生成队列。
channel.queueDeclare(QUEUE_NAME,durable ,false,false,null);
4)订阅模式 publish/subscribe
- 一个生产者,多个消费者
- 每一个消费者都有自己的队列
- 生产者没有直接把消息发送到队列 而是发送到了交换机 转发器(exchange)
- 每个队列都要绑定到交换机上,
- 生产者发送的消息 经过交换机 达到队列 就能实现一个消息被多个消费者消费
生产者:
channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); // 声明交换机,"fanout"为分发
消费者:
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");//绑定队列到交换机转发器
5)Exchange(交换机 转发器)
一方面是接收生产者的消息,另一方面是向队列推送消息。
匿名转发 ""
fanout(不处理路由键)
direct (处理路由器)
6)路由模式
生产者:
channel.exchangeDeclare(EXCHANGE_NAME,"direct"); // 声明交换机
String routingKey = "info"; // error、warn.. 绑定路由
channel.basicPublish(EXCHANGE_NAME,routingKey ,null,msg.getBytes());
消费者:
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");// 消费者绑定到路由
7)事务机制
txSelect txCommit txRollback
txSelect:用户将当前 channel 设置成 transaction 模式
txCommit :用于提交事务;
txRollback:回滚事务。
这种模式很耗时,采用这种方式,降低了 RabbitMQ 的消息吞吐量
8)Conform 模式
生产者端 conform 模式的实现原理:
生产者将信道设置成 conform 模式,一旦信道进入 conform 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经到达目的队列了,如果消息和队列是持久化的,那么确认消息会将消息写入磁盘之后发出,broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
Conform 模式最大的好处就是他是异步。
开启 conform 模式 Channel.conformSelect()
编程模式:
①普通 发一条 waitForConfirms()
②批量的 发一批 channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException
③异步 conform 模式:提供一个 回调方法
异步模式:
channel 对象提供的 ConfirmListener() 回调方法只包含 deliveryTag (当前 channel 发出的消息序列),我们需要自己为每一个 Channel 维护一个 unconfirm 的消息序号集合,每 publish 一条数据,集合中元素加 1 ,每回调一次 handleAck 方法,unconfirm 集合删掉相应的一条(multiple = false)或多条(multiple = true)记录。从程序运行效率上看,这个 unconfirm 集合最好采用有序集合 SortedSed 存储结构。
例如:
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener()
public void handleAck(long deliveryTag, boolean multiple) throws IOException
if (multiple)
confirmSet.headSet(deliveryTag + 1L).clear();
else
confirmSet.remove(deliveryTag);
public void handleNack(long deliveryTag, boolean multiple) throws IOException
if (multiple)
confirmSet.headSet(deliveryTag + 1L).clear();
else
confirmSet.remove(deliveryTag);
);
以上是关于RabbitMQ自学入门的主要内容,如果未能解决你的问题,请参考以下文章