Async Tasks: A Source-Code Walkthrough of Spring's @Async Annotation

Posted by 只会一点java

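1. Introduction

To enable asynchronous tasks:

1) Annotate the method with @Async

2) Add @EnableAsync to the startup class or a configuration class

2. Source code analysis

Spring 5 has already been released, but we are still on Spring 4, so this article analyzes the source of spring-context-4.3.14.RELEASE.jar.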

2.1 @Async

Source of org.springframework.scheduling.annotation.Async, with notes added to the Javadoc:

/**
 * Annotation that marks a method as a candidate for <i>asynchronous</i> execution.
 * Can also be used at the type level, in which case all of the type's methods are
 * considered as asynchronous.
 * (Note: the annotation can mark a single method, or a class, in which case every
 * method of the class runs asynchronously.)
 *
 * <p>In terms of target method signatures, any parameter types are supported.
 * However, the return type is constrained to either {@code void} or
 * {@link java.util.concurrent.Future}. In the latter case, you may declare the
 * more specific {@link org.springframework.util.concurrent.ListenableFuture} or
 * {@link java.util.concurrent.CompletableFuture} types which allow for richer
 * interaction with the asynchronous task and for immediate composition with
 * further processing steps.
 * (Note: parameters are unrestricted, but the return type must be void or a Future,
 * e.g. the ListenableFuture interface or the CompletableFuture class.)
 *
 * <p>A {@code Future} handle returned from the proxy will be an actual asynchronous
 * {@code Future} that can be used to track the result of the asynchronous method
 * execution. However, since the target method needs to implement the same signature,
 * it will have to return a temporary {@code Future} handle that just passes a value
 * through: e.g. Spring's {@link AsyncResult}, EJB 3.1's {@link javax.ejb.AsyncResult},
 * or {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}.
 * (Note: the Future returned by the proxy is the real asynchronous handle used to
 * track the result. The target method itself can return AsyncResult, which
 * implements ListenableFuture and exists in both Spring and EJB, or a
 * CompletableFuture.)
 *
 * @author Juergen Hoeller
 * @author Chris Beams
 * @since 3.0
 * @see AnnotationAsyncExecutionInterceptor
 * @see AsyncAnnotationAdvisor
 */
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {

    /**
     * A qualifier value for the specified asynchronous operation(s).
     * <p>May be used to determine the target executor to be used when executing this
     * method, matching the qualifier value (or the bean name) of a specific
     * {@link java.util.concurrent.Executor Executor} or
     * {@link org.springframework.core.task.TaskExecutor TaskExecutor}
     * bean definition.
     * (Note: names the custom Executor or TaskExecutor that should run the method.)
     * <p>When specified on a class level {@code @Async} annotation, indicates that the
     * given executor should be used for all methods within the class. Method level use
     * of {@code Async#value} always overrides any value set at the class level.
     * (Note: on a class it applies to the whole class; on a method it overrides the
     * class-level setting.)
     * @since 3.1.2
     */
    String value() default "";

}

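The source comments above are quite clear. Three points deserve attention:

1) Return value: use void when no result is needed; when a result is needed, return a Future and wrap the value with AsyncResult or CompletableFuture.

2) A custom executor can be defined and selected by name, for example: @Async("otherExecutor")

3) An @Async method must be called from a different class: class A -> B.c() (with @Async on class B or on the method). If it is called from within the same class, for example A.b() -> A.c() where c() carries @Async, it runs synchronously. The reason is that the feature is implemented with a proxy: the internal call goes straight to this and never passes through the proxy, so the annotation on c() is not seen. (Putting @Async on the class makes an external call to b() asynchronous, so c() then runs on b()'s worker thread, but every method of the class becomes asynchronous, so this is rarely done.)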
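
Point 1 can be tried out without Spring. The sketch below is plain JDK code and is not Spring's implementation: the hypothetical buildReport method plays the role of the target method, which only wraps its value in an already completed future to satisfy the signature, and the hypothetical callAsync helper stands in for the proxy, which hands the caller a future that completes once the executor has run the target.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncReturnSketch {

    // Hypothetical target method: computes synchronously and wraps the value in a
    // completed future so that it matches the Future-returning signature.
    static CompletableFuture<String> buildReport(String name) {
        return CompletableFuture.completedFuture("report:" + name);
    }

    // Hypothetical stand-in for the proxy: runs the target on the executor and
    // returns a future that completes when the target's value is available.
    static CompletableFuture<String> callAsync(String name, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> buildReport(name).join(), executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> future = callAsync("daily", executor);
            System.out.println(future.join()); // prints "report:daily"
        } finally {
            executor.shutdown();
        }
    }
}
```

The caller only ever sees the future returned by callAsync; the completed future created inside buildReport is just a carrier for the value.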

2.2 @EnableAsync

As usual, let's go straight to the class-level Javadoc:

// Enables Spring's asynchronous executor, similar to the task tags in XML configuration; it must be used together with @Configuration
Enables Spring's asynchronous method execution capability, similar to functionality found in Spring's <task:*> XML namespace.
To be used together with @Configuration classes as follows, enabling annotation-driven async processing for an entire Spring application context:
 @Configuration
 @EnableAsync
 public class AppConfig {

 }
MyAsyncBean is a user-defined type with one or more methods annotated with either Spring's @Async annotation, the EJB 3.1 @javax.ejb.Asynchronous annotation, or any custom annotation specified via the annotation() attribute. The aspect is added transparently for any registered bean, for instance via this configuration:
 @Configuration
 public class AnotherAppConfig {

     @Bean
     public MyAsyncBean asyncBean() {
         return new MyAsyncBean();
     }
 }

// By default Spring first looks for a unique bean of type TaskExecutor, then for an Executor bean named taskExecutor; if neither exists it falls back to SimpleAsyncTaskExecutor
By default, Spring will be searching for an associated thread pool definition: either a unique TaskExecutor bean in the context, or an Executor bean named "taskExecutor" otherwise. If neither of the two is resolvable, a SimpleAsyncTaskExecutor will be used to process async method invocations. Besides, annotated methods having a void return type cannot transmit any exception back to the caller. By default, such uncaught exceptions are only logged.
To customize all this, implement AsyncConfigurer and provide:
your own Executor through the getAsyncExecutor() method, and your own AsyncUncaughtExceptionHandler through the getAsyncUncaughtExceptionHandler() method.
// Implement AsyncConfigurer and override getAsyncExecutor to supply the async executor and getAsyncUncaughtExceptionHandler to supply the handler for uncaught async exceptions
 @Configuration
 @EnableAsync
 public class AppConfig implements AsyncConfigurer {

     @Override
     public Executor getAsyncExecutor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         executor.setCorePoolSize(7);
         executor.setMaxPoolSize(42);
         executor.setQueueCapacity(11);
         executor.setThreadNamePrefix("MyExecutor-");
         executor.initialize();
         return executor;
     }

     @Override
     public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
         return new MyAsyncUncaughtExceptionHandler();
     }
 }
If only one item needs to be customized, null can be returned to keep the default settings. Consider also extending from AsyncConfigurerSupport when possible.
Note: In the above example the ThreadPoolTaskExecutor is not a fully managed Spring bean. Add the @Bean annotation to the getAsyncExecutor() method if you want a fully managed bean. In such circumstances it is no longer necessary to manually call the executor.initialize() method as this will be invoked automatically when the bean is initialized.
For reference, the example above can be compared to the following Spring XML configuration:
 <beans>

     <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/>

     <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/>

     <bean id="asyncBean" class="com.foo.MyAsyncBean"/>

     <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/>

 </beans>
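 // The annotation-based and XML configurations are essentially equivalent, but the annotation-based one can also set a custom thread name prefix (AppConfig -> getAsyncExecutor -> setThreadNamePrefix above)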
The above XML-based and JavaConfig-based examples are equivalent except for the setting of the thread name prefix of the Executor; this is because the <task:executor> element does not expose such an attribute. This demonstrates how the JavaConfig-based approach allows for maximum configurability through direct access to actual componentry.
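
What the three numbers shared by both configurations mean at runtime can be checked with the JDK's java.util.concurrent.ThreadPoolExecutor, which ThreadPoolTaskExecutor wraps. This is a minimal sketch assuming the same values (core size 7, max size 42, queue capacity 11); the class name PoolSizingSketch, the 60-second keep-alive and the blocking tasks are illustrative choices, not part of the original example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingSketch {

    // Submits taskCount blocking tasks and reports {pool size, queued tasks}.
    static int[] sizesAfter(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                7, 42, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(11));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // let every task finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (int tasks : new int[] { 7, 18, 19 }) {
            int[] sizes = sizesAfter(tasks);
            System.out.println(tasks + " tasks -> threads=" + sizes[0] + ", queued=" + sizes[1]);
        }
    }
}
```

With 7 tasks the pool has 7 threads and an empty queue; with 18 tasks it still has 7 threads and 11 queued tasks; only the 19th task, which no longer fits in the queue, starts an 8th thread. The maximum of 42 is therefore reached only after the queue is full.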
The mode() attribute controls how advice is applied: If the mode is AdviceMode.PROXY (the default), then the other attributes control the behavior of the proxying. Please note that proxy mode allows for interception of calls through the proxy only; local calls within the same class cannot get intercepted that way.
// This is why an @Async method must be called from a different class, i.e. point 3 in section 2.1.
Note that if the mode() is set to AdviceMode.ASPECTJ, then the value of the proxyTargetClass() attribute will be ignored. Note also that in this case the spring-aspects module JAR must be present on the classpath, with compile-time weaving or load-time weaving applying the aspect to the affected classes. There is no proxy involved in such a scenario; local calls will be intercepted as well.
// AspectJ weaving can be used instead (the spring-aspects module JAR must be on the classpath)
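
The limitation of proxy mode described above can be reproduced with a plain JDK dynamic proxy. This is only a sketch of the mechanism, not Spring code: Service, ServiceImpl and the recording handler are hypothetical names, and the handler merely records which calls it sees instead of dispatching them to an executor.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

class SelfInvocationSketch {

    interface Service {
        void outer();
        void inner();
    }

    static class ServiceImpl implements Service {
        @Override
        public void outer() {
            inner(); // a call on `this`: it never passes through the proxy
        }

        @Override
        public void inner() {
        }
    }

    // Returns the names of the methods that the proxy's handler actually saw.
    static List<String> intercepted(boolean callOuter) {
        List<String> seen = new ArrayList<>();
        Service target = new ServiceImpl();
        Service proxy = (Service) Proxy.newProxyInstance(
                Service.class.getClassLoader(),
                new Class<?>[] { Service.class },
                (p, method, methodArgs) -> {
                    seen.add(method.getName());
                    return method.invoke(target, methodArgs);
                });
        if (callOuter) {
            proxy.outer();
        } else {
            proxy.inner();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(intercepted(true));  // prints [outer]
        System.out.println(intercepted(false)); // prints [inner]
    }
}
```

When outer() is called through the proxy, the nested inner() call is invisible to the handler, which is exactly why an annotation on inner() would have no effect in that case.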

Here is the source:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

    /**
     * (Note: this attribute supports a user-defined async annotation; by default
     * Spring's @Async and the EJB 3.1 @javax.ejb.Asynchronous are detected.)
     * Indicate the 'async' annotation type to be detected at either class
     * or method level.
     * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1
     * {@code @javax.ejb.Asynchronous} annotation will be detected.
     * <p>This attribute exists so that developers can provide their own
     * custom annotation type to indicate that a method (or all methods of
     * a given class) should be invoked asynchronously.
     */
    Class<? extends Annotation> annotation() default Annotation.class;

    /**
     * (Note: whether CGLIB subclass proxies should be created; applies only when
     * AdviceMode is PROXY. When set to true, other Spring-managed beans that need
     * proxying are upgraded to CGLIB subclass proxies as well.)
     * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed
     * to standard Java interface-based proxies.
     * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>.
     * <p>The default is {@code false}.
     * <p>Note that setting this attribute to {@code true} will affect <em>all</em>
     * Spring-managed beans requiring proxying, not just those marked with {@code @Async}.
     * For example, other beans marked with Spring's {@code @Transactional} annotation
     * will be upgraded to subclass proxying at the same time. This approach has no
     * negative impact in practice unless one is explicitly expecting one type of proxy
     * vs. another &mdash; for example, in tests.
     */
    boolean proxyTargetClass() default false;

    /**
     * (Note: how the async advice is applied; the default is PROXY. To support a
     * non-async method calling an async method of the same class, set this to ASPECTJ.)
     * Indicate how async advice should be applied.
     * <p><b>The default is {@link AdviceMode#PROXY}.</b>
     * Please note that proxy mode allows for interception of calls through the proxy
     * only. Local calls within the same class cannot get intercepted that way; an
     * {@link Async} annotation on such a method within a local call will be ignored
     * since Spring's interceptor does not even kick in for such a runtime scenario.
     * For a more advanced mode of interception, consider switching this to
     * {@link AdviceMode#ASPECTJ}.
     */
    AdviceMode mode() default AdviceMode.PROXY;

    /**
     * (Note: the order in which the async annotation bean post-processor is applied;
     * the default is the lowest precedence, Integer.MAX_VALUE, where a smaller value
     * means a higher precedence.)
     * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
     * should be applied.
     * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
     * after all other post-processors, so that it can add an advisor to
     * existing proxies rather than double-proxy.
     */
    int order() default Ordered.LOWEST_PRECEDENCE;

}

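Execution flow:

As shown above, the key annotation is @Import(AsyncConfigurationSelector.class), which follows the familiar pattern of the ImportSelector interface and its selectImports() method. The source: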

/**
 * (Note: a selector that, based on the AdviceMode declared by @EnableAsync on the
 * @Configuration class, decides which implementation of the abstract async
 * configuration class to use.)
 * Selects which implementation of {@link AbstractAsyncConfiguration} should be used based
 * on the value of {@link EnableAsync#mode} on the importing {@code @Configuration} class.
 *
 * @author Chris Beams
 * @since 3.1
 * @see EnableAsync
 * @see ProxyAsyncConfiguration
 */
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    /**
     * {@inheritDoc}
     * @return {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} for
     * {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, respectively
     */
    @Override
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY: // PROXY mode: use ProxyAsyncConfiguration
                return new String[] { ProxyAsyncConfiguration.class.getName() };
            case ASPECTJ: // ASPECTJ mode: use AspectJAsyncConfiguration
                return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
            default:
                return null;
        }
    }

}

 

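Let's pick one of the two, ProxyAsyncConfiguration (proxy mode, which uses JDK interface-based proxies by default), and look at its implementation: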

/**
 * {@code @Configuration} class that registers the Spring infrastructure beans necessary
 * to enable proxy-based asynchronous method execution.
 *
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1
 * @see EnableAsync
 * @see AsyncConfigurationSelector
 */
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

    @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
        Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
        // Create the async annotation bean post-processor
        AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
        Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
        // If @EnableAsync declares a custom annotation attribute (the async annotation type), apply it
        if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
            bpp.setAsyncAnnotationType(customAsyncAnnotation);
        }
        if (this.executor != null) { // Executor: set the task executor
            bpp.setExecutor(this.executor);
        }
        if (this.exceptionHandler != null) { // AsyncUncaughtExceptionHandler: set the exception handler
            bpp.setExceptionHandler(this.exceptionHandler);
        }
        // Whether to upgrade to CGLIB subclass proxies; off by default
        bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
        // Execution order; by default it runs last
        bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
        return bpp;
    }

}

As shown above, ProxyAsyncConfiguration comes down to two things:

1. It extends AbstractAsyncConfiguration

2. It defines one bean: AsyncAnnotationBeanPostProcessor

1. Source of AbstractAsyncConfiguration:

/**
 * Abstract base {@code Configuration} class providing common structure for enabling
 * Spring's asynchronous method execution capability.
 * (Note: the abstract async configuration class; it holds the common structure that
 * supports Spring's asynchronous method execution.)
 * @author Chris Beams
 * @author Stephane Nicoll
 * @since 3.1
 * @see EnableAsync
 */
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

    protected AnnotationAttributes enableAsync; // attributes of the @EnableAsync annotation

    protected Executor executor; // task executor (java.util.concurrent.Executor, designed by Doug Lea)

    protected AsyncUncaughtExceptionHandler exceptionHandler; // exception handler


    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        this.enableAsync = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
        if (this.enableAsync == null) {
            throw new IllegalArgumentException(
                    "@EnableAsync is not present on importing class " + importMetadata.getClassName());
        }
    }

    /**
     * Collect any {@link AsyncConfigurer} beans through autowiring.
     */
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        AsyncConfigurer configurer = configurers.iterator().next();
        this.executor = configurer.getAsyncExecutor();
        this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
    }

}

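This is quite clear.

Fields:

1) The annotation attributes

2) The async task executor

3) The exception handler

Methods:

1) setImportMetadata sets the annotation attributes, i.e. field 1

2) setConfigurers sets the async task executor and the exception handler, i.e. fields 2 and 3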
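
The rule enforced by setConfigurers (no configurer keeps the defaults, exactly one is used, more than one is rejected) can be exercised in isolation. The sketch below uses a hypothetical, simplified Configurer interface with only the executor getter; it mirrors the logic of the listing above and is not the Spring class itself.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Executor;

class ConfigurerCollectionSketch {

    // Hypothetical, simplified stand-in for AsyncConfigurer (executor only).
    interface Configurer {
        Executor getAsyncExecutor();
    }

    // None -> null (the defaults apply), exactly one -> use it, more than one -> fail fast.
    static Executor resolveExecutor(Collection<Configurer> configurers) {
        if (configurers == null || configurers.isEmpty()) {
            return null;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        return configurers.iterator().next().getAsyncExecutor();
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run; // trivial executor used only for the demo
        Configurer one = () -> direct;
        System.out.println(resolveExecutor(Collections.emptyList()) == null);          // prints true
        System.out.println(resolveExecutor(Collections.singletonList(one)) == direct); // prints true
        try {
            resolveExecutor(Arrays.asList(one, one));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "Only one AsyncConfigurer may exist"
        }
    }
}
```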

2. The AsyncAnnotationBeanPostProcessor bean:

[Class diagram of AsyncAnnotationBeanPostProcessor]

The AOP process is analyzed in detail below.

2.3 AOP: initializing the Advisor (AsyncAnnotationBeanPostProcessor -> setBeanFactory())

When the AsyncAnnotationBeanPostProcessor bean is initialized, the setBeanFactory method of the BeanFactoryAware interface constructs the AsyncAnnotationAdvisor, the advisor for the async annotation.

@Override
public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);

    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
        advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
}

[Class diagram of AsyncAnnotationAdvisor]

2.4 AOP: creating the proxy via AopProxy (AsyncAnnotationBeanPostProcessor -> postProcessAfterInitialization())

The actual post-processing: AsyncAnnotationBeanPostProcessor performs its bean post-processing through its parent class AbstractAdvisingBeanPostProcessor, which implements the BeanPostProcessor interface and overrides postProcessAfterInitialization as follows:

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }
    // The bean is already a proxy: add the Advisor to it (ProxyFactory -> AdvisedSupport -> Advised)
    if (bean instanceof Advised) {
        Advised advised = (Advised) bean;
        if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
            // Add our local Advisor to the existing proxy's Advisor chain...
            if (this.beforeExistingAdvisors) {
                advised.addAdvisor(0, this.advisor);
            }
            else {
                advised.addAdvisor(this.advisor);
            }
            return bean;
        }
    }
    // Build a ProxyFactory, register the interfaces to proxy, add the advisor,
    // and finally return the proxy created through AopProxy
    if (isEligible(bean, beanName)) {
        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
        if (!proxyFactory.isProxyTargetClass()) {
            evaluateProxyInterfaces(bean.getClass(), proxyFactory);
        }
        proxyFactory.addAdvisor(this.advisor);
        customizeProxyFactory(proxyFactory);
        return proxyFactory.getProxy(getProxyClassLoader());
    }

    // No async proxy needed.
    return bean;
}
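
The shape of this post-processing step (check eligibility, then return either the original bean or a proxy around it) can be imitated with the JDK alone. The following is a rough sketch under stated assumptions, not Spring's implementation: RunAsync is a hypothetical marker annotation standing in for @Async, only void methods are handled, and a JDK dynamic proxy replaces ProxyFactory.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncPostProcessSketch {

    // Hypothetical marker annotation standing in for @Async.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface RunAsync { }

    interface Mailer {
        void send(String to);
    }

    static class SmtpMailer implements Mailer {
        volatile String lastThread; // name of the thread that ran send()

        @RunAsync
        @Override
        public void send(String to) {
            lastThread = Thread.currentThread().getName();
        }
    }

    // Rough analogue of isEligible: does any declared method carry the marker?
    static boolean isEligible(Class<?> type) {
        for (Method m : type.getDeclaredMethods()) {
            if (m.isAnnotationPresent(RunAsync.class)) {
                return true;
            }
        }
        return false;
    }

    // Rough analogue of postProcessAfterInitialization: leave non-eligible beans
    // alone; otherwise return a proxy that hands marked calls to the executor.
    static Object postProcess(Object bean, Executor executor) {
        if (!isEligible(bean.getClass())) {
            return bean; // no async proxy needed
        }
        return Proxy.newProxyInstance(
                bean.getClass().getClassLoader(),
                bean.getClass().getInterfaces(),
                (p, method, methodArgs) -> {
                    Method onTarget = bean.getClass()
                            .getMethod(method.getName(), method.getParameterTypes());
                    if (!onTarget.isAnnotationPresent(RunAsync.class)) {
                        return method.invoke(bean, methodArgs); // plain synchronous call
                    }
                    executor.execute(() -> {
                        try {
                            method.invoke(bean, methodArgs);
                        } catch (ReflectiveOperationException e) {
                            throw new IllegalStateException(e);
                        }
                    });
                    return null; // this sketch supports void methods only
                });
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-1"));
        SmtpMailer target = new SmtpMailer();
        Mailer mailer = (Mailer) postProcess(target, executor);
        mailer.send("user@example.com");
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(target.lastThread); // name of the executor's worker thread
    }
}
```

A bean without the marker annotation comes back unchanged, which corresponds to the final return bean in the listing above.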

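isEligible checks whether the class, or any method in the class, carries the annotation. AsyncAnnotationAdvisor implements the PointcutAdvisor interface, so it takes the second branch of canApply below: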

public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
    if (advisor instanceof IntroductionAdvisor) {
        return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
    }
    // AsyncAnnotationAdvisor takes this second branch: PointcutAdvisor
    else if (advisor instanceof PointcutAdvisor) {
        PointcutAdvisor pca = (PointcutAdvisor) advisor;
        return canApply(pca.getPointcut(), targetClass, hasIntroductions);
    }
    else {
        // It doesn't have a pointcut so we assume it applies.
        return true;
    }
}