高并发下的web异步处理方案

Posted kuailefangyuan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发下的web异步处理方案相关的知识,希望对你有一定的参考价值。

高并发下的web异步处理方案

一、问题介绍

​ 平时web开发时(使用的servlet或者基于servlet封装的SpringMVC框架),业务处理基本都是同步处理,即业务处理与web容器接收线程为同一线程,每一次Http请求都由一个线程从头到尾负责处理。

​ 如果一个请求业务处理涉及IO操作,比如访问数据库、调用第三方服务接口等,那么其所对应的线程将同步地等待IO操作完成。而IO操作是非常慢的,将导致线程并不能及时地释放回线程池以供后续使用。在高并发请求条件下,web容器线程池将很快耗尽,降低服务吞吐量。

二、解决方案

​ 针对上述问题,解决办法就是另起单独线程去处理业务请求,提前释放当前服务接收线程,避免当前接收线程阻塞等待得不到释放,以缓解高并发下的线程池资源紧张,提高服务吞吐量。

​ 异步处理过程:当收到一个http请求后,tomcat等中间件的主线程调用副线程来执行业务处理,当副线程执行完成业务处理后,主线程再返回结果,在副线程执行业务处理过程中,主线程会空闲出来以处理其他请求,以此提升服务器的吞吐量。

​ servlet以及SpringMVC针对该问题都有对应的异步处理解决方案,具体如下:

2.1、Servlet3.0的异步处理

​ 在Servlet 3.0中,我们可以从HttpServletRequest对象中获得AsyncContext异步处理上下文,Request和Response对象都可从中获取。AsyncContext可以从当前线程传给其他线程,并在新的线程中完成对请求的处理并返回结果给客户端,初始线程便可以还回给容器线程池以处理更多的请求。

@Slf4j
@WebServlet(name = "simpleAsync", urlPatterns = "/simpleAsync", asyncSupported = true)
public class SimpleAsyncServlet extends HttpServlet 

    @Override
    public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException 
        log.info("初始线程开始");
        AsyncContext asyncContext = request.startAsync();
        asyncContext.start(() -> 
            log.info("处理业务开始");
            try 
                Thread.sleep(2000L);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            log.info("处理业务结束");

            try 
                asyncContext.getResponse().getWriter().write("success");
             catch (IOException e) 
                e.printStackTrace();
            
            asyncContext.complete();
        );
        log.info("初始线程结束");
    

Servlet3.0异步处理的主要流程如下:

  • 1、根据request获取AsyncContext异步处理上下文对象

  • 2、调用AsyncContext的start()方法执行异步处理,该方法向申请一单独线程,并在新线程中处理业务请求,原线程则被回收到主线程池中。

  • 3、业务请求处理完毕后调用complete()方法通知servlet容器。

事实上,高并发下这种方式对性能的改进不大,因为如果新的线程和初始线程共享同一个服务线程池的话,并不能改善线程池资源紧张的问题。

优化方案:使用单独的线程池执行业务处理,降低高并发下Servlet容器主线程池资源紧张问题。

@Slf4j
@WebServlet(name = "simpleAsyncWithThreadPool", urlPatterns = "/simpleAsyncWithThreadPool", asyncSupported = true)
public class SimpleAsyncServletWithThreadPool extends HttpServlet 

    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    @Override
    public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException 
        log.info("初始线程开始");

        AsyncContext asyncContext = request.startAsync();
        // 使用自定义线程池去异步执行业务处理
        executor.execute(new Runnable() 
            @Override
            public void run() 
                log.info("处理业务开始");
                try 
                    Thread.sleep(2000L);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                log.info("处理业务结束");

                try 
                    asyncContext.getResponse().getWriter().write("success");
                 catch (IOException e) 
                    e.printStackTrace();
                
                asyncContext.complete();
            
        );

        log.info("初始线程结束");
    

2.2、Servlet3.1的异步处理优化

​ Servlet 3.0对请求的处理虽然是异步的,但是对InputStream和OutputStream的IO操作却依然是阻塞的,对于数据量大的请求体或者返回体,阻塞IO也将导致不必要的等待。因此在Servlet 3.1中引入了非阻塞IO(参考下图红框内容),通过在HttpServletRequest和HttpServletResponse中分别添加ReadListener和WriterListener方式,只有在IO数据满足一定条件时(比如数据准备好时),才进行后续的操作。

@Slf4j
@WebServlet(name = "nonBlockingAsync", urlPatterns = "/nonBlockingAsync", asyncSupported = true)
public class NonBlockingAsyncServlet extends HttpServlet 

    @Autowired
    private AsyncRestfulExecutor asyncRestfulExecutor;

    @Override
    public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException 
        log.info("初始线程开始");
        AsyncContext asyncContext = request.startAsync();

        ServletInputStream inputStream = request.getInputStream();

        inputStream.setReadListener(new ReadListener() 
            @Override
            public void onDataAvailable() throws IOException 

            

            @Override
            public void onAllDataRead() throws IOException 
                asyncRestfulExecutor.execute(() -> 
                    log.info("处理业务开始");
                    try 
                        Thread.sleep(2000L);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                    log.info("处理业务结束");

                    try 
                        asyncContext.getResponse().getWriter().write("success");
                     catch (IOException e) 
                        e.printStackTrace();
                    

                    asyncContext.complete();

                );
            

            @Override
            public void onError(Throwable t) 
                asyncContext.complete();
            
        );
        log.info("初始线程结束");
    

2.3、SpringMVC的异步处理

首先展示一个SpringMVC同步处理的示例:

@Slf4j
@RestController
@RequestMapping("/user")
public class UserController 

    /**
     * 同步处理
     * @return
     * @throws InterruptedException
     */
    @GetMapping("/addUserSync")
    @ResponseBody
    public String addUserSync() throws InterruptedException 
        log.info("初始线程,开始");
        log.info("业务处理,开始");
        Thread.sleep(500L);
        log.info("业务处理,结束");
        log.info("初始线程,结束");
        return "success";
    

执行结果:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qzzyvdeD-1645847837675)(C:\\Users\\meifangyuan\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220225221637418.png)]

根据日志输出可见,业务处理线程和初始线程为同一线程,高并发条件下,当业务处理耗时较长时,线程资源得不到释放,请求长时间占用服务连接池,将降低服务吞吐量。

DeferredResult和Callable都是为了异步生成返回值提供基本的支持。简单来说就是一个请求进来,如果你使用了DeferredResult或者Callable,在没有得到返回数据之前,DispatcherServlet和所有Filter就会退出Servlet容器线程,但响应保持打开状态,一旦返回数据有了,这个DispatcherServlet就会被再次调用并且处理,以异步产生的方式,向请求端返回值。

这么做的好处就是请求不会长时间占用服务连接池,提高服务器的吞吐量。

2.3.1、基于Callable的SpringMVC异步处理

    @GetMapping("/addUserSync1")
    @ResponseBody
    public Callable<String> addUserAsync1() throws InterruptedException 
        log.info("初始线程,开始");
        Callable<String> result = (()->
            log.info("业务处理,开始");
            Thread.sleep(500L);
            log.info("业务处理,结束");
            return "success";
        );
        log.info("初始线程,结束");
        return result;
    

controller直接返回Callable对象,SpringMVC会使用内置的线程池去异步执行业务处理。

执行流程:

  • 客户端请求服务
  • SpringMVC 调用 Controller,Controller 返回一个 Callback 对象
  • SpringMVC 调用 request.startAsync 并且将 Callback 提交到 TaskExecutor 中去执行
  • DispatcherServlet 以及 Filters 等从应用服务器线程中结束,但 Response 仍旧是打开状态,也就是说暂时还不返回给客户端
  • TaskExecutor 调用 Callback 返回一个结果,SpringMVC 将请求发送给应用服务器继续处理
  • DispatcherServlet 再次被调用并且继续处理 Callback 返回的对象,最终将其返回给客户端

2.3.2、基于DeferredResult的SpringMVC异步处理

    @GetMapping("/addUserAsync2")
    @ResponseBody
    public DeferredResult<String> addUserAsync2() 
        log.info("初始线程,开始");
        DeferredResult<String> deferredResult = new DeferredResult(1000L);
        /**
         * 绑定回调通知
         */
        // 异步处理业务完成后回调
        deferredResult.onCompletion(new Runnable() 
            @Override
            public void run() 
                log.info("异步处理执行完成回调");
            
        );

        // 异步处理业务超时回调
        deferredResult.onTimeout(new Runnable() 
            @Override
            public void run() 
                log.info("异步处理执行超时回调");
                deferredResult.setErrorResult("异步处理执行超时");
            
        );

        // 异步处理异常回调
        deferredResult.onError((throwable) -> 
            log.info("异步线程执行出错回调");
            deferredResult.setErrorResult("异步线程执行出错");
        );

        // 使用自定义线程池处理
        FIXED_THREAD_POOL.execute(new Runnable() 
            @SneakyThrows
            @Override
            public void run() 
                log.info("业务开始开始");
                Thread.sleep(500L);
                log.info("业务开始结束");

                deferredResult.setResult("success");
            
        );
        log.info("初始线程,结束");
        return deferredResult;
    

同样是异步处理,DefferedResult与Callable的区别在于:

  • DeferredResult 更灵活,可以主动 setResult 到 DeferredResult 中并返回,实现两个不相干的线程间通信

  • Callable 是由 SpringMVC 管理异步线程,而 DeferredResult 是自己创建线程池并管理异步线程

  • DefferedResult可以设置超时时间,完成|异常|超时回调。

3.3、基于DefferedResult的SpringMVC异步处理框架

​ 基于DefferedResult实现的SpringMVC异步处理抽象基类,该基类持有自定义的异步处理线程池,提供通用的异步业务执行方法,普通业务处理Controller只需继承该抽象类即可直接调用asyncInvoke异步执行方法。

public abstract class AbstractAsyncProcessController 

    @Autowired
    private BusinessProcessExecutor businessProcessExecutor;

    /**
     * 异步执行业务逻辑
     *
     * @param supplier 正常业务逻辑
     * @param function 异常情况
     * @param <T>
     * @return
     */
    protected  <T> DeferredResult<T> asyncInvoke(Supplier<T> supplier, BiFunction<String, String, T> function) 
        DeferredResult<T> deferredResult = new DeferredResult<>();
        CompletableFuture.supplyAsync(supplier, businessProcessExecutor)
                .whenCompleteAsync((result, e) -> 
                    if (e == null) 
                        deferredResult.setResult(result);
                        return;
                    

                    // 线程池不足
                    if (e instanceof ExecutionException) 
                        // R r = apply(T t) 方法接受参数,返回执行结果
                        deferredResult.setResult(function.apply("9999", "系统繁忙"));
                        return;
                    

                    // 其他内部错误
                    String errMsg = e.getMessage();
                    if (StringUtils.isNotBlank(errMsg)) 
                        errMsg = "系统内部错误,详细信息:" + errMsg;
                     else 
                        errMsg = "系统内部错误,请联系系统维护人员";
                    
                    deferredResult.setResult(function.apply("9999", errMsg));
                );
        return deferredResult;
    

    protected static <T extends BaseResponse> T buildErrorResponse(BaseRequest request, Class<T> responseClass, String errCode, String errMsg) 
        T response;
        try 
            response = responseClass.newInstance();
         catch (InstantiationException | IllegalAccessException e) 
            throw new RuntimeException(errMsg, e);
        
        BeanUtils.copyProperties(request, response);
        response.setErrCode(errCode);
        response.setErrMsg(errMsg);
        return response;
    


以下提供一个异步处理controller示例:

@Slf4j
@RestController
@RequestMapping("/person")
public class PersonController extends AbstractAsyncProcessController 

    @Autowired
    private PersonService personService;

    @PostMapping("/add")
    public DeferredResult<AddPersonResponse> add(@RequestBody AddPersonRequest request) 
        return asyncInvoke(() -> personService.addPerson(request),
                (errCode, errMsg) -> buildErrorResponse(request, AddPersonResponse.class, errCode, errMsg));
    


以上是关于高并发下的web异步处理方案的主要内容,如果未能解决你的问题,请参考以下文章

PHP开发中多种方案实现高并发下的抢购秒杀功能

关于高并发下kafka producer send异步发送耗时问题的分析

【golang】高并发下TCP常见问题解决方案

PHP实现高并发下抢购,秒杀四种方案,你用过哪种?

php结合redis实现高并发下的抢购秒杀功能

php结合redis实现高并发下的抢购秒杀功能