java 使用Queue来队列执行异步任务

Posted -renyu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java 使用Queue来队列执行异步任务相关的知识,希望对你有一定的参考价值。

先创建一个总的Handler(队列统一处理接口),名字就叫做 QueueTaskHandler

public interface QueueTaskHandler {

    void processData();
}

然后写一个队列服务类,就不多做说明了,我的注释已经写的很清楚了

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

@Component
public class QueueGenerationService{

    // 日志监控
    private static final Logger log = LoggerFactory.getLogger(QueueGenerationService.class);
    // 根据业务与服务器性能自行配置 这里我配置的是最多50000个任务
    // LinkedBlockingQueue构造的时候若没有指定大小,则默认大小为Integer.MAX_VALUE
    private final LinkedBlockingQueue<QueueTaskHandler> tasks = new LinkedBlockingQueue<QueueTaskHandler>(50000);
    // 类似于一个线程总管 保证所有的任务都在队列之中
    private ExecutorService service = Executors.newSingleThreadExecutor();
    // 检查服务是否运行
    private volatile boolean running = true;
    //线程状态
    private Future<?> serviceThreadStatus = null;

    @PostConstruct
    public void init() {
    serviceThreadStatus = service.submit(new Thread(new Runnable() {
        @Override
        public void run() {
        while (running) {
            try {
            //开始一个任务
            QueueTaskHandler task = tasks.take();
            try {
                task.processData();
            } catch (Exception e) {
                log.error("任务处理发生错误", e);
            }
            } catch (InterruptedException e) {
            log.error("服务停止,退出", e);
            running = false;
            }
        }
        }
    }, "save data thread"));
    }

    public boolean addData(QueueTaskHandler dataHandler) {
    if (!running) {
        log.warn("service is stop");
        return false;
    }
    //offer 队列已经满了,无法再加入的情况下
    boolean success = tasks.offer(dataHandler);
    if (!success) {
        log.warn("添加任务到队列失败");
    }
    return success;
    }

    public boolean checkServiceRun() {
    return running && !service.isShutdown() && !serviceThreadStatus.isDone();
    }

    public void activeService() {
    running = true;
    if (service.isShutdown()) {
        service = Executors.newSingleThreadExecutor();
        init();
        log.info("线程池关闭,重新初始化线程池及任务");
    }
    if (serviceThreadStatus.isDone()) {
        init();
        log.info("线程池任务结束,重新初始化任务");
    }
    }

    @PreDestroy
    public void destory() {
    running = false;
    service.shutdownNow();
    }
}

接下来就可以开始写你的业务Handler了

public class TestServiceHandler implements QueueTaskHandler {

    // ******* start 这一段并不是必要的,这是示范一个传值的方式
    private String name;

    private Integer age;

    public TestServiceHandler(String name) {
    this.name = name;
    }

    public TestServiceHandler(Integer age) {
    this.age = age;
    }

    public TestServiceHandler(String name, Integer age) {
    this.name = name;
    this.age = age;
    }

    // ****** end

    // 这里也就是我们实现QueueTaskHandler的处理接口
    @Override
    public void processData() {
    // 可以去做你想做的业务了
    // 这里需要引用spring的service的话,我写了一个工具类,下面会贴出来
    // ItestService testService = SpringUtils.getBean(ItestService.class);
    System.out.println("name > " + name + "," + "age > " + age);
    }

}

那么我们来在service中添加一个任务

    // 这里注入队列服务
@Autowired
private QueueGenerationService queueGenerationService;

  // 在方法中调用与传参的方式
  queueGenerationService.addData(new TestServiceHandler("小明",5));
 

整个过程就结束了,然后在你的业务Handler中如果需要使用其他的bean比如service,那么请试试我写的这个工具类

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

@Component
public class SpringUtils implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    /**
     * @return
     * @Description 获取applicationContext
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (SpringUtils.applicationContext == null) {
            SpringUtils.applicationContext = applicationContext;
        }
    }

    /**
     * @param name
     * @return
     * @Description 通过name获取 Bean.
     */
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    /**
     * @param clazz
     * @return
     * @Description 通过class获取Bean.
     */
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    /**
     * @param name
     * @param clazz
     * @return
     * @Description 通过name, 以及Clazz返回指定的Bean
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

}

 

以上是关于java 使用Queue来队列执行异步任务的主要内容,如果未能解决你的问题,请参考以下文章

异步消息队列和处理,如 django 中的 Amazon Simple Queue 服务

使用Supervisor来管理你的Laravel队列

多线程

同步,异步,串行队列,并发队列,全局队列,主队列等概念的总结

我应该在 Fragment 中的啥生命周期状态下执行异步任务?

GCD