并发5多线程并发解析单文件大数据量解析入库,1800万数据8线程5分钟入库

Posted cutter_point

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发5多线程并发解析单文件大数据量解析入库,1800万数据8线程5分钟入库相关的知识,希望对你有一定的参考价值。

1.首先机器要求8核,不然可能会慢点

2.数据库建表的时候,最好建那种nologging类型的表,不然归档日志满了,数据库入库会很慢,甚至丢数据,因为数据量很大,我们不可能一次性提交所有数据,只能分批提交

package com.ztesoft.interfaces.predeal.util;

import com.ztesoft.interfaces.predeal.bl.IHandle;

import java.io.ByteArrayOutputStream;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @ProjectName: cutter-point
 * @Package: io.bigData
 * @ClassName: CsvBigTask
 * @Author: xiaof
 * @Description: ${description}
 * @Date: 2019/3/8 11:03
 * @Version: 1.0
 */
public class CsvBigTask implements Runnable {

    //拦截器
    private CyclicBarrier cyclicBarrier;
    private AtomicLong atomicLong; //查询统计个数
    private long start; //文件读取起始位置
    private long totalSize; //结束位置
    private int buffSize;
    private byte[] buff; //读取缓冲大小
    private RandomAccessFile randomAccessFile; //随机读取文件
    private IHandle iHandle; //接口对象,用来实现业务逻辑
    private List tempData;

    public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, long start, long totalSize, int buffSize, RandomAccessFile randomAccessFile, IHandle iHandle) {
        this.cyclicBarrier = cyclicBarrier;
        this.atomicLong = atomicLong;
        this.start = start;
        this.totalSize = totalSize;
        this.buffSize = buffSize;
        this.buff = new byte[buffSize];
        this.randomAccessFile = randomAccessFile;
        this.iHandle = iHandle;
    }

    public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, PartitionPair partitionPair, int buffSize, RandomAccessFile randomAccessFile, IHandle iHandle) {
        this.cyclicBarrier = cyclicBarrier;
        this.atomicLong = atomicLong;
        this.start = partitionPair.getStart();
        this.totalSize = partitionPair.getEnd() - partitionPair.getStart() + 1;
        this.buffSize = buffSize;
        this.buff = new byte[buffSize];
        this.randomAccessFile = randomAccessFile;
        this.iHandle = iHandle;
    }

    public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, PartitionPair partitionPair, int buffSize, RandomAccessFile randomAccessFile, List tempData, IHandle iHandle) {
        this.cyclicBarrier = cyclicBarrier;
        this.atomicLong = atomicLong;
        this.start = partitionPair.getStart();
        this.totalSize = partitionPair.getEnd() - partitionPair.getStart() + 1;
        this.buffSize = buffSize;
        this.buff = new byte[buffSize];
        this.randomAccessFile = randomAccessFile;
        this.iHandle = iHandle;
        this.tempData = tempData;
    }

    @Override
    public void run() {

        MappedByteBuffer mappedByteBuffer = null;
        //1.读取文件映射到内存中
        try {
            //只读模式,不需要加锁,因为不涉及到资源的共享
            mappedByteBuffer = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_ONLY, start, this.totalSize);
            //2.读取指定内存大小,并判断是否有进行换行
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            //依次循环读取一定缓存的数据量
            for(int i = 0; i < totalSize; i += buffSize) {
                //确定会读取的数据
                int realReadLength = 0;
                if(i + buffSize > totalSize) {
                    //如果超出范围
                    realReadLength = (int) (totalSize - i);
                } else {
                    realReadLength = buffSize;
                }
                //3.如果进行了换行了,那么就清空一次输出,输出一行数据
                //读取一次数据,这里为0的原因是randomAccessFile会进行seek位置起始的索引
                //并且get之后,buffer会更新当前位置索引
                mappedByteBuffer.get(buff, 0, realReadLength);
                //遍历这个buf,确定是否需要进行调用业务逻辑
                for(int j = 0; j < realReadLength; ++j) {
                    //遍历每一个字符,判断是不是换行,如果遍历到了换行符,那么就进行处理
                    byte temp = buff[j];
                    if(temp == ‘
‘ || temp == ‘
‘) {
                        //这里要进行非空校验
                        String result = byteArrayOutputStream.toString("gbk");
                        if(result != null && !result.equals("")) {
                            iHandle.handle(result, false, tempData);
                            atomicLong.incrementAndGet();
                        }
                        //输出之后,置空文件
                        byteArrayOutputStream.reset();
                    } else if (temp == 0) {
                        break;
                    } else {
                        //如果不是换行符那么就把这个数据放入输出流缓存中
                        byteArrayOutputStream.write(temp);
                    }
                }
            }

            //4.最后清空一次缓冲数据,循环结束之后,如果output对象中还有数据没有清空,说明那就是最后一行
            if(byteArrayOutputStream.size() > 0) {
                String result = byteArrayOutputStream.toString("gbk");
                if(result != null && !result.equals("")) {
                    iHandle.handle(result, true, tempData);
                    atomicLong.incrementAndGet();
                }
                //输出之后,置空文件
                byteArrayOutputStream.reset();
            } else {
                //通知最后一行,如果为空
                iHandle.handle("", true, tempData);
            }
            //5.栅栏最后拦截完成
            cyclicBarrier.await();

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

业务逻辑实现接口类

package com.ztesoft.interfaces.predeal.bl;

import java.util.List;

/**
 *
 * @program:
 * @description: 
 * @auther: xiaof
 * @date: 2019/3/1 18:08
 */
public interface IHandle {

    /**
     * Callback that carries the business logic for one parsed line.
     *
     * <p>As used by {@code CsvBigTask}: it is invoked once per non-empty line with
     * {@code lastLine == false}, and once more after the partition has been read with
     * {@code lastLine == true} (then {@code line} is either the trailing unterminated
     * line or the empty string).
     *
     * @param line     decoded text of the line, without its line terminator
     * @param lastLine {@code true} on the final call for a partition
     * @param list     caller-supplied scratch list passed unchanged on every call; may be
     *                 {@code null} when the task was created without one
     */
    void handle(String line, boolean lastLine, List list);

}

 

一些辅助类,可要可不要,看业务逻辑

package com.ztesoft.interfaces.predeal.util;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @ProjectName: cutter-point
 * @Package: io.util
 * @ClassName: ConcurrentDateUtil
 * @Author: xiaof
 * @Description: ${description}
 * @Date: 2019/2/27 11:30
 * @Version: 1.0
 */
/**
 * Date parsing/formatting helpers that are safe to call from several threads.
 *
 * <p>{@link SimpleDateFormat} is not thread-safe, so every pattern is backed by its own
 * {@link ThreadLocal}: each thread lazily gets a private formatter instance.
 */
public class ConcurrentDateUtil {

    /** Compact day pattern, e.g. {@code 20190308}. */
    private static ThreadLocal<DateFormat> dayFormat = perThread("yyyyMMdd");

    /** Day pattern used for directory names, e.g. {@code 2019/03/08}. */
    private static ThreadLocal<DateFormat> dayDirFormat = perThread("yyyy/MM/dd");

    /** Timestamp pattern, e.g. {@code 20190308110300}. */
    private static ThreadLocal<DateFormat> databaseFormat = perThread("yyyyMMddHHmmss");

    /** Day pattern followed by six literal zeros, e.g. {@code 20190308000000}. */
    private static ThreadLocal<DateFormat> resourceFileFormat = perThread("yyyyMMdd000000");

    /** Builds a thread-local whose per-thread value is a formatter for {@code pattern}. */
    private static ThreadLocal<DateFormat> perThread(final String pattern) {
        return new ThreadLocal<DateFormat>() {
            @Override
            protected DateFormat initialValue() {
                return new SimpleDateFormat(pattern);
            }
        };
    }

    /** Parses a {@code yyyyMMdd} string. */
    public static Date parse(String dateStr) throws ParseException {
        DateFormat formatter = dayFormat.get();
        return formatter.parse(dateStr);
    }

    /** Parses a {@code yyyyMMddHHmmss} string. */
    public static Date parseDatabase(String dateStr) throws ParseException {
        DateFormat formatter = databaseFormat.get();
        return formatter.parse(dateStr);
    }

    /** Formats a date as {@code yyyyMMdd}. */
    public static String format(Date date) {
        DateFormat formatter = dayFormat.get();
        return formatter.format(date);
    }

    /** Formats a date as {@code yyyy/MM/dd}. */
    public static String formatDateDir(Date date) {
        DateFormat formatter = dayDirFormat.get();
        return formatter.format(date);
    }

    /** Parses a {@code yyyy/MM/dd} string. */
    public static Date parseDateDir(String dateStr) throws ParseException {
        DateFormat formatter = dayDirFormat.get();
        return formatter.parse(dateStr);
    }

    /** Formats a date as {@code yyyyMMdd} followed by {@code 000000}. */
    public static String formatResourceFile(Date date) {
        DateFormat formatter = resourceFileFormat.get();
        return formatter.format(date);
    }
}

 

package com.ztesoft.interfaces.predeal.util;

import java.io.*;
import java.nio.charset.Charset;
import java.util.Enumeration;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

/**
 * @ProjectName: cutter-point
 * @Package: io.util
 * @ClassName: CreateFileUtil
 * @Author: xiaof
 * @Description: ${description}
 * @Date: 2019/3/1 17:23
 * @Version: 1.0
 */
/**
 * File helpers: create a file together with its missing parent directories, and unpack a
 * zip archive next to the archive itself.
 */
public class CreateFileUtil {

    /**
     * Creates {@code file}, creating any missing parent directories first.
     *
     * <p>All missing ancestors are created in one step with {@code mkdirs()}, so a missing
     * grandparent directory can no longer cause endless recursion.
     *
     * @param file file to create
     * @return {@code true} if the file was created, {@code false} if it already existed
     * @throws IOException if the parent directory cannot be created or file creation fails
     */
    public static boolean createFile(File file) throws IOException {
        File parent = file.getParentFile();
        // parent is null for a bare relative name such as "a.txt": nothing to create then.
        if (parent != null && !parent.exists() && !parent.mkdirs() && !parent.isDirectory()) {
            throw new IOException("Unable to create parent directory: " + parent);
        }
        return file.createNewFile();
    }

    /** Placeholder kept for API compatibility; intentionally does nothing. */
    public static void unZipFile() {

    }

    /**
     * Extracts every entry of {@code zipFile} into the directory that contains the archive.
     *
     * <p>Any {@code '*'} in the output path is replaced by {@code '/'}. Directory entries
     * are created as directories, and parent directories of nested entries are created on
     * demand. An entry whose resolved path would fall outside the target directory is
     * rejected.
     *
     * @param zipFile archive to extract
     * @return name of the last entry in the archive, or {@code ""} if the archive is empty
     * @throws IOException if the archive cannot be read, an entry cannot be written, or an
     *                     entry resolves outside the target directory
     */
    public static String unZipSingleFileCurrentDir(File zipFile) throws IOException {
        // getAbsoluteFile() keeps this working for a bare relative name such as "a.zip".
        File targetDir = zipFile.getAbsoluteFile().getParentFile();
        if (!targetDir.exists()) {
            targetDir.mkdirs();
        }
        String outPath = targetDir.getPath();
        String targetRoot = targetDir.getCanonicalPath();
        if (!targetRoot.endsWith(File.separator)) {
            targetRoot = targetRoot + File.separator;
        }

        String fileName = "";
        byte[] buffer = new byte[1024];
        try (ZipFile archive = new ZipFile(zipFile)) {
            for (Enumeration<? extends ZipEntry> entries = archive.entries(); entries.hasMoreElements();) {
                ZipEntry entry = entries.nextElement();
                fileName = entry.getName();

                File target = new File((outPath + "/" + fileName).replace("*", "/"));
                // Refuse entries such as "../x" that would be written outside targetDir.
                if (!(target.getCanonicalPath() + File.separator).startsWith(targetRoot)) {
                    throw new IOException("Zip entry is outside of the target directory: " + fileName);
                }

                if (entry.isDirectory()) {
                    target.mkdirs();
                    continue;
                }
                File targetParent = target.getParentFile();
                if (targetParent != null && !targetParent.exists()) {
                    targetParent.mkdirs();
                }

                try (InputStream in = archive.getInputStream(entry);
                     OutputStream out = new FileOutputStream(target)) {
                    int len;
                    while ((len = in.read(buffer)) > 0) {
                        out.write(buffer, 0, len);
                    }
                }
            }
        }

        return fileName;
    }


}

 

package com.ztesoft.interfaces.predeal.util;

import org.apache.log4j.Logger;

import java.io.*;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.sql.Date;
import java.text.ParseException;
import java.util.HashSet;
import java.util.Set;

/**
 * @ProjectName: cutter-point
 * @Package: io.util
 * @ClassName: DataUtil
 * @Author: xiaof
 * @Description: ${description}
 * @Date: 2019/2/21 14:38
 * @Version: 1.0
 */
public class DataUtil {

    private final static Logger logger = Logger.getLogger(DataUtil.class);

    /**
     *
     * @program: io.util.DataUtil
     * @description: 对数据进行分区
     * @auther: xiaof
     * @date: 2019/2/21 14:39
     */
    public static void partition(long start, long length, long totalSize, RandomAccessFile randomAccessFile, Set partitionPairs) throws IOException {
        if(start > totalSize - 1) {
            return;
        }

        //每次获取length长度,并判断这个位置是否是换行符
        PartitionPair partitionPair = new PartitionPair();
        partitionPair.setStart(start);
        //判断这个length是否是换行符
        long index = start + length;

        //递归终止条件
        if(index > totalSize - 1) {
            //最后一个递归终止
            partitionPair.setEnd(totalSize - 1);
            partitionPairs.add(partitionPair);

        } else {
            //设置位置并读取一个字节
            randomAccessFile.seek(index);
            byte oneByte = randomAccessFile.readByte();
            //判断是否是换行符号,如果不是换行符,那么读取到换行符为止
            while(oneByte != ‘
‘ && oneByte != ‘
‘) {
                //不能越界
                if(++index > totalSize - 1) {
                    index = totalSize-1;
                    break;
                }

                randomAccessFile.seek(index);
                oneByte = randomAccessFile.readByte();
            }

            partitionPair.setEnd(index);
            //递归下一个位置
            partitionPairs.add(partitionPair);

            partition(index + 1, length, totalSize, randomAccessFile, partitionPairs);
        }

    }


    /**
     *
     * @program: io.util.DataUtil
     * @description: 分片数据
     * @auther: xiaof
     * @date: 2019/2/22 15:20
     */
    public static Set partition(long start, long length, long totalSize, RandomAccessFile randomAccessFile) throws IOException {
        if(start > totalSize - 1) {
            return null;
        }

        //每次获取length长度,并判断这个位置是否是换行符
        Set partitionPairs = new HashSet();
        PartitionPair partitionPair = new PartitionPair();
        partitionPair.setStart(start);
        //判断这个length是否是换行符
        long index = start + length;

        //递归终止条件
        if(index > totalSize - 1) {
            //最后一个递归终止
            partitionPair.setEnd(totalSize - 1);
            partitionPairs.add(partitionPair);
            return partitionPairs;

        } else {
            //设置位置并读取一个字节
            randomAccessFile.seek(index);
            byte oneByte = randomAccessFile.readByte();
            //判断是否是换行符号,如果不是换行符,那么读取到换行符为止
            while(oneByte != ‘
‘ && oneByte != ‘
‘) {
                //不能越界
                if(++index > totalSize - 1) {
                    index = totalSize-1;
                    break;
                }

                randomAccessFile.seek(index);
                oneByte = randomAccessFile.readByte();
            }

            partitionPair.setEnd(index);
            //递归下一个位置
            partitionPairs.add(partitionPair);

            partitionPairs.addAll(partition(index + 1, length, totalSize, randomAccessFile));
        }

        return partitionPairs;
    }

    public static Date getSQLDateThreadSafe(String dateStr) throws ParseException {
        return new Date(ConcurrentDateUtil.parse(dateStr).getTime());
    }

    /**
     *  复制单个文件,这里考虑使用文件锁,保证线程安全
     *  @param  oldPath  String  原文件路径  如:c:/fqf.txt
     *  @param  newPath  String  复制后路径  如:f:/fqf.txt
     *  @return  boolean
     */
    public static void copyFile(String  oldPath,  String  newPath) throws Exception {
//        int  byteread  =  0;
        File oldfile  =  new  File(oldPath);
        if  (oldfile.exists())  {  //文件存在时
            //对文件枷锁,然后进行复制操作,
            InputStream inStream  =  new  FileInputStream(oldPath);  //读入原文件
            FileOutputStream fs  =  new  FileOutputStream(newPath);
            FileChannel fileChannel = fs.getChannel();
            //开始加锁
            FileLock fileLock = null;
            try {
                while (true) {
                    fileLock = fileChannel.lock(); //直接上锁
                    if(fileLock != null) {
                        break;
                    } else {
                        //文件无法被锁定,1s后尝试
                        logger.warn(oldPath + " 文件无法被锁定,1s后尝试");
                        Thread.sleep(1000);
                    }
                }

                //开始拷贝数据
                byte[] buf = new byte[2048];
                int len = 0;
                while((len = inStream.read(buf)) != -1) {
                    fs.write(buf, 0, len);
                }

                //刷新
                fileLock.release();
                fs.flush();
                fs.close();
                inStream.close();

            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                if(fileLock.isValid()) {

                    fileLock.release();
                }
            }
        }

    }

    /**
     *
     * @program: com.ztesoft.interfaces.predeal.util.DataUtil
     * @description: 删除文件
     * @auther: xiaof
     * @date: 2019/3/5 18:08
     */
    public static void deletFile(String filePath) {
        File file = new File(filePath);
        file.delete();
    }

}

 

package com.ztesoft.interfaces.predeal.util;

/**
 * @ProjectName: cutter-point
 * @Package: io.util
 * @ClassName: PartitionPair
 * @Author: xiaof
 * @Description: ${description}
 * @Date: 2019/2/21 14:36
 * @Version: 1.0
 */
/**
 * A pair of byte offsets describing one slice of a file. Two pairs are equal when both
 * their {@code start} and their {@code end} match.
 */
public class PartitionPair {
    private long start;
    private long end;

    public long getStart() {
        return start;
    }

    public void setStart(long start) {
        this.start = start;
    }

    public long getEnd() {
        return end;
    }

    public void setEnd(long end) {
        this.end = end;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        PartitionPair that = (PartitionPair) obj;
        return end == that.end && start == that.start;
    }

    /** Combines the folded 64-bit values with the usual multiplier 31, {@code end} first. */
    @Override
    public int hashCode() {
        int endHash = (int) (end ^ (end >>> 32));
        int startHash = (int) (start ^ (start >>> 32));
        return 31 * (31 + endHash) + startHash;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder();
        text.append("start=").append(start);
        text.append(";end=").append(end);
        return text.toString();
    }
}

 

这里开始,我们实战使用这个方法解析入库

package com.ztesoft.interfaces.predeal.bl;

import com.ztesoft.interfaces.common.bll.CommonQueuePool;
import com.ztesoft.interfaces.common.vo.CommonQueueVo;
import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant;
import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao;
import com.ztesoft.isa.service.common.util.SpringContextUtil;
import org.apache.commons.collections.MapUtils;
import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @ProjectName: 湖北移动智慧装维支撑系统
 * @Package: com.ztesoft.interfaces.predeal.bl
 * @ClassName: PreDealResourceServer
 * @Author: xiaof
 * @Description: 预处理综资ftp服务数据同步
 * @Date: 2019/3/8 16:21
 * @Version: 1.0
 */
public class PreDealResourceServer extends Thread {

    private Logger logger = Logger.getLogger(PreDealResourceServer.class);

    /** Futures of the scheduled producer/consumer tasks, keyed by the configured name. */
    private static Map<String, Object> config = new HashMap<>();

    // DAO used for all configuration lookups
    private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao");

    private CommonQueuePool commonQueuePool;

    // scheduler that runs both the producer and the consumers
    private ScheduledExecutorService service;

    /**
     * Reads the server settings (CODEA = pool size, CODEB = queue length), builds the
     * scheduler and the shared queue, then schedules the producer and the consumers.
     */
    @Override
    public void run() {
        Map serverSettings = fetchSettings(PreDealResourceConstrant.SERVER_KEY_INFO);
        service = Executors.newScheduledThreadPool(MapUtils.getInteger(serverSettings, "CODEA", 8));
        commonQueuePool = new CommonQueuePool<CommonQueueVo>(MapUtils.getInteger(serverSettings, "CODEB", 3000));

        // producer first, consumers afterwards
        startResourceProducer();
        startResourceConsum();
    }

    /** Queries the configuration row identified by the fixed config type and {@code pkey}. */
    private Map fetchSettings(Object pkey) {
        Map query = new HashMap();
        query.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
        query.put("pkey", pkey);
        return preDealResourceDao.qryPreDealResourceConfig(query);
    }

    /** Schedules the producer at a fixed rate and remembers its future in {@code config}. */
    private void startResourceProducer() {
        Map settings = fetchSettings(PreDealResourceConstrant.PRODUCER_RESOURCE_THREAD_KEY_INFO);
        // CODEA = initial delay, CODEB = period, both in seconds
        int initialDelay = MapUtils.getInteger(settings, "CODEA", 30);
        int period = MapUtils.getInteger(settings, "CODEB", 86400);

        Future producerFuture = service.scheduleAtFixedRate(
                new PreDealResourceProducer(commonQueuePool), initialDelay, period, TimeUnit.SECONDS);

        // keep the future so the schedule can be changed later
        List futures = new ArrayList();
        futures.add(producerFuture);
        String key = MapUtils.getString(settings, "CODEC", "ProducerResourceThread");
        config.put(key, futures);
    }

    /** Schedules the configured number of consumers once and remembers their futures. */
    private void startResourceConsum() {
        Map settings = fetchSettings(PreDealResourceConstrant.CONSUM_RESOURCE_THREAD_KEY_INFO);
        // CODEA = initial delay in seconds, CODEB = number of consumers
        int initialDelay = MapUtils.getInteger(settings, "CODEA", 30);
        int threadNum = MapUtils.getInteger(settings, "CODEB", 3);

        List futures = new ArrayList();
        int scheduled = 0;
        while (scheduled < threadNum) {
            PreDealResourceConsum consumer = new PreDealResourceConsum(commonQueuePool);
            futures.add(service.schedule(consumer, initialDelay, TimeUnit.SECONDS));
            scheduled++;
        }
        String key = MapUtils.getString(settings, "CODEC", "ConsumResourceThread");
        config.put(key, futures);
    }

    /**
     * Overload of {@link #start()} that ignores its arguments and starts the thread.
     */
    public void start(String... args) {
        this.start();
    }

    public static Map<String, Object> getConfig() {
        return config;
    }

    public static void setConfig(Map<String, Object> config) {
        PreDealResourceServer.config = config;
    }

}

 

package com.ztesoft.interfaces.predeal.bl;

import com.ztesoft.interfaces.common.bll.CommonQueuePool;
import com.ztesoft.interfaces.common.invoke.FtpInvoke;
import com.ztesoft.interfaces.common.util.FtpTemplate;
import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant;
import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao;
import com.ztesoft.interfaces.predeal.dto.OmPredealSyncLogDto;
import com.ztesoft.interfaces.predeal.util.ConcurrentDateUtil;
import com.ztesoft.interfaces.predeal.util.DataUtil;
import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo;
import com.ztesoft.isa.service.common.util.SpringContextUtil;
import com.ztesoft.services.common.CommonHelper;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.log4j.Logger;

import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.*;

/**
 * @ProjectName: 湖北移动智慧装维支撑系统
 * @Package: com.ztesoft.interfaces.predeal.bl
 * @ClassName: PreDealResourceProducer
 * @Author: xiaof
 * @Description: ${description}
 * @Date: 2019/3/10 15:03
 * @Version: 1.0
 */
/**
 * Scheduled producer: downloads matching files from the configured FTP directory into a
 * local directory, then moves every local file into a dated consume directory, writes a
 * sync-log record for it and puts it on the shared queue for the consumers.
 */
public class PreDealResourceProducer implements Runnable {


    private final static Logger logger = Logger.getLogger(PreDealResourceProducer.class);

    // queue shared with the consumer threads
    private final CommonQueuePool commonQueuePool;

    // DAO used for configuration lookups and updates
    private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao");

    public PreDealResourceProducer(CommonQueuePool commonQueuePool) {
        this.commonQueuePool = commonQueuePool;
    }

    /**
     * Runs one download-and-enqueue pass. Every failure is logged and swallowed here so
     * that the exception does not escape from the scheduled task.
     */
    @Override
    public void run() {
        try {
            // 1. Load the FTP connection settings.
            Map paramMap = new HashMap();
            paramMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
            paramMap.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_KEY);
            paramMap = preDealResourceDao.qryPreDealResourceConfig(paramMap);
            String resourceIp = MapUtils.getString(paramMap, "CODEA", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_IP);
            int resourcePort = MapUtils.getInteger(paramMap, "CODEB", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_PORT);
            String resourceUserName = MapUtils.getString(paramMap, "CODEC", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_USERNAME);
            String resourcePasswd = MapUtils.getString(paramMap, "CODED", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_PASSWD);
            String resourceRemoteDir = MapUtils.getString(paramMap, "CODEE", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_REMOTEDIR);
            String resourceLocalDir = MapUtils.getString(paramMap, "CODEH", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_LOCALDIR);
            // NOTE(review): read but not used anywhere in this method.
            String resourceDeleteMark = MapUtils.getString(paramMap, "CODEG", "0");
            String resourceConsumLocalDir = MapUtils.getString(paramMap, "CODEI", PreDealResourceConstrant.PRODUCER_RESOURCE_CONSUM_LOCALDIR);

            // Load the download rule: CODEA lists the accepted name prefixes, CODEB is a
            // fragment the file name must contain.
            Map paramMapFile = new HashMap();
            paramMapFile.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
            paramMapFile.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_RULE);
            paramMapFile = preDealResourceDao.qryPreDealResourceConfig(paramMapFile);
            final String fileNameContain = MapUtils.getString(paramMapFile, "CODEA", "");
            final String endWith = MapUtils.getString(paramMapFile, "CODEB", "");

            FtpTemplate ftpTemplate = new FtpTemplate(resourceIp, resourcePort, resourceUserName, resourcePasswd);
            // 2. Download the matching remote files. A failed FTP pass is logged only, so
            //    that files already present locally are still processed below.
            try {
                ftpTemplate.operatorByPathAndName(resourceRemoteDir, resourceLocalDir, new FtpInvoke() {
                    @Override
                    public void doOperator(FTPClient ftp, String remoteDir, String localDir) throws Exception {
                        // passive mode: the server opens the data port
                        ftp.enterLocalPassiveMode();
                        FTPFile[] ftpFile = ftp.listFiles(remoteDir);

                        for(int i = 0; i < ftpFile.length; ++i) {
                            String ftpFileName = ftpFile[i].getName();
                            // The prefix before the last '_' is matched against the rule;
                            // a name without '_' cannot match and must not abort the loop.
                            int prefixEnd = ftpFileName.lastIndexOf("_");
                            if(prefixEnd < 0) {
                                continue;
                            }
                            String localFilePath = localDir + "/" + ftpFileName;
                            File file = new File(localFilePath);
                            // skip files that are already present locally
                            if(fileNameContain.contains(ftpFileName.substring(0, prefixEnd))
                                    && !file.exists() && ftpFileName.contains(endWith)) {
                                String filePath = remoteDir + "/" + ftpFileName;
                                boolean downloaded = false;
                                try (OutputStream outputStream = new FileOutputStream(localFilePath)) {
                                    downloaded = ftp.retrieveFile(filePath, outputStream);
                                } finally {
                                    // Drop a partial/empty file so it is neither processed
                                    // nor treated as "already downloaded" on the next run.
                                    if(!downloaded) {
                                        file.delete();
                                    }
                                }
                                if(downloaded) {
                                    logger.info("下载文件:" + ftpFileName + " 完成");
                                } else {
                                    logger.warn("下载文件:" + ftpFileName + " 失败");
                                }
                            }
                        }
                    }
                });
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            // 3. Walk the local download directory.
            File localDir = new File(resourceLocalDir);
            if(!localDir.exists()) {
                localDir.mkdirs();
            }
            // 4. Move every regular file to the dated consume directory and enqueue it.
            File[] recFiles = localDir.listFiles();
            if(recFiles == null) {
                // listFiles() returns null when the path is not a readable directory.
                throw new IllegalStateException("Cannot list local directory: " + resourceLocalDir);
            }
            for(int i = 0; i < recFiles.length; ++i) {
                if(recFiles[i].isDirectory()) {
                    continue; // sub-directories are not processed
                }
                String fileName = recFiles[i].getName();
                String filePath = recFiles[i].getPath();
                Calendar calendar = Calendar.getInstance(); // current date
                String consumLocalDir = resourceConsumLocalDir + "/" + ConcurrentDateUtil.formatDateDir(calendar.getTime());
                File file = new File(consumLocalDir);
                if(!file.exists()) {
                    file.mkdirs();
                }
                // 5. Persist one sync-log record per file.
                PreDealResourceVo preDealResourceVo = new PreDealResourceVo();
                String desPath = consumLocalDir + "/" + fileName;
                long omPredealSyncLogSeq = CommonHelper.getCommonDAO().getSeqNextVal(PreDealResourceConstrant.OM_PREDEAL_SYNC_LOG_SEQ);
                OmPredealSyncLogDto omPredealSyncLogDto = new OmPredealSyncLogDto();
                omPredealSyncLogDto.setId(String.valueOf(omPredealSyncLogSeq));
                omPredealSyncLogDto.setCreateTime(new Date());
                omPredealSyncLogDto.setUpdateTime(new Date());
                omPredealSyncLogDto.setState("0");
                omPredealSyncLogDto.setTimes("0");
                omPredealSyncLogDto.setLogName(recFiles[i].getName() + "资源信息同步");
                omPredealSyncLogDto.setLogType(PreDealResourceConstrant.PRE_DEAL_RESOURCE_LOG_TYPE);
                omPredealSyncLogDto.setRemark(filePath);
                omPredealSyncLogDto.add();
                preDealResourceVo.setOmPredealSyncLogDto(omPredealSyncLogDto);
                preDealResourceVo.setFileName(recFiles[i].getName());
                preDealResourceVo.setFileFullPath(desPath);

                // move = copy to the consume directory, then delete the original
                DataUtil.copyFile(filePath, desPath);
                DataUtil.deletFile(filePath);

                commonQueuePool.put(preDealResourceVo);
            }
            // Record today's date in the rule configuration.
            paramMapFile.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
            paramMapFile.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_RULE);
            paramMapFile.put("codeb", ConcurrentDateUtil.formatResourceFile(new Date()));
            preDealResourceDao.updateConfig(paramMapFile);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}

 

package com.ztesoft.interfaces.predeal.bl;

import com.ztesoft.interfaces.common.bll.CommonQueuePool;
import com.ztesoft.interfaces.common.bll.CommonQueueWork;
import com.ztesoft.interfaces.common.vo.CommonQueueVo;
import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo;
import org.apache.log4j.Logger;

/**
 * @ProjectName: 湖北移动智慧装维支撑系统
 * @Package: com.ztesoft.interfaces.predeal.bl
 * @ClassName: PreDealResourceConsum
 * @Author: xiaof
 * @Description: ${description}
 * @Date: 2019/3/10 14:44
 * @Version: 1.0
 */
/**
 * Queue consumer: blocks on the shared queue, picks the worker that matches the type of
 * the dequeued item and lets it process the item.
 */
public class PreDealResourceConsum implements Runnable {

    private Logger logger = Logger.getLogger(PreDealResourceConsum.class);

    // queue shared with the producer
    private final CommonQueuePool commonQueuePool;
    public PreDealResourceConsum(CommonQueuePool commonQueuePool) {
        this.commonQueuePool = commonQueuePool;
    }

    /**
     * Consumes items until the thread is interrupted.
     *
     * <p>An interrupt ends the loop with the interrupt flag restored, so the task can be
     * cancelled. An unchecked exception from a worker is logged and the loop continues
     * with the next item, so one bad item does not silently stop this consumer.
     */
    @Override
    public void run() {
        while(!Thread.currentThread().isInterrupted()) {
            try {
                CommonQueueVo commonQueueVo = (CommonQueueVo) commonQueuePool.take();
                CommonQueueWork commonQueueWork = null;
                // choose the worker by item type
                if(commonQueueVo instanceof PreDealResourceVo) {
                    commonQueueWork = new PreDealResourceWork();
                }

                if(commonQueueWork != null) {
                    commonQueueWork.doWork(commonQueueVo);
                }
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
                // restore the flag and stop: the interrupt is a request to terminate
                Thread.currentThread().interrupt();
                return;
            } catch (RuntimeException e) {
                logger.error(e.getMessage(), e);
            }

        }
    }
}

 

最后核心解析类实现

package com.ztesoft.interfaces.predeal.bl;

import com.ztesoft.interfaces.common.bll.CommonQueueWork;
import com.ztesoft.interfaces.common.vo.CommonQueueVo;
import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant;
import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao;
import com.ztesoft.interfaces.predeal.dto.OmPredealSyncLogDto;
import com.ztesoft.interfaces.predeal.util.CreateFileUtil;
import com.ztesoft.interfaces.predeal.util.CsvBigTask;
import com.ztesoft.interfaces.predeal.util.DataUtil;
import com.ztesoft.interfaces.predeal.util.PartitionPair;
import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo;
import com.ztesoft.isa.service.common.util.SpringContextUtil;
import org.apache.commons.collections.MapUtils;
import org.apache.log4j.Logger;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.*;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @ProjectName: 湖北移动智慧装维支撑系统
 * @Package: com.ztesoft.interfaces.predeal.bl
 * @ClassName: PreDealResourceWork
 * @Author: xiaof
 * @Description: 解析资源ftp文件,并准备入库
 * @Date: 2019/3/10 15:00
 * @Version: 1.0
 */
public class PreDealResourceWork implements CommonQueueWork {

    private final static Logger logger = Logger.getLogger(PreDealResourceWork.class);

    private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao");

    private static ThreadLocal<List> tempDataThreadLocal = new ThreadLocal<List>() {
        @Override
        protected List initialValue() {
            return new ArrayList();
        }
    };

    @Override
    public void doWork(CommonQueueVo commonQueueVo) {

        PreDealResourceVo preDealResourceVo = (PreDealResourceVo) commonQueueVo;
        OmPredealSyncLogDto omPredealSyncLogDto = preDealResourceVo.getOmPredealSyncLogDto();
        //1.获取文件
        logger.info("===========1开始解压文件=======" + System.currentTimeMillis());
        File zipFile = new File(preDealResourceVo.getFileFullPath());
        //解压文件
        try {
            String fileName = CreateFileUtil.unZipSingleFileCurrentDir(zipFile);
            String tableName = this.getTableName(fileName);
            //2.判断文件是否存在
            logger.info("===========2判断文件是否存在=======" + System.currentTimeMillis());
            File resourceFile = new File(zipFile.getParent() + "/" + fileName);
            if(!resourceFile.exists()) {
                omPredealSyncLogDto.setState("0");
                omPredealSyncLogDto.setUpdateTime(new Date());
                omPredealSyncLogDto.setTimes(Integer.valueOf(omPredealSyncLogDto.getTimes()) + 1 + "");
                omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + "
" + resourceFile.getPath() +  " 文件不存在");
                return;
            }
            //3.获取随机文件索引
            logger.info("===========3获取随机文件索引=======" + System.currentTimeMillis());
            RandomAccessFile randomAccessFile = new RandomAccessFile(resourceFile, "r");
            long length = resourceFile.length();
            //文件大小1G,获取处理器核心数
            int availProcessors = Runtime.getRuntime().availableProcessors();
            logger.info("===========3。1可使用线程=======" + availProcessors);
            long blockLength = length / availProcessors;
            byte b = ‘0‘;
            int index = 0;
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            while(b != ‘
‘ && b != ‘
‘) {
                b = randomAccessFile.readByte();
                byteArrayOutputStream.write(b);
                ++index;
            }
            //获取首行
            String[] firstLine = byteArrayOutputStream.toString().split("	");
            //文件分片
            Set pairSets = DataUtil.partition(index, blockLength, length, randomAccessFile);
            //4.数据分片之后,分别启动处理线程
            logger.info("===========4数据分片之后,分别启动处理线程=======" + pairSets + " 时间:" + System.currentTimeMillis());
            final long startTime = System.currentTimeMillis();
            AtomicLong atomicLong = new AtomicLong(0);
            //6.数据入库,这里做成可配置模式
            logger.info("===========5数据入库=======" + System.currentTimeMillis());
            Map workConfigMap = new HashMap();
            workConfigMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
            workConfigMap.put("pkey", PreDealResourceConstrant.PRE_DEAL_RESOURCE_WORK);
            workConfigMap = preDealResourceDao.qryPreDealResourceConfig(workConfigMap);
            int mapSize = MapUtils.getInteger(workConfigMap, "CODEA", 1000);
            int bufSize = MapUtils.getInteger(workConfigMap, "CODEB", 3) * 1024 * 1024;
            String splitMark = MapUtils.getString(workConfigMap, "CODEC", "	");
            //先清空表数据,备份数据放在文件中
            initTable(tableName, omPredealSyncLogDto);

            CyclicBarrier cyclicBarrier = new CyclicBarrier(pairSets.size(), new Runnable() {
                @Override
                public void run() {
                    //吧最后的数据提交上去
                    logger.info("===========5数据入库结束=======" + System.currentTimeMillis());
                    omPredealSyncLogDto.setUpdateTime(new Date());
                    omPredealSyncLogDto.setTimes(Integer.valueOf(omPredealSyncLogDto.getTimes()) + 1 + "");
                    omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + "
" + resourceFile.getPath() +  " 入库完毕 use time: "+(System.currentTimeMillis()-startTime));
                    omPredealSyncLogDto.setState("1");
                    omPredealSyncLogDto.setNewTotal(String.valueOf(atomicLong.get()));
                    omPredealSyncLogDto.update();
                }
            });

            for(Object pair : pairSets) {
//                List tempData = new ArrayList<>();
                PartitionPair partitionPair = (PartitionPair) pair;
                CsvBigTask csvBigTask = new CsvBigTask(cyclicBarrier, atomicLong, partitionPair, bufSize, randomAccessFile, new ArrayList(), new IHandle() {
                    @Override
                    public void handle(String line, boolean lastLine, List tempData) {
                        try {
                            //转换数据为map
//                            logger.info("===========5读取数据=======" + line + System.currentTimeMillis());
                            if(!line.equals("")) {
                                String[] elements = line.split(splitMark);
                                tempData.add(elements);
                            }

                            //每个3000条一提交
                            if((tempData.size() % mapSize == 0 && tempData.size() >= mapSize) || lastLine) {
                                preDealResourceDao.addResourceBatch(tableName, firstLine, tempData);
                                tempData.clear();
//                                logger.info("===========6批量提交数据=======" + System.currentTimeMillis());
                            }
                        } catch (Exception e) {
                            logger.error(line + e.getMessage(), e);
                        }
                    }
                });
                Thread thread = new Thread(csvBigTask);
                thread.start();
            }


        } catch (IOException e) {
            omPredealSyncLogDto.setState("0");
            omPredealSyncLogDto.setUpdateTime(new Date());
            omPredealSyncLogDto.setTimes(Integer.valueOf(omPredealSyncLogDto.getTimes()) + 1 + "");
            omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + "
" + preDealResourceVo.getFileFullPath() + e.getMessage());
            logger.error(e.getMessage(), e);
        } finally {
            //7.最后更新日志
            logger.info("===========7最后更新日志=======" + System.currentTimeMillis());
            omPredealSyncLogDto.update();
        }
    }

    private String getTableName(String fileName) {
        String tableName = "";
        tableName = PreDealResourceConstrant.PRE_RESOURCE_TABLE + "_" + fileName.substring(0, fileName.lastIndexOf("_"));
        return tableName;
    }

    private void initTable(String tableName, OmPredealSyncLogDto omPredealSyncLogDto) {
        preDealResourceDao.truncateResourceTable(omPredealSyncLogDto, tableName);
    }

}

 

 

最后公布一小段入库代码,这个就不全给了

PreDealResourceDaoImpl

 

@Override
    public void addResourceBatch(String tableName, String[] fields, List params) {
        try {
//            String sql = "insert into PRE_AAA_STAFF (domain_name,internet_account,brasid,band_info,registTime,FirstUseTime,lastLoginTime,status,bandwidth_M,broadband_account,CREATE_DATE,UPDATE_DATE)
" +
//                    "values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?,sysdate,sysdate) ";
            //组装sql
            StringBuffer stringBuffer = new StringBuffer(" insert /*+ append */ into " + tableName + " (");
            StringBuffer stringValue = new StringBuffer(" values (");
//            Set<String> keySet = tableMap.keySet();
            for(int i = 0; i < fields.length; ++i) {
                stringBuffer.append(fields[i] + ",");
                stringValue.append("?,");
            }

            //添加时间
            stringBuffer.append("create_date, update_date,");
            stringValue.append("sysdate,sysdate,");

            stringBuffer = stringBuffer.deleteCharAt(stringBuffer.length() - 1).append(" ) ");
            stringValue = stringValue.deleteCharAt(stringValue.length() - 1).append(")");
            String sql = stringBuffer.append(stringValue).toString();

            super.getJdbcOperations().batchUpdate(sql, new BatchPreparedStatementSetter() {
                @Override
                public void setValues(PreparedStatement ps, int i) throws SQLException {
                    Object[] values = (Object[]) params.get(i);
                    for(int j = 0; j < fields.length; ++j) {
                        if(j < values.length) {
                            ps.setString(j + 1, String.valueOf(values[j]));
                        } else {
                            ps.setObject(j + 1, null);
                        }
                    }
                }

                @Override
                public int getBatchSize() {
                    return params.size();
                }
            });

            logger.info("====" + tableName + "入库 " + params.size() + " 条");

        } catch (DataAccessException e) {
            logger.error(e.getMessage(), e);
        }
    }

 

以上是关于“并发5:多线程并发解析单文件大数据量解析入库,1800万数据8线程5分钟入库”的主要内容,如果未能解决你的问题,请参考以下文章:

多线程文件并发操作相关题目解析

并发实战:多线程处理任务,结束后,执行后续操作

Java中的并发工具类

2020年Java多线程与并发系列22道高频面试题(附思维导图和答案解析)

MongoDB大数据高并发读写性能测试报告

python 复习—并发编程实战——并发编程总结