springboot的异步调用

Posted 不死码农

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了 Spring Boot 异步调用的相关知识,希望对你有一定的参考价值。

package com.handsight.platform.fras.aapp;

import java.util.Locale;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.i18n.LocaleContextHolder;
import org.springframework.stereotype.Component;

import com.handsight.platform.common.util.LogUtil;
import com.handsight.platform.fras.data.StaticObject;
import com.handsight.platform.fras.thread.service.AsyncService;

/**
 * Application listener that applies the default locale and launches the
 * long-running asynchronous workers once the Spring context has been refreshed.
 *
 * <p>{@link ContextRefreshedEvent} can be published more than once during the
 * lifetime of an application (for example when a child context is refreshed).
 * The workers started here loop forever, so they are launched only on the first
 * event; later events only re-apply the locale.
 */
@Component
public class StartupListener implements ApplicationListener<ContextRefreshedEvent> {

    private final static Logger logger = LoggerFactory.getLogger(StartupListener.class);

    /** Flipped to {@code true} by the first refresh event so the workers start exactly once. */
    private final java.util.concurrent.atomic.AtomicBoolean workersStarted =
            new java.util.concurrent.atomic.AtomicBoolean(false);

    @Autowired
    private AsyncService asyncService;

    public StartupListener() {
    }

    /**
     * Applies the default locale and, on the first invocation only, starts the
     * background workers.
     *
     * @param event the context-refreshed event published by Spring
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {

        // Locale / language
        setSystemDefaultLanguage();

        if (!workersStarted.compareAndSet(false, true)) {
            // A second set of endless worker loops would compete for the same queues.
            logger.info("---------------- Async threads already started, skip.");
            return;
        }

        logger.info("---------------- Start execute Async thread.");
        // Worker that removes cached access information of logged-out users
        asyncService.execDeleteLogOutUserCacheInfo();

        // Worker that sends queued images in batches
        asyncService.sendImageBatch();

        logger.info("---------------- End execute Async thread completed.");
    }

    /**
     * Sets the system default language: {@link Locale#CHINA} is stored in the
     * {@link LocaleContextHolder} and in {@code StaticObject.locale}.
     */
    private void setSystemDefaultLanguage() {
        Locale locale = Locale.CHINA;
        LocaleContextHolder.setLocale(locale);
        StaticObject.locale = locale;
        LogUtil.info("This language is {0}", StaticObject.locale.getLanguage());
    }

}

使用:

package com.handsight.platform.fras.thread.service;

import java.net.Socket;
import java.util.List;

import com.handsight.platform.common.exception.AppException;
import com.handsight.platform.fras.mgt.pojo.T_user;
import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmReq;

/**
 * Contract for the application's asynchronous / background tasks.
 *
 * @author wangh
 */
public interface AsyncService {

    /**
     * Sends images in batches.
     */
    void sendImageBatch();

    /**
     * After a user logs out, deletes the cached mapping between the mutable user
     * token and the account, as well as the session information.
     */
    void execDeleteLogOutUserCacheInfo();

    /**
     * Stores hardware status data in the database.
     */
    void execHardwareStatusDataToDBAsync();

    /**
     * Sends a task to the algorithm service over HTTP.
     *
     * @param userToken       token of the user the photo belongs to
     * @param facePhotoString face photo, as a string
     * @throws AppException declared by the contract
     * @throws Exception    declared by the contract
     */
    void workSendTaskThread(String userToken, String facePhotoString) throws AppException, Exception;

    /**
     * Sends a batch of tasks to the algorithm service over HTTP.
     *
     * @param transferAlgorithmBeanList the requests to send
     * @throws AppException declared by the contract
     * @throws Exception    declared by the contract
     */
    void sendImageTaskThread(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception;

    /**
     * Saves the user token and the related user information.
     *
     * @param user the user to save
     * @throws Exception declared by the contract
     */
    void saveUserInfo(T_user user) throws Exception;

    /**
     * Stores the location information of a user login.
     *
     * @param user the user whose login location is stored
     */
    void saveLoginLocationInfo(T_user user);

    /**
     * Updates the user information.
     *
     * @param user the user to update
     * @throws Exception declared by the contract
     */
    void updateUserInfo(T_user user) throws Exception;

    /**
     * Stores the user's face information and feature values.
     *
     * @param user            the user
     * @param userToken       the user token
     * @param currentFaceCode the current face feature code
     * @throws Exception declared by the contract
     */
    void saveUserFaceCode(T_user user, String userToken, String currentFaceCode) throws Exception;

    /**
     * Receives the image feature codes produced by the algorithm service.
     *
     * @param socket the socket to read results from
     */
    void workReciveResultThread(Socket socket);

    /**
     * Updates the user face feature library.
     */
    void updateUserFaceCodeListThread();

    /**
     * Pushes a message.
     *
     * @param platform target platform
     * @param pushKey  push key
     * @param content  message content
     * @throws Exception declared by the contract
     */
    void pushMsg(String platform, String pushKey, String content) throws Exception;

    /**
     * Increments the monthly failed-login counter by one.
     *
     * @param userAccount account whose counter is incremented
     * @throws Exception declared by the contract
     */
    void addOneMonthFailedNum(String userAccount) throws Exception;

    /**
     * Sends tasks to the algorithm service over a socket.
     *
     * @param socket the socket to write tasks to
     */
    @Deprecated
    void workSendTaskThread_skt(Socket socket);

    /**
     * Receives the image feature codes produced by the algorithm service over a socket.
     *
     * @param socket the socket to read results from
     */
    @Deprecated
    void workReciveResultThread_skt(Socket socket);
}

实现:

package com.handsight.platform.fras.thread.service.impl;

import com.handsight.platform.common.exception.AppException;
import com.handsight.platform.common.util.HttpRequestUtil;
import com.handsight.platform.common.util.JsonUtil;
import com.handsight.platform.common.util.LogUtil;
import com.handsight.platform.common.util.UuidUtil;
import com.handsight.platform.fras.cache.UserCache;
import com.handsight.platform.fras.constant.Constants;
import com.handsight.platform.fras.constant.ErrorCodeMsg;
import com.handsight.platform.fras.data.StaticObject;
import com.handsight.platform.fras.mapper.UserMapper;
import com.handsight.platform.fras.mgt.pojo.T_user;
import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmReq;
import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmRes;
import com.handsight.platform.fras.mgt.pojo.UserFaceBean;
import com.handsight.platform.fras.pojo.MessageBean;
import com.handsight.platform.fras.service.CommonService;
import com.handsight.platform.fras.service.PushService;
import com.handsight.platform.fras.service.RedisService;
import com.handsight.platform.fras.thread.service.AsyncService;
import com.handsight.platform.fras.util.BeanUtil;
import com.handsight.platform.fras.util.EnumUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.NameValuePair;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.handsight.platform.common.constant.Constants.HTTP_RES_CODE;
import static com.handsight.platform.common.constant.Constants.HTTP_RES_CONTENT;
import static com.handsight.platform.fras.constant.Constants.QUEUE_KEY_LOGOUT_USER_TOKEN;

@Service
public class AsyncServiceImpl implements AsyncService {

    private final static Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);

    /** 定义一个每次要数据的大小(35K) */
    public final static int PART_SIZE = 35 * 1024;

    static boolean sendTaskThreadStart = false;

    @Autowired
    private RedisService redisService;

    @Autowired
    private PushService pushService;

    @Autowired
    private CommonService commonService;

    @Autowired
    private UserMapper userMapper;

    @Autowired
    private UserCache userCache;

    @Value("${fras.server.ip}")
    private String serverIp;

    @Value("${fras.send.work.port}")
    private String sendWorkPort;

    @Value("${fras.send.batch.list.size}")
    private int batchListSize;
    @Value("${fras.send.batch.interval.ms.times}")
    private int sendIntervalTimes;


    /**
     * Background worker that drains {@code StaticObject.imageQueue} into
     * {@code StaticObject.transferAlgorithmBeanList} and hands the list to
     * {@link #dealWithTask(List)}:
     * <ul>
     *   <li>as soon as the list holds {@code batchListSize} entries, or</li>
     *   <li>when it is non-empty and at least {@code sendIntervalTimes} milliseconds
     *       have passed since the last entry was added.</li>
     * </ul>
     * The loop runs until the worker thread is interrupted. Any other exception is
     * logged and the loop continues.
     */
    @Override
    @Async // ("frasAsyncServiceExecutor")
    public void sendImageBatch() {
        long lastAddedAt = 0L;
        List<TransferAlgorithmReq> batch = null;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                int cnt = 0;
                // Full batches: send every time the configured size is reached
                while (!StaticObject.imageQueue.isEmpty()) {
                    if (StaticObject.transferAlgorithmBeanList.size() < batchListSize) {
                        // NOTE(review): the two hmSet calls below add a new UUID field per image and look
                        // like leftover diagnostics (original TODO) — confirm before removing.
                        redisService.hmSet("a---batchListSize", UuidUtil.uuid(), StaticObject.transferAlgorithmBeanList.size()); // TODO
                        StaticObject.transferAlgorithmBeanList.add(StaticObject.imageQueue.take());
                        redisService.hmSet("cnt", UuidUtil.uuid(), ++cnt);
                        lastAddedAt = System.currentTimeMillis();
                    } else {
                        logger.debug("输出集合{}", StaticObject.transferAlgorithmBeanList.size());
                        // Work on a copy: dealWithTask clears the shared list afterwards
                        batch = BeanUtil.deepCopy(StaticObject.transferAlgorithmBeanList);
                        dealWithTask(batch);
                    }
                }
                // Partial batch: send once the configured interval has elapsed
                if (StaticObject.transferAlgorithmBeanList.size() > 0
                        && (System.currentTimeMillis() - lastAddedAt) >= sendIntervalTimes) {
                    batch = BeanUtil.deepCopy(StaticObject.transferAlgorithmBeanList);
                    dealWithTask(batch);
                } else {
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the executor and stop the worker instead of looping on
                Thread.currentThread().interrupt();
                logger.info("sendImageBatch interrupted, worker stops.");
                return;
            } catch (Exception e) {
                logger.error("提取特征码异常!", e);
            }
        }
    }

    /**
     * Sends one batch and then clears {@code StaticObject.transferAlgorithmBeanList}.
     *
     * <p>The Redis counter {@code CACHE_KEY_ALGORITHM_TASK_REQ_NUM} is incremented before
     * sending ({@link #sendImageBatchTask(List)} decrements it again). When the incremented
     * value is greater than 3, sending is delayed by one second.
     *
     * <p>If the delay is interrupted the batch is still sent immediately, so the counter
     * increment is always paired with its decrement, and the interrupt flag is restored
     * afterwards for the caller to observe.
     *
     * @param transferAlgorithmBeanList the batch to send
     */
    private void dealWithTask(List<TransferAlgorithmReq> transferAlgorithmBeanList) {
        boolean interrupted = false;
        try {
            redisService.hmSet("a---executer", UuidUtil.uuid(), transferAlgorithmBeanList.size());
            long num = redisService.increment(Constants.CACHE_KEY_ALGORITHM_TASK_REQ_NUM, 1L);
            if (num > 3L) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    // Skip the rest of the delay; do not skip the send, or the counter would leak
                    interrupted = true;
                }
            }
            // Send the images in one batch
            sendImageBatchTask(transferAlgorithmBeanList);

            StaticObject.transferAlgorithmBeanList.clear();
        } catch (Exception e) {
            logger.error("特征码提取处理异常", e);
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Posts a batch of requests to the algorithm server as the form field {@code data}
     * (JSON) and passes the response to {@link #dealResponseResult(Map)}.
     *
     * <p>Failures are not propagated from the try block: they are logged and an error
     * message is stored for every request of the batch via
     * {@link #setExceptionForUser(ErrorCodeMsg, List)}. The Redis counter
     * {@code CACHE_KEY_ALGORITHM_TASK_REQ_NUM} is always decremented at the end.
     *
     * @param transferAlgorithmBeanList the batch to send
     * @throws AppException declared for compatibility
     * @throws Exception    if the error bookkeeping or the final counter decrement fails
     */
    private void sendImageBatchTask(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception {

        long start = System.currentTimeMillis();
        try {
            // Build the form payload for the algorithm server
            String dataRequest = JsonUtil.getJsonString(transferAlgorithmBeanList);
            List<NameValuePair> params = new ArrayList<NameValuePair>();
            params.add(new BasicNameValuePair("data", dataRequest));
            String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_BATCH_METHOD + "/";

            logger.info("传输数据完毕, 耗时:" + ((System.currentTimeMillis() - start) ) + "ms,  num:" + transferAlgorithmBeanList.size() + "  size: " + dataRequest.length() / 1024);

            // Send and process the result
            dealResponseResult(HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 1, params));

            logger.info("获取结果耗时:" + (System.currentTimeMillis() - start));
        } catch (AppException ape) {
            logger.error("send image batch failed, code: " + ape.getCode(), ape);
            ErrorCodeMsg errorCodeMsg = EnumUtil.getByCode(ape.getCode(), ErrorCodeMsg.class);
            if (errorCodeMsg == null) {
                // Presumably getByCode yields null for an unknown code — fall back rather than NPE here
                errorCodeMsg = ErrorCodeMsg.AI_SERVER_ABNORMAL;
            }
            setExceptionForUser(errorCodeMsg, transferAlgorithmBeanList);
        } catch (Exception e) {
            logger.error("send image batch failed unexpectedly", e);
            setExceptionForUser(ErrorCodeMsg.AI_SERVER_ABNORMAL, transferAlgorithmBeanList);
        } finally {
            redisService.increment(Constants.CACHE_KEY_ALGORITHM_TASK_REQ_NUM, -1L);
        }
    }

    /**
     * Interprets the result map of an HTTP call to the algorithm server.
     *
     * <p>On success every parsed {@code TransferAlgorithmRes} is cached in Redis under
     * {@code FRAS_CACHE_KEY_ALGORITHM_RESULT + id} with an expiry argument of 10.
     *
     * @param resultMap result of the HTTP request
     * @throws Exception an {@code AppException} when the map carries the
     *                   {@code EXCEPTION_ALGORITHM} marker or the status code is not 200;
     *                   anything raised while parsing or caching
     */
    private void dealResponseResult(Map<String, Object> resultMap) throws Exception {

        // The algorithm server itself reported a failure
        if (resultMap.containsKey(Constants.EXCEPTION_ALGORITHM)) {
            String abnormalMsg = commonService.getMessage(ErrorCodeMsg.AI_SERVER_ABNORMAL.getMsg());
            throw new AppException(ErrorCodeMsg.AI_SERVER_ABNORMAL.getCode(), abnormalMsg);
        }

        String statusCode = (String) resultMap.get(HTTP_RES_CODE);
        String body = (String) resultMap.get(HTTP_RES_CONTENT);

        // Any non-200 status is reported as "try again"
        if (!Constants.HTTP_STATUS_CODE_200.equals(statusCode)) {
            logger.error("提取图片的特征码出现异常:code: " + statusCode + " msg: " + body);
            String retryMsg = commonService.getMessage(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getMsg());
            throw new AppException(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getCode(), retryMsg);
        }

        List<TransferAlgorithmRes> results = JsonUtil.readJson2Array(body, TransferAlgorithmRes.class);
        for (TransferAlgorithmRes item : results) {
            String cacheKey = Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + item.getId();
            redisService.setForTimeMIN(cacheKey, item, 10);
        }
    }

    /**
     * Stores an error message for every request of a failed batch.
     *
     * <p>For each request a new {@code MessageBean} with the error code and the resolved
     * message is written to Redis under {@code CACHE_KEY_EXCEPTION_ALGORITHM + id} with an
     * expiry argument of 10.
     *
     * @param erroCodeMsg               the error to record
     * @param transferAlgorithmBeanList the requests of the failed batch
     */
    private void setExceptionForUser(ErrorCodeMsg erroCodeMsg, List<TransferAlgorithmReq> transferAlgorithmBeanList) {

        // Resolve code and text once; they are the same for every request
        final int errorCode = erroCodeMsg.getCode();
        final String errorText = commonService.getMessage(erroCodeMsg.getMsg());

        for (TransferAlgorithmReq request : transferAlgorithmBeanList) {
            String cacheKey = Constants.CACHE_KEY_EXCEPTION_ALGORITHM + request.getId();
            redisService.setForTimeMIN(cacheKey, new MessageBean(errorCode, errorText), 10);
        }
    }

    /**
     * Posts a batch of requests to the algorithm server over HTTP as the form field
     * {@code data} (JSON) and caches the results.
     *
     * <p>Response handling is delegated to {@link #dealResponseResult(Map)}, which performs
     * the same checks this method used to duplicate inline.
     *
     * @param transferAlgorithmBeanList the requests to send
     * @throws AppException when the algorithm server reports a failure or answers with a
     *                      non-200 status
     * @throws Exception    anything raised while serializing, sending, parsing or caching
     */
    @Override
//    @Async
    public void sendImageTaskThread(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception {

        long start = System.currentTimeMillis();

        if (logger.isDebugEnabled()) {
            for (TransferAlgorithmReq req : transferAlgorithmBeanList) {
                logger.debug("http-------------------------------" + req.getId());
            }
        }

        String dataRequest = JsonUtil.getJsonString(transferAlgorithmBeanList);
        // Build the form payload for the algorithm server
        List<NameValuePair> params = new ArrayList<NameValuePair>();
        params.add(new BasicNameValuePair("data", dataRequest));
        String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_BATCH_METHOD + "/";
        logger.info("传输数据完毕, 2222耗时:" + ((System.currentTimeMillis() - start) / 1000) + "  num:" + transferAlgorithmBeanList.size() + "  size: " + dataRequest.length() / 1024);

        dealResponseResult(HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 2, params));

        logger.info("获取结果333耗时:" + (System.currentTimeMillis() - start) / 1000);
    }

    /**
     * Sends a single face photo to the algorithm server over HTTP and caches the result.
     *
     * <p>Only one invocation runs at a time: the static flag {@code sendTaskThreadStart} is
     * claimed under a class-level lock, and an invocation that finds it already set returns
     * immediately <em>without</em> touching the shared list or the flag. (Previously that
     * early return ran the {@code finally} block and reset the state owned by the running
     * invocation.)
     *
     * <p>Before sending, {@code StaticObject.imageQueue} is polled for a little over five
     * seconds; polled beans are added to {@code StaticObject.transferAlgorithmBeanList} while
     * it holds at most four entries.
     *
     * @param userToken       sent as form field {@code id}
     * @param facePhotoString sent as form field {@code img}
     * @throws AppException when the algorithm server reports a failure or answers with a
     *                      non-200 status
     * @throws Exception    anything raised while sending, parsing or caching
     */
    @Override
//    @Async("asyncServiceExecutor")、
    @Async
    public void workSendTaskThread(String userToken, String facePhotoString) throws AppException, Exception {

        // Claim the single-runner flag atomically; losers leave without side effects
        synchronized (AsyncServiceImpl.class) {
            if (sendTaskThreadStart) {
                return;
            }
            sendTaskThreadStart = true;
        }

        long start = System.currentTimeMillis();
        try {
            // NOTE(review): this loop polls without pausing while the queue is empty — consider a short sleep.
            while ((System.currentTimeMillis() - start) / 1000 <= 5) {
//                TransferAlgorithmReq transferAlgorithmBean = (TransferAlgorithmReq) redisService.rightPop(Constants.QUEUE_TASK);
                TransferAlgorithmReq transferAlgorithmBean = StaticObject.imageQueue.poll();
                if (StaticObject.transferAlgorithmBeanList != null && StaticObject.transferAlgorithmBeanList.size() <= 4) {
                    if (transferAlgorithmBean != null) {
                        StaticObject.transferAlgorithmBeanList.add(transferAlgorithmBean);
                        logger.info("bean:" + transferAlgorithmBean.getId());
                    }
                } else {
                    // List is full: hand the polled bean back, but never push a null entry
                    if (transferAlgorithmBean != null) {
                        redisService.leftPush(Constants.QUEUE_TASK, transferAlgorithmBean);
                    }
                    break;
                }
            }

            // Build the form payload for the algorithm server
            List<NameValuePair> params = new ArrayList<NameValuePair>();
            params.add(new BasicNameValuePair("id", userToken));
            params.add(new BasicNameValuePair("img", facePhotoString)); // JsonUtil.getJsonString(StaticObject.transferAlgorithmBeanList) TODO
            String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_METHOD + "/";
            Map<String, Object> resultMap = HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 2, params);

            if (resultMap.containsKey(Constants.EXCEPTION_ALGORITHM)) {
                String msg = commonService.getMessage(ErrorCodeMsg.AI_SERVER_ABNORMAL.getMsg());
                throw new AppException(ErrorCodeMsg.AI_SERVER_ABNORMAL.getCode(), msg);
            }

            String code = (String) resultMap.get(HTTP_RES_CODE);
            String content = (String) resultMap.get(HTTP_RES_CONTENT);

            // NOTE(review): the next five statements replace the real response body with a
            // hard-coded result (original TODO markers) — confirm whether this stub must stay.
            TransferAlgorithmRes sd = new TransferAlgorithmRes(userToken, 1002, "001,191,101"); // TODO
            TransferAlgorithmRes bean = JsonUtil.json2Obj(JsonUtil.getJsonString(sd), TransferAlgorithmRes.class); // TODO
            List<TransferAlgorithmRes> lst2 = new ArrayList<TransferAlgorithmRes>(); // TODO
            lst2.add(bean);// TODO
            content = JsonUtil.getJsonString(lst2);// TODO

            if (!Constants.HTTP_STATUS_CODE_200.equals(code)) {
                logger.error("提取图片的特征码出现异常:code: " + code + " msg: " + content);
                String msg = commonService.getMessage(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getMsg());
                throw new AppException(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getCode(), msg);
            }

            List<TransferAlgorithmRes> resultList = JsonUtil.readJson2Array(content, TransferAlgorithmRes.class);
            for (TransferAlgorithmRes res : resultList) {
                redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + res.getId(), res, 10);
            }
//            redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + userToken, content, 10);

            logger.info("传输数据完毕,耗时:" + (System.currentTimeMillis() - start + "  size:" + facePhotoString.length() / 1024));
        } finally {
            // Only the invocation that claimed the flag reaches this point
            if (StaticObject.transferAlgorithmBeanList != null) {
                StaticObject.transferAlgorithmBeanList.clear();
            }
            synchronized (AsyncServiceImpl.class) {
                sendTaskThreadStart = false;
            }
        }
    }

    /**
     * Saves the user token and the related user information, retrying on failure.
     *
     * <p>{@code commonService.saveUserInfo(user)} is attempted up to 21 times with a
     * five-second pause between attempts. Failures are logged and never propagated: after
     * the last attempt, or when the pause is interrupted, the method logs that the data was
     * not saved and returns. On interruption the thread's interrupt flag is restored.
     *
     * <p>NOTE(review): because failures are swallowed here, the {@code rollbackFor} setting
     * is never triggered by an exception leaving this method — confirm that is intended.
     *
     * @param user the user to save
     * @throws Exception declared by the interface; not thrown for save failures
     */
    @Override
    @Async
    @Transactional(rollbackFor = Exception.class)
    public void saveUserInfo(T_user user) throws Exception {

        int cnt = 0;
        while (true) {
            cnt++;
            try {
                // Save the user information
                commonService.saveUserInfo(user);
                return;
            } catch (Exception e) {
                logger.error("数据保存失败,开始重试次数:" + cnt, e);
                if (cnt > 20) {
                    // Retries exhausted: give up (string concatenation is null-safe for user)
                    logger.error("数据库连接发生异常,未保存数据 " + user, e);
                    return;
                }
            }

            try {
                Thread.sleep(5000);
            } catch (InterruptedException ie) {
                // Stop retrying and keep the interrupt visible to the executor
                Thread.currentThread().interrupt();
                logger.error("数据库连接发生异常,未保存数据 " + user, ie);
                return;
            }
        }
    }

    /**
     * Stores the location information of a user login, retrying on failure.
     *
     * <p>The insert (checked through {@code commonService.checkSQLReturnCode}) is attempted
     * up to 21 times with a five-second pause between attempts. Failures are logged and
     * never propagated: after the last attempt, or when the pause is interrupted, the method
     * logs that the data was not saved and returns. On interruption the thread's interrupt
     * flag is restored.
     *
     * @param user the user whose login location is stored
     */
    @Override
    @Async
    public void saveLoginLocationInfo(T_user user) {
        int cnt = 0;
        while (true) {
            cnt++;
            try {
                commonService.checkSQLReturnCode(userMapper.saveLoginLocationInfo(user));
                return;
            } catch (Exception e) {
                logger.error("数据保存失败,开始重试次数:" + cnt, e);
                if (cnt > 20) {
                    // Retries exhausted: give up (string concatenation is null-safe for user)
                    logger.error("数据库连接发生异常,未保存数据 " + user, e);
                    return;
                }
            }

            try {
                Thread.sleep(5000);
            } catch (InterruptedException ie) {
                // Stop retrying and keep the interrupt visible to the executor
                Thread.currentThread().interrupt();
                logger.error("数据库连接发生异常,未保存数据 " + user, ie);
                return;
            }
        }
    }

    /**
     * Stores the user's face information and feature values, retrying on failure.
     *
     * <p>{@code commonService.saveUserFaceCode(user, userAccount, currentFaceCode)} is
     * attempted up to 21 times with a five-second pause between attempts. Failures are
     * logged and never propagated: after the last attempt, or when the pause is interrupted,
     * the method logs that the data was not saved and returns. On interruption the thread's
     * interrupt flag is restored.
     *
     * @param user            the user
     * @param userAccount     passed through as the second argument of the save call
     * @param currentFaceCode the current face feature code
     * @throws Exception declared by the interface; not thrown for save failures
     */
    @Override
    public void saveUserFaceCode(T_user user, String userAccount, String currentFaceCode) throws Exception {

        int cnt = 0;
        while (true) {
            cnt++;
            try {
                commonService.saveUserFaceCode(user, userAccount, currentFaceCode);
                return;
            } catch (Exception e) {
                logger.error("数据保存失败,开始重试次数:" + cnt, e);
                if (cnt > 20) {
                    // Retries exhausted: give up (string concatenation is null-safe for user)
                    logger.error("数据库连接发生异常,未保存数据 " + user, e);
                    return;
                }
            }

            try {
                Thread.sleep(5000);
            } catch (InterruptedException ie) {
                // Stop retrying and keep the interrupt visible to the caller
                Thread.currentThread().interrupt();
                logger.error("数据库连接发生异常,未保存数据 " + user, ie);
                return;
            }
        }
    }

    /**
     * Updates the user information and refreshes the user cache, retrying on failure.
     *
     * <p>The update followed by {@code userCache.getUserInfo(user, true)} is attempted up to
     * 21 times with a five-second pause between attempts. Failures are logged and never
     * propagated: after the last attempt, or when the pause is interrupted, the method logs
     * that the data was not saved and returns. On interruption the thread's interrupt flag
     * is restored.
     *
     * @param user the user to update
     * @throws Exception declared by the interface; not thrown for update failures
     */
    @Override
    public void updateUserInfo(T_user user) throws Exception {

        int cnt = 0;
        while (true) {
            cnt++;
            try {
                userMapper.updateUserInfo(user);
                // Refresh the cached user information
                userCache.getUserInfo(user, true);
                return;
            } catch (Exception e) {
                logger.error("数据保存失败,开始重试次数:" + cnt, e);
                if (cnt > 20) {
                    // Retries exhausted: give up (string concatenation is null-safe for user)
                    logger.error("数据库连接发生异常,未保存数据 " + user, e);
                    return;
                }
            }

            try {
                Thread.sleep(5000);
            } catch (InterruptedException ie) {
                // Stop retrying and keep the interrupt visible to the caller
                Thread.currentThread().interrupt();
                logger.error("数据库连接发生异常,未保存数据 " + user, ie);
                return;
            }
        }
    }

    /**
     * Background worker that removes the session of logged-out users.
     *
     * <p>Loops forever: pops a user token from the Redis queue
     * {@code QUEUE_KEY_LOGOUT_USER_TOKEN} and deletes the key
     * {@code SPRING_SESSION_NAME_NAMESPCE + token} when it exists. When nothing could be
     * popped (empty queue or Redis failure) the loop backs off through
     * {@link #waitForData(String)} instead of going on with a {@code null} token. A token
     * whose deletion failed is pushed back onto the queue.
     */
    @Override
    @Async
    public void execDeleteLogOutUserCacheInfo() {

        while (true) {
            String userTokenJson = null;
            try {
                userTokenJson = (String) redisService.rightPop(QUEUE_KEY_LOGOUT_USER_TOKEN);
            } catch (Exception e) {
                logger.error("redis 连接异常", e);
            }

            // Nothing to process: waitForData pauses before the next poll
            if (!waitForData(userTokenJson)) {
                continue;
            }

            try {
                // session
                String sessionKey = Constants.SPRING_SESSION_NAME_NAMESPCE + userTokenJson;
                if (redisService.hasKey(sessionKey)) {
                    redisService.delete(sessionKey);
                }
            } catch (Exception e) {
                logger.error("redis 连接异常", e);
                logger.error("删除已退出的用户session信息失败,用户令牌:" + userTokenJson, e);
                try {
                    // Put the token back so a later iteration can retry it
                    redisService.leftPush(QUEUE_KEY_LOGOUT_USER_TOKEN, userTokenJson);
                } catch (Exception requeueFailure) {
                    logger.error("re-queue of logged-out user token failed: " + userTokenJson, requeueFailure);
                }
            }
        }
    }

    /**
     * Receives the image feature codes produced by the algorithm service from a socket.
     *
     * <p>Reads the stream line by line (UTF-8). For each line the first and last character
     * are stripped, the remainder is split on {@code Constants.COLON_SIGN}, and the second
     * part is cached under {@code CACHE_KEY_RESULT + firstPart}.
     *
     * <p>A single {@link BufferedReader} is used for the whole connection: creating a new
     * one per line would discard whatever the previous reader had already buffered. The
     * method returns when the peer closes the stream or reading fails. Malformed lines are
     * logged and skipped. {@code readLine} blocks until a line terminator or end of stream,
     * so the sender must terminate every message with a newline and flush.
     *
     * @param socket the connected socket to read from; it is not closed here
     */
    @Override
//    @Async("asyncServiceExecutor")
    public void workReciveResultThread(Socket socket) {
        final BufferedReader br;
        try {
            br = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"));
        } catch (java.io.IOException e) {
            logger.error("work Recive Result Thread abnormal!", e);
            return;
        }

        while (true) {
            String line;
            try {
                line = br.readLine();
            } catch (java.io.IOException e) {
                // A broken connection does not recover; stop instead of failing forever
                logger.error("work Recive Result Thread abnormal!", e);
                return;
            }
            if (line == null) {
                logger.info("work Recive Result Thread: stream closed by peer [port:" + socket.getPort() + "]");
                return;
            }

            try {
                if (line.length() < 2) {
                    logger.warn("work Recive Result Thread: ignored malformed message: " + line);
                    continue;
                }
                String message = line.substring(1, line.length() - 1).replaceAll("‘", "");
                String[] arr = message.split(Constants.COLON_SIGN);
                if (arr.length < 2) {
                    logger.warn("work Recive Result Thread: ignored malformed message: " + line);
                    continue;
                }

                redisService.setForTimeMS(Constants.CACHE_KEY_RESULT + arr[0].trim(), arr[1].trim(), Constants.TIME_KEY_RESULT);

                logger.info("数据接受 --- Form Cliect[port:" + socket.getPort() + "] 消息内容:" + message);
            } catch (Exception e) {
                logger.error("work Recive Result Thread abnormal!", e);
            }
        }
    }

    /**
     * Sends tasks to the algorithm service over a socket.
     *
     * <p>Loops forever: pops a JSON task from the Redis queue {@code QUEUE_TASK}, writes a
     * small JSON header {@code {"size": n}} with the payload length in bytes, then writes
     * the payload in chunks of at most {@link #PART_SIZE} bytes.
     *
     * <p>Fixes over the previous version:
     * <ul>
     *   <li>the payload is chunked on the byte array; slicing the string by byte offsets
     *       failed as soon as the JSON contained non-ASCII characters;</li>
     *   <li>only the task popped in the current iteration is re-queued after a failure,
     *       and a task that was completely sent is never re-queued;</li>
     *   <li>an empty queue or a failure pauses through {@link #waitForData(String)} instead
     *       of polling in a tight loop.</li>
     * </ul>
     * Bytes are encoded as UTF-8, the charset the receiving code in this class uses —
     * NOTE(review): confirm the algorithm service expects UTF-8.
     *
     * @param socket the connected socket to write to; it is not closed here
     */
    @Deprecated
    @Override
//    @Async("asyncServiceExecutor")
    public void workSendTaskThread_skt(Socket socket) {

        while (true) {
            // Task owned by this iteration; null means "nothing to re-queue"
            String faceJson = null;
            try {
                // Fetch a task
                String popped = (String) redisService.rightPop(Constants.QUEUE_TASK);
                if (StringUtils.isBlank(popped)) {
                    waitForData(null);
                    continue;
                }
                faceJson = popped;

                DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
                byte[] jsonByte = faceJson.getBytes("UTF-8");

                logger.info("发送的数据长度为:" + jsonByte.length);
                Map<String, Integer> sizeMap = new HashMap<String, Integer>();
                sizeMap.put("size", jsonByte.length);

                // Tell the server how many bytes will follow
                outputStream.write(JsonUtil.getJsonString(sizeMap).getBytes("UTF-8"));

                // Send the payload in chunks of at most PART_SIZE bytes
                for (int offset = 0; offset < jsonByte.length; offset += PART_SIZE) {
                    int length = Math.min(PART_SIZE, jsonByte.length - offset);
                    outputStream.write(jsonByte, offset, length);
                }

                outputStream.flush();
                // Fully sent: an interrupt during the pause below must not re-queue it
                faceJson = null;
                Thread.sleep(10);
                logger.info("传输数据完毕");
            } catch (Exception e) {
                logger.error("work Send Task Thread error!", e);
                if (faceJson != null) {
                    try {
                        redisService.leftPush(Constants.QUEUE_TASK, faceJson);
                    } catch (Exception requeueFailure) {
                        logger.error("work Send Task Thread: re-queue failed", requeueFailure);
                    }
                }
                // Back off so a persistent failure does not spin
                waitForData(null);
            }
        }
    }

    /**
     * Receives the image feature codes produced by the algorithm service over a socket.
     *
     * <p>The processing is the same as in {@link #workReciveResultThread(Socket)}, so this
     * deprecated variant simply delegates to it.
     *
     * @param socket the connected socket to read from
     */
    @Deprecated
    @Override
//    @Async("asyncServiceExecutor")
    public void workReciveResultThread_skt(Socket socket) {
        workReciveResultThread(socket);
    }

    /**
     * Background task intended to store hardware status data in the database.
     *
     * <p>In its current form this is a stub: the queue read and the database call are
     * commented out, so {@code json} is never assigned. Each iteration therefore calls
     * {@link #waitForData(String)} with {@code null}, which pauses and returns
     * {@code false}, and the loop continues. The method never returns.
     */
    @Override
//    @Async("asyncServiceExecutor")
    public void execHardwareStatusDataToDBAsync() {
        while (true) {
            String json = null;
            // Acquire lock (disabled)
            // lock.lock();
            try {
                try {
                    // Queue read is disabled, so json stays null and the loop always continues here
//                    json = (String) redisService.rightPop(REDIS_HARDWARE_RUNNING_DATA);
                    if (!waitForData(json)) {
                        continue;
                    }
                } catch (Exception e) {
                }

                try {
//                    TODO
//                    hdwareStatusService.gatherHardwareStatusInfo(json);
                } catch (Exception e) {
                    // Anything other than an AppException gets an additional log entry
                    if (!(e instanceof AppException)) {
                        LogUtil.error("", e);
//                        redisService.leftPush(REDIS_HARDWARE_RUNNING_DATA, json);
                    }
                    LogUtil.error("hardware status data insert db abnormal:" + json, e);
                }
            } catch (Exception e) {
                LogUtil.error(json, e);
            } finally {
                // Release lock (disabled)
                // lock.unlock();
            }
        }
    }

    /**
     * Background worker that updates the user face feature library.
     *
     * <p>Loops forever: pops a JSON entry from the Redis queue
     * {@code CACHE_KEY_UPDATE_FACE_CODE} and passes it to
     * {@link #updateUserFaceCode(String)}. When nothing could be popped (empty queue or
     * Redis failure) the loop backs off through {@link #waitForData(String)} instead of
     * processing a {@code null} entry. An entry that failed with anything other than an
     * {@code AppException} is pushed back onto the queue.
     */
    @Override
//    @Async("asyncServiceExecutor")
    public void updateUserFaceCodeListThread() {

        while (true) {
            String userFaceJson = null;
            try {
                userFaceJson = (String) redisService.rightPop(Constants.CACHE_KEY_UPDATE_FACE_CODE);
            } catch (Exception e) {
                logger.error("redis exception", e);
            }

            // Nothing to process: waitForData pauses before the next poll
            if (!waitForData(userFaceJson)) {
                continue;
            }

            try {
                updateUserFaceCode(userFaceJson);
            } catch (Exception e) {
                logger.error("update user face code abnormal:" + userFaceJson, e);
                if (!(e instanceof AppException)) {
                    try {
                        // Put the entry back so a later iteration can retry it
                        redisService.leftPush(Constants.CACHE_KEY_UPDATE_FACE_CODE, userFaceJson);
                    } catch (Exception requeueFailure) {
                        logger.error("system exception", requeueFailure);
                    }
                }
            }
        }

    }

    /**
     * Pushes a message asynchronously by delegating to {@code pushService.pushMsg}.
     *
     * @param platform passed through to the push service
     * @param pushKey  passed through to the push service
     * @param content  passed through to the push service
     * @throws Exception whatever the push service throws
     */
    @Override
//     @Async("asyncServiceExecutor")
    @Async
    public void pushMsg(String platform, String pushKey, String content) throws Exception {
        // Push the message
        pushService.pushMsg(platform, pushKey, content);
    }

    /**
     * Increments the monthly failed-login counter of an account and then reloads the
     * cached user information for that account.
     *
     * @param userAccount the account whose counter is incremented
     * @throws Exception whatever the SQL check or the cache refresh throws
     */
    @Override
//    @Async("asyncServiceExecutor")
    @Async
    public void addOneMonthFailedNum(String userAccount) throws Exception {
        // Persist the incremented counter; checkSQLReturnCode validates the mapper's return value
        int sqlReturnCode = userMapper.addOneMonthFailedNum(userAccount);
        commonService.checkSQLReturnCode(sqlReturnCode);

        // Refresh the cache using a lookup object that carries only the account
        T_user lookup = new T_user();
        lookup.setUserAccount(userAccount);
        userCache.getUserInfo(lookup, true);
    }

    /**
     * Updates a user's feature codes from a queued JSON entry.
     *
     * <p>The JSON is parsed into a {@code UserFaceBean}; its user token and feature codes
     * are passed to {@code commonService.saveUserFaceCode} with {@code null} as the user.
     *
     * @param userFaceJson JSON representation of a {@code UserFaceBean}
     * @throws Exception whatever parsing or saving throws
     */
    private void updateUserFaceCode(String userFaceJson) throws Exception {
        final UserFaceBean parsed = JsonUtil.json2Obj(userFaceJson, UserFaceBean.class);
        final String token = parsed.getUserToken();
        commonService.saveUserFaceCode(null, token, parsed.getFeatureCodes());
    }

    /**
     * Tells a polling loop whether it received data, pausing when it did not.
     *
     * <p>For a non-null argument this returns {@code true} immediately. For {@code null}
     * it sleeps for one second and returns {@code false}; an interruption of that sleep is
     * only logged.
     *
     * @param json the value that was polled, possibly {@code null}
     * @return {@code true} when {@code json} is not {@code null}, otherwise {@code false}
     */
    private boolean waitForData(String json) {
        if (json != null) {
            return true;
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException interrupted) {
            LogUtil.error(json, interrupted);
        }
        return false;
    }
}

 

以上是关于 Spring Boot 异步调用的主要内容,如果未能解决你的问题,请参考以下文章:

SpringBoot异步调用方法遇到的问题

SpringBoot异步调用方法遇到的问题

SpringBoot系列——@Async优雅的异步调用

#yyds干货盘点# springboot使用@Async实现异步调用

Spring Boot 中的异步调用

SpringBoot @Async实现异步调用