Processing Big Data with a Blocking Queue

Posted TGITCIC


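Preface

As we all know, reading a text file in Java is in practice a sequential, one-reader-at-a-time job. Even if you read it with multiple threads, every thread has to deal with positioning (seeking to the offset it should read from). That adds a lot of complexity to the framework design, makes the code hard to maintain, and regularly produces all sorts of bizarre errors, to which multithreaded programs are especially prone.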


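Drawbacks of the traditional blocking approach

  • Characteristics: multithreaded, blocking import
  • Drawbacks: blocking, slow import, thread state cannot be tracked precisely, high memory overhead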

A better approach

  • Multithreaded
  • Non-blocking
  • Constant memory overhead
  • The number of threads can be increased freely

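The approach we will take

The data extraction design is based on the following criteria:

1) Memory usage must stay constant at all times

2) Use a multithreaded, non-blocking algorithm, meaning we write no explicit thread locks ourselves

3) Use as few open database cursors and as little CPU as possible

4) Keep both reads and writes fast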




Here we use a blocking queue plus multiple threads to speed up the processing of a large data file, namely:
BlockingQueue<Map> queue = new ArrayBlockingQueue<Map>(TASK_LIST_SIZE);

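Why use a BlockingQueue

  • It automatically blocks any write that would push the queue past its configured size
  • Items are consumed as they are taken: each take removes one item from the queue (first in, first out), so the number of items in the queue drops by one
  • Because of this, we do not need an extra daemon thread that monitors whether all items have been taken before stopping the workers. Worse still, I have seen many programmers write a while loop that spins until every item has been taken, even when most of the iterations are just busy-waiting.
  • Reading and processing are fully decoupled, and once reading has finished, processing finishes right after it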
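The first two points can be checked in isolation. The following is a minimal sketch, not part of the original project: the class name BoundedQueueDemo and the row-N strings are made up for illustration. It uses offer(), which returns false on a full queue, so the demo does not hang the way put() would without a consumer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the batchpoc package.
class BoundedQueueDemo {

    /** Offers `attempts` items to a queue of the given capacity; returns how many were accepted. */
    static int fillWithoutBlocking(int capacity, int attempts) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false when the queue is full;
            // put() would block at this point until a consumer takes an item.
            if (queue.offer("row-" + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    /** Fills a queue with `items` entries, removes the head once, and returns the remaining size. */
    static int sizeAfterOneTake(int items) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(items + 1);
        for (int i = 0; i < items; i++) {
            queue.offer("row-" + i);
        }
        queue.poll(); // removes the oldest item (FIFO), like take() but without blocking
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(fillWithoutBlocking(3, 10)); // 3: the other 7 offers were rejected
        System.out.println(sizeAfterOneTake(3));        // 2
    }
}
```

In the real pipeline the producer calls put() instead of offer(), so it simply waits whenever the workers fall behind, which is what keeps memory usage bounded.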

Core code

File-reading code

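public void run() {
	try {
		enumerate(super.fileName, super.colNames);
	} catch (Exception e) {
		logger.error("read txt file error, parse txt quit because :"
				+ e.getMessage(), e);
		// Clear any pending interrupt flag; otherwise the blocking
		// queue.put(DUMMY) in the finally block would fail immediately.
		Thread.interrupted();
	} finally {
		try {
			// Always enqueue the end-of-data marker, even after a failure,
			// so the consumer threads know when to stop.
			queue.put(DUMMY);
		} catch (Exception ex) {
			logger.error("could not enqueue the end marker: "
					+ ex.getMessage(), ex);
		}
	}
}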
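Two points deserve attention here:
  • enumerate does the reading. Below this code there is a concrete enumerate implementation, which works from the top of the file downwards until every item in the file has been put into the queue with queue.put
  • Why is there a queue.put(DUMMY) in the finally block? Most programmers get a headache when they see a statement like this or run into something called DUMMY: what on earth is it?

In our code DUMMY is defined like this: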

protected static Map DUMMY = new HashMap();
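It is an "empty" marker. Say a file has 500,000 records: the queue will actually receive 500,001 entries, and the last one is this DUMMY. It tells the threads that call take, i.e. the ones that do the real processing (possibly a whole group of them, since processing is multithreaded), that there are no more records left to process, so it is time to stop. This is what I meant by saying that processing finishes as soon as the file has been read. Our processing (worker) threads handle DUMMY as follows: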

while (!done) {
	Map data = (Map) queue.take();
	if (data == EnumerationEnginee.DUMMY) {
		//no data
		queue.put(data);
		done = true;
	} else {
		// if (data != null) {
		for (Iterator it = data.keySet().iterator(); it.hasNext();) {
			String key = String.valueOf(it.next());
			System.out.print("import:>>>[" + key + "]  :  ["+ data.get(key) + "]");
		}
		System.out.println("\n");						
	}
}
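The hand-off of the end marker between workers can be tried out end to end with the sketch below. It is a simplified stand-in for the real classes, not the author's code: the class name PoisonPillDemo, the queue capacity of 10 and the generated rows are assumptions for illustration. It checks that every row is processed exactly once and that all workers stop.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the batchpoc package.
class PoisonPillDemo {
    // End marker, compared by identity (==): an empty data row would be equals() to it.
    private static final Map<String, String> DUMMY = new HashMap<String, String>();

    /** Pushes `rows` rows through `consumers` worker threads; returns how many rows were processed. */
    static int run(final int rows, int consumers) {
        final BlockingQueue<Map<String, String>> queue =
                new ArrayBlockingQueue<Map<String, String>>(10);
        final AtomicInteger processed = new AtomicInteger();
        final CountDownLatch finished = new CountDownLatch(consumers);

        for (int c = 0; c < consumers; c++) {
            new Thread(() -> {
                try {
                    while (true) {
                        Map<String, String> row = queue.take();
                        if (row == DUMMY) {
                            queue.put(row); // pass the marker on to the next worker
                            break;
                        }
                        processed.incrementAndGet(); // stands in for the real import
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    finished.countDown();
                }
            }).start();
        }

        try {
            for (int i = 0; i < rows; i++) {
                Map<String, String> row = new HashMap<String, String>();
                row.put("0", "value-" + i);
                queue.put(row); // blocks while the queue is full
            }
            queue.put(DUMMY);   // exactly one marker, placed after the last row
            finished.await();   // wait until every worker has seen the marker
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 4));
    }
}
```

Because the marker is enqueued last and the queue is first in, first out, the queue is empty whenever a worker takes the marker, so putting it back never blocks. One marker is enough for any number of workers.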

Code that processes the items in the queue (multithreaded)

public void run() {
	boolean done = false;
	try {
		// No synchronized block is needed here: the BlockingQueue is
		// thread-safe and done is a local variable.
		while (!done) {
			Map data = (Map) queue.take();
			if (data == EnumerationEnginee.DUMMY) {
				// End marker: put it back so the other workers see it too.
				queue.put(data);
				done = true;
			} else {
				for (Iterator it = data.keySet().iterator(); it.hasNext();) {
					String key = String.valueOf(it.next());
					System.out.print("import:>>>[" + key + "]  :  ["+ data.get(key) + "]");
				}
				System.out.println("\n");
			}
		}
	} catch (Exception e) {
		logger.error("import file into db error:" + e.getMessage(), e);
		// Clear any pending interrupt flag so the put below can proceed.
		Thread.interrupted();
		try {
			// Make sure the remaining workers still find an end marker.
			queue.put(EnumerationEnginee.DUMMY);
		} catch (Exception ex) {
			logger.error("could not enqueue the end marker: "
					+ ex.getMessage(), ex);
		}
	} finally {
		// Tell the coordinating thread that this worker has finished.
		threadSignal.countDown();
	}
}

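Code walkthrough

Everything starts from requirements, from the "business" scenario. "Business" here does not mean asking you to do business work; it means the "idea".

As usual, pay attention to the key points below (they were highlighted in bold red in the original post).

Now you know about BlockingQueue. Its benefits are:

  1. You can create a queue with size=100 and throw hundreds of thousands of records at it. Once it holds 100 items it blocks the producer for you automatically. You can then start a group of threads to drain the queue. Each queue.take() removes exactly one item from the queue, so after one thread takes an item you need not worry about another thread taking the same item again. This lets us decouple reading from handling.
  2. When multiple threads are draining the queue, you have to tell them that the bottom of the queue has been reached, that there is nothing left to take, and that they can stop. Once every handler thread has hit the "bottom" of the queue, they all stop on their own. That is why I said the handler threads finish at just about the moment the last record of the file has been read.
Finally:

In real-world scenarios the handler usually writes to a database or a NoSQL store, which means dealing with keys and values, so what we put into the queue is a Map.

That is the core design idea. One thing here needs close attention:

DUMMY is an "empty" marker, but you must never use null for it. A BlockingQueue does not accept null: queue.put(null) throws a NullPointerException straight away, which disrupts the whole thread flow. So you must create a real object, for example:
Map DUMMY = new HashMap();

See, that is how it has to be done.

Never write Map DUMMY = null, or you are finished. D...D...D...D.E.A.D!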
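The warning about null is easy to confirm. This is a minimal sketch with a hypothetical class name (NullSentinelDemo); it relies only on the documented behaviour of java.util.concurrent.BlockingQueue implementations, which reject null elements.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the batchpoc package.
class NullSentinelDemo {

    /** Returns true if the queue refuses a null element with a NullPointerException. */
    static boolean rejectsNull() {
        BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(1);
        try {
            queue.offer(null); // put(null) fails in the same way
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsNull()); // true
    }
}
```

The failure happens on the producer side, in put or offer. A dedicated marker object compared with == avoids it entirely.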


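How to time the whole multithreaded process

See the following line in BatchImportExec.java:

[Screenshot of the relevant line in BatchImportExec.java; image not preserved]

And in ImportTask.java:

[Screenshot of the relevant line in ImportTask.java; image not preserved. The ImportTask.java listing in this post calls threadSignal.countDown() in the finally block of run().]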
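Since BatchImportExec.java is not reproduced in this part of the post, here is a hedged sketch of how such timing is commonly done with a CountDownLatch. It is an assumption about the general technique, not the author's actual code. The class name LatchTimingDemo and the sleep that stands in for real work are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical demo class, not part of the batchpoc package.
class LatchTimingDemo {

    /** Starts `workers` threads that each sleep `sleepMillis`; returns the total elapsed milliseconds. */
    static long timeWorkers(int workers, final long sleepMillis) {
        final CountDownLatch threadSignal = new CountDownLatch(workers);
        long start = System.nanoTime();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(sleepMillis); // stands in for the real import work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    threadSignal.countDown(); // always signal, even on failure
                }
            }).start();
        }

        try {
            threadSignal.await(); // returns once every worker has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return (System.nanoTime() - start) / 1000000L;
    }

    public static void main(String[] args) {
        System.out.println("elapsed ms: " + timeWorkers(4, 50));
    }
}
```

The point of the design is that the coordinating thread takes the end timestamp only after await() returns, so the measurement covers the slowest worker rather than just the time needed to start the threads.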

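A complete example

Business requirements


  1. We need a well-encapsulated method that takes a file and handles the rows in it with multiple threads.
  2. The number of threads and the queue size are configurable.
  3. A timing feature is required: how long the run takes from start to finish (many people struggle with timing multithreaded tasks; the example solves this as well).
  4. In the end the process should support multiple input formats: csv, txt, excel, database and so on. Due to limited space we only implement here 1) the handling of txt/csv and excel files and 2) a factory method that makes it easy to extend this FileParser yourself.
  5. Processing large Excel files. Everyone has had the experience, with either POI or JXL, that once an Excel file exceeds 65,535 rows, simply loading the worksheet blows up the JVM's memory. So how do you process a large Excel file more efficiently and with less memory, say one with 500,000 rows? The example provides a solution for this too.

Main framework code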

BatchDTO.java

package batchpoc;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;

public class BatchDTO implements Serializable {

	private String pkBtTaskId = "";
	private String taskName = "";
	private String actionType = "";
	private String taskDesc = "";
	private String status = "";
	private String commitedBy = "";
	private Date commitedTime = null;
	private String resultLog = "";
	private String batchId = "";
	private boolean headSkip = true;
	private String errorLogPath = "";
	private String logRootPath = "";
	private boolean errorFlag = false;
	private String campId = "";
	private String[] data = null;
	private long totalCount = 0;

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result
				+ ((actionType == null) ? 0 : actionType.hashCode());
		result = prime * result + ((batchId == null) ? 0 : batchId.hashCode());
		result = prime * result + ((campId == null) ? 0 : campId.hashCode());
		result = prime * result
				+ ((commitedBy == null) ? 0 : commitedBy.hashCode());
		result = prime * result
				+ ((commitedTime == null) ? 0 : commitedTime.hashCode());
		result = prime * result + Arrays.hashCode(data);
		result = prime * result + (errorFlag ? 1231 : 1237);
		result = prime * result
				+ ((errorLogPath == null) ? 0 : errorLogPath.hashCode());
		result = prime * result + (headSkip ? 1231 : 1237);
		result = prime * result
				+ ((logRootPath == null) ? 0 : logRootPath.hashCode());
		result = prime * result
				+ ((pkBtTaskId == null) ? 0 : pkBtTaskId.hashCode());
		result = prime * result
				+ ((resultLog == null) ? 0 : resultLog.hashCode());
		result = prime * result + ((status == null) ? 0 : status.hashCode());
		result = prime * result
				+ ((taskDesc == null) ? 0 : taskDesc.hashCode());
		result = prime * result
				+ ((taskName == null) ? 0 : taskName.hashCode());
		result = prime * result + (int) (totalCount ^ (totalCount >>> 32));
		return result;
	}

	public String getPkBtTaskId() {
		return pkBtTaskId;
	}

	public void setPkBtTaskId(String pkBtTaskId) {
		this.pkBtTaskId = pkBtTaskId;
	}

	public String getTaskName() {
		return taskName;
	}

	public void setTaskName(String taskName) {
		this.taskName = taskName;
	}

	public String getActionType() {
		return actionType;
	}

	public void setActionType(String actionType) {
		this.actionType = actionType;
	}

	public String getTaskDesc() {
		return taskDesc;
	}

	public void setTaskDesc(String taskDesc) {
		this.taskDesc = taskDesc;
	}

	public String getStatus() {
		return status;
	}

	public void setStatus(String status) {
		this.status = status;
	}

	public String getCommitedBy() {
		return commitedBy;
	}

	public void setCommitedBy(String commitedBy) {
		this.commitedBy = commitedBy;
	}

	public Date getCommitedTime() {
		return commitedTime;
	}

	public void setCommitedTime(Date commitedTime) {
		this.commitedTime = commitedTime;
	}

	public String getResultLog() {
		return resultLog;
	}

	public void setResultLog(String resultLog) {
		this.resultLog = resultLog;
	}

	public String getBatchId() {
		return batchId;
	}

	public void setBatchId(String batchId) {
		this.batchId = batchId;
	}

	public boolean isHeadSkip() {
		return headSkip;
	}

	public void setHeadSkip(boolean headSkip) {
		this.headSkip = headSkip;
	}

	public String getErrorLogPath() {
		return errorLogPath;
	}

	public void setErrorLogPath(String errorLogPath) {
		this.errorLogPath = errorLogPath;
	}

	public String getLogRootPath() {
		return logRootPath;
	}

	public void setLogRootPath(String logRootPath) {
		this.logRootPath = logRootPath;
	}

	public boolean isErrorFlag() {
		return errorFlag;
	}

	public void setErrorFlag(boolean errorFlag) {
		this.errorFlag = errorFlag;
	}

	public String getCampId() {
		return campId;
	}

	public void setCampId(String campId) {
		this.campId = campId;
	}

	public String[] getData() {
		return data;
	}

	public void setData(String[] data) {
		this.data = data;
	}

	public long getTotalCount() {
		return totalCount;
	}

	public void setTotalCount(long totalCount) {
		this.totalCount = totalCount;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (obj == null) {
			return false;
		}
		if (!(obj instanceof BatchDTO)) {
			return false;
		}
		BatchDTO other = (BatchDTO) obj;
		if (actionType == null) {
			if (other.actionType != null) {
				return false;
			}
		} else if (!actionType.equals(other.actionType)) {
			return false;
		}
		if (batchId == null) {
			if (other.batchId != null) {
				return false;
			}
		} else if (!batchId.equals(other.batchId)) {
			return false;
		}
		if (campId == null) {
			if (other.campId != null) {
				return false;
			}
		} else if (!campId.equals(other.campId)) {
			return false;
		}
		if (commitedBy == null) {
			if (other.commitedBy != null) {
				return false;
			}
		} else if (!commitedBy.equals(other.commitedBy)) {
			return false;
		}
		if (commitedTime == null) {
			if (other.commitedTime != null) {
				return false;
			}
		} else if (!commitedTime.equals(other.commitedTime)) {
			return false;
		}
		if (!Arrays.equals(data, other.data)) {
			return false;
		}
		if (errorFlag != other.errorFlag) {
			return false;
		}
		if (errorLogPath == null) {
			if (other.errorLogPath != null) {
				return false;
			}
		} else if (!errorLogPath.equals(other.errorLogPath)) {
			return false;
		}
		if (headSkip != other.headSkip) {
			return false;
		}
		if (logRootPath == null) {
			if (other.logRootPath != null) {
				return false;
			}
		} else if (!logRootPath.equals(other.logRootPath)) {
			return false;
		}
		if (pkBtTaskId == null) {
			if (other.pkBtTaskId != null) {
				return false;
			}
		} else if (!pkBtTaskId.equals(other.pkBtTaskId)) {
			return false;
		}
		if (resultLog == null) {
			if (other.resultLog != null) {
				return false;
			}
		} else if (!resultLog.equals(other.resultLog)) {
			return false;
		}
		if (status == null) {
			if (other.status != null) {
				return false;
			}
		} else if (!status.equals(other.status)) {
			return false;
		}
		if (taskDesc == null) {
			if (other.taskDesc != null) {
				return false;
			}
		} else if (!taskDesc.equals(other.taskDesc)) {
			return false;
		}
		if (taskName == null) {
			if (other.taskName != null) {
				return false;
			}
		} else if (!taskName.equals(other.taskName)) {
			return false;
		}
		if (totalCount != other.totalCount) {
			return false;
		}
		return true;
	}

}

BatchTask.java

package batchpoc;

import java.util.concurrent.BlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BatchTask{
	protected final Logger logger = LoggerFactory.getLogger(this.getClass());
	public final static String TXT_IMP_EXP = "101";
	public final static String EXCEL_IMP_EXP = "102";
	public final static String TASK_RUNNING = "2";
	public final static String TASK_FINISHED = "4";
	public final static String TASK_FAILED = "5";
	protected BatchDTO taskContext = null;

	public BatchTask(BatchDTO taskContext) {
		this.taskContext = taskContext;
	}

	public abstract void doBatch() throws Exception;
}

EnumerationEngineeFactory.java, which builds the FileParser used to "read" files in various formats

package batchpoc;

import java.util.Map;
import java.util.concurrent.BlockingQueue;

import util.Constants;

public class EnumerationEngineeFactory {

	public static EnumerationEnginee getInstance(BlockingQueue<Map> queue,
			String type, String fileName, String colNames, boolean skipHeader,
			BatchDTO taskContext) {
		EnumerationEnginee task = null;
		if (type.equals(Constants.ENUMERATION_TXT_TASK)) {
			return new TxtEnumerationTask(queue, fileName, colNames,
					skipHeader, taskContext);
		} else if (type.equals(Constants.ENUMERATION_EXCEL_TASK)) {
			return new XLSEnumerationTask(queue, fileName, colNames,
					skipHeader, taskContext);
		}
		return task;
	}
}

EnumerationEnginee.java

package batchpoc;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class EnumerationEnginee implements Runnable {
	protected String fileName = "";
	protected String colNames = "";
	protected final Logger logger = LoggerFactory.getLogger(this.getClass());
	protected boolean skipHeader = true;
	protected BatchDTO taskContext = null;
	protected static Map DUMMY = new HashMap();
	protected BlockingQueue<Map> queue = null;

	public EnumerationEnginee(BlockingQueue<Map> queue, String fileName,
			String colNames, boolean skipHeader, BatchDTO taskContext) {
		this.fileName = fileName;
		this.colNames = colNames;
		this.skipHeader = skipHeader;
		this.taskContext = taskContext;
		this.queue = queue;
	}

	public abstract void enumerate(String fileName, String strKeys)
			throws Exception;

	public abstract void run();

}

ImportTask.java

package batchpoc;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ImportTask implements Runnable {
	private final Logger logger = LoggerFactory.getLogger(getClass());
	private BatchDTO taskContext = null;
	private CountDownLatch threadSignal = null;
	BlockingQueue<Map> queue = null;

	public ImportTask(BlockingQueue<Map> queue, BatchDTO taskContext,
			CountDownLatch threadSignal) {
		this.taskContext = taskContext;
		this.threadSignal = threadSignal;
		this.queue = queue;
	}

	public void run() {
		boolean done = false;
		try {
			// No synchronized block is needed here: the BlockingQueue is
			// thread-safe and done is a local variable.
			while (!done) {
				Map data = (Map) queue.take();
				if (data == EnumerationEnginee.DUMMY) {
					// End marker: put it back so the other workers see it too.
					queue.put(data);
					done = true;
				} else {
					for (Iterator it = data.keySet().iterator(); it
							.hasNext();) {
						String key = String.valueOf(it.next());
						System.out.print("import:>>>[" + key + "]  :  ["
								+ data.get(key) + "]");
					}
					System.out.println("\n");
				}
			}
		} catch (Exception e) {
			logger.error("import file into db error:" + e.getMessage(), e);
			// Clear any pending interrupt flag so the put below can proceed.
			Thread.interrupted();
			try {
				// Make sure the remaining workers still find an end marker.
				queue.put(EnumerationEnginee.DUMMY);
			} catch (Exception ex) {
				logger.error("could not enqueue the end marker: "
						+ ex.getMessage(), ex);
			}
		} finally {
			// Tell the coordinating thread that this worker has finished.
			threadSignal.countDown();
		}

	}
}

MapUtil.java - used to sort a Map by its keys

package batchpoc;

/*
 * Author: Mk
 * Created By: 2012-08-23
 */
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class MapUtil {
	public static <K, V extends Comparable<? super V>> Map<K, V> sortByValue(
			Map<K, V> map) {
		List<Map.Entry<K, V>> list = new LinkedList<Map.Entry<K, V>>(
				map.entrySet());
		Collections.sort(list, new Comparator<Map.Entry<K, V>>() {
			public int compare(Map.Entry<K, V> o1, Map.Entry<K, V> o2) {
				return (String.valueOf(o1.getKey())).compareTo(String
						.valueOf(o2.getKey()));
			}
		});

		Map<K, V> result = new LinkedHashMap<K, V>();
		for (Map.Entry<K, V> entry : list) {
			result.put(entry.getKey(), entry.getValue());
		}
		return result;
	}
}
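One detail worth knowing about this helper: despite its name, sortByValue orders entries by key, and it compares the keys as strings. With column indexes as keys, "10" therefore sorts before "2". The sketch below illustrates the difference and one possible numeric alternative. It is an illustration under the assumption that all keys are numeric column indexes, not a change the original post makes, and the class name KeyOrderDemo is hypothetical.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical demo class, not part of the batchpoc package.
class KeyOrderDemo {

    /** Key order under plain string comparison, which is what MapUtil effectively uses. */
    static String lexicographicOrder(Map<String, String> row) {
        return String.join(",", new TreeMap<String, String>(row).keySet());
    }

    /** Key order when the keys are compared as integers (assumes every key is numeric). */
    static String numericOrder(Map<String, String> row) {
        Comparator<String> byNumber = Comparator.comparingInt(Integer::parseInt);
        Map<String, String> sorted = new TreeMap<String, String>(byNumber);
        sorted.putAll(row);
        return String.join(",", sorted.keySet());
    }

    public static void main(String[] args) {
        Map<String, String> row = new HashMap<String, String>();
        for (String key : new String[] {"0", "1", "2", "10"}) {
            row.put(key, "cell-" + key);
        }
        System.out.println(lexicographicOrder(row)); // 0,1,10,2
        System.out.println(numericOrder(row));       // 0,1,2,10
    }
}
```

For files with ten columns or fewer the two orders are identical, so the difference only shows up on wider rows.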

TxtEnumerationTask.java - the FileParser dedicated to reading text files such as txt and csv; it is invoked from EnumerationEngineeFactory

package batchpoc;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TxtEnumerationTask extends EnumerationEnginee {
	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	public TxtEnumerationTask(BlockingQueue<Map> queue, String txtFileName,
			String colNames, boolean skipHeader, BatchDTO taskContext) {
		super(queue, txtFileName, colNames, taskContext.isHeadSkip(),
				taskContext);

	}

	@Override
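	public void run() {
		try {
			enumerate(super.fileName, super.colNames);
		} catch (Exception e) {
			logger.error("read txt file error, parse txt quit because :"
					+ e.getMessage(), e);
			// Clear any pending interrupt flag; otherwise the blocking
			// queue.put(DUMMY) in the finally block would fail immediately.
			Thread.interrupted();
		} finally {
			try {
				// Always enqueue the end-of-data marker, even after a failure,
				// so the consumer threads know when to stop.
				queue.put(DUMMY);
			} catch (Exception ex) {
				logger.error("could not enqueue the end marker: "
						+ ex.getMessage(), ex);
			}
		}

	}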

	public void enumerate(String txtFileName, String strKeys) throws Exception {
		FileInputStream is = null;
		StringBuilder sb = new StringBuilder();
		String a_line = "";
		String[] columnNames = null;
		String[] cellValues = null;
		Map dataRow = new HashMap();
		int i = 0;
		try {
			File f = new File(txtFileName);
			if (f.exists()) {
				is = new FileInputStream(new File(txtFileName));
				BufferedReader br = new BufferedReader(new InputStreamReader(
						is, "UTF-8"));
				if (skipHeader) {
					br.readLine();
				}

				while ((a_line = br.readLine()) != null) {
					if (a_line.trim().length() > 0) {
						String[] data = a_line.split(",");
						for (int index = 0; index < data.length; index++) {
							dataRow.put(String.valueOf(index), data[index]);
						}
						dataRow = MapUtil.sortByValue(dataRow);
						queue.put(dataRow);
						dataRow = new HashMap();
						i++;
					}
				}
			}
		} catch (Exception e) {
			throw new Exception("import was interrupted, error happened in "
					+ i + "  row", e);
		} finally {
			try {
				if (is != null) {
					is.close();
					is = null;
				}
			} catch (Exception e) {
			}
		}
	}
}


XLSEnumerationTask.java - the FileParser dedicated to reading Excel files; it is invoked from EnumerationEngineeFactory and can read Excel (.xlsx) files with several hundred thousand rows

package batchpoc;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

import org.apache.poi.openxml4j.opc.OPCPackage;
import org.apache.poi.openxml4j.opc.PackageAccess;

public class XLSEnumerationTask extends EnumerationEnginee {

	public XLSEnumerationTask(BlockingQueue<Map> queue, String txtFileName,
			String colNames, boolean skipHeader, BatchDTO taskContext) {
		super(queue, txtFileName, colNames, taskContext.isHeadSkip(),
				taskContext);
	}

	@Override
	public void enumerate(String fileName, String strKeys) throws Exception {
		File xlsxFile = new File(fileName);
		if (xlsxFile.exists()) {
			// The package open is instantaneous, as it should be.
			OPCPackage p = OPCPackage.open(xlsxFile.getPath(),
					PackageAccess.READ);
			try {
				XLSXParser xlsxParser = new XLSXParser(p, queue, true);
				xlsxParser.process();
			} finally {
				// Release the read-only package without saving anything.
				p.revert();
			}
		}
	}

	@Override
	public void run() {
		try {
			enumerate(super.fileName, super.colNames);
		} catch (Exception e) {
			logger.error("read excel file error, parse excel quit because :"
					+ e.getMessage(), e);
			try {
				Thread.interrupted();
			} catch (Exception ee) {
			}
		} finally {
			try {
				// queue.put(DUMMY);
				queue.put(DUMMY);
			} catch (Exception ex) {
			}
		}

	}

}

XLSXParser.java - this is the big one; it is what processes Excel (.xlsx) files with large amounts of data

package batchpoc;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;

import org.apache.poi.openxml4j.exceptions.OpenXML4JException;
import org.apache.poi.openxml4j.opc.OPCPackage;
import org.apache.poi.openxml4j.opc.PackageAccess;
import org.apache.poi.ss.usermodel.BuiltinFormats;
import org.apache.poi.ss.usermodel.DataFormatter;
import org.apache.poi.xssf.eventusermodel.ReadOnlySharedStringsTable;
import org.apache.poi.xssf.eventusermodel.XSSFReader;
import org.apache.poi.xssf.model.StylesTable;
import org.apache.poi.xssf.usermodel.XSSFCellStyle;
import org.apache.poi.xssf.usermodel.XSSFRichTextString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.Attributes;
import org.xml.sax.ContentHandler;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import org.xml.sax.XMLReader;
import org.xml.sax.helpers.DefaultHandler;

/**

 */
public class XLSXParser {

	private final Logger logger = LoggerFactory.getLogger(getClass());

	/**
	 * The type of the data value is indicated by an attribute on the cell. The
	 * value is usually in a "v" element within the cell.
	 */
	enum xssfDataType {
		BOOL, ERROR, FORMULA, INLINESTR, SSTINDEX, NUMBER,
	}

	int countrows = 0;

	/**
	 * Derived from http://poi.apache.org/spreadsheet/how-to.html#xssf_sax_api
	 * <p/>
	 * Also see Standard ECMA-376, 1st edition, part 4, pages 1928ff, at
	 * http://www.ecma-international.org/publications/standards/Ecma-376.htm
	 * <p/>
	 * A web-friendly version is http://openiso.org/Ecma/376/Part4
	 */
	class MyXSSFSheetHandler extends DefaultHandler {

		/**
		 * Table with styles
		 */
		private StylesTable stylesTable;
		private Map<String, String> dataMap = new HashMap<String, String>();
		/**
		 * Table with unique strings
		 */
		private ReadOnlySharedStringsTable sharedStringsTable;

		/**
		 * Destination for data
		 */
		// private final PrintStream output;

		/**
		 * Number of columns to read starting with leftmost
		 */
		// private final int minColumnCount;

		// Set when V start element is seen
		private boolean vIsOpen;

		// Set when cell start element is seen;
		// used when cell close element is seen.
		private xssfDataType nextDataType;

		// Used to format numeric cell values.
		private short formatIndex;
		private String formatString;
		private final DataFormatter formatter;

		private int thisRow = 0;
		private int thisColumn = -1;
		// The last column printed to the output stream
		private int lastColumnNumber = -1;

		// Gathers characters as they are seen.
		private StringBuffer value;

		/**
		 * Accepts objects needed while parsing.
		 * 
		 * @param styles
		 *            Table of styles
		 * @param strings
		 *            Table of shared strings
		 * @param cols
		 *            Minimum number of columns to show
		 * @param target
		 *            Sink for output
		 */

		public MyXSSFSheetHandler(StylesTable styles,
				ReadOnlySharedStringsTable strings, Map<String, String> dataMap) {
			this.stylesTable = styles;
			this.sharedStringsTable = strings;
			// this.minColumnCount = cols;
			this.value = new StringBuffer();
			this.nextDataType = xssfDataType.NUMBER;
			this.formatter = new DataFormatter();
			this.dataMap = dataMap;
		}

		/*
		 * (non-Javadoc)
		 * 
		 * @see
		 * org.xml.sax.helpers.DefaultHandler#startElement(java.lang.String,
		 * java.lang.String, java.lang.String, org.xml.sax.Attributes)
		 */
		public void startElement(String uri, String localName, String name,
				Attributes attributes) throws SAXException {

			if ("inlineStr".equals(name) || "v".equals(name)) {
				vIsOpen = true;
				// Clear contents cache
				value.setLength(0);
			}
			// c => cell
			else if ("c".equals(name)) {
				// Get the cell reference
				String r = attributes.getValue("r");
				int firstDigit = -1;
				for (int c = 0; c < r.length(); ++c) {
					if (Character.isDigit(r.charAt(c))) {
						firstDigit = c;
						break;
					}
				}
				thisColumn = nameToColumn(r.substring(0, firstDigit));

				// Set up defaults.
				this.nextDataType = xssfDataType.NUMBER;
				this.formatIndex = -1;
				this.formatString = null;
				String cellType = attributes.getValue("t");
				String cellStyleStr = attributes.getValue("s");
				if ("b".equals(cellType))
					nextDataType = xssfDataType.BOOL;
				else if ("e".equals(cellType))
					nextDataType = xssfDataType.ERROR;
				else if ("inlineStr".equals(cellType))
					nextDataType = xssfDataType.INLINESTR;
				else if ("s".equals(cellType))
					nextDataType = xssfDataType.SSTINDEX;
				else if ("str".equals(cellType))
					nextDataType = xssfDataType.FORMULA;
				else if (cellStyleStr != null) {
					// It's a number, but almost certainly one
					// with a special style or format
					int styleIndex = Integer.parseInt(cellStyleStr);
					XSSFCellStyle style = stylesTable.getStyleAt(styleIndex);
					this.formatIndex = style.getDataFormat();
					this.formatString = style.getDataFormatString();
					if (this.formatString == null)
						this.formatString = BuiltinFormats
								.getBuiltinFormat(this.formatIndex);
				}
			}

		}

		/**
		 * Keeps only the digit characters (0-9) of the given string.
		 * 
		 * @param str
		 * @return
		 */
		public String checkNumber(String str) {
			str = str.trim();
			String str2 = "";
			if (str != null && !"".equals(str)) {
				for (int i = 0; i < str.length(); i++) {
					if (str.charAt(i) >= 48 && str.charAt(i) <= 57) {
						str2 += str.charAt(i);
					}
				}
			}
			return str2.trim();
		}

		/*
		 * (non-Javadoc)
		 * 
		 * @see org.xml.sax.helpers.DefaultHandler#endElement(java.lang.String,
		 * java.lang.String, java.lang.String)
		 */

		public void endElement(String uri, String localName, String name)
				throws SAXException {
			String thisStr = null;
			// System.out.println("endElement----->" + name);
			// v => contents of a cell
			if ("v".equals(name)) {
				// Process the value contents as required.
				// Do now, as characters() may be called more than once
				switch (nextDataType) {

				case BOOL:
					char first = value.charAt(0);
					thisStr = first == '0' ? "FALSE" : "TRUE";
					break;

				case ERROR:
					thisStr = "\"ERROR:" + value.toString() + '"';
					break;

				case FORMULA:
					// A formula could result in a string value,
					// so always add double-quote characters.
					thisStr = '"' + value.toString() + '"';
					break;

				case INLINESTR:
					// TODO: have not seen an example of this, so it's untested.
					XSSFRichTextString rtsi = new XSSFRichTextString(
							value.toString());
					if (rtsi != null) {
						thisStr = rtsi.toString().trim();
						thisStr = thisStr.substring(1, thisStr.length() - 1);
					}
					break;

				case SSTINDEX:
					String sstIndex = value.toString();
					try {
						int idx = Integer.parseInt(sstIndex);
						XSSFRichTextString rtss = new XSSFRichTextString(
								sharedStringsTable.getEntryAt(idx));
						if (rtss != null) {
							/*
							 * thisStr = rtss.toString().trim()
							 * .replaceAll("\\s*", "");
							 */
							thisStr = checkNumber(rtss.toString().trim());
							/*
							 * thisStr = thisStr .substring(1, thisStr.length()
							 * - 1);
							 */
						}
					} catch (NumberFormatException ex) {
						logger.error("Failed to parse SST index '" + sstIndex
								+ "': " + ex.toString(), ex);
					}
					break;

				case NUMBER:
					String n = value.toString();
					if (this.formatString != null)
						thisStr = formatter.formatRawCellContents(
								Double.parseDouble(n), this.formatIndex,
								this.formatString);
					else
						thisStr = n;
					break;

				default:
					thisStr = "(TODO: Unexpected type: " + nextDataType + ")";
					break;
				}

				// Output after we've seen the string contents
				// Emit commas for any fields that were missing on this row
				if (lastColumnNumber == -1) {
					lastColumnNumber = 0;
				}
				// for (int i = lastColumnNumber; i < thisColumn; ++i) {
				// System.out.print("   col: " + i + "  ");
				// }
				// Might be the empty string.
				// output.print(thisStr);
				// System.out.println(thisStr);
				// System.out.println("thisRow...." + thisRow);
				if (thisRow > 0 && thisStr != null
						&& thisStr.trim().length() > 0) {
					// logger.info("dataMap.put()");
					dataMap.put(String.valueOf(thisColumn), thisStr);

				}
				// Update column
				if (thisColumn > -1)
					lastColumnNumber = thisColumn;

			} else if ("row".equals(name)) {
				try {
					if (dataMap.keySet().size() > 0) {
						dataMap = MapUtil.sortByValue(dataMap);
						if (toQueue) {
							queue.put(dataMap);
						}
					}
				} catch (Exception e) {
					logger.error(
							"put data into queue error: " + e.getMessage(), e);
				}
				thisRow++;
				dataMap = new HashMap<String, String>();
				lastColumnNumber = -1;

			}

		}

		/**
		 * Captures characters only if a suitable element is open. Originally
		 * was just "v"; extended for inlineStr also.
		 */
		public void characters(char[] ch, int start, int length)
				throws SAXException {
			if (vIsOpen)
				value.append(ch, start, length);
		}

		/**
		 * Converts an Excel column name like "C" to a zero-based index.
		 * 
		 * @param name
		 * @return Index corresponding to the specified name
		 */
		private int nameToColumn(String name) {
			int column = -1;
			for (int i = 0; i < name.length(); ++i) {
				int c = name.charAt(i);
				column = (column + 1) * 26 + c - 'A';
			}
			return column;
		}

	}

	// /////////////////////////////////////

	private OPCPackage xlsxPackage;
	private BlockingQueue<Map> queue = null;
	private boolean toQueue = false;

	// private int minColumns;

	// private PrintStream output;

	/**
	 * Creates a new XLSX -> XML converter
	 * 
	 * @param pkg
	 *            The XLSX package to process
	 * @param output
	 *            The PrintStream to output the CSV to
	 * @param minColumns
	 *            The minimum number of columns to output, or -1 for no minimum
	 */
	public XLSXParser(OPCPackage pkg, BlockingQueue<Map> queue, boolean toQueue) {
		this.xlsxPackage = pkg;
		this.queue = queue;
		this.toQueue = toQueue;
		// this.minColumns = minColumns;
	}

	/**
	 * Parses and shows the content of one sheet using the specified styles and
	 * shared-strings tables.
	 * 
	 * @param styles
	 * @param strings
	 * @param sheetInputStream
	 */
	public void processSheet(StylesTable styles,
			ReadOnlySharedStringsTable strings, InputStream sheetInputStream)
			throws IOException, ParserConfigurationException, SAXException {

		InputSource sheetSource = new InputSource(sheetInputStream);
		SAXParserFactory saxFactory = SAXParserFactory.newInstance();
		SAXParser saxParser = saxFactory.newSAXParser();
		XMLReader sheetParser = saxParser.getXMLReader();
		Map<String, String> dataMap = new HashMap<String, String>();
		ContentHandler handler = new MyXSSFSheetHandler(styles, strings,
				dataMap);
		sheetParser.setContentHandler(handler);
		sheetParser.parse(sheetSource);
	}

	/**
	 * Initiates the processing of the XLS workbook file to CSV.
	 * 
	 * @throws IOException
	 * @throws OpenXML4JException
	 * @throws ParserConfigurationException
	 * @throws SAXException
	 */
	public void process() throws IOException, OpenXML4JException,
			ParserConfigurationException, SAXException {

		ReadOnlySharedStringsTable strings = new ReadOnlySharedStringsTable(
				this.xlsxPackage);
		XSSFReader xssfReader = new XSSFReader(this.xlsxPackage);

		StylesTable styles = xssfReader.getStylesTable();
		XSSFReader.SheetIterator iter = (XSSFReader.SheetIterator) xssfReader
				.getSheetsData();
		int index = 0;
		while (iter.hasNext()) {
			InputStream stream = iter.next();
			String sheetName = iter.getSheetName();
			// System.out.println(sheetName + " [index=" + index + "]:");
			processSheet(styles, strings, stream);
			stream.close();
			++index;
		}
	}

	public static void main(String[] args) throws Exception {
		/*
		 * if (args.length < 1) { System.err.println("Use:");
		 * System.err.println("  XLSX2CSV <xlsx file> [min columns]"); return; }
		 */

		// File xlsxFile = new File(args[0]);
		File xlsxFile = new File("d:/test.xlsx");
		if (!xlsxFile.exists()) {
			System.err
					.println("Not found or not a file: " + xlsxFile.getPath());
			return;
		}

		int minColumns = -1;
		// if (args.length >= 2)
		// minColumns = Integer.parseInt(args[1]);

		minColumns = 2;
		// The package open is instantaneous, as it should be.
		OPCPackage p = OPCPackage.open(xlsxFile.getPath(), PackageAccess.READ);
		XLSXParser xlsxParser = new XLSXParser(p, null, false);
		xlsxParser.process();
	}

}

This uses POI 3.5 or later, and it needs the following libraries to compile and run:
<!-- poi start -->
	<dependency>
		<groupId>org.apache.poi</groupId>
		<artifactId>poi</artifactId>
		<version>${poi_version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.poi</groupId>
		<artifactId>poi-ooxml-schemas</artifactId>
		<version>${poi_version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.poi</groupId>
		<artifactId>poi-scratchpad</artifactId>
		<version>${poi_version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.poi</groupId>
		<artifactId>poi-ooxml</artifactId>
		<version>${poi_version}</version>
	</dependency>
<!-- poi end -->

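I use version 3.8 here; the complete pom.xml is given further down.
Instead of loading the whole workbook into memory in the traditional way, this parser treats the xlsx file as XML and reads it in SAX mode, one event at a time.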
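The streaming idea can be illustrated without POI at all. The sketch below is a miniature built on assumptions: it feeds a hand-written XML string shaped like a worksheet part (`row`, `c` and `v` elements) to the JDK's SAX parser and counts rows and cell values as the events arrive, so no document tree is ever built. The class name `SaxRowCounter` and the sample XML are hypothetical; the real handler in this article is `MyXSSFSheetHandler`.

```java
import java.io.StringReader;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;
import org.xml.sax.Attributes;
import org.xml.sax.InputSource;
import org.xml.sax.helpers.DefaultHandler;

public class SaxRowCounter extends DefaultHandler {
	int rows = 0;
	int cells = 0;
	final StringBuilder values = new StringBuilder();

	private boolean inValue = false;
	private final StringBuilder text = new StringBuilder();

	@Override
	public void startElement(String uri, String localName, String qName, Attributes atts) {
		if ("v".equals(qName)) {
			inValue = true;
			text.setLength(0); // start collecting a new cell value
		}
	}

	@Override
	public void characters(char[] ch, int start, int length) {
		// May be called several times per element, so append rather than assign.
		if (inValue) {
			text.append(ch, start, length);
		}
	}

	@Override
	public void endElement(String uri, String localName, String qName) {
		if ("v".equals(qName)) {
			inValue = false;
			cells++;
			values.append(text).append(';');
		} else if ("row".equals(qName)) {
			rows++; // a real handler would hand the finished row over here
		}
	}

	public static SaxRowCounter parse(String xml) throws Exception {
		SAXParser parser = SAXParserFactory.newInstance().newSAXParser();
		SaxRowCounter handler = new SaxRowCounter();
		parser.parse(new InputSource(new StringReader(xml)), handler);
		return handler;
	}

	public static void main(String[] args) throws Exception {
		String xml = "<sheetData>"
				+ "<row r='1'><c r='A1'><v>a</v></c><c r='B1'><v>b</v></c></row>"
				+ "<row r='2'><c r='A2'><v>c</v></c></row>"
				+ "</sheetData>";
		SaxRowCounter h = parse(xml);
		System.out.println(h.rows + " rows, " + h.cells + " cells: " + h.values);
	}
}
```

Memory use depends on the size of one row rather than the size of the file, which is what makes this style suitable for very large workbooks.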

Key processing points

The statements that matter are in the method public void endElement(String uri, String localName, String name):
if (thisRow > 0 && thisStr != null && thisStr.trim().length() > 0) {
	// logger.info("dataMap.put()");
	dataMap.put(String.valueOf(thisColumn), thisStr);
}


} else if ("row".equals(name)) {
		try {
			if (dataMap.keySet().size() > 0) {
				dataMap = MapUtil.sortByValue(dataMap);
				if (toQueue) {
					queue.put(dataMap);
				}
			}
		} catch (Exception e) {
			logger.error(
					"put data into queue error: " + e.getMessage(), e);
		}
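
One detail of this row hand-off deserves a sketch. Once a map has been put on the queue it belongs to the consumer, so the producer should start the next row with a fresh map rather than clearing and reusing the old one. The excerpt above does not show whether the handler does this. The following is a minimal, hypothetical illustration (the class `RowHandOff` and its method names are not from the article) of publishing one map per row and ending the stream with a sentinel, in the same spirit as DUMMY:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class RowHandOff {
	// Sentinel compared by identity; plays the role of DUMMY in the article.
	static final Map<String, String> END = new HashMap<String, String>();

	private final BlockingQueue<Map<String, String>> queue;
	private Map<String, String> current = new HashMap<String, String>();

	RowHandOff(BlockingQueue<Map<String, String>> queue) {
		this.queue = queue;
	}

	// Record a cell; blank values are skipped, as in the endElement excerpt.
	void cell(int column, String value) {
		if (value != null && value.trim().length() > 0) {
			current.put(String.valueOf(column), value);
		}
	}

	void endRow() throws InterruptedException {
		if (!current.isEmpty()) {
			queue.put(current); // blocks while the queue is full
			current = new HashMap<String, String>(); // never reuse a published map
		}
	}

	void endDocument() throws InterruptedException {
		queue.put(END);
	}

	static List<Map<String, String>> demo() throws InterruptedException {
		final BlockingQueue<Map<String, String>> q =
				new ArrayBlockingQueue<Map<String, String>>(2);
		final List<Map<String, String>> seen = new ArrayList<Map<String, String>>();
		Thread consumer = new Thread(new Runnable() {
			public void run() {
				try {
					for (Map<String, String> row = q.take(); row != END; row = q.take()) {
						seen.add(row);
					}
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		});
		consumer.start();
		RowHandOff producer = new RowHandOff(q);
		for (int r = 1; r <= 5; r++) {
			producer.cell(0, "id" + r);
			producer.cell(1, "  "); // blank cell, not stored
			producer.endRow();
		}
		producer.endDocument();
		consumer.join(); // join makes the consumer's writes to 'seen' visible here
		return seen;
	}

	public static void main(String[] args) throws InterruptedException {
		System.out.println(demo());
	}
}
```

The queue capacity of 2 is deliberately small so that `put` really does block in the demo; with several consumers, each one would put the sentinel back before exiting, as the consumer loop earlier in the article does.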

Other helper classes

UUID.java

package batchpoc;

public class UUID {
	protected static int count = 0;

	public static synchronized String getUUID() {
		count++;
		long time = System.currentTimeMillis();

		String timePattern = Long.toHexString(time);
		int leftBit = 14 - timePattern.length();
		if (leftBit > 0) {
			timePattern = "0000000000".substring(0, leftBit) + timePattern;
		}

		String uuid = timePattern
				+ Long.toHexString(Double.doubleToLongBits(Math.random()))
				+ Long.toHexString(Double.doubleToLongBits(Math.random()))
				+ "000000000000000000";

		uuid = uuid.substring(0, 32).toUpperCase();

		return uuid;
	}
}

GuidCreator.java

package batchpoc;

import java.net.*;
import java.util.*;
import java.security.*;

public class GuidCreator {
	private String seedingString = "";
	private String rawGUID = "";
	private boolean bSecure = false;
	private static Random myRand;
	private static SecureRandom mySecureRand;

	private static String s_id;

	public static final int BeforeMD5 = 1;
	public static final int AfterMD5 = 2;
	public static final int FormatString = 3;
	static {
		mySecureRand = new SecureRandom();
		long secureInitializer = mySecureRand.nextLong();
		myRand = new Random(secureInitializer);
		try {
			s_id = InetAddress.getLocalHost().toString();
		} catch (UnknownHostException e) {
			e.printStackTrace();
		}
	}

	public GuidCreator() {
	}

	/*
	 * Constructor with security option. Setting secure true enables each random
	 * number generated to be cryptographically strong. Secure false defaults to
	 * the standard Random function seeded with a single cryptographically
	 * strong random number.
	 */
	public GuidCreator(boolean secure) {
		bSecure = secure;
	}

	/*
	 * Method to generate the random GUID
	 */
	private void getRandomGUID(boolean secure) {
		MessageDigest md5 = null;
		StringBuffer sbValueBeforeMD5 = new StringBuffer();

		try {
			md5 = MessageDigest.getInstance("MD5");
		} catch (NoSuchAlgorithmException e) {
			System.out.println("Error: " + e);
		}

		try {
			long time = System.currentTimeMillis();
			long rand = 0;

			if (secure) {
				rand = mySecureRand.nextLong();
			} else {
				rand = myRand.nextLong();
			}

			// This StringBuffer can be as long as you need; the MD5
			// hash will always return 128 bits. You can change
			// the seed to include anything you want here.
			// You could even stream a file through the MD5 making
			// the odds of guessing it at least as great as that
			// of guessing the contents of the file!
			sbValueBeforeMD5.append(s_id);
			sbValueBeforeMD5.append(":");
			sbValueBeforeMD5.append(Long.toString(time));
			sbValueBeforeMD5.append(":");
			sbValueBeforeMD5.append(Long.toString(rand));

			seedingString = sbValueBeforeMD5.toString();
			md5.update(seedingString.getBytes());

			byte[] array = md5.digest();
			StringBuffer sb = new StringBuffer();
			for (int j = 0; j < array.length; ++j) {
				int b = array[j] & 0xFF;
				if (b < 0x10)
					sb.append('0');
				sb.append(Integer.toHexString(b));
			}

			rawGUID = sb.toString();

		} catch (Exception e) {
			System.out.println("Error:" + e);
		}
	}

	public String createNewGuid(int nFormatType, boolean secure) {
		getRandomGUID(secure);
		String sGuid = "";
		if (BeforeMD5 == nFormatType) {
			sGuid = this.seedingString;
		} else if (AfterMD5 == nFormatType) {
			sGuid = this.rawGUID;
		} else {
			sGuid = this.toString();
		}
		return sGuid;
	}

	public String createNewGuid(int nFormatType) {
		return this.createNewGuid(nFormatType, this.bSecure);
	}

	/*
	 * Convert to the standard format for GUID (Useful for SQL Server
	 * UniqueIdentifiers, etc.) Example: C2FEEEAC-CFCD-11D1-8B05-00600806D9B6
	 */
	public String toString() {
		String raw = rawGUID.toUpperCase();
		StringBuffer sb = new StringBuffer();
		sb.append(raw.substring(0, 8));
		sb.append("-");
		sb.append(raw.substring(8, 12));
		sb.append("-");
		sb.append(raw.substring(12, 16));
		sb.append("-");
		sb.append(raw.substring(16, 20));
		sb.append("-");
		sb.append(raw.substring(20));

		return sb.toString();
	}

	public static void main(String args[]) {
		GuidCreator myGUID = new GuidCreator();
//		System.out.println("Seeding String="
//				+ myGUID.createNewGuid(GuidCreator.BeforeMD5));
//		System.out.println("rawGUID="
//				+ myGUID.createNewGuid(GuidCreator.AfterMD5));
		System.out.println("RandomGUID="
				+ myGUID.createNewGuid(GuidCreator.AfterMD5));
	}
}
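
For comparison, the JDK ships an MD5-based (version 3) generator, `java.util.UUID.nameUUIDFromBytes`, which produces the hyphenated layout directly. The sketch below is an alternative, not the article's implementation: it builds a seed in the same host:time:random shape as `GuidCreator` and lets the JDK do the hashing and formatting. `JdkGuid`, `hostId` and `newGuid` are hypothetical names. Because the JDK overwrites the version and variant bits, the result differs from the raw MD5 hex string that `GuidCreator` returns.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;
import java.util.UUID;

public class JdkGuid {
	private static final SecureRandom RANDOM = new SecureRandom();

	static String hostId() {
		try {
			return InetAddress.getLocalHost().toString();
		} catch (UnknownHostException e) {
			return "unknown-host"; // fallback so the seed is never null
		}
	}

	public static String newGuid() {
		// Same seed shape as GuidCreator: host, current time, random long.
		String seed = hostId() + ":" + System.currentTimeMillis() + ":" + RANDOM.nextLong();
		// nameUUIDFromBytes hashes the bytes with MD5 and marks the result as version 3.
		UUID uuid = UUID.nameUUIDFromBytes(seed.getBytes(StandardCharsets.UTF_8));
		return uuid.toString().toUpperCase();
	}

	public static void main(String[] args) {
		System.out.println("RandomGUID=" + newGuid());
	}
}
```

When no particular seed is needed, `UUID.randomUUID()` is the shorter route; the seeded form is shown only because it mirrors the structure of the class above.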

GuidByRandom.java

package batchpoc;

import java.text.SimpleDateFormat;

public class GuidByRandom {
	private static int cnt = 0;

	public static synchronized String getGUID() throws Exception {
		StringBuffer code = new StringBuffer();
		try {
			java.util.Date dt = new java.util.Date(System.currentTimeMillis());
			SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmssSSS");//format system time 
			String randomCode = fmt.format(dt);
			// The modulus can be changed freely, e.g. to 100, 1000 or 100000.
			cnt = (cnt + 1) % 10000;
			code.append(randomCode).append(cnt);
			return code.toString();
		} catch (Exception e) {
			throw new Exception("createFileName error:" + e.getMessage(), e);
		}
	}

	public static void main(String[] args) throws Exception {
		System.out.println(getGUID());
	}
}
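
Because the counter is appended without padding, ids produced by `getGUID()` vary in length (between 18 and 21 characters with the modulus of 10000 used above). If fixed-width ids are preferred, a zero-padded variant might look like the sketch below. `PaddedGuid` is a hypothetical name, and the four-digit width is an assumption chosen to match `% 10000`.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

public class PaddedGuid {
	private static int cnt = 0;

	// synchronized protects the shared counter; the formatter is created per call
	// because SimpleDateFormat instances are not thread-safe.
	public static synchronized String getGUID() {
		SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMddHHmmssSSS", Locale.ROOT);
		cnt = (cnt + 1) % 10000;
		// 17 timestamp digits followed by a counter padded to exactly 4 digits.
		return fmt.format(new Date()) + String.format(Locale.ROOT, "%04d", cnt);
	}

	public static void main(String[] args) {
		System.out.println(getGUID());
	}
}
```

As with the original, uniqueness only holds within one JVM and for fewer than 10000 ids per millisecond.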

Constants.java

package util;

public class Constants {

	public final static String ENUMERATION_EXCEL_TASK = "excel";
	public final static String ENUMERATION_TXT_TASK = "txt";
}

StringUtil.java

package util;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Calendar;
import java.util.Date;
import java.sql.Blob;
import java.text.*;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StringUtil {
	protected final static Logger logger = LoggerFactory
			.getLogger(StringUtil.class);

	public static Object unserializeObj(byte[] bytes) {
		ByteArrayInputStream bais = null;
		try {
			// Deserialize the object from the byte array
			bais = new ByteArrayInputStream(bytes);
			ObjectInputStream ois = new ObjectInputStream(bais);
			return ois.readObject();
		} catch (Exception e) {
			logger.error("unserializeObj error:" + e.getMessage(), e);
		}
		return null;
	}

	public static byte[] serializeObj(Object obj) {
		ByteArrayOutputStream bout = null;
		ObjectOutputStream out = null;
		byte[] bytes = null;
		try {
			bout = new ByteArrayOutputStream();
			out = new ObjectOutputStream(bout);
			out.writeObject(obj);
			out.flush();
			bytes = bout.toByteArray();
		} catch (Exception e) {
			logger.error("serializeObject error:" + e.getMessage(), e);
		} finally {
			try {
				if (out != null) {
					out.close();
					out = null;
				}
			} catch (Exception e) {
			}
			try {
				if (bout != null) {
					bout.close();
					bout = null;
				}
			} catch (Exception e) {
			}
		}
		return bytes;
	}

	public static String escpaeCharacters(String s) {
		String val = "";
		try {
			if (s == null || s.length() < 1) {
				return s;
			}
			StringBuilder sb = new StringBuilder(s.length() + 16);
			for (int i = 0; i < s.length(); i++) {
				char c = s.charAt(i);
				switch (c) {
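				// Assumption: the published listing lost its HTML entities and
				// full-width characters, so the replacement targets below are
				// reconstructed from context.
				case '\'':
					sb.append("&prime;");// &acute;");
					break;
				case '′':
					sb.append("&prime;");// &acute;");
					break;
				case '\"':
					sb.append("&quot;");
					break;
				case '＂':
					sb.append("&quot;");
					break;
				case '&':
					sb.append("&amp;");
					break;
				case '#':
					sb.append("&#35;");
					break;
				case '\\':
					sb.append('¥');
					break;

				case '>':
					sb.append('＞');
					break;
				case '<':
					sb.append('＜');
					break;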
				default:
					sb.append(c);
					break;
				}
			}
			val = sb.toString();
			return val;
		} catch (Exception e) {
			logger.error("sanitized characters error: " + e.getMessage(), e);
			return s;
		}
	}

	public static boolean isNotNullOrEmpty(String str) {
		return str != null && str.trim().length() > 0;
	}

	public static boolean isNull(Object... params) {
		if (params == null) {
			return true;
		}

		for (Object obj : params) {
			if (obj == null) {
				return true;
			}
		}
		return false;
	}

	public static String getString(Object val) {
		String rtnVal = "";
		try {
			rtnVal = (String) val;
			rtnVal = rtnVal.trim();
		} catch (Exception e) {
			rtnVal = "";
		}
		return rtnVal;
	}

	public static String nullToStr(Object val) {
		return ((val == null) ? "" : String.valueOf(val).trim());
	}

	public static int getInt(Object val) {
		int rtnVal = -1;
		String rtnValStr = "-1";
		try {
			rtnValStr = (String) val;
			rtnValStr = rtnValStr.trim();
			rtnVal = Integer.parseInt(rtnValStr);
		} catch (Exception e) {
			rtnVal = -1;
		}

		return rtnVal;
	}

	public static String convertDateToStr(Date dt) {
		String dateStr = "";
		DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
		if (dt != null) {
			dateStr = format.format(dt);
		}
		return dateStr;
	}

	public static String convertDateToStr(Date dt, String formatter) {
		String dateStr = "";
		DateFormat format = new SimpleDateFormat(formatter);
		if (dt != null) {
			dateStr = format.format(dt);
		}
		return dateStr;
	}

	public static Date convertStrToDateByFormat(String dateStr) {
		String inputDateStr = "";
		SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");
		Date date = null;
		try {
			inputDateStr = dateStr;
			if (dateStr == null || dateStr.trim().length() < 1) {
				inputDateStr = "1900-01-01";
			}
			java.util.Date d = sf.parse(inputDateStr.toString().trim());
			date = new Date(d.getTime());
		} catch (Exception e) {
			logger.error(
					"convertStrToDateByFormat(" + dateStr + ") error:"
							+ e.getMessage(), e);
		}
		return date;
	}

	public static Date convertStrToDateByFormat(String dateStr, String formatter) {
		String inputDateStr = "";
		SimpleDateFormat sf = new SimpleDateFormat(formatter);
		Date date = null;
		try {
			inputDateStr = dateStr;
			if (dateStr == null || dateStr.trim().length() < 1) {
				inputDateStr = "1900-01-01 01:01:01";
			}
			java.util.Date d = sf.parse(inputDateStr.toString().trim());
			date = new Date(d.getTime());
		} catch (Exception e) {
			logger.error(
					"convertStrToDateByFormat(" + dateStr + ") error:"
							+ e.getMessage(), e);
		}
		return date;
	}

	public static Object deepcopy(Object src) throws Exception {
		ByteArrayOutputStream byteout = null;
		ObjectOutputStream out = null;
		ByteArrayInputStream bytein = null;
		ObjectInputStream in = null;
		Object dest = null;
		try {
			byteout = new ByteArrayOutputStream();
			out = new ObjectOutputStream(byteout);
			out.writeObject(src);

			bytein = new ByteArrayInputStream(byteout.toByteArray());

			in = new ObjectInputStream(bytein);

			dest = (Object) in.readObject();
		} catch (Exception e) {
			throw new Exception("deep copy object[" + src
					+ "] error cause by: " + e.getMessage(), e);
		} finally {
			try {
				if (in != null) {
					in.close();
					in = null;
				}
			} catch (Exception e) {
			}
			try {
				if (bytein != null) {
					bytein.close();
					bytein = null;
				}
			} catch (Exception e) {
			}
			try {
				if (out != null) {
					out.close();
					out = null;
				}
			} catch (Exception e) {
			}
			try {
				if (byteout != null) {
					byteout.close();
					byteout = null;
				}
			} catch (Exception e) {
			}
		}
		return dest;

	}

	public static Object blobToObject(Blob blob) throws Exception {
		Object obj = null;
		ObjectInputStream in = null;
		try {
			in = new ObjectInputStream(blob.getBinaryStream());
			obj = in.readObject();
			return obj;
		} catch (Exception e) {
			throw new Exception(e);
		} finally {
			try {
				if (in != null) {
					in.close();
					in = null;
				}
			} catch (Exception e) {
			}
		}
	}

	public static long dateSub(String dateStr) throws ParseException {
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd");
		java.util.Date d = sdf.parse(dateStr);
		Calendar calendar = Calendar.getInstance();
		calendar.setTime(new Date());
		long currentTime = calendar.getTimeInMillis();
		calendar.setTime(d);
		long timeEnd = calendar.getTimeInMillis();
		long theDay = (timeEnd - currentTime) / (1000 * 60 * 60 * 24);
		return theDay;
	}

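	public static boolean isNumeric(String str) {
		// "[0-9]+" rather than "[0-9]*": null and the empty string are not numeric.
		if (str == null) {
			return false;
		}
		Pattern pattern = Pattern.compile("[0-9]+");
		return pattern.matcher(str).matches();
	}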
}
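
The `deepcopy` method above closes its four streams by hand in nested try/catch blocks. Since the compiler plugin in the pom.xml targets Java 1.7, try-with-resources is available and could shorten this considerably. The following is a sketch of that alternative, not a drop-in replacement: `CopyUtil` and `deepCopy` are hypothetical names, and unlike the original it declares the checked exceptions instead of wrapping them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class CopyUtil {
	@SuppressWarnings("unchecked")
	public static <T extends Serializable> T deepCopy(T src)
			throws IOException, ClassNotFoundException {
		ByteArrayOutputStream buffer = new ByteArrayOutputStream();
		// The stream is flushed and closed automatically at the end of the block.
		try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
			out.writeObject(src);
		}
		try (ObjectInputStream in = new ObjectInputStream(
				new ByteArrayInputStream(buffer.toByteArray()))) {
			return (T) in.readObject();
		}
	}

	public static void main(String[] args) throws Exception {
		HashMap<String, String> row = new HashMap<String, String>();
		row.put("0", "id1");
		HashMap<String, String> copy = deepCopy(row);
		System.out.println("equal=" + copy.equals(row) + ", distinct=" + (copy != row));
	}
}
```

Serialization-based copying only works for `Serializable` object graphs and should only be applied to data the program created itself, since deserializing untrusted bytes is unsafe.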

The project is built with Maven, so here is the complete pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>webpoc</groupId>
	<artifactId>webpoc</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>war</packaging>
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<java.version>1.8</java.version>
		<jetty.version>9.3.3.v20150827</jetty.version>
		<slf4j.version>1.7.7</slf4j.version>
		<spring.version>4.2.1.RELEASE</spring.version>
		<spring.session.version>1.0.2.RELEASE</spring.session.version>
		<javax.servlet-api.version>2.5</javax.servlet-api.version>
		<activemq_version>5.8.0</activemq_version>
		<poi_version>3.8</poi_version>
	</properties>
	<dependencies>

		<!-- poi start -->
		<dependency>
			<groupId>org.apache.poi</groupId>
			<artifactId>poi</artifactId>
			<version>${poi_version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.poi</groupId>
			<artifactId>poi-ooxml-schemas</artifactId>
			<version>${poi_version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.poi</groupId>
			<artifactId>poi-scratchpad</artifactId>
			<version>${poi_version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.poi</groupId>
			<artifactId>poi-ooxml</artifactId>
			<version>${poi_version}</version>
		</dependency>
		<!-- poi end -->
		<!-- active mq start -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>5.8.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-pool</artifactId>
			<version>${activemq_version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.xbean</groupId>
			<artifactId>xbean-spring</artifactId>
			<version>3.16</version>
		</dependency>
		<!-- active mq end -->

		<!-- servlet start -->
		<dependency>
			<groupId>javax.servlet</groupId>
			<artifactId>servlet-api</artifactId>
			<version>${javax.servlet-api.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>javax.servlet.jsp</groupId>
			<artifactId>jsp-api</artifactId>
			<version>2.1</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>javax.servlet</groupId>
			<artifactId>jstl</artifactId>
			<version>1.2</version>
		</dependency>
		<!-- servlet end -->

		<!-- redis start -->
		<dependency>
			<groupId>redis.clients</groupId>
			<artifactId>jedis</artifactId>
			<version>2.5.2</version>
		</dependency>
		<dependency>
			<groupId>org.redisson</groupId>
			<artifactId>redisson</artifactId>
			<version>1.0.2</version>
		</dependency>
		<!-- redis end -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>jcl-over-slf4j</artifactId>
			<version>${slf4j.version}</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>${slf4j.version}</version>
		</dependency>

		<!-- spring conf start -->
		<dependency>
			<groupId>org.springframework.data</groupId>
			<artifactId>spring-data-redis</artifactId>
			<version>1.5.2.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-webmvc</artifactId>
			<version>${spring.version}</version>
			<exclusions>
				<exclusion>
					<groupId>commons-logging</groupId>
					<artifactId>commons-logging</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-tx</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-aop</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context-support</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.data</groupId>
			<artifactId>spring-data-redis</artifactId>
			<version>1.4.1.RELEASE</version>
		</dependency>

		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-orm</artifactId>
			<version>${spring.version}</version>
		</dependency>


		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>${spring.version}</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.session</groupId>
			<artifactId>spring-session</artifactId>
			<version>${spring.session.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${spring.version}</version>
		</dependency>
		<!-- spring conf end -->
	</dependencies>
	<build>
		<sourceDirectory>src</sourceDirectory>
		<plugins>
			<plugin>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.1</version>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
				</configuration>
			</plugin>
			<plugin>
				<artifactId>maven-war-plugin</artifactId>
				<version>2.4</version>
				<configuration>
					<warSourceDirectory>WebContent</warSourceDirectory>
					<failOnMissingWebXml>false</failOnMissingWebXml>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>


