超详细的RabbitMQ发布订阅模式讲解
Posted link可
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了超详细的RabbitMQ发布订阅模式讲解相关的知识,希望对你有一定的参考价值。
RabbitMQ发布订阅模式
相关视频教程(来自动力节点):https://www.bilibili.com/video/BV1Ap4y1D7tU
相关资料下载:http://www.bjpowernode.com/?51cto
在前面的例子中,我们任务消息只交付给一个工作进程。在这部分,我们将做一些完全不同的事情——我们将向多个消费者传递同一条消息。这种模式称为“发布/订阅”。
生产者
生产者发出日志消息,看起来与前一教程没有太大不同。最重要的更改是,我们现在希望将消息发布到logs交换机,而不是无名的日志交换机。我们需要在发送时提供一个routingKey,但是对于fanout交换机类型,该值会被忽略。
package rabbitmq.publishsubscribe;
import java.util.Scanner;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Test1
public static void main(String[] args) throws Exception
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
//定义名字为logs的交换机,交换机类型为fanout
//这一步是必须的,因为禁止发布到不存在的交换。
ch.exchangeDeclare("logs", "fanout");
while (true)
System.out.print("输入消息: ");
String msg = new Scanner(System.in).nextLine();
if ("exit".equals(msg))
break;
//第一个参数,向指定的交换机发送消息
//第二个参数,不指定队列,由消费者向交换机绑定队列
//如果还没有队列绑定到交换器,消息就会丢失,
//但这对我们来说没有问题;即使没有消费者接收,我们也可以安全地丢弃这些信息。
ch.basicPublish("logs", "", null, msg.getBytes("UTF-8"));
System.out.println("消息已发送: "+msg);
c.close();
消费者
如果还没有队列绑定到交换器,消息就会丢失,但这对我们来说没有问题;如果还没有消费者在听,我们可以安全地丢弃这些信息。
ReceiveLogs.java代码:
package rabbitmq.publishsubscribe;
import java.io.IOException;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
public class Test2
public static void main(String[] args) throws Exception
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();
//定义名字为 logs 的交换机, 它的类型是 fanout
ch.exchangeDeclare("logs", "fanout");
//自动生成对列名,
//非持久,独占,自动删除
String queueName = ch.queueDeclare().getQueue();
//把该队列,绑定到 logs 交换机
//对于 fanout 类型的交换机, routingKey会被忽略,不允许null值
ch.queueBind(queueName, "logs", "");
System.out.println("等待接收数据");
//收到消息后用来处理消息的回调对象
DeliverCallback callback = new DeliverCallback()
public void handle(String consumerTag, Delivery message) throws IOException
String msg = new String(message.getBody(), "UTF-8");
System.out.println("收到: "+msg);
;
//消费者取消时的回调对象
CancelCallback cancel = new CancelCallback()
public void handle(String consumerTag) throws IOException
;
ch.basicConsume(queueName, true, callback, cancel);
以上是关于超详细的RabbitMQ发布订阅模式讲解的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ指南之三:发布/订阅模式(Publish/Subscribe)
RabbitMq消息可靠性之确认模式 通俗易懂 超详细 内含案例
RabbitMq消息可靠性之回退模式 通俗易懂 超详细 内含案例