利用 Spring Batch 读取数据库中的数据并写入到 txt 文件中

Posted 月亮之城

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了利用spring batch 读取数据库中的数据写入到txt文件中相关的知识,希望对你有一定的参考价值。

写一个job,其配置文件为:

技术分享
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
        http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch-3.0.xsd"
    default-lazy-init="true">
    <!-- Job that queries rows from the database and writes them to a txt file. -->
    <batch:job id="batchCreateReconFileJob" job-repository="jobRepository">
        <!-- Single chunk-oriented step: items come from batchCreateReconFileReader, pass through
             batchCreateReconFileProcessor and are written by batchCreateReconFileWriter,
             with a commit-interval of 1000 items. -->
        <batch:step id="batchCreateReconFile0">
            <batch:tasklet>
                <batch:chunk reader="batchCreateReconFileReader" processor="batchCreateReconFileProcessor"
                             writer="batchCreateReconFileWriter" commit-interval="1000"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <!-- Step-scoped reader that inherits its class and remaining settings from the
         abstractCursorReader parent bean (defined outside this file). Step scope is what allows
         the jobParameters expressions below to be resolved when the step starts. -->
    <bean id="batchCreateReconFileReader" parent="abstractCursorReader" scope="step">
        <property name="dataSource" ref="dataSource" />
        <!-- Selects pay_no, channel_pay_no, bank_pay_no and sign_no from channel_qptrade for rows whose
             create_time lies in [startDate, endDate); the column aliases match the property names of
             ChannelQptradeVO used by the row mapper below.
             NOTE(review): the startDate / endDate job parameters are spliced into the SQL text by SpEL
             rather than bound as statement parameters. With plain string values such as 2017-06-01 this
             looks like it produces an unquoted literal and leaves the query open to injection. Confirm
             against abstractCursorReader and consider "?" placeholders with a preparedStatementSetter. -->
        <property name="sql">
            <value>
                <![CDATA[
                SELECT
                    qp.pay_no AS payNo,
                    qp.channel_pay_no AS channelPayNo,
                    qp.bank_pay_no AS bankPayNo,
                    sign_no AS signNo
                FROM channel_qptrade qp WHERE create_time >= #{jobParameters[startDate]} and create_time<#{jobParameters[endDate]}
                
                ]]>
            </value>
        </property>
        <!-- Maps each result row onto a ChannelQptradeVO by matching column labels to bean properties. -->
        <property name="rowMapper">
            <bean class="org.springframework.jdbc.core.BeanPropertyRowMapper">
                <property name="mappedClass" value="com.ninefbank.smallpay.clear.vo.ChannelQptradeVO"/>
            </bean>
        </property>
    </bean>


     <!-- Step-scoped flat-file writer: emits one line per item to the file named by the
          outputFilePath job parameter. Each line contains payNo, channelPayNo, bankPayNo and
          signNo, joined by the "%s,%s,%s,%s" format. -->
     <bean id="batchCreateReconFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
        <!-- The map key must be quoted with plain ASCII apostrophes; typographic quotes are not
             valid SpEL string delimiters and make the expression fail to parse. -->
        <property name="resource" value="file:#{jobParameters['outputFilePath']}"></property>
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.FormatterLineAggregator">
                <!-- Pulls the listed bean properties from each item, in this order. -->
                <property name="fieldExtractor">
                    <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                        <property name="names" value="payNo,channelPayNo,bankPayNo,signNo"></property>
                    </bean>
                </property>
                <property name="format" value="%s,%s,%s,%s"></property>
            </bean>
        </property>
     </bean>
      
</beans>
job-batch-create-recon-file.xml

处理类为

package com.ninefbank.smallpay.clear.batchReconFile.processer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Service;

import com.ninefbank.smallpay.clear.vo.ChannelQptradeVO;

/**
 * Pass-through {@link ItemProcessor} registered as the {@code batchCreateReconFileProcessor} bean.
 *
 * <p>It logs every item it receives and returns the same instance unchanged; no validation,
 * filtering or transformation is performed.
 */
@Service("batchCreateReconFileProcessor")
public class BatchCreateReconFileProcessor implements ItemProcessor<ChannelQptradeVO, ChannelQptradeVO> {

    private static final Logger logger = LoggerFactory.getLogger(BatchCreateReconFileProcessor.class);

    /**
     * Logs the item and hands it on unchanged.
     *
     * @param data the item supplied by the reader
     * @return the same {@code data} instance
     * @throws Exception declared by the {@link ItemProcessor} contract; this implementation throws
     *         nothing itself
     */
    @Override
    public ChannelQptradeVO process(ChannelQptradeVO data) throws Exception {
        // Pass the item itself so SLF4J only renders it when INFO is enabled.
        // "{start}" is not an SLF4J placeholder and is printed literally; only "{}" is substituted.
        logger.info("批量生成外部对账文件:{start} {}", data);
        return data;
    }
}

写一个测试的controller:

package com.ninefbank.smallpay.clear.controller;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;


import org.apache.commons.lang3.StringUtils;
//import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.ninefbank.smallpay.clear.constant.ClearConstants;
import com.ninefbank.smallpay.clear.service.IBatchCreateReconFileService;
//import com.ninefbank.smallpay.clear.service.IHandleFillDiffFileService;
import com.ninefbank.smallpay.clear.service.ITaskStartInHandService;
import com.ninefbank.smallpay.common.exception.ApplicationException;
import com.ninefbank.smallpay.common.util.DateUtil;

/**
 * HTTP entry point for manually triggering a batch job through
 * {@link IBatchCreateReconFileService}.
 *
 * <p>Mapped under {@code /taskBatchCreateReconFileStart}.
 *
 * @version 1.0.0
 */
@Controller
@RequestMapping("/taskBatchCreateReconFileStart")
public class BatchCreateReconFileController {

    private static final Logger logger = LoggerFactory.getLogger(BatchCreateReconFileController.class);

    @Autowired
    private IBatchCreateReconFileService batchCreateReconFileService;

    /**
     * Manually starts a single job.
     *
     * <p>The three request parameters are forwarded to the service in a map under the keys
     * {@code "jobId"}, {@code "transDate"} and {@code "transDate1"}.
     *
     * @param jobId      id of the job to start
     * @param transDate  first date argument; the request is rejected when it is blank
     * @param transDate1 second date argument; forwarded as-is, may be {@code null}
     * @return a map with the single key {@code "result"}: {@code "success"} when the service
     *         returns {@code true}; {@code "fail"} when {@code transDate} is blank, the service
     *         returns {@code false}, or the service throws {@link ApplicationException}
     */
    @RequestMapping(value="/run")
    @ResponseBody
    public Map<String, Object> execute(String jobId, String transDate, String transDate1){
        Map<String, Object> ret = new HashMap<String, Object>();
        // Start from the failure result; it is overwritten only on confirmed success.
        ret.put("result", "fail");

        if (StringUtils.isBlank(transDate)) {
            return ret;
        }

        Map<String, Object> params = new HashMap<String, Object>();
        params.put("jobId", jobId);
        params.put("transDate", transDate);
        params.put("transDate1", transDate1);

        try {
            if (batchCreateReconFileService.run(params)) {
                ret.put("result", "success");
            }
        } catch (ApplicationException e) {
            // Pass jobId directly: wrapping it in an Object[] next to the throwable makes SLF4J
            // print the array ("[id]") instead of the id. The trailing throwable is still logged
            // with its stack trace.
            logger.error("手动触发任务{}失败!!!", jobId, e);
        }
        return ret;
    }

}

里面涉及到的service:

package com.ninefbank.smallpay.clear.service;

import java.util.Map;

import com.ninefbank.smallpay.common.exception.ApplicationException;

/**
 * Service contract for starting a batch job from a map of request parameters.
 */
public interface IBatchCreateReconFileService {

    /**
     * Starts the job described by {@code params}.
     *
     * @param params request parameters; presumably the keys {@code "jobId"}, {@code "transDate"}
     *               and {@code "transDate1"} as supplied by the controller - confirm against the
     *               implementation in use
     * @return {@code true} or {@code false} as decided by the implementation
     * @throws ApplicationException if the implementation reports a failure
     */
    boolean run(Map<String, Object> params) throws ApplicationException;
}

涉及到的实现类:

package com.ninefbank.smallpay.clear.service.impl;

import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.stereotype.Service;

import com.ninefbank.smallpay.clear.constant.ClearConstants;
import com.ninefbank.smallpay.clear.datasync.jobParams.JobParamsDataSync;
import com.ninefbank.smallpay.clear.inner.JobInnerParams;
import com.ninefbank.smallpay.clear.service.IBatchCreateReconFileService;
import com.ninefbank.smallpay.clear.service.ITaskStartInHandService;
import com.ninefbank.smallpay.clear.util.ClearConfigUtils;
import com.ninefbank.smallpay.clear.util.JobParamsUtil;
import com.ninefbank.smallpay.common.exception.ApplicationException;
import com.ninefbank.smallpay.common.util.DateUtil;
import com.ninefbank.smallpay.common.util.SpringContextHolder;

/**
 *
 * TaskStartInHandServiceImpl
 *
 * ll
 * ll
 * 2015年12月23日 下午8:36:06
 *
 * @version 1.0.0
 *
 */
@Service("batchCreateReconFileService")
public class BatchCreateReconFileServiceImpl implements IBatchCreateReconFileService {

    private static Logger logger = LoggerFactory.getLogger(BatchCreateReconFileServiceImpl.class);

    // 工作线程池
    public static ExecutorService workThreadPool = Executors.newCachedThreadPool();

    /* (non-Javadoc)
     * @see com.ninefbank.smallpay.clear.service.ITaskStartInHandService#run(java.lang.String)
     * @param taskType:
     *             内部对账:innerReconSetJob(银行卡理财&委托收款、理财金转出、理财兑付)
     *             内部对账: innerReconOneSetJob(支付冲正、委托提现、代付)
     *             内部对账: innerReconTwoSetJob(提现冻结、提现划拨、提现解冻)
     */
    @Override
    public boolean run(Map<String, Object> params) throws ApplicationException {
        String jobId = (String)params.get("jobId");
        String startDate = (String)params.get("transDate");
        String endDate = (String)params.get("transDate1");

        logger.info("开始执行任务,jobId:{}", new Object[]{jobId});

        boolean flag = false;

        if(StringUtils.isBlank(jobId) || StringUtils.isBlank(startDate)){
            return false;
        }

        final String temp = jobId;
        final String startDateS = startDate;
        final String endDateS = endDate;
        try {
            workThreadPool.execute(getNamedThread(new Runnable() {

                @Override
                public void run() {
                    call((Job)SpringContextHolder.getBean(temp), startDateS, "startDate", endDateS,"endDate",temp);

                }
            }));

            flag = true;
        } catch (Exception e) {
            e.printStackTrace();
            flag = false;
        }

        return flag;

    }

    private void call(Job job, String startDate, String paramName,String endDate, String paramName1, String jobId) throws ApplicationException{
        JobLauncher launcher = SpringContextHolder.getBean("jobLauncher");
        JobExecution result = null;
        try {
            JobParametersBuilder builder = getJPB(jobId, startDate,endDate);
            result = launcher.run(job, builder.toJobParameters());

        } catch (Exception e) {
            logger.error("执行job失败,job名称:{}", new Object[]{job.getName()}, e);
            throw new ApplicationException("执行job失败");
        }

        ExitStatus es = result.getExitStatus();
        String exitCode = es.getExitCode();
        if (ExitStatus.COMPLETED.getExitCode().equals(exitCode)) {
            logger.info("任务执行完成,job名称:{};exitCode={};exitDesc={}", new Object[]{job.getName(), exitCode, es.getExitDescription()});

        } else {
            logger.debug("任务执行失败,job名称:{};exitCode={};exitDesc={}", new Object[]{job.getName(), exitCode, es.getExitDescription()});
        }
    }
    
    private JobParametersBuilder getJPB(String jobId, String transDate, String endDate){
        JobParametersBuilder builder = null;
        Date startDate = null;
    
        switch (jobId) {
                
            case "innerReconSetJob"://内部对账【银行卡理财、理财金转出、理财兑付】
                builder = new JobInnerParams().getJobParametersBuilder();
                startDate = DateUtil.getDateStartTime(transDate, ClearConstants.DATE_FORMAT_8);
                builder.addDate("startDate", startDate);
                builder.addDate("endDate", DateUtil.addDate(startDate, Calendar.DAY_OF_MONTH, 1));
                break;
            case "innerReconOneSetJob"://内部对账【支付冲正、委托提现、代付】
                builder = new JobInnerParams().getJobParametersBuilder();
                startDate = DateUtil.getDateStartTime(transDate, ClearConstants.DATE_FORMAT_8);
                builder.addDate("startDate", startDate);
                builder.addDate("endDate", DateUtil.addDate(startDate, Calendar.DAY_OF_MONTH, 1));
                break;
            case "innerReconTwoSetJob"://内部对账【提现冻结、提现划拨、提现解冻】
                builder = new JobInnerParams().getJobParametersBuilder();
                startDate = DateUtil.getDateStartTime(transDate, ClearConstants.DATE_FORMAT_8);
                builder.addDate("startDate", startDate);
                builder.addDate("endDate", DateUtil.addDate(startDate, Calendar.DAY_OF_MONTH, 1));
                break;
            case "batchCreateReconFileJob"://内部对账【提现冻结、提现划拨、提现解冻】
                builder = new JobInnerParams().getJobParametersBuilder();
                startDate = DateUtil.getDateStartTime(transDate, ClearConstants.DATE_FORMAT_8);
                builder.addString("startDate", transDate);
                builder.addString("endDate", endDate);
                String tempPath =ClearConfigUtils.CLEAR_PROPS.getProperty("fy_ftp_file_path")+"fixedLengthOutputFile.txt";
               
//                builder.addString(  
//                        "outputFilePath",  
//                        "E:\\batchRecon\\fixedLengthOutputFile.txt"); 
                builder.addString("outputFilePath",tempPath);
                break;
            default:
                builder = JobParamsUtil.getJobParametersBuilder();
                break;
        }
        
        return builder;
    }

    /**
     * 获取线程并setName
     *
     * @param command
     * @return
     */
    public Thread getNamedThread(Runnable command) {
        Thread thread = new Thread(command);
        // 设置线程name为"Clear."+方法名
        thread.setName("Clear." + Thread.currentThread().getStackTrace()[2].getMethodName());
        return thread;
    }

    
}

配置文件中的生成文件路径为:

fy_ftp_file_path=F:\\0921\\tmp\\ftp01\\dev\\

 

启动项目后访问 controller 即可:http://ip:port/项目名/taskBatchCreateReconFileStart/run?jobId=batchCreateReconFileJob&transDate=2017-06-01&transDate1=2017-09-21

以上是关于利用spring batch 读取数据库中的数据写入到txt文件中的主要内容,如果未能解决你的问题,请参考以下文章

Spring Batch 并行读取数据库

Spring-batch学习总结—ItemReader普通文件,数据库,XML,多文件数据读取

Spring Batch中如何读取多个CSV文件合并数据进行处理?

为啥我的 Spring Batch 多线程步骤在任何处理之前执行所有读取?

Spring Batch在大型企业中的最佳实践

Spring Batch在大型企业中的最佳实践