RabbitMQ 安装部署(New)& 延时队列使用
Posted 高国藩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 安装部署(New)& 延时队列使用相关的知识,希望对你有一定的参考价值。
RabbitMQ 安装部署(New)
elang环境与MQ版本一定要对应,否则无法启动,Rabbit版本与插件版本一定要对应,负责无法加载插件
版本信息 | |
---|---|
centos | 7.3.0 |
erlang | Version: 23.0.2,Release: 2.el7 |
RabbitMQ | 3.8.0-1 |
rabbitmq_delayed_message_exchange | 3.8 |
安装脚本步骤:
-
卸载erlang
service rabbitmq-server stop --停止服务 yum list | grep erlang --查询当前环境 yum -y remove erlang-* --卸载 yum remove erlang.x86_64 --移除
-
卸载rabbitMq
yum list | grep rabbitmq --当前是否安装 yum -y remove rabbitmq-server.noarch --移除安装文件
-
安装erlang
yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm rpm -Uvh erlang-solutions-1.0-1.noarch.rpm yum -y install epel-release sudo yum install erlang
[root@nginx-1 mq]# yum info erlang --校验版本信息 Loaded plugins: fastestmirror Loading mirror speeds from cached hostfile Installed Packages Name : erlang Arch : x86_64 Version : 23.0.2 Release : 2.el7 Size : 0.0 Repo : installed From repo : erlang-solutions Summary : General-purpose programming language and runtime environment URL : http://www.erlang.org License : ERPL Description : Erlang is a general-purpose programming language and runtime : environment. Erlang has built-in support for concurrency, distribution : and fault tolerance. Erlang is used in several large telecommunication : systems from Ericsson.
-
安装rabbitMq
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.0/rabbitmq-server-3.8.0-1.el7.noarch.rpm yum -y install socat --安装socat依赖 rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc --导入什么签名 rpm -ivh rabbitmq-server-3.8.0-1.el7.noarch.rpm
-
部署插件
rabbitmq-delayed-message-exchange --插件名称 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0 --下载地址,选择3.8
RabbitMQ的有些插件没有集成在初始的安装中,它们需要额外安装,这些文件的后缀为
.ez
,安装时需要将.ez
文件拷贝到安装的插件目录。以下是不同系统中默认安装的插件目录路径:插件目录 Linux /usr/lib/rabbitmq/lib/rabbitmq_server-$version/plugins Windows C:\\Program Files\\RabbitMQ\\rabbitmq_server-version\\plugins(安装rabbitmq的目录) Homebrew /usr/local/Cellar/rabbitmq/version/plugins rabbitmq-plugins enable rabbitmq_delayed_message_exchange --启用插件 rabbitmq-plugins disable rabbitmq_delayed_message_exchange --弃用插件
-
授权用户访问
rabbitmq-plugins enable rabbitmq_management --启用网页插件 添加用户:rabbitmqctl add_user admin admin 添加权限:rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" 修改用户角色rabbitmqctl set_user_tags admin administrator 然后就可以远程访问了,然后可直接配置用户权限等信息。
-
启动命令脚本
service rabbitmq-server restart --重启 service rabbitmq-server stop --停止
Spring搭建延时队列-注解方式
此处应用于Spring、SpringBoot等开源框架
-
添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
定义工厂链接信息
package com.soul.home.lws.system.conf; import com.soul.home.lws.conf.properties.AutoConfigRabbitMqProperties; import com.soul.home.lws.system.mq.error.MQRepublishMessageRecoverer; import com.soul.home.lws.system.mq.callback.MessageConfirmCallback; import com.soul.home.lws.system.mq.callback.MessageReturnCallback; import org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean; import org.springframework.amqp.rabbit.connection.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetryTemplate; /** * @author gaoguofan * @date 2020/6/15 */ @Configuration @EnableConfigurationProperties(AutoConfigRabbitMqProperties.class) public class AutoConfigurationRabbitMqConfig @Autowired MessageConfirmCallback confirmCallback; @Autowired MessageReturnCallback returnCallback; @Autowired AutoConfigRabbitMqProperties rabbitMqProperties; @Bean public ConnectionFactory connectionFactory() com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory = new com.rabbitmq.client.ConnectionFactory(); rabbitConnectionFactory.useNio(); rabbitConnectionFactory.setAutomaticRecoveryEnabled(true); rabbitConnectionFactory.setNetworkRecoveryInterval(10000); // rabbitConnectionFactory.setNioParams(new NioParams().setNbIoThreads(4)); rabbitConnectionFactory.setHost(rabbitMqProperties.getHost()); rabbitConnectionFactory.setPort(rabbitMqProperties.getPort()); rabbitConnectionFactory.setUsername(rabbitMqProperties.getUserName()); rabbitConnectionFactory.setPassword(rabbitMqProperties.getPassword()); // Clients can be configured to allow fewer channels per connection. rabbitConnectionFactory.setRequestedChannelMax(rabbitMqProperties.getRequestedChannelMax()); CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnectionFactory); // this params server for Ack Exchange & Ack Exchange -> Queue,it shoule be seted true connectionFactory.setPublisherConfirms(true); // connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); // connectionFactory.setChannelCacheSize(rabbitMqProperties.getChannelCacheSize()); connectionFactory.setConnectionCacheSize(rabbitMqProperties.getChannelCacheSize()); connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION); // ConnectionFactory connectionFactory = new AbstractConnectionFactory(rabbitConnectionFactory) // @Override // public Connection createConnection() throws AmqpException // try // return new SimpleConnection(rabbitConnectionFactory.newConnection(), 100000); // catch (Exception e) // e.printStackTrace(); // // return null; // // ; return connectionFactory; @Bean public RetryTemplate retryTemplate() RetryTemplate retryTemplate = new RetryTemplate(); SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(); simpleRetryPolicy.setMaxAttempts(rabbitMqProperties.getMaxAttempts()); // retry Policy 指数退避策略,必须使用指数,内网连接很快 ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy(); exponentialBackOffPolicy.setInitialInterval(rabbitMqProperties.getInitialInterval()); exponentialBackOffPolicy.setMultiplier(rabbitMqProperties.getMultiplier()); exponentialBackOffPolicy.setMaxInterval(rabbitMqProperties.getMaxInterval()); retryTemplate.setBackOffPolicy(exponentialBackOffPolicy); retryTemplate.setRetryPolicy(simpleRetryPolicy); return retryTemplate; @Bean public StatelessRetryOperationsInterceptorFactoryBean statelessRetryOperationsInterceptorFactoryBean( MQRepublishMessageRecoverer messageRecoverer) StatelessRetryOperationsInterceptorFactoryBean interceptorFactoryBean = new StatelessRetryOperationsInterceptorFactoryBean(); interceptorFactoryBean.setMessageRecoverer(messageRecoverer); interceptorFactoryBean.setRetryOperations(retryTemplate()); return interceptorFactoryBean; /** * 业务回调不同,可配置scope进行单独注入 * @param connectionFactory * @return */ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // Ack Exchange rabbitTemplate.setConfirmCallback(confirmCallback); // Ack Exchange -> queue rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(returnCallback); return rabbitTemplate; @Bean public SimpleMessageConverter simpleMessageConverter() return new SimpleMessageConverter();
-
定义exchange&queue
package com.soul.home.lws.system.mq; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * durable属性和auto-delete属性可以同时生效; * durable属性和exclusive属性会有性质上的冲突,两者同时设置时,仅exclusive属性生效; * auto_delete属性和exclusive属性可以同时生效; * 除此之外, * Queue的“Exlusive owner”对应的是connection而不是channel; * Consumer存在于某个channel上的; * @author gaoguofan * @date 2020/6/15 */ @Configuration public class MQMessageQueuesConfig private Boolean exclusive = false; private Boolean autoDelete = false; private Boolean durable = true; @Bean(MQMessageQueueNames.DELAY_EXCHANGE_NAME) public DirectExchange directExchange() DirectExchange directExchange = new DirectExchange(MQMessageQueueNames.DELAY_EXCHANGE_NAME, true, false); return directExchange; @Bean(MQMessageQueueNames.DEAD_LETTER_EXCHANGE) public DirectExchange deadLetterExchange() DirectExchange directExchange = new DirectExchange(MQMessageQueueNames.DEAD_LETTER_EXCHANGE, true, false); return directExchange; @Bean(MQMessageQueueNames.DELAYED_PLUGS_EXCHANGE) public CustomExchange customExchange() Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(MQMessageQueueNames.DELAYED_PLUGS_EXCHANGE, "x-delayed-message", true, false, args); /** * 声明延时队列C 不设置TTL * 并绑定到对应的死信交换机 * @return */ @Bean("delayQueueC") public Queue delayQueueC() Map<String, Object> args = new HashMap<>(3); // x-dead-letter-exchange 这里声明当前队列绑定的死信交换机 args.put("x-dead-letter-exchange", MQMessageQueueNames.DEAD_LETTER_EXCHANGE); // x-dead-letter-routing-key 这里声明当前队列的死信路由key args.put("x-dead-letter-routing-key", MQMessageQueueNames.DEAD_LETTER_QUEUEC_ROUTING_KEY); return QueueBuilder.durable(MQMessageQueueNames.DELAY_QUEUEC_NAME).withArguments(args).build(); /** * 声明死信队列C 用于接收延时任意时长处理的消息 * @return */ @Bean("deadLetterQueueC") public Queue deadLetterQueueC() return new Queue(MQMessageQueueNames.DEAD_LETTER_QUEUEC_NAME); @Bean(MQMessageQueueNames.DELAYED_PLUGS_QUEUEC_NAME) public Queue autoDelayMQPlugs() return new Queue(MQMessageQueueNames.DELAYED_PLUGS_QUEUEC_NAME);
-
进行绑定
package com.soul.home.lws.system.mq; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author gaoguofan * @date 2020/6/15 */ @Configuration public class MQMessageQueueKeyBindConfg /** * 声明延时列C绑定关系 * @param queue * @param exchange * @return */ @Bean public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue, @Qualifier(MQMessageQueueNames.DELAY_EXCHANGE_NAME) DirectExchange exchange) return BindingBuilder.bind(queue).to(exchange).with(MQMessageQueueNames.DELAY_QUEUEC_ROUTING_KEY); /** * 声明死信队列C绑定关系 * @param queue * @param exchange * @return */ @Bean public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue, @Qualifier(MQMessageQueueNames.DEAD_LETTER_EXCHANGE) DirectExchange exchange) return BindingBuilder.bind(queue).to(exchange).with(MQMessageQueueNames.DEAD_LETTER_QUEUEC_ROUTING_KEY); /** * 插件延时队列绑定关系 * @param queue * @param customExchange * @return */ @Bean public Binding bindingNotify(@Qualifier(MQMessageQueueNames.DELAYED_PLUGS_QUEUEC_NAME) Queue queue, @Qualifier(MQMessageQueueNames.DELAYED_PLUGS_EXCHANGE) CustomExchange customExchange) return BindingBuilder.bind(queue).to(customExchange).with(MQMessageQueueNames.DELAYED_PLUGS_ROUTING_KEY).noargs();
-
定义生产者
package com.soul.home.lws.system.controler; import com.rabbitmq.client.AMQP; import com.soul.home.lws.route.conf.Api; import com.soul.home.lws.sql.dto.BaseDto; import com.soul.home.lws.system.mq.ExpirationMessagePostProcessor; import com.soul.home.lws.system.mq.MQMessageQueueNames; import com.soul.home.lws.system.mq.MQMessageQueuesConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; import java.util.Map; /** * @author gaoguofan * @date 2020/6/15 */ @RestController public class RabbitMessageController @Autowired RabbitTemplate rabbitTemplate; @GetMapping(value = "ts") public BaseDto sendMsg(Integer ttl) rabbitTemplate.convertAndSend(MQMessageQueueNames.DELAY_EXCHANGE_NAME, MQMessageQueueNames.DELAY_QUEUEC_ROUTING_KEY, "HELLO" + ttl, new ExpirationMessagePostProcessor(ttl, null)); return new BaseDto(0, null); @GetMapping(value = "ts2") public BaseDto sendMsg2(Integer ttl) Map<String, Object> headers = new HashMap<>(); headers.put("x-delay", ttl); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); rabbitTemplate.convertAndSend(MQMessageQueueNames.DELAYED_PLUGS_EXCHANGE, MQMessageQueueNames.DELAYED_PLUGS_ROUTING_KEY, "HELLO" + ttl, new ExpirationMessagePostProcessor(ttl, headers)); return new BaseDto(0, null);
-
定义消费者
package com.soul.home.lws.system.mq.listener; import com.soul.home.lws.conf.properties.AutoConfigRabbitMqProperties; import com.soul.home.lws.system.mq.error.MQErrorHandler; import com.soul.home.lws.system.mq.config.MQMessageQueueNames; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.MessageListenerContainer; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 监听队列容器 */ @Configuration public class MQMessageListenerContainerConfig @Autowired ConnectionFactory connectionFactory; @Autowired MQErrorHandler mqErrorHandler; @Autowired StatelessRetryOperationsInterceptorFactoryBean retryOperationsInterceptorFactoryBean; @Autowired AutoConfigRabbitMqProperties rabbitMqProperties; @Autowired MemberChargeNoticeListener memberChargeNoticeListener; /** * 消息预取数据计算: * channel.basicQos() = simpleMessageListenerContainer.setPrefetchCount(250) * 预取值大小与性
以上是关于RabbitMQ 安装部署(New)& 延时队列使用的主要内容,如果未能解决你的问题,请参考以下文章