Batch Tasks Show the Power of Multithreading!
Posted by lingyejun
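Background
My understanding of multithreading was not very deep, and I rarely got the chance to write multithreaded code at work. Not long ago I ran into a real use case, and implementing it gave me a much better grasp of how multithreading works and when to apply it. The scenario: we needed to send a product survey to users. A colleague from operations brought an Excel file and asked us to send a survey SMS to the roughly sixty thousand phone numbers in it.
The simplest approach is a loop that sends the messages one by one on a single thread. The core problem is that the SMS carrier's send interface responds slowly. Assuming an average response time of 100 ms, a single thread needs 60,000 × 0.1 s = 6,000 s, about 100 minutes. That is clearly unacceptable. We cannot optimize the carrier's interface, so the only way to finish the job sooner is to increase our own sending and processing capacity.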
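The arithmetic above can be turned into a rough estimator. This is a minimal sketch, not part of the original code: the class name `SendTimeEstimate` and its method are hypothetical, and the model assumes a constant 100 ms latency per message, no carrier-side rate limiting, and no thread-scheduling overhead.

```java
import java.time.Duration;

final class SendTimeEstimate {

    /**
     * Estimated wall-clock time when `threads` workers each block for
     * `latencyMs` per message and split the messages evenly.
     */
    static Duration estimate(int messages, long latencyMs, int threads) {
        // Each worker sends ceil(messages / threads) messages back to back.
        long perWorker = (messages + threads - 1) / threads;
        return Duration.ofMillis(perWorker * latencyMs);
    }

    public static void main(String[] args) {
        // 60,000 messages at 100 ms each on one thread
        System.out.println(estimate(60_000, 100, 1).getSeconds());  // 6000
        // The same workload spread over 10 threads
        System.out.println(estimate(60_000, 100, 10).getSeconds()); // 600
    }
}
```

Under these assumptions the time falls linearly with the thread count, because the threads spend nearly all of their time waiting on the remote call rather than using the CPU.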
Reading the data from Excel
The utility code needs the following two Maven dependencies:
<dependency>
    <groupId>org.apache.poi</groupId>
    <artifactId>poi-ooxml</artifactId>
    <version>3.17</version>
</dependency>
<dependency>
    <groupId>org.apache.xmlbeans</groupId>
    <artifactId>xmlbeans</artifactId>
    <version>2.6.0</version>
</dependency>
Utility code for reading the Excel file
/**
 * Reads the phone numbers from the first column of the first sheet.
 *
 * @param fileName path of the .xlsx file
 */
public static void readFromExcel(String fileName) {
    // try-with-resources closes the stream and the workbook even on failure
    try (InputStream is = new FileInputStream(fileName);
         XSSFWorkbook workbook = new XSSFWorkbook(is)) {
        XSSFSheet sheet = workbook.getSheetAt(0);
        int num = 0;
        // Iterate over the rows
        for (int rowNum = 0, lastNum = sheet.getLastRowNum(); rowNum <= lastNum; rowNum++) {
            XSSFRow row = sheet.getRow(rowNum);
            if (row == null) {
                // getRow returns null for rows that were never created
                continue;
            }
            String phoneNumber = getStringValueFromCell(row.getCell(0)).trim();
            phoneList.add(phoneNumber);
            num++;
        }
        // Number of rows read
        System.out.println(num);
    } catch (IOException e) {
        // FileNotFoundException is a subclass of IOException
        e.printStackTrace();
    }
}

/**
 * Reads the content of a cell as a string.
 *
 * @param cell the cell to read, may be null
 * @return the cell content, or an empty string
 */
private static String getStringValueFromCell(XSSFCell cell) {
    // Format for date cells
    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    // Format for numeric cells
    DecimalFormat decimalFormat = new DecimalFormat("#.#####");
    // Cells default to empty
    String cellValue = "";
    if (cell == null) {
        return cellValue;
    }
    // Read according to the cell type
    if (cell.getCellType() == XSSFCell.CELL_TYPE_STRING) {
        cellValue = cell.getStringCellValue();
    } else if (cell.getCellType() == XSSFCell.CELL_TYPE_NUMERIC) {
        if (DateUtil.isCellDateFormatted(cell)) {
            // Dates are converted to a date string
            double d = cell.getNumericCellValue();
            Date date = DateUtil.getJavaDate(d);
            cellValue = dateFormat.format(date);
        } else {
            // Everything else is formatted as a number
            cellValue = decimalFormat.format(cell.getNumericCellValue());
        }
    } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BLANK) {
        cellValue = "";
    } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BOOLEAN) {
        cellValue = String.valueOf(cell.getBooleanCellValue());
    } else if (cell.getCellType() == XSSFCell.CELL_TYPE_ERROR) {
        cellValue = "";
    } else if (cell.getCellType() == XSSFCell.CELL_TYPE_FORMULA) {
        cellValue = cell.getCellFormula();
    }
    return cellValue;
}
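A method that simulates the carrier's SMS send call
/**
 * The external interface is slow; multithreading is used to compensate.
 *
 * @param userPhone the phone number to send to
 */
public void sendMsgToPhone(String userPhone) {
    try {
        // Simulate the carrier's response time
        Thread.sleep(SEND_COST_TIME);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can react to it
        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
    System.out.println("send message to : " + userPhone);
}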
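Multithreading
Simple single-threaded sending
/**
 * Single-threaded sending.
 *
 * @param phoneList the phone numbers to send to
 * @return elapsed time in milliseconds
 */
private long singleThread(List<String> phoneList) {
    long start = System.currentTimeMillis();
    /*// Run directly on the main thread
    for (String phoneNumber : phoneList) {
        threadOperation.sendMsgToPhone(phoneNumber);
    }*/
    SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(phoneList);
    smet.start();
    try {
        // start() returns immediately; wait for the worker so the timing is meaningful
        smet.join();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    long totalTime = System.currentTimeMillis() - start;
    System.out.println("Single-thread total send time: " + totalTime);
    return totalTime;
}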
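For bulk SMS sending, pushing all one thousand numbers through a single thread takes about 103,132 ms, which is consistent with 1,000 × 100 ms plus overhead. The approach is inefficient and takes a long time.
The key point of multithreaded sending is to split the full list of phone numbers into several groups and assign each group to its own thread.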
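The splitting step can be sketched as a small generic helper. This is an illustration only: `PhonePartitioner` and `partition` are hypothetical names that do not appear in the original code, which slices the list by hand with `subList`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PhonePartitioner {

    /**
     * Splits `items` into at most `groups` contiguous, nearly equal sublists.
     * The sublists are views backed by `items`, so the source list must not
     * be structurally modified while the workers are reading them.
     */
    static <T> List<List<T>> partition(List<T> items, int groups) {
        if (groups <= 0) {
            throw new IllegalArgumentException("groups must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        int size = items.size();
        int base = size / groups;   // minimum size of every group
        int extra = size % groups;  // the first `extra` groups get one more item
        int from = 0;
        for (int g = 0; g < groups && from < size; g++) {
            int to = from + base + (g < extra ? 1 : 0);
            result.add(items.subList(from, to));
            from = to;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(partition(numbers, 3)); // [[1, 2, 3, 4], [5, 6, 7], [8, 9, 10]]
    }
}
```

Computing both bounds from the list size means the last group can never run past the end of the list, which is the usual failure mode of hand-written `subList` arithmetic.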
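Example with two threads
/**
 * Sending with two threads.
 *
 * @param phoneList the phone numbers to send to
 * @return elapsed time in milliseconds
 */
private long twoThreads(List<String> phoneList) {
    long start = System.currentTimeMillis();
    // Split the list into two halves, one per thread
    List<String> list1 = phoneList.subList(0, phoneList.size() / 2);
    List<String> list2 = phoneList.subList(phoneList.size() / 2, phoneList.size());
    SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list1);
    smet.start();
    SendMsgExtendThread smet1 = threadOperation.new SendMsgExtendThread(list2);
    smet1.start();
    try {
        // Wait for both workers before measuring the elapsed time
        smet.join();
        smet1.join();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return System.currentTimeMillis() - start;
}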
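Another convenient way to split the work
/**
 * Another way to split the work: a fixed number of phone numbers per thread.
 *
 * @param phoneList the phone numbers to send to
 */
private void otherThread(List<String> phoneList) {
    int numbersPerThread = 10;
    for (int threadNo = 0; threadNo < 10; threadNo++) {
        int from = threadNo * numbersPerThread;
        if (from >= phoneList.size()) {
            // No numbers left to assign
            break;
        }
        // Clamp the upper bound so a short list does not throw IndexOutOfBoundsException
        int to = Math.min(from + numbersPerThread, phoneList.size());
        List<String> list = phoneList.subList(from, to);
        SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list);
        smet.start();
    }
}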
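Sending with a thread pool
/**
 * Sending with a thread pool.
 *
 * @param phoneList the phone numbers to send to
 */
private void threadPool(List<String> phoneList) {
    int numbersPerThread = 10;
    for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
        int from = threadNo * numbersPerThread;
        if (from >= phoneList.size()) {
            break;
        }
        // Clamp the upper bound so a short list does not throw IndexOutOfBoundsException
        int to = Math.min(from + numbersPerThread, phoneList.size());
        List<String> list = phoneList.subList(from, to);
        // The pool only calls run(), so SendMsgExtendThread acts as a plain Runnable here
        threadOperation.executorService.execute(threadOperation.new SendMsgExtendThread(list));
    }
    // Stop accepting new tasks; tasks already submitted still run to completion
    threadOperation.executorService.shutdown();
}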
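`shutdown()` only stops the pool from accepting new tasks; it does not block until the queued work is done. If the caller needs to know when every message has gone out, one common pattern is to follow it with `awaitTermination`. The sketch below is an assumption-laden illustration, not the original author's code: `PoolShutdownSketch`, `runAndWait`, the 10 ms simulated send, and the one-minute timeout are all made up for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PoolShutdownSketch {

    /** Runs `tasks` simulated sends on a fixed pool and returns how many completed. */
    static int runAndWait(int tasks, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(10); // stand-in for the slow carrier call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                done.incrementAndGet();
            });
        }
        pool.shutdown(); // no new tasks; queued tasks keep running
        try {
            // Block until all tasks finish or the (arbitrary) timeout expires
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow(); // give up and interrupt the stragglers
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(20, 4)); // 20
    }
}
```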
Sending with Callable
/**
 * Multithreaded sending with Callable tasks.
 *
 * @param phoneList the phone numbers to send to
 */
private void multiThreadSend(List<String> phoneList) {
    List<Future<Long>> futures = new ArrayList<>();
    int numbersPerThread = 100;
    for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
        int from = threadNo * numbersPerThread;
        if (from >= phoneList.size()) {
            break;
        }
        // Clamp the upper bound so a short list does not throw IndexOutOfBoundsException
        int to = Math.min(from + numbersPerThread, phoneList.size());
        List<String> list = phoneList.subList(from, to);
        Future<Long> future = threadOperation.executorService.submit(
                threadOperation.new SendMsgImplCallable(list, String.valueOf(threadNo)));
        futures.add(future);
    }
    for (Future<Long> future : futures) {
        try {
            // get() blocks until the task finishes and returns its elapsed time in ms
            System.out.println(future.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    threadOperation.executorService.shutdown();
}
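With multithreading, the sending job is split up and handed to the threads, ten threads with 100 numbers each in the code above. The run completes in 10,266 ms, roughly a tenfold improvement over the 103,132 ms single-threaded run.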
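The Callable version submits a fixed number of chunks, so it covers at most `THREAD_POOL_SIZE × 100` numbers. To cover a list of any length, one option is to cut the whole list into fixed-size chunks and let the pool's queue hand them to whichever thread is free. The following is a minimal sketch of that variation using `ExecutorService.invokeAll`; it is a different technique from the original's, and `BatchSendSketch`, `sendAll`, and the 5 ms `fakeSend` are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BatchSendSketch {

    /** Stand-in for the slow carrier call. */
    static void fakeSend(String phone) throws InterruptedException {
        Thread.sleep(5);
    }

    /** Sends to every number in `phones` and returns how many sends completed. */
    static int sendAll(List<String> phones, int poolSize, int chunkSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<Integer>> jobs = new ArrayList<>();
            for (int from = 0; from < phones.size(); from += chunkSize) {
                int to = Math.min(from + chunkSize, phones.size());
                List<String> chunk = phones.subList(from, to);
                jobs.add(() -> {
                    for (String phone : chunk) {
                        fakeSend(phone);
                    }
                    return chunk.size();
                });
            }
            int sent = 0;
            // invokeAll blocks until every job has finished
            for (Future<Integer> future : pool.invokeAll(jobs)) {
                sent += future.get();
            }
            return sent;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> phones = new ArrayList<>();
        for (int i = 0; i < 95; i++) {
            phones.add("phone-" + i);
        }
        System.out.println(sendAll(phones, 4, 10)); // 95
    }
}
```

Because there can be more chunks than threads, a thread that finishes early simply picks up the next chunk, which keeps the pool busy even when individual calls vary in latency.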
Complete code
package com.lingyejun.tick.authenticator;

import org.apache.poi.ss.usermodel.DateUtil;
import org.apache.poi.xssf.usermodel.XSSFCell;
import org.apache.poi.xssf.usermodel.XSSFRow;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;

public class ThreadOperation {

    // Simulated blocking time of one SMS send, in milliseconds
    private static final long SEND_COST_TIME = 100L;

    // File containing the phone numbers
    private static final String FILE_NAME = "/Users/lingye/Downloads/phone_number.xlsx";

    // List of phone numbers
    private static List<String> phoneList = new ArrayList<>();

    // Singleton instance
    private static volatile ThreadOperation threadOperation;

    // Number of threads
    private static final int THREAD_POOL_SIZE = 10;

    // Thread pool
    private ExecutorService executorService = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE,
            0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

    public ThreadOperation() {
        // Read the phone numbers from the local file
        readFromExcel(FILE_NAME);
    }

    public static void main(String[] args) {
        ThreadOperation threadOperation = getInstance();
        //threadOperation.singleThread(phoneList);
        threadOperation.multiThreadSend(phoneList);
    }

    /**
     * Returns the singleton instance (double-checked locking).
     *
     * @return the shared ThreadOperation
     */
    public static ThreadOperation getInstance() {
        if (threadOperation == null) {
            synchronized (ThreadOperation.class) {
                if (threadOperation == null) {
                    threadOperation = new ThreadOperation();
                }
            }
        }
        return threadOperation;
    }

    /**
     * Reads the phone numbers from the first column of the first sheet.
     *
     * @param fileName path of the .xlsx file
     */
    public static void readFromExcel(String fileName) {
        // try-with-resources closes the stream and the workbook even on failure
        try (InputStream is = new FileInputStream(fileName);
             XSSFWorkbook workbook = new XSSFWorkbook(is)) {
            XSSFSheet sheet = workbook.getSheetAt(0);
            int num = 0;
            // Iterate over the rows
            for (int rowNum = 0, lastNum = sheet.getLastRowNum(); rowNum <= lastNum; rowNum++) {
                XSSFRow row = sheet.getRow(rowNum);
                if (row == null) {
                    // getRow returns null for rows that were never created
                    continue;
                }
                String phoneNumber = getStringValueFromCell(row.getCell(0)).trim();
                phoneList.add(phoneNumber);
                num++;
            }
            // Number of rows read
            System.out.println(num);
        } catch (IOException e) {
            // FileNotFoundException is a subclass of IOException
            e.printStackTrace();
        }
    }

    /**
     * Reads the content of a cell as a string.
     *
     * @param cell the cell to read, may be null
     * @return the cell content, or an empty string
     */
    private static String getStringValueFromCell(XSSFCell cell) {
        // Format for date cells
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        // Format for numeric cells
        DecimalFormat decimalFormat = new DecimalFormat("#.#####");
        // Cells default to empty
        String cellValue = "";
        if (cell == null) {
            return cellValue;
        }
        // Read according to the cell type
        if (cell.getCellType() == XSSFCell.CELL_TYPE_STRING) {
            cellValue = cell.getStringCellValue();
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_NUMERIC) {
            if (DateUtil.isCellDateFormatted(cell)) {
                // Dates are converted to a date string
                double d = cell.getNumericCellValue();
                Date date = DateUtil.getJavaDate(d);
                cellValue = dateFormat.format(date);
            } else {
                // Everything else is formatted as a number
                cellValue = decimalFormat.format(cell.getNumericCellValue());
            }
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BLANK) {
            cellValue = "";
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BOOLEAN) {
            cellValue = String.valueOf(cell.getBooleanCellValue());
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_ERROR) {
            cellValue = "";
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_FORMULA) {
            cellValue = cell.getCellFormula();
        }
        return cellValue;
    }

    /**
     * The external interface is slow; multithreading is used to compensate.
     *
     * @param userPhone the phone number to send to
     */
    public void sendMsgToPhone(String userPhone) {
        try {
            // Simulate the carrier's response time
            Thread.sleep(SEND_COST_TIME);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can react to it
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println("send message to : " + userPhone);
    }

    /**
     * Single-threaded sending.
     *
     * @param phoneList the phone numbers to send to
     * @return elapsed time in milliseconds
     */
    private long singleThread(List<String> phoneList) {
        long start = System.currentTimeMillis();
        /*// Run directly on the main thread
        for (String phoneNumber : phoneList) {
            threadOperation.sendMsgToPhone(phoneNumber);
        }*/
        SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(phoneList);
        smet.start();
        try {
            // start() returns immediately; wait for the worker so the timing is meaningful
            smet.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long totalTime = System.currentTimeMillis() - start;
        System.out.println("Single-thread total send time: " + totalTime);
        return totalTime;
    }

    /**
     * Another way to split the work: a fixed number of phone numbers per thread.
     *
     * @param phoneList the phone numbers to send to
     */
    private void otherThread(List<String> phoneList) {
        int numbersPerThread = 10;
        for (int threadNo = 0; threadNo < 10; threadNo++) {
            int from = threadNo * numbersPerThread;
            if (from >= phoneList.size()) {
                // No numbers left to assign
                break;
            }
            // Clamp the upper bound so a short list does not throw IndexOutOfBoundsException
            int to = Math.min(from + numbersPerThread, phoneList.size());
            List<String> list = phoneList.subList(from, to);
            SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list);
            smet.start();
        }
    }

    /**
     * Sending with two threads.
     *
     * @param phoneList the phone numbers to send to
     * @return elapsed time in milliseconds
     */
    private long twoThreads(List<String> phoneList) {
        long start = System.currentTimeMillis();
        // Split the list into two halves, one per thread
        List<String> list1 = phoneList.subList(0, phoneList.size() / 2);
        List<String> list2 = phoneList.subList(phoneList.size() / 2, phoneList.size());
        SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list1);
        smet.start();
        SendMsgExtendThread smet1 = threadOperation.new SendMsgExtendThread(list2);
        smet1.start();
        try {
            // Wait for both workers before measuring the elapsed time
            smet.join();
            smet1.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return System.currentTimeMillis() - start;
    }

    /**
     * Sending with a thread pool.
     *
     * @param phoneList the phone numbers to send to
     */
    private void threadPool(List<String> phoneList) {
        int numbersPerThread = 10;
        for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
            int from = threadNo * numbersPerThread;
            if (from >= phoneList.size()) {
                break;
            }
            int to = Math.min(from + numbersPerThread, phoneList.size());
            List<String> list = phoneList.subList(from, to);
            // The pool only calls run(), so SendMsgExtendThread acts as a plain Runnable here
            threadOperation.executorService.execute(threadOperation.new SendMsgExtendThread(list));
        }
        // Stop accepting new tasks; tasks already submitted still run to completion
        threadOperation.executorService.shutdown();
    }

    /**
     * Multithreaded sending with Callable tasks.
     *
     * @param phoneList the phone numbers to send to
     */
    private void multiThreadSend(List<String> phoneList) {
        List<Future<Long>> futures = new ArrayList<>();
        int numbersPerThread = 100;
        for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
            int from = threadNo * numbersPerThread;
            if (from >= phoneList.size()) {
                break;
            }
            int to = Math.min(from + numbersPerThread, phoneList.size());
            List<String> list = phoneList.subList(from, to);
            Future<Long> future = threadOperation.executorService.submit(
                    threadOperation.new SendMsgImplCallable(list, String.valueOf(threadNo)));
            futures.add(future);
        }
        for (Future<Long> future : futures) {
            try {
                // get() blocks until the task finishes and returns its elapsed time in ms
                System.out.println(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        threadOperation.executorService.shutdown();
    }

    public class SendMsgExtendThread extends Thread {

        private List<String> numberListByThread;

        public SendMsgExtendThread(List<String> numberList) {
            numberListByThread = numberList;
        }

        @Override
        public void run() {
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < numberListByThread.size(); i++) {
                System.out.print("no." + (i + 1));
                sendMsgToPhone(numberListByThread.get(i));
            }
            System.out.println("== single thread send " + numberListByThread.size()
                    + " execute time: " + (System.currentTimeMillis() - startTime) + " ms");
        }
    }

    public class SendMsgImplCallable implements Callable<Long> {

        private List<String> numberListByThread;

        private String threadName;

        public SendMsgImplCallable(List<String> numberList, String threadName) {
            numberListByThread = numberList;
            this.threadName = threadName;
        }

        @Override
        public Long call() throws Exception {
            long startMills = System.currentTimeMillis();
            for (String number : numberListByThread) {
                sendMsgToPhone(number);
            }
            long endMills = System.currentTimeMillis();
            // Elapsed time of this task in milliseconds
            return endMills - startMills;
        }
    }
}