rabbitmq
Posted tjqblog
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq相关的知识,希望对你有一定的参考价值。
AMQP 协议模型
server: 又称broker,作用是接受客户端连接。
channel :网络信道 :几乎所有的操作都在channel上进行。客户端可以建立多个channel,每个channel代表一个会话任务,类似于数据库的session。
message:传递的实体数据,其结构由两部分组成 properties和body,前者是对消息的设置,比如消息的优先级,延迟等等。后者就是消息的内容。
virtual host:虚拟地址,用于逻辑隔离了(类似于redis的数据库的逻辑隔离) ,是最上层的消息路由,一个v host 可以有多个 exchange 和queue ,但是对于同一个 vhost 不能有相 同名称的exchange 和queue。
exchange :交换机 作用是接受消息, 根据路由键转发消息到绑定的queue。
binding : exchange 和queue之间的虚拟连接 也可是exchange 与exchange之间连接,binding可以包含路由键(routing key)
routing key :路由规则
queue:也称message queue,消息队列。
rabbitmq的整体架构及消息的流转
centos下的rabbitmq的安装
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
下载:
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
rabbitmq与erlang版本的对应关系 见官网
http://www.rabbitmq.com/which-erlang.html
rpm-ivh erlang-18.3-1.el7.centos.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
如果上面第二部出现问题,可以尝试一下步骤
rpm-ivh erlang-18.3-1.el7.centos.x86_64.rpm
yum -y install epel-release
yum -y install socat
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
修改配置文件:
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
比如修改密码、配置等等,例如:loopback_users 中的 <<"guest">>,只保留guest
服务启动和停止:
启动 rabbitmq-server start & 或者 service rabbitmq-server start rabbitmqctl start_app
可以看到 日志存放路径 日志名称默认是[email protected]主机名.log ,当然我们也可以修改主机名
停止 rabbitmqctl stop_app 或者 service rabbitmq-server stop 或者 kill -9
lsof -i:5672
管理插件:rabbitmq-plugins enable rabbitmq_management (打开管控台 才能访问管控台页面)
访问地址:http://116.62.220.199:15672/
密码和用户名均为 guest
可以使用rabbitmqctl 命令, 来添加用户 删除用户 列出用户列表 清除用户权限等等 也可以对vhost 队列进行丰富的操作,是一个很丰富的命令。 这些命令在管控台都有对应的操作。
rabbitmqctl reset 移除所有数据(须在rabbitmqctl start_app之后使用)
使用java客户端的hello world
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency>
package com.bujiang.magic.rabbitmq.quickstart; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Procuder { public static void main(String[] args)throws Exception { //1 创建工厂 ConnectionFactory factory =new ConnectionFactory(); factory.setHost("116.62.220.199"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("admin"); factory.setPassword("admin"); //2 从工厂获取连接 Connection connection = factory.newConnection(); //3 通过连接获取channel Channel channel = connection.createChannel(); //4 通过channel发布数据 for (int i = 0; i < 5; i++) { // channel.basicPublish(exchange, routingKey, props, body); //这里不指定exchange 只指定routingKey 这时mq走的是AMQP default模式 这种模式是routingKey 直接去queue里面找 找到就路由过去 channel.basicPublish("", "test001", null, ("Hello MQ"+i).getBytes()); } //关闭资源 channel.close(); connection.close(); } }
public class Consumer { public static void main(String[] args)throws Exception { //1 创建工厂 ConnectionFactory factory =new ConnectionFactory(); factory.setHost("116.62.220.199"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("admin"); factory.setPassword("admin"); //2 从工厂获取连接 Connection connection = factory.newConnection(); //3 通过连接获取channel Channel channel = connection.createChannel(); //4 创建一个队列 // channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments) String queueName = "test002"; channel.queueDeclare(queueName, true, false, false, null); //5 创建消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); // channel.basicConsume(queue, autoAck, callback) channel.basicConsume(queueName, true, queueingConsumer); //获取消息 for(;;) { Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("消费端收到消息---"+msg); BasicProperties pros = delivery.getProperties(); System.out.println(pros.toString()); Envelope envelope = delivery.getEnvelope(); System.out.println(envelope); } //关闭资源 // channel.close(); // connection.close(); } }
先启动 消费端 创建队列,再启动生产端 生产消息。
交换机
type 交换机的类型
direct :直连模式要求生产者的routekey完全和消费者的routekey匹配才会被接收,否则直接抛弃。
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer4DirectExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_direct_exchange"; String exchangeType = "direct"; String queueName = "test_direct_queue"; String routingKey = "test.direct"; //表示声明了一个交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); //表示声明了一个队列 channel.queueDeclare(queueName, false, false, false, null); //建立一个绑定关系: channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer4DirectExchange { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_direct_exchange"; String routingKey = "test.direct";//这里的key 如果与消费者的key不一致,那么消费者将收不到消息 //5 发送 String msg = "Hello World RabbitMQ 4 Direct Exchange Message 111 ... "; channel.basicPublish(exchangeName, routingKey , null , msg.getBytes()); } }
topic 模式:所有发送到Topic Exchange 的消息被转发到所有关心的routeKey中指定的Topic的queue上,#匹配一个或者多个词,*匹配一个词,比如 log.#, 可以匹配log.aa log.aa.bbb等等,而log.* 只能匹配到log.后面一个词 如log.txt
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer4TopicExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_topic_exchange"; String exchangeType = "topic"; String queueName = "test_topic_queue"; String routingKey = "user.*"; // String routingKey = "user.#";//这里如果上一个user.* 使用过,然后现在使用user.#,那么这两个key将会同时存在 //这显然会混淆,因此我们需要在管控台手动在queues或者exchange上解绑 // 1 声明交换机 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); // 2 声明队列 channel.queueDeclare(queueName, false, false, false, null); // 3 建立交换机和队列的绑定关系: channel.queueBind(queueName, exchangeName, routingKey); QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer4TopicExchange { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_topic_exchange"; String routingKey1 = "user.save"; String routingKey2 = "user.update"; String routingKey3 = "user.delete.abc"; //5 发送 String msg = " Hello World RabbitMQ 4 Topic Exchange Message ..."; channel.basicPublish(exchangeName, routingKey1 , null , (routingKey1+msg).getBytes()); channel.basicPublish(exchangeName, routingKey2 , null , (routingKey2+msg).getBytes()); channel.basicPublish(exchangeName, routingKey3 , null , (routingKey3+msg).getBytes()); channel.close(); connection.close(); } }
fanout :不处理路由键,只需要简单的将队列绑定到交换机上,发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。这种模式的性能上最好的
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer4FanoutExchange { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory() ; connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(3000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_fanout_exchange"; String exchangeType = "fanout"; String queueName = "test_fanout_queue"; String routingKey = "jjkjkasasa"; //不设置路由键 设不设置都无所谓 channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //durable 是否持久化消息 QueueingConsumer consumer = new QueueingConsumer(channel); //参数:队列名称、是否自动ACK、Consumer channel.basicConsume(queueName, true, consumer); //循环获取消息 while(true){ //获取消息,如果没有消息,这一步将会一直阻塞 Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("收到消息:" + msg); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer4FanoutExchange { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 创建Connection Connection connection = connectionFactory.newConnection(); //3 创建Channel Channel channel = connection.createChannel(); //4 声明 String exchangeName = "test_fanout_exchange"; //5 发送 for(int i = 0; i < 10; i ++) { String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ..."; channel.basicPublish(exchangeName, "", null , msg.getBytes());//这里的routekey 设置不设置都无所谓 } channel.close(); connection.close(); } }
headers 这种方式用的比较少
durability 持久化,为true 时,rabbitmq 重启后 数据依然存在
auto delete:当最后绑定到exchange 上的队列删除后 ,自动删除该exchange。这个与上面消费者代码的autodelete属性是相反的,当队列无任何exchange关联时,该队列自动删除
internal :这个属性基本是为false就可以了。
Message的其他属性
import java.util.Map; import com.rabbitmq.client.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer { public static void main(String[] args) throws Exception { //1 创建一个ConnectionFactory, 并进行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 通过连接工厂创建连接 Connection connection = connectionFactory.newConnection(); //3 通过connection创建一个Channel Channel channel = connection.createChannel(); //4 声明(创建)一个队列 String queueName = "test001"; channel.queueDeclare(queueName, true, false, false, null); //5 创建消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6 设置Channel channel.basicConsume(queueName, true, queueingConsumer); while(true){ //7 获取消息 Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费端: " + msg); BasicProperties properties = delivery.getProperties(); Map<String, Object> headers = properties.getHeaders(); System.err.println("headers get my1 value: " + headers.get("my1")); System.err.println(properties); //Envelope envelope = delivery.getEnvelope(); } } }
import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Procuder { public static void main(String[] args) throws Exception { //1 创建一个ConnectionFactory, 并进行配置 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 通过连接工厂创建连接 Connection connection = connectionFactory.newConnection(); //3 通过connection创建一个Channel Channel channel = connection.createChannel(); Map<String, Object> headers = new HashMap<>(); headers.put("my1", "111"); headers.put("my2", "222"); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2)//2 表示持久化消息,消息发到服务器上 假设未被消费 如果服务器重启,那么消息依然会存在,如果是1则表示服务器重启 消息将会被清除 .contentEncoding("UTF-8")//设置字符集编码 .expiration("10000")//设置过期时间 10000毫秒 过期,如果在这时间内 没有被消费 那么消息将会被 自动的移除 .headers(headers)//自定义属性 .build(); //4 通过Channel发送数据 for(int i=0; i < 5; i++){ String msg = "Hello RabbitMQ!"; //1 exchange 2 routingKey channel.basicPublish("", "test001", properties, msg.getBytes()); } //5 记得要关闭相关的连接 channel.close(); connection.close(); } }
如何保证消息的100%的投递成功。
幂等性 :对一件事情执行X次,可能100次 1000次,不管多少次,结果都是一致的 叫做幂等性。
消费端 如何保证幂等性 避免重复消费。(在海量订单生产的时候 难免出现消息的重复投递等问题,或者网络原因导致闪断 导致重发消息)。
1 唯一id+指纹码机智,利用数据库主键去重(好处:实现简单,坏处:有高并发的数据库写入瓶颈)
2 利用redis的原子性去实现 (redis缓存与数据库如何做到原子性?假设不入mysql库 全存到缓存中,那么如何设置定时同步策略?)
confirm确认消息
生产者投递消息后,rabbitmq broker 接收到消息 则会给生产者一个应答,生产者接收应答 用来确定这条消息是否正常发送到broker上,这种方式是消息可靠性投递的核心保障。
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 获取C onnection Connection connection = connectionFactory.newConnection(); //3 通过Connection创建一个新的Channel Channel channel = connection.createChannel(); String exchangeName = "test_confirm_exchange"; String routingKey = "confirm.#"; String queueName = "test_confirm_queue"; //4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key channel.exchangeDeclare(exchangeName, "topic", true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //5 创建消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, queueingConsumer); while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费端: " + msg); } } }
import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { //1 创建ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 获取C onnection Connection connection = connectionFactory.newConnection(); //3 通过Connection创建一个新的Channel Channel channel = connection.createChannel(); //4 指定我们的消息投递模式: 消息的确认模式 channel.confirmSelect(); String exchangeName = "test_confirm_exchange"; String routingKey = "confirm.save"; //5 发送一条消息 String msg = "Hello RabbitMQ Send confirm message!"; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); //6 添加一个确认监听 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------no ack!-----------"); //在一些情况下 比如磁盘写满 mq异常 或者queue容量到了上限 } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------ack!-----------"); } }); } }
return消息机制
用于处理一些不可路由的消息,有些时候 当我们在发送消息的时候 exchange不存在或者routekey路由不到指定队列 我们需要去监听这种不可达的消息 需要使用return listen 来统一跟踪。
Mandatory:如果为true,则监听器会接收到不可达消息 进行后续处理,如果为false 那么对于不可达消息 broker端会自动删除。
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_return_exchange"; String routingKey = "return.#"; String queueName = "test_return_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, queueingConsumer); while(true){ Delivery delivery = queueingConsumer.nextDelivery(); String msg = new String(delivery.getBody()); System.err.println("消费者: " + msg); } } }
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ReturnListener; import com.rabbitmq.client.AMQP.BasicProperties; public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_return_exchange"; String routingKey = "return.save"; String routingKeyError = "abc.save"; String msg = "Hello RabbitMQ Return Message"; channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("---------handle return----------"); System.err.println("replyCode: " + replyCode); System.err.println("replyText: " + replyText); System.err.println("exchange: " + exchange); System.err.println("routingKey: " + routingKey); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }); channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes()); //如果第三个参数 设置为false 那么将会被broker直接删除 不会被handleReturn监听 // channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); } }
消费端的自定义监听
上面的所有代码中都是通过死循环来获取消息 ,下面我们通过自定义监听来做
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_consumer_exchange"; String routingKey = "consumer.#"; String queueName = "test_consumer_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //注意这里 自定义了一个监听 channel.basicConsume(queueName, true, new MyConsumer(channel)); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_consumer_exchange"; String routingKey = "consumer.save"; String msg = "Hello RabbitMQ Consumer Message"; for(int i =0; i<5; i ++){ channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); } } }
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }
消费端的限流
假设mq服务器囤积了上万条未处理的数据,这时我们随机打开一个客户端 巨量的消息会瞬间推送过来 但是我们单个客户端无法同时处理这么多的数据。rabbitmq提供了一种qos(服务质量保证)的功能,即在非自动签收(ack = false)的前提下,如果一定数量的消息未被确认签收前,则 不进行新的消息消费。因此在真正的工作中 基本不会设置自动签收=true;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_qos_exchange"; String routingKey = "qos.save"; String msg = "Hello RabbitMQ QOS Message"; for(int i =0; i<5; i ++){ channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String routingKey = "qos.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //1 限流方式 第一件事就是 autoAck设置为 false // channel.basicQos(prefetchSize, prefetchCount, global); // prefetchSize:消息的大小限制,这里0表示不限制 // prefetchCount:最多同时消费数量 // global: 为true表示 channel级别,false表示consumer级别 channel.basicQos(0, 1, false); //这里的auto ack 必须设置为false channel.basicConsume(queueName, false, new MyConsumer(channel)); } }
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class MyConsumer extends DefaultConsumer { //重写channel属性 ,这里必须覆盖这个属性 因为下面要用到 private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); //执行手动签收 // channel.basicAck(deliveryTag, multiple); // deliveryTag:消息标签 // multiple:是否批量签收 channel.basicAck(envelope.getDeliveryTag(), false);//如果这一段代码被注释,那么消费者消费一条消息后进入阻塞状态 等待ack确认,打开则不进入阻塞状态 } }
消费端ack 与重回队列
nack ,对于有些未处理成功的消息,我们返回broker nack信号 让其重新发消息给我们,不行则再进行几次这样的操作,直到达到我们设定的临界值N,这时我们发送ack 信号给broker,然后日志记录异常,最后人工进行补偿或者其他补偿措施。
重回队列,对于没有处理成功的消息,让消息重新扔给broker 让它重新给客户端发送一条消息,实际工作中 一般不设置重回队列。
import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_ack_exchange"; String routingKey = "ack.save"; for(int i =0; i<5; i ++){ Map<String, Object> headers = new HashMap<String, Object>(); headers.put("num", i); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .headers(headers) .build(); String msg = "Hello RabbitMQ ACK Message " + i; channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_ack_exchange"; String queueName = "test_ack_queue"; String routingKey = "ack.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); // 手工签收 必须要关闭 autoAck channel.basicConsume(queueName, false, new MyConsumer(channel)); } }
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class MyConsumer extends DefaultConsumer { private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("body: " + new String(body)); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } if((Integer)properties.getHeaders().get("num") == 2) { //返回nack // channel.basicNack(deliveryTag, multiple, requeue); // requeue:消费失败时是否重新回队列 将消费失败的消息加入到队列的尾端, channel.basicNack(envelope.getDeliveryTag(), false, true); } else { //返回ack channel.basicAck(envelope.getDeliveryTag(), false); } } }
TTL
ttl:与redis的ttl 类似,表示剩余过期时间 表示消息在队列里 xx秒内未被消费,那么消息回被自动清空。
死信队列 DLX ,非常重要
当一个消息变成里死信之后,它能被重新发布到另一个exchange上,这个exchange 对应的队列就是死信队列,这个队列在实际生产中 是保存未被认领的消息的队列,以下情况有可能死DLX
消息被拒绝(basic.reject/basic.nack),并且requeue=false
ttl过期
队列达到最大长度
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_dlx_exchange";//正常交换机 String routingKey = "dlx.save";//正常路由key String msg = "Hello RabbitMQ DLX Message"; for(int i =0; i<1; i ++){ AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") .build(); channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); } } }
import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("116.62.220.199"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 这就是一个普通的交换机 和 队列 以及路由 String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.#"; String queueName = "test_dlx_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); Map<String, Object> agruments = new HashMap<String, Object>(); String deadExchangeName = "aaadlx.exchange";//死信队列的交换机 agruments.put("x-dead-letter-exchange", deadExchangeName); //这个agruments属性,要设置到声明队列上 channel.queueDeclare(queueName, true, false, false, agruments); channel.queueBind(queueName, exchangeName, routingKey); //要进行死信队列的声明: String deadQueueName = "dlx.queue"; String deadRouteKey = "#";//#表示所有的键都会被路由到这个队列 channel.exchangeDeclare(deadExchangeName, "topic", true, false, null); channel.queueDeclare(deadExchangeName, true, false, false, null); channel.queueBind(deadQueueName, deadExchangeName, deadRouteKey); channel.basicConsume(queueName, true, new MyConsumer(channel)); } }
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }
先启动消费端建立好队列,再启动生产端,此时消息正常消费
将消费端关闭,再启动生产端,这时可以发现消息正常落在正常的队列test_dlx_queue上,10秒后 该消息将会从正常的队列中消失,进入死信队列dlx.queue上 被其收集。
spring amqp 的api熟悉
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.5</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
import java.util.UUID; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.support.ConsumerTagStrategy; import org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import com.bfxy.spring.adapter.MessageDelegate; import com.bfxy.spring.convert.ImageMessageConverter; import com.bfxy.spring.convert.PDFMessageConverter; import com.bfxy.spring.convert.TextMessageConverter; @Configuration @ComponentScan({"com.bfxy.spring.*"}) public class RabbitMQConfig { @Bean public ConnectionFactory connectionFactory(){ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("116.62.220.199:5672"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); return connectionFactory; } //这个组件由于管理我们的控制台的 比如交换机的创建 队列的创建 及绑定信息等等 @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.setAutoStartup(true);//这里一定要设置为true,否则spring不会加载RabbitAdmin 这个类 return rabbitAdmin; } /** * 针对消费者配置 * 1. 设置交换机类型 * 2. 将队列绑定到交换机 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 HeadersExchange :通过添加属性key-value匹配 DirectExchange:按照routingkey分发到指定队列 TopicExchange:多关键字匹配 */ @Bean public TopicExchange exchange001() { return new TopicExchange("topic001", true, false); } @Bean public Queue queue001() { return new Queue("queue001", true); //队列持久 } @Bean public Binding binding001() { return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*"); } @Bean public TopicExchange exchange002() { return new TopicExchange("topic002", true, false); } @Bean public Queue queue002() { return new Queue("queue002", true); //队列持久 } @Bean public Binding binding002() { return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*"); } @Bean public Queue queue003() { return new Queue("queue003", true); //队列持久 } @Bean public Binding binding003() { return BindingBuilder.bind(queue003()).to(exchange001()).with("mq.*"); } @Bean public Queue queue_image() { return new Queue("image_queue", true); //队列持久 } @Bean public Queue queue_pdf() { return new Queue("pdf_queue", true); //队列持久 } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); return rabbitTemplate; } @Bean public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); //添加监控队列 container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf()); container.setConcurrentConsumers(1); container.setMaxConcurrentConsumers(5); container.setDefaultRequeueRejected(false);//requeue container.setAcknowledgeMode(AcknowledgeMode.AUTO);// auto ack container.setExposeListenerChannel(true);//设置是否外漏 container.setConsumerTagStrategy(new ConsumerTagStrategy() {//消费端标签策略 @Override public String createConsumerTag(String queue) { return queue + "_" + UUID.randomUUID().toString(); } }); /**消息监听 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { String msg = new String(message.getBody()); System.err.println("----------消费者: " + msg); } }); */ /** * 1 适配器方式. 默认是有自己的方法名字的:handleMessage // 可以自己指定一个方法的名字: consumeMessage // 也可以添加一个转换器: 从字节数组转换为String MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage");//也可以自己直到默认监听方法,MessageListenerAdapter源码中 默认方法是handleMessage adapter.setMessageConverter(new TextMessageConverter());//自定义消息转换器 这样我们可以不必使用byte[],TextMessageConverter 会为我们自动转换 container.setMessageListener(adapter); */ /** * 2 适配器方式: 我们的队列名称 和 方法名称,指定队列的消息与被所绑定的方法处理 * MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setMessageConverter(new TextMessageConverter()); Map<String, String> queueOrTagToMethodName = new HashMap<>(); queueOrTagToMethodName.put("queue001", "method1"); queueOrTagToMethodName.put("queue002", "method2"); adapter.setQueueOrTagToMethodName(queueOrTagToMethodName); container.setMessageListener(adapter); */ // 1.1 支持json格式的转换器 对应testSendJsonMessage方法 /** MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter); */ // 1.2 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象转换 /**对应 testSendJavaMessage方法 MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter); */ //1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换 //根据自定义标签 来转换java对象 对应testSendMappingMessage测试方法 /** MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper(); Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>(); idClassMapping.put("order", com.bfxy.spring.entity.Order.class); idClassMapping.put("packaged", com.bfxy.spring.entity.Packaged.class); javaTypeMapper.setIdClassMapping(idClassMapping); jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper); adapter.setMessageConverter(jackson2JsonMessageConverter); container.setMessageListener(adapter); */ //1.4 ext convert MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate()); adapter.setDefaultListenerMethod("consumeMessage"); //全局的转换器: 一个大的转换器 对应 testSendExtConverterMessage方法 ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter(); TextMessageConverter textConvert = new TextMessageConverter(); convert.addDelegate("text", textConvert); convert.addDelegate("html/text", textConvert); convert.addDelegate("xml/text", textConvert); convert.addDelegate("text/plain", textConvert); Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter(); convert.addDelegate("json", jsonConvert); convert.addDelegate("application/json", jsonConvert); ImageMessageConverter imageConverter = new ImageMessageConverter(); convert.addDelegate("image/png", imageConverter); convert.addDelegate("image", imageConverter); PDFMessageConverter pdfConverter = new PDFMessageConverter(); convert.addDelegate("application/pdf", pdfConverter); adapter.setMessageConverter(convert); container.setMessageListener(adapter); return container; } }
import java.nio.file.Files; import java.nio.file.Paths; import java.util.HashMap; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.bfxy.spring.entity.Order; import com.bfxy.spring.entity.Packaged; import com.fasterxml.jackson.databind.ObjectMapper; @RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests { @Test public void contextLoads() { } @Autowired private RabbitAdmin rabbitAdmin; @Test public void testAdmin() throws Exception { rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false)); rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false)); rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false)); rabbitAdmin.declareQueue(new Queue("test.direct.queue", false)); rabbitAdmin.declareQueue(new Queue("test.topic.queue", false)); rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false)); //直连绑定 rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test.direct", "direct", new HashMap<>())); //topic 绑定 rabbitAdmin.declareBinding( BindingBuilder .bind(new Queue("test.topic.queue", false)) //直接创建队列 ,前面已经创建队列 这里再创建不会影响 .to(new TopicExchange("test.topic", false, false)) //直接创建交换机 建立关联关系 .with("user.#")); //指定路由Key //fanout绑定 rabbitAdmin.declareBinding( BindingBuilder .bind(new Queue("test.fanout.queue", false)) .to(new FanoutExchange("test.fanout", false, false))); //清空队列数据 rabbitAdmin.purgeQueue("test.topic.queue", false); } @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("desc", "信息描述.."); messageProperties.getHeaders().put("type", "自定义消息类型.."); Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties); rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() { //发送前 可以修改消息 @Override public Message postProcessMessage(Message message) throws AmqpException { System.err.println("------添加额外的设置---------"); message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述"); message.getMessageProperties().getHeaders().put("attr", "额外新加的属性"); return message; } }); } @Test public void testSendMessage2() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!"); rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!"); } @Test public void testSendMessage4Text() throws Exception { //1 创建消息 MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("text/plain"); Message message = new Message("mq 消息1234".getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.abc", message); rabbitTemplate.send("topic002", "rabbit.abc", message); } @Test public void testSendJsonMessage() throws Exception { Order order = new Order(); order.setId("001"); order.setName("消息订单"); order.setContent("描述信息"); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(order); System.err.println("order 4 json: " + json); MessageProperties messageProperties = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties.setContentType("application/json"); Message message = new Message(json.getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.order", message); } @Test public void testSendJavaMessage() throws Exception { Order order = new Order(); order.setId("001"); order.setName("订单消息"); order.setContent("订单描述信息"); ObjectMapper mapper = new ObjectMapper(); String json = mapper.writeValueAsString(order); System.err.println("order 4 json: " + json); MessageProperties messageProperties = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties.setContentType("application/json"); //__TypeId__ 这个值是固定的 messageProperties.getHeaders().put("__TypeId__", "com.bfxy.spring.entity.Order"); Message message = new Message(json.getBytes(), messageProperties); rabbitTemplate.send("topic001", "spring.order", message); } @Test public void testSendMappingMessage() throws Exception { ObjectMapper mapper = new ObjectMapper(); Order order = new Order(); order.setId("001"); order.setName("订单消息"); order.setContent("订单描述信息"); String json1 = mapper.writeValueAsString(order); System.err.println("order 4 json: " + json1); MessageProperties messageProperties1 = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties1.setContentType("application/json"); //这里无需写类的全路径了 写标签即可 messageProperties1.getHeaders().put("__TypeId__", "order"); Message message1 = new Message(json1.getBytes(), messageProperties1); rabbitTemplate.send("topic001", "spring.order", message1); Packaged pack = new Packaged(); pack.setId("002"); pack.setName("包裹消息"); pack.setDescription("包裹描述信息"); String json2 = mapper.writeValueAsString(pack); System.err.println("pack 4 json: " + json2); MessageProperties messageProperties2 = new MessageProperties(); //这里注意一定要修改contentType为 application/json messageProperties2.setContentType("application/json"); messageProperties2.getHeaders().put("__TypeId__", "packaged"); Message message2 = new Message(json2.getBytes(), messageProperties2); rabbitTemplate.send("topic001", "spring.pack", message2); } @Test public void testSendExtConverterMessage() throws Exception { // byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "picture.png")); // MessageProperties messageProperties = new MessageProperties(); // messageProperties.setContentType("image/png"); // messageProperties.getHeaders().put("extName", "png"); // Message message = new Message(body, messageProperties); // rabbitTemplate.send("", "image_queue", message); byte[] body = Files.readAllBytes(Paths.get("d:/002_books", "mysql.pdf")); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/pdf"); Message message = new Message(body, messageProperties); rabbitTemplate.send("", "pdf_queue", message); } }
import java.io.File; import java.util.Map; import com.bfxy.spring.entity.Order; import com.bfxy.spring.entity.Packaged; public class MessageDelegate { public void handleMessage(byte[] messageBody) { System.err.println("默认方法, 消息内容:" + new String(messageBody)); } public void consumeMessage(byte[] messageBody) { System.err.println("字节数组方法, 消息内容:" + new String(messageBody)); } public void consumeMessage(String messageBody) { System.err.println("字符串方法, 消息内容:" + messageBody); } public void method1(String messageBody) { System.err.println("method1 收到消息内容:" + new String(messageBody)); } public void method2(String messageBody) { System.err.println("method2 收到消息内容:" + new String(messageBody)); } public void consumeMessage(Map messageBody) { System.err.println("map方法, 消息内容:" + messageBody); } public void consumeMessage(Order order) { System.err.println("order对象, 消息内容, id: " + order.getId() + ", name: " + order.getName() + ", content: "+ order.getContent()); } public void consumeMessage(Packaged pack) { System.err.println("package对象, 消息内容, id: " + pack.getId() + ", name: " + pack.getName() + ", content: "+ pack.getDescription()); } public void consumeMessage(File file) { System.err.println("文件对象 方法, 消息内容:" + file.getName()); } }
public class ConverterBody { private byte[] body; public ConverterBody() { } public ConverterBody(byte[] body) { this.body = body; } public byte[] getBody() { return body; } public void setBody(byte[] body) { this.body = body; } }
import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.UUID; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; public class ImageMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { System.err.println("-----------Image MessageConverter----------"); Object _extName = message.getMessageProperties().getHeaders().get("extName"); String extName = _extName == null ? "png" : _extName.toString(); byte[] body = message.getBody(); String fileName = UUID.randomUUID().toString(); String path = "d:/010_test/" + fileName + "." + extName; File f = new File(path); try { Files.copy(new ByteArrayInputStream(body), f.toPath()); } catch (IOException e) { e.printStackTrace(); } return f; } }
import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.UUID; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; public class PDFMessageConverter implements MessageConverter { @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { throw new MessageConversionException(" convert error ! "); } @Override public Object fromMessage(Message message) throws MessageConversionException { System.err.println("-----------PDF MessageConverter----------"); byte[] body = message.getBody(); String fileName = UUID.randomUUID().toString(); String path = "d:/010_test/" + fileName + ".pdf"; File f = new File(path); try { Files.copy(new ByteArrayInputStream(body), f.toPath()); } catch (IOException e) { e.printStackTrace(); } return f; } }
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; public class TextMessageConverter implements MessageConverter { //java 对象转message对象 @Override public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { return new Message(object.toString().getBytes(), messageProperties); } //message对象转java对象 @Override public Object fromMessage(Message message) throws MessageConversionException { String contentType = message.getMessageProperties().getContentType(); if(null != contentType && contentType.contains("text")) { return new String(message.getBody()); } return message.getBody(); } }
public class Order { private String id; private String name; private String content; public Order() { } public Order(String id, String name, String content) { this.id = id; this.name = name; this.content = content; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
public class Packaged { private String id; private String name; private String description; public Packaged() { } public Packaged(String id, String name, String description) { this.id = id; this.name = name; this.description = description; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } }
spring boot中使用 rabbitmq
生产端
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring.rabbitmq.addresses=116.62.220.199:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
import java.util.Map; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import com.bfxy.springboot.entity.Order; @Component public class RabbitSender { //自动注入RabbitTemplate模板类 @Autowired private RabbitTemplate rabbitTemplate; //回调函数: confirm确认 final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.err.println("correlationData: " + correlationData); System.err.println("ack: " + ack); if(!ack){ System.err.println("异常处理...."); } } }; //回调函数: return返回 final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) { System.err.println("return exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText); } }; //发送消息方法调用: 构建Message消息 public void send(Object message, Map<String, Object> properties) throws Exception { MessageHeaders mhs = new MessageHeaders(properties); Message msg = MessageBuilder.createMessage(message, mhs); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一 CorrelationData correlationData = new CorrelationData("1234567890"); rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData); } //发送消息方法调用: 构建自定义对象消息 public void sendOrder(Order order) throws Exception { rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一 CorrelationData correlationData = new CorrelationData("0987654321");//这个就是消息的id 须做到全局唯一 做补偿策略的时候 需要通过这个id来找到这条消息 rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData); package com.bfxy.springboot.entity;
import java.io.Serializable; public class Order implements Serializable { private String id; private String name; public Order() { } public Order(String id, String name) { super(); this.id = id; this.name = name; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.bfxy.springboot.entity.Order; import com.bfxy.springboot.producer.RabbitSender; @RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests { @Test public void contextLoads() { } @Autowired private RabbitSender rabbitSender; private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); @Test public void testSender1() throws Exception { Map<String, Object> properties = new HashMap<>(); properties.put("number", "12345"); properties.put("send_time", simpleDateFormat.format(new Date())); rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties); } @Test public void testSender2() throws Exception { Order order = new Order("001", "第一个订单"); rabbitSender.sendOrder(order); } }
消费端
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
spring.rabbitmq.addresses=116.62.220.199:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
import java.util.Map; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; @Component public class RabbitReceiver { @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queue-1", durable="true"), exchange = @Exchange(value = "exchange-1", durable="true", type= "topic", ignoreDeclarationExceptions = "true"), key = "springboot.*" ) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { System.err.println("--------------------------------------"); System.err.println("消费端Payload: " + message.getPayload()); Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); //手工ACK channel.basicAck(deliveryTag, false); } /** * * spring.rabbitmq.listener.order.queue.name=queue-2 spring.rabbitmq.listener.order.queue.durable=true spring.rabbitmq.listener.order.exchange.name=exchange-1 spring.rabbitmq.listener.order.exchange.durable=true spring.rabbitmq.listener.order.exchange.type=topic spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true spring.rabbitmq.listener.order.key=springboot.* * @param order * @param channel * @param headers * @throws Exception */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable="${spring.rabbitmq.listener.order.queue.durable}"), exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", durable="${spring.rabbitmq.listener.order.exchange.durable}", type= "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"), key = "${spring.rabbitmq.listener.order.key}" ) ) @RabbitHandler public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, Channel channel, @Headers Map<String, Object> headers) throws Exception { System.err.println("--------------------------------------"); System.err.println("消费端order: " + order.getId()); Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); //手工ACK channel.basicAck(deliveryTag, false); } }
import java.io.Serializable; public class Order implements Serializable { private String id; private String name; public Order() { } public Order(String id, String name) { super(); this.id = id; this.name = name; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
spring cloud stream ,非常好的组件 可以实现 生产端与消费端为不同类型的mq ,如生产端用rabbitmq 消费端使用kafka。其劣势是没法保证消息的100%投递,可能存在消息丢失的问题。其核心概念如下图
rabbitmq 主备模式
以上是关于rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章
带着新人学springboot的应用07(springboot+RabbitMQ 下)