Spring 集成异步任务线程池

Posted 在奋斗的大道

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring 集成异步任务线程池相关的知识,希望对你有一定的参考价值。

功能概述:

Spring通过实例化任务执行器(TaskExecutor)来构建一个任务线程池。

而实际开发中有些任务一般是非阻碍的,即异步的,所以要在配置类中通过@EnableAsync开启对异步任务的支持,并通过在实例Bean的方法上使用@Async注解来声明其是一个异步任务。

异步任务线程池实现整体步骤:

1、编写一个任务线程类,在spring 中通过实现org.springframework.scheduling.annotation.AsyncConfigurer接口,并覆写相关方法。

2、将任务线程类声明为配置对象通过@Configuration 标签实现,通过@EnableAsync 标签实现任务线程类对异步任务的支持。通过@ComponentScan 标签实现任务线程类实例化/组件化

eg:



import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * solr 异步任务线程池配置对象
 * @ClassName:  ThreadPoolExecutorConfigure   
 * @Description:TODO(这里用一句话描述这个类的作用)   
 */
@Configuration
@ComponentScan("com.zzg.async")
@EnableAsync //1 开启异步任务
public class ThreadPoolExecutorConfigure implements AsyncConfigurer {

	@Override
	public Executor getAsyncExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(10);// 核心线程数
		executor.setMaxPoolSize(10);// 非核心线程数
		executor.setQueueCapacity(200);// 队列大小
		executor.setKeepAliveSeconds(60);// 非核心线程空闲时间(秒)
		executor.setThreadNamePrefix("solrExecutor-");// 线程前缀
		executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 拒绝策略-丢弃
		// 该方法就是这里的关键,用来设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁。
		executor.setWaitForTasksToCompleteOnShutdown(true);
		// 该方法用来设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
		executor.setAwaitTerminationSeconds(60);
		
		// 补全初始化方法
		executor.initialize();
		return executor;
	}

	@Override
	public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
		// TODO Auto-generated method stub
		return null;
	}

}

3、定义一个SolrAsyncCallback 类对象,通过@Component 标签实现类的实例化/组件化,在该实例对象的solrAddAsync(Long sid) 方法添加@Async 标签实现方法异步执行。

package com.zzg.composite.component;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import com.zzg.composite.service.ApacheSolrArchInfoService;

@Component
public class SolrAsyncCallback {
	private static Logger logger = LoggerFactory.getLogger(SolrAsyncCallback.class);
	
	@Autowired
	@Qualifier("httpSolrServer")
	private HttpSolrServer apacheSolrArchInfo;

	@Autowired
	private ApacheSolrArchInfoService apacheSolrArchInfoService;
	
	/**
	 * 同步案卷索引数据删除
	 * @Title: solrDelete   
	 * @Description: TODO(这里用一句话描述这个方法的作用)   
	 * @param: @param sid
	 * @param: @throws SolrServerException
	 * @param: @throws IOException      
	 * @return: void      
	 * @throws
	 */
	public void solrDelete(Long sid) throws SolrServerException, IOException{
		String query = "sid:" + sid;
		apacheSolrArchInfo.deleteByQuery(query);
		apacheSolrArchInfo.commit();
	}

	/**
	 * 异步案卷索引数据新增
	 * @Title: solrAddAsync   
	 * @Description: TODO(这里用一句话描述这个方法的作用)   
	 * @param: @param sid
	 * @param: @throws InterruptedException      
	 * @return: void      
	 * @throws
	 */
	@Async
	public void solrAddAsync(Long sid) throws InterruptedException {
		UpdateResponse response = null;

		SolrInputDocument doc = new SolrInputDocument();
		if (sid != null) {
			Map<String, Object> map = apacheSolrArchInfoService.getIndexDataForArch(sid);

			Iterator<Map.Entry<String, Object>> entries = map.entrySet().iterator();

			while (entries.hasNext()) {

				Map.Entry<String, Object> entry = entries.next();

				System.out.println("Key = " + entry.getKey() + ", Value = " + entry.getValue());
				doc.addField(entry.getKey(), entry.getValue());
			}

			if (!doc.isEmpty()) {
				try {
					System.out.println("doucument field:" + doc.toString());
					response = apacheSolrArchInfo.add(doc);
					apacheSolrArchInfo.commit();
				} catch (SolrServerException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
					logger.error("异步任务更新索引异常信息:{}", e.getMessage(), e);
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
					logger.error("异步任务更新索引异常信息:{}", e.getMessage(), e);
				}
				response.getStatus();

			}
		}
	}

}

Spring 项目异步任务线程池启动提示的错误信息:

启动Spring项目时,提示如下错误信息:

java.lang.IllegalStateException: ThreadPoolTaskExecutor not initialized
	at org.springframework.util.Assert.state(Assert.java:70)
	at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.getThreadPoolExecutor(ThreadPoolTaskExecutor.java:260)
	at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:318)
	at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:280)
	at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:130)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:673)
	at com.zzg.component.SolrAsyncCallback$$EnhancerBySpringCGLIB$$8282a49c.solrAddAsync(<generated>)
	at com.zzg.basic.service.XmlToClassServiceImpl.doPlatformDataImport(XmlToClassServiceImpl.java:490)
	at com.zzg.job.ImportJob.execute(ImportJob.java:62)
	at com.dangdang.ddframe.job.executor.type.SimpleJobExecutor.process(SimpleJobExecutor.java:41)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:206)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:171)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute(AbstractElasticJobExecutor.java:150)
	at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute(AbstractElasticJobExecutor.java:122)
	at com.dangdang.ddframe.job.lite.internal.schedule.LiteJob.execute(LiteJob.java:26)
	at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
	at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)


问题产生原因:

初始化的线程池对象,没有调用 executor.initialize(); 方法所致。

以上是关于Spring 集成异步任务线程池的主要内容,如果未能解决你的问题,请参考以下文章

spring异步线程任务Async,自定义配置线程池,Java

Spring Boot中有多个@Async异步任务时,记得做好线程池的隔离!

Spring Boot中有多个@Async异步任务时,记得做好线程池的隔离!

Spring Boot 自定义线程池使用@Async实现异步调用任务

Spring集成JavaMail并利用线程池发送邮件

#yyds干货盘点# springboot配置@Async异步任务的线程池