使用DeferredResult实现异步处理REST服务示例

Posted 小志的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用DeferredResult实现异步处理REST服务示例相关的知识,希望对你有一定的参考价值。

一、同步处理和异步处理的理解

  • 同步处理:一个http请求进来,一个tomcat或者中间件会有一个相应的线程来处理http请求,所有的业务逻辑都会在此线程中执行,并会返回一个响应。

  • 异步处理:一个http请求进来,一个tomcat或者中间件的主线程会调用一个副线程去执行业务逻辑,当副线程执行完以后,主线程再把结果返回。当副线程处理业务逻辑的过程中,主线程可以空闲出来处理其他请求。

二、同步处理的代码示例

1、代码

package com.xz.springsecuritydemo.async;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @description: 同步处理服务
 * @author: xz
 */
@RestController
@RequestMapping("/syncOrder")
public class SyncController {
    private Logger logger = LoggerFactory.getLogger(getClass());

    //同步的方式执行
    @GetMapping("/getOrder")
    public String query(){
        logger.info("主线程开始执行=======");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.info("主线程执行结束--------");
        return "success!";
    }
}

2、模拟发送get请求,如下图:

3、查看控制台输出,如下图可知,都是同一个线程中执行,线程的名称都为nio-8001-exec-1。

三、使用DeferredResult实现异步处理的代码示例

1、使用DeferredResult实现异步处理的示意图如下:

  • 接收请求的为服务1,而处理请求的为服务2;
  • http请求发送到服务1后,服务1中的线程1会把请求发送到消息队列中;
  • 而服务2监听消息队列,当监听到消息队列中有请求消息后,服务2去处理具体逻辑并把处理结果再放回消息队列;
  • 同时服务1中有另外一个线程2去监听消息队列,如果发现消息队列中油处理结果的消息,会根据消息的结果返回http响应;
  • 服务1中的线程1和线程2是隔离的。


2、代码示例

  • 使用对象模拟消息队列

    package com.xz.springsecuritydemo.async;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Component;
    /**
     * @description: 1、使用对象模拟消息队列
     * @author: xz
     */
    @Component
    public class MockQueue {
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        private String placeOrder;//下单的消息
        private String completeOrder;//订单完成消息
    
        public String getPlaceOrder() {
            return placeOrder;
        }
    
        public void setPlaceOrder(String placeOrder) {
            new Thread(()->{
                //下单的处理放到单独的线程中
                logger.info("接收到下单消息:"+placeOrder);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.completeOrder = placeOrder;
                logger.info("下单请求处理完成:"+placeOrder);
            }).start();
        }
    
        public String getCompleteOrder() {
            return completeOrder;
        }
    
        public void setCompleteOrder(String completeOrder) {
            this.completeOrder = completeOrder;
        }
    }
    
  • 传递DeferredResult中的对象使用

    package com.xz.springsecuritydemo.async;
    
    import org.springframework.stereotype.Component;
    import org.springframework.web.context.request.async.DeferredResult;
    import java.util.HashMap;
    import java.util.Map;
    /**
     * @description: 2、传递DeferredResult中的对象使用
     * @author: xz
     */
    @Component
    public class DeferredResultHolder {
    
        private Map<String, DeferredResult<String>> map=new HashMap<>();
    
        public Map<String, DeferredResult<String>> getMap() {
            return map;
        }
    
        public void setMap(Map<String, DeferredResult<String>> map) {
            this.map = map;
        }
    }
    
  • 发送请求并把消息放入消息队列中

    package com.xz.springsecuritydemo.async;
    
    import org.apache.commons.lang.RandomStringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.context.request.async.DeferredResult;
    /**
     * @description: 3、发送请求并把消息放入消息队列中
     * @author: xz
     */
    @RestController
    @RequestMapping("/asyncOrder")
    public class OrderController {
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        @Autowired
        private MockQueue mockQueue;
        @Autowired
        private DeferredResultHolder deferredResultHolder;
    
        public DeferredResult<String>  queryOrder(){
            logger.info("主线程开始");
            /**
             * 在下面代码中看不到另一个线程的业务处理
             */
            //生成随机订单号
            String orderNumber = RandomStringUtils.randomNumeric(8);//生成8位随机数
            //订单号放入消息队列
            mockQueue.setPlaceOrder(orderNumber);
            //声明DeferredResult
            DeferredResult<String> objectDeferredResult = new DeferredResult<>();
            //把orderNumber订单号和objectDeferredResult放到deferredResultHolder中
            deferredResultHolder.getMap().put(orderNumber,objectDeferredResult);
    
            logger.info("主线程结束");
            return objectDeferredResult;
        }
    
    }
    
  • 监听模拟的消息队列

    package com.xz.springsecuritydemo.async;
    
    import org.apache.commons.lang.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.stereotype.Component;
    /**
     * @description: 监听模拟的消息队列
     *           监听器,实现ApplicationListener接口并重写onApplicationEvent方法
     * @author: xz
     */
    @Component
    public class QueueListener implements ApplicationListener<ContextRefreshedEvent> {
    
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        @Autowired
        private MockQueue mockQueue;
    
        @Autowired
        private DeferredResultHolder deferredResultHolder;
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            new Thread(()->{
                //因为是循环处理,需要放在单独的线程中执行
                while(true){
                    //模拟的队列中如果完成的订单有值
                    if(StringUtils.isNotBlank(mockQueue.getCompleteOrder())){
                        //获取订单号
                        String completeOrder= mockQueue.getCompleteOrder();
                        logger.info("返回订单处理结果"+completeOrder);
                        //getMap().get(completeOrder)从deferredResultHolder的map里根据completeOrder点单编号key获取value值
                        //setResult 返回浏览器的结果
                        deferredResultHolder.getMap().get(completeOrder).setResult("success");
                        //因为是模拟的消息队列,不会反复处理,所以置空。
                        mockQueue.setCompleteOrder(null);
                    }else{//模拟的队列中如果没有值,睡眠1秒
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }).start();
    
        }
    }
    

3、运行项目,模拟发送get请求,如下图:

4、输出结果如下:

以上是关于使用DeferredResult实现异步处理REST服务示例的主要内容,如果未能解决你的问题,请参考以下文章

(05)使用DeferredResult多线程异步处理请求

(05)使用DeferredResult多线程异步处理请求

Spring DeferredResult 异步请求

DeferredResult 如何实现长轮询?

DeferredResult 实现长轮询

理解Callable 和 Spring DeferredResult