多线程Callable处理数据
Posted mjtabu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程Callable处理数据相关的知识,希望对你有一定的参考价值。
1.数据拆分多线程Callable处理
1.定义一个20大小的线程池
2.根据数据 List 处理每个线程可以分到的数量List
3.Callable 线程处理数据
4.Future 获取Callcable线程处理后的数据
5.把 Future 获取的数据重新 addAll 进 List
6.返回数据
代码如下:
public List<String> packageStrings(List<String> list) throws InterruptedException, ExecutionException{ // 开始时间 long start = System.currentTimeMillis(); // 线程数量 int threadNum = 20; // 每条线程处理的数据 int count = list.size() / threadNum; if (0 != list.size() % count) { threadNum++; } // 创建一个线程池 ExecutorService exec = Executors.newFixedThreadPool(threadNum); // 定义一个任务集合 List<Callable<List<String>>> tasks = new ArrayList<Callable<List<String>>>(); int end = 0; for (int i = 0; i < threadNum; i++) { if(i*count+count>list.size()) { end = list.size(); } else { end = i*count+count; } // 确定每条线程的数据 List<String> cutList = list.subList(i*count, end); final List<String> listStr = cutList; final List<String> newList = new ArrayList<String>(); Callable<List<String>> task = new Callable<List<String>>() { @Override public List<String> call() throws Exception { for (String str : listStr) { // 封装处理数据 str = packageString(str); newList.add(str); } return newList; } }; // 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系 tasks.add(task); } // 获取数据并重新set List<Future<List<String>>> results = exec.invokeAll(tasks); list.clear(); for (Future<List<String>> future : results) { list.addAll(future.get()); } // 关闭线程池 exec.shutdown(); //System.out.println(list.toString()); System.err.println(" 执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒"); return list; }
2.数据未拆分多线程Callable处理数据
1.定义一个20大小的线程池
2.定义 Callable 的任务集合
3.遍历List处理数据
4.把 Callable 添加到 Callable 的任务集合中
5.把通过 Future 获取的数据并重新 add 进 List
6.返回数据
代码如下:
public List<String> CallablePackageStrings(List<String> list) throws InterruptedException, ExecutionException{ // 开始时间 long start = System.currentTimeMillis(); // 线程数量 int threadNum = 20; // 创建一个线程池 ExecutorService exec = Executors.newFixedThreadPool(threadNum); // 定义一个任务集合 List<Callable<String>> tasks = new ArrayList<Callable<String>>(); // 处理数据 for (final String str : list) { Callable<String> task = new Callable<String>() { @Override public String call() throws Exception { // 封装处理数据 return packageString(str); } }; // 这里提交的任务容器列表和返回的Future列表存在顺序对应的关系 tasks.add(task); } // 获取数据并重新set List<Future<String>> results = exec.invokeAll(tasks); list.clear(); for (Future<String> future : results) { list.add(future.get()); } // 关闭线程池 exec.shutdown(); //System.out.println(list.toString()); System.err.println(" 执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒"); return list; }
3.数据未拆分多线程Callable处理数据
1.定义一个20大小的线程池
2.定义Future的任务集合
3.遍历List处理数据
4.把 Future 添加到 Future 的任务集合中
5.把通过 Future 获取的数据并重新 add 进 List
6.返回数据
代码如下:
public List<String> packageStrings(List<String> list) throws InterruptedException, ExecutionException{ // 开始时间 long start = System.currentTimeMillis(); // 线程数量 int threadNum = 20; // 创建一个线程池 ExecutorService exec = Executors.newFixedThreadPool(threadNum); // 定义FutureTask任务集合 List<Future<String>> futures = new ArrayList<Future<String>>(); // 处理数据 for (final String str : list) { Future<String> future = exec.submit(new Callable<String>() { @Override public String call() throws Exception { // 封装处理数据 return packageString(str); } }); futures.add(future); } list.clear(); for (Future<String> future : futures) { list.add(future.get()); } // 关闭线程池 exec.shutdown(); //System.out.println(list.toString()); System.err.println(" 执行任务消耗了 :" + (System.currentTimeMillis() - start) + "毫秒"); return list; }
以上三种推荐第三种:
第二种是在第一种基础上进行优化
第三种是在第二种基础上进行冗余代码去除
以上是关于多线程Callable处理数据的主要内容,如果未能解决你的问题,请参考以下文章