使用CompletionService结合ExecutorService批处理调用存储过程任务实例
Posted pypua
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用CompletionService结合ExecutorService批处理调用存储过程任务实例相关的知识,希望对你有一定的参考价值。
此实例为java多线程并发调用存储过程实例,只做代码记载,不做详细描述
1.线程池构造初始化类CommonExecutorService.java
package com.pupeiyuan.go; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class CommonExecutorService { private static final int CAPACITY = 10000; private static final int CORE_POOL_SIZE = 100; private static final int MAXIMUM_POOL_SIZE = 1000; private static final Long KEEP_ALIVE_TIME = 100L; private CommonExecutorService() { } public static ExecutorService getExecutorService() { return executorService; } /** * 构造请求线程池,队列大小为1000 */ private static final ExecutorService executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(CAPACITY), new ThreadFactory() { AtomicInteger poolNumber = new AtomicInteger(1); AtomicInteger threadNumber = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { String namePrefix = String.format("reportPool-%s-thread-", poolNumber.getAndIncrement()); return new Thread(r, namePrefix + threadNumber.getAndIncrement()); } }); }
2.基础任务类BaseTask,实现Callable接口
package com.pupeiyuan.go; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.logging.Logger; public abstract class BaseTask implements Callable<Map<String, Object>>{ // protected static final Logger logger = Logger.getLogger(BaseTask.class); /** * 组织ID */ private String orgId; /** * 组织类型 */ private String orgType; /** * 时间维度 */ private String timeDim; /** * 指标列表 */ private List<String> indexIdList; /** * 指标类别 */ private String indexCategory; /** * 指标权限 */ private Set<String> validIndexSet; /** * 存储过程名称 */ private String procName; /** * 后台设置的有效指标集合 */ private Set<String> backendValidIndexSet; /** * 保存任务结果 */ private Map<String, Object> taskResultMap; /** * 记录日志用到的userId */ private String userId; public BaseTask() { // TODO Auto-generated constructor stub } @Override public Map<String, Object> call() { List<String> paramList = makeParamList(); ThreadUtils.set(getUserId()); List<Map<String, Object>> retListMap = callProc(paramList); ThreadUtils.remove(); Map<String, Object> retMap = handleBiResult(retListMap); this.setTaskResultMap(retMap); return retMap; } /** * 构造请求参数 * * @return */ public List<String> makeParamList() { List<String> params = new ArrayList<String>(); //uuid to old_id params.add(CommonConstant.getOldIdByUUID(getOrgId(),getOrgType())); params.add(ReportUtils.getBIOrgType(getOrgType())); params.add(getTimeDim()); if (getIndexIdList() != null && getIndexIdList().size() > 0) { params.add(StringUtils.join(getIndexIdList(), ",")); } if (StringUtils.isNotBlank(getIndexCategory())) { params.add(getIndexCategory()); } return params; } private List<Map<String, Object>> callProc(List<String> paramList) { System.out.println("thread:"+Thread.currentThread().getName()); CallProcedureIDao callProcedureIDao = SpringContextHolder.getBean(CallProcedureIDao.class); List<Map<String, Object>> retListMap = null; try { retListMap = 
callProcedureIDao.getCallProcedureResult(getProcName(), paramList); } catch(Exception e) { logger.error(e.getMessage(),e); } return retListMap; } public abstract Map<String, Object> handleBiResult(List<Map<String, Object>> retListMap); /** * @return the orgId */ public String getOrgId() { return orgId; } /** * @param orgId the orgId to set */ public void setOrgId(String orgId) { this.orgId = orgId; } /** * @return the orgType */ public String getOrgType() { return orgType; } /** * @param orgType the orgType to set */ public void setOrgType(String orgType) { this.orgType = orgType; } /** * @return the timeDim */ public String getTimeDim() { return timeDim; } /** * @param timeDim the timeDim to set */ public void setTimeDim(String timeDim) { this.timeDim = timeDim; } /** * @return the indexIdList */ public List<String> getIndexIdList() { return indexIdList; } /** * @param indexIdList the indexIdList to set */ public void setIndexIdList(List<String> indexIdList) { this.indexIdList = indexIdList; } /** * @return the validIndexSet */ public Set<String> getValidIndexSet() { return validIndexSet; } /** * @param validIndexSet the validIndexSet to set */ public void setValidIndexSet(Set<String> validIndexSet) { this.validIndexSet = validIndexSet; } /** * @return the procName */ public String getProcName() { return procName; } /** * @param procName the procName to set */ public void setProcName(String procName) { this.procName = procName; } /** * @return the indexCategory */ public String getIndexCategory() { return indexCategory; } /** * @param indexCategory the indexCategory to set */ public void setIndexCategory(String indexCategory) { this.indexCategory = indexCategory; } /** * @return the statCommonIService */ public StatCommonIService getStatCommonIService() { return SpringContextHolder.getBean(StatCommonIService.class); } /** * @return the sysContactsIService */ public SysContactsIService getSysContactsIService() { return 
SpringContextHolder.getBean(SysContactsIService.class); } /** * @return the backendValidIndexSet */ public Set<String> getBackendValidIndexSet() { return backendValidIndexSet; } /** * @param backendValidIndexSet the backendValidIndexSet to set */ public void setBackendValidIndexSet(Set<String> backendValidIndexSet) { this.backendValidIndexSet = backendValidIndexSet; } /** * @return the taskResultMap */ public Map<String, Object> getTaskResultMap() { return taskResultMap; } /** * @param taskResultMap the taskResultMap to set */ public void setTaskResultMap(Map<String, Object> taskResultMap) { this.taskResultMap = taskResultMap; } /** * @return the userId */ public String getUserId() { return userId; } /** * @param userId the userId to set */ public void setUserId(String userId) { this.userId = userId; } }
3.具体任务类PieTask继承BaseTask
package com.pupeiyuan.go; public class PieTask extends BaseTask{ public PieTask() { // TODO Auto-generated constructor stub } public Map<String, Object>(List<Map<String, Object>> retListMap) { Map<String, Object> retMap = new HashMap<String, Object>(); if (retListMap != null && retListMap.size() > 0) { List<String> inputIdList = getIndexIdList(); Map<String, Object> map; String indexId; for(int i = 0; i < inputIdList.size(); i++) { indexId = inputIdList.get(i); map = retListMap.get(i); if (map == null) {//空数据处理 retMap.put(indexId,makePieItem(indexId,CommonConstant.INDEX_VALUE_STAR,getValidIndexSet())); } else { String indexValue = (String)map.get("INDEX_VALUE"); retMap.put(indexId,makePieItem(indexId,indexValue,getValidIndexSet())); } } } else {//空数据处理 retMap = makePieFakeData(getIndexIdList(),getValidIndexSet()); } return retMap; } //没有数据处理 public static HrPieItem makePieItem(String indexId, String indexValue, Set<String> validIndexSet) { HrPieItem item = new HrPieItem(); IndexInfoVo indexObj = CommonConstant.getIndexObj(indexId); if (indexObj == null) { logger.error("ERROR:We dont find this indexId("+indexId+") info."); return item; } //查看权限 boolean bValid = validIndexSet.contains(indexId); bValid = true; item.setIndexId(indexId); String name = indexObj.getName(); String[] items = StringUtils.split(name, "\|"); if (items != null ) { if (items.length == 1) { item.setIndexName(CommonConstant.getIndexNameWithDp(items[0],bValid)); } else if(items.length == 2) { item.setIndexName(CommonConstant.getIndexNameWithDp(items[0],bValid)); item.setIndexShortName(CommonConstant.getIndexNameWithDp(items[1],bValid)); } } item.setIndexUnit(indexObj.getUnit()); item.setIndexValue(CommonConstant.getIndexValueWithDp(indexValue,bValid)); return item; } //没有数据处理 public static Map<String, Object> makePieFakeData(List<String> indexIdList, Set<String> validIndexSet) { Map<String, Object> retMap = new HashMap<String, Object>(); HrPieItem item; for (int i = 0 ;i <indexIdList.size(); i++) { 
String indexId = indexIdList.get(i); item = makePieItem(indexId,CommonConstant.INDEX_VALUE_STAR,validIndexSet); retMap.put(indexId, item); } return retMap; } //转换扇区百分比 public static void convertPercentage(List<HrPieItem> pieItems) { Double sum = new Double(0); //计算各扇区总和 for (HrPieItem hrPieItem : pieItems) { if(null!=hrPieItem.getIndexValue()&&!"".equals(hrPieItem.getIndexValue())&&!"--".equals(hrPieItem.getIndexValue())) { sum+=Double.parseDouble(hrPieItem.getIndexValue()); } } if(sum>0) { //计算分区所占百分比 for (HrPieItem hrPieItem : pieItems) { if(null!=hrPieItem.getIndexValue()&&!"".equals(hrPieItem.getIndexValue())&&!"--".equals(hrPieItem.getIndexValue())) { double percentage = Double.parseDouble(hrPieItem.getIndexValue())/sum; percentage = (double) Math.round(percentage * 10000)/100; hrPieItem.setPercentage(String.valueOf(percentage)+"%"); } } } }
4.多线程处理触发类TaskExecutor
package com.pupeiyuan.go;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Runs a batch of {@link BaseTask}s on the shared pool and waits until all of them have finished.
 */
public class TaskExecutor {

    private static final Logger LOG = Logger.getLogger(TaskExecutor.class.getName());

    private static ExecutorService commonExecutorService = CommonExecutorService.getExecutorService();

    public TaskExecutor() {
    }

    /**
     * Submits every task and blocks until each submitted task has completed or failed.
     *
     * <p>Results are not returned here; each task stores its own result, readable through
     * {@link BaseTask#getTaskResultMap()} once this method returns. A task that throws is logged
     * and does not stop the wait for the remaining tasks. If the calling thread is interrupted,
     * its interrupt flag is restored and the method returns early.
     *
     * @param taskList tasks to run; {@code null} or empty is a no-op
     */
    public static void compute(List<BaseTask> taskList) {
        if (taskList == null || taskList.isEmpty()) {
            return;
        }

        // One completion service per call. The original shared a single static instance, so two
        // concurrent callers could take() each other's futures and return before their own
        // tasks had finished.
        CompletionService<Map<String, Object>> completionService =
                new ExecutorCompletionService<Map<String, Object>>(commonExecutorService);

        int submitted = 0;
        for (BaseTask task : taskList) {
            completionService.submit(task);
            submitted++;
        }

        for (int i = 0; i < submitted; i++) {
            try {
                // take() only hands back completed futures, so get() returns immediately.
                // NOTE(review): the original passed a 5000 ms timeout to get(), which for that
                // reason could never fire; the wait is, as before, unbounded. Switch to
                // poll(timeout, unit) if a real deadline is wanted.
                Future<Map<String, Object>> future = completionService.take();
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.log(Level.WARNING, "Interrupted while waiting for report tasks", e);
                return;
            } catch (ExecutionException e) {
                // Caught inside the loop so one failed task does not abandon the others.
                LOG.log(Level.SEVERE, "Report task failed", e.getCause());
            }
        }
    }
}
5.调用存储过程实现类CallProcedureDaoImpl
package com.pupeiyuan.go; public class CallProcedureDaoImpl implements CallProcedureIDao { protected final Log logger = LogFactory.getLog(this.getClass()); private JdbcTemplate biJdbcTemplate; private JdbcTemplate jdbcTemplate; /** * * @param procedureName 存储过程名称包含预编译参数 {call testpro(?,?)} 最后一个参数为输出参数为游标类型 * @param params 对应存储过程执行参数是有顺序的,参数类型为字符类型 不包含存储的输出参数 * @return */ @Override @SuppressWarnings("unchecked") public List<Map<String,Object>> getCallProcedureResult(final String procedureName, final List<String> params) { final String userId = (String) ThreadUtils.get(); final long startTime = System.currentTimeMillis(); //logger.info("开始调用存储过程【"+procedureName+"】,入参【" + JSON.toJSONString(params) + "】"); List<Map<String,Object>> resultList = null; try { resultList = (List<Map<String,Object>>) biJdbcTemplate.execute( new CallableStatementCreator() { public CallableStatement createCallableStatement(Connection con) throws SQLException { String storedProc = procedureName;// 调用的sql CallableStatement cs = con.prepareCall(storedProc); for (int i=0; i< params.size();i++) { cs.setString((i+1), params.get(i));// 设置输入参数的值 } cs.registerOutParameter((params.size()+1),OracleTypes.CURSOR);// 注册输出参数的类型 return cs; } }, new CallableStatementCallback() { public Object doInCallableStatement(CallableStatement cs) throws SQLException,DataAccessException { List<Map<String,Object>> resultsMap = new ArrayList<Map<String,Object>>(); cs.execute(); ResultSet rs = (ResultSet) cs.getObject((params.size()+1));;// 此处值必须跟游标返回的值下标是统一个下标 if (rs!=null) { ResultSetMetaData rsmd = rs.getMetaData(); List<String> columNames = new ArrayList<String>(); for(int i=1; i<= rsmd.getColumnCount(); i++){ columNames.add(rsmd.getColumnName(i)); //将字段名放在List中 } if (!CollectionUtils.isEmpty(columNames)) { while (rs.next()) {// 转换每行的返回值到Map中 Map<String,Object> rowMap = new HashMap<String,Object>(); for (String columName : columNames) { rowMap.put(columName, rs.getObject(columName)); } resultsMap.add(rowMap); } } 
rs.close(); } return resultsMap; } }); final long endTime = System.currentTimeMillis(); // logger.info("结束调用存储过程【"+procedureName+"】,入参【"+ JSON.toJSONString(params) + "】,查询存储过程返回数据条数【"+resultList.size()+"】总耗时:" + (endTime-startTime) + "毫秒"); // logger.info("本次调用存储过程返回数据:"+JSON.toJSONString(resultList)); final List<Map<String, Object>> finalResultList = resultList; jdbcTemplate.update("INSERT INTO PROCEDURE_LOGS VALUES(?,?,?,?,?,?,?,?)", new PreparedStatementSetter() { public void setValues(PreparedStatement ps) throws SQLException { ps.setString(1, JugHelper.generalUUID()); ps.setString(2, procedureName); ps.setString(3, JSON.toJSONString(params)); ps.setString(4, JSON.toJSONString(finalResultList)); ps.setTimestamp(5, new Timestamp(new Date().getTime())); ps.setInt(6, Integer.valueOf((endTime-startTime)+"")); ps.setString(7, "1");// 正常 ps.setString(8, AppStringUtils.isNotEmpty(userId) ? userId : "");// 用户ID } } ); } catch (Exception e) { final long endTime = System.currentTimeMillis(); final String errorMsg = getStackTrace(e); jdbcTemplate.update("INSERT INTO PROCEDURE_LOGS VALUES(?,?,?,?,?,?,?,?)", new PreparedStatementSetter() { public void setValues(PreparedStatement ps) throws SQLException { ps.setString(1, JugHelper.generalUUID()); ps.setString(2, procedureName); ps.setString(3, JSON.toJSONString(params)); ps.setString(4, errorMsg); ps.setTimestamp(5, new Timestamp(new Date().getTime())); ps.setInt(6, Integer.valueOf((endTime-startTime)+"")); ps.setString(7, "0");// 异常 ps.setString(8, AppStringUtils.isNotEmpty(userId) ? 
userId : "");// 用户ID } } ); } return resultList; } /** * 获取完整的异常堆栈信息 * @param throwable * @return */ protected String getStackTrace(Throwable throwable) { StringWriter sw = null; PrintWriter pw = null; try { sw = new StringWriter(); pw = new PrintWriter(sw, true); throwable.printStackTrace(pw); return sw.getBuffer().toString(); } catch (Exception e) { e.printStackTrace(); return e.getMessage(); } finally { if (sw!=null) { try { sw.close(); } catch (IOException e) { e.printStackTrace(); } } if (pw!=null) { pw.close(); } } } public void setBiJdbcTemplate(JdbcTemplate biJdbcTemplate) { this.biJdbcTemplate = biJdbcTemplate; } public void setJdbcTemplate(JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; }
6.工具类ThreadUtils
package com.pupeiyuan.go; public class ThreadUtils { /*ThreadLocal:将变量与当前线程绑定*/ private static ThreadLocal<Object> threadLocal = new ThreadLocal<Object>(); public static void set(Object value) { threadLocal.set(value); } public static Object get() { return threadLocal.get(); } public static void remove() { threadLocal.remove(); } }
7.业务类
package com.pupeiyuan.go;

/*
 * Usage example: builds four PieTask instances, runs them through TaskExecutor.compute and
 * assembles the results into a list of HrPieData.
 *
 * NOTE(review): as shown, the statements sit directly in the class body and orgId, postId,
 * roleId, orgType, timeDim, userId and getPieItemResult are not declared anywhere in this
 * fragment. Presumably this was excerpted from a method of a larger business class; it does
 * not compile on its own.
 */
public class test {
    // Fetch the index ids of each pie.
    List<String> pieWorkYearIndexList = HrIndexConstant.getPieWorkYearIndexList();
    List<String> pieEducationIndexList = HrIndexConstant.getPieEducationIndexList();
    List<String> pieCapacityIndexList = HrIndexConstant.getPieCapacityIndexList();
    List<String> pieConsultorStarIndexList = HrIndexConstant.getPieConsultorStarIndexList();
    List<String> pieRecruitChannelIndexList = HrIndexConstant.getPieRecruitChannelIndexList();
    // All index ids together, used for the single permission lookup below.
    List<String> allIndex = new ArrayList<String>();
    allIndex.addAll(pieWorkYearIndexList);
    allIndex.addAll(pieCapacityIndexList);
    allIndex.addAll(pieEducationIndexList);
    allIndex.addAll(pieConsultorStarIndexList);
    allIndex.addAll(pieRecruitChannelIndexList);
    // Resolve which index ids the caller is permitted to see.
    Set<String> validIndexSet = CommonConstant.getValidIndexList(orgId, postId, roleId, allIndex);
    validIndexSet = HrIndexConstant.handleHrInnerPriv(validIndexSet,postId);
    // One task per pie; all four use the same procedure and differ only in their index id list.
    PieTask workYearTask = new PieTask();
    workYearTask.setOrgId(orgId);
    workYearTask.setOrgType(orgType);
    workYearTask.setTimeDim(timeDim);
    workYearTask.setIndexIdList(pieWorkYearIndexList);
    workYearTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    workYearTask.setValidIndexSet(validIndexSet);
    workYearTask.setUserId(userId);
    PieTask educationTask = new PieTask();
    educationTask.setOrgId(orgId);
    educationTask.setOrgType(orgType);
    educationTask.setTimeDim(timeDim);
    educationTask.setIndexIdList(pieEducationIndexList);
    educationTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    educationTask.setValidIndexSet(validIndexSet);
    educationTask.setUserId(userId);
    PieTask consultorStarTask = new PieTask();
    consultorStarTask.setOrgId(orgId);
    consultorStarTask.setOrgType(orgType);
    consultorStarTask.setTimeDim(timeDim);
    consultorStarTask.setIndexIdList(pieConsultorStarIndexList);
    consultorStarTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    consultorStarTask.setValidIndexSet(validIndexSet);
    consultorStarTask.setUserId(userId);
    PieTask recruitChannelTask = new PieTask();
    recruitChannelTask.setOrgId(orgId);
    recruitChannelTask.setOrgType(orgType);
    recruitChannelTask.setTimeDim(timeDim);
    recruitChannelTask.setIndexIdList(pieRecruitChannelIndexList);
    recruitChannelTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    recruitChannelTask.setValidIndexSet(validIndexSet);
    recruitChannelTask.setUserId(userId);
    // Disabled in the original: the capacity pie is neither built nor submitted.
    /*CapacityPieTask capacityTask = new CapacityPieTask();
    capacityTask.setOrgId(orgId);
    capacityTask.setOrgType(orgType);
    capacityTask.setTimeDim(timeDim);
    capacityTask.setIndexIdList(pieCapacityIndexList);
    capacityTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    capacityTask.setValidIndexSet(validIndexSet);
    capacityTask.setUserId(userId);
    */
    List<BaseTask> taskList = new ArrayList<BaseTask>(5);
    taskList.add(workYearTask);
    //taskList.add(capacityTask);
    taskList.add(educationTask);
    taskList.add(consultorStarTask);
    taskList.add(recruitChannelTask);
    // Submit all tasks; each task's result is read afterwards through getTaskResultMap().
    TaskExecutor.compute(taskList);
    // Post-processing: one HrPieData per pie, with sector percentages filled in.
    List<HrPieData> pieList = new ArrayList<HrPieData>();
    HrPieData pieData = new HrPieData();
    pieData.setPieName("司龄分布");
    pieData.setSubIndexList(getPieItemResult(pieWorkYearIndexList, workYearTask.getTaskResultMap()));
    // convertPercentage is a static method of PieTask; it is invoked through an instance here.
    workYearTask.convertPercentage(pieData.getSubIndexList());
    pieList.add(pieData);
    pieData = new HrPieData();
    pieData.setPieName("学历分布");
    pieData.setSubIndexList(getPieItemResult(pieEducationIndexList, educationTask.getTaskResultMap()));
    educationTask.convertPercentage(pieData.getSubIndexList());
    pieList.add(pieData);
    pieData = new HrPieData();
    pieData.setPieName("经纪人星级分布");
    pieData.setSubIndexList(getPieItemResult(pieConsultorStarIndexList, consultorStarTask.getTaskResultMap()));
    consultorStarTask.convertPercentage(pieData.getSubIndexList());
    pieList.add(pieData);
    pieData = new HrPieData();
    pieData.setPieName("招聘渠道分布");
    pieData.setSubIndexList(getPieItemResult(pieRecruitChannelIndexList, recruitChannelTask.getTaskResultMap()));
    recruitChannelTask.convertPercentage(pieData.getSubIndexList());
    pieList.add(pieData);
}
以上是关于使用CompletionService结合ExecutorService批处理调用存储过程任务实例的主要内容,如果未能解决你的问题,请参考以下文章
Java技术指南「并发编程专题」CompletionService框架基本使用和原理探究(基础篇
Java深层系列「并发编程系列」让我们一起探索一下CompletionService的技术原理和使用指南
通俗易懂的JUC源码剖析-CompletionService
深入浅出Java并发编程指南「难点 - 核心 - 遗漏」让我们一起探索一下CompletionService的技术原理和使用指南