多线程读写多个文件02

Posted helloworld6379

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程读写多个文件02相关的知识,希望对你有一定的参考价值。

/**
 * 除1002 1004 1009 1010的其他流程
 */
@Component
public class AutoTestOtherQueryBL {

    private final Log logger = LogFactory.getLog(this.getClass());
    @Autowired
    private SendRequestService sendRequestService;


    public List<AutoMaticTestPojo> dealData(List<File> filesOtherQuery) {
        int size = filesOtherQuery.size();
        Map<String, File> filesOtherOut = new HashMap<>(16); //所有1004返回
        if (filesOtherQuery != null && size > 0) {
            for (File file : filesOtherQuery) {
                if (file.getName().contains("Out")) {
                    int i = file.getName().indexOf("-");
                    String serialNo = file.getName().substring(0, i);
                    filesOtherOut.put(serialNo, file);
                }
            }
        }

        logger.info(" 其他查询准备落库数量 ::: " + ( size - filesOtherOut.size()) );
        //int queryNumber = size - filesOtherOut.size();
        List<AutoMaticTestPojo> automaticTestPojoList = new ArrayList<>();
        ThreadPoolExecutor threadPoolExecutor = null;
        if (filesOtherQuery != null && size > 0) {
            if(size <= 30){
                BlockingQueue blockingQueue = new ArrayBlockingQueue<>(16);
                threadPoolExecutor = new ThreadPoolExecutor(size, (size + 3), 15, TimeUnit.MINUTES, blockingQueue);
                OtherQueryThread otherQueryThread = new OtherQueryThread(filesOtherOut, filesOtherQuery, automaticTestPojoList,sendRequestService,0, size);
                threadPoolExecutor.execute(otherQueryThread);
            }else{
                BlockingQueue blockingQueue = new ArrayBlockingQueue<>(16);
                threadPoolExecutor = new ThreadPoolExecutor(30, (30 + 3), (size/30), TimeUnit.MINUTES, blockingQueue);
                //取余,把余数给最后一个线程
                int m = size%30;
                //每个线程分配多少个任务
                int s = (size-m)/30;
                for (int i = 0 ; i < 29; i++){
                    OtherQueryThread otherQueryThread = new OtherQueryThread(filesOtherOut, filesOtherQuery, automaticTestPojoList,sendRequestService,s*i, s*(i+1));
                    threadPoolExecutor.execute(otherQueryThread);
                }
                //创建第30个线程
                OtherQueryThread otherQueryThread = new OtherQueryThread(filesOtherOut, filesOtherQuery, automaticTestPojoList,sendRequestService,s*29, s*30 + m);
                threadPoolExecutor.execute(otherQueryThread);;
            }

            threadPoolExecutor.shutdown();//不会触发中断
            boolean flag = true;
            while (flag) {
                if (threadPoolExecutor.isTerminated()) {
                    flag = false;
                    logger.info(" 其他查询落库数量 ::: " + automaticTestPojoList.size());
                    return automaticTestPojoList;
                }
            }
        }

        return automaticTestPojoList;
    }
}
-----------------------------------------------------------------------------------------------------------------
技术交流群:816227112
public class OtherQueryThread implements Runnable {

    private final Log logger = LogFactory.getLog(this.getClass());
    private Map<String, File> filesOtherOut;
    private List<File> filesOtherQuery;
    private List<AutoMaticTestPojo> automaticTestPojoList;
    private SendRequestService sendRequestService;
    private int start;
    private int end;


    public OtherQueryThread(Map<String, File> filesOtherOut, List<File> filesOtherQuery, List<AutoMaticTestPojo> automaticTestPojoList, SendRequestService sendRequestService, int start, int end) {
        this.filesOtherOut = filesOtherOut;
        this.filesOtherQuery = filesOtherQuery;
        this.automaticTestPojoList = automaticTestPojoList;
        this.sendRequestService = sendRequestService;
        this.start = start;
        this.end = end;
    }

    @Override
    public void run() {

        for (int i = this.start; i < this.end; i++) {
            SAXReader saxReader = new SAXReader();
            Document docOtherReq = null;
            AutoMaticTestPojo automaticTestPojo = new AutoMaticTestPojo();
            File file = filesOtherQuery.get(i);
            if (file != null && file.getName().contains("In")) {
                try {
                    docOtherReq = saxReader.read(file);
                } catch (DocumentException e) {
                    e.printStackTrace();
                }

                Element rootOtherReq = docOtherReq.getRootElement();
                String serialNo = rootOtherReq.element("Header").element("SerialNo").getTextTrim();//其他老核心请求流水
                //String policyNo = rootOtherReq.element("App").element("Req").element("PolicyNo").getTextTrim();//其他老核心对应1004保单号
                String flag = rootOtherReq.element("Header").element("TransCode").getTextTrim();

                automaticTestPojo.setTrialTransNo(serialNo);
                //automaticTestPojo.setTrialPolicyNo( policyNo );
                automaticTestPojo.setTemp3(flag);
                automaticTestPojo.setSystem("04");
                automaticTestPojo.setMakeDate(PubFun.getCurrentDate());
                automaticTestPojo.setMakeTime(PubFun.getCurrentTime());

                // 读取对应的返回报文
                if (StringUtils.isNotEmpty(serialNo)) {
                    //查今天对应的报文
                    File fileout = this.filesOtherOut.get(serialNo);
                    if (fileout != null && fileout.exists() && fileout.isFile()) {
                        Document docOtherReturn = null;
                        try {
                            docOtherReturn = saxReader.read(fileout);
                        } catch (DocumentException e) {
                            e.printStackTrace();
                        }

                        Element root1004Return = docOtherReturn.getRootElement();
                        String insuredmessage = root1004Return.element("Header").element("RetMsg").getTextTrim();//老核心交易信息
                        automaticTestPojo.setTrialMessage(insuredmessage);

                    } else {
                        automaticTestPojo.setTrialMessage("没有对应的返回报文。");
                    }
                }

                String xml = docOtherReq.asXML();
                logger.info("other请求 xml:" + xml);
                AutoMaticTestPojo returnPojo = send(xml, flag);
                automaticTestPojo.setInsuredMessage(returnPojo.getInsuredMessage());
                automaticTestPojo.setInsuredTransNo(returnPojo.getInsuredTransNo());
                //automaticTestPojo.setInsuredPolicyNo(returnPojo.getInsuredPolicyNo());
                this.automaticTestPojoList.add(automaticTestPojo);
            }
        }


    }

    public AutoMaticTestPojo send(String xml, String flag) {
        AutoMaticTestPojo automaticTestPojo = new AutoMaticTestPojo();
        if (!"1002".equals(flag.trim()) && !"1004".equals(flag.trim()) && !"1009".equals(flag.trim()) && !"1010".equals(flag.trim())) {
            String resultRevoke = "";
            logger.info("sendRequestService:  " + sendRequestService);
            resultRevoke = sendRequestService.dealData(xml);
            logger.info("other返回 xml:" + resultRevoke);

            if (resultRevoke.contains("<RetMsg>")) {
                automaticTestPojo.setInsuredMessage(resultRevoke.substring(resultRevoke.indexOf("<RetMsg>") + 8, resultRevoke.indexOf("</RetMsg>")));
            } else {
                automaticTestPojo.setInsuredMessage("返回报文未找到RetMsg");
            }
            if (resultRevoke.contains("<SerialNo>")) {
                automaticTestPojo.setInsuredTransNo(resultRevoke.substring(resultRevoke.indexOf("<SerialNo>") + 10, resultRevoke.indexOf("</SerialNo>")));
            }

        }
        return automaticTestPojo;
    }


}

  

















以上是关于多线程读写多个文件02的主要内容,如果未能解决你的问题,请参考以下文章

C# 多线程同步访问一个文件怎么处理

多个用户访问同一段代码

C#使用读写锁三行代码简单解决多线程并发写入文件时线程同步的问题

线程同步-使用ReaderWriterLockSlim类

多个请求是多线程吗

多线程线程同步