rabbitMQ学习
Posted baoguochun
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitMQ学习相关的知识,希望对你有一定的参考价值。
1,消息队列解决什么问题
异步处理
应用解耦
流量消峰(高并发、 秒杀)
日志处理......
2,virtual host
相当于mysql数据库的db,一般以/开头
授权
3,使用java开发rabbitMQ
(1) 简单队列
p:消息生产者
红色:消息队列
c:消息消费者
3个对象,生产者、消息队列、消费者
使用步骤
生产端:
1,获取链接
2,获取通道
3,创建队列声明
4,发布消息
消费端:
1,获取链接
2,创建通道
3,定义队列消费者
4,监听队列
简单队列的不足:耦合性高,一一对应消费者,如果有多个消费者就无法使用;
work queues 工作队列
4,工作队列(work queues)
分为两种 模式 轮询和公平
协议(amqp)
为什么会出现工作队列,simple queue(round )是一一对应,实际开发中,生产者发送消息是好不费力,而消费者一般是需要跟业务相结合,消费者接收到消息就需要处理,可能需要花费时间,
这时候队列就会积压很多消息;
轮训分发的机制,不管谁忙谁闲都是你一个完成一个进行分发;
公平分发机制-fair dipatch:生产端,basicQos(perfetchCount=1); 消费成功后告诉MQ,才再发一条消息;使用公平分发必须关闭自动应答ack ;能者多劳!
5,消息应答与消息持久化
消息应答:
boolean aotoAck=true;
channel.basicConsume("mySimpleQueue", aotoAck, consumer);
boolean aotoAck=true(自动确认模式)一旦 rabbitMQ将消息分发给消费者就会从内存中删除该消息,
这种情况下如果消费端服务出现异常,未能正确的消费消息,该消息会丢失;
boolean aotoAck=false(手动模式),如果有消费者异常,就会将该消息交付给其他消费者rabbitMQ支持消费应答,消费者发送一个消息应答,告诉RabbitMQ已已消费完成,
然后rabbitMQ可以删除该消息,消息应答默认是打开的-false
消息持久化:
当RabbitMQ服务宕机,存储在内存中的消息会丢失,这是就需要消息持久化;
channel.queueDeclare("mySimpleQueue", durable, false, false, null);
durable:持久化
6,订阅模式publish/subscribe
以上的消息只能由一个消费者消费,不能给多个消费者消费!
订阅模式:
1,一个生产者多个消费者,
2,每个消费者都有自己的队列,
3,生产者没有将消息直接发送给队列,而是发送给交换机或者叫转发器exchange
4,每个队列都要绑定到交换机
5,生产者发送的消息经过交换机,然后到达队列,就能实现一个消息被多个消费者消费
代码:
生产者:
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//发送消息
String exchange="exchageMSG";
channel.basicPublish(EXCHANGE_NAME, "", null, exchange.getBytes());
channel.close();
connection.close();
发送消息后图形化界面有新的交换机生成,消息并没有存储,因为rabbitMQ中只有队列能存储消息!
消费者1-代码
public static final String EXCHANGE_NAME="my_exchange";
public static final String QUEUENAME="test_send_email_quene";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUENAME, false, false, false, null);
//绑定队列到交换机exchange
channel.queueBind(QUEUENAME, EXCHANGE_NAME, "");
channel.basicQos(1);//保证一次只分发一个
//定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
com.rabbitmq.client.Envelope envelope,
com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws
IOException {
String msg = new String(body);
System.out.println(msg);
channel.basicAck(envelope.getDeliveryTag(), false);
};
};
boolean autoAck=false;
channel.basicConsume(QUEUENAME, autoAck, consumer);
}
消费者2代码
public static final String EXCHANGE_NAME="my_exchange";
public static final String QUEUENAME="test_send_message_quene";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUENAME, false, false, false, null);
//绑定队列到交换机exchange
channel.queueBind(QUEUENAME, EXCHANGE_NAME, "");
channel.basicQos(1);//保证一次只分发一个
//定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,
com.rabbitmq.client.Envelope envelope,
com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws
IOException {
String msg = new String(body);
System.out.println(msg);
channel.basicAck(envelope.getDeliveryTag(), false);
};
};
boolean autoAck=false;
channel.basicConsume(QUEUENAME, autoAck, consumer);
}
可视化界面截图
7,交换机-转发器(Exchange)
一方面是接受生产者的消息,另一方面是向队列推送消息
匿名转发 声明路由时不指定routingkey,exchangeName "";
fanout(不处理路由键)
Direct(处理路由键)
8,路由模式
(
rabbit使用步骤
服务端发送消息:1,获取链接;2,通过链接获取通道;3;通过通道声明交换机;4,发布消息(包括交换机和路由); 5,关闭资源
消费端消费消息: 1,获取链接;2,获取通道;3,通过通道声明队列;4,绑定交换机和路由;5,创建consumer,重写handledelivery方法,获取消息 ;6,监听consumer ;
)
生产者代码
public static final String EXCHANGE_NAME="rounting_exchange";
public static final String ROUTINGKEY="info";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String message="my rounting exchange";
channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, null, message.getBytes());
System.out.println("send...");
channel.close();
connection.close();
}
消费者1:
public static final String QUEUENAME="test_queue_direct_1";
public static final String EXCHANGE_NAME="rounting_exchange";
public static final String ROUTINGKEY="error";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUENAME, false, false, false, null);
channel.basicQos(1);//保证一次只发一个
channel.queueBind(QUEUENAME, EXCHANGE_NAME, ROUTINGKEY);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body);
channel.basicAck(envelope.getDeliveryTag(), false); //告诉MQ已成功收到消息,手动确认
System.out.println(msg);
System.out.println("test_queue_direct_1");
}
};
boolean autoAck=false;//取消自动应答,默认为关闭状态
channel.basicConsume(QUEUENAME, autoAck, consumer);
}
消费者2
public static final String QUENENAME="test_queue_direct_2";
public static final String EXCHANGENAME="rounting_exchange";
public static final String ROUTINGKEYNAME1="error";
public static final String ROUTINGKEYNAME2="info";
public static final String ROUTINGKEYNAME3="warn";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUENENAME, false, false, false, null);
channel.queueBind(QUENENAME, EXCHANGENAME, ROUTINGKEYNAME1);
channel.queueBind(QUENENAME, EXCHANGENAME, ROUTINGKEYNAME2);
channel.queueBind(QUENENAME, EXCHANGENAME, ROUTINGKEYNAME3);
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg=new String(body);
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println(msg);
System.out.println(QUENENAME);
}
};
boolean autoAck=false;//关闭自动确认
channel.basicConsume(QUENENAME, autoAck, consumer);
}
9,路由主题模式
将路由和谋模式(主题)匹配
#匹配一个货多个
.匹配一个
官方模型:
10,rabbitMQ消息确认机制(事物+confirm)
在rabbitMQ中可以通过持久化数据,解决RabbtMQ服务器异常的数据丢失问题。
问题:生产者将消息发送出去之后消息到底有没有到达rabbitMQ服务器?
默认情况下是无法知晓的,
两种方式解决问题:
AMQP 协议实现了事务机制
confirm模式
事务机制:
txSelect txCommit txRollBack
txSelect用于将当前channel设置成transaction模式
txCommit用于提交事务
txRollback用于回滚
以上操作都是生产者操作
缺点:降级消息的吞吐量;
confirm 模式:
生产者confirm模式的实现原理
成产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有的匹配队列之后,
broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确的到达目的队列了,如果消息和队列是可持久化的,那么确认消息后会将消息写
进磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic。ack的multiple域,表示这个序列号之前的所有消息
已经得到处理。
confirm模式最大的好处是它是异步处理;
Nack
开启confirm模式
channel.confirmSelect();
编程模式
1,普通 发一条 waitForConfirms();
2,批量 发一批waitForConfirms();
3,异步confirm 模式,提供一个回调方法
单条代码:
public static final String QUEUENAME="test_confirm_queue1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUENAME, false, false, false, null);
//生产者调用confirmSelect 将channel 设置为confirm模式
channel.confirmSelect();
String msg="provider confirm msg";
channel.basicPublish("", QUEUENAME, null, msg.getBytes());
if(!channel.waitForConfirms()) {
System.out.println("发送失败");
}else {
System.out.println("发送成功");
}
channel.close();
connection.close();
}
批量发送
private static final String QUEUENAME="test_confirm_queue1";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUENAME, false, false, false, null);
String msg = "test_confirm_queue2 msg2 ";
channel.confirmSelect();
for(int i=0;i<10;i++) {
channel.basicPublish("", QUEUENAME, null, (msg+i).getBytes());
}
if(channel.waitForConfirms()) {
System.out.println("批量发送成功");
}else {
System.out.println("批量发送失败");
}
channel.close();
connection.close();
}
异步模式
概述
channel对象提供一个ConfirmLisnenner(),回调方法只包含deliveryTag(当前chanel发出的消息序号)。我们需要自己为每一个Chanel维护一个unconfirm的消息序号集合,
每publish一条数据,集合中元素加1,每回调一次handleAck方法,unConfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上上看,
这个集合最好采用有序集合sortedSet 存储结构;
以上是关于rabbitMQ学习的主要内容,如果未能解决你的问题,请参考以下文章