JAVA多线程编程之异步

Posted l_learning

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JAVA多线程编程之异步相关的知识,希望对你有一定的参考价值。

日常开发中我们在一个接口中需要处理多个任务,通常都是串行的,这样导致接口的响应时间是每个任务的执行时间的总和。为了缩短响应时间,通常会使用异步处理多任务。

需求举例:查询书籍基本信息,书籍详细信息,作者信息并将结果数据返回。
假设查询书籍基本信息花费500毫秒,查询书籍详细信息花费500毫秒,查询作者信息花费500毫秒,共计1500毫秒,使用异步处理时间一般都是远小于1500毫秒的。

下面使用异步调用方式优化接口

1、异步任务类

实现 Callable 接口,用来处理带返回结果的任务。taskId 用来区别返回结果集数据

package com.example.demo.task;

import java.util.concurrent.Callable;

/**
 * 异步任务
 * @param <T>
 */
public class AsynTaskCallable<T> implements Callable<T>

    private String taskId;

    private Callable<T> task;

    public AsynTaskCallable(String taskId, Callable<T> task) 
        this.taskId = taskId;
        this.task = task;
    

    @Override
    public T call() throws Exception 
        T callResult = task.call();
        TaskResult result = new TaskResult();
        result.setTaskId(taskId);
        result.setData(callResult);
        return (T) result;
    


2、异步任务调用类

用来调用异步任务辅助类,completionService 用来指定线程池执行异步任务,tasks 为带返回结果的任务,可以实现多场景复用,减少重复编写相似的代码。

package com.example.demo.task;

import com.sun.istack.internal.NotNull;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * 异步任务调用
 */
public class AsynTaskHelper<T> 

    /**
     * 使用指定线程池执行异步任务
     */
    private CompletionService<TaskResult<T>> completionService = null;

    /**
     * 任务集合
     */
    private List<Callable> tasks = null;

    /**
     * 设置线程池
     * @param executorService 线程池
     * @return
     */
    public AsynTaskHelper setExecutorService(ExecutorService executorService)
        completionService = new ExecutorCompletionService(executorService);
        return this;
    

    /**
     * 添加任务,返回结果
     * @param taskId
     * @param task
     * @return
     */
    public AsynTaskHelper addTask(String taskId, Callable<T> task) 
        AsynTaskCallable callProxy = new AsynTaskCallable(taskId, task);
        if(null == tasks || tasks.isEmpty())
            tasks = new ArrayList<>();
        
        tasks.add(callProxy);
        return this;
    

    /**
     * 提交任务
     * @return
     */
    public AsynTaskHelper submit()
        if(null != tasks && !tasks.isEmpty())
            for (Callable callResult : tasks) 
                completionService.submit(callResult);
            
        
        return this;
    

    /**
     * 获取返回结果
     * @return Map<K, V> K为任务Id
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public Map<String, T> getResult() throws ExecutionException, InterruptedException 
        return getResult(2, TimeUnit.SECONDS);
    

    /**
     * 获取返回结果
     * @param timeout
     * @param unit
     * @return Map<K, V> K为任务Id
     * @throws InterruptedException
     * @throws ExecutionException
     */
    public Map<String, T> getResult(long timeout,@NotNull TimeUnit unit) throws InterruptedException, ExecutionException 
        Map<String, T> result = new HashMap<>();
        if(null == tasks)
            return result;
        
        for (int i = 0; i < tasks.size(); i++) 
            Future<TaskResult<T>> poll = completionService.poll(timeout, unit);
            if(null != poll)
                TaskResult<T> task = poll.get();
                if(null != poll && null != task)
                    result.put(task.getTaskId(), task.getData());
                
            
        
        return result;
    


3、任务结果类

用来接收异步任务返回结果数据

package com.example.demo.task;

/**
 * 任务结果数据
 * @param <T>
 */
public class TaskResult<T> 

    /**
     * 任务Id
     */
    private String taskId;

    /**
     * 返回数据
     */
    private T data;

    public String getTaskId() 
        return taskId;
    

    public void setTaskId(String taskId) 
        this.taskId = taskId;
    

    public T getData() 
        return data;
    

    public void setData(T data) 
        this.data = data;
    

    @Override
    public String toString() 
        return "TaskResult" +
                "taskId='" + taskId + '\\'' +
                ", data=" + data +
                '';
    

4、异步调用

指定线程池执行任务

ExecutorService executor = Executors.newFixedThreadPool(500);

正常业务操作

//查询Book信息
Callable<Book> bookCall = () -> bookService.get(bookId);
//查询BookDetail信息
Callable<BookDetail> bookDetailCall = () -> bookDetailService.get(bookId);
//查询Author信息
Callable<Author> auhtorCall = () -> authorService.get(bookId);

创建异步任务

//创建异步任务
AsynTaskHelper taskCallors = new AsynTaskHelper()
         .setExecutorService(executor)
         .addTask("book", bookCall)
         .addTask("bookDetail", bookDetailCall)
         .addTask("author", auhtorCall)
         .submit();

获取结果,因为任务是异步的,可能第一时间拿不到结果,这里使用自旋的方式获取结果,如果3秒后还是没有结果则直接返回。

do
   Map map = taskCallors.getResult();
    book = (Book) map.get("book");
    bookDetail = (BookDetail) map.get("bookDetail");
    author = (Author) map.get("author");
    runTime = System.currentTimeMillis() - beginTime;
 while ((null == book || null == bookDetail || null == author) && runTime < 3000);

完整示例调用代码

package com.example.demo.controller;

import com.example.demo.domain.Author;
import com.example.demo.domain.Book;
import com.example.demo.domain.BookDetail;
import com.example.demo.service.AuthorService;
import com.example.demo.service.BookDetailService;
import com.example.demo.service.BookService;
import com.example.demo.task.AsynTaskHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;

@RestController
public class BookController 

    @Autowired
    private BookService bookService;

    @Autowired
    private BookDetailService bookDetailService;

    @Autowired
    private AuthorService authorService;

    private ExecutorService executor = Executors.newFixedThreadPool(500);

    @GetMapping("books5/bookId")
    public Map find5(@PathVariable String bookId) throws ExecutionException, InterruptedException 
        Map<String, Object> result = new HashMap<>();

        Long beginTime = System.currentTimeMillis();
        System.out.println("开始并行查询,开始时间:" + beginTime);
        //查询Book信息
        Callable<Book> bookCall = () -> bookService.get(bookId);
        //查询BookDetail信息
        Callable<BookDetail> bookDetailCall = () -> bookDetailService.get(bookId);
        //查询Author信息
        Callable<Author> auhtorCall = () -> authorService.get(bookId);

        //创建异步任务
        AsynTaskHelper taskCallors = new AsynTaskHelper()
                .setExecutorService(executor)
                .addTask("book", bookCall)
                .addTask("bookDetail", bookDetailCall)
                .addTask("author", auhtorCall)
                .submit();

        Book book = null;
        BookDetail bookDetail = null;
        Author author = null;
        
        long runTime;
        do
            Map map = taskCallors.getResult();
            book = (Book) map.get("book");
            bookDetail = (BookDetail) map.get("bookDetail");
            author = (Author) map.get("author");
            runTime = System.currentTimeMillis() - beginTime;
         while ((null == book || null == bookDetail || null == author) && runTime < 3000);

        System.out.println("结束并行查询,总耗时:" + (System.currentTimeMillis() - beginTime));
        result.put("book", book);
        result.put("detail", bookDetail);
        result.put("author", author);

        return result;
    


通过 AsynTaskHelper 调用异步任务能缩短接口响应时间,进而提升系统并发能力,后续有类似的使用场景也支持复用,减少重复编码工作。

以上是关于JAVA多线程编程之异步的主要内容,如果未能解决你的问题,请参考以下文章

异步编程之 EventLoop

异步编程之 EventLoop

java同步异步和多线程编程

JAVA基础知识之网络编程——-网络基础(多线程下载,get,post)

java网络编程-面试题

多线程之异步编程: 经典和最新的异步编程模型,async与await