高并发下的web异步处理方案
Posted kuailefangyuan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发下的web异步处理方案相关的知识,希望对你有一定的参考价值。
高并发下的web异步处理方案
一、问题介绍
平时web开发时(使用的servlet或者基于servlet封装的SpringMVC框架),业务处理基本都是同步处理,即业务处理与web容器接收线程为同一线程,每一次Http请求都由一个线程从头到尾负责处理。
如果一个请求业务处理涉及IO操作,比如访问数据库、调用第三方服务接口等,那么其所对应的线程将同步地等待IO操作完成。而IO操作是非常慢的,将导致线程并不能及时地释放回线程池以供后续使用。在高并发请求条件下,web容器线程池将很快耗尽,降低服务吞吐量。
二、解决方案
针对上述问题,解决办法就是另起单独线程去处理业务请求,提前释放当前服务接收线程,避免当前接收线程阻塞等待得不到释放,以缓解高并发下的线程池资源紧张,提高服务吞吐量。
异步处理过程:当收到一个http请求后,tomcat等中间件的主线程调用副线程来执行业务处理,当副线程执行完成业务处理后,主线程再返回结果,在副线程执行业务处理过程中,主线程会空闲出来以处理其他请求,以此提升服务器的吞吐量。
servlet以及SpringMVC针对该问题都有对应的异步处理解决方案,具体如下:
2.1、Servlet3.0的异步处理
在Servlet 3.0中,我们可以从HttpServletRequest对象中获得AsyncContext异步处理上下文,Request和Response对象都可从中获取。AsyncContext可以从当前线程传给其他线程,并在新的线程中完成对请求的处理并返回结果给客户端,初始线程便可以还回给容器线程池以处理更多的请求。
@Slf4j
@WebServlet(name = "simpleAsync", urlPatterns = "/simpleAsync", asyncSupported = true)
public class SimpleAsyncServlet extends HttpServlet
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
log.info("初始线程开始");
AsyncContext asyncContext = request.startAsync();
asyncContext.start(() ->
log.info("处理业务开始");
try
Thread.sleep(2000L);
catch (InterruptedException e)
e.printStackTrace();
log.info("处理业务结束");
try
asyncContext.getResponse().getWriter().write("success");
catch (IOException e)
e.printStackTrace();
asyncContext.complete();
);
log.info("初始线程结束");
Servlet3.0异步处理的主要流程如下:
-
1、根据request获取AsyncContext异步处理上下文对象
-
2、调用AsyncContext的start()方法执行异步处理,该方法向申请一单独线程,并在新线程中处理业务请求,原线程则被回收到主线程池中。
-
3、业务请求处理完毕后调用complete()方法通知servlet容器。
事实上,高并发下这种方式对性能的改进不大,因为如果新的线程和初始线程共享同一个服务线程池的话,并不能改善线程池资源紧张的问题。
优化方案:使用单独的线程池执行业务处理,降低高并发下Servlet容器主线程池资源紧张问题。
@Slf4j
@WebServlet(name = "simpleAsyncWithThreadPool", urlPatterns = "/simpleAsyncWithThreadPool", asyncSupported = true)
public class SimpleAsyncServletWithThreadPool extends HttpServlet
public static ExecutorService executor = Executors.newFixedThreadPool(10);
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
log.info("初始线程开始");
AsyncContext asyncContext = request.startAsync();
// 使用自定义线程池去异步执行业务处理
executor.execute(new Runnable()
@Override
public void run()
log.info("处理业务开始");
try
Thread.sleep(2000L);
catch (InterruptedException e)
e.printStackTrace();
log.info("处理业务结束");
try
asyncContext.getResponse().getWriter().write("success");
catch (IOException e)
e.printStackTrace();
asyncContext.complete();
);
log.info("初始线程结束");
2.2、Servlet3.1的异步处理优化
Servlet 3.0对请求的处理虽然是异步的,但是对InputStream和OutputStream的IO操作却依然是阻塞的,对于数据量大的请求体或者返回体,阻塞IO也将导致不必要的等待。因此在Servlet 3.1中引入了非阻塞IO(参考下图红框内容),通过在HttpServletRequest和HttpServletResponse中分别添加ReadListener和WriterListener方式,只有在IO数据满足一定条件时(比如数据准备好时),才进行后续的操作。
@Slf4j
@WebServlet(name = "nonBlockingAsync", urlPatterns = "/nonBlockingAsync", asyncSupported = true)
public class NonBlockingAsyncServlet extends HttpServlet
@Autowired
private AsyncRestfulExecutor asyncRestfulExecutor;
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
log.info("初始线程开始");
AsyncContext asyncContext = request.startAsync();
ServletInputStream inputStream = request.getInputStream();
inputStream.setReadListener(new ReadListener()
@Override
public void onDataAvailable() throws IOException
@Override
public void onAllDataRead() throws IOException
asyncRestfulExecutor.execute(() ->
log.info("处理业务开始");
try
Thread.sleep(2000L);
catch (InterruptedException e)
e.printStackTrace();
log.info("处理业务结束");
try
asyncContext.getResponse().getWriter().write("success");
catch (IOException e)
e.printStackTrace();
asyncContext.complete();
);
@Override
public void onError(Throwable t)
asyncContext.complete();
);
log.info("初始线程结束");
2.3、SpringMVC的异步处理
首先展示一个SpringMVC同步处理的示例:
@Slf4j
@RestController
@RequestMapping("/user")
public class UserController
/**
* 同步处理
* @return
* @throws InterruptedException
*/
@GetMapping("/addUserSync")
@ResponseBody
public String addUserSync() throws InterruptedException
log.info("初始线程,开始");
log.info("业务处理,开始");
Thread.sleep(500L);
log.info("业务处理,结束");
log.info("初始线程,结束");
return "success";
执行结果:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qzzyvdeD-1645847837675)(C:\\Users\\meifangyuan\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220225221637418.png)]
根据日志输出可见,业务处理线程和初始线程为同一线程,高并发条件下,当业务处理耗时较长时,线程资源得不到释放,请求长时间占用服务连接池,将降低服务吞吐量。
DeferredResult和Callable都是为了异步生成返回值提供基本的支持。简单来说就是一个请求进来,如果你使用了DeferredResult或者Callable,在没有得到返回数据之前,DispatcherServlet和所有Filter就会退出Servlet容器线程,但响应保持打开状态,一旦返回数据有了,这个DispatcherServlet就会被再次调用并且处理,以异步产生的方式,向请求端返回值。
这么做的好处就是请求不会长时间占用服务连接池,提高服务器的吞吐量。
2.3.1、基于Callable的SpringMVC异步处理
@GetMapping("/addUserSync1")
@ResponseBody
public Callable<String> addUserAsync1() throws InterruptedException
log.info("初始线程,开始");
Callable<String> result = (()->
log.info("业务处理,开始");
Thread.sleep(500L);
log.info("业务处理,结束");
return "success";
);
log.info("初始线程,结束");
return result;
controller直接返回Callable对象,SpringMVC会使用内置的线程池去异步执行业务处理。
执行流程:
- 客户端请求服务
- SpringMVC 调用 Controller,Controller 返回一个 Callback 对象
- SpringMVC 调用 request.startAsync 并且将 Callback 提交到 TaskExecutor 中去执行
- DispatcherServlet 以及 Filters 等从应用服务器线程中结束,但 Response 仍旧是打开状态,也就是说暂时还不返回给客户端
- TaskExecutor 调用 Callback 返回一个结果,SpringMVC 将请求发送给应用服务器继续处理
- DispatcherServlet 再次被调用并且继续处理 Callback 返回的对象,最终将其返回给客户端
2.3.2、基于DeferredResult的SpringMVC异步处理
@GetMapping("/addUserAsync2")
@ResponseBody
public DeferredResult<String> addUserAsync2()
log.info("初始线程,开始");
DeferredResult<String> deferredResult = new DeferredResult(1000L);
/**
* 绑定回调通知
*/
// 异步处理业务完成后回调
deferredResult.onCompletion(new Runnable()
@Override
public void run()
log.info("异步处理执行完成回调");
);
// 异步处理业务超时回调
deferredResult.onTimeout(new Runnable()
@Override
public void run()
log.info("异步处理执行超时回调");
deferredResult.setErrorResult("异步处理执行超时");
);
// 异步处理异常回调
deferredResult.onError((throwable) ->
log.info("异步线程执行出错回调");
deferredResult.setErrorResult("异步线程执行出错");
);
// 使用自定义线程池处理
FIXED_THREAD_POOL.execute(new Runnable()
@SneakyThrows
@Override
public void run()
log.info("业务开始开始");
Thread.sleep(500L);
log.info("业务开始结束");
deferredResult.setResult("success");
);
log.info("初始线程,结束");
return deferredResult;
同样是异步处理,DefferedResult与Callable的区别在于:
-
DeferredResult 更灵活,可以主动 setResult 到 DeferredResult 中并返回,实现两个不相干的线程间通信
-
Callable 是由 SpringMVC 管理异步线程,而 DeferredResult 是自己创建线程池并管理异步线程
-
DefferedResult可以设置超时时间,完成|异常|超时回调。
3.3、基于DefferedResult的SpringMVC异步处理框架
基于DefferedResult实现的SpringMVC异步处理抽象基类,该基类持有自定义的异步处理线程池,提供通用的异步业务执行方法,普通业务处理Controller只需继承该抽象类即可直接调用asyncInvoke异步执行方法。
public abstract class AbstractAsyncProcessController
@Autowired
private BusinessProcessExecutor businessProcessExecutor;
/**
* 异步执行业务逻辑
*
* @param supplier 正常业务逻辑
* @param function 异常情况
* @param <T>
* @return
*/
protected <T> DeferredResult<T> asyncInvoke(Supplier<T> supplier, BiFunction<String, String, T> function)
DeferredResult<T> deferredResult = new DeferredResult<>();
CompletableFuture.supplyAsync(supplier, businessProcessExecutor)
.whenCompleteAsync((result, e) ->
if (e == null)
deferredResult.setResult(result);
return;
// 线程池不足
if (e instanceof ExecutionException)
// R r = apply(T t) 方法接受参数,返回执行结果
deferredResult.setResult(function.apply("9999", "系统繁忙"));
return;
// 其他内部错误
String errMsg = e.getMessage();
if (StringUtils.isNotBlank(errMsg))
errMsg = "系统内部错误,详细信息:" + errMsg;
else
errMsg = "系统内部错误,请联系系统维护人员";
deferredResult.setResult(function.apply("9999", errMsg));
);
return deferredResult;
protected static <T extends BaseResponse> T buildErrorResponse(BaseRequest request, Class<T> responseClass, String errCode, String errMsg)
T response;
try
response = responseClass.newInstance();
catch (InstantiationException | IllegalAccessException e)
throw new RuntimeException(errMsg, e);
BeanUtils.copyProperties(request, response);
response.setErrCode(errCode);
response.setErrMsg(errMsg);
return response;
以下提供一个异步处理controller示例:
@Slf4j
@RestController
@RequestMapping("/person")
public class PersonController extends AbstractAsyncProcessController
@Autowired
private PersonService personService;
@PostMapping("/add")
public DeferredResult<AddPersonResponse> add(@RequestBody AddPersonRequest request)
return asyncInvoke(() -> personService.addPerson(request),
(errCode, errMsg) -> buildErrorResponse(request, AddPersonResponse.class, errCode, errMsg));
以上是关于高并发下的web异步处理方案的主要内容,如果未能解决你的问题,请参考以下文章