Rabbit的直连交换机direct

Posted fuguang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbit的直连交换机direct相关的知识,希望对你有一定的参考价值。

直连交换机类型为:direct。加入了路由键routingKey的概念。

就是说 生产者投递消息给指定交换机的指定路由键。

只有绑定了此交换机指定路由键的消息队列才可以收到消息。

 

生产者:

package com.kf.queueDemo.exchange.direct;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.kf.utils.RabbitConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 路由模式的生产者(带路由键)
 * @author kf
 *
 */
public class DirectProducer {
    //交换机
    private static String DIRECTEXCHANGE = "DIRECTEXCHANGE";
    //路由键
    private static String ROUTINGKEY = "SMS";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        
        Connection connection = RabbitConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机类型为路由模式
        channel.exchangeDeclare(DIRECTEXCHANGE, "direct");
        
        String msg = "direct_mes";
        
        //发送消息给 指定交换机EXCHANGENAME的指定路由键ROUTINGKEY上
        channel.basicPublish(DIRECTEXCHANGE, ROUTINGKEY, null, msg.getBytes());
        
        channel.close();
        connection.close();
        
        
        
        
    }

}

 

消费者:

package com.kf.queueDemo.exchange.direct;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.kf.utils.RabbitConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
/**
 * 路由模式的邮件消费者
 * @author kf
 *
 */
public class DirectEMAILConsumer {
    //队列名
    private static String EMAILQUEUENAME = "EMAILQUEUENAME";
    //路由键名
    private static String SMSROUTINGKEY = "SMS";
    private static String EMAILROUTINGKEY = "EMAIL";
    //交换机
    private static String DIRECTEXCHANGE = "DIRECTEXCHANGE";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("邮件消费者启动=====");
        Connection connection = RabbitConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //创建队列:第一个参数是队列名,后面的参数还没搞清楚干嘛的
        channel.queueDeclare(EMAILQUEUENAME, false, false, false, null);
        //绑定队列到交换机的指定路由键
        channel.queueBind(EMAILQUEUENAME, DIRECTEXCHANGE, EMAILROUTINGKEY);
        
        DefaultConsumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("进入邮件接收消息的监听");
                String s = new String(body, "utf-8");
                System.out.println("邮件消费者接收到消息:"+s);
            };
        };
        
        //参数分别是:队列名,是否自动应答,监听的回调类
        channel.basicConsume(EMAILQUEUENAME, true, consumer);
        
    }

}

 

package com.kf.queueDemo.exchange.direct;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.kf.utils.RabbitConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;

/**
 * 路由模式的短信消费者
 * @author kf
 *
 */
public class DirectSMSConsumer {
    //队列名
    private static String SMSQUEUENAME = "SMSQUEUENAME";
    //路由键名
    private static String SMSROUTINGKEY = "SMS";
    private static String EMAILROUTINGKEY = "EMAIL";
    //交换机
    private static String DIRECTEXCHANGE = "DIRECTEXCHANGE";
    
    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("短信消费者启动=====");
        Connection connection = RabbitConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //创建队列:第一个参数是队列名,后面的参数还没搞清楚干嘛的
        channel.queueDeclare(SMSQUEUENAME, false, false, false, null);
        //绑定队列到交换机的指定路由键
        channel.queueBind(SMSQUEUENAME, DIRECTEXCHANGE, SMSROUTINGKEY);
        //绑定多个交换机的路由键
        channel.queueBind(SMSQUEUENAME, DIRECTEXCHANGE, EMAILROUTINGKEY);
        
        DefaultConsumer consumer = new DefaultConsumer(channel){
            public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("进入短信接收消息的监听");
                String s = new String(body, "utf-8");
                System.out.println("短信消费者接收到消息:"+s);
            };
        };
        
        //参数分别是:队列名,是否自动应答,监听的回调类
        channel.basicConsume(SMSQUEUENAME, true, consumer);
        
    }

}

 

以上是关于Rabbit的直连交换机direct的主要内容,如果未能解决你的问题,请参考以下文章

Rabbit主题交换机

RabbitMQ指南之四:路由(Routing)和直连交换机(Direct Exchange)

RabbitMQ指南之四:路由(Routing)和直连交换机(Direct Exchange)

SpringBoot实战集成RebbitMQ

直连路由的配置

SpringBoot整合rabbitMQ,spring-boot-starter-amqp 的使用