Batch Jobs Show the Power of Multithreading!

Posted by lingyejun


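Background

My understanding of multithreading was not very deep, and I seldom had occasion to write multithreaded code at work. Recently I ran into a real use case, and implementing it gave me a much better grasp of how multithreading works and where to apply it. The scenario: we needed to send users a product survey. A colleague from operations brought an Excel file and asked us to send the survey SMS to the roughly sixty thousand phone numbers in it.

The simplest approach is a loop that sends sequentially on a single thread. The core problem is that the SMS carrier's sending interface responds slowly: assuming an average response time of 100 ms, single-threaded sending would take 60,000 × 0.1 s = 6,000 s, or about 100 minutes. That is clearly unacceptable. We cannot optimize the carrier's sending interface, so the only way to finish the job sooner is to increase our own sending and processing capacity.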
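The arithmetic above extends naturally to several threads. The following is a rough sketch, not a measurement: the class and method names are made up for illustration, and it assumes the carrier's latency stays at 100 ms no matter how many requests arrive concurrently.

```java
/** Back-of-the-envelope estimate of total send time; all names here are illustrative. */
final class SendTimeEstimate {

    /**
     * Estimated wall-clock seconds to send {@code messages} messages when each call
     * blocks for {@code latencyMillis} and {@code threads} workers send in parallel.
     * Assumes the provider's latency does not degrade under concurrent load.
     */
    static double estimateSeconds(int messages, long latencyMillis, int threads) {
        // Each worker handles ceil(messages / threads) messages, one after another.
        long perThread = (messages + threads - 1) / threads;
        return perThread * latencyMillis / 1000.0;
    }

    public static void main(String[] args) {
        System.out.println(estimateSeconds(60_000, 100, 1));  // prints 6000.0
        System.out.println(estimateSeconds(60_000, 100, 10)); // prints 600.0
    }
}
```

Under these assumptions, ten threads would bring the job from about 100 minutes down to about 10 minutes; real numbers depend on the carrier's rate limits and network conditions.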

Reading the data from Excel

For the utility code, add the following two dependencies in Maven:

<dependency>
    <groupId>org.apache.poi</groupId>
    <artifactId>poi-ooxml</artifactId>
    <version>3.17</version>
</dependency>
<dependency>
    <groupId>org.apache.xmlbeans</groupId>
    <artifactId>xmlbeans</artifactId>
    <version>2.6.0</version>
</dependency>

Utility code for reading the Excel file

    /**
     * Reads the phone numbers from the first column of the first sheet.
     *
     * @param fileName path to the .xlsx file
     */
    public static void readFromExcel(String fileName) {
        // try-with-resources closes the workbook and the stream even if reading fails
        try (InputStream is = new FileInputStream(fileName);
             XSSFWorkbook workbook = new XSSFWorkbook(is)) {
            XSSFSheet sheet = workbook.getSheetAt(0);
            int num = 0;
            // Iterate over the rows
            for (int rowNum = 0, lastNum = sheet.getLastRowNum(); rowNum <= lastNum; rowNum++) {
                XSSFRow row = sheet.getRow(rowNum);
                if (row == null) {
                    // getRow returns null for rows that were never created
                    continue;
                }
                String phoneNumber = getStringValueFromCell(row.getCell(0)).trim();
                phoneList.add(phoneNumber);
                num++;
            }
            // Number of rows read
            System.out.println(num);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the content of an Excel cell as a string.
     *
     * @param cell the cell to read, may be null
     * @return the cell content, or an empty string for null, blank, and error cells
     */
    private static String getStringValueFromCell(XSSFCell cell) {

        // Format for date cells
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        // Format for numeric cells
        DecimalFormat decimalFormat = new DecimalFormat("#.#####");
        // Cells default to empty
        String cellValue = "";

        if (cell == null) {
            return cellValue;
        }

        // Read according to the cell type.
        // These int constants are deprecated in POI 3.17; newer POI versions use the CellType enum.
        if (cell.getCellType() == XSSFCell.CELL_TYPE_STRING) {
            cellValue = cell.getStringCellValue();
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_NUMERIC) {
            // Dates are formatted as date strings
            if (DateUtil.isCellDateFormatted(cell)) {
                double d = cell.getNumericCellValue();
                Date date = DateUtil.getJavaDate(d);
                cellValue = dateFormat.format(date);
            } else {
                // Everything else is formatted as a number
                cellValue = decimalFormat.format((cell.getNumericCellValue()));
            }
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BLANK) {
            cellValue = "";
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BOOLEAN) {
            cellValue = String.valueOf(cell.getBooleanCellValue());
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_ERROR) {
            cellValue = "";
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_FORMULA) {
            // Returns the formula text, not its evaluated result
            cellValue = cell.getCellFormula();
        }
        return cellValue;
    }

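Simulating the carrier's SMS-sending method

    /**
     * Simulates the slow external interface; multithreading is used to compensate for its latency.
     *
     * @param userPhone phone number to send to
     */
    public void sendMsgToPhone(String userPhone) {
        try {
            Thread.sleep(SEND_COST_TIME);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println("send message to : " + userPhone);
    }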

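Multithreading

Simple single-threaded sending

    /**
     * Single-threaded sending.
     *
     * @param phoneList phone numbers to send to
     * @return elapsed time in milliseconds
     */
    private long singleThread(List<String> phoneList) {
        long start = System.currentTimeMillis();
        /*// Run directly on the main thread
        for (String phoneNumber : phoneList) {
            threadOperation.sendMsgToPhone(phoneNumber);
        }*/
        SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(phoneList);
        smet.start();
        try {
            // Wait for the worker to finish; otherwise only the thread start-up time is measured
            smet.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long totalTime = System.currentTimeMillis() - start;
        System.out.println("Total time for single-threaded sending: " + totalTime);
        return totalTime;
    }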

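For bulk SMS sending, a single thread needs about 103132 ms to work through a test set of one thousand numbers (1,000 × 100 ms of simulated latency, plus overhead). This is inefficient and takes a long time.

The key point in multithreaded sending is to split the full list of phone numbers into groups and assign one group to each thread.

Example with two threads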
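The splitting step can be written once as a small helper instead of being repeated in every sending method. This is a minimal sketch under my own naming (`ListPartitioner` and `partition` do not appear in the original code); it uses ceiling division so that no number is dropped when the list does not divide evenly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative helper for splitting a list into per-thread chunks. */
final class ListPartitioner {

    /** Splits {@code source} into at most {@code parts} contiguous chunks of near-equal size. */
    static <T> List<List<T>> partition(List<T> source, int parts) {
        if (parts <= 0) {
            throw new IllegalArgumentException("parts must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        // Ceiling division: the last chunk may be shorter than the others
        int chunkSize = (source.size() + parts - 1) / parts;
        for (int from = 0; from < source.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, source.size());
            // Copy the view so each thread owns an independent list
            chunks.add(new ArrayList<>(source.subList(from, to)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(partition(numbers, 3)); // prints [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]
    }
}
```

Copying each `subList` view costs a little memory but avoids surprises if the source list is modified while the threads are running.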

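    /**
     * Sending with two threads.
     *
     * @param phoneList phone numbers to send to
     * @return elapsed time in milliseconds
     */
    private long twoThreads(List<String> phoneList) {
        long start = System.currentTimeMillis();
        // First half and second half of the list
        List<String> list1 = phoneList.subList(0, phoneList.size() / 2);
        List<String> list2 = phoneList.subList(phoneList.size() / 2, phoneList.size());
        SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list1);
        smet.start();
        SendMsgExtendThread smet1 = threadOperation.new SendMsgExtendThread(list2);
        smet1.start();
        try {
            // Wait for both workers so the elapsed time covers the whole job
            smet.join();
            smet1.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return System.currentTimeMillis() - start;
    }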

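Another convenient way to divide the work

    /**
     * Another way to divide the work: fixed-size chunks, one thread per chunk.
     *
     * @param phoneList phone numbers to send to
     */
    private void otherThread(List<String> phoneList) {
        int numbersPerThread = 10;
        // At most 10 threads, so only the first 100 numbers are covered
        for (int threadNo = 0; threadNo < 10; threadNo++) {
            int from = threadNo * numbersPerThread;
            if (from >= phoneList.size()) {
                break;
            }
            // Clamp the upper bound so a short final chunk does not throw IndexOutOfBoundsException
            int to = Math.min(from + numbersPerThread, phoneList.size());
            List<String> list = phoneList.subList(from, to);
            SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list);
            smet.start();
            if (list.size() < numbersPerThread) {
                break;
            }
        }
    }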

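Sending with a thread pool

    /**
     * Sending with a thread pool.
     *
     * @param phoneList phone numbers to send to
     */
    private void threadPool(List<String> phoneList) {
        int numbersPerThread = 10;
        for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
            int from = threadNo * numbersPerThread;
            if (from >= phoneList.size()) {
                break;
            }
            // Clamp the upper bound so a short list does not throw IndexOutOfBoundsException
            int to = Math.min(from + numbersPerThread, phoneList.size());
            List<String> list = phoneList.subList(from, to);
            // The SendMsgExtendThread is used only as a Runnable here; the pool's own threads run it
            threadOperation.executorService.execute(threadOperation.new SendMsgExtendThread(list));
        }
        // Stop accepting new tasks; tasks already submitted still run to completion
        threadOperation.executorService.shutdown();
    }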
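Note that `shutdown()` returns immediately and does not wait for queued tasks. If the caller needs to know when every message has gone out, one option is to follow it with `awaitTermination`. The sketch below is self-contained and uses names of my own (`PoolShutdownSketch`, `sendAll`); the sleep stands in for the carrier call, and the one-minute timeout is an arbitrary assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative only: submit one task per chunk, then block until the pool drains. */
final class PoolShutdownSketch {

    /** Returns how many messages were "sent" before the pool terminated. */
    static int sendAll(List<List<String>> chunks, int poolSize, long latencyMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger sent = new AtomicInteger();
        for (List<String> chunk : chunks) {
            pool.execute(() -> {
                for (String phone : chunk) {
                    try {
                        Thread.sleep(latencyMillis); // stand-in for the slow carrier call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return; // stop this chunk if the pool is being torn down
                    }
                    sent.incrementAndGet();
                }
            });
        }
        pool.shutdown(); // no new tasks; queued tasks keep running
        try {
            // Assumed timeout; pick one that fits the real job size
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return sent.get();
    }

    public static void main(String[] args) {
        List<List<String>> chunks = Arrays.asList(
                Arrays.asList("13800000001", "13800000002"),
                Arrays.asList("13800000003"));
        System.out.println(sendAll(chunks, 2, 5L)); // prints 3
    }
}
```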

Sending with Callable

    /**
     * Multithreaded sending with Callable tasks.
     *
     * @param phoneList phone numbers to send to
     */
    private void multiThreadSend(List<String> phoneList) {
        List<Future<Long>> futures = new ArrayList<>();
        int numbersPerThread = 100;
        for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
            int from = threadNo * numbersPerThread;
            if (from >= phoneList.size()) {
                break;
            }
            // Clamp the upper bound so a short list does not throw IndexOutOfBoundsException
            int to = Math.min(from + numbersPerThread, phoneList.size());
            List<String> list = phoneList.subList(from, to);
            Future<Long> future = threadOperation.executorService.submit(threadOperation.new SendMsgImplCallable(list, String.valueOf(threadNo)));
            futures.add(future);
        }
        for (Future<Long> future : futures) {
            try {
                // Blocks until the task finishes, then prints its elapsed time in ms
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        threadOperation.executorService.shutdown();
    }

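With multithreaded sending, the job is split up and each part is assigned to a thread. The run finishes in 10266 ms, so throughput improves markedly and the elapsed time drops sharply.

Complete code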
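The comparison can be reproduced without the Excel file or the carrier. The following is a scaled-down sketch, not the author's benchmark: the class name, the 10 ms simulated latency, and the generated phone numbers are all assumptions chosen so that it runs in well under a second.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Illustrative timing comparison between sequential and pooled sending. */
final class SendTimingDemo {

    static final long LATENCY_MILLIS = 10L; // simulated latency; the article uses 100 ms

    static void fakeSend(String phone) throws InterruptedException {
        Thread.sleep(LATENCY_MILLIS); // stand-in for the carrier call
    }

    /** Sends every number on the calling thread and returns the elapsed milliseconds. */
    static long sendSequentially(List<String> phones) {
        long start = System.nanoTime();
        try {
            for (String phone : phones) {
                fakeSend(phone);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    /** Splits the list into one chunk per thread, sends in parallel, returns elapsed milliseconds. */
    static long sendInParallel(List<String> phones, int threads) {
        long start = System.nanoTime();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            int chunkSize = Math.max(1, (phones.size() + threads - 1) / threads);
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int from = 0; from < phones.size(); from += chunkSize) {
                List<String> chunk = phones.subList(from, Math.min(from + chunkSize, phones.size()));
                tasks.add(() -> {
                    for (String phone : chunk) {
                        fakeSend(phone);
                    }
                    return chunk.size();
                });
            }
            // invokeAll blocks until every task has completed
            for (Future<Integer> done : pool.invokeAll(tasks)) {
                done.get(); // surfaces any exception thrown inside a task
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        List<String> phones = new ArrayList<>();
        for (int i = 0; i < 40; i++) {
            phones.add("1380000" + String.format("%04d", i)); // made-up numbers
        }
        System.out.println("sequential ms: " + sendSequentially(phones));
        System.out.println("parallel ms:   " + sendInParallel(phones, 4));
    }
}
```

With 40 numbers and 4 threads, the parallel run should take roughly a quarter of the sequential time, mirroring the roughly tenfold gap between the two measurements reported above for 10 threads.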

package com.lingyejun.tick.authenticator;

import org.apache.poi.ss.usermodel.DateUtil;
import org.apache.poi.xssf.usermodel.XSSFCell;
import org.apache.poi.xssf.usermodel.XSSFRow;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.*;

public class ThreadOperation {

    // Synchronous wait time for sending one SMS, in milliseconds
    private static final long SEND_COST_TIME = 100L;

    // File containing the phone numbers
    private static final String FILE_NAME = "/Users/lingye/Downloads/phone_number.xlsx";

    // List of phone numbers
    private static List<String> phoneList = new ArrayList<>();

    // Singleton instance
    private static volatile ThreadOperation threadOperation;

    // Number of threads
    private static final int THREAD_POOL_SIZE = 10;

    // Initialize the thread pool
    private ExecutorService executorService = new ThreadPoolExecutor(THREAD_POOL_SIZE, THREAD_POOL_SIZE,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());

    public ThreadOperation() {
        // Read the phone numbers from the local file
        readFromExcel(FILE_NAME);
    }

    public static void main(String[] args) {
        ThreadOperation threadOperation = getInstance();
        //threadOperation.singleThread(phoneList);
        threadOperation.multiThreadSend(phoneList);
    }

    /**
     * Returns the singleton instance (double-checked locking).
     *
     * @return the shared ThreadOperation instance
     */
    public static ThreadOperation getInstance() {
        if (threadOperation == null) {
            synchronized (ThreadOperation.class) {
                if (threadOperation == null) {
                    threadOperation = new ThreadOperation();
                }
            }
        }
        return threadOperation;
    }

    /**
     * Reads the phone numbers from the first column of the first sheet.
     *
     * @param fileName path to the .xlsx file
     */
    public static void readFromExcel(String fileName) {
        // try-with-resources closes the workbook and the stream even if reading fails
        try (InputStream is = new FileInputStream(fileName);
             XSSFWorkbook workbook = new XSSFWorkbook(is)) {
            XSSFSheet sheet = workbook.getSheetAt(0);
            int num = 0;
            // Iterate over the rows
            for (int rowNum = 0, lastNum = sheet.getLastRowNum(); rowNum <= lastNum; rowNum++) {
                XSSFRow row = sheet.getRow(rowNum);
                if (row == null) {
                    // getRow returns null for rows that were never created
                    continue;
                }
                String phoneNumber = getStringValueFromCell(row.getCell(0)).trim();
                phoneList.add(phoneNumber);
                num++;
            }
            // Number of rows read
            System.out.println(num);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the content of an Excel cell as a string.
     *
     * @param cell the cell to read, may be null
     * @return the cell content, or an empty string for null, blank, and error cells
     */
    private static String getStringValueFromCell(XSSFCell cell) {

        // Format for date cells
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        // Format for numeric cells
        DecimalFormat decimalFormat = new DecimalFormat("#.#####");
        // Cells default to empty
        String cellValue = "";

        if (cell == null) {
            return cellValue;
        }

        // Read according to the cell type.
        // These int constants are deprecated in POI 3.17; newer POI versions use the CellType enum.
        if (cell.getCellType() == XSSFCell.CELL_TYPE_STRING) {
            cellValue = cell.getStringCellValue();
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_NUMERIC) {
            // Dates are formatted as date strings
            if (DateUtil.isCellDateFormatted(cell)) {
                double d = cell.getNumericCellValue();
                Date date = DateUtil.getJavaDate(d);
                cellValue = dateFormat.format(date);
            } else {
                // Everything else is formatted as a number
                cellValue = decimalFormat.format((cell.getNumericCellValue()));
            }
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BLANK) {
            cellValue = "";
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_BOOLEAN) {
            cellValue = String.valueOf(cell.getBooleanCellValue());
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_ERROR) {
            cellValue = "";
        } else if (cell.getCellType() == XSSFCell.CELL_TYPE_FORMULA) {
            // Returns the formula text, not its evaluated result
            cellValue = cell.getCellFormula();
        }
        return cellValue;
    }

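    /**
     * Simulates the slow external interface; multithreading is used to compensate for its latency.
     *
     * @param userPhone phone number to send to
     */
    public void sendMsgToPhone(String userPhone) {
        try {
            Thread.sleep(SEND_COST_TIME);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println("send message to : " + userPhone);
    }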

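    /**
     * Single-threaded sending.
     *
     * @param phoneList phone numbers to send to
     * @return elapsed time in milliseconds
     */
    private long singleThread(List<String> phoneList) {
        long start = System.currentTimeMillis();
        /*// Run directly on the main thread
        for (String phoneNumber : phoneList) {
            threadOperation.sendMsgToPhone(phoneNumber);
        }*/
        SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(phoneList);
        smet.start();
        try {
            // Wait for the worker to finish; otherwise only the thread start-up time is measured
            smet.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        long totalTime = System.currentTimeMillis() - start;
        System.out.println("Total time for single-threaded sending: " + totalTime);
        return totalTime;
    }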

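    /**
     * Another way to divide the work: fixed-size chunks, one thread per chunk.
     *
     * @param phoneList phone numbers to send to
     */
    private void otherThread(List<String> phoneList) {
        int numbersPerThread = 10;
        // At most 10 threads, so only the first 100 numbers are covered
        for (int threadNo = 0; threadNo < 10; threadNo++) {
            int from = threadNo * numbersPerThread;
            if (from >= phoneList.size()) {
                break;
            }
            // Clamp the upper bound so a short final chunk does not throw IndexOutOfBoundsException
            int to = Math.min(from + numbersPerThread, phoneList.size());
            List<String> list = phoneList.subList(from, to);
            SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list);
            smet.start();
            if (list.size() < numbersPerThread) {
                break;
            }
        }
    }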

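    /**
     * Sending with two threads.
     *
     * @param phoneList phone numbers to send to
     * @return elapsed time in milliseconds
     */
    private long twoThreads(List<String> phoneList) {
        long start = System.currentTimeMillis();
        // First half and second half of the list
        List<String> list1 = phoneList.subList(0, phoneList.size() / 2);
        List<String> list2 = phoneList.subList(phoneList.size() / 2, phoneList.size());
        SendMsgExtendThread smet = threadOperation.new SendMsgExtendThread(list1);
        smet.start();
        SendMsgExtendThread smet1 = threadOperation.new SendMsgExtendThread(list2);
        smet1.start();
        try {
            // Wait for both workers so the elapsed time covers the whole job
            smet.join();
            smet1.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return System.currentTimeMillis() - start;
    }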

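    /**
     * Sending with a thread pool.
     *
     * @param phoneList phone numbers to send to
     */
    private void threadPool(List<String> phoneList) {
        int numbersPerThread = 10;
        for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
            int from = threadNo * numbersPerThread;
            if (from >= phoneList.size()) {
                break;
            }
            // Clamp the upper bound so a short list does not throw IndexOutOfBoundsException
            int to = Math.min(from + numbersPerThread, phoneList.size());
            List<String> list = phoneList.subList(from, to);
            // The SendMsgExtendThread is used only as a Runnable here; the pool's own threads run it
            threadOperation.executorService.execute(threadOperation.new SendMsgExtendThread(list));
        }
        // Stop accepting new tasks; tasks already submitted still run to completion
        threadOperation.executorService.shutdown();
    }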

    /**
     * Multithreaded sending with Callable tasks.
     *
     * @param phoneList phone numbers to send to
     */
    private void multiThreadSend(List<String> phoneList) {
        List<Future<Long>> futures = new ArrayList<>();
        int numbersPerThread = 100;
        for (int threadNo = 0; threadNo < THREAD_POOL_SIZE; threadNo++) {
            int from = threadNo * numbersPerThread;
            if (from >= phoneList.size()) {
                break;
            }
            // Clamp the upper bound so a short list does not throw IndexOutOfBoundsException
            int to = Math.min(from + numbersPerThread, phoneList.size());
            List<String> list = phoneList.subList(from, to);
            Future<Long> future = threadOperation.executorService.submit(threadOperation.new SendMsgImplCallable(list, String.valueOf(threadNo)));
            futures.add(future);
        }
        for (Future<Long> future : futures) {
            try {
                // Blocks until the task finishes, then prints its elapsed time in ms
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        threadOperation.executorService.shutdown();
    }

    public class SendMsgExtendThread extends Thread {

        private List<String> numberListByThread;

        public SendMsgExtendThread(List<String> numberList) {
            numberListByThread = numberList;
        }

        @Override
        public void run() {
            long startTime = System.currentTimeMillis();
            for (int i = 0; i < numberListByThread.size(); i++) {
                System.out.print("no." + (i + 1));
                sendMsgToPhone(numberListByThread.get(i));
            }
            System.out.println("== single thread send " + numberListByThread.size() + " execute time:" + (System.currentTimeMillis() - startTime) + " ms");
        }
    }

    public class SendMsgImplCallable implements Callable<Long> {

        private List<String> numberListByThread;

        private String threadName;

        public SendMsgImplCallable(List<String> numberList, String threadName) {
            numberListByThread = numberList;
            this.threadName = threadName;
        }

        @Override
        public Long call() throws Exception {
            Long startMills = System.currentTimeMillis();
            for (String number : numberListByThread) {
                sendMsgToPhone(number);
            }
            Long endMills = System.currentTimeMillis();
            return endMills - startMills;
        }
    }
}

  
