rabbitMQ 学习
Posted baoguochun
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitMQ 学习相关的知识,希望对你有一定的参考价值。
1,消息队列解决什么问题
异步处理
应用解耦
流量消峰(高并发、 秒杀)
日志处理......
2,virtual host
相当于mysql数据库的db,一般以/开头
授权
3,使用java开发rabbitMQ
(1) 简单队列
p:消息生产者
红色:消息队列
c:消息消费者
3个对象,生产者、消息队列、消费者
使用步骤
生产端:
1,获取链接
2,获取通道
3,创建队列声明
4,发布消息
消费端:
1,获取链接
2,创建通道
3,定义队列消费者
4,监听队列
简单队列的不足:耦合性高,一一对应消费者,如果有多个消费者就无法使用;
work queues 工作队列
4,工作队列(work queues)
分为两种 模式 轮询和公平
协议(amqp)
为什么会出现工作队列,simple queue(round )是一一对应,实际开发中,生产者发送消息是好不费力,而消费者一般是需要跟业务相结合,消费者接收到消息就需要处理,可能需要花费时间,
这时候队列就会积压很多消息;
轮训分发的机制,不管谁忙谁闲都是你一个完成一个进行分发;
公平分发机制-fair dipatch:生产端,basicQos(perfetchCount=1); 消费成功后告诉MQ,才再发一条消息;使用公平分发必须关闭自动应答ack ;能者多劳!
5,消息应答与消息持久化
消息应答:
boolean aotoAck=true;
channel.basicConsume("mySimpleQueue", aotoAck, consumer);
boolean aotoAck=true(自动确认模式)一旦 rabbitMQ将消息分发给消费者就会从内存中删除该消息,
这种情况下如果消费端服务出现异常,未能正确的消费消息,该消息会丢失;
boolean aotoAck=false(手动模式),如果有消费者异常,就会将该消息交付给其他消费者rabbitMQ支持消费应答,消费者发送一个消息应答,告诉RabbitMQ已已消费完成,
然后rabbitMQ可以删除该消息,消息应答默认是打开的-false
消息持久化:
当RabbitMQ服务宕机,存储在内存中的消息会丢失,这是就需要消息持久化;
channel.queueDeclare("mySimpleQueue", durable, false, false, null);
durable:持久化
6,订阅模式publish/subscribe
以上的消息只能由一个消费者消费,不能给多个消费者消费!
订阅模式:
1,一个生产者多个消费者,
2,每个消费者都有自己的队列,
3,生产者没有将消息直接发送给队列,而是发送给交换机或者叫转发器exchange
4,每个队列都要绑定到交换机
5,生产者发送的消息经过交换机,然后到达队列,就能实现一个消息被多个消费者消费
代码:
生产者:
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//发送消息
String exchange="exchageMSG";
channel.basicPublish(EXCHANGE_NAME, "", null, exchange.getBytes());
channel.close();
connection.close();
发送消息后图形化界面有新的交换机生成,消息并没有存储,因为rabbitMQ中只有队列能存储消息!
消费者1-代码
public static final String EXCHANGE_NAME="my_exchange";
public static final String QUEUENAME="test_send_email_quene";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUENAME, false, false, false, null);
//绑定队列到交换机exchange
channel.queueBind(QUEUENAME, EXCHANGE_NAME, "");
channel.basicQos(1);//保证一次只分发一个
//定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println(msg);
channel.basicAck(envelope.getDeliveryTag(), false);
};
};
boolean autoAck=false;
channel.basicConsume(QUEUENAME, autoAck, consumer);
}
消费者2代码
public static final String EXCHANGE_NAME="my_exchange";
public static final String QUEUENAME="test_send_message_quene";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUENAME, false, false, false, null);
//绑定队列到交换机exchange
channel.queueBind(QUEUENAME, EXCHANGE_NAME, "");
channel.basicQos(1);//保证一次只分发一个
//定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println(msg);
channel.basicAck(envelope.getDeliveryTag(), false);
};
};
boolean autoAck=false;
channel.basicConsume(QUEUENAME, autoAck, consumer);
}
可视化界面截图
7,交换机-转发器(Exchange)
一方面是接受生产者的消息,另一方面是向队列推送消息
匿名转发 exchangeName "";
fanout(不处理路由键)
Direct(处理路由键)
8,路由模式
(
rabbit使用步骤
服务端发送消息:1,获取链接;2,通过链接获取通道;3;通过通道声明交换机;4,发布消息(包括交换机和路由); 5,关闭资源
消费端消费消息: 1,获取链接;2,获取通道;3,通过通道声明队列;4,绑定交换机和路由;5,创建consumer,重写handledelivery方法,获取消息 ;6,监听consumer ;
)
生产者代码
public static final String EXCHANGE_NAME="rounting_exchange";
public static final String ROUTINGKEY="info";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String message="my rounting exchange";
channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, null, message.getBytes());
System.out.println("send...");
channel.close();
connection.close();
}
消费者1:
public static final String QUEUENAME="test_queue_direct_1";
public static final String EXCHANGE_NAME="rounting_exchange";
public static final String ROUTINGKEY="error";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUENAME, false, false, false, null);
channel.basicQos(1);//保证一次只发一个
channel.queueBind(QUEUENAME, EXCHANGE_NAME, ROUTINGKEY);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body);
channel.basicAck(envelope.getDeliveryTag(), false); //告诉MQ已成功收到消息,手动确认
System.out.println(msg);
System.out.println("test_queue_direct_1");
}
};
boolean autoAck=false;//取消自动应答,默认为关闭状态
channel.basicConsume(QUEUENAME, autoAck, consumer);
}
消费者2
public static final String QUENENAME="test_queue_direct_2";
public static final String EXCHANGENAME="rounting_exchange";
public static final String ROUTINGKEYNAME1="error";
public static final String ROUTINGKEYNAME2="info";
public static final String ROUTINGKEYNAME3="warn";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUENENAME, false, false, false, null);
channel.queueBind(QUENENAME, EXCHANGENAME, ROUTINGKEYNAME1);
channel.queueBind(QUENENAME, EXCHANGENAME, ROUTINGKEYNAME2);
channel.queueBind(QUENENAME, EXCHANGENAME, ROUTINGKEYNAME3);
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String msg=new String(body);
channel.basicAck(envelope.getDeliveryTag(), false);
System.out.println(msg);
System.out.println(QUENENAME);
}
};
boolean autoAck=false;//关闭自动确认
channel.basicConsume(QUENENAME, autoAck, consumer);
}
以上是关于rabbitMQ 学习的主要内容,如果未能解决你的问题,请参考以下文章