串行程序并行化

Posted bruce128

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了串行程序并行化相关的知识,希望对你有一定的参考价值。

    考虑这样一个问题:统计某个工程的代码行数。首先想到的思路便是,递归文件树,每层递归里,循环遍历父文件夹下的所有子文件,如果子文件是文件夹,那么再对这个文件夹进行递归调用。于是问题很轻松的解决了。这个方案可以优化吗? 了

    再回想这个问题,可以发现,循环里的递归调用其实相互之间是独立的,互不干扰,各自统计自己路径下的代码文件的行数。于是,发现了这个方案的可优化点——利用线程池进行并行处理。于是一个串行的求解方案被改进成了并行方案。

    不能光说不练,写了一个Demo,对串行方案和并行方案进行了量化对比。代码如下:

import java.io.*;
import java.util.Queue;
import java.util.concurrent.*;

/**
 * Created by cdlvsheng on 2016/5/16.
 */
public class ParallelSequentialContrast {
	
	int                    coreSize = Runtime.getRuntime().availableProcessors();
	ThreadPoolExecutor     exec     = new ThreadPoolExecutor(coreSize * 4, coreSize * 5, 0, TimeUnit.SECONDS,
			new LinkedBlockingQueue<Runnable>(10000), new ThreadPoolExecutor.CallerRunsPolicy());
	Queue<Future<Integer>> queue    = new ConcurrentLinkedQueue<Future<Integer>>();

	private int countLineNum(File f) {
		if (!f.getName().endsWith("java") && !f.getName().endsWith(".js") && !f.getName().endsWith(".vm")) return 0;

		int sum = 0;
		try {
			BufferedReader br  = new BufferedReader(new FileReader(f));
			String         str = null;
			while ((str = br.readLine()) != null) sum++;
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
		return sum;
	}

	private class Task implements Callable<Integer> {
		File f;

		public Task(File f) {
			this.f = f;
		}

		public Integer call() throws Exception {
			int sum = 0;

			if (f.isDirectory()) {
				File[] fs = f.listFiles();
				for (File file : fs) {
					if (file.isDirectory()) queue.add(exec.submit(new Task(file)));
					else sum += countLineNum(file);
				}
			} else sum += countLineNum(f);

			return sum;
		}
	}

	public int parallelTraverse(File f) {
		queue.add(exec.submit(new Task(f)));
		int sum = 0;
		while (!queue.isEmpty()) {
			try {
				Future<Integer> future = queue.poll();
				sum += future.get();
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		exec.shutdown();
		return sum;
	}


	public int sequentialTraverse(File f) {
		int sum = 0;

		if (f.isDirectory()) {
			File[] fs = f.listFiles();
			for (File file : fs) {
				if (file.isDirectory()) sum += sequentialTraverse(file);
				else sum += countLineNum(file);
			}
		} else sum += countLineNum(f);

		return sum;
	}

	public void parallelTest(ParallelSequentialContrast psc, String pathname) {
		long start    = System.currentTimeMillis();
		int  sum      = psc.parallelTraverse(new File(pathname));
		long duration = System.currentTimeMillis() - start;
		System.out.println(String.format("parallel test, %d lines of code were found, time cost is %d ms", sum, duration));
	}

	public void sequentialTest(ParallelSequentialContrast psc, String pathname) {
		long start    = System.currentTimeMillis();
		int  sum      = psc.sequentialTraverse(new File(pathname));
		long duration = System.currentTimeMillis() - start;
		System.out.println(String.format("sequential test, %d lines of code were found, time cost is %d ms", sum, duration));
	}

	public static void main(String[] args) {
		ParallelSequentialContrast psc      = new ParallelSequentialContrast();
		String                     pathname = "D:\\\\Code_Git";
		psc.sequentialTest(psc, pathname);
		psc.parallelTest(psc, pathname);
	}
}

    因为要不断的扫磁盘(虽然我的是固态硬盘),所以并行方案的线程池开的很大。IO密集型程序的相对CPU密集型程序的线程池会更大。

    程序运行结果如下:

sequential test, 415079 lines of code were found, time cost is 364 ms
parallel test, 415079 lines of code were found, time cost is 163 ms

    可以发现,在结果同等精确的情况下,串行方案耗时是并行方案的两倍多。这个是在我个人PC上做的测试,如果是线上服务器运行,恐怕差距只会更加明显。

    如果一个大任务,由许多个相互独立的子任务组成,我们就可以在这里找突破点,把一个串行程序并行化,榨干多和服务器的性能!

    JDK1.7提供了一个Fork/Join的框架,其原理与这个并行方案如出一辙。Fork/Join框架在每fork一个任务后,都会把这个任务甩进一个工作队列,供线程池消费。所谓框架,也就是把常见问题的解决方案模板化,傻瓜化。关于Fork/Join框架,在我的前面一篇博客里有介绍:点击打开链接


以上是关于串行程序并行化的主要内容,如果未能解决你的问题,请参考以下文章

如何使用带有串行内循环的openMP并行化外循环以进行数组添加

并行代码比串行代码慢(值函数迭代示例)

OpenMP 循环运行代码比串行循环慢

尽管并行编译,Mex 文件仍串行执行

运行 SYCL 代码时结果不正确。在尝试并行化循环时

使用ray进行最近邻搜索的并行化