喵星之旅-狂奔的兔子-rabbitmq的java客户端使用入门
Posted 喵星兔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了喵星之旅-狂奔的兔子-rabbitmq的java客户端使用入门相关的知识,希望对你有一定的参考价值。
一、简介
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
消息队列都涉及的生产者消费者模型,不做详解,本文只作为快速使用的参考文档。
消息队列主要有点对点和发布订阅模式。
其主要用途是异步、削峰,充当一个缓存的作用。只有可以异步处理时才可以使用消息队列。
官方参考文档地址:
https://www.rabbitmq.com/getstarted.html
需要添加jar包:
amqp-client-3.5.6.jar hamcrest-core-1.3.jar
本文jdk版本1.8。
二、直接使用队列
1、简单队列
特点是生产者消费者一一对应。
生产者一般效率很高,生产者很低,所以一一对应是不对的。所以这个一般不使用。
其结构如图所示:
先创建一个工具类:
package com.bunny.rabbit; /** * * @author bunny~~我是兔子我会喵,我叫喵星兔。 * 连接工具类 */ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class LianjieGongjulei { public static final String SIMPLE_QUEUE_NAME = "simplequeue"; public static final String WORK_QUEUE_NAME = "workqueue"; public static final String PUBSUB_EXCHANGE_NAME = "pubsubexchange"; public static final String ROUTING_EXCHANGE_NAME = "routingexchange"; public static final String TOPIC_EXCHANGE_NAME = "topicexchange"; public static final String PUBSUB_QUEUE_NAME1 = "pubsubqueue1"; public static final String PUBSUB_QUEUE_NAME2 = "pubsubqueue2"; public static final String ROUTING_QUEUE_NAME1 = "routingqueue1"; public static final String ROUTING_QUEUE_NAME2 = "routingqueue2"; public static final String TOPIC_QUEUE_NAME1 = "topicqueue1"; public static final String TOPIC_QUEUE_NAME2 = "topicqueue2"; public static Connection getConnection() { try { ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672); cf.setVirtualHost("/mybunny"); cf.setUsername("bunny"); cf.setPassword("bunny"); return cf.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null; } }
生产者:
package com.bunny.rabbit.simple; import java.io.IOException; import com.bunny.rabbit.LianjieGongjulei; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * * @author bunny~~我是兔子我会喵,我叫喵星兔。 * 简单队列生产者 */ public class Shengchanzhe { public static void main(String[] args) throws Exception { Connection c = LianjieGongjulei.getConnection(); Channel ch = c.createChannel(); //如果已经存在,可以不声明,声明的话也没错。所以不管是否存在都可以声明。 ch.queueDeclare(LianjieGongjulei.SIMPLE_QUEUE_NAME, false, false, false, null); String mes = "Hello Kitty!"; ch.basicPublish("", LianjieGongjulei.SIMPLE_QUEUE_NAME, null, mes.getBytes()); System.out.println(" bunny Sent \'" + mes + "\'"); ch.close(); c.close(); } }
消费者:
package com.bunny.rabbit.simple; import java.io.IOException; import com.bunny.rabbit.LianjieGongjulei; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; /** * * @author bunny~~我是兔子我会喵,我叫喵星兔。 * 简单队列消费者 */ public class Xiaofeizhe { public static void main(String[] args) throws Exception { Connection c = LianjieGongjulei.getConnection(); Channel ch = c.createChannel(); //如果已经存在,可以不声明,声明的话也没错。所以不管是否存在都可以声明。 ch.queueDeclare(LianjieGongjulei.SIMPLE_QUEUE_NAME, false, false, false, null); // 定义消费者 QueueingConsumer consumer = new QueueingConsumer(ch); ch.basicConsume(LianjieGongjulei.SIMPLE_QUEUE_NAME, true, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" bunny Received \'" + message + "\'"); } } }
2、work队列
手工确认则会按照消费端的能力进行分发(公平分发),否则是按照生产者的规则分发(轮询)。
一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费。
如图所示:
生产者:
package com.bunny.rabbit.work; import com.bunny.rabbit.LianjieGongjulei; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * * @author bunny~~我是兔子我会喵,我叫喵星兔。 * */ public class Shengchanzhe { public static void main(String[] args) throws Exception { Connection c = LianjieGongjulei.getConnection(); Channel ch = c.createChannel(); ch.queueDeclare(LianjieGongjulei.WORK_QUEUE_NAME, false, false, false, null); for (int i = 0; i < 100; i++, Thread.sleep(100)) { String mes = "kitty" + i; ch.basicPublish("", LianjieGongjulei.WORK_QUEUE_NAME, null, mes.getBytes()); System.out.println(" bunny Sent \'" + mes + "\'"); } ch.close(); c.close(); } }
消费者1:
package com.bunny.rabbit.work; import com.bunny.rabbit.LianjieGongjulei; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.QueueingConsumer; /** * * @author bunny~~我是兔子我会喵,我叫喵星兔。 * */ public class Xiaofeizhe1 { public static void main(String[] args) throws Exception { Connection c = LianjieGongjulei.getConnection(); Channel ch = c.createChannel(); ch.queueDeclare(LianjieGongjulei.WORK_QUEUE_NAME, false, false, false, null); QueueingConsumer consumer = new QueueingConsumer(ch); // 监听队列,false表示手动返回完成状态,true表示自动 ch.basicConsume(LianjieGongjulei.WORK_QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" kitty Received \'" + message + "\'"); Thread.sleep(100); } } }
消费者2:
package com.bunny.rabbit.work; import java.io.IOException; import com.bunny.rabbit.LianjieGongjulei; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; /** * * @author bunny~~我是兔子我会喵,我叫喵星兔。 * */ public class Xiaofeizhe2 { public static void main(String[] args) throws Exception { Connection c = LianjieGongjulei.getConnection(); Channel ch = c.createChannel(); ch.queueDeclare(LianjieGongjulei.WORK_QUEUE_NAME, false, false, false, null); //手工确认用 ch.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(ch); // 监听队列,false表示手动返回完成状态,true表示自动 ch.basicConsume(LianjieGongjulei.WORK_QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" bunny Received \'" + message + "\'"); Thread.sleep(100); // 表示手动返回完成状态,如果没有下面的一行,将只能获取到上面ch.basicQos(1)里面写的数量 ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
三、通过交换机
1、发布订阅
1个生产者,多个消费者。
每一个消费者都有自己的一个队列。
生产者没有将消息直接发送到队列,而是发送到了交换机。
每个队列都要绑定到交换机。
生产者发送的消息,经过交换机,到达队列,实现一个消息被多个消费者获取的目的。
消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
如图:
生产者:
package com.bunny.rabbit.pubsub; import com.bunny.rabbit.LianjieGongjulei; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * * @author bunny~~我是兔子我会喵,我叫喵星兔。 * */ public class Fabuzhe { public static void main(String[] args) throws Exception { Connection c = LianjieGongjulei.getConnection(); Channel ch = c.createChannel(); //交换器的类型,常见的有direct,fanout,topic等,其中“fanout”是发布订阅 ch.exchangeDeclare(LianjieGongjulei.PUBSUB_EXCHANGE_NAME, "fanout"); String mes = "发布订阅信息"; ch.basicPublish(LianjieGongjulei.PUBSUB_EXCHANGE_NAME, "", null, mes.getBytes()); System.out.println(" pubsub Sent \'" + mes + "\'"); ch.close(); c.close(); } }
消费者1:
package com.bunny.rabbit.pubsub; import com.bunny.rabbit.LianjieGongjulei; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; /** * * @author bunny~~我是兔子我会喵,我叫喵星兔。 * */ public class Dingyuezhe1 { public static void main(String[] args) throws Exception { Connection c = LianjieGongjulei.getConnection(); Channel ch = c.createChannel(); ch.queueDeclare(LianjieGongjulei.PUBSUB_QUEUE_NAME1, false, false, false, null); // 绑定队列到交换机 ch.queueBind(LianjieGongjulei.PUBSUB_QUEUE_NAME1, LianjieGongjulei.PUBSUB_EXCHANGE_NAME, ""); // 监听队列,手动返回完成 ch.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(ch); // 监听队列,手动返回完成 ch.basicConsume(LianjieGongjulei.PUBSUB_QUEUE_NAME1, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String mes = new String(delivery.getBody()); System.out.println(" 订阅者1 Received \'" + mes + "\'"); Thread.sleep(100); // 监听队列,手动返回完成 ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消费者2:
package com.bunny.rabbit.pubsub; import java.io.IOException; import com.bunny.rabbit.LianjieGongjulei; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; /** * * @author bunny~~我是兔子我会喵,我叫喵星兔。 * */ public class Dingyuezhe2 { public static void main(String[] args) throws Exception { Connection c = LianjieGongjulei.getConnection(); Channel ch = c.createChannel(); ch.queueDeclare(LianjieGongjulei.PUBSUB_QUEUE_NAME2, false, false, false, null); // 绑定队列到交换机 ch.queueBind(LianjieGongjulei.PUBSUB_QUEUE_NAME2, LianjieGongjulei.PUBSUB_EXCHANGE_NAME, ""); // 监听队列,手动返回完成 ch.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(ch); // 监听队列,手动返回完成 ch.basicConsume(LianjieGongjulei.PUBSUB_QUEUE_NAME2, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String mes = new String(delivery.getBody()); System.out.println(" 订阅者2 Received \'" + mes + "\'"); Thread.sleep(100); // 监听队列,手动返回完成 ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
2、路由模式
在前面基础上,改变交换机类型,添加key信息。
如图:
生产者:
package com.bunny.rabbit.routing; import com.bunny.rabbit.LianjieGongjulei; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * * @author bunny~~我是兔子我会喵,我叫喵星兔。 * */ public class Fabuzhe { public static void main(String[] args) throws Exception { Connection c = LianjieGongjulei.getConnection(); Channel ch = c.createChannel(); //交换器的类型,常见的有direct,fanout,topic等,其中“direct”是路由 ch.exchangeDeclare(LianjieGongjulei.ROUTING_EXCHANGE_NAME, "direct"); String mes1 = "路由直连信息1"; String mes2 = "路由直连信息2"; //第二个参数是消息key用于路由用 ch.basicPublish(LianjieGongjulei.ROUTING_EXCHANGE_NAME, "meskey1", null, mes1.getBytes()); ch.basicPublish(LianjieGongjulei.ROUTING_EXCHANGE_NAME, "meskey2", null, mes2.getBytes()); System.out.println(" pubsub Sent \'" + mes1 + "\'"); System.out.println(" pubsub Sent \'" + mes2 + "\'"); ch.close(); c.close(); } }
消费者1:
package com.bunny.rabbit.routing; import com.bunny.rabbit.LianjieGongjulei; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; /** * * @author bunny~~我是兔子我会喵,我叫喵星兔。 * */ public class Dingyuezhe1 { public static void main(String[] args) throws Exception { Connection c = LianjieGongjulei.getConnection(); Channel ch = c.createChannel(); ch.queueDeclare(LianjieGongjulei.ROUTING_QUEUE_NAME1, false, false, false, null); // 绑定队列到交换机,第三个参数是key ch.queueBind(LianjieGongjulei.ROUTING_QUEUE_NAME1, LianjieGongjulei.ROUTING_EXCHANGE_NAME, "meskey1"); ch.queueBind(LianjieGongjulei.ROUTING_QUEUE_NAME1, LianjieGongjulei.ROUTING_EXCHANGE_NAME, "meskey2"); // 监听队列,手动返回完成 ch.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(ch); // 监听队列,手动返回完成 ch.basicConsume(LianjieGongjulei.ROUTING_QUEUE_NAME1, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String mes = new String(delivery.getBody()); System.out.println(" 订阅者1 Received \'" + mes + "\'"); Thread.sleep(100); // 监听队列,手动返回完成 ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消费者2:
package com.bunny.rabbit.routing; import java.io.IOException; import com.bunny.rabbit.LianjieGongjulei; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; /** * * @author bunny~~我是兔子我会喵,我叫喵星兔。 * */ public class Dingyuezhe2 { public static void main(String[] args) throws Exception { Connection c = LianjieGongjulei.getConnection(); Channel ch = c.createChannel(); ch.queueDeclare(LianjieGongjulei.ROUTING_QUEUE_NAME2, false, false, false, null); // 绑定队列到交换机 ch.queueBind(LianjieGongjulei.ROUTING_QUEUE_NAME2, LianjieGongjulei.ROUTING_EXCHANGE_NAME, "meskey1"); // 监听队列,手动返回完成 ch.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(ch); // 监听队列,手动返回完成 ch.basicConsume(LianjieGongjulei.ROUTING_QUEUE_NAME2, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String mes = new String(delivery.getBody()); System.out.println(" 订阅者2 Received \'" + mes + "\'"); Thread.sleep(100); // 监听队列,手动返回完成 ch.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
3、主题模式
在路由基础上,模糊匹配。至于匹配规则,简单来说就是*代表一个单词,#代表多个单词。单词指的是字符串被“.”分割的部分。可以实际操作之后观察结果。
如图:
生产者:
package com.bunny.rabbit.topic; import com.bunny.rabbit.LianjieGongjulei; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * * @author bunny~~我是兔子我会喵,我叫喵星兔。 * */ public class Fabuzhe { public static void main(String[] args) throws Exception { Connection c = LianjieGongjulei.getConnection(); Channel ch = c.createChannel(); //交换器的类型,常见的有direct,fanout,topic等,其中“direct”是路由 ch.exchangeDeclare(LianjieGongjulei.TOPIC_EXCHANGE_NAME, "topic"); String mes1 = "主题信息1"; String mes2 = "主题信息2"; String mes3 = "主题信息3"; String mes4 = "主题信息4"; //第二个参数是消息key用于路由用 ch.basicPublish(LianjieGongjulei.TOPIC_EXCHANGE_NAME, "mes.key.bunny", null, mes1.getBytes()); ch.basicPublish(LianjieGongjulei.TOPIC_EXCHANGE_NAME, "bunny", null, mes2.getBytes()); ch.basicPublish(LianjieGongjulei.TOPIC_EXCHANGE_NAME, "mes", null, mes3.getBytes()); ch.basicPublish(LianjieGongjulei.TOPIC_EXCHANGE_NAME, "key.ee", null, mes4.getBytes()); System.out.println(" topic Sent \'" + mes1 + "\'"); System.out.println(" topic Sent \'" + mes2 + "\'"); System.out.println(" topic Sent \'" + mes3 + "\'"); System.out.println(" topic Sent \'" + mes4 + "\'"); ch.close(); c.close(); } }
消费者1:
package com.bunny.rabbit.topic; import com.bunny.rabbit.LianjieGongjulei; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; /** * * @author bunny~~我是兔子我会喵,我叫喵星兔。 * */ public class Dingyuezhe1 { public static void main(String[] args) throws Exception { Connection c = LianjieGongjulei.getConnection(); Channel ch = c.createChannel(); ch.queueDeclare(LianjieGongjulei.TOPIC_QUEUE_NAME1, false, false, false, null); // 绑定队列到交换机,第三个参数是key ch.queueBind(LianjieGongjulei.TOPIC_QUEUE_NAME1, LianjieGongjulei.TOPIC_EXCHANGE_NAME, "*"); ch.queueBind(LianjieGongjulei.TOPIC_QUEUE_NAME1, LianjieGongjulei.TOPIC_EXCHANGE_NAME, "mes.#"); // 监听队列,手动返回完成 ch.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(ch); // 监听队列,手动返回完成 ch.basicConsume(LianjieGongjulei.TOPIC_QUEUE_NAME1, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String mes = new String(delivery.getBody()); System.out.println(" 订阅者1 Received \'" + mes + "\'"); Thread.sleep(100); // 监听队列,手动返回完成 ch.basicAck(deliver以上是关于喵星之旅-狂奔的兔子-rabbitmq的java客户端使用入门的主要内容,如果未能解决你的问题,请参考以下文章