RabbitMQ

Posted (x²+y²-1)³=x²y³

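Using RabbitMQ

Installing RabbitMQ

RabbitMQ website: https://www.rabbitmq.com/

RabbitMQ is written in Erlang, so Erlang must be installed first. Each RabbitMQ release supports only a specific range of Erlang versions, so check the RabbitMQ/Erlang compatibility table in the official documentation before installing.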

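Starting RabbitMQ

Open CMD in the ..\RabbitMQ Server\rabbitmq_server-3.11.13\sbin directory and run rabbitmq-server.bat. The broker is running once the console reports that startup has completed.

Open RabbitMQ Management: http://localhost:15672/

Default username: guest

Default password: guest

After a successful login, the management overview page is displayed.

Creating a user

Once created, the new user appears in the user list.

Creating a virtual host

Granting the user access to the virtual host

The code examples below connect as user root (password 123456) to a virtual host named /test, so create both here and grant root access to /test.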

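RabbitMQ working modes

Add the dependency (include a <version> element unless your build already manages the version, for example through a parent BOM):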

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>

Simple mode

Producer:

package com.example.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 * RabbitMQ producer: sends a message
 */
public class RabbitProducer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Configure the connection
        // Host, default: localhost
        factory.setHost("localhost");
        // Port. HTTP (management UI): 15672; AMQP: 5672; clustering: 25672
        factory.setPort(5672);
        // Virtual host, default: /
        factory.setVirtualHost("/test");
        // Username, default: guest
        factory.setUsername("root");
        // Password, default: guest
        factory.setPassword("123456");
        // Open the connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        /**
         * Declare the queue
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue: queue name
         * durable: whether the queue is persistent; a durable queue still exists after RabbitMQ restarts
         * exclusive: whether the queue is exclusive to the declaring connection:
         *   - only that connection may use the queue
         *   - the queue is deleted when the connection closes
         * autoDelete: whether the queue is deleted automatically once its last consumer is gone
         * arguments: optional queue arguments
         * If no queue named test exists it is created; if it already exists nothing new is created
         */
        channel.queueDeclare("test",false,false,false,null);
        /**
         * Publish the message
         * basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
         * exchange: exchange name; simple mode uses the default exchange ""
         * routingKey: routing key; with the default exchange this is the queue name
         * props: message properties
         * body: message payload
         */
        String body = "hello rabbitmq ~~~";
        channel.basicPublish("","test",null,body.getBytes());
        // Release resources
        channel.close();
        connection.close();
    }
}
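
The five connection settings above (host, port, virtual host, username, password) can also be written as a single AMQP URI, which the Java client accepts through ConnectionFactory.setUri. The sketch below only builds that string with the standard library, so it runs without a broker. AmqpUriSketch and amqpUri are hypothetical names, not part of amqp-client, and the sketch assumes Java 10 or later for URLEncoder.encode(String, Charset). The point it illustrates is that the leading slash of a virtual host such as /test must be percent-encoded in the URI.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not part of amqp-client): builds an AMQP URI from the settings used above. */
class AmqpUriSketch {

    static String amqpUri(String user, String password, String host, int port, String virtualHost) {
        return "amqp://" + enc(user) + ":" + enc(password) + "@" + host + ":" + port + "/" + enc(virtualHost);
    }

    private static String enc(String s) {
        // URLEncoder implements form encoding, which writes a space as '+'; a URI needs %20 instead.
        return URLEncoder.encode(s, StandardCharsets.UTF_8).replace("+", "%20");
    }

    public static void main(String[] args) {
        // Same values as the producer above; the '/' in "/test" is emitted as %2F.
        System.out.println(amqpUri("root", "123456", "localhost", 5672, "/test"));
    }
}
```

Passing the resulting string to factory.setUri(...) would replace the five setter calls; setUri declares additional checked exceptions, so the method signature would need to be widened accordingly.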

Consumer:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 * RabbitMQ consumer: receives messages
 */
public class RabbitConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("test",false,false,false,null);
        /**
         * Receive messages
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue: queue name
         * autoAck: whether messages are acknowledged automatically
         * callback: callback object
         */
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * Callback invoked automatically whenever a message is delivered
             * @param consumerTag tag identifying this consumer
             * @param envelope delivery metadata such as the exchange and the routing key
             * @param properties message properties
             * @param body message payload
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag: "+consumerTag);
                System.out.println("envelope: "+envelope);
                System.out.println("properties: "+properties);
                System.out.println("body: "+new String(body));
            }
        };
        channel.basicConsume("test",true,consumer);
        // The channel and connection are left open so the consumer keeps listening
    }
}

Work queues mode

Producer:

package com.example.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 * RabbitMQ working mode: work queues
 */
public class ProducerInWorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Configure the connection
        // Host, default: localhost
        factory.setHost("localhost");
        // Port. HTTP (management UI): 15672; AMQP: 5672; clustering: 25672
        factory.setPort(5672);
        // Virtual host, default: /
        factory.setVirtualHost("/test");
        // Username, default: guest
        factory.setUsername("root");
        // Password, default: guest
        factory.setPassword("123456");
        // Open the connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        /**
         * Declare the queue
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue: queue name
         * durable: whether the queue is persistent; a durable queue still exists after RabbitMQ restarts
         * exclusive: whether the queue is exclusive to the declaring connection:
         *   - only that connection may use the queue
         *   - the queue is deleted when the connection closes
         * autoDelete: whether the queue is deleted automatically once its last consumer is gone
         * arguments: optional queue arguments
         * If no queue named work_queues exists it is created; if it already exists nothing new is created
         */
        channel.queueDeclare("work_queues",false,false,false,null);
        /**
         * Publish the messages
         * basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
         * exchange: exchange name; as in simple mode, the default exchange "" is used
         * routingKey: routing key; with the default exchange this is the queue name
         * props: message properties
         * body: message payload
         */
        for (int i = 0; i < 10; i++) {
            String body = "hello rabbitmq --- "+i;
            channel.basicPublish("","work_queues",null,body.getBytes());
        }
        // Release resources
        channel.close();
        connection.close();
    }
}

Consumer 1:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInWorkQueuesOne {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work_queues",false,false,false,null);
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * Callback invoked automatically whenever a message is delivered
             * @param consumerTag tag identifying this consumer
             * @param envelope delivery metadata such as the exchange and the routing key
             * @param properties message properties
             * @param body message payload
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+new String(body));
            }
        };
        /**
         * Receive messages
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * @param queue queue name
         * @param autoAck whether messages are acknowledged automatically
         * @param callback callback object
         */
        channel.basicConsume("work_queues",true,consumer);
    }
}

Consumer 2:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInWorkQueuesTwo {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("work_queues",false,false,false,null);
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * Callback invoked automatically whenever a message is delivered
             * @param consumerTag tag identifying this consumer
             * @param envelope delivery metadata such as the exchange and the routing key
             * @param properties message properties
             * @param body message payload
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+new String(body));
            }
        };
        /**
         * Receive messages
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * @param queue queue name
         * @param autoAck whether messages are acknowledged automatically
         * @param callback callback object
         */
        channel.basicConsume("work_queues",true,consumer);
    }
}
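
With two consumers on the same queue, each message goes to only one of them. Assuming both consumers are already running when the producer publishes and no prefetch limit is set, RabbitMQ hands messages out in turn (round-robin). The following broker-free sketch mimics that distribution for the ten messages published above. RoundRobinSketch and dispatch are hypothetical names used only for illustration, and the real broker's behaviour also depends on acknowledgements and prefetch settings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of round-robin dispatch; it does not talk to RabbitMQ. */
class RoundRobinSketch {

    /** Assigns message i to consumer (i mod consumerCount). */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("hello rabbitmq --- " + i);
        }
        List<List<String>> result = dispatch(messages, 2);
        System.out.println("consumer 1: " + result.get(0)); // messages 0, 2, 4, 6, 8
        System.out.println("consumer 2: " + result.get(1)); // messages 1, 3, 5, 7, 9
    }
}
```

If only one consumer is running when the producer publishes, that consumer receives all ten messages, so start both consumers first to observe the alternation.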

Publish/subscribe mode

Producer:

package com.example.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 * RabbitMQ working mode: publish/subscribe
 */
public class ProducerInPubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Configure the connection
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        // Open the connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Declare the exchange
        /**
         * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
         * @param exchange exchange name
         * @param type exchange type: direct (exact routing key match), fanout (broadcast), topic (wildcard match), headers (match on message headers)
         * @param durable whether the exchange is persistent
         * @param autoDelete whether the exchange is deleted automatically
         * @param internal whether the exchange is internal; normally false
         * @param arguments optional exchange arguments
         */
        channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT,false,false,false,null);
        // Declare the queues
        channel.queueDeclare("fanout_queue1",false,false,false,null);
        channel.queueDeclare("fanout_queue2",false,false,false,null);
        // Bind the queues to the exchange
        /**
         * queueBind(String queue, String exchange, String routingKey)
         * @param queue queue name
         * @param exchange exchange name
         * @param routingKey binding key. A fanout exchange ignores it, so "" is used and the message is delivered to every bound queue.
         */
        channel.queueBind("fanout_queue1","fanout_exchange","");
        channel.queueBind("fanout_queue2","fanout_exchange","");
        // Publish the message
        String body = "Welcome to my world";
        channel.basicPublish("fanout_exchange","",null,body.getBytes());
        // Release resources
        channel.close();
        connection.close();
    }
}

Consumer 1:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInPubSubOne {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+new String(body));
            }
        };
        // The queue is declared by the producer, so run the producer first
        channel.basicConsume("fanout_queue1",true,consumer);
    }
}

Consumer 2:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInPubSubTwo {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body: "+new String(body));
            }
        };
        // The queue is declared by the producer, so run the producer first
        channel.basicConsume("fanout_queue2",true,consumer);
    }
}

Routing mode

Producer:

package com.example.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ProducerInRouting {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT,false,false,null);
        // Declare the queues
        channel.queueDeclare("direct_queue1",false,false,false,null);
        channel.queueDeclare("direct_queue2",false,false,false,null);
        // Bind the queues to the exchange
        // Queue 1 is bound with: error
        channel.queueBind("direct_queue1","direct_exchange","error");
        // Queue 2 is bound with: info, error, warning
        channel.queueBind("direct_queue2","direct_exchange","info");
        channel.queueBind("direct_queue2","direct_exchange","error");
        channel.queueBind("direct_queue2","direct_exchange","warning");
        // Message payload
        String body = "Welcome to my world";
        // Publish the message with routing key error, so it reaches both queues
        channel.basicPublish("direct_exchange","error",null,body.getBytes());
        // Release resources
        channel.close();
        connection.close();
    }
}
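
A direct exchange delivers a message to every queue whose binding key equals the message's routing key. The sketch below models the four bindings from the producer as a plain map, so the effect of each routing key can be checked without a broker. DirectRoutingSketch, bind and route are hypothetical names for illustration and say nothing about how RabbitMQ implements routing internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory model of a direct exchange; it does not talk to RabbitMQ. */
class DirectRoutingSketch {

    // binding key -> queues bound with that key (insertion order kept for readable output)
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queue);
    }

    /** Returns the queues that would receive a message published with the given routing key. */
    List<String> route(String routingKey) {
        return new ArrayList<>(bindings.getOrDefault(routingKey, Collections.<String>emptySet()));
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("direct_queue1", "error");
        exchange.bind("direct_queue2", "info");
        exchange.bind("direct_queue2", "error");
        exchange.bind("direct_queue2", "warning");

        System.out.println("error -> " + exchange.route("error")); // both queues
        System.out.println("info  -> " + exchange.route("info"));  // direct_queue2 only
        System.out.println("debug -> " + exchange.route("debug")); // no queue is bound with this key
    }
}
```

In the real broker, a message whose routing key matches no binding is dropped by default, which corresponds to the empty list in the last line.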

Consumer 1:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInRoutingOne {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        // The queue is declared by the producer, so run the producer first
        channel.basicConsume("direct_queue1",true,consumer);
    }
}

Consumer 2:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInRoutingTwo {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        // The queue is declared by the producer, so run the producer first
        channel.basicConsume("direct_queue2",true,consumer);
    }
}

Topic mode

Producer:

package com.example.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ProducerInTopic {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange
        channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC,false,false,false,null);
        // Declare the queues
        channel.queueDeclare("topic_queue1",false,false,false,null);
        channel.queueDeclare("topic_queue2",false,false,false,null);
        // Bind the queues to the exchange
        // Routing keys have the form <system>.<log level>. Requirement: error-level messages and everything from the order system go to queue1
        // *: matches exactly one word; #: matches zero or more words
        channel.queueBind("topic_queue1","topic_exchange","#.error");
        channel.queueBind("topic_queue1","topic_exchange","order.#");
        // Every <system>.<log level> message goes to queue2 (*.* matches any two-word key; # would match keys of any length)
        channel.queueBind("topic_queue2","topic_exchange","*.*");
        String body = "Welcome to my world";
        // Publish the message; goods.error matches #.error and *.*, so both queues receive it
        channel.basicPublish("topic_exchange","goods.error",null,body.getBytes());
        // Release resources
        channel.close();
        connection.close();
    }
}
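
The wildcard rules are easy to get wrong, so here is a small broker-free sketch of them: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. TopicMatchSketch and matches are hypothetical names for illustration; the recursive matcher is one simple way to express the rules and is not how RabbitMQ implements topic routing.

```java
/** Hypothetical matcher for topic binding keys; it does not talk to RabbitMQ. */
class TopicMatchSketch {

    /** Returns true if routingKey is matched by bindingKey under the topic wildcard rules. */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            return j == w.length; // pattern used up: succeed only if every word was consumed
        }
        if (p[i].equals("#")) {
            // '#' either matches zero words (move past it) or absorbs one more word (stay on it)
            return match(p, i + 1, w, j) || (j < w.length && match(p, i, w, j + 1));
        }
        if (j == w.length) {
            return false; // a literal word or '*' needs a word to consume
        }
        if (p[i].equals("*") || p[i].equals(w[j])) {
            return match(p, i + 1, w, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String key = "goods.error"; // routing key used by the producer above
        System.out.println("#.error : " + matches("#.error", key)); // true
        System.out.println("order.# : " + matches("order.#", key)); // false
        System.out.println("*.*     : " + matches("*.*", key));     // true
        System.out.println("*.* vs order.info.detail : " + matches("*.*", "order.info.detail")); // false
    }
}
```

The last line shows why *.* is not a catch-all: a three-word key such as order.info.detail reaches topic_queue1 through order.# but not topic_queue2.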

Consumer 1:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInTopicOne {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        // The queue is declared by the producer, so run the producer first
        channel.basicConsume("topic_queue1",true,consumer);
    }
}

Consumer 2:

package com.example.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author admin
 */
public class ConsumerInTopicTwo {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/test");
        factory.setUsername("root");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(new String(body));
            }
        };
        // The queue is declared by the producer, so run the producer first
        channel.basicConsume("topic_queue2",true,consumer);
    }
}

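Fixing "/etc/rabbitmq/rabbitmq.config (not found)" on the RabbitMQ management page

If the management page shows /etc/rabbitmq/rabbitmq.config (not found), the configuration file does not exist yet. To fix this, first locate the example configuration file:

find / -name "rabbitmq.config.example"

Copy it into /etc/rabbitmq/ (the source path below is for rabbitmq-server 3.6.6; use the path that find reports on your system):

cp /usr/share/doc/rabbitmq-server-3.6.6/rabbitmq.config.example /etc/rabbitmq/

Rename the copy to rabbitmq.config:

mv /etc/rabbitmq/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config

Then restart the service:

service rabbitmq-server restart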

