分布式环境下rabbitmq发布与订阅端

Posted 太白的技术博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式环境下rabbitmq发布与订阅端相关的知识,希望对你有一定的参考价值。

假设rabbitmq配置了集群,且客户端连接rabbitmq-server通过lvs实现HA但一般情况下不建议做LB。在分布式系统的环境下,由于节点的非预知性,使用spring amqp模板进行配置不足以灵活到满足弹性扩展的需求,因此,更加方便的方式是通过rabbitmq原生的java client进行订阅和发布。在我们的场景中,某些节点需要同时是发布端和订阅端以便做到弹性扩展,无需额外的配置。以fanout类型为例,如下所示:

发布端:

/**  

* @Title: Send.java

* @Package com.cyl.rabbitmq

* @Description: TODO(用一句话描述该文件做什么)

* @author [email protected]  

* @date 2016年4月25日 下午12:52:59

* @version V1.0  

*/

package com.cyl.rabbitmq;

 

import java.io.IOException;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

 

/**

 * @author zjhua

 *

 */

public class Send {

 

    public static void main(String[] args) throws IOException { 

        ConnectionFactory factory = new ConnectionFactory(); 

        factory.setHost("localhost"); 

        Connection connection;

connection = factory.newConnection();

 

Channel channel = connection.createChannel(); 

 

channel.exchangeDeclare("fanout_random", "fanout");

 

String message = "Hello World ";

for(int i=0;i<10000;i++) {

channel.basicPublish("fanout_random", "", null, (message + i).getBytes());

try {

Thread.sleep(5000);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

System.out.println(" [x] Sent ‘" + message + "‘"); 

 

channel.close(); 

connection.close();

    } 

}

 

订阅端:

/**  

* @Title: Reqv.java

* @Package com.cyl.rabbitmq

* @Description: TODO(用一句话描述该文件做什么)

* @author [email protected]  

* @date 2016年4月25日 下午12:56:33

* @version V1.0  

*/

package com.cyl.rabbitmq;

 

import java.io.IOException;

 

import com.rabbitmq.client.*;

 

/**

 * @author zjhua

 *

 */

public class Reqv { 

 

    public static void main(String[] argv) throws Exception { 

 

        ConnectionFactory factory = new ConnectionFactory(); 

        factory.setHost("localhost"); 

        Connection connection = factory.newConnection(); 

        Channel channel = connection.createChannel(); 

 

        channel.exchangeDeclare("fanout_random", "fanout");

        String queueName = channel.queueDeclare().getQueue(); --对于某些场景,比如缓存同步,使用exclusive/auto-deletequeue会比较合适

        channel.queueBind(queueName, "fanout_random", "");

 

        Consumer consumer = new DefaultConsumer(channel) {

            @Override

            public void handleDelivery(String consumerTag, Envelope envelope,

                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

              String message = new String(body, "UTF-8");

              System.out.println(" [x] Received ‘" + message + "‘");

            }

          };

          channel.basicConsume(queueName, true, consumer);

    }

}

 

如果要同时作为订阅端、发布端,只要在容器启动时配置监听事件,其中包含订阅端逻辑即可。发布端作为基础服务供业务子系统使用。

以上是关于分布式环境下rabbitmq发布与订阅端的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ下的生产消费者模式与订阅发布模式

window下搭建celery+rabbitmq 分布式任务调度

.Net Core对于RabbitMQ封装分布式事件总线

.Net Core对于RabbitMQ封装分布式事件总线

RabbitMQ分布式消息队列服务器(Windows下安装和部署)

RabbitMQ小记