超详细的RabbitMQ发布订阅模式讲解

Posted link可

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了超详细的RabbitMQ发布订阅模式讲解相关的知识,希望对你有一定的参考价值。

RabbitMQ发布订阅模式

相关视频教程(来自动力节点):https://www.bilibili.com/video/BV1Ap4y1D7tU

相关资料下载:http://www.bjpowernode.com/?51cto

超详细的RabbitMQ发布订阅模式讲解_rabbitmq

在前面的例子中,我们任务消息只交付给一个工作进程。在这部分,我们将做一些完全不同的事情——我们将向多个消费者传递同一条消息。这种模式称为“发布/订阅”。

生产者

生产者发出日志消息,看起来与前一教程没有太大不同。最重要的更改是,我们现在希望将消息发布到logs交换机,而不是无名的日志交换机。我们需要在发送时提供一个routingKey,但是对于fanout交换机类型,该值会被忽略。

package rabbitmq.publishsubscribe;


import java.util.Scanner;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class Test1
public static void main(String[] args) throws Exception
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setPort(5672);
f.setUsername("admin");
f.setPassword("admin");


Connection c = f.newConnection();
Channel ch = c.createChannel();


//定义名字为logs的交换机,交换机类型为fanout
//这一步是必须的,因为禁止发布到不存在的交换。
ch.exchangeDeclare("logs", "fanout");


while (true)
System.out.print("输入消息: ");
String msg = new Scanner(System.in).nextLine();
if ("exit".equals(msg))
break;



//第一个参数,向指定的交换机发送消息
//第二个参数,不指定队列,由消费者向交换机绑定队列
//如果还没有队列绑定到交换器,消息就会丢失,
//但这对我们来说没有问题;即使没有消费者接收,我们也可以安全地丢弃这些信息。
ch.basicPublish("logs", "", null, msg.getBytes("UTF-8"));
System.out.println("消息已发送: "+msg);



c.close();

消费者

如果还没有队列绑定到交换器,消息就会丢失,但这对我们来说没有问题;如果还没有消费者在听,我们可以安全地丢弃这些信息。

ReceiveLogs.java代码:

package rabbitmq.publishsubscribe;


import java.io.IOException;


import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;


public class Test2
public static void main(String[] args) throws Exception
ConnectionFactory f = new ConnectionFactory();
f.setHost("192.168.64.140");
f.setUsername("admin");
f.setPassword("admin");
Connection c = f.newConnection();
Channel ch = c.createChannel();


//定义名字为 logs 的交换机, 它的类型是 fanout
ch.exchangeDeclare("logs", "fanout");


//自动生成对列名,
//非持久,独占,自动删除
String queueName = ch.queueDeclare().getQueue();


//把该队列,绑定到 logs 交换机
//对于 fanout 类型的交换机, routingKey会被忽略,不允许null值
ch.queueBind(queueName, "logs", "");


System.out.println("等待接收数据");


//收到消息后用来处理消息的回调对象
DeliverCallback callback = new DeliverCallback()
@Override
public void handle(String consumerTag, Delivery message) throws IOException
String msg = new String(message.getBody(), "UTF-8");
System.out.println("收到: "+msg);

;


//消费者取消时的回调对象
CancelCallback cancel = new CancelCallback()
@Override
public void handle(String consumerTag) throws IOException

;


ch.basicConsume(queueName, true, callback, cancel);

以上是关于超详细的RabbitMQ发布订阅模式讲解的主要内容,如果未能解决你的问题,请参考以下文章

Kafka集群安装部署(超详细操作演示)—— Linux

RabbitMQ指南之三:发布/订阅模式(Publish/Subscribe)

RabbitMq消息可靠性之确认模式 通俗易懂 超详细 内含案例

RabbitMq消息可靠性之回退模式 通俗易懂 超详细 内含案例

RabbitMQ指南之三:发布/订阅模式(Publish/Subscribe)

#yyds干货盘点#RabbitMQ的简单模式案例讲解,非常详细