使用CompletionService结合ExecutorService批处理调用存储过程任务实例

Posted pypua

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用CompletionService结合ExecutorService批处理调用存储过程任务实例相关的知识,希望对你有一定的参考价值。

此实例为java多线程并发调用存储过程实例,只做代码记载,不做详细描述

1.线程池构造初始化类CommonExecutorService.java

package com.pupeiyuan.go;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Holder for the single thread pool shared by all report tasks.
 *
 * <p>The pool is created when this class is initialised. This class never shuts it down;
 * worker threads are non-daemon, so the owner of the application lifecycle is responsible
 * for calling {@link ExecutorService#shutdown()} when the pool is no longer needed.
 */
public class CommonExecutorService {

    /** Maximum number of tasks that may wait in the work queue. */
    private static final int CAPACITY = 10000;
    /** Number of threads kept alive even when idle. */
    private static final int CORE_POOL_SIZE = 100;
    /** Upper bound on threads; extra threads are only started once the work queue is full. */
    private static final int MAXIMUM_POOL_SIZE = 1000;
    /** Idle time, in milliseconds, after which threads above the core size are released. */
    private static final long KEEP_ALIVE_TIME = 100L;
    /**
     * Prefix of every worker thread name. There is exactly one pool, so the pool number is
     * fixed; only the thread number varies.
     */
    private static final String THREAD_NAME_PREFIX = "reportPool-1-thread-";

    /**
     * The shared request thread pool, backed by a bounded queue of {@link #CAPACITY} entries.
     */
    private static final ExecutorService executorService = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE,
            KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(CAPACITY), new ThreadFactory() {

                private final AtomicInteger threadNumber = new AtomicInteger(1);

                @Override
                public Thread newThread(Runnable r) {
                    // Only the thread counter advances per thread; the pool number must stay
                    // constant so all workers of this pool share one recognisable prefix.
                    return new Thread(r, THREAD_NAME_PREFIX + threadNumber.getAndIncrement());
                }
            });

    private CommonExecutorService() {
        // static holder, not instantiable
    }

    /**
     * Returns the shared pool.
     *
     * @return the same {@link ExecutorService} instance on every call
     */
    public static ExecutorService getExecutorService() {
        return executorService;
    }
}

2.基础任务类BaseTask(实现Callable接口)

package com.pupeiyuan.go;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.logging.Logger;

/**
 * Base class for a unit of work that calls one stored procedure and converts its rows into
 * a result map.
 *
 * <p>{@link #call()} builds the positional parameter list, invokes the procedure through
 * {@code CallProcedureIDao}, hands the rows to {@link #handleBiResult(List)} and stores the
 * outcome in {@link #getTaskResultMap()} so the submitting code can read it afterwards.
 */
public abstract class BaseTask implements Callable<Map<String, Object>>{

    /** Logger shared with subclasses. */
    protected static final Logger logger = Logger.getLogger(BaseTask.class.getName());

    /** Organization ID. */
    private String orgId;
    /** Organization type. */
    private String orgType;
    /** Time dimension. */
    private String timeDim;
    /** IDs of the indexes (indicators) to query. */
    private List<String> indexIdList;
    /** Index category. */
    private String indexCategory;
    /** Indexes the current user is permitted to see. */
    private Set<String> validIndexSet;
    /** Stored procedure call string. */
    private String procName;
    /** Valid index set configured in the back end. */
    private Set<String> backendValidIndexSet;
    /** Result of the last {@link #call()}. */
    private Map<String, Object> taskResultMap;
    /** User ID recorded with the procedure call log. */
    private String userId;

    public BaseTask() {
    }

    /**
     * Runs the task: builds parameters, calls the stored procedure and post-processes the rows.
     *
     * @return the map produced by {@link #handleBiResult(List)}; also stored in
     *         {@link #getTaskResultMap()}
     */
    @Override
    public Map<String, Object> call() {
        List<String> paramList = makeParamList();
        List<Map<String, Object>> retListMap;
        // The user id travels to the DAO through a thread-local. Worker threads are pooled,
        // so it must be cleared even when the call fails, otherwise the next task executed
        // on this thread could see a stale user id.
        ThreadUtils.set(getUserId());
        try {
            retListMap = callProc(paramList);
        } finally {
            ThreadUtils.remove();
        }
        Map<String, Object> retMap = handleBiResult(retListMap);
        this.setTaskResultMap(retMap);
        return retMap;
    }

    /**
     * Builds the positional input parameters for the stored procedure.
     *
     * <p>Order: legacy organization id, BI organization type, time dimension, then the
     * comma-joined index ids (only when the list is non-empty) and the index category (only
     * when non-blank).
     *
     * <p>NOTE(review): because the last two entries are optional, a task with a category but
     * no index ids places the category in the fourth position — confirm the procedures
     * expect that.
     *
     * @return the parameter list, never {@code null}
     */
    public List<String> makeParamList() {
        List<String> params = new ArrayList<String>();
        // uuid to old_id
        params.add(CommonConstant.getOldIdByUUID(getOrgId(), getOrgType()));
        params.add(ReportUtils.getBIOrgType(getOrgType()));
        params.add(getTimeDim());
        List<String> indexIds = getIndexIdList();
        if (indexIds != null && !indexIds.isEmpty()) {
            params.add(StringUtils.join(indexIds, ","));
        }
        if (StringUtils.isNotBlank(getIndexCategory())) {
            params.add(getIndexCategory());
        }
        return params;
    }

    /**
     * Calls the configured stored procedure.
     *
     * @param paramList positional input parameters
     * @return the rows returned by the DAO, or {@code null} when the call threw
     */
    private List<Map<String, Object>> callProc(List<String> paramList) {
        System.out.println("thread:"+Thread.currentThread().getName());

        CallProcedureIDao callProcedureIDao = SpringContextHolder.getBean(CallProcedureIDao.class);
        List<Map<String, Object>> retListMap = null;
        try {
            retListMap = callProcedureIDao.getCallProcedureResult(getProcName(), paramList);
        } catch (Exception e) {
            // Best effort: subclasses receive null and decide how to render missing data.
            logger.log(java.util.logging.Level.SEVERE, e.getMessage(), e);
        }
        return retListMap;
    }

    /**
     * Converts the procedure rows into the task result.
     *
     * @param retListMap rows from the stored procedure; {@code null} when the call failed
     * @return the result map for this task
     */
    public abstract  Map<String, Object> handleBiResult(List<Map<String, Object>> retListMap);

    /**
     * @return the orgId
     */
    public String getOrgId() {
        return orgId;
    }

    /**
     * @param orgId the orgId to set
     */
    public void setOrgId(String orgId) {
        this.orgId = orgId;
    }

    /**
     * @return the orgType
     */
    public String getOrgType() {
        return orgType;
    }

    /**
     * @param orgType the orgType to set
     */
    public void setOrgType(String orgType) {
        this.orgType = orgType;
    }

    /**
     * @return the timeDim
     */
    public String getTimeDim() {
        return timeDim;
    }

    /**
     * @param timeDim the timeDim to set
     */
    public void setTimeDim(String timeDim) {
        this.timeDim = timeDim;
    }

    /**
     * @return the indexIdList
     */
    public List<String> getIndexIdList() {
        return indexIdList;
    }

    /**
     * @param indexIdList the indexIdList to set
     */
    public void setIndexIdList(List<String> indexIdList) {
        this.indexIdList = indexIdList;
    }

    /**
     * @return the validIndexSet
     */
    public Set<String> getValidIndexSet() {
        return validIndexSet;
    }

    /**
     * @param validIndexSet the validIndexSet to set
     */
    public void setValidIndexSet(Set<String> validIndexSet) {
        this.validIndexSet = validIndexSet;
    }

    /**
     * @return the procName
     */
    public String getProcName() {
        return procName;
    }

    /**
     * @param procName the procName to set
     */
    public void setProcName(String procName) {
        this.procName = procName;
    }

    /**
     * @return the indexCategory
     */
    public String getIndexCategory() {
        return indexCategory;
    }

    /**
     * @param indexCategory the indexCategory to set
     */
    public void setIndexCategory(String indexCategory) {
        this.indexCategory = indexCategory;
    }

    /**
     * @return the {@code StatCommonIService} bean looked up from the Spring context
     */
    public StatCommonIService getStatCommonIService() {
        return SpringContextHolder.getBean(StatCommonIService.class);
    }

    /**
     * @return the {@code SysContactsIService} bean looked up from the Spring context
     */
    public SysContactsIService getSysContactsIService() {
        return SpringContextHolder.getBean(SysContactsIService.class);
    }

    /**
     * @return the backendValidIndexSet
     */
    public Set<String> getBackendValidIndexSet() {
        return backendValidIndexSet;
    }

    /**
     * @param backendValidIndexSet the backendValidIndexSet to set
     */
    public void setBackendValidIndexSet(Set<String> backendValidIndexSet) {
        this.backendValidIndexSet = backendValidIndexSet;
    }

    /**
     * @return the taskResultMap
     */
    public Map<String, Object> getTaskResultMap() {
        return taskResultMap;
    }

    /**
     * @param taskResultMap the taskResultMap to set
     */
    public void setTaskResultMap(Map<String, Object> taskResultMap) {
        this.taskResultMap = taskResultMap;
    }

    /**
     * @return the userId
     */
    public String getUserId() {
        return userId;
    }

    /**
     * @param userId the userId to set
     */
    public void setUserId(String userId) {
        this.userId = userId;
    }

}

3.具体任务类PieTask(继承BaseTask)

package com.pupeiyuan.go;

public class PieTask extends BaseTask{

    public PieTask() {
        // TODO Auto-generated constructor stub
    }    
    
    public Map<String, Object>(List<Map<String, Object>> retListMap) {
        Map<String, Object> retMap = new HashMap<String, Object>();
        if (retListMap != null && retListMap.size() > 0) {
            List<String> inputIdList = getIndexIdList();
            Map<String, Object> map;
            String indexId;
            for(int i = 0; i < inputIdList.size(); i++) {
                indexId = inputIdList.get(i);
                map = retListMap.get(i);
                if (map == null) {//空数据处理                    
                    retMap.put(indexId,makePieItem(indexId,CommonConstant.INDEX_VALUE_STAR,getValidIndexSet()));
                } else {                                
                    String indexValue = (String)map.get("INDEX_VALUE");                    
                    retMap.put(indexId,makePieItem(indexId,indexValue,getValidIndexSet()));
                }
            }            
        } else {//空数据处理
            retMap = makePieFakeData(getIndexIdList(),getValidIndexSet());            
        }
        return retMap;
    }
    
    
    
    //没有数据处理
    public static HrPieItem makePieItem(String indexId, String indexValue, Set<String> validIndexSet) {
        HrPieItem item = new HrPieItem();    
        IndexInfoVo indexObj = CommonConstant.getIndexObj(indexId);    
        if (indexObj == null) {
            logger.error("ERROR:We dont find this indexId("+indexId+") info.");
            return item;
        }
        //查看权限        
        boolean bValid = validIndexSet.contains(indexId);
        bValid = true;
        item.setIndexId(indexId);
        String name = indexObj.getName();                
        String[] items = StringUtils.split(name, "\|");
        if (items != null ) {
            if (items.length == 1) {
                item.setIndexName(CommonConstant.getIndexNameWithDp(items[0],bValid));                        
            } else if(items.length == 2) {
                item.setIndexName(CommonConstant.getIndexNameWithDp(items[0],bValid));
                item.setIndexShortName(CommonConstant.getIndexNameWithDp(items[1],bValid));
            }
        }
        item.setIndexUnit(indexObj.getUnit());
        item.setIndexValue(CommonConstant.getIndexValueWithDp(indexValue,bValid));
        
        return item;
    }
        
    //没有数据处理
    public static Map<String, Object> makePieFakeData(List<String> indexIdList, Set<String> validIndexSet) {
        Map<String, Object> retMap = new HashMap<String, Object>();
        HrPieItem item;
        for (int i = 0 ;i <indexIdList.size(); i++) {
            String indexId = indexIdList.get(i);
            item = makePieItem(indexId,CommonConstant.INDEX_VALUE_STAR,validIndexSet);
            retMap.put(indexId, item);
        }
        return retMap;
    }
    //转换扇区百分比 
    public static void convertPercentage(List<HrPieItem> pieItems) {
        Double sum = new Double(0);
        //计算各扇区总和
        for (HrPieItem hrPieItem : pieItems) {
            if(null!=hrPieItem.getIndexValue()&&!"".equals(hrPieItem.getIndexValue())&&!"--".equals(hrPieItem.getIndexValue())) {
                sum+=Double.parseDouble(hrPieItem.getIndexValue());
            }
        }
        if(sum>0) {
            //计算分区所占百分比
            for (HrPieItem hrPieItem : pieItems) {
                if(null!=hrPieItem.getIndexValue()&&!"".equals(hrPieItem.getIndexValue())&&!"--".equals(hrPieItem.getIndexValue())) {
                    double percentage = Double.parseDouble(hrPieItem.getIndexValue())/sum;
                    percentage = (double) Math.round(percentage * 10000)/100;
                    hrPieItem.setPercentage(String.valueOf(percentage)+"%");
                }
            }
        }
    }

4.多线程处理触发类TaskExecutor

package com.pupeiyuan.go;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Runs a batch of {@link BaseTask}s on the shared pool and waits for all of them.
 */
public class TaskExecutor {

    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(TaskExecutor.class.getName());

    private static final ExecutorService commonExecutorService = CommonExecutorService.getExecutorService();

    public TaskExecutor() {
    }

    /**
     * Submits every task and blocks until each one has finished.
     *
     * <p>Results are not returned here; callers read them from
     * {@code BaseTask.getTaskResultMap()} after this method returns. A task that throws is
     * logged and the remaining tasks are still awaited. If the calling thread is interrupted
     * the wait stops and the interrupt flag is restored.
     *
     * <p>There is no upper bound on the wait: as before, this blocks until the tasks complete
     * (the former 5 second {@code get} timeout could never fire because it was applied to
     * futures that had already completed).
     *
     * @param taskList tasks to run; {@code null} or empty does nothing
     */
    public static void compute(List<BaseTask> taskList) {
        if (taskList == null || taskList.isEmpty()) {
            return;
        }
        // One completion service per call. A shared one lets concurrent callers, or futures
        // left over from an earlier aborted call, satisfy this call's take() and make it
        // return before its own tasks are done.
        ExecutorCompletionService<Map<String, Object>> completionService =
                new ExecutorCompletionService<Map<String, Object>>(commonExecutorService);
        int submitted = 0;
        for (BaseTask task : taskList) {
            completionService.submit(task);
            submitted++;
        }

        for (int done = 0; done < submitted; done++) {
            try {
                Future<Map<String, Object>> future = completionService.take();
                future.get();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOG.log(java.util.logging.Level.WARNING,
                        "Interrupted while waiting for tasks; " + (submitted - done) + " still outstanding", e);
                return;
            } catch (ExecutionException e) {
                // keep draining so the other tasks are still waited for
                LOG.log(java.util.logging.Level.SEVERE, "Task failed", e.getCause());
            }
        }
    }
}

5.调用存储过程实现类CallProcedureDaoImpl

package com.pupeiyuan.go;

public class CallProcedureDaoImpl implements CallProcedureIDao {

    protected final Log logger = LogFactory.getLog(this.getClass());

    private JdbcTemplate biJdbcTemplate;

    private JdbcTemplate jdbcTemplate;

    /**
     *
     * @param procedureName 存储过程名称包含预编译参数 {call testpro(?,?)} 最后一个参数为输出参数为游标类型
     * @param params 对应存储过程执行参数是有顺序的,参数类型为字符类型 不包含存储的输出参数
     * @return
     */
    @Override
    @SuppressWarnings("unchecked")
    public List<Map<String,Object>> getCallProcedureResult(final String procedureName, final List<String> params) {
        final String userId = (String) ThreadUtils.get();
        final long startTime = System.currentTimeMillis();
        //logger.info("开始调用存储过程【"+procedureName+"】,入参【" + JSON.toJSONString(params) + "】");
        List<Map<String,Object>> resultList = null;
        try {
            resultList = (List<Map<String,Object>>) biJdbcTemplate.execute(
                new CallableStatementCreator() {
                    public CallableStatement createCallableStatement(Connection con) throws SQLException {
                        String storedProc = procedureName;// 调用的sql
                        CallableStatement cs = con.prepareCall(storedProc);
                        for (int i=0; i< params.size();i++) {
                            cs.setString((i+1), params.get(i));// 设置输入参数的值
                        }
                        cs.registerOutParameter((params.size()+1),OracleTypes.CURSOR);// 注册输出参数的类型
                        return cs;
                    }
                }, new CallableStatementCallback() {
                    public Object doInCallableStatement(CallableStatement cs) throws SQLException,DataAccessException {
                        List<Map<String,Object>> resultsMap = new ArrayList<Map<String,Object>>();
                        cs.execute();
                        ResultSet rs = (ResultSet) cs.getObject((params.size()+1));;// 此处值必须跟游标返回的值下标是统一个下标
                        if (rs!=null) {
                            ResultSetMetaData rsmd = rs.getMetaData();
                            List<String> columNames = new ArrayList<String>();
                            for(int i=1; i<= rsmd.getColumnCount(); i++){
                                columNames.add(rsmd.getColumnName(i)); //将字段名放在List中
                            }
                            if (!CollectionUtils.isEmpty(columNames)) {
                                while (rs.next()) {// 转换每行的返回值到Map中
                                    Map<String,Object> rowMap = new HashMap<String,Object>();
                                    for (String columName : columNames) {
                                        rowMap.put(columName, rs.getObject(columName));
                                    }
                                    resultsMap.add(rowMap);
                                }
                            }
                            rs.close();
                        }
                        return resultsMap;
                    }
                });
            final long endTime = System.currentTimeMillis();
//            logger.info("结束调用存储过程【"+procedureName+"】,入参【"+ JSON.toJSONString(params) + "】,查询存储过程返回数据条数【"+resultList.size()+"】总耗时:" + (endTime-startTime) + "毫秒");
//            logger.info("本次调用存储过程返回数据:"+JSON.toJSONString(resultList));
            final List<Map<String, Object>> finalResultList = resultList;
            jdbcTemplate.update("INSERT INTO PROCEDURE_LOGS VALUES(?,?,?,?,?,?,?,?)",
                new PreparedStatementSetter() {
                    public void setValues(PreparedStatement ps) throws SQLException {
                        ps.setString(1, JugHelper.generalUUID());
                        ps.setString(2, procedureName);
                        ps.setString(3, JSON.toJSONString(params));
                        ps.setString(4, JSON.toJSONString(finalResultList));
                        ps.setTimestamp(5, new Timestamp(new Date().getTime()));
                        ps.setInt(6, Integer.valueOf((endTime-startTime)+""));
                        ps.setString(7, "1");// 正常
                        ps.setString(8, AppStringUtils.isNotEmpty(userId) ? userId : "");// 用户ID
                    }
                }
            );
        } catch (Exception e) {
            final long endTime = System.currentTimeMillis();
            final String errorMsg = getStackTrace(e);
            jdbcTemplate.update("INSERT INTO PROCEDURE_LOGS VALUES(?,?,?,?,?,?,?,?)",
                    new PreparedStatementSetter() {
                        public void setValues(PreparedStatement ps) throws SQLException {
                            ps.setString(1, JugHelper.generalUUID());
                            ps.setString(2, procedureName);
                            ps.setString(3, JSON.toJSONString(params));
                            ps.setString(4, errorMsg);
                            ps.setTimestamp(5, new Timestamp(new Date().getTime()));
                            ps.setInt(6, Integer.valueOf((endTime-startTime)+""));
                            ps.setString(7, "0");// 异常
                            ps.setString(8, AppStringUtils.isNotEmpty(userId) ? userId : "");// 用户ID
                        }
                    }
            );
        }
        return resultList;
    }

    /**
     * 获取完整的异常堆栈信息
     * @param throwable
     * @return
     */
    protected String getStackTrace(Throwable throwable) {
        StringWriter sw = null;
        PrintWriter pw = null;
        try {
            sw = new StringWriter();
            pw = new PrintWriter(sw, true);
            throwable.printStackTrace(pw);
            return sw.getBuffer().toString();
        } catch (Exception e) {
            e.printStackTrace();
            return e.getMessage();
        }
        finally {
            if (sw!=null) {
                try {
                    sw.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (pw!=null) {
                pw.close();
            }
        }
    }

    public void setBiJdbcTemplate(JdbcTemplate biJdbcTemplate) {
        this.biJdbcTemplate = biJdbcTemplate;
    }

    public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

6.工具类ThreadUtils

package com.pupeiyuan.go;

/**
 * Static access to a single per-thread slot, used to hand a value (such as the current
 * user id) to code running later on the same thread.
 */
public class ThreadUtils {

    /** Binds one value to the current thread. */
    private static ThreadLocal<Object> perThreadSlot = new ThreadLocal<Object>();

    /**
     * Stores {@code value} for the current thread, replacing any previous value.
     *
     * @param value the value to bind; may be {@code null}
     */
    public static void set(Object value) {
        perThreadSlot.set(value);
    }

    /**
     * @return the value bound to the current thread, or {@code null} when none is set
     */
    public static Object get() {
        Object current = perThreadSlot.get();
        return current;
    }

    /** Clears the value bound to the current thread. */
    public static void remove() {
        perThreadSlot.remove();
    }
}

7.业务类

package com.pupeiyuan.go;

public class test {
    //取出PIE的指标        
    List<String> pieWorkYearIndexList = HrIndexConstant.getPieWorkYearIndexList();
    List<String> pieEducationIndexList = HrIndexConstant.getPieEducationIndexList();        
    List<String> pieCapacityIndexList = HrIndexConstant.getPieCapacityIndexList();        
    List<String> pieConsultorStarIndexList = HrIndexConstant.getPieConsultorStarIndexList();
    List<String> pieRecruitChannelIndexList = HrIndexConstant.getPieRecruitChannelIndexList();        
    
    List<String> allIndex = new ArrayList<String>();
    allIndex.addAll(pieWorkYearIndexList);
    allIndex.addAll(pieCapacityIndexList);
    allIndex.addAll(pieEducationIndexList);
    allIndex.addAll(pieConsultorStarIndexList);
    allIndex.addAll(pieRecruitChannelIndexList);
    //获取指标权限
    Set<String> validIndexSet = CommonConstant.getValidIndexList(orgId, postId, roleId, allIndex);
    validIndexSet = HrIndexConstant.handleHrInnerPriv(validIndexSet,postId);
    //
    PieTask workYearTask = new PieTask();
    workYearTask.setOrgId(orgId);
    workYearTask.setOrgType(orgType);
    workYearTask.setTimeDim(timeDim);
    workYearTask.setIndexIdList(pieWorkYearIndexList);        
    workYearTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    workYearTask.setValidIndexSet(validIndexSet);
    workYearTask.setUserId(userId);
    
    PieTask educationTask = new PieTask();
    educationTask.setOrgId(orgId);
    educationTask.setOrgType(orgType);
    educationTask.setTimeDim(timeDim);
    educationTask.setIndexIdList(pieEducationIndexList);
    educationTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    educationTask.setValidIndexSet(validIndexSet);
    educationTask.setUserId(userId);
    
    PieTask consultorStarTask = new PieTask();
    consultorStarTask.setOrgId(orgId);
    consultorStarTask.setOrgType(orgType);
    consultorStarTask.setTimeDim(timeDim);
    consultorStarTask.setIndexIdList(pieConsultorStarIndexList);
    consultorStarTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    consultorStarTask.setValidIndexSet(validIndexSet);
    consultorStarTask.setUserId(userId);    
    
    PieTask recruitChannelTask = new PieTask();
    recruitChannelTask.setOrgId(orgId);
    recruitChannelTask.setOrgType(orgType);
    recruitChannelTask.setTimeDim(timeDim);
    recruitChannelTask.setIndexIdList(pieRecruitChannelIndexList);
    recruitChannelTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    recruitChannelTask.setValidIndexSet(validIndexSet);
    recruitChannelTask.setUserId(userId);
    
    /*CapacityPieTask capacityTask = new CapacityPieTask();
    capacityTask.setOrgId(orgId);
    capacityTask.setOrgType(orgType);
    capacityTask.setTimeDim(timeDim);
    capacityTask.setIndexIdList(pieCapacityIndexList);
    capacityTask.setProcName(ProcedureConstant.PROC_HR_MAP.get("PROC_HR_PIE"));
    capacityTask.setValidIndexSet(validIndexSet);
    capacityTask.setUserId(userId);
    
    
    
        
    */
    
    List<BaseTask> taskList = new ArrayList<BaseTask>(5);
    taskList.add(workYearTask);
    //taskList.add(capacityTask);
    taskList.add(educationTask);
    taskList.add(consultorStarTask);
    taskList.add(recruitChannelTask);

    TaskExecutor.compute(taskList);        

    //后续处理
    List<HrPieData> pieList = new ArrayList<HrPieData>();
    HrPieData pieData = new HrPieData();
    pieData.setPieName("司龄分布");
    pieData.setSubIndexList(getPieItemResult(pieWorkYearIndexList, workYearTask.getTaskResultMap()));
    workYearTask.convertPercentage(pieData.getSubIndexList());
    pieList.add(pieData);
    
    pieData = new HrPieData();
    pieData.setPieName("学历分布");
    pieData.setSubIndexList(getPieItemResult(pieEducationIndexList, educationTask.getTaskResultMap()));
    educationTask.convertPercentage(pieData.getSubIndexList());
    pieList.add(pieData);        
    
    pieData = new HrPieData();
    pieData.setPieName("经纪人星级分布");
    pieData.setSubIndexList(getPieItemResult(pieConsultorStarIndexList, consultorStarTask.getTaskResultMap()));
    consultorStarTask.convertPercentage(pieData.getSubIndexList());
    pieList.add(pieData);
    
    
    pieData = new HrPieData();
    pieData.setPieName("招聘渠道分布");
    pieData.setSubIndexList(getPieItemResult(pieRecruitChannelIndexList, recruitChannelTask.getTaskResultMap()));
    recruitChannelTask.convertPercentage(pieData.getSubIndexList());
    pieList.add(pieData);
    
}

 

以上是关于使用CompletionService结合ExecutorService批处理调用存储过程任务实例的主要内容,如果未能解决你的问题,请参考以下文章

Java技术指南「并发编程专题」CompletionService框架基本使用和原理探究(基础篇

Java线程之CompletionService

Java深层系列「并发编程系列」让我们一起探索一下CompletionService的技术原理和使用指南

completionService

通俗易懂的JUC源码剖析-CompletionService

深入浅出Java并发编程指南「难点 - 核心 - 遗漏」让我们一起探索一下CompletionService的技术原理和使用指南