java 使用Queue来队列执行异步任务
Posted -renyu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java 使用Queue来队列执行异步任务相关的知识,希望对你有一定的参考价值。
先创建一个总的Handler(队列统一处理接口),名字就叫做 QueueTaskHandler
public interface QueueTaskHandler { void processData(); }
然后写一个队列服务类,就不多做说明了,我的注释已经写的很清楚了
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @Component public class QueueGenerationService{ // 日志监控 private static final Logger log = LoggerFactory.getLogger(QueueGenerationService.class); // 根据业务与服务器性能自行配置 这里我配置的是最多50000个任务 // LinkedBlockingQueue构造的时候若没有指定大小,则默认大小为Integer.MAX_VALUE private final LinkedBlockingQueue<QueueTaskHandler> tasks = new LinkedBlockingQueue<QueueTaskHandler>(50000); // 类似于一个线程总管 保证所有的任务都在队列之中 private ExecutorService service = Executors.newSingleThreadExecutor(); // 检查服务是否运行 private volatile boolean running = true; //线程状态 private Future<?> serviceThreadStatus = null; @PostConstruct public void init() { serviceThreadStatus = service.submit(new Thread(new Runnable() { @Override public void run() { while (running) { try { //开始一个任务 QueueTaskHandler task = tasks.take(); try { task.processData(); } catch (Exception e) { log.error("任务处理发生错误", e); } } catch (InterruptedException e) { log.error("服务停止,退出", e); running = false; } } } }, "save data thread")); } public boolean addData(QueueTaskHandler dataHandler) { if (!running) { log.warn("service is stop"); return false; } //offer 队列已经满了,无法再加入的情况下 boolean success = tasks.offer(dataHandler); if (!success) { log.warn("添加任务到队列失败"); } return success; } public boolean checkServiceRun() { return running && !service.isShutdown() && !serviceThreadStatus.isDone(); } public void activeService() { running = true; if (service.isShutdown()) { service = Executors.newSingleThreadExecutor(); init(); log.info("线程池关闭,重新初始化线程池及任务"); } if (serviceThreadStatus.isDone()) { init(); log.info("线程池任务结束,重新初始化任务"); } } @PreDestroy public void destory() { running = false; service.shutdownNow(); } }
接下来就可以开始写你的业务Handler了
public class TestServiceHandler implements QueueTaskHandler { // ******* start 这一段并不是必要的,这是示范一个传值的方式 private String name; private Integer age; public TestServiceHandler(String name) { this.name = name; } public TestServiceHandler(Integer age) { this.age = age; } public TestServiceHandler(String name, Integer age) { this.name = name; this.age = age; } // ****** end // 这里也就是我们实现QueueTaskHandler的处理接口 @Override public void processData() { // 可以去做你想做的业务了 // 这里需要引用spring的service的话,我写了一个工具类,下面会贴出来 // ItestService testService = SpringUtils.getBean(ItestService.class); System.out.println("name > " + name + "," + "age > " + age); } }
那么我们来在service中添加一个任务
// 这里注入队列服务
@Autowired private QueueGenerationService queueGenerationService;
// 在方法中调用与传参的方式
queueGenerationService.addData(new TestServiceHandler("小明",5));
整个过程就结束了,然后在你的业务Handler中如果需要使用其他的bean比如service,那么请试试我写的这个工具类
import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; @Component public class SpringUtils implements ApplicationContextAware { private static ApplicationContext applicationContext; /** * @return * @Description 获取applicationContext */ public static ApplicationContext getApplicationContext() { return applicationContext; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if (SpringUtils.applicationContext == null) { SpringUtils.applicationContext = applicationContext; } } /** * @param name * @return * @Description 通过name获取 Bean. */ public static Object getBean(String name) { return getApplicationContext().getBean(name); } /** * @param clazz * @return * @Description 通过class获取Bean. */ public static <T> T getBean(Class<T> clazz) { return getApplicationContext().getBean(clazz); } /** * @param name * @param clazz * @return * @Description 通过name, 以及Clazz返回指定的Bean */ public static <T> T getBean(String name, Class<T> clazz) { return getApplicationContext().getBean(name, clazz); } }
以上是关于java 使用Queue来队列执行异步任务的主要内容,如果未能解决你的问题,请参考以下文章
异步消息队列和处理,如 django 中的 Amazon Simple Queue 服务
同步,异步,串行队列,并发队列,全局队列,主队列等概念的总结