并行程序没有速度增加与线性程序

Posted

技术标签:

【中文标题】并行程序没有速度增加与线性程序【英文标题】:Parallel program no speed increase vs linear program 【发布时间】:2013-07-31 22:33:45 【问题描述】:

我创建了一个更复杂程序的模型程序,该程序将利用多线程和多个硬盘驱动器来提高性能。数据量如此之大,以至于无法将所有数据读入内存,因此数据将被读取、处理并以块的形式写回。该测试程序采用流水线设计,能够在 3 个不同的线程上同时读取、处理和写入。因为读写是针对不同的硬盘,所以同时读写是没有问题的。但是,使用多线程的程序似乎比其线性版本慢 2 倍(也在代码中)。我试图在运行一个块后不破坏读写线程,但同步似乎比当前版本更慢。我想知道我是否做错了什么或如何改进。谢谢。

使用 i3-2100 @ 3.1ghz 和 16GB 内存进行测试。

#include <iostream>
#include <fstream>
#include <ctime>
#include <thread>

#define CHUNKSIZE 8192    //size of each chunk to process
#define DATASIZE 2097152  //total size of data

using namespace std;

int data[3][CHUNKSIZE];
int run = 0;
int totalRun = DATASIZE/CHUNKSIZE;

bool finishRead = false, finishWrite = false;

ifstream infile;
ofstream outfile;

clock_t starttime, endtime;

/*
    Process a chunk of data(simulate only, does not require to sort all data)
*/
void quickSort(int arr[], int left, int right) 

    int i = left, j = right;
    int tmp;
    int pivot = arr[(left + right) / 2];

    while (i <= j) 
        while (arr[i] < pivot) i++;
        while (arr[j] > pivot) j--;
        if (i <= j) 
            tmp = arr[i];
            arr[i] = arr[j];
            arr[j] = tmp;
            i++;
            j--;
        
    ;

    if (left < j) quickSort(arr, left, j);
    if (i < right) quickSort(arr, i, right);


/*
    Find runtime
*/
void diffclock()
    double diff = (endtime - starttime)/(CLOCKS_PER_SEC/1000);
    cout<<"Total run time: "<<diff<<"ms"<<endl;


/*
    Read a chunk of data
*/
void readData()

    for(int i = 0; i < CHUNKSIZE; i++)
        infile>>data[run%3][i];
    
    finishRead = true;



/*
    Write a chunk of data
*/
void writeData()

    for(int i = 0; i < CHUNKSIZE; i++)
        outfile<<data[(run-2)%3][i]<<endl;
    
    finishWrite = true;


/*
    Pipelines Read, Process, Write using multithread
*/
void threadtransfer()

    starttime = clock();

    infile.open("/home/pcg/test/iothread/source.txt");
    outfile.open("/media/pcg/Data/test/iothread/ThreadDuplicate.txt");

    thread read, write;

    run = 0;
    readData();

    run = 1;
    readData();
    quickSort(data[(run-1)%3], 0, CHUNKSIZE - 1);

    run = 2;
    while(run < totalRun)
        //cout<<run<<endl;
        finishRead = finishWrite = false;
        read = thread(readData);
        write = thread(writeData);
        read.detach();
        write.detach();
        quickSort(data[(run-1)%3], 0, CHUNKSIZE - 1);
        while(!finishRead||!finishWrite)  //check if next cycle is ready.
        run++;
    


    quickSort(data[(run-1)%3], 0, CHUNKSIZE - 1);
    writeData();

    run++;
    writeData();

    infile.close();
    outfile.close();

    endtime = clock();
    diffclock();


/*
    Linearly read, sort, and write a chunk and repeat.
*/
void lineartransfer()

    int totalRun = DATASIZE/CHUNKSIZE;
    int holder[CHUNKSIZE];
    starttime = clock();

    infile.open("/home/pcg/test/iothread/source.txt");
    outfile.open("/media/pcg/Data/test/iothread/Linearduplicate.txt");

    run = 0;

    while(run < totalRun)

        for(int i = 0; i < CHUNKSIZE; i++) infile>>holder[i];
        quickSort(holder, 0, CHUNKSIZE - 1);
        for(int i = 0; i < CHUNKSIZE; i++) outfile<<holder[i]<<endl;
        run++;
    

    endtime = clock();
    diffclock();


/*
    Create large amount of data for testing
*/
void createData()
    outfile.open("/home/pcg/test/iothread/source.txt");

    for(int i = 0; i < DATASIZE; i++)
        outfile<<rand()<<endl;

    
    outfile.close();




int main()

    int mode=0;
    cout<<"Number of threads: "<<thread::hardware_concurrency()<<endl;
    cout<<"Enter mode\n1.Create Data\n2.thread copy\n3.linear copy\ninput mode:";
    cin>>mode;

    if(mode == 1) createData();
    else if(mode == 2) threadtransfer();
    else if(mode == 3) lineartransfer();

    return 0;

【问题讨论】:

【参考方案1】:

不要忙着等待。这浪费了宝贵的 CPU 时间,并且很可能会减慢其余时间(更不用说编译器可以将其优化为无限循环,因为它无法猜测这些标志是否会改变,所以一开始它甚至都不正确)。也不要detach()。将detach() 和忙等待都替换为join()

while (run < totalRun) 
    read = thread(readData);
    write = thread(writeData);
    quickSort(data[(run-1)%3], 0, CHUNKSIZE - 1);
    read.join();
    write.join();
    run++;


至于全局设计,好吧,如果您不希望处理 (quickSort) 部分超过读/写时间,我想忽略全局变量是可以接受的。我会使用消息队列在各个线程之间传递缓冲区(如果需要,可以添加更多的处理线程,并行执行相同的任务或按顺序执行不同的任务)但也许那是因为我习惯了这样做。

【讨论】:

【参考方案2】:

由于您是在 Linux 机器上使用 clock 来测量时间,我希望无论您运行一个线程还是多个线程,总 CPU 时间(大致)相同。

也许您想改用time myprog?或者使用gettimeofday 来获取时间(这会给你一个以秒+纳秒为单位的时间[尽管纳秒可能不“准确”到最后一位数字]。

编辑: 接下来,写入文件时不要使用endl。它使事情变慢了很多,因为 C++ 运行时会刷新到文件,这是一个操作系统调用。几乎可以肯定,它以某种方式防止了多个线程,因此您有三个线程一次同步地写数据,一行。很可能需要运行单个线程的近 3 倍。另外,不要从三个不同的线程写入同一个文件——这会以一种或另一种方式出现问题。

【讨论】:

不过总体来说,运行时间还是比线性版本慢很多。 我明白了。感谢您的建议。时钟似乎确实是个问题。如果不查看时间输出,两者似乎确实在相似的时间运行。 你的文件处理肯定也有问题,我保证!【参考方案3】:

如果我错了,请纠正我,但您的线程函数似乎基本上是一个线性函数,其工作量是线性函数的 3 倍?

在线程程序中,您将创建三个线程并在每个线程上运行一次 readData/quicksort 函数(分配您的工作量),但在您的程序中,线程模拟似乎实际上只是读取三遍,快速排序三遍,写了三遍,把每三遍的时间加起来。

【讨论】:

在threadtransfer函数中,While循环外的函数调用用于设置/完成管道,因为您需要先将数据读入内存,然后才能在下一个周期处理它。只有在 while 循环部分内,所有 3 个进程才会同时运行。

以上是关于并行程序没有速度增加与线性程序的主要内容,如果未能解决你的问题,请参考以下文章

小技巧matlab中进行并行运算仿真加快仿真速度

巧妙拆分bolt提升Storm集群吞吐量 增加并行处理速度 Storm & kafka处理实时日志实战topology经验谈

编程基础——加速度控制指令

如何控制 Airflow 安装的并行性或并发性?

资源的调度——线程池

串口与并口的区别