Spring 集成异步任务线程池
Posted 在奋斗的大道
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring 集成异步任务线程池相关的知识,希望对你有一定的参考价值。
功能概述:
Spring通过实例化任务执行器(TaskExecutor)来构建一个任务线程池。
而实际开发中有些任务一般是非阻碍的,即异步的,所以要在配置类中通过@EnableAsync
开启对异步任务的支持,并通过在实例Bean的方法上使用@Async
注解来声明其是一个异步任务。
异步任务线程池实现整体步骤:
1、编写一个任务线程类,在spring 中通过实现org.springframework.scheduling.annotation.AsyncConfigurer接口,并覆写相关方法。
2、将任务线程类声明为配置对象通过@Configuration 标签实现,通过@EnableAsync 标签实现任务线程类对异步任务的支持。通过@ComponentScan 标签实现任务线程类实例化/组件化
eg:
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* solr 异步任务线程池配置对象
* @ClassName: ThreadPoolExecutorConfigure
* @Description:TODO(这里用一句话描述这个类的作用)
*/
@Configuration
@ComponentScan("com.zzg.async")
@EnableAsync //1 开启异步任务
public class ThreadPoolExecutorConfigure implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);// 核心线程数
executor.setMaxPoolSize(10);// 非核心线程数
executor.setQueueCapacity(200);// 队列大小
executor.setKeepAliveSeconds(60);// 非核心线程空闲时间(秒)
executor.setThreadNamePrefix("solrExecutor-");// 线程前缀
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 拒绝策略-丢弃
// 该方法就是这里的关键,用来设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁。
executor.setWaitForTasksToCompleteOnShutdown(true);
// 该方法用来设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
executor.setAwaitTerminationSeconds(60);
// 补全初始化方法
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
// TODO Auto-generated method stub
return null;
}
}
3、定义一个SolrAsyncCallback 类对象,通过@Component 标签实现类的实例化/组件化,在该实例对象的solrAddAsync(Long sid) 方法添加@Async 标签实现方法异步执行。
package com.zzg.composite.component;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import com.zzg.composite.service.ApacheSolrArchInfoService;
@Component
public class SolrAsyncCallback {
private static Logger logger = LoggerFactory.getLogger(SolrAsyncCallback.class);
@Autowired
@Qualifier("httpSolrServer")
private HttpSolrServer apacheSolrArchInfo;
@Autowired
private ApacheSolrArchInfoService apacheSolrArchInfoService;
/**
* 同步案卷索引数据删除
* @Title: solrDelete
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param: @param sid
* @param: @throws SolrServerException
* @param: @throws IOException
* @return: void
* @throws
*/
public void solrDelete(Long sid) throws SolrServerException, IOException{
String query = "sid:" + sid;
apacheSolrArchInfo.deleteByQuery(query);
apacheSolrArchInfo.commit();
}
/**
* 异步案卷索引数据新增
* @Title: solrAddAsync
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param: @param sid
* @param: @throws InterruptedException
* @return: void
* @throws
*/
@Async
public void solrAddAsync(Long sid) throws InterruptedException {
UpdateResponse response = null;
SolrInputDocument doc = new SolrInputDocument();
if (sid != null) {
Map<String, Object> map = apacheSolrArchInfoService.getIndexDataForArch(sid);
Iterator<Map.Entry<String, Object>> entries = map.entrySet().iterator();
while (entries.hasNext()) {
Map.Entry<String, Object> entry = entries.next();
System.out.println("Key = " + entry.getKey() + ", Value = " + entry.getValue());
doc.addField(entry.getKey(), entry.getValue());
}
if (!doc.isEmpty()) {
try {
System.out.println("doucument field:" + doc.toString());
response = apacheSolrArchInfo.add(doc);
apacheSolrArchInfo.commit();
} catch (SolrServerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.error("异步任务更新索引异常信息:{}", e.getMessage(), e);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
logger.error("异步任务更新索引异常信息:{}", e.getMessage(), e);
}
response.getStatus();
}
}
}
}
Spring 项目异步任务线程池启动提示的错误信息:
启动Spring项目时,提示如下错误信息:
java.lang.IllegalStateException: ThreadPoolTaskExecutor not initialized
at org.springframework.util.Assert.state(Assert.java:70)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.getThreadPoolExecutor(ThreadPoolTaskExecutor.java:260)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:318)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:280)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:130)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:673)
at com.zzg.component.SolrAsyncCallback$$EnhancerBySpringCGLIB$$8282a49c.solrAddAsync(<generated>)
at com.zzg.basic.service.XmlToClassServiceImpl.doPlatformDataImport(XmlToClassServiceImpl.java:490)
at com.zzg.job.ImportJob.execute(ImportJob.java:62)
at com.dangdang.ddframe.job.executor.type.SimpleJobExecutor.process(SimpleJobExecutor.java:41)
at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:206)
at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.process(AbstractElasticJobExecutor.java:171)
at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute(AbstractElasticJobExecutor.java:150)
at com.dangdang.ddframe.job.executor.AbstractElasticJobExecutor.execute(AbstractElasticJobExecutor.java:122)
at com.dangdang.ddframe.job.lite.internal.schedule.LiteJob.execute(LiteJob.java:26)
at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
问题产生原因:
初始化的线程池对象,没有调用 executor.initialize(); 方法所致。
以上是关于Spring 集成异步任务线程池的主要内容,如果未能解决你的问题,请参考以下文章
spring异步线程任务Async,自定义配置线程池,Java
Spring Boot中有多个@Async异步任务时,记得做好线程池的隔离!
Spring Boot中有多个@Async异步任务时,记得做好线程池的隔离!