多线程读写多个文件02
Posted helloworld6379
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程读写多个文件02相关的知识,希望对你有一定的参考价值。
/** * 除1002 1004 1009 1010的其他流程 */ @Component public class AutoTestOtherQueryBL { private final Log logger = LogFactory.getLog(this.getClass()); @Autowired private SendRequestService sendRequestService; public List<AutoMaticTestPojo> dealData(List<File> filesOtherQuery) { int size = filesOtherQuery.size(); Map<String, File> filesOtherOut = new HashMap<>(16); //所有1004返回 if (filesOtherQuery != null && size > 0) { for (File file : filesOtherQuery) { if (file.getName().contains("Out")) { int i = file.getName().indexOf("-"); String serialNo = file.getName().substring(0, i); filesOtherOut.put(serialNo, file); } } } logger.info(" 其他查询准备落库数量 ::: " + ( size - filesOtherOut.size()) ); //int queryNumber = size - filesOtherOut.size(); List<AutoMaticTestPojo> automaticTestPojoList = new ArrayList<>(); ThreadPoolExecutor threadPoolExecutor = null; if (filesOtherQuery != null && size > 0) { if(size <= 30){ BlockingQueue blockingQueue = new ArrayBlockingQueue<>(16); threadPoolExecutor = new ThreadPoolExecutor(size, (size + 3), 15, TimeUnit.MINUTES, blockingQueue); OtherQueryThread otherQueryThread = new OtherQueryThread(filesOtherOut, filesOtherQuery, automaticTestPojoList,sendRequestService,0, size); threadPoolExecutor.execute(otherQueryThread); }else{ BlockingQueue blockingQueue = new ArrayBlockingQueue<>(16); threadPoolExecutor = new ThreadPoolExecutor(30, (30 + 3), (size/30), TimeUnit.MINUTES, blockingQueue); //取余,把余数给最后一个线程 int m = size%30; //每个线程分配多少个任务 int s = (size-m)/30; for (int i = 0 ; i < 29; i++){ OtherQueryThread otherQueryThread = new OtherQueryThread(filesOtherOut, filesOtherQuery, automaticTestPojoList,sendRequestService,s*i, s*(i+1)); threadPoolExecutor.execute(otherQueryThread); } //创建第30个线程 OtherQueryThread otherQueryThread = new OtherQueryThread(filesOtherOut, filesOtherQuery, automaticTestPojoList,sendRequestService,s*29, s*30 + m); threadPoolExecutor.execute(otherQueryThread);; } threadPoolExecutor.shutdown();//不会触发中断 boolean flag = true; while (flag) { if (threadPoolExecutor.isTerminated()) { flag = false; logger.info(" 其他查询落库数量 ::: " + automaticTestPojoList.size()); return automaticTestPojoList; } } } return automaticTestPojoList; } }
-----------------------------------------------------------------------------------------------------------------
技术交流群:816227112
public class OtherQueryThread implements Runnable { private final Log logger = LogFactory.getLog(this.getClass()); private Map<String, File> filesOtherOut; private List<File> filesOtherQuery; private List<AutoMaticTestPojo> automaticTestPojoList; private SendRequestService sendRequestService; private int start; private int end; public OtherQueryThread(Map<String, File> filesOtherOut, List<File> filesOtherQuery, List<AutoMaticTestPojo> automaticTestPojoList, SendRequestService sendRequestService, int start, int end) { this.filesOtherOut = filesOtherOut; this.filesOtherQuery = filesOtherQuery; this.automaticTestPojoList = automaticTestPojoList; this.sendRequestService = sendRequestService; this.start = start; this.end = end; } @Override public void run() { for (int i = this.start; i < this.end; i++) { SAXReader saxReader = new SAXReader(); Document docOtherReq = null; AutoMaticTestPojo automaticTestPojo = new AutoMaticTestPojo(); File file = filesOtherQuery.get(i); if (file != null && file.getName().contains("In")) { try { docOtherReq = saxReader.read(file); } catch (DocumentException e) { e.printStackTrace(); } Element rootOtherReq = docOtherReq.getRootElement(); String serialNo = rootOtherReq.element("Header").element("SerialNo").getTextTrim();//其他老核心请求流水 //String policyNo = rootOtherReq.element("App").element("Req").element("PolicyNo").getTextTrim();//其他老核心对应1004保单号 String flag = rootOtherReq.element("Header").element("TransCode").getTextTrim(); automaticTestPojo.setTrialTransNo(serialNo); //automaticTestPojo.setTrialPolicyNo( policyNo ); automaticTestPojo.setTemp3(flag); automaticTestPojo.setSystem("04"); automaticTestPojo.setMakeDate(PubFun.getCurrentDate()); automaticTestPojo.setMakeTime(PubFun.getCurrentTime()); // 读取对应的返回报文 if (StringUtils.isNotEmpty(serialNo)) { //查今天对应的报文 File fileout = this.filesOtherOut.get(serialNo); if (fileout != null && fileout.exists() && fileout.isFile()) { Document docOtherReturn = null; try { docOtherReturn = saxReader.read(fileout); } catch (DocumentException e) { e.printStackTrace(); } Element root1004Return = docOtherReturn.getRootElement(); String insuredmessage = root1004Return.element("Header").element("RetMsg").getTextTrim();//老核心交易信息 automaticTestPojo.setTrialMessage(insuredmessage); } else { automaticTestPojo.setTrialMessage("没有对应的返回报文。"); } } String xml = docOtherReq.asXML(); logger.info("other请求 xml:" + xml); AutoMaticTestPojo returnPojo = send(xml, flag); automaticTestPojo.setInsuredMessage(returnPojo.getInsuredMessage()); automaticTestPojo.setInsuredTransNo(returnPojo.getInsuredTransNo()); //automaticTestPojo.setInsuredPolicyNo(returnPojo.getInsuredPolicyNo()); this.automaticTestPojoList.add(automaticTestPojo); } } } public AutoMaticTestPojo send(String xml, String flag) { AutoMaticTestPojo automaticTestPojo = new AutoMaticTestPojo(); if (!"1002".equals(flag.trim()) && !"1004".equals(flag.trim()) && !"1009".equals(flag.trim()) && !"1010".equals(flag.trim())) { String resultRevoke = ""; logger.info("sendRequestService: " + sendRequestService); resultRevoke = sendRequestService.dealData(xml); logger.info("other返回 xml:" + resultRevoke); if (resultRevoke.contains("<RetMsg>")) { automaticTestPojo.setInsuredMessage(resultRevoke.substring(resultRevoke.indexOf("<RetMsg>") + 8, resultRevoke.indexOf("</RetMsg>"))); } else { automaticTestPojo.setInsuredMessage("返回报文未找到RetMsg"); } if (resultRevoke.contains("<SerialNo>")) { automaticTestPojo.setInsuredTransNo(resultRevoke.substring(resultRevoke.indexOf("<SerialNo>") + 10, resultRevoke.indexOf("</SerialNo>"))); } } return automaticTestPojo; } }
以上是关于多线程读写多个文件02的主要内容,如果未能解决你的问题,请参考以下文章