使用rabbitmq手动确认消息的,定时获取队列消息实现

Posted dmeck

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用rabbitmq手动确认消息的,定时获取队列消息实现相关的知识,希望对你有一定的参考价值。

描述问题

  最近项目中因为有些数据,需要推送到第三方系统中,因为数据会一直增加,并且需要与第三方系统做相关交互。

相关业务

  本着不影响线上运行效率的思想,我们将增加的消息放入rabbitmq,使用另一个应用获取消费,因为数据只是推送,并且业务的数据有15分钟左右的更新策略,对实时性不是很高所以我们需要一个定时任务来主动链接rabbit去消费,然后将数据以网络方式传送

相关分析

  网络上大致出现了相关的解决办法,但由于实现相关数据丢失及处理、性能和效率等相关基础业务的工作量,望而却步。。。。。。

  还好spring有相关的 org.springframework.amqp 工具包,简化的大量麻烦>_> 让我们开始吧

  了解rabbit的相关几个概念

 了解了这几个概念的时候你可能已经关注到了我们今天的主题SimpleMessageListenerContainer

 我们使用SimpleMessageListenerContainer容器设置消费队列监听,然后设置具体的监听Listener进行消息消费具体逻辑的编写,通过SimpleRabbitListenerContainerFactory我们可以完成相关SimpleMessageListenerContainer容器的管理,

  但对于使用此容器批量消费的方式,官方并没有相关说明,网络上你可能只找到这篇SimpleMessageListenerContainer批量消息处理对于问题描述是很清晰,但是回答只是说的比较简单

  下面我们就对这个问题的答案来个coding

解决办法

  首先我们因为需要失败重试,使用spring的RepublishMessageRecoverer可以解决这个问题,这显然有一个缺点,即将在整个重试期间占用线程。所以我们使用了死信队列

  相关配置

  1     @Bean
  2     ObjectMapper objectMapper() {
  3         ObjectMapper objectMapper = new ObjectMapper();
  4         DateFormat dateFormat = objectMapper.getDateFormat();
  5         JavaTimeModule javaTimeModule = new JavaTimeModule();
  6 
  7         SimpleModule module = new SimpleModule();
  8         module.addSerializer(new ToStringSerializer(Long.TYPE));
  9         module.addSerializer(new ToStringSerializer(Long.class));
 10         module.addSerializer(new ToStringSerializer(BigInteger.class));
 11 
 12         javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
 13         javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
 14         javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
 15 
 16         objectMapper.registerModule(module);
 17         objectMapper.registerModule(javaTimeModule);
 18         objectMapper.setConfig(objectMapper.getDeserializationConfig().with(new ObjectMapperDateFormatExtend(dateFormat)));//反序列化扩展日期格式支持
 19         objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
 20         objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 21         return objectMapper;
 22 }
 23 
 24 
 25 
 26   @Bean
 27   RabbitAdmin admin (ConnectionFactory aConnectionFactory) {
 28     return new RabbitAdmin(aConnectionFactory);
 29   }
 30 
 31   @Bean
 32   MessageConverter jacksonAmqpMessageConverter( ) {
 33     return new Jackson2JsonMessageConverter(objectMapper());
 34   }
 35 
 36 
 37   @Bean
 38   Queue bcwPushControlQueue (RabbitAdmin rabbitAdmin) {
 39     Queue queue = new Queue(Queues.QUEUE_BCW_PUSH);
 40     rabbitAdmin.declareQueue(queue);
 41     return queue;
 42   }
 43   @Bean
 44   Queue bcwPayControlQueue (RabbitAdmin rabbitAdmin) {
 45     Queue queue = new Queue(Queues.QUEUE_BCW_PAY);
 46     rabbitAdmin.declareQueue(queue);
 47     return queue;
 48   }
 49   @Bean
 50   Queue bcwPullControlQueue (RabbitAdmin rabbitAdmin) {
 51     Queue queue = new Queue(Queues.QUEUE_BCW_PULL);
 52     rabbitAdmin.declareQueue(queue);
 53     return queue;
 54   }
 55     /**
 56      * 声明一个交换机
 57      * @return
 58      */
 59   @Bean
 60   TopicExchange controlExchange () {
 61       return new TopicExchange(Exchanges.ExangeTOPIC);
 62   }
 63 
 64 
 65     /**
 66      * 延时重试队列
 67      */
 68     @Bean
 69     public Queue bcwPayControlRetryQueue() {
 70         Map<String, Object> arguments = new HashMap<>();
 71         arguments.put("x-message-ttl", 10 * 1000);
 72         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
 73 //        如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
 74         arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
 75         return new Queue("queue_bcw@pay@retry", true, false, false, arguments);
 76     }
 77     /**
 78      * 延时重试队列
 79      */
 80     @Bean
 81     public Queue bcwPushControlRetryQueue() {
 82         Map<String, Object> arguments = new HashMap<>();
 83         arguments.put("x-message-ttl", 10 * 1000);
 84         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
 85 //        如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
 86         arguments.put("x-dead-letter-routing-key", "queue_bcw.push");
 87         return new Queue("queue_bcw@push@retry", true, false, false, arguments);
 88     }
 89     /**
 90      * 延时重试队列
 91      */
 92     @Bean
 93     public Queue bcwPullControlRetryQueue() {
 94         Map<String, Object> arguments = new HashMap<>();
 95         arguments.put("x-message-ttl", 10 * 1000);
 96         arguments.put("x-dead-letter-exchange", Exchanges.ExangeTOPIC);
 97 //        如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
 98 //        arguments.put("x-dead-letter-routing-key", "queue_bcw");
 99         return new Queue("queue_bcw@pull@retry", true, false, false, arguments);
100     }
101     @Bean
102     public Binding  bcwPayControlRetryBinding() {
103         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pay.retry");
104     }
105     @Bean
106     public Binding  bcwPushControlRetryBinding() {
107         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.push.retry");
108     }
109     @Bean
110     public Binding   bcwPullControlRetryBinding() {
111         return BindingBuilder.bind(bcwPushControlRetryQueue()).to(controlExchange()).with("queue_bcw.pull.retry");
112     }
113 
114   /**
115    * 队列绑定并关联到RoutingKey
116    *
117    * @param queueMessages 队列名称
118    * @param exchange      交换机
119    * @return 绑定
120    */
121   @Bean
122   Binding bcwPushBindingQueue(@Qualifier("bcwPushControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
123     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.push");
124   }
125   /**
126    * 队列绑定并关联到RoutingKey
127    *
128    * @param queueMessages 队列名称
129    * @param exchange      交换机
130    * @return 绑定
131    */
132   @Bean
133   Binding bcwPayBindingQueue(@Qualifier("bcwPayControlQueue") Queue queueMessages, @Qualifier("controlExchange") TopicExchange exchange) {
134     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pay");
135   }
136   /**
137    * 队列绑定并关联到RoutingKey
138    *
139    * @param queueMessages 队列名称
140    * @param exchange      交换机
141    * @return 绑定
142    */
143   @Bean
144   Binding bcwPullBindingQueue(@Qualifier("bcwPullControlQueue") Queue queueMessages,@Qualifier("controlExchange") TopicExchange exchange) {
145     return BindingBuilder.bind(queueMessages).to(exchange).with("queue_bcw.pull");
146   }
147 
148   @Bean
149   @ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
150   public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
151           SimpleRabbitListenerContainerFactoryConfigurer configurer,
152           ConnectionFactory connectionFactory) {
153     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
154     configurer.configure(factory, connectionFactory);
155     factory.setMessageConverter(jacksonAmqpMessageConverter());
156     return factory;
157   }

下面就是我们的主题,定时任务使用的是org.springframework.scheduling

  1 /**
  2  * 手动确认消息的,定时获取队列消息实现
  3  */
  4 public abstract class QuartzSimpleMessageListenerContainer extends SimpleMessageListenerContainer {
  5     protected final Logger logger = LoggerFactory.getLogger(getClass());
  6     private List<Message> body = new LinkedList<>();
  7     public long start_time;
  8     private Channel channel;
  9     @Autowired
 10     private ObjectMapper objectMapper;
 11     @Autowired
 12     private RabbitTemplate rabbitTemplate;
 13 
 14     public QuartzSimpleMessageListenerContainer() {
 15         // 手动确认
 16         this.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 17 
 18         this.setMessageListener((ChannelAwareMessageListener)  (message,channel)  -> {
 19             long current_time = System.currentTimeMillis();
 20             int time = (int) ((current_time - start_time)/1000);
 21             logger.info("====接收到{}队列的消息=====",message.getMessageProperties().getConsumerQueue());
 22             Long retryCount = getRetryCount(message.getMessageProperties());
 23             if (retryCount > 3) {
 24                 logger.info("====此消息失败超过三次{}从队列的消息删除=====",message.getMessageProperties().getConsumerQueue());
 25                 try {
 26                     channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
 27                 } catch (IOException ex) {
 28                     ex.printStackTrace();
 29                 }
 30                 return;
 31             }
 32 
 33             this.body.add(message);
 34             /**
 35              * 判断数组数据是否满了,判断此监听器时间是否大于执行时间
 36              * 如果在最后延时时间段内没有业务消息,此监听器会一直开着
 37              */
 38             if(body.size()>=3 || time>60){
 39                 this.channel = channel;
 40                 callback();
 41             }
 42         });
 43 
 44 
 45 
 46     }
 47     private void callback(){
 48 //         channel = getChannel(getTransactionalResourceHolder());
 49         if(body.size()>0 && channel !=null &&  channel.isOpen()){
 50             try {
 51                 callbackWork();
 52             }catch (Exception e){
 53                 logger.error("推送数据出错:{}",e.getMessage());
 54 
 55                 body.stream().forEach(message -> {
 56                     Long retryCount = getRetryCount(message.getMessageProperties());
 57                     if (retryCount <= 3) {
 58                         logger.info("将消息置入延时重试队列,重试次数:" + retryCount);
 59                         rabbitTemplate.convertAndSend(Exchanges.ExangeTOPIC, message.getMessageProperties().getReceivedRoutingKey()+".retry", message);
 60                     }
 61                 });
 62 
 63             } finally{
 64 
 65                 logger.info("flsher too data");
 66 
 67                 body.stream().forEach(message -> {
 68                     //手动acknowledge
 69                     try {
 70                         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
 71                     } catch (IOException e) {
 72                         logger.error("手动确认消息失败!");
 73                         e.printStackTrace();
 74                     }
 75                 });
 76 
 77                 body.clear();
 78                 this.stop();
 79 
 80             }
 81         }
 82 
 83     }
 84     abstract void callbackWork() throws Exception;
 85     /**
 86      * 获取消息失败次数
 87      * @param properties
 88      * @return
 89      */
 90     private long getRetryCount(MessageProperties properties){
 91         long retryCount = 0L;
 92         Map<String,Object> header = properties.getHeaders();
 93         if(header != null && header.containsKey("x-death")){
 94             List<Map<String,Object>> deaths = (List<Map<String,Object>>)header.get("x-death");
 95             if(deaths.size()>0){
 96                 Map<String,Object> death = deaths.get(0);
 97                 retryCount = (Long)death.get("count");
 98             }
 99         }
100         return retryCount;
101     }
102 
103     @Override
104     @Scheduled(cron = "0 0/2 * * * ? ")
105     public void start() {
106         logger.info("start push data scheduled!");
107         //初始化数据,将未处理的调用stop方法,返还至rabbit
108         body.clear();
109         super.stop();
110         start_time = System.currentTimeMillis();
111         super.start();
112 
113         logger.info("end push data scheduled!");
114     }
115 
116     public List<WDNJPullOrder> getBody() {
117 
118         List<WDNJPullOrder> collect = body.stream().map(data -> {
119                     byte[] body = data.getBody();
120                     WDNJPullOrder readValue = null;
121                     try {
122                         readValue = objectMapper.readValue(body, new TypeReference<WDNJPullOrder>() {
123                         });
124                     } catch (IOException e) {
125                         logger.error("处理数据出错{}",e.getMessage());
126                     }
127                     return readValue;
128                 }
129         ).collect(Collectors.toList());
130 
131         return collect;
132 
133 
134     }
135 
136 }

 

后续

 

当然定时任务的启动,你可以写到相关rabbit容器实现的里面,但是这里并不是很需要,所以对于这个的小改动,同学你可以自己实现

 @Scheduled(cron = "0 0/2 * * * ? ")

public void start()

 

 

以上是关于使用rabbitmq手动确认消息的,定时获取队列消息实现的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ消息队列笔记

RabbitMQ消息队列笔记

RabbitMQ 消息队列学习

RabbitMQ学习总结

RabbitMQ确认机制问题处理

在 RabbitMQ 中手动确认消息