升级版 @Async,让异步任务无懈可击
Posted Java知音_
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了升级版 @Async,让异步任务无懈可击相关的知识,希望对你有一定的参考价值。
1. 概览
Spring 的 @Async 注解,想必大家都非常熟悉,只需在方法上增加 @Aysnc
,便可以将其转化为异步操作,任务在后台线程池中运行。
由于数据存储于内存,服务重启存在任务丢失问题,所以,只适用于要求不太严谨的业务,对于要求严格的场景,只能另选方案。
1.1. 背景
在日常开发过程中,像记录日志这种非核心业务,才允许使用 Spring 的 Async 进行异步化,其他场景需要使用更加完备的 MQ 方案。
面对这种场景,免不了一顿编码、一通测试,咱们得时间就这样没有了。对于这种纯纯的技术需求,封装框架是投入产出比最高的事。
1.2. 目标
期望框架能够提供:
不需要 Coding,直接将一个方法转变为 MQ 的异步处理;
支持 顺序消息 特性,以处理对顺序有依赖的场景;
发送,消费可以分离,能够在不同的集群中完成,以更好的支持资源隔离;
2. 快速入门
框架基于 RocketMQ 进行构建,请自行完成 RocketMQ 的搭建。
2.1. 引入 RocketMQ
我们使用
rocketmq starter
完成基本配置。
首先,在 pom 中增加 rocketmq starter
依赖,具体如下:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
其次,在 application.yml 中添加 rocketmq 配置,具体如下:
rocketmq:
name-server: http://127.0.0.1:9876
producer:
group: async-demo
其中,name-server 根据具体情况进行配置。
配置完成,可以在项目中:
注入
RocketMQTemplate
进行消息发送;使用
@RocketMQMessageListener
标记处理方法,进行消息消费;
2.2. 添加 lego-starter 依赖
为了方便与 spring-boot 项目集成,lego 提供
lego-starter
,以完成快速接入。
在 pom 中增加 starter,具体如下:
<dependency>
<groupId>com.geekhalo.lego</groupId>
<artifactId>lego-starter</artifactId>
<version>0.1.4-async_based_rocketmq-SNAPSHOT</version>
</dependency>
其中,自动配置机制将完成:
为
@AsyncBasedRocketMQ
注解方法,增加消息拦截,并启动 并行消费者 进行消息消费;为
@AsyncForOrderedBasedRocketMQ
注解方法,增加消息拦截,并启动 顺序消费者进行消息消费;
2.3. 并行消息异步处理
我们只需在方法上添加 @AsyncBasedRocketMQ
注解,完成基础配置,该方法便具有异步处理能力。具体如下:
@AsyncBasedRocketMQ(topic = "$async.test.normal.topic",
tag = "asyncTest1",
consumerGroup = "$async.test.normal.group1")
public void asyncTest1(Long id, String name, AsyncInputBean bean)
log.info("receive data id , name , bean", id, name, bean);
CallData callData = new CallData(id, name, bean);
this.callDatas.add(callData);
@AsyncBasedRocketMQ
定义如下:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AsyncBasedRocketMQ
/**
* MQ topic
* @return
*/
String topic();
/**
* MQ tag
* @return
*/
String tag();
/**
* 消费组
* @return
*/
String consumerGroup();
/**
* nameServer 配置
* @return
*/
String nameServer() default "$rocketmq.name-server:";
/**
* 消费者运行的 profile,主要用于发送和消费分离的场景
* @return
*/
String consumerProfile() default "";
在 application 文件中增加相关配置,具体如下:
async:
test:
normal:
topic: normal-async-test-topic
group1: normal-async-test-group1
group2: normal-async-test-group2
写一个简单的单测,代码如下:
@Test
public void asyncTest1() throws InterruptedException
asyncService.getCallDatas().clear();;
Long id = RandomUtils.nextLong();
String name = String.valueOf(RandomUtils.nextLong());
AsyncInputBean bean = createAsyncInputBean();
asyncService.asyncTest1(id, name, bean);
List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
Assertions.assertTrue(CollectionUtils.isEmpty(callDatas));
TimeUnit.SECONDS.sleep(2);
List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
Assertions.assertFalse(CollectionUtils.isEmpty(callDatas));
AsyncService.CallData callData = callDatas.get(0);
Assertions.assertEquals(id, callData.getId());
Assertions.assertEquals(name, callData.getName());
Assertions.assertEquals(bean, callData.getBean());
运行单测,日志如下:
[ main] c.g.l.c.a.normal.NormalAsyncInterceptor : After serialize, data is xxxxx
[ main] c.g.l.c.a.normal.NormalAsyncInterceptor : success to send async Task to RocketMQ, args is xxxx, msg is yyyy, result is zzz
[MessageThread_1] com.geekhalo.lego.async.AsyncService : receive data id 8926281443373242368, name 1130519434586076160, bean
[MessageThread_1] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt xxxx, cost: 31 ms
为了方便,对部分日志进行简化,但不影响分析结果。
从运行日志可以得出:
NormalAsyncInterceptor
运行在主线程中,主要完成:
对参数进行序列化
将序列化结果发送至
Rocketmq
业务服务AsyncService
运行在消费线程,主要完成:
调用业务方法
打印消费信息
2.4. 顺序消息异步处理
RocketMQ 支持顺序消息,通过指定 hashKey 可以保障相同 hashKey的 Message 路由到同一线程,以模拟顺序消费。
如果需要使用顺序消息,只需使用 @AsyncForOrderedBasedRocketMQ
即可,具体如下:
@AsyncForOrderedBasedRocketMQ(topic = "$async.test.order.topic",
tag = "asyncTest1",
shardingKey = "#id",
consumerGroup = "$async.test.order.group1")
public void asyncTestForOrder1(Long id, String name, AsyncInputBean bean)
log.info("receive data id , name , bean ", id, name, bean);
CallData callData = new CallData(id, name, bean);
this.callDatas.add(callData);
其中,shardingKey = "#id"
含义为,将参数 id 的值作为 shardingKey。
与 AsyncBasedRocketMQ
相比,核心配置不变,只增加 shardingKey
配置,具体定义如下:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface AsyncForOrderedBasedRocketMQ
/**
* MQ topic
* @return
*/
String topic();
/**
* MQ tag
* @return
*/
String tag();
/**
* 顺序消费设置的 hashKey
* @return
*/
String shardingKey();
/**
* 消费组
* @return
*/
String consumerGroup();
/**
* nameServer 配置
* @return
*/
String nameServer() default "$rocketmq.name-server:";
/**
* 消费者运行的 profile,主要用于发送和消费分离的场景
* @return
*/
String consumerProfile() default "";
在 application.yml 增加相关配置,具体如下:
async:
test:
order:
topic: order-async-test-topic
group1: order-async-test-group1
group2: order-async-test-group2
编写单元测试用例,具体如下:
@Test
public void asyncForOrderTest1() throws InterruptedException
List<InputData> inputDatas = new ArrayList<>();
Long[] ids = new Long[]RandomUtils.nextLong(), RandomUtils.nextLong(), RandomUtils.nextLong(), RandomUtils.nextLong();
String name = String.valueOf(RandomUtils.nextLong());
AsyncInputBean bean = createAsyncInputBean();
asyncService.getCallDatas().clear();
asyncService.asyncTestForOrder1(ids[0], name, bean);
inputDatas.add(new InputData(ids[0], name, bean));
List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
Assertions.assertTrue(CollectionUtils.isEmpty(callDatas));
for (int i = 0; i< 100; i++)
name = String.valueOf(RandomUtils.nextLong());
bean = createAsyncInputBean();
asyncService.asyncTestForOrder1(ids[i%ids.length], name, bean);
inputDatas.add(new InputData(ids[i%ids.length], name, bean));
TimeUnit.SECONDS.sleep(10);
List<AsyncService.CallData> callDatas = this.asyncService.getCallDatas();
Assertions.assertFalse(CollectionUtils.isEmpty(callDatas));
Assertions.assertEquals(inputDatas.size(), callDatas.size());
Map<Long, List<AsyncService.CallData>> callDataMap = callDatas.stream().collect(Collectors.groupingBy(AsyncService.CallData::getId));
Map<Long, List<InputData>> inputDataMap = inputDatas.stream().collect(Collectors.groupingBy(InputData::getId));
for (Long id : ids)
List<AsyncService.CallData> callDataToCheck = callDataMap.get(id);
List<InputData> inputDataToCheck = inputDataMap.get(id);
Assertions.assertEquals(callDataToCheck.size(), inputDataToCheck.size());
for (int j = 0; j < callDataToCheck.size(); j++)
AsyncService.CallData callData = callDataToCheck.get(j);
InputData inputData1 = inputDataToCheck.get(j);
Assertions.assertEquals(inputData1.getId(), callData.getId());
Assertions.assertEquals(inputData1.getName(), callData.getName());
Assertions.assertEquals(inputData1.getBean(), callData.getBean());
运行测试用例,观察日志如下:
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : After serialize, data is xxx
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : success to send orderly async Task to RocketMQ, args is xxx , shardingKey is 6723772904149174272, msg is yyy, result is zzz
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : After serialize, data xxx
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : success to send orderly async Task to RocketMQ, args is xxx , shardingKey is 6723772904149174272, msg is yyy, result is zzz
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : After serialize, data is xxx
[MessageThread_1] com.geekhalo.lego.async.AsyncService : receive data id 6723772904149174272, name 8410395540617317376, bean AsyncInputBean(id=81325280405335040, name=1950309494, age=976367396)
[MessageThread_1] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt xxx, cost: 1 ms
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : success to send orderly async Task to RocketMQ, args is xxx , shardingKey is 8896761273036908544, msg is zzz
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : After serialize, data is xxx
[MessageThread_1] com.geekhalo.lego.async.AsyncService : receive data id 6723772904149174272, name 693725382660268032, bean AsyncInputBean(id=1379620281334973440, name=1090615484, age=1421031650)
[MessageThread_1] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt zzz, cost: 0 ms
[MessageThread_2] com.geekhalo.lego.async.AsyncService : receive data id 8896761273036908544, name 7307088811299682304, bean AsyncInputBean(id=594404553604282368, name=812325506, age=1784532908)
[ main] c.g.l.c.a.order.OrderedAsyncInterceptor : success to send orderly async Task to RocketMQ, args is xxx, shardingKey is 717741895048495104, msg is zzz
[MessageThread_2] g.l.c.a.s.AbstractAsyncConsumerContainer : consume data MessageExt xxx, cost: 0 ms
从日志上可见:
主线程 和 消费线程 交叉输出日志;
拦截器
OrderedAsyncInterceptor
基于sharding key
向 RocketMQ 发送顺序消息;AbstractAsyncConsumerContainer
对顺序消息进行消费;
2.5. 发送和消费分离
有时为了更好的对资源进行隔离,会单独部署一组集群,用于处理后台任务。
为支持该模式,AsyncBasedRocketMQ
和 AsyncForOrderedBasedRocketMQ
都提供了 consumerProfile
配置,用于指定 Consumer 在哪个 profile 下执行,如果不设置,则对环境不进行任何要求。
3.设计&扩展
3.1. 核心设计
在方法上添加注解后,框架自动完成:
增加
AsyncInterceptor Bean
,用于对方法进行拦截;启动
MQConsumer
监听消息变更,并调用 业务方法;
3.2. 核心流程
核心流程如下:
方法被调用,被
AsyncInterceptor
拦截;
首先,对调用参数进行序列化;
然后,将信息封装为
Message
最后,将
Message
发送至RocketMQ
消息在RocketMQ进行存储,并投放至 Consumer;
MQPushConsumer
,监听消息,并完成业务操作;
Consumer
获得Message
信息将消息进行反序列化,获得调用参数
使用调用参数调用业务方法
4. 项目信息
项目仓库地址:
https://gitee.com/litao851025/lego
项目文档地址:
https://gitee.com/litao851025/lego/wikis/support/asyncBasedRocketMQ
推荐
PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。点“在看”支持我们吧!
以上是关于升级版 @Async,让异步任务无懈可击的主要内容,如果未能解决你的问题,请参考以下文章