RabbitMQ 安装部署(New)& 延时队列使用

Posted 高国藩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 安装部署(New)& 延时队列使用相关的知识,希望对你有一定的参考价值。

RabbitMQ 安装部署(New)

elang环境与MQ版本一定要对应,否则无法启动,Rabbit版本与插件版本一定要对应,负责无法加载插件

版本信息
centos7.3.0
erlangVersion: 23.0.2,Release: 2.el7
RabbitMQ3.8.0-1
rabbitmq_delayed_message_exchange3.8

安装脚本步骤:

  1. 卸载erlang

    service rabbitmq-server stop  --停止服务
    yum list | grep erlang		  --查询当前环境	
    yum -y remove erlang-*		  --卸载 
    yum remove erlang.x86_64      --移除
    
  2. 卸载rabbitMq

    yum list | grep rabbitmq				--当前是否安装
    yum -y remove rabbitmq-server.noarch	--移除安装文件
    
  3. 安装erlang

    yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
    wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
    rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
    yum -y install epel-release
    sudo yum install erlang
    
    [root@nginx-1 mq]# yum info erlang           --校验版本信息
    Loaded plugins: fastestmirror
    Loading mirror speeds from cached hostfile
    Installed Packages
    Name        : erlang
    Arch        : x86_64
    Version     : 23.0.2
    Release     : 2.el7
    Size        : 0.0  
    Repo        : installed
    From repo   : erlang-solutions
    Summary     : General-purpose programming language and runtime environment
    URL         : http://www.erlang.org
    License     : ERPL
    Description : Erlang is a general-purpose programming language and runtime
                : environment. Erlang has built-in support for concurrency, distribution
                : and fault tolerance. Erlang is used in several large telecommunication
                : systems from Ericsson.
    
  4. 安装rabbitMq

    wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.0/rabbitmq-server-3.8.0-1.el7.noarch.rpm
    yum -y install socat	--安装socat依赖
    rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc  --导入什么签名
    rpm -ivh rabbitmq-server-3.8.0-1.el7.noarch.rpm
    
  5. 部署插件

    rabbitmq-delayed-message-exchange        --插件名称
    https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0  --下载地址,选择3.8
    

    RabbitMQ的有些插件没有集成在初始的安装中,它们需要额外安装,这些文件的后缀为.ez,安装时需要将.ez文件拷贝到安装的插件目录。以下是不同系统中默认安装的插件目录路径:

    插件目录
    Linux/usr/lib/rabbitmq/lib/rabbitmq_server-$version/plugins
    WindowsC:\\Program Files\\RabbitMQ\\rabbitmq_server-version\\plugins(安装rabbitmq的目录)
    Homebrew/usr/local/Cellar/rabbitmq/version/plugins
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange   --启用插件
    rabbitmq-plugins disable rabbitmq_delayed_message_exchange  --弃用插件
    
  6. 授权用户访问

    rabbitmq-plugins enable rabbitmq_management  --启用网页插件
    添加用户:rabbitmqctl add_user admin admin
    添加权限:rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
    修改用户角色rabbitmqctl set_user_tags admin administrator
    然后就可以远程访问了,然后可直接配置用户权限等信息。
    
  7. 启动命令脚本

    service rabbitmq-server restart  --重启
    service rabbitmq-server stop     --停止  
    

Spring搭建延时队列-注解方式

此处应用于Spring、SpringBoot等开源框架

  1. 添加依赖

    <dependency>
         <groupId>org.springframework.boot</groupId>
    	 <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. 定义工厂链接信息

    package com.soul.home.lws.system.conf;
    
    import com.soul.home.lws.conf.properties.AutoConfigRabbitMqProperties;
    import com.soul.home.lws.system.mq.error.MQRepublishMessageRecoverer;
    import com.soul.home.lws.system.mq.callback.MessageConfirmCallback;
    import com.soul.home.lws.system.mq.callback.MessageReturnCallback;
    import org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean;
    import org.springframework.amqp.rabbit.connection.*;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.SimpleMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.retry.backoff.ExponentialBackOffPolicy;
    import org.springframework.retry.policy.SimpleRetryPolicy;
    import org.springframework.retry.support.RetryTemplate;
    
    /**
     * @author gaoguofan
     * @date 2020/6/15
     */
    @Configuration
    @EnableConfigurationProperties(AutoConfigRabbitMqProperties.class)
    public class AutoConfigurationRabbitMqConfig 
    
        @Autowired
        MessageConfirmCallback confirmCallback;
        @Autowired
        MessageReturnCallback returnCallback;
    
        @Autowired
        AutoConfigRabbitMqProperties rabbitMqProperties;
    
        @Bean
        public ConnectionFactory connectionFactory() 
            com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory = new com.rabbitmq.client.ConnectionFactory();
            rabbitConnectionFactory.useNio();
            rabbitConnectionFactory.setAutomaticRecoveryEnabled(true);
            rabbitConnectionFactory.setNetworkRecoveryInterval(10000);
    //        rabbitConnectionFactory.setNioParams(new NioParams().setNbIoThreads(4));
            rabbitConnectionFactory.setHost(rabbitMqProperties.getHost());
            rabbitConnectionFactory.setPort(rabbitMqProperties.getPort());
            rabbitConnectionFactory.setUsername(rabbitMqProperties.getUserName());
            rabbitConnectionFactory.setPassword(rabbitMqProperties.getPassword());
    //        Clients can be configured to allow fewer channels per connection.
            rabbitConnectionFactory.setRequestedChannelMax(rabbitMqProperties.getRequestedChannelMax());
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnectionFactory);
            // this params server for Ack Exchange & Ack Exchange -> Queue,it shoule be seted true
            connectionFactory.setPublisherConfirms(true);
    //        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
    //        connectionFactory.setChannelCacheSize(rabbitMqProperties.getChannelCacheSize());
            connectionFactory.setConnectionCacheSize(rabbitMqProperties.getChannelCacheSize());
            connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
    //        ConnectionFactory connectionFactory = new AbstractConnectionFactory(rabbitConnectionFactory) 
    //            @Override
    //            public Connection createConnection() throws AmqpException 
    //                try 
    //                    return new SimpleConnection(rabbitConnectionFactory.newConnection(), 100000);
    //                 catch (Exception e) 
    //                    e.printStackTrace();
    //                
    //                return null;
    //            
    //        ;
            return connectionFactory;
        
    
        @Bean
        public RetryTemplate retryTemplate() 
            RetryTemplate retryTemplate = new RetryTemplate();
            SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
            simpleRetryPolicy.setMaxAttempts(rabbitMqProperties.getMaxAttempts());
            // retry Policy 指数退避策略,必须使用指数,内网连接很快
            ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
            exponentialBackOffPolicy.setInitialInterval(rabbitMqProperties.getInitialInterval());
            exponentialBackOffPolicy.setMultiplier(rabbitMqProperties.getMultiplier());
            exponentialBackOffPolicy.setMaxInterval(rabbitMqProperties.getMaxInterval());
            retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
            retryTemplate.setRetryPolicy(simpleRetryPolicy);
            return retryTemplate;
        
    
        @Bean
        public StatelessRetryOperationsInterceptorFactoryBean statelessRetryOperationsInterceptorFactoryBean(
                MQRepublishMessageRecoverer messageRecoverer) 
            StatelessRetryOperationsInterceptorFactoryBean interceptorFactoryBean =
                    new StatelessRetryOperationsInterceptorFactoryBean();
            interceptorFactoryBean.setMessageRecoverer(messageRecoverer);
            interceptorFactoryBean.setRetryOperations(retryTemplate());
            return interceptorFactoryBean;
        
    
        /**
         * 业务回调不同,可配置scope进行单独注入
         * @param connectionFactory
         * @return
         */
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) 
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            // Ack Exchange
            rabbitTemplate.setConfirmCallback(confirmCallback);
            // Ack Exchange -> queue
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback(returnCallback);
            return rabbitTemplate;
        
    
        @Bean
        public SimpleMessageConverter simpleMessageConverter() 
            return new SimpleMessageConverter();
        
    
    
    
    
    
  3. 定义exchange&queue

    package com.soul.home.lws.system.mq;
    
    import org.springframework.amqp.core.CustomExchange;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * durable属性和auto-delete属性可以同时生效;
     * durable属性和exclusive属性会有性质上的冲突,两者同时设置时,仅exclusive属性生效;
     * auto_delete属性和exclusive属性可以同时生效;
     * 除此之外,
     * Queue的“Exlusive owner”对应的是connection而不是channel;
     * Consumer存在于某个channel上的;
     * @author gaoguofan
     * @date 2020/6/15
     */
    @Configuration
    public class MQMessageQueuesConfig 
    
        private Boolean exclusive = false;
        private Boolean autoDelete = false;
        private Boolean durable = true;
    
    
        @Bean(MQMessageQueueNames.DELAY_EXCHANGE_NAME)
        public DirectExchange directExchange() 
            DirectExchange directExchange = new DirectExchange(MQMessageQueueNames.DELAY_EXCHANGE_NAME, true, false);
            return directExchange;
        
    
        @Bean(MQMessageQueueNames.DEAD_LETTER_EXCHANGE)
        public DirectExchange deadLetterExchange() 
            DirectExchange directExchange = new DirectExchange(MQMessageQueueNames.DEAD_LETTER_EXCHANGE, true, false);
            return directExchange;
        
    
    
        @Bean(MQMessageQueueNames.DELAYED_PLUGS_EXCHANGE)
        public CustomExchange customExchange() 
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(MQMessageQueueNames.DELAYED_PLUGS_EXCHANGE, "x-delayed-message", true, false, args);
        
    
    
        /**
         * 声明延时队列C 不设置TTL
         * 并绑定到对应的死信交换机
         * @return
         */
        @Bean("delayQueueC")
        public Queue delayQueueC()
            Map<String, Object> args = new HashMap<>(3);
            // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
            args.put("x-dead-letter-exchange", MQMessageQueueNames.DEAD_LETTER_EXCHANGE);
            // x-dead-letter-routing-key  这里声明当前队列的死信路由key
            args.put("x-dead-letter-routing-key", MQMessageQueueNames.DEAD_LETTER_QUEUEC_ROUTING_KEY);
            return QueueBuilder.durable(MQMessageQueueNames.DELAY_QUEUEC_NAME).withArguments(args).build();
        
    
    
        /**
         * 声明死信队列C 用于接收延时任意时长处理的消息
         * @return
         */
        @Bean("deadLetterQueueC")
        public Queue deadLetterQueueC()
            return new Queue(MQMessageQueueNames.DEAD_LETTER_QUEUEC_NAME);
        
    
    
        @Bean(MQMessageQueueNames.DELAYED_PLUGS_QUEUEC_NAME)
        public Queue autoDelayMQPlugs() 
            return new Queue(MQMessageQueueNames.DELAYED_PLUGS_QUEUEC_NAME);
        
    
    
    
    
    
  4. 进行绑定

    package com.soul.home.lws.system.mq;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @author gaoguofan
     * @date 2020/6/15
     */
    @Configuration
    public class MQMessageQueueKeyBindConfg 
    
    
        /**
         * 声明延时列C绑定关系
         * @param queue
         * @param exchange
         * @return
         */
        @Bean
        public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,
                                     @Qualifier(MQMessageQueueNames.DELAY_EXCHANGE_NAME) DirectExchange exchange)
            return BindingBuilder.bind(queue).to(exchange).with(MQMessageQueueNames.DELAY_QUEUEC_ROUTING_KEY);
        
    
        /**
         * 声明死信队列C绑定关系
         * @param queue
         * @param exchange
         * @return
         */
        @Bean
        public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue,
                                          @Qualifier(MQMessageQueueNames.DEAD_LETTER_EXCHANGE) DirectExchange exchange)
            return BindingBuilder.bind(queue).to(exchange).with(MQMessageQueueNames.DEAD_LETTER_QUEUEC_ROUTING_KEY);
        
    
    
        /**
         *  插件延时队列绑定关系
         * @param queue
         * @param customExchange
         * @return
         */
        @Bean
        public Binding bindingNotify(@Qualifier(MQMessageQueueNames.DELAYED_PLUGS_QUEUEC_NAME) Queue queue,
                                     @Qualifier(MQMessageQueueNames.DELAYED_PLUGS_EXCHANGE) CustomExchange customExchange) 
            return BindingBuilder.bind(queue).to(customExchange).with(MQMessageQueueNames.DELAYED_PLUGS_ROUTING_KEY).noargs();
        
    
    
    
    
    
  5. 定义生产者

    package com.soul.home.lws.system.controler;
    
    import com.rabbitmq.client.AMQP;
    import com.soul.home.lws.route.conf.Api;
    import com.soul.home.lws.sql.dto.BaseDto;
    import com.soul.home.lws.system.mq.ExpirationMessagePostProcessor;
    import com.soul.home.lws.system.mq.MQMessageQueueNames;
    import com.soul.home.lws.system.mq.MQMessageQueuesConfig;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author gaoguofan
     * @date 2020/6/15
     */
    @RestController
    public class RabbitMessageController 
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @GetMapping(value = "ts")
        public BaseDto sendMsg(Integer ttl) 
            rabbitTemplate.convertAndSend(MQMessageQueueNames.DELAY_EXCHANGE_NAME,
                    MQMessageQueueNames.DELAY_QUEUEC_ROUTING_KEY, "HELLO" + ttl,
                    new ExpirationMessagePostProcessor(ttl, null));
            return new BaseDto(0, null);
        
    
    
        @GetMapping(value = "ts2")
        public BaseDto sendMsg2(Integer ttl) 
            Map<String, Object> headers = new HashMap<>();
            headers.put("x-delay", ttl);
            AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
            rabbitTemplate.convertAndSend(MQMessageQueueNames.DELAYED_PLUGS_EXCHANGE,
                    MQMessageQueueNames.DELAYED_PLUGS_ROUTING_KEY, "HELLO" + ttl,
                    new ExpirationMessagePostProcessor(ttl, headers));
            return new BaseDto(0, null);
        
    
    
    
    
    
  6. 定义消费者

    package com.soul.home.lws.system.mq.listener;
    
    import com.soul.home.lws.conf.properties.AutoConfigRabbitMqProperties;
    import com.soul.home.lws.system.mq.error.MQErrorHandler;
    import com.soul.home.lws.system.mq.config.MQMessageQueueNames;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 监听队列容器
     */
    @Configuration
    public class MQMessageListenerContainerConfig 
    
        @Autowired
        ConnectionFactory connectionFactory;
        @Autowired
        MQErrorHandler mqErrorHandler;
        @Autowired
        StatelessRetryOperationsInterceptorFactoryBean retryOperationsInterceptorFactoryBean;
        @Autowired
        AutoConfigRabbitMqProperties rabbitMqProperties;
        @Autowired
        MemberChargeNoticeListener memberChargeNoticeListener;
    
    
        /**
         *  消息预取数据计算:
         *  channel.basicQos() = simpleMessageListenerContainer.setPrefetchCount(250)
         *  预取值大小与性

    以上是关于RabbitMQ 安装部署(New)& 延时队列使用的主要内容,如果未能解决你的问题,请参考以下文章

    rabbitmq的安装部署

    celery-rabbitmq 安装部署

    rabbitmq部署记录一

    centOS7 部署 rabbitMQ

    rabbitmq 原理&部署&使用

    redis安装和可视化grafana部署