升级版 @Async,让异步任务无懈可击

Posted Java知音_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了升级版 @Async,让异步任务无懈可击相关的知识,希望对你有一定的参考价值。

1. 概览

Spring 的 @Async 注解,想必大家都非常熟悉,只需在方法上增加 @Aysnc ,便可以将其转化为异步操作,任务在后台线程池中运行。

由于数据存储于内存,服务重启存在任务丢失问题,所以,只适用于要求不太严谨的业务,对于要求严格的场景,只能另选方案。

1.1. 背景

在日常开发过程中,像记录日志这种非核心业务,才允许使用 Spring 的 Async 进行异步化,其他场景需要使用更加完备的 MQ 方案。

MQ 方案

面对这种场景,免不了一顿编码、一通测试,咱们得时间就这样没有了。对于这种纯纯的技术需求,封装框架是投入产出比最高的事。

1.2. 目标

期望框架能够提供:

  1. 不需要 Coding,直接将一个方法转变为 MQ 的异步处理;

  2. 支持 顺序消息 特性,以处理对顺序有依赖的场景;

  3. 发送,消费可以分离,能够在不同的集群中完成,以更好的支持资源隔离;

2. 快速入门

框架基于 RocketMQ 进行构建,请自行完成 RocketMQ 的搭建。

2.1. 引入 RocketMQ

我们使用 rocketmq starter 完成基本配置。

首先,在 pom 中增加 rocketmq starter 依赖,具体如下:

<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-spring-boot-starter</artifactId>
 <version>2.2.1</version>
</dependency>

其次,在 application.yml 中添加 rocketmq 配置,具体如下:

rocketmq:
  name-server: http://127.0.0.1:9876
  producer:
    group: async-demo

其中,name-server 根据具体情况进行配置。

配置完成,可以在项目中:

  1. 注入 RocketMQTemplate 进行消息发送;

  2. 使用 @RocketMQMessageListener 标记处理方法,进行消息消费;

2.2. 添加 lego-starter 依赖

为了方便与 spring-boot 项目集成,lego 提供 lego-starter,以完成快速接入。

在 pom 中增加 starter,具体如下:

<dependency>
 <groupId>com.geekhalo.lego</groupId>
 <artifactId>lego-starter</artifactId>
 <version>0.1.4-async_based_rocketmq-SNAPSHOT</version>
</dependency>

其中,自动配置机制将完成:

  1. @AsyncBasedRocketMQ 注解方法,增加消息拦截,并启动 并行消费者 进行消息消费;

  2. @AsyncForOrderedBasedRocketMQ 注解方法,增加消息拦截,并启动 顺序消费者进行消息消费;

2.3. 并行消息异步处理

我们只需在方法上添加 @AsyncBasedRocketMQ 注解,完成基础配置,该方法便具有异步处理能力。具体如下:

@AsyncBasedRocketMQ(topic = "$async.test.normal.topic",
        tag = "asyncTest1",
        consumerGroup = "$async.test.normal.group1")
public void asyncTest1(Long id, String name, AsyncInputBean bean)
    log.info("receive data id , name , bean", id, name, bean);

    CallData callData = new CallData(id, name, bean);
    this.callDatas.add(callData);

@AsyncBasedRocketMQ 定义如下:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AsyncBasedRocketMQ 
    /**
     * MQ topic
     * @return
     */
    String topic();

    /**
     * MQ tag
     * @return
     */
    String tag();

    /**
     * 消费组
     * @return
     */
    String consumerGroup();

    /**
     * nameServer 配置
     * @return
     */
    String nameServer() default "$rocketmq.name-server:";

    /**
     * 消费者运行的 profile,主要用于发送和消费分离的场景
     * @return
     */
    String consumerProfile() default "";

在 application 文件中增加相关配置,具体如下:

async:
  test:
    normal:
      topic: normal-async-test-topic
      group1: normal-async-test-group1
      group2: normal-async-test-group2

写一个简单的单测,代码如下:

@Test
public void asyncTest1() throws InterruptedException 
    
    asyncService.getCallDatas().clear();;

    Long id = RandomUtils.nextLong();
    String name = String.valueOf(RandomUtils.nextLong());
    AsyncInputBean bean = createAsyncInputBean();
    asyncService.asyncTest1(id, name, bean);

    
        List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
        Assertions.assertTrue(CollectionUtils.isEmpty(callDatas));
    

    TimeUnit.SECONDS.sleep(2);

    
        List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
        Assertions.assertFalse(CollectionUtils.isEmpty(callDatas));

        AsyncService.CallData callData = callDatas.get(0);
        Assertions.assertEquals(id, callData.getId());
        Assertions.assertEquals(name, callData.getName());
        Assertions.assertEquals(bean, callData.getBean());
    

运行单测,日志如下:

[           main] c.g.l.c.a.normal.NormalAsyncInterceptor  : After serialize, data is xxxxx
[           main] c.g.l.c.a.normal.NormalAsyncInterceptor  : success to send async Task to RocketMQ, args is xxxx, msg is yyyy, result is zzz
[MessageThread_1] com.geekhalo.lego.async.AsyncService     : receive data id 8926281443373242368, name 1130519434586076160, bean
[MessageThread_1] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt xxxx, cost: 31 ms

为了方便,对部分日志进行简化,但不影响分析结果。

从运行日志可以得出:

  1. NormalAsyncInterceptor 运行在主线程中,主要完成:

  • 对参数进行序列化

  • 将序列化结果发送至 Rocketmq

业务服务AsyncService 运行在消费线程,主要完成:

  • 调用业务方法

  • 打印消费信息

2.4. 顺序消息异步处理

RocketMQ 支持顺序消息,通过指定 hashKey 可以保障相同 hashKey的 Message 路由到同一线程,以模拟顺序消费。

如果需要使用顺序消息,只需使用 @AsyncForOrderedBasedRocketMQ 即可,具体如下:

@AsyncForOrderedBasedRocketMQ(topic = "$async.test.order.topic",
        tag = "asyncTest1",
        shardingKey = "#id",
        consumerGroup = "$async.test.order.group1")
public void asyncTestForOrder1(Long id, String name, AsyncInputBean bean)
    log.info("receive data id , name , bean ", id, name, bean);

    CallData callData = new CallData(id, name, bean);
    this.callDatas.add(callData);

其中,shardingKey = "#id" 含义为,将参数 id 的值作为 shardingKey。

AsyncBasedRocketMQ 相比,核心配置不变,只增加 shardingKey 配置,具体定义如下:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AsyncForOrderedBasedRocketMQ 
    /**
     * MQ topic
     * @return
     */
    String topic();

    /**
     * MQ tag
     * @return
     */
    String tag();

    /**
     * 顺序消费设置的 hashKey
     * @return
     */
    String shardingKey();

    /**
     * 消费组
     * @return
     */
    String consumerGroup();

    /**
     * nameServer 配置
     * @return
     */
    String nameServer() default "$rocketmq.name-server:";

    /**
     * 消费者运行的 profile,主要用于发送和消费分离的场景
     * @return
     */
    String consumerProfile() default "";

在 application.yml 增加相关配置,具体如下:

async:
  test:
    order:
      topic: order-async-test-topic
      group1: order-async-test-group1
      group2: order-async-test-group2

编写单元测试用例,具体如下:

@Test
public void asyncForOrderTest1() throws InterruptedException 

    List<InputData> inputDatas = new ArrayList<>();
    Long[] ids = new Long[]RandomUtils.nextLong(), RandomUtils.nextLong(), RandomUtils.nextLong(), RandomUtils.nextLong();
    String name = String.valueOf(RandomUtils.nextLong());
    AsyncInputBean bean = createAsyncInputBean();

    asyncService.getCallDatas().clear();
    asyncService.asyncTestForOrder1(ids[0], name, bean);
    inputDatas.add(new InputData(ids[0], name, bean));

    
        List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
        Assertions.assertTrue(CollectionUtils.isEmpty(callDatas));
    


    for (int i = 0; i< 100; i++) 
        name = String.valueOf(RandomUtils.nextLong());
        bean = createAsyncInputBean();
        asyncService.asyncTestForOrder1(ids[i%ids.length], name, bean);
        inputDatas.add(new InputData(ids[i%ids.length], name, bean));
    



    TimeUnit.SECONDS.sleep(10);

    
        List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
        Assertions.assertFalse(CollectionUtils.isEmpty(callDatas));

        Assertions.assertEquals(inputDatas.size(), callDatas.size());

        Map<Long, List<AsyncService.CallData>> callDataMap = callDatas.stream().collect(Collectors.groupingBy(AsyncService.CallData::getId));
        Map<Long, List<InputData>> inputDataMap = inputDatas.stream().collect(Collectors.groupingBy(InputData::getId));

        for (Long id : ids)
            List<AsyncService.CallData> callDataToCheck = callDataMap.get(id);
            List<InputData> inputDataToCheck = inputDataMap.get(id);

            Assertions.assertEquals(callDataToCheck.size(), inputDataToCheck.size());

            for (int j = 0; j < callDataToCheck.size(); j++) 
                AsyncService.CallData callData = callDataToCheck.get(j);
                InputData inputData1 = inputDataToCheck.get(j);

                Assertions.assertEquals(inputData1.getId(), callData.getId());
                Assertions.assertEquals(inputData1.getName(), callData.getName());
                Assertions.assertEquals(inputData1.getBean(), callData.getBean());
            
        
    

运行测试用例,观察日志如下:

[           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : After serialize, data is xxx
[           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : success to send orderly async Task to RocketMQ, args is xxx , shardingKey is 6723772904149174272, msg is yyy, result is zzz
[           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : After serialize, data xxx
[           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : success to send orderly async Task to RocketMQ, args is xxx , shardingKey is 6723772904149174272, msg is yyy, result is zzz
[           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : After serialize, data is xxx
[MessageThread_1] com.geekhalo.lego.async.AsyncService     : receive data id 6723772904149174272, name 8410395540617317376, bean AsyncInputBean(id=81325280405335040, name=1950309494, age=976367396)
[MessageThread_1] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt xxx, cost: 1 ms
[           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : success to send orderly async Task to RocketMQ, args is xxx , shardingKey is 8896761273036908544, msg is zzz
[           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : After serialize, data is xxx
[MessageThread_1] com.geekhalo.lego.async.AsyncService     : receive data id 6723772904149174272, name 693725382660268032, bean AsyncInputBean(id=1379620281334973440, name=1090615484, age=1421031650)
[MessageThread_1] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt zzz, cost: 0 ms
[MessageThread_2] com.geekhalo.lego.async.AsyncService     : receive data id 8896761273036908544, name 7307088811299682304, bean AsyncInputBean(id=594404553604282368, name=812325506, age=1784532908)
[           main] c.g.l.c.a.order.OrderedAsyncInterceptor  : success to send orderly async Task to RocketMQ, args is xxx, shardingKey is 717741895048495104, msg is zzz
[MessageThread_2] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt xxx, cost: 0 ms

从日志上可见:

  1. 主线程 和 消费线程 交叉输出日志;

  2. 拦截器 OrderedAsyncInterceptor 基于 sharding key 向 RocketMQ 发送顺序消息;

  3. AbstractAsyncConsumerContainer 对顺序消息进行消费;

2.5. 发送和消费分离

有时为了更好的对资源进行隔离,会单独部署一组集群,用于处理后台任务。

为支持该模式,AsyncBasedRocketMQAsyncForOrderedBasedRocketMQ 都提供了 consumerProfile 配置,用于指定 Consumer 在哪个 profile 下执行,如果不设置,则对环境不进行任何要求。

3.设计&扩展

3.1. 核心设计

整体架构

在方法上添加注解后,框架自动完成:

  1. 增加 AsyncInterceptor Bean,用于对方法进行拦截;

  2. 启动 MQConsumer 监听消息变更,并调用 业务方法;

3.2. 核心流程

核心流程如下:

  1. 方法被调用,被 AsyncInterceptor 拦截;

  • 首先,对调用参数进行序列化;

  • 然后,将信息封装为 Message

  • 最后,将Message发送至 RocketMQ

消息在RocketMQ进行存储,并投放至 Consumer;

MQPushConsumer,监听消息,并完成业务操作;

  • Consumer 获得 Message 信息

  • 将消息进行反序列化,获得调用参数

  • 使用调用参数调用业务方法

4. 项目信息

项目仓库地址:

  • https://gitee.com/litao851025/lego

项目文档地址:

  • https://gitee.com/litao851025/lego/wikis/support/asyncBasedRocketMQ

推荐

Java面试题宝典

技术内卷群,一起来学习!!

PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。点“在看”支持我们吧!

以上是关于升级版 @Async,让异步任务无懈可击的主要内容,如果未能解决你的问题,请参考以下文章

让我们为C#异步编程Async正名

让我们再为C#异步编程Async正名

前端开发:解决异步回调必备技能——Async/Await和Promise

springboot隔离@Async异步任务的线程池

SpecFlow测试平台是否支持异步任务?

async和await异步编程资源汇总