并发5:多线程并发解析单个大数据量文件并入库,1800万条数据8线程5分钟入库
Posted cutter_point
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发5:多线程并发解析单个大数据量文件并入库,1800万条数据8线程5分钟入库相关的知识,希望对你有一定的参考价值。
1. 机器最好是8核,否则速度可能会慢一些。
2. 数据库建表时,最好建成 NOLOGGING 类型的表,否则归档日志容易写满,导致入库很慢,甚至丢数据。因为数据量很大,不可能一次性提交所有数据,只能分批提交。
package com.ztesoft.interfaces.predeal.util; import com.ztesoft.interfaces.predeal.bl.IHandle; import java.io.ByteArrayOutputStream; import java.io.RandomAccessFile; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicLong; /** * @ProjectName: cutter-point * @Package: io.bigData * @ClassName: CsvBigTask * @Author: xiaof * @Description: ${description} * @Date: 2019/3/8 11:03 * @Version: 1.0 */ public class CsvBigTask implements Runnable { //拦截器 private CyclicBarrier cyclicBarrier; private AtomicLong atomicLong; //查询统计个数 private long start; //文件读取起始位置 private long totalSize; //结束位置 private int buffSize; private byte[] buff; //读取缓冲大小 private RandomAccessFile randomAccessFile; //随机读取文件 private IHandle iHandle; //接口对象,用来实现业务逻辑 private List tempData; public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, long start, long totalSize, int buffSize, RandomAccessFile randomAccessFile, IHandle iHandle) { this.cyclicBarrier = cyclicBarrier; this.atomicLong = atomicLong; this.start = start; this.totalSize = totalSize; this.buffSize = buffSize; this.buff = new byte[buffSize]; this.randomAccessFile = randomAccessFile; this.iHandle = iHandle; } public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, PartitionPair partitionPair, int buffSize, RandomAccessFile randomAccessFile, IHandle iHandle) { this.cyclicBarrier = cyclicBarrier; this.atomicLong = atomicLong; this.start = partitionPair.getStart(); this.totalSize = partitionPair.getEnd() - partitionPair.getStart() + 1; this.buffSize = buffSize; this.buff = new byte[buffSize]; this.randomAccessFile = randomAccessFile; this.iHandle = iHandle; } public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, PartitionPair partitionPair, int buffSize, RandomAccessFile randomAccessFile, List tempData, IHandle iHandle) { this.cyclicBarrier = 
cyclicBarrier; this.atomicLong = atomicLong; this.start = partitionPair.getStart(); this.totalSize = partitionPair.getEnd() - partitionPair.getStart() + 1; this.buffSize = buffSize; this.buff = new byte[buffSize]; this.randomAccessFile = randomAccessFile; this.iHandle = iHandle; this.tempData = tempData; } @Override public void run() { MappedByteBuffer mappedByteBuffer = null; //1.读取文件映射到内存中 try { //只读模式,不需要加锁,因为不涉及到资源的共享 mappedByteBuffer = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_ONLY, start, this.totalSize); //2.读取指定内存大小,并判断是否有进行换行 ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); //依次循环读取一定缓存的数据量 for(int i = 0; i < totalSize; i += buffSize) { //确定会读取的数据 int realReadLength = 0; if(i + buffSize > totalSize) { //如果超出范围 realReadLength = (int) (totalSize - i); } else { realReadLength = buffSize; } //3.如果进行了换行了,那么就清空一次输出,输出一行数据 //读取一次数据,这里为0的原因是randomAccessFile会进行seek位置起始的索引 //并且get之后,buffer会更新当前位置索引 mappedByteBuffer.get(buff, 0, realReadLength); //遍历这个buf,确定是否需要进行调用业务逻辑 for(int j = 0; j < realReadLength; ++j) { //遍历每一个字符,判断是不是换行,如果遍历到了换行符,那么就进行处理 byte temp = buff[j]; if(temp == ‘ ‘ || temp == ‘ ‘) { //这里要进行非空校验 String result = byteArrayOutputStream.toString("gbk"); if(result != null && !result.equals("")) { iHandle.handle(result, false, tempData); atomicLong.incrementAndGet(); } //输出之后,置空文件 byteArrayOutputStream.reset(); } else if (temp == 0) { break; } else { //如果不是换行符那么就把这个数据放入输出流缓存中 byteArrayOutputStream.write(temp); } } } //4.最后清空一次缓冲数据,循环结束之后,如果output对象中还有数据没有清空,说明那就是最后一行 if(byteArrayOutputStream.size() > 0) { String result = byteArrayOutputStream.toString("gbk"); if(result != null && !result.equals("")) { iHandle.handle(result, true, tempData); atomicLong.incrementAndGet(); } //输出之后,置空文件 byteArrayOutputStream.reset(); } else { //通知最后一行,如果为空 iHandle.handle("", true, tempData); } //5.栅栏最后拦截完成 cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } } }
业务逻辑实现接口类
package com.ztesoft.interfaces.predeal.bl; import java.util.List; /** * * @program: * @description: * @auther: xiaof * @date: 2019/3/1 18:08 */ public interface IHandle { void handle(String line, boolean lastLine, List list); }
一些辅助类,可要可不要,看业务逻辑
package com.ztesoft.interfaces.predeal.util; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; /** * @ProjectName: cutter-point * @Package: io.util * @ClassName: ConcurrentDateUtil * @Author: xiaof * @Description: ${description} * @Date: 2019/2/27 11:30 * @Version: 1.0 */ public class ConcurrentDateUtil { private static ThreadLocal<DateFormat> threadLocal = new ThreadLocal<DateFormat>(){ @Override protected DateFormat initialValue() { //("yyyy/MM/dd HH:mm:ss"); return new SimpleDateFormat("yyyyMMdd"); } }; private static ThreadLocal<DateFormat> threadLocalDateDir = new ThreadLocal<DateFormat>(){ @Override protected DateFormat initialValue() { //("yyyy/MM/dd HH:mm:ss"); return new SimpleDateFormat("yyyy/MM/dd"); } }; private static ThreadLocal<DateFormat> threadDatabase = new ThreadLocal<DateFormat>(){ @Override protected DateFormat initialValue() { //("yyyy/MM/dd HH:mm:ss"); return new SimpleDateFormat("yyyyMMddHHmmss"); } }; private static ThreadLocal<DateFormat> threadResourceFile = new ThreadLocal<DateFormat>(){ @Override protected DateFormat initialValue() { //("yyyy/MM/dd HH:mm:ss"); return new SimpleDateFormat("yyyyMMdd000000"); } }; public static Date parse(String dateStr) throws ParseException { return threadLocal.get().parse(dateStr); } public static Date parseDatabase(String dateStr) throws ParseException { return threadDatabase.get().parse(dateStr); } public static String format(Date date) { return threadLocal.get().format(date); } public static String formatDateDir(Date date) { return threadLocalDateDir.get().format(date); } public static Date parseDateDir(String dateStr) throws ParseException { return threadLocalDateDir.get().parse(dateStr); } public static String formatResourceFile(Date date) { return threadResourceFile.get().format(date); } }
package com.ztesoft.interfaces.predeal.util; import java.io.*; import java.nio.charset.Charset; import java.util.Enumeration; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; /** * @ProjectName: cutter-point * @Package: io.util * @ClassName: CreateFileUtil * @Author: xiaof * @Description: ${description} * @Date: 2019/3/1 17:23 * @Version: 1.0 */ public class CreateFileUtil { public static boolean createFile(File file) throws IOException { if(file.getParentFile().exists()) { //如果上级存在,那么直接创建 return file.createNewFile(); } else { file.getParentFile().mkdir(); return createFile(file); } } public static void unZipFile() { } /** * * @program: cn.cutter.common.util.ZipUtil * @description: 解压单个文件到当前目录 * @auther: xiaof * @date: 2019/3/3 13:33 */ public static String unZipSingleFileCurrentDir(File zipFile) throws IOException { //1.获取解压文件 ZipFile zipFile1 = new ZipFile(zipFile); String fileName = ""; //2.循环压缩文件中的文件内容 for(Enumeration enumeration = zipFile1.entries(); enumeration.hasMoreElements();) { //3.获取输出路径,也即是文件的父级目录 ZipEntry entry = (ZipEntry) enumeration.nextElement(); fileName = entry.getName(); // 判断路径是否存在,不存在则创建文件路径 InputStream in = zipFile1.getInputStream(entry); String outPath = zipFile.getParentFile().getPath(); // 判断路径是否存在,不存在则创建文件路径 File fileDir = zipFile.getParentFile(); if (!fileDir.exists()) { fileDir.mkdirs(); } //4.输出文件信息到当前目录 FileOutputStream out = new FileOutputStream((outPath + "/" + fileName).replaceAll("\*", "/")); byte[] buf1 = new byte[1024]; int len; while ((len = in.read(buf1)) > 0) { out.write(buf1, 0, len); } in.close(); out.close(); } return fileName; } }
package com.ztesoft.interfaces.predeal.util;

import org.apache.log4j.Logger;

import java.io.*;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.sql.Date;
import java.text.ParseException;
import java.util.HashSet;
import java.util.Set;

/**
 * File helpers for the resource loader: line-aligned partitioning of a file, copying and deleting
 * files, and thread-safe SQL date parsing.
 *
 * @Author: xiaof
 * @Date: 2019/2/21 14:38
 */
public class DataUtil {
    private final static Logger logger = Logger.getLogger(DataUtil.class);

    /**
     * Splits bytes {@code [start, totalSize - 1]} into line-aligned inclusive ranges and adds them
     * to {@code partitionPairs}.
     *
     * <p>Each range starts where the previous one ended (+1). It tentatively ends {@code length}
     * bytes later and is extended forward to the next {@code '\n'} or {@code '\r'} byte
     * (inclusive), or to the last byte of the file. Does nothing if {@code start} is past the end.
     *
     * @param length nominal range length in bytes; must not be negative
     * @throws IllegalArgumentException if {@code length} is negative
     * @throws IOException on read failure
     */
    public static void partition(long start, long length, long totalSize, RandomAccessFile randomAccessFile,
                                 Set partitionPairs) throws IOException {
        if (start > totalSize - 1) {
            return;
        }
        if (length < 0) {
            throw new IllegalArgumentException("length must not be negative: " + length);
        }
        // Iterative instead of recursive: small lengths no longer risk a StackOverflowError.
        long from = start;
        while (from <= totalSize - 1) {
            PartitionPair pair = nextPartition(from, length, totalSize, randomAccessFile);
            partitionPairs.add(pair);
            from = pair.getEnd() + 1;
        }
    }

    /**
     * Same as {@link #partition(long, long, long, RandomAccessFile, Set)} but returns a new set.
     *
     * @return the ranges, or {@code null} if {@code start} is already past the end of the file
     */
    public static Set partition(long start, long length, long totalSize, RandomAccessFile randomAccessFile)
            throws IOException {
        if (start > totalSize - 1) {
            return null;
        }
        // Previously the recursion did addAll(null) -> NPE whenever the scan reached the last byte.
        Set partitionPairs = new HashSet();
        partition(start, length, totalSize, randomAccessFile, partitionPairs);
        return partitionPairs;
    }

    /** Builds the range starting at {@code from}, extended to the end of the line. */
    private static PartitionPair nextPartition(long from, long length, long totalSize, RandomAccessFile file)
            throws IOException {
        PartitionPair pair = new PartitionPair();
        pair.setStart(from);
        long index = from + length;
        if (index > totalSize - 1) {
            pair.setEnd(totalSize - 1);
        } else {
            pair.setEnd(findLineEnd(index, totalSize, file));
        }
        return pair;
    }

    /** Returns the offset of the first '\n' or '\r' at or after {@code index}, or {@code totalSize - 1}. */
    private static long findLineEnd(long index, long totalSize, RandomAccessFile file) throws IOException {
        file.seek(index);
        byte b = file.readByte();
        while (b != '\n' && b != '\r') {
            if (++index > totalSize - 1) {
                return totalSize - 1;
            }
            // readByte() advances the file pointer, so no seek is needed between bytes.
            b = file.readByte();
        }
        return index;
    }

    /** Parses {@code yyyyMMdd} into a {@link java.sql.Date} using a per-thread formatter. */
    public static Date getSQLDateThreadSafe(String dateStr) throws ParseException {
        return new Date(ConcurrentDateUtil.parse(dateStr).getTime());
    }

    /**
     * Copies a file while holding an exclusive lock on the destination.
     *
     * <p>If the source does not exist nothing is copied and a warning is logged. Any copy failure
     * is propagated. Callers delete the source right after this call, so swallowing a failure (as
     * this method previously did) lost the file.
     *
     * @param oldPath source path, e.g. c:/fqf.txt
     * @param newPath destination path, e.g. f:/fqf.txt
     * @throws Exception if opening, locking or copying fails
     */
    public static void copyFile(String oldPath, String newPath) throws Exception {
        File oldfile = new File(oldPath);
        if (!oldfile.exists()) {
            logger.warn(oldPath + " does not exist, nothing copied");
            return;
        }
        // FileChannel.lock() blocks until the lock is acquired (it never returns null), and
        // try-with-resources releases the lock and closes both streams even on failure.
        try (InputStream inStream = new FileInputStream(oldfile);
             FileOutputStream fs = new FileOutputStream(newPath);
             FileLock lock = fs.getChannel().lock()) {
            byte[] buf = new byte[2048];
            int len;
            while ((len = inStream.read(buf)) != -1) {
                fs.write(buf, 0, len);
            }
            fs.flush();
        }
    }

    /**
     * Deletes a file; a failed delete of an existing file is logged.
     *
     * @param filePath file to delete
     */
    public static void deletFile(String filePath) {
        File file = new File(filePath);
        if (file.exists() && !file.delete()) {
            logger.warn("Failed to delete file: " + filePath);
        }
    }
}
package com.ztesoft.interfaces.predeal.util; /** * @ProjectName: cutter-point * @Package: io.util * @ClassName: PartitionPair * @Author: xiaof * @Description: ${description} * @Date: 2019/2/21 14:36 * @Version: 1.0 */ public class PartitionPair { private long start; private long end; @Override public String toString() { return "start="+start+";end="+end; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + (int) (end ^ (end >>> 32)); result = prime * result + (int) (start ^ (start >>> 32)); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; PartitionPair other = (PartitionPair) obj; if (end != other.end) return false; return start == other.start; } public long getStart() { return start; } public void setStart(long start) { this.start = start; } public long getEnd() { return end; } public void setEnd(long end) { this.end = end; } }
这里开始,我们实战使用这个方法解析入库
package com.ztesoft.interfaces.predeal.bl; import com.ztesoft.interfaces.common.bll.CommonQueuePool; import com.ztesoft.interfaces.common.vo.CommonQueueVo; import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant; import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao; import com.ztesoft.isa.service.common.util.SpringContextUtil; import org.apache.commons.collections.MapUtils; import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * @ProjectName: 湖北移动智慧装维支撑系统 * @Package: com.ztesoft.interfaces.predeal.bl * @ClassName: PreDealResourceServer * @Author: xiaof * @Description: 预处理综资ftp服务数据同步 * @Date: 2019/3/8 16:21 * @Version: 1.0 */ public class PreDealResourceServer extends Thread { private Logger logger = Logger.getLogger(PreDealResourceServer.class); private static Map<String, Object> config = new HashMap<>(); //获取dao层操作类 private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao"); private CommonQueuePool commonQueuePool; //配置线程池数量 private ScheduledExecutorService service; @Override public void run() { // 1.启动定时线程,设置定时启动时间(通过定时器),要求定时可配置,最好实时生效(这里目前考虑取消定时器,重新生成定时器) Map paramMap = new HashMap(); paramMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE); paramMap.put("pkey", PreDealResourceConstrant.SERVER_KEY_INFO); paramMap = preDealResourceDao.qryPreDealResourceConfig(paramMap); // codea-线程池数量 // codeb-队列长度 service = Executors.newScheduledThreadPool(MapUtils.getInteger(paramMap, "CODEA", 8)); commonQueuePool = new CommonQueuePool<CommonQueueVo>(MapUtils.getInteger(paramMap, "CODEB", 3000)); //启动资源信息生产者 this.startResourceProducer(); //启动资源信息消费者 this.startResourceConsum(); } private void startResourceProducer() { //设置生产线程&消费线程 Map 
producerMap = new HashMap(); producerMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE); producerMap.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_THREAD_KEY_INFO); producerMap = preDealResourceDao.qryPreDealResourceConfig(producerMap); //获取启动时间,和获取间隔时间 int initialDelay = MapUtils.getInteger(producerMap, "CODEA", 30); int period = MapUtils.getInteger(producerMap, "CODEB", 86400); PreDealResourceProducer preDealResourceProducer = new PreDealResourceProducer(commonQueuePool); Future preDealResourceUserFuture = service.scheduleAtFixedRate(preDealResourceProducer, initialDelay, period, TimeUnit.SECONDS); //吧结果存放进入map中,以便后面更新间隔时间 List preDealAAAUserFutureList = new ArrayList(); preDealAAAUserFutureList.add(preDealResourceUserFuture); config.put(MapUtils.getString(producerMap, "CODEC", "ProducerResourceThread"), preDealAAAUserFutureList); } private void startResourceConsum() { //启动消费者 Map consumMap = new HashMap(); consumMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE); consumMap.put("pkey", PreDealResourceConstrant.CONSUM_RESOURCE_THREAD_KEY_INFO); consumMap = preDealResourceDao.qryPreDealResourceConfig(consumMap); int initialDelay = MapUtils.getInteger(consumMap, "CODEA", 30); int threadNum = MapUtils.getInteger(consumMap, "CODEB", 3); List preDealAAAUserFutureList = new ArrayList(); for(int i = 0; i < threadNum; ++i) { PreDealResourceConsum preDealResourceConsum = new PreDealResourceConsum(commonQueuePool); Future future = service.schedule(preDealResourceConsum, initialDelay, TimeUnit.SECONDS); //吧结果存放进入map中,以便后面更新间隔时间 preDealAAAUserFutureList.add(future); } config.put(MapUtils.getString(consumMap, "CODEC", "ConsumResourceThread"), preDealAAAUserFutureList); } /** * 启动,多态方式,避免重复启动 */ public void start(String... args) { this.start(); } public static Map<String, Object> getConfig() { return config; } public static void setConfig(Map<String, Object> config) { PreDealResourceServer.config = config; } }
package com.ztesoft.interfaces.predeal.bl; import com.ztesoft.interfaces.common.bll.CommonQueuePool; import com.ztesoft.interfaces.common.invoke.FtpInvoke; import com.ztesoft.interfaces.common.util.FtpTemplate; import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant; import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao; import com.ztesoft.interfaces.predeal.dto.OmPredealSyncLogDto; import com.ztesoft.interfaces.predeal.util.ConcurrentDateUtil; import com.ztesoft.interfaces.predeal.util.DataUtil; import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo; import com.ztesoft.isa.service.common.util.SpringContextUtil; import com.ztesoft.services.common.CommonHelper; import org.apache.commons.collections.MapUtils; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPFile; import org.apache.log4j.Logger; import java.io.File; import java.io.FileOutputStream; import java.io.OutputStream; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.util.*; /** * @ProjectName: 湖北移动智慧装维支撑系统 * @Package: com.ztesoft.interfaces.predeal.bl * @ClassName: PreDealResourceProducer * @Author: xiaof * @Description: ${description} * @Date: 2019/3/10 15:03 * @Version: 1.0 */ public class PreDealResourceProducer implements Runnable { private final static Logger logger = Logger.getLogger(PreDealResourceProducer.class); //这个是要用来控制多线的队列 private final CommonQueuePool commonQueuePool; //获取dao层操作类 private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao"); public PreDealResourceProducer(CommonQueuePool commonQueuePool) { this.commonQueuePool = commonQueuePool; } @Override public void run() { // 2.定时启动之后,扫描ftp文件,下载到本地(并记录数据),(要求本地上传如果有文件,要能直接开始入库,不用通过远程服务器) try { //1.ftp连接服务器,获取AAAftp服务器 Map paramMap = new HashMap(); paramMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE); paramMap.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_KEY); 
paramMap = preDealResourceDao.qryPreDealResourceConfig(paramMap); String resourceIp = MapUtils.getString(paramMap, "CODEA", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_IP); int resourcePort = MapUtils.getInteger(paramMap, "CODEB", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_PORT); String resourceUserName = MapUtils.getString(paramMap, "CODEC", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_USERNAME); String resourcePasswd = MapUtils.getString(paramMap, "CODED", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_PASSWD); String resourceRemoteDir = MapUtils.getString(paramMap, "CODEE", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_REMOTEDIR); String resourceLocalDir = MapUtils.getString(paramMap, "CODEH", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_LOCALDIR); String resourceDeleteMark = MapUtils.getString(paramMap, "CODEG", "0"); //PRODUCER_RESOURCE_CONSUM_LOCALDIR String resourceConsumLocalDir = MapUtils.getString(paramMap, "CODEI", PreDealResourceConstrant.PRODUCER_RESOURCE_CONSUM_LOCALDIR); // resourceLocalDir = "D:\湖北移动\任务\预处理\综资\ftp文件\sync"; // resourceConsumLocalDir = resourceLocalDir + "\consum"; //获取需要下载的文件目录,不在包含的文件里面的,那么就不用下载 Map paramMapFile = new HashMap(); paramMapFile.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE); paramMapFile.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_RULE); paramMapFile = preDealResourceDao.qryPreDealResourceConfig(paramMapFile); String fileNameContain = MapUtils.getString(paramMapFile, "CODEA", ""); String endWith = MapUtils.getString(paramMapFile, "CODEB", ""); //1.获取指定时间 FtpTemplate ftpTemplate = new FtpTemplate(resourceIp, resourcePort, resourceUserName, resourcePasswd); //2.下载文件进入本地服务器,并删除服务器上文件(删除操作,我们做个开关,并且只有所有数据下载完成才能删除) try { ftpTemplate.operatorByPathAndName(resourceRemoteDir, resourceLocalDir, new FtpInvoke() { @Override public void doOperator(FTPClient ftp, String remoteDir, String localDir) throws Exception { // 转移到FTP服务器目录至指定的目录下 //1.获取远程文件,并存放进入队列汇总等待处理 //2.存放进入本地路径 //设置被动模式,服务端开端口给我用 
ftp.enterLocalPassiveMode(); //获取所有文件 FTPFile[] ftpFile = ftp.listFiles(remoteDir); //建立输出到本地文件的输出流 OutputStream outputStream = null; for(int i = 0; i < ftpFile.length; ++i) { String ftpFileName = ftpFile[i].getName(); //判断文件过滤规则 String localFilePath = localDir + "/" + ftpFileName; File file = new File(localFilePath); //判断本地是否已经存在,如果存在,那么也不用获取,并且只获取 if(fileNameContain.contains(ftpFileName.substring(0, ftpFileName.lastIndexOf("_"))) && !file.exists() && ftpFileName.contains(endWith)) { //判断是话单文件 String filePath = remoteDir + "/" + ftpFileName; //下载文件 outputStream = new FileOutputStream(localDir + "/" + ftpFileName); ftp.retrieveFile(filePath, outputStream); logger.info("下载文件:" + ftpFileName + " 完成"); if(outputStream != null) { outputStream.flush(); outputStream.close(); outputStream = null; } } } if(outputStream != null) { outputStream.flush(); outputStream.close(); outputStream = null; } } }); } catch (Exception e) { logger.error(e.getMessage(), e); } //3.下载到本地之后,我们遍历本地所有文件 File localDir = new File(resourceLocalDir); //获取本地文件的所有文件 if(!localDir.exists()) { localDir.mkdirs(); } //4.获取所有文件(不是目录)之后,解析文件名,并依次吧数据送入队列中, File[] recFiles = localDir.listFiles(); for(int i = 0; i < recFiles.length; ++i) { if(recFiles[i].isDirectory()) { continue; //目录不操作 } //移动文件数据到相应的日期目录 String fileName = recFiles[i].getName(); String filePath = recFiles[i].getPath(); Calendar calendar = Calendar.getInstance();//获取当前日期 String consumLocalDir = resourceConsumLocalDir + "/" + ConcurrentDateUtil.formatDateDir(calendar.getTime()); File file = new File(consumLocalDir); if(!file.exists()) { file.mkdirs(); } //5.记录同步日志记录 //入库存放一条记录 PreDealResourceVo preDealResourceVo = new PreDealResourceVo(); String desPath = consumLocalDir + "/" + fileName; long omPredealSyncLogSeq = CommonHelper.getCommonDAO().getSeqNextVal(PreDealResourceConstrant.OM_PREDEAL_SYNC_LOG_SEQ); OmPredealSyncLogDto omPredealSyncLogDto = new OmPredealSyncLogDto(); omPredealSyncLogDto.setId(String.valueOf(omPredealSyncLogSeq)); 
omPredealSyncLogDto.setCreateTime(new Date()); omPredealSyncLogDto.setUpdateTime(new Date()); omPredealSyncLogDto.setState("0"); omPredealSyncLogDto.setTimes("0"); omPredealSyncLogDto.setLogName(recFiles[i].getName() + "资源信息同步"); omPredealSyncLogDto.setLogType(PreDealResourceConstrant.PRE_DEAL_RESOURCE_LOG_TYPE); omPredealSyncLogDto.setRemark(filePath); omPredealSyncLogDto.add(); //先交给消费者消费,只有当消费完毕,我们才移动和删除文件 preDealResourceVo.setOmPredealSyncLogDto(omPredealSyncLogDto); preDealResourceVo.setFileName(recFiles[i].getName()); preDealResourceVo.setFileFullPath(desPath); //移动数据进入对应的消费目录 DataUtil.copyFile(filePath, desPath); DataUtil.deletFile(filePath); commonQueuePool.put(preDealResourceVo); } //更新配置信息,由于文件是生成前就会删除,所以这个逻辑就不做了,有文件就全下载下来 paramMapFile.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE); paramMapFile.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_RULE); //更新为今天 paramMapFile.put("codeb", ConcurrentDateUtil.formatResourceFile(new Date())); preDealResourceDao.updateConfig(paramMapFile); } catch (Exception e) { logger.error(e.getMessage(), e); } } }
package com.ztesoft.interfaces.predeal.bl; import com.ztesoft.interfaces.common.bll.CommonQueuePool; import com.ztesoft.interfaces.common.bll.CommonQueueWork; import com.ztesoft.interfaces.common.vo.CommonQueueVo; import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo; import org.apache.log4j.Logger; /** * @ProjectName: 湖北移动智慧装维支撑系统 * @Package: com.ztesoft.interfaces.predeal.bl * @ClassName: PreDealResourceConsum * @Author: xiaof * @Description: ${description} * @Date: 2019/3/10 14:44 * @Version: 1.0 */ public class PreDealResourceConsum implements Runnable { private Logger logger = Logger.getLogger(PreDealResourceConsum.class); //这个是要用来控制多线的队列 private final CommonQueuePool commonQueuePool; public PreDealResourceConsum(CommonQueuePool commonQueuePool) { this.commonQueuePool = commonQueuePool; } @Override public void run() { //只要不为空,那么就可以一直取 while(true) { //3.删除远端路径文件(考虑延后处理,比如解析完成之后删除),那么这里我们放消费者中处理 try { CommonQueueVo commonQueueVo = (CommonQueueVo) commonQueuePool.take(); CommonQueueWork commonQueueWork = null; //这里进行区分类型 if(commonQueueVo instanceof PreDealResourceVo) { commonQueueWork = new PreDealResourceWork(); } if(commonQueueWork != null) { commonQueueWork.doWork(commonQueueVo); } } catch (InterruptedException e) { // e.printStackTrace(); logger.error(e.getMessage(), e); } } } }
最后核心解析类实现
package com.ztesoft.interfaces.predeal.bl; import com.ztesoft.interfaces.common.bll.CommonQueueWork; import com.ztesoft.interfaces.common.vo.CommonQueueVo; import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant; import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao; import com.ztesoft.interfaces.predeal.dto.OmPredealSyncLogDto; import com.ztesoft.interfaces.predeal.util.CreateFileUtil; import com.ztesoft.interfaces.predeal.util.CsvBigTask; import com.ztesoft.interfaces.predeal.util.DataUtil; import com.ztesoft.interfaces.predeal.util.PartitionPair; import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo; import com.ztesoft.isa.service.common.util.SpringContextUtil; import org.apache.commons.collections.MapUtils; import org.apache.log4j.Logger; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.util.*; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicLong; /** * @ProjectName: 湖北移动智慧装维支撑系统 * @Package: com.ztesoft.interfaces.predeal.bl * @ClassName: PreDealResourceWork * @Author: xiaof * @Description: 解析资源ftp文件,并准备入库 * @Date: 2019/3/10 15:00 * @Version: 1.0 */ public class PreDealResourceWork implements CommonQueueWork { private final static Logger logger = Logger.getLogger(PreDealResourceWork.class); private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao"); private static ThreadLocal<List> tempDataThreadLocal = new ThreadLocal<List>() { @Override protected List initialValue() { return new ArrayList(); } }; @Override public void doWork(CommonQueueVo commonQueueVo) { PreDealResourceVo preDealResourceVo = (PreDealResourceVo) commonQueueVo; OmPredealSyncLogDto omPredealSyncLogDto = preDealResourceVo.getOmPredealSyncLogDto(); //1.获取文件 logger.info("===========1开始解压文件=======" + System.currentTimeMillis()); File zipFile = new File(preDealResourceVo.getFileFullPath()); //解压文件 
try { String fileName = CreateFileUtil.unZipSingleFileCurrentDir(zipFile); String tableName = this.getTableName(fileName); //2.判断文件是否存在 logger.info("===========2判断文件是否存在=======" + System.currentTimeMillis()); File resourceFile = new File(zipFile.getParent() + "/" + fileName); if(!resourceFile.exists()) { omPredealSyncLogDto.setState("0"); omPredealSyncLogDto.setUpdateTime(new Date()); omPredealSyncLogDto.setTimes(Integer.valueOf(omPredealSyncLogDto.getTimes()) + 1 + ""); omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + " " + resourceFile.getPath() + " 文件不存在"); return; } //3.获取随机文件索引 logger.info("===========3获取随机文件索引=======" + System.currentTimeMillis()); RandomAccessFile randomAccessFile = new RandomAccessFile(resourceFile, "r"); long length = resourceFile.length(); //文件大小1G,获取处理器核心数 int availProcessors = Runtime.getRuntime().availableProcessors(); logger.info("===========3。1可使用线程=======" + availProcessors); long blockLength = length / availProcessors; byte b = ‘0‘; int index = 0; ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); while(b != ‘ ‘ && b != ‘ ‘) { b = randomAccessFile.readByte(); byteArrayOutputStream.write(b); ++index; } //获取首行 String[] firstLine = byteArrayOutputStream.toString().split(" "); //文件分片 Set pairSets = DataUtil.partition(index, blockLength, length, randomAccessFile); //4.数据分片之后,分别启动处理线程 logger.info("===========4数据分片之后,分别启动处理线程=======" + pairSets + " 时间:" + System.currentTimeMillis()); final long startTime = System.currentTimeMillis(); AtomicLong atomicLong = new AtomicLong(0); //6.数据入库,这里做成可配置模式 logger.info("===========5数据入库=======" + System.currentTimeMillis()); Map workConfigMap = new HashMap(); workConfigMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE); workConfigMap.put("pkey", PreDealResourceConstrant.PRE_DEAL_RESOURCE_WORK); workConfigMap = preDealResourceDao.qryPreDealResourceConfig(workConfigMap); int mapSize = MapUtils.getInteger(workConfigMap, "CODEA", 1000); int bufSize = 
MapUtils.getInteger(workConfigMap, "CODEB", 3) * 1024 * 1024; String splitMark = MapUtils.getString(workConfigMap, "CODEC", " "); //先清空表数据,备份数据放在文件中 initTable(tableName, omPredealSyncLogDto); CyclicBarrier cyclicBarrier = new CyclicBarrier(pairSets.size(), new Runnable() { @Override public void run() { //吧最后的数据提交上去 logger.info("===========5数据入库结束=======" + System.currentTimeMillis()); omPredealSyncLogDto.setUpdateTime(new Date()); omPredealSyncLogDto.setTimes(Integer.valueOf(omPredealSyncLogDto.getTimes()) + 1 + ""); omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + " " + resourceFile.getPath() + " 入库完毕 use time: "+(System.currentTimeMillis()-startTime)); omPredealSyncLogDto.setState("1"); omPredealSyncLogDto.setNewTotal(String.valueOf(atomicLong.get())); omPredealSyncLogDto.update(); } }); for(Object pair : pairSets) { // List tempData = new ArrayList<>(); PartitionPair partitionPair = (PartitionPair) pair; CsvBigTask csvBigTask = new CsvBigTask(cyclicBarrier, atomicLong, partitionPair, bufSize, randomAccessFile, new ArrayList(), new IHandle() { @Override public void handle(String line, boolean lastLine, List tempData) { try { //转换数据为map // logger.info("===========5读取数据=======" + line + System.currentTimeMillis()); if(!line.equals("")) { String[] elements = line.split(splitMark); tempData.add(elements); } //每个3000条一提交 if((tempData.size() % mapSize == 0 && tempData.size() >= mapSize) || lastLine) { preDealResourceDao.addResourceBatch(tableName, firstLine, tempData); tempData.clear(); // logger.info("===========6批量提交数据=======" + System.currentTimeMillis()); } } catch (Exception e) { logger.error(line + e.getMessage(), e); } } }); Thread thread = new Thread(csvBigTask); thread.start(); } } catch (IOException e) { omPredealSyncLogDto.setState("0"); omPredealSyncLogDto.setUpdateTime(new Date()); omPredealSyncLogDto.setTimes(Integer.valueOf(omPredealSyncLogDto.getTimes()) + 1 + ""); omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + " " + 
preDealResourceVo.getFileFullPath() + e.getMessage()); logger.error(e.getMessage(), e); } finally { //7.最后更新日志 logger.info("===========7最后更新日志=======" + System.currentTimeMillis()); omPredealSyncLogDto.update(); } } private String getTableName(String fileName) { String tableName = ""; tableName = PreDealResourceConstrant.PRE_RESOURCE_TABLE + "_" + fileName.substring(0, fileName.lastIndexOf("_")); return tableName; } private void initTable(String tableName, OmPredealSyncLogDto omPredealSyncLogDto) { preDealResourceDao.truncateResourceTable(omPredealSyncLogDto, tableName); } }
最后公布一小段入库代码,这个就不全给了
PreDealResourceDaoImpl
@Override public void addResourceBatch(String tableName, String[] fields, List params) { try { // String sql = "insert into PRE_AAA_STAFF (domain_name,internet_account,brasid,band_info,registTime,FirstUseTime,lastLoginTime,status,bandwidth_M,broadband_account,CREATE_DATE,UPDATE_DATE) " + // "values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?,sysdate,sysdate) "; //组装sql StringBuffer stringBuffer = new StringBuffer(" insert /*+ append */ into " + tableName + " ("); StringBuffer stringValue = new StringBuffer(" values ("); // Set<String> keySet = tableMap.keySet(); for(int i = 0; i < fields.length; ++i) { stringBuffer.append(fields[i] + ","); stringValue.append("?,"); } //添加时间 stringBuffer.append("create_date, update_date,"); stringValue.append("sysdate,sysdate,"); stringBuffer = stringBuffer.deleteCharAt(stringBuffer.length() - 1).append(" ) "); stringValue = stringValue.deleteCharAt(stringValue.length() - 1).append(")"); String sql = stringBuffer.append(stringValue).toString(); super.getJdbcOperations().batchUpdate(sql, new BatchPreparedStatementSetter() { @Override public void setValues(PreparedStatement ps, int i) throws SQLException { Object[] values = (Object[]) params.get(i); for(int j = 0; j < fields.length; ++j) { if(j < values.length) { ps.setString(j + 1, String.valueOf(values[j])); } else { ps.setObject(j + 1, null); } } } @Override public int getBatchSize() { return params.size(); } }); logger.info("====" + tableName + "入库 " + params.size() + " 条"); } catch (DataAccessException e) { logger.error(e.getMessage(), e); } }
以上是关于并发5:多线程并发解析单个大数据量文件并入库,1800万条数据8线程5分钟入库的主要内容,如果未能解决你的问题,请参考以下文章