springboot的异步调用
Posted 不死码农
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了springboot的异步调用相关的知识,希望对你有一定的参考价值。
package com.handsight.platform.fras.aapp; import java.util.Locale; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.i18n.LocaleContextHolder; import org.springframework.stereotype.Component; import com.handsight.platform.common.util.LogUtil; import com.handsight.platform.fras.data.StaticObject; import com.handsight.platform.fras.thread.service.AsyncService; @Component public class StartupListener implements ApplicationListener<ContextRefreshedEvent> { private final static Logger logger = LoggerFactory.getLogger(StartupListener.class); @Autowired private AsyncService asyncService; public StartupListener() { }; @Override public void onApplicationEvent(ContextRefreshedEvent event) { // 地域语言 setSystemDefaultLanguage(); logger.info("---------------- Start execute Async thread completed."); // 删除用户的相关访问信息线程 asyncService.execDeleteLogOutUserCacheInfo(); // 启动图片批量发送线程 asyncService.sendImageBatch(); logger.info("---------------- End execute Async thread completed."); } /** * 设置系统默认语言 */ private void setSystemDefaultLanguage() { Locale locale= LocaleContextHolder.getLocale(); locale = Locale.CHINA; // if(!Constants.SYSTEM_DEFAULT_LANGUAGE.equals(locale.getLanguage()) ) { // locale = Locale.US; // } LocaleContextHolder.setLocale(locale); StaticObject.locale =locale; LogUtil.info("This language is {0}", StaticObject.locale.getLanguage()); } }
接口定义(AsyncService):
package com.handsight.platform.fras.thread.service;

import java.net.Socket;
import java.util.List;

import com.handsight.platform.common.exception.AppException;
import com.handsight.platform.fras.mgt.pojo.T_user;
import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmReq;

/**
 * Contract for the application's asynchronous worker tasks.
 *
 * @author wangh
 */
public interface AsyncService {

    /**
     * Sends images in batches.
     */
    void sendImageBatch();

    /**
     * Async task: after a user logs out, deletes the cached mutable user
     * token / account information and the session information.
     */
    void execDeleteLogOutUserCacheInfo();

    /**
     * Async task: stores hardware status data in the database.
     */
    void execHardwareStatusDataToDBAsync();

    /**
     * Async task: sends a task to the algorithm service over HTTP.
     *
     * @param userToken        the user token
     * @param facePhotoString  the face photo as a string
     * @throws AppException on an application-level failure
     * @throws Exception    on any other failure
     */
    void workSendTaskThread(String userToken, String facePhotoString) throws AppException, Exception;

    /**
     * Async task: sends a list of tasks to the algorithm service over HTTP.
     *
     * @param transferAlgorithmBeanList the requests to send
     * @throws AppException on an application-level failure
     * @throws Exception    on any other failure
     */
    void sendImageTaskThread(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception;

    /**
     * Async task: saves the user token and the related user information.
     *
     * @param user the user to save
     * @throws Exception on failure
     */
    void saveUserInfo(T_user user) throws Exception;

    /**
     * Stores the user's login location information.
     *
     * @param user the user whose login location is stored
     */
    void saveLoginLocationInfo(T_user user);

    /**
     * Updates the user information.
     *
     * @param user the user to update
     * @throws Exception on failure
     */
    void updateUserInfo(T_user user) throws Exception;

    /**
     * Stores the user's face information and feature values.
     *
     * @param user            the user
     * @param userToken       the user token
     * @param currentFaceCode the current face feature code
     * @throws Exception on failure
     */
    void saveUserFaceCode(T_user user, String userToken, String currentFaceCode) throws Exception;

    /**
     * Async task: receives the image feature codes produced by the algorithm
     * service (HTTP variant).
     *
     * @param socket the socket to read from
     */
    void workReciveResultThread(Socket socket);

    /**
     * Async task: updates the user face feature library.
     */
    void updateUserFaceCodeListThread();

    /**
     * Pushes a message.
     *
     * @param platform the target platform
     * @param pushKey  the push key
     * @param content  the message content
     * @throws Exception on failure
     */
    void pushMsg(String platform, String pushKey, String content) throws Exception;

    /**
     * Increments the monthly failed-login count by one.
     *
     * @param userAccount the user account
     * @throws Exception on failure
     */
    void addOneMonthFailedNum(String userAccount) throws Exception;

    /**
     * Async task: sends tasks to the algorithm service (socket variant).
     *
     * @param socket the socket to write to
     */
    @Deprecated
    void workSendTaskThread_skt(Socket socket);

    /**
     * Async task: receives the image feature codes produced by the algorithm
     * service (socket variant).
     *
     * @param socket the socket to read from
     */
    @Deprecated
    void workReciveResultThread_skt(Socket socket);
}
实现:
package com.handsight.platform.fras.thread.service.impl; import com.handsight.platform.common.exception.AppException; import com.handsight.platform.common.util.HttpRequestUtil; import com.handsight.platform.common.util.JsonUtil; import com.handsight.platform.common.util.LogUtil; import com.handsight.platform.common.util.UuidUtil; import com.handsight.platform.fras.cache.UserCache; import com.handsight.platform.fras.constant.Constants; import com.handsight.platform.fras.constant.ErrorCodeMsg; import com.handsight.platform.fras.data.StaticObject; import com.handsight.platform.fras.mapper.UserMapper; import com.handsight.platform.fras.mgt.pojo.T_user; import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmReq; import com.handsight.platform.fras.mgt.pojo.TransferAlgorithmRes; import com.handsight.platform.fras.mgt.pojo.UserFaceBean; import com.handsight.platform.fras.pojo.MessageBean; import com.handsight.platform.fras.service.CommonService; import com.handsight.platform.fras.service.PushService; import com.handsight.platform.fras.service.RedisService; import com.handsight.platform.fras.thread.service.AsyncService; import com.handsight.platform.fras.util.BeanUtil; import com.handsight.platform.fras.util.EnumUtil; import org.apache.commons.lang3.StringUtils; import org.apache.http.NameValuePair; import org.apache.http.message.BasicNameValuePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.InputStreamReader; import java.net.Socket; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import static 
com.handsight.platform.common.constant.Constants.HTTP_RES_CODE; import static com.handsight.platform.common.constant.Constants.HTTP_RES_CONTENT; import static com.handsight.platform.fras.constant.Constants.QUEUE_KEY_LOGOUT_USER_TOKEN; @Service public class AsyncServiceImpl implements AsyncService { private final static Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class); /** 定义一个每次要数据的大小(35K) */ public final static int PART_SIZE = 35 * 1024; static boolean sendTaskThreadStart = false; @Autowired private RedisService redisService; @Autowired private PushService pushService; @Autowired private CommonService commonService; @Autowired private UserMapper userMapper; @Autowired private UserCache userCache; @Value("${fras.server.ip}") private String serverIp; @Value("${fras.send.work.port}") private String sendWorkPort; @Value("${fras.send.batch.list.size}") private int batchListSize; @Value("${fras.send.batch.interval.ms.times}") private int sendIntervalTimes; @Override @Async // ("frasAsyncServiceExecutor") public void sendImageBatch() { long start = 0L; List<TransferAlgorithmReq> transferAlgorithmBeanList = null; while (true) { try { int cnt =0; // 每指定个数发送一次 while (!StaticObject.imageQueue.isEmpty() ) { if (StaticObject.transferAlgorithmBeanList.size() < batchListSize) { redisService.hmSet("a---batchListSize", UuidUtil.uuid(), StaticObject.transferAlgorithmBeanList.size() ); // TODO StaticObject.transferAlgorithmBeanList.add(StaticObject.imageQueue.take()); redisService.hmSet("cnt", UuidUtil.uuid(), ++cnt); start = System.currentTimeMillis(); } else { System.out.println("输出集合"+StaticObject.transferAlgorithmBeanList.size()); transferAlgorithmBeanList = BeanUtil.deepCopy(StaticObject.transferAlgorithmBeanList); dealWithTask(transferAlgorithmBeanList);// 处理请求 } } // 不足指定个数每指定秒数发送一次 if (StaticObject.transferAlgorithmBeanList.size() > 0 && (((System.currentTimeMillis() - start)) >= sendIntervalTimes)) { transferAlgorithmBeanList = 
BeanUtil.deepCopy(StaticObject.transferAlgorithmBeanList); dealWithTask(transferAlgorithmBeanList);// 处理请求 } else { Thread.sleep(10); } } catch (Exception e) { logger.error("提取特征码异常!", e); } finally { } } } /** * 处理任务队列 * 当已发送的匹数大于3个时将延迟发送1秒 * * @param transferAlgorithmBeanList */ private void dealWithTask(List<TransferAlgorithmReq> transferAlgorithmBeanList) { try { redisService.hmSet("a---executer", UuidUtil.uuid(), transferAlgorithmBeanList.size()); long num = redisService.increment(Constants.CACHE_KEY_ALGORITHM_TASK_REQ_NUM, 1L); if( num > 3L) { Thread.sleep(1000); } // 批量发送图片 sendImageBatchTask(transferAlgorithmBeanList); // new Thread(new DealQueueThread(transferAlgorithmBeanList)).start(); StaticObject.transferAlgorithmBeanList.clear(); } catch (Exception e) { logger.error("特征码提取处理异常", e); } finally { } } /** * 批量发送图片 * * @param transferAlgorithmBeanList * @throws AppException * @throws Exception */ private void sendImageBatchTask(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception { long start = System.currentTimeMillis(); String dataRequest = ""; try { // 以表单形式向所有算法服务器发送指令 a dataRequest = JsonUtil.getJsonString(transferAlgorithmBeanList); List<NameValuePair> params = new ArrayList<NameValuePair>(); NameValuePair pair = new BasicNameValuePair("data", dataRequest); params.add(pair); String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_BATCH_METHOD + "/"; logger.info("传输数据完毕, 耗时:" + ((System.currentTimeMillis() - start) ) + "ms, num:" + transferAlgorithmBeanList.size() + " size: " + dataRequest.length() / 1024); // 处理结果 dealResponseResult(HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 1, params)); logger.info("获取结果耗时:" + (System.currentTimeMillis() - start)); } catch (AppException ape) { ErrorCodeMsg errorCodeMsg = EnumUtil.getByCode(ape.getCode(), ErrorCodeMsg.class); setExceptionForUser(errorCodeMsg, transferAlgorithmBeanList); } catch 
(Exception e) { setExceptionForUser(ErrorCodeMsg.AI_SERVER_ABNORMAL, transferAlgorithmBeanList); } finally { redisService.increment(Constants.CACHE_KEY_ALGORITHM_TASK_REQ_NUM, -1L); } } /** * 处理http的响应结果 * * @param resultMap */ private void dealResponseResult(Map<String, Object> resultMap) throws Exception { if (resultMap.containsKey(Constants.EXCEPTION_ALGORITHM)) { String msg = commonService.getMessage(ErrorCodeMsg.AI_SERVER_ABNORMAL.getMsg()); throw new AppException(ErrorCodeMsg.AI_SERVER_ABNORMAL.getCode(), msg); } else { String code = (String) resultMap.get(HTTP_RES_CODE); String content = (String) resultMap.get(HTTP_RES_CONTENT); if (!Constants.HTTP_STATUS_CODE_200.equals(code)) { logger.error("提取图片的特征码出现异常:code: " + code + " msg: " + content); String msg = commonService.getMessage(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getMsg()); throw new AppException(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getCode(), msg); } else { List<TransferAlgorithmRes> resultList = JsonUtil.readJson2Array(content, TransferAlgorithmRes.class); for (TransferAlgorithmRes res : resultList) { redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + res.getId(), res, 10); } } } } /** * * 为每个用户设置异常时的信息 * * @param erroCodeMsg * @param transferAlgorithmBeanList */ private void setExceptionForUser(ErrorCodeMsg erroCodeMsg, List<TransferAlgorithmReq> transferAlgorithmBeanList) { int msgCode = erroCodeMsg.getCode(); String msg = commonService.getMessage(erroCodeMsg.getMsg()); for (TransferAlgorithmReq req : transferAlgorithmBeanList) { redisService.setForTimeMIN(Constants.CACHE_KEY_EXCEPTION_ALGORITHM + req.getId(), new MessageBean(msgCode, msg), 10); } } /** * 异步任务实现类:向算法发送任务 by http * * @throws Exception */ @Override // @Async public void sendImageTaskThread(List<TransferAlgorithmReq> transferAlgorithmBeanList) throws AppException, Exception { long start = System.currentTimeMillis(); String dataRequest = ""; try { for (TransferAlgorithmReq req : transferAlgorithmBeanList) { 
System.out.println("http-------------------------------" + req.getId()); } List<TransferAlgorithmRes> resultList = new ArrayList<TransferAlgorithmRes>(); dataRequest = JsonUtil.getJsonString(transferAlgorithmBeanList); // 以表单形式向所有算法服务器发送指令 List<NameValuePair> params = new ArrayList<NameValuePair>(); // NameValuePair pair = new BasicNameValuePair("id", "1"); // params.add(pair); NameValuePair pair = new BasicNameValuePair("data", dataRequest); params.add(pair); String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_BATCH_METHOD + "/"; logger.info("传输数据完毕, 2222耗时:" + ((System.currentTimeMillis() - start) / 1000) + " num:" + transferAlgorithmBeanList.size() + " size: " + dataRequest.length() / 1024); Map<String, Object> resultMap = HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 2, params); if (resultMap.containsKey(Constants.EXCEPTION_ALGORITHM)) { String msg = commonService.getMessage(ErrorCodeMsg.AI_SERVER_ABNORMAL.getMsg()); throw new AppException(ErrorCodeMsg.AI_SERVER_ABNORMAL.getCode(), msg); } else { String code = (String) resultMap.get(HTTP_RES_CODE); String content = (String) resultMap.get(HTTP_RES_CONTENT); if (!Constants.HTTP_STATUS_CODE_200.equals(code)) { logger.error("提取图片的特征码出现异常:code: " + code + " msg: " + content); String msg = commonService.getMessage(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getMsg()); throw new AppException(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getCode(), msg); } else { resultList = JsonUtil.readJson2Array(content, TransferAlgorithmRes.class); for (TransferAlgorithmRes res : resultList) { redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + res.getId(), res, 10); } } } logger.info("获取结果333耗时:" + (System.currentTimeMillis() - start) / 1000); } catch (Exception e) { throw e; } finally { } } /** * 异步任务实现类:向算法发送任务 by http * * @throws Exception */ @Override // @Async("asyncServiceExecutor")、 @Async public void workSendTaskThread(String 
userToken, String facePhotoString) throws AppException, Exception { Map<String, Object> resultMap = null; long start = 0; try { if (!sendTaskThreadStart) { sendTaskThreadStart = true; start = System.currentTimeMillis(); while (true) { if ((System.currentTimeMillis() - start) / 1000 > 5) { break; } else { // TransferAlgorithmReq transferAlgorithmBean = (TransferAlgorithmReq) redisService.rightPop(Constants.QUEUE_TASK); TransferAlgorithmReq transferAlgorithmBean = StaticObject.imageQueue.poll(); if (StaticObject.transferAlgorithmBeanList != null && StaticObject.transferAlgorithmBeanList.size() <= 4) { if (transferAlgorithmBean != null) { StaticObject.transferAlgorithmBeanList.add(transferAlgorithmBean); logger.info("bean:" + transferAlgorithmBean.getId()); } } else { redisService.leftPush(Constants.QUEUE_TASK, transferAlgorithmBean); break; } } } } else { return; } List<TransferAlgorithmRes> resultList = new ArrayList<TransferAlgorithmRes>(); // 以表单形式向所有算法服务器发送指令 List<NameValuePair> params = new ArrayList<NameValuePair>(); NameValuePair pair = new BasicNameValuePair("id", userToken); params.add(pair); pair = new BasicNameValuePair("img", facePhotoString); // JsonUtil.getJsonString(StaticObject.transferAlgorithmBeanList) TODO params.add(pair); String httpUrlServer = Constants.HTTP_PREFIX + serverIp + Constants.COLON_SIGN + sendWorkPort + "/" + Constants.SEND_WORK_METHOD + "/"; resultMap = HttpRequestUtil.sendPostWithParamsForString(httpUrlServer, "", 2, params); if (resultMap.containsKey(Constants.EXCEPTION_ALGORITHM)) { String msg = commonService.getMessage(ErrorCodeMsg.AI_SERVER_ABNORMAL.getMsg()); throw new AppException(ErrorCodeMsg.AI_SERVER_ABNORMAL.getCode(), msg); } else { String code = (String) resultMap.get(HTTP_RES_CODE); String content = (String) resultMap.get(HTTP_RES_CONTENT); TransferAlgorithmRes sd = new TransferAlgorithmRes(userToken, 1002, "001,191,101"); // TODO TransferAlgorithmRes bean = JsonUtil.json2Obj(JsonUtil.getJsonString(sd), 
TransferAlgorithmRes.class); // TODO List<TransferAlgorithmRes> lst2 = new ArrayList<TransferAlgorithmRes>(); // TODO lst2.add(bean);// TODO content = JsonUtil.getJsonString(lst2);// TODO if (!Constants.HTTP_STATUS_CODE_200.equals(code)) { logger.error("提取图片的特征码出现异常:code: " + code + " msg: " + content); String msg = commonService.getMessage(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getMsg()); throw new AppException(ErrorCodeMsg.LOGIN_FAILED_TRY_AGAIN.getCode(), msg); } else { resultList = JsonUtil.readJson2Array(content, TransferAlgorithmRes.class); for (TransferAlgorithmRes res : resultList) { redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + res.getId(), res, 10); } // redisService.setForTimeMIN(Constants.FRAS_CACHE_KEY_ALGORITHM_RESULT + userToken, content, 10); } } logger.info("传输数据完毕,耗时:" + (System.currentTimeMillis() - start + " size:" + facePhotoString.length() / 1024)); } catch (Exception e) { throw e; } finally { StaticObject.transferAlgorithmBeanList.clear(); sendTaskThreadStart = false; } // return resultMap; } /** * 异步任务实现类:保存用户令牌及 用户相关信息 * * @param user * @throws Exception */ @Override @Async @Transactional(rollbackFor = Exception.class) public void saveUserInfo(T_user user) throws Exception { int cnt = 0; while (true) { try { try { cnt++; // 保存用户信息 commonService.saveUserInfo(user); break; } catch (Exception e) { if (cnt > 20) { logger.error("数据保存失败,开始重试次数:" + cnt, e); throw e; } else { Thread.sleep(5000); } logger.error("数据保存失败,开始重试次数:" + cnt, e); } } catch (Exception e) { logger.error("数据库连接发生异常,未保存数据 " + user.toString(), e); break; } finally { } } } /** * 存储用户登录地点信息 * * @param user * @return */ @Override @Async public void saveLoginLocationInfo(T_user user) { int cnt = 0; while (true) { try { try { cnt++; commonService.checkSQLReturnCode(userMapper.saveLoginLocationInfo(user)); break; } catch (Exception e) { if (cnt > 20) { logger.error("数据保存失败,开始重试次数:" + cnt, e); throw e; } else { Thread.sleep(5000); } logger.error("数据保存失败,开始重试次数:" + 
cnt, e); } } catch (Exception e) { logger.error("数据库连接发生异常,未保存数据 " + user.toString(), e); break; } finally { } } } /*** * 存储用户人脸信息及特征值 * * @param user * @param userToken * @param currentFaceCode * @throws Exception */ @Override public void saveUserFaceCode(T_user user, String userAccount, String currentFaceCode) throws Exception { int cnt = 0; while (true) { try { try { cnt++; commonService.saveUserFaceCode(user, userAccount, currentFaceCode); break; } catch (Exception e) { if (cnt > 20) { logger.error("数据保存失败,开始重试次数:" + cnt, e); throw e; } else { Thread.sleep(5000); } logger.error("数据保存失败,开始重试次数:" + cnt, e); } } catch (Exception e) { logger.error("数据库连接发生异常,未保存数据 " + user.toString(), e); break; } finally { } } } /** * 更新用户信息 * * @param user * @return */ @Override public void updateUserInfo(T_user user) throws Exception { int cnt = 0; while (true) { try { try { cnt++; userMapper.updateUserInfo(user); // 更新用户信息 userCache.getUserInfo(user, true); break; } catch (Exception e) { if (cnt > 20) { logger.error("数据保存失败,开始重试次数:" + cnt, e); throw e; } else { Thread.sleep(5000); } logger.error("数据保存失败,开始重试次数:" + cnt, e); } } catch (Exception e) { logger.error("数据库连接发生异常,未保存数据 " + user.toString(), e); break; } finally { } } } /** * 异步任务实现类: 用户退出后删除其可变用户令牌与账号的缓存信息;以及session信息 */ @Override @Async public void execDeleteLogOutUserCacheInfo() { while (true) { String userTokenJson = null; // 加锁 // lock.lock(); try { try { userTokenJson = (String) redisService.rightPop(QUEUE_KEY_LOGOUT_USER_TOKEN); if (!waitForData(userTokenJson)) { continue; } } catch (Exception e) { } try { // session String sessionKey = Constants.SPRING_SESSION_NAME_NAMESPCE + userTokenJson; if (redisService.hasKey(sessionKey)) { redisService.delete(sessionKey); } } catch (Exception e) { logger.error("redis 连接异常", e); redisService.leftPush(QUEUE_KEY_LOGOUT_USER_TOKEN, userTokenJson); throw e; } } catch (Exception e) { logger.error("删除已退出的用户session信息失败,用户令牌:" + userTokenJson, e); } finally { // 解锁 // lock.unlock(); 
} } } /** * 异步任务实现类:接受算法产生的图片特征码 by http */ @Override // @Async("asyncServiceExecutor") public void workReciveResultThread(Socket socket) { while (true) { try { /** * 在从Socket的InputStream中接收数据时,像上面那样一点点的读就太复杂了, 有时候我们就会换成使用BufferedReader来一次读一行 * * BufferedReader的readLine方法是一次读一行的,这个方法是阻塞的,直到它读到了一行数据为止程序才会继续往下执行, * 那么readLine什么时候才会读到一行呢?直到程序遇到了换行符或者是对应流的结束符readLine方法才会认为读到了一行, * 才会结束其阻塞,让程序继续往下执行。 * 所以我们在使用BufferedReader的readLine读取数据的时候一定要记得在对应的输出流里面一定要写入换行符( * 流结束之后会自动标记为结束,readLine可以识别),写入换行符之后一定记得如果输出流不是马上关闭的情况下记得flush一下, * 这样数据才会真正的从缓冲区里面写入。 */ BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")); StringBuilder sb = new StringBuilder(); String temp = ""; while ((temp = br.readLine()) != null) { sb.append(temp.substring(1, temp.length() - 1).replaceAll("‘", "")); break; } String[] arr = sb.toString().split(Constants.COLON_SIGN); redisService.setForTimeMS(Constants.CACHE_KEY_RESULT + arr[0].trim(), arr[1].trim(), Constants.TIME_KEY_RESULT); logger.info("数据接受 --- Form Cliect[port:" + socket.getPort() + "] 消息内容:" + sb.toString()); } catch (Exception e) { logger.error("work Recive Result Thread abnormal!", e); } } } /** * 异步任务实现类:向算法发送任务 by socket */ @Deprecated @Override // @Async("asyncServiceExecutor") public void workSendTaskThread_skt(Socket socket) { String faceJson = ""; while (true) { try { // 获取任务 faceJson = (String) redisService.rightPop(Constants.QUEUE_TASK); if (StringUtils.isBlank(faceJson)) { continue; } // UserFaceBean userFaceBean = JsonUtil.json2Obj(faceJson, UserFaceBean.class); DataOutputStream outputStream = null; outputStream = new DataOutputStream(socket.getOutputStream()); byte[] jsonByte = faceJson.getBytes(); logger.info("发送的数据长度为:" + jsonByte.length); Map<String, Integer> sizeMap = new HashMap<String, Integer>(); int fileSize = jsonByte.length; sizeMap.put("size", fileSize); // 告诉服务器要发送文件的大小 outputStream.write(JsonUtil.getJsonString(sizeMap).getBytes()); int partCount = fileSize / PART_SIZE; int rest 
= fileSize % PART_SIZE; // 每次发送35K大小的数据 for (int index = 0; index < partCount; index++) { int beginIndex = index * PART_SIZE; int endIndex = (index + 1) * PART_SIZE; String temp = faceJson.substring(beginIndex, endIndex); outputStream.write(temp.getBytes()); } // 发送剩余的数据 if (rest != 0) { int beginIndex = partCount * PART_SIZE; int endIndex = partCount * PART_SIZE + rest; String temp = faceJson.substring(beginIndex, endIndex); outputStream.write(temp.getBytes()); } outputStream.flush(); Thread.sleep(10); logger.info("传输数据完毕"); } catch (Exception e) { redisService.leftPush(Constants.QUEUE_TASK, faceJson); logger.error("work Send Task Thread error!", e); } } } /** * 异步任务实现类:接受算法产生的图片特征码 by socket */ @Deprecated @Override // @Async("asyncServiceExecutor") public void workReciveResultThread_skt(Socket socket) { while (true) { try { /** * 在从Socket的InputStream中接收数据时,像上面那样一点点的读就太复杂了, 有时候我们就会换成使用BufferedReader来一次读一行 * * BufferedReader的readLine方法是一次读一行的,这个方法是阻塞的,直到它读到了一行数据为止程序才会继续往下执行, * 那么readLine什么时候才会读到一行呢?直到程序遇到了换行符或者是对应流的结束符readLine方法才会认为读到了一行, * 才会结束其阻塞,让程序继续往下执行。 * 所以我们在使用BufferedReader的readLine读取数据的时候一定要记得在对应的输出流里面一定要写入换行符( * 流结束之后会自动标记为结束,readLine可以识别),写入换行符之后一定记得如果输出流不是马上关闭的情况下记得flush一下, * 这样数据才会真正的从缓冲区里面写入。 */ BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8")); StringBuilder sb = new StringBuilder(); String temp = ""; while ((temp = br.readLine()) != null) { sb.append(temp.substring(1, temp.length() - 1).replaceAll("‘", "")); break; } String[] arr = sb.toString().split(Constants.COLON_SIGN); redisService.setForTimeMS(Constants.CACHE_KEY_RESULT + arr[0].trim(), arr[1].trim(), Constants.TIME_KEY_RESULT); logger.info("数据接受 --- Form Cliect[port:" + socket.getPort() + "] 消息内容:" + sb.toString()); } catch (Exception e) { logger.error("work Recive Result Thread abnormal!", e); } } } /** * 异步任务实现类: 将硬件状态数据存入数据库 */ @Override // @Async("asyncServiceExecutor") public void execHardwareStatusDataToDBAsync() { while (true) { String 
json = null; // 加锁 // lock.lock(); try { try { // json = (String) redisService.rightPop(REDIS_HARDWARE_RUNNING_DATA); if (!waitForData(json)) { continue; } } catch (Exception e) { } try { // TODO // hdwareStatusService.gatherHardwareStatusInfo(json); } catch (Exception e) { if (!(e instanceof AppException)) { LogUtil.error("", e); // redisService.leftPush(REDIS_HARDWARE_RUNNING_DATA, json); } LogUtil.error("hardware status data insert db abnormal:" + json, e); } } catch (Exception e) { LogUtil.error(json, e); } finally { // 解锁 // lock.unlock(); } } } /** * 异步任务实现类:更新用户人脸特征库 */ @Override // @Async("asyncServiceExecutor") public void updateUserFaceCodeListThread() { while (true) { String userFaceJson = null; // 加锁 // lock.lock(); try { try { userFaceJson = (String) redisService.rightPop(Constants.CACHE_KEY_UPDATE_FACE_CODE); if (!waitForData(userFaceJson)) { continue; } } catch (Exception e) { logger.error("redis exception", e); } try { // TODO // hdwareStatusService.gatherHardwareStatusInfo(json); updateUserFaceCode(userFaceJson); } catch (Exception e) { if (!(e instanceof AppException)) { logger.error("", e); redisService.leftPush(Constants.CACHE_KEY_UPDATE_FACE_CODE, userFaceJson); } LogUtil.error("hardware status data insert db abnormal:" + userFaceJson, e); } } catch (Exception e) { logger.error("system exception", e); } finally { // 解锁 // lock.unlock(); } } } /** * 消息推送 * * @param platform * @param pushKey * @param content * @throws Exception * @return */ @Override // @Async("asyncServiceExecutor") @Async public void pushMsg(String platform, String pushKey, String content) throws Exception { // 消息推送 pushService.pushMsg(platform, pushKey, content); } /** * 将月度登录失败次数加一 */ @Override // @Async("asyncServiceExecutor") @Async public void addOneMonthFailedNum(String userAccount) throws Exception { // 更新月度登录失败次数 commonService.checkSQLReturnCode(userMapper.addOneMonthFailedNum(userAccount)); // 更新用户月度登录失败次数 T_user param = new T_user(); param.setUserAccount(userAccount); 
userCache.getUserInfo(param, true); } /** * 更新用户特征码 * * @param userFaceJson * @throws Exception */ private void updateUserFaceCode(String userFaceJson) throws Exception { UserFaceBean faceBean = JsonUtil.json2Obj(userFaceJson, UserFaceBean.class); commonService.saveUserFaceCode(null, faceBean.getUserToken(), faceBean.getFeatureCodes()); } /** * 数据为空时, 返回false;否则,返回true * * @param json * @return */ private boolean waitForData(String json) { if (json == null) { try { Thread.sleep(1000); } catch (InterruptedException e) { LogUtil.error(json, e); } return false; } else { return true; } } }
以上是关于springboot的异步调用的主要内容,如果未能解决你的问题,请参考以下文章: