Spring异步核心@Async注解的前世今生

Posted 热爱编程的大忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring异步核心@Async注解的前世今生相关的知识,希望对你有一定的参考价值。

Spring异步核心@Async注解的前世今生


@Async使用演示

  • 开启异步支持@EnableAsync
@Configuration
@EnableAsync
public class AsyncConfig 

  • 在容器bean的方法上标注@Async注解
@Service
@Slf4j
public class HelloService 
  @Async
  public void hello()
      System.out.println("hello world");
      log.info("当前线程信息: ",Thread.currentThread().getName());
  

此时该方法执行的时候,就会采用异步方式执行了。

输出如下(当前线程名):

当前线程:SimpleAsyncTaskExecutor-1

可以很明显的发现,它使用的是线程池SimpleAsyncTaskExecutor,这也是Spring默认给我们提供的线程池(其实它不是一个真正的线程池,后面会有讲述)。下面原理部分讲解后,你就能知道怎么让它使用我们自定义的线程池了。


@Async注解使用细节

  • @Async注解一般用在方法上,如果用在类上,那么这个类所有的方法都是异步执行的;
  • @Async可以放在任何方法上,哪怕你是private的(若是同类调用,请务必注意注解失效的情况~~~)
  • 所使用的@Async注解方法的类对象应该是Spring容器管理的bean对象
  • @Async可以放在接口处(或者接口方法上)。但是只有使用的是JDK的动态代理时才有效,CGLIB会失效。因此建议:统一写在实现类的方法上
  • 需要注解@EnableAsync来开启异步注解的支持
  • 若你希望得到异步调用的返回值,请你的返回值用Futrue变量包装起来

分析

@Async注解的实现和动态代理有关,说明是和spring aop相关,aop核心是创建代理,然后在被代理对象方法执行前,决定选择哪些拦截器先执行。那么可以想到,如果让我们来实现@Async注解的功能,那么大致思路如下:

  • 往容器中放入一个BeanPostProcessor,该后置处理器负责对容器中类上或者方法上标注了@Async注解的bean进行代理。
  • 被代理对象中会放入一个Advisor,该增强器由AsyncInterceptor和AsyncPointcut组成
  • AsyncPointcut会在被代理对象方法调用时,判断目前方法上是否存储@Async注解,然后决定是否将AsyncInterceptor加入当前方法执行的拦截器链中。
  • AsyncInterceptor负责拦截目标方法执行,并将目标方法通过线程池提交执行。

其实Spring设计的思路就是这样的,有了上面的思路,再去看源码,会有一种了然于心的感觉。


源码分析

@EnableAsync

@EnableXXX这种启动某个功能的开关模型,相信大家在使用spring过程中,已经见过多次了,其实这些注解背后的原理都是一个套路:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync 

	 //默认情况下,要开启异步操作,要在相应的方法或者类上加上@Async注解或者EJB3.1规范下的@Asynchronous注解。
	 //这个属性使得开发人员可以自己设置开启异步操作的注解(可谓非常的人性化了,但是大多情况下用Spring的就足够了)
	Class<? extends Annotation> annotation() default Annotation.class;
	// true表示启用CGLIB代理
	boolean proxyTargetClass() default false;
	
	// 代理方式:默认是PROXY  采用Spring的动态代理(含JDK动态代理和CGLIB)
	// 若改为:AdviceMode.ASPECTJ表示使用AspectJ静态代理方式。
	// 它能够解决同类内方法调用不走代理对象的问题,但是一般情况下都不建议这么去做,不要修改这个参数值
	AdviceMode mode() default AdviceMode.PROXY;
	// 直接定义:它的执行顺序(因为可能有多个@EnableXXX)
	int order() default Ordered.LOWEST_PRECEDENCE;


@Target(ElementType.METHOD, ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async 

	// May be used to determine the target executor to be used when executing this method
	// 意思是这个value值是用来指定执行器的(写入执行器BeanName即可采用特定的执行器去执行此方法)
	String value() default "";


最重要的,还是上面的@Import注解导入的类:AsyncConfigurationSelector


AsyncConfigurationSelector

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> 

	// 用于支持AspectJ这种静态代理Mode
	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

	@Override
	@Nullable
	public String[] selectImports(AdviceMode adviceMode) 
		// 这里AdviceMode 进行不同的处理,从而向Spring容器注入了不同的Bean~~~
		switch (adviceMode) 
			// 大多数情况下都走这里,ProxyAsyncConfiguration会被注入到Bean容器里面~~~
			case PROXY:
				return new String[]  ProxyAsyncConfiguration.class.getName() ;
			case ASPECTJ:
				return new String[]  ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME ;
			default:
				return null;
		
	


AdviceModeImportSelector父类的这个抽象更加的重要。它的实现类至少有如下两个:

这个父类抽象得非常的好,它的作用:抽象实现支持了AdviceMode,并且支持通用的@EnableXXX模式。

//@since 3.1  它是一个`ImportSelector`
public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector 

	// 默认都叫mode
	public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";
	// 显然也允许子类覆盖此方法
	protected String getAdviceModeAttributeName() 
		return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;
	
	
	// importingClassMetadata:注解的信息--就是@EnableAsync注解信息
	@Override
	public final String[] selectImports(AnnotationMetadata importingClassMetadata) 
		// 拿到AdviceModeImportSelector对应泛型类型--也就是注解类型--@Enablexxx系列注解
		Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
		Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");
		
		//从传入的importingClassMetadata中拿到这里我们需要的@Enablexxx注解信息
		//然后提前注解中属性值,封装为AnnotationAttributes后返回
		AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
		if (attributes == null) 
			throw new IllegalArgumentException(String.format( "@%s is not present annType.getSimpleName(), importingClassMetadata.getClassName()));
		

		// 拿到AdviceMode,最终交给子类,让她自己去实现  决定导入哪个Bean吧
		AdviceMode adviceMode = attributes.getEnum(this.getAdviceModeAttributeName());
		//抽象方法,子类实现--子类返回字符串数组,每一个元素都是一个全类名
		//代表需要放入ioc中的bean对象
		String[] imports = selectImports(adviceMode);
		if (imports == null) 
			throw new IllegalArgumentException(String.format("Unknown AdviceMode: '%s'", adviceMode));
		
		return imports;
	

	// 子类去实现  具体导入哪个Bean
	@Nullable
	protected abstract String[] selectImports(AdviceMode adviceMode);


该抽象提供了支持AdviceMode的较为通用的实现,若我们自己想自定义,可以考虑实现此类。

由此可议看出,@EnableAsync最终是向容器内注入了ProxyAsyncConfiguration这个Bean。由名字可议看出,它是一个配置类。


ProxyAsyncConfiguration

// 它是一个配置类,角色为ROLE_INFRASTRUCTURE  框架自用的Bean类型
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration 

	// 它的作用就是诸如了一个AsyncAnnotationBeanPostProcessor,它是个BeanPostProcessor
	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() 
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		
		// customAsyncAnnotation:自定义的注解类型
		// AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation") 为拿到该注解该字段的默认值
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
	
		// 相当于如果你指定了AsyncAnnotationType,那就set进去吧
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) 
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		
		
		// 只有自定义了AsyncConfigurer的实现类,自定义了一个线程执行器,这里才会有值
		if (this.executor != null) 
			bpp.setExecutor(this.executor);
		
		// 同上,异步线程异常的处理器~~~~~
		if (this.exceptionHandler != null) 
			bpp.setExceptionHandler(this.exceptionHandler);
		
		
		// 这两个参数,就不多说了。
		// 可以看到,order属性值,最终决定的是BeanProcessor的执行顺序的
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	



// 它的父类:
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware 

	// 此注解@EnableAsync的元信息
	protected AnnotationAttributes enableAsync;
	// 异步线程池
	protected Executor executor;
	// 异步异常的处理器
	protected AsyncUncaughtExceptionHandler exceptionHandler;

    //该接口由ImportAware 提供,作用是告知是哪个注解上通过Import注解导入的该配置类
	@Override
	public void setImportMetadata(AnnotationMetadata importMetadata) 
		// 拿到@EnableAsync注解的元数据信息~~~
		this.enableAsync = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
		if (this.enableAsync == null) 
			throw new IllegalArgumentException("@EnableAsync is not present on importing class " + importMetadata.getClassName());
		
	

	/**
	 * Collect any @link AsyncConfigurer beans through autowiring.
	 */
	 // doc说得很明白。它会把所有的`AsyncConfigurer`的实现类都搜集进来,然后进行类似属性的合并
	 // 备注  虽然这里用的是Collection 但是AsyncConfigurer的实现类只允许有一个
	 //默认是没有提供AsyncConfigurer实现类的,这是留给用户自定义的接口
	@Autowired(required = false)
	void setConfigurers(Collection<AsyncConfigurer> configurers) 
      
		if (CollectionUtils.isEmpty(configurers)) 
			return;
		
      	//AsyncConfigurer用来配置线程池配置以及异常处理器,而且在Spring环境中最多只能有一个
      	//在这里我们知道了,如果想要自己去配置线程池,只需要实现AsyncConfigurer接口,并且不可以在Spring环境中有多个实现AsyncConfigurer的类。
		if (configurers.size() > 1) 
			throw new IllegalStateException("Only one AsyncConfigurer may exist");
		
		// 拿到唯一的AsyncConfigurer ,然后赋值~~~~   默认的请参照这个类:AsyncConfigurerSupport(它并不会被加入进Spring容器里)
		AsyncConfigurer configurer = configurers.iterator().next();
		this.executor = configurer.getAsyncExecutor();
		this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
	


从上可知,真正做文章的最终还是AsyncAnnotationBeanPostProcessor这个后置处理器,下面我们来重点看看它


AsyncAnnotationBeanPostProcessor

AsyncAnnotationBeanPostProcessor这个BeanPostBeanPostProcessor很显然会对带有能够引发异步操作的注解(比如@Async)的Bean进行处理

从该类的继承体系可以看出,大部分功能都是在抽象类里完成的,它不关乎于@Async,而是这一类技术都是这样子处理的。

首先,ProxyProcessorSupport这里就不用多说了,在讲解自动代理创建器的时候有说过。

Spring读源码系列之AOP–07—aop自动代理创建器(拿下AOP的最后一击)


AbstractAdvisingBeanPostProcessor

从这个名字也能看出来。它主要处理AdvisingBean,也就是处理Advisor和Bean的关系的

// 它继承自,ProxyProcessorSupport,说明它也拥有Aop的通用配置
public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor 
    //当前后置处理器需要用到的增强器
	@Nullable
	protected Advisor advisor;
	//如果当前后置处理器拦截到了某个已经被代理的bean,那么需要将advisor插入到被代理bean已有增强器链中
	//那么是将当前advisor插入到已有增强器链中的那个位置呢?
	//如果下面这个属性为true,那么会插入已有增强链的头部
	protected boolean beforeExistingAdvisors = false;
	
	//缓存符合条件的bean---属于被advisor切入范围的bean
	private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);
  
	// 当遇到一个pre-object的时候,是否把该processor所持有得advisor放在现有的增强器们之前执行
	// 默认是false,会放在最后一个位置上的
	public void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) 
		this.beforeExistingAdvisors = beforeExistingAdvisors;
	
	
	// 不处理
	@Override
	public Object postProcessBeforeInitialization(Object bean, String beanName) 
		return bean;
	

	// Bean已经实例化、初始化完成之后执行。
	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) 
		// 忽略AopInfrastructureBean的Bean,并且如果没有advisor也会忽略不处理~~~~~
		if (bean instanceof AopInfrastructureBean || this.advisor == null) 
			// Ignore AOP infrastructure such as scoped proxies.
			return bean;
		
		
		// 如果这个Bean已经被代理过了(比如已经被AOP切中了),那本处就无需再重复创建代理了嘛
		// 直接向里面添加advisor就成了
		if (bean instanceof Advised) 
			Advised advised = (Advised) bean;
			// 注意此advised不能是已经被冻结了的。且源对象必须是Eligible合格的
			if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) 
				// 把自己持有的这个advisor放在首位(如果beforeExistingAdvisors=true)
				if (this.beforeExistingAdvisors) 
					advised.addAdvisor(0, this.advisor);
				
				// 否则就是尾部位置
				else 
					advised.addAdvisor(this.advisor);
				
				// 最终直接返回即可,因为已经没有必要再创建一次代理对象了
				return bean;
			
		

		// 如果这个Bean是符合advisor增强范围的---->这个时候是没有被代理过的
		if (isEligible(bean, beanName)) 
			// 以当前的配置,创建一个ProxyFactory 
			ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
			// 如果不是使用CGLIB常见代理,那就去分析出它所实现的接口们  然后放进ProxyFactory 里去
			if (!proxyFactory.isProxyTargetClass()) 
				evaluateProxyInterfaces(bean.getClass(), proxyFactory);
			
			
			// 切面就是当前持有得advisor
			proxyFactory.addAdvisor(this.advisor);
			// 留给子类,自己还可以对proxyFactory进行自定义~~~~~
			customizeProxyFactory(proxyFactory);
			// 最终返回这个代理对象~~~~~
			return proxyFactory.getProxy(getProxyClassLoader());
		

		// No async proxy needed.(相当于没有做任何的代理处理,返回原对象)
		return bean;
	
	
	// 检查这个Bean是否是合格的
	protected boolean isEligible(Object bean, String beanName) 
		return isEligible(bean.getClass());
	
	
	protected boolean isEligible(Class<?> targetClass) 
		// 如果已经被缓存着了,那肯定靠谱啊
		Boolean eligible = this.eligibleBeans.get(targetClass);
		if (eligible != null) 
			return eligible;
		
		// 如果没有切面(就相当于没有给配置增强器,那铁定是不合格的)
		if (this.advisor == null) 
			return false;
		
	
		// 这个重要了:看看这个advisor是否能够切入进targetClass这个类,能够切入进取的也是合格的
		//利用advisor内部提供的pointcut进行匹配判断
		eligible = AopUtils.canApply(this.advisor, targetClass);
		this.eligibleBeans.put(targetClass, eligible);
		return eligible;
	

	// 子类可以复写。比如`AbstractBeanFactoryAwareAdvisingPostProcessor`就复写了这个方法~~~
	protected ProxyFactory prepareProxyFactory(Object bean, String beanName) 
		ProxyFactory proxyFactory = new ProxyFactory();
		proxyFactory.copyFrom(this);
		proxyFactory.setTarget(bean);
		return proxyFactory;
	

	// 子类复写~
	protected void customizeProxyFactory(ProxyFactory proxyFactory) 
	

MethodValidationPostProcessor属于JSR-303校验方面的范畴,不是本文的内容,因此不会讲述


AbstractBeanFactoryAwareAdvisingPostProcessor

从名字可以看出,它相较于父类,就和BeanFactory有关了,也就是和Bean容器相关了。

public abstract class AbstractBeanFactoryAwareAdvisingPostProcessor extends AbstractAdvisingBeanPostProcessor
		implements BeanFactoryAware 

	// Bean工厂
	@Nullable
	private ConfigurableListableBeanFactory beanFactory;

	// 如果这个Bean工厂不是ConfigurableListableBeanFactory ,那就set一个null
	// 我们的`DefaultListableBeanFactory`显然就是它的子类
	@Override
	public void setBeanFactory(BeanFactory beanFactory) 
		this.beanFactory = (beanFactory instanceof ConfigurableListableBeanFactory ?
				(ConfigurableListableBeanFactory) beanFactory : null);
	

	
	@Override
	protected ProxyFactory prepareProxyFactory(Object bean, String beanName) 
	

以上是关于Spring异步核心@Async注解的前世今生的主要内容,如果未能解决你的问题,请参考以下文章

标记接口,注解和注解处理器的前世今生

标记接口,注解和注解处理器的前世今生

#yyds干货盘点#你真的知道Spring注解驱动的前世今生吗?这篇文章让你豁然开朗!

springboot异步注解@Async

async&await的前世今生

async await的前世今生