Day650.MQ异步处理问题 -Java业务开发常见错误

Posted 阿昌喜欢吃黄桃

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Day650.MQ异步处理问题 -Java业务开发常见错误相关的知识,希望对你有一定的参考价值。

MQ异步处理问题

Hi,我是阿昌,今天学习记录分享的是关于MQ异步处理问题

异步处理是互联网应用不可或缺的一种架构模式,大多数业务项目都是由同步处理、异步处理和定时任务处理三种模式相辅相成实现的。

区别于同步处理,异步处理无需同步等待流程处理完毕,因此适用场景主要包括:

  • 服务于主流程的分支流程。比如,在注册流程中,把数据写入数据库的操作是主流程,但注册后给用户发优惠券或欢迎短信的操作是分支流程,时效性不那么强,可以进行异步处理。
  • 用户不需要实时看到结果的流程。比如,下单后的配货、送货流程完全可以进行异步处理,每个阶段处理完成后,再给用户发推送或短信让用户知晓即可。

同时,异步处理因为可以有 MQ 中间件的介入用于任务的缓冲的分发,所以相比于同步处理,在应对流量洪峰、实现模块解耦和消息广播方面有功能优势。

不过,异步处理虽然好用,但在实现的时候却有三个最容易犯的错,分别是异步处理流程的可靠性问题、消息发送模式的区分问题,以及大量死信消息堵塞队列的问题。

先引入 amqp 依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

一、消息补偿闭环

使用类似 RabbitMQ、RocketMQ 等 MQ 系统来做消息队列实现异步处理,虽然说消息可以落地到磁盘保存,即使 MQ 出现问题消息数据也不会丢失,但是异步流程在消息发送、传输、处理等环节,都可能发生消息丢失。

此外,任何 MQ 中间件都无法确保 100% 可用,需要考虑不可用时异步流程如何继续进行。

因此,对于异步处理流程,必须考虑补偿或者说建立主备双活流程


案例场景:用户注册落数据库的流程为同步流程,会员服务收到消息后发送欢迎消息的流程为异步流程。

  • 蓝色的线,使用 MQ 进行的异步处理,我们称作主线,可能存在消息丢失的情况(虚线代表异步调用);
  • 绿色的线,使用补偿 Job 定期进行消息补偿,我们称作备线,用来补偿主线丢失的消息;
  • 考虑到极端的 MQ 中间件失效的情况,我们要求备线的处理吞吐能力达到主线的能力水平。

实现代码

首先,定义 UserController 用于注册 + 发送异步消息。对于注册方法,我们一次性注册 10 个用户,用户注册消息不能发送出去的概率为 50%。

@RestController
@Slf4j
@RequestMapping("user")
public class UserController 
    @Autowired
    private UserService userService;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("register")
    public void register() 
        //模拟10个用户注册
        IntStream.rangeClosed(1, 10).forEach(i -> 
            //落库
            User user = userService.register();
            //模拟50%的消息可能发送失败
            if (ThreadLocalRandom.current().nextInt(10) % 2 == 0) 
                //通过RabbitMQ发送消息
               rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE, RabbitConfiguration.ROUTING_KEY, user);
                log.info("sent mq user ", user.getId());
            
        );
    

然后,定义 MemberService 类用于模拟会员服务。会员服务监听用户注册成功的消息,并发送欢迎短信。

我们使用 ConcurrentHashMap 来存放那些发过短信的用户 ID 实现幂等,避免相同的用户进行补偿时重复发送短信:

@Component
@Slf4j
public class MemberService 
    //发送欢迎消息的状态
    private Map<Long, Boolean> welcomeStatus = new ConcurrentHashMap<>();
    //监听用户注册成功的消息,发送欢迎消息
    @RabbitListener(queues = RabbitConfiguration.QUEUE)
    public void listen(User user) 
        log.info("receive mq user ", user.getId());
        welcome(user);
    
    //发送欢迎消息
    public void welcome(User user) 
        //去重操作
        if (welcomeStatus.putIfAbsent(user.getId(), true) == null) 
            try 
                TimeUnit.SECONDS.sleep(2);
             catch (InterruptedException e) 
            
            log.info("memberService: welcome new user ", user.getId());
        
    

对于 MQ 消费程序,处理逻辑务必考虑去重(支持幂等),原因有几个:

  • MQ 消息可能会因为中间件本身配置错误、稳定性等原因出现重复。
  • 自动补偿重复,比如本例,同一条消息可能既走 MQ 也走补偿,肯定会出现重复,而且考虑到高内聚,补偿 Job 本身不会做去重处理。
  • 人工补偿重复。出现消息堆积时,异步处理流程必然会延迟。如果我们提供了通过后台进行补偿的功能,那么在处理遇到延迟的时候,很可能会先进行人工补偿,过了一段时间后处理程序又收到消息了,重复处理。我之前就遇到过一次由 MQ 故障引发的事故,MQ 中堆积了几十万条发放资金的消息,导致业务无法及时处理,运营以为程序出错了就先通过后台进行了人工处理,结果 MQ 系统恢复后消息又被重复处理了一次,造成大量资金重复发放。

定义补偿 Job 也就是备线操作

我们在 CompensationJob 中定义一个 @Scheduled 定时任务,5 秒做一次补偿操作,因为 Job 并不知道哪些用户注册的消息可能丢失,所以是全量补偿,补偿逻辑是:每 5 秒补偿一次,按顺序一次补偿 5 个用户,下一次补偿操作从上一次补偿的最后一个用户 ID 开始;对于补偿任务我们提交到线程池进行“异步”处理,提高处理能力。

@Component
@Slf4j
public class CompensationJob 
    //补偿Job异步处理线程池
    private static ThreadPoolExecutor compensationThreadPool = new ThreadPoolExecutor(
            10, 10,
            1, TimeUnit.HOURS,
            new ArrayBlockingQueue<>(1000),
            new ThreadFactoryBuilder().setNameFormat("compensation-threadpool-%d").get());
    @Autowired
    private UserService userService;
    @Autowired
    private MemberService memberService;
    //目前补偿到哪个用户ID
    private long offset = 0;

    //10秒后开始补偿,5秒补偿一次
    @Scheduled(initialDelay = 10_000, fixedRate = 5_000)
    public void compensationJob() 
        log.info("开始从用户ID  补偿", offset);
        //获取从offset开始的用户
        userService.getUsersAfterIdWithLimit(offset, 5).forEach(user -> 
            compensationThreadPool.execute(() -> memberService.welcome(user));
            offset = user.getId();
        );
    

为了实现高内聚,主线和备线处理消息,最好使用同一个方法。

比如,本例中 MemberService 监听到 MQ 消息和 CompensationJob 补偿,调用的都是 welcome 方法。

此外值得一说的是,Demo 中的补偿逻辑比较简单,生产级的代码应该在以下几个方面进行加强:

  • 考虑配置补偿的频次、每次处理数量,以及补偿线程池大小等参数为合适的值,以满足补偿的吞吐量。
  • 考虑备线补偿数据进行适当延迟。比如,对注册时间在 30 秒之前的用户再进行补偿,以方便和主线 MQ 实时流程错开,避免冲突。
  • 诸如当前补偿到哪个用户的 offset 数据,需要落地数据库。
  • 补偿 Job 本身需要高可用,可以使用类似 XXLJob 或 ElasticJob 等任务系统。

运行程序,执行注册方法注册 10 个用户,输出如下:

[17:01:16.570] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 1
[17:01:16.571] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 5
[17:01:16.572] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 7
[17:01:16.573] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 8
[17:01:16.594] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 1
[17:01:18.597] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 1
[17:01:18.601] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 5
[17:01:20.603] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 5
[17:01:20.604] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 7
[17:01:22.605] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 7
[17:01:22.606] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 8
[17:01:24.611] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 8
[17:01:25.498] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 开始从用户ID 0 补偿
[17:01:27.510] [compensation-threadpool-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 2
[17:01:27.510] [compensation-threadpool-3] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 4
[17:01:27.511] [compensation-threadpool-2] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 3
[17:01:30.496] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 开始从用户ID 5 补偿
[17:01:32.500] [compensation-threadpool-6] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 6
[17:01:32.500] [compensation-threadpool-9] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 9
[17:01:35.496] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 开始从用户ID 9 补偿
[17:01:37.501] [compensation-threadpool-0] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 10
[17:01:40.495] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - 开始从用户ID 10 补偿
  • 总共 10 个用户,MQ 发送成功的用户有四个,分别是用户 1、5、7、8。
  • 补偿任务第一次运行,补偿了用户 2、3、4,第二次运行补偿了用户 6、9,第三次运行补充了用户 10。

针对消息的补偿闭环处理的最高标准是,能够达到补偿全量数据的吞吐量。

也就是说,如果补偿备线足够完善,即使直接把 MQ 停机,虽然会略微影响处理的及时性,但至少确保流程都能正常执行。


二、注意消息模式是广播还是工作队列

消息广播,和我们平时说的“广播”意思差不多,就是希望同一条消息,不同消费者都能分别消费;而队列模式,就是不同消费者共享消费同一个队列的数据,相同消息只能被某一个消费者消费一次。

比如,同一个用户的注册消息,会员服务需要监听以发送欢迎短信,营销服务同样需要监听以发送新用户小礼物。

但是,会员服务、营销服务都可能有多个实例,我们期望的是同一个用户的消息,可以同时广播给不同的服务(广播模式),但对于同一个服务的不同实例(比如会员服务 1 和会员服务 2),不管哪个实例来处理,处理一次即可(工作队列模式):

在实现代码的时候,我们务必确认 MQ 系统的机制,确保消息的路由按照我们的期望。

对于类似 RocketMQ 这样的 MQ 来说,实现类似功能比较简单直白:如果消费者属于一个组,那么消息只会由同一个组的一个消费者来消费;如果消费者属于不同组,那么每个组都能消费一遍消息。

而对于 RabbitMQ 来说,消息路由的模式采用的是队列 + 交换器,队列是消息的载体,交换器决定了消息路由到队列的方式,配置比较复杂,容易出错。所以,接下来我重点和你讲讲 RabbitMQ 的相关代码实现。

我们还是以上面的架构图为例,来演示使用 RabbitMQ 实现广播模式和工作队列模式的坑。

第一步,实现会员服务监听用户服务发出的新用户注册消息的那部分逻辑。

如果我们启动两个会员服务,那么同一个用户的注册消息应该只能被其中一个实例消费。我们分别实现 RabbitMQ 队列、交换器、绑定三件套。

其中,队列用的是匿名队列,交换器用的是直接交换器 DirectExchange,交换器绑定到匿名队列的路由 Key 是空字符串。在收到消息之后,我们会打印所在实例使用的端口:

//为了代码简洁直观,我们把消息发布者、消费者、以及MQ的配置代码都放在了一起
@Slf4j
@Configuration
@RestController
@RequestMapping("workqueuewrong")
public class WorkQueueWrong 

    private static final String EXCHANGE = "newuserExchange";
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping
    public void sendMessage() 
        rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString());
    

    //使用匿名队列作为消息队列
    @Bean
    public Queue queue() 
        return new AnonymousQueue();
    
  
    //声明DirectExchange交换器,绑定队列到交换器
    @Bean
    public Declarables declarables() 
        DirectExchange exchange = new DirectExchange(EXCHANGE);
        return new Declarables(queue(), exchange,
                BindingBuilder.bind(queue()).to(exchange).with(""));
    

    //监听队列,队列名称直接通过SpEL表达式引用Bean
    @RabbitListener(queues = "#queue.name")
    public void memberService(String userName) 
        log.info("memberService: welcome message sent to new user  from ", userName, System.getProperty("server.port"));

    
   

使用 12345 和 45678 两个端口启动两个程序实例后,调用 sendMessage 接口发送一条消息,输出的日志,显示同一个会员服务两个实例都收到了消息


出现这个问题的原因是,我们没有理清楚 RabbitMQ 直接交换器和队列的绑定关系。

如下图所示,RabbitMQ 的直接交换器根据 routingKey 对消息进行路由。由于我们的程序每次启动都会创建匿名(随机命名)的队列,所以相当于每一个会员服务实例都对应独立的队列,以空 routingKey 绑定到直接交换器。用户服务发出消息的时候也设置了 routingKey 为空,所以直接交换器收到消息之后,发现有两条队列匹配,于是都转发了消息:

要修复这个问题其实很简单,对于会员服务不要使用匿名队列,而是使用同一个队列即可。把上面代码中的匿名队列替换为一个普通队列:

private static final String QUEUE = "newuserQueue";
@Bean
public Queue queue() 
    return new Queue(QUEUE);

测试发现,对于同一条消息来说,两个实例中只有一个实例可以收到,不同的消息按照轮询分发给不同的实例。现在,交换器和队列的关系是这样的:


第二步,进一步完整实现用户服务需要广播消息给会员服务和营销服务的逻辑。

我们希望会员服务和营销服务都可以收到广播消息,但会员服务或营销服务中的每个实例只需要收到一次消息。

代码如下,我们声明了一个队列和一个广播交换器 FanoutExchange,然后模拟两个用户服务和两个营销服务:

@Slf4j
@Configuration
@RestController
@RequestMapping("fanoutwrong")
public class FanoutQueueWrong 
    private static final String QUEUE = "newuser";
    private static final String EXCHANGE =

以上是关于Day650.MQ异步处理问题 -Java业务开发常见错误的主要内容,如果未能解决你的问题,请参考以下文章

Day449.kafka概述&快速入门 -kafka

JavaWeb day08 ajax介绍JQuery框架JSONajax跨域访问

python基础学习日志day10-事件驱动模型

Day419.CompletableFuture 异步编排 -谷粒商城

Day419.CompletableFuture 异步编排 -谷粒商城

Python_Day11_同步IO和异步IO