高效实现多线程排序算法的关键是啥?天真的实现无法正常工作

Posted

技术标签:

【中文标题】高效实现多线程排序算法的关键是啥?天真的实现无法正常工作【英文标题】:What is the key to implement multi thread sorting algorithm efficient? The naive implementations doesn't work properly高效实现多线程排序算法的关键是什么?天真的实现无法正常工作 【发布时间】:2016-07-10 18:48:36 【问题描述】:

我的第一个版本的双线程选择排序为每次迭代启动一个新线程。

一些比较(左 - 一个线程,右 - 两个线程版本,时间 - 毫秒):

Size: 500       time 1  time 5
Size: 1000      time 1  time 9
Size: 1500      time 4  time 12
Size: 2000      time 5  time 16
Size: 2500      time 10 time 22
Size: 3000      time 14 time 26
Size: 3500      time 19 time 30
Size: 4000      time 24 time 36
Size: 4500      time 30 time 43
Size: 5000      time 37 time 49
Size: 5500      time 46 time 57
Size: 6000      time 55 time 66
Size: 6500      time 63 time 76
Size: 7000      time 74 time 80
Size: 7500      time 85 time 92
Size: 8000      time 96 time 102
Size: 8500      time 108        time 109
Size: 9000      time 122        time 124
Size: 9500      time 135        time 132
Size: 10000     time 150        time 144
Size: 10500     time 165        time 156
Size: 11000     time 181        time 174
Size: 11500     time 200        time 177
Size: 12000     time 218        time 195
Size: 12500     time 235        time 205
Size: 13000     time 255        time 214
Size: 13500     time 273        time 226
Size: 14000     time 296        time 245

在 9500 大小的数组之后,两个线程工作得更快。 在我的第二个实现中,一个线程启动一次。但它的性能令人难以置信。

我的 CPU 有 4 个核心。

Size: 0 time 0  time 0
Size: 50        time 0  time 151
Size: 100       time 0  time 1276
Size: 150       time 0  time 2089
Size: 200       time 0  time 3925
Size: 250       time 0  time 5303

代码:

//one thread
template<class ItType>
void selectionSortThreadsHelper2(ItType beg, ItType end)

    //sorting element by element
    for (auto it = beg; it != end; ++it) 

        ItType middleIt = it + std::distance(it, end) / 2;

        auto search = [&]  return std::min_element(it, middleIt); ;
        //search
        std::future<ItType> minFirstHalfResult(std::async(std::launch::async , search));

        //wait searching
        ItType minSecondHalfIt = std::min_element(middleIt, end);
        ItType minFirstHalfIt = minFirstHalfResult.get();

        //swap if
        ItType minIt = *minFirstHalfIt < *minSecondHalfIt ? minFirstHalfIt : minSecondHalfIt;
        if (minIt != it)
            std::iter_swap(minIt, it);
    


//two thread
template<class ItType>
void selectionSortThreadsHelper3(ItType beg, ItType end)

    bool quit = false;
    bool readyFlag = false;
    bool processed = false;
    std::mutex readyMutex;
    std::condition_variable readyCondVar;

    ItType it;
    ItType middleIt;
    ItType minFirstHalfIt;
    auto search = [&]() 
        while (true) 
            std::unique_lock<std::mutex> ul(readyMutex);
            readyCondVar.wait(ul, [&] return readyFlag; );

            if (quit)
                return;

            minFirstHalfIt = std::min_element(it, middleIt);

            processed = true;

            ul.unlock();
            readyCondVar.notify_one();
        
    ;

    std::future<void> f(std::async(std::launch::async, search));

    //sorting element by element
    for (it = beg; it != end; ++it) 

        middleIt = it + std::distance(it, end) / 2;

        //say second thread to start searching
        
            std::lock_guard<std::mutex> lg(readyMutex);
            readyFlag = true;
        
        readyCondVar.notify_one();
        //std::this_thread::yield();

        ItType minSecondHalfIt = std::min_element(middleIt, end);

        //wait second thread
        
            std::unique_lock<std::mutex> ul(readyMutex);
            readyCondVar.wait(ul, [&]  return processed; );
            processed = false;
            readyFlag = false;
        
        readyCondVar.notify_all();

        //swap if
        ItType minIt = *minFirstHalfIt < *minSecondHalfIt ? minFirstHalfIt : minSecondHalfIt;
        if (minIt != it)
            std::iter_swap(minIt, it);
    

    //quit thread
    
        std::lock_guard<std::mutex> lg(readyMutex);
        readyFlag = true;
        quit = true;
    
    readyCondVar.notify_all();

    f.get();

【问题讨论】:

关键是正确地平衡可用内核的数量。 您的代码实际上根本没有使用线程。您调用std::async 来创建未来,然后立即等待未来完成并交付结果。这与没有线程或期货的顺序搜索没有什么不同。 哦,我明白了。我解决了它,两个线程版本在 9000 大小的数组和更多的情况下工作得更快。但是第二个实现呢? 您可能对std::experimental::parallel::sort感兴趣 【参考方案1】:

有很多方法可以在不编写自己的情况下获得并行排序。首先,有一个实验性的并行命名空间,可以让你说sort(par, data.begin(), data.end()):http://en.cppreference.com/w/cpp/experimental/parallelism/existing#sort

该命名空间正在被合并到 C++17 中的标准中,因此它应该在某个时候位于 std:: 命名空间中 (https://parallelstl.codeplex.com/)。还有一个基于 OpenMP 的较旧的非标准 GNU g++ 并行排序实现:https://gcc.gnu.org/onlinedocs/libstdc++/manual/parallel_mode.html

最后,网上有很多页面描述了如何在 C++11 中编写自己的并行排序。尝试搜索。这是一个非常全面的页面:https://software.intel.com/en-us/articles/a-parallel-stable-sort-using-c11-for-tbb-cilk-plus-and-openmp

【讨论】:

谢谢,但为了学习目的自己编写代码很有趣。我明白这似乎并不容易:(【参考方案2】:

使用 Windows 线程接口的多线程自下而上合并排序示例,在这种情况下,4 个线程意味着具有 4 个(或更多)内核的处理器。根据数组的大小,它的速度大约是单线程合并排序的 3 倍,这主要是由于每个内核的本地 L1 和 L2 缓存中发生的操作。信号量用于同时启动所有线程以进行基准测试。在我的系统(Intel 2600K 3.4ghz)上,排序 1600 万个 32 位整数大约需要 0.5 秒,而单线程合并排序大约需要 1.5 秒。

#include <cstdlib>
#include <ctime>
#include <iostream>
#include <windows.h>

#define SIZE (16*1024*1024)             // must be multiple of 4

static HANDLE hs0;                      // semaphore handles
static HANDLE hs1;
static HANDLE hs2;
static HANDLE hs3;
static HANDLE ht1;                      // thread handles
static HANDLE ht2;
static HANDLE ht3;

static DWORD WINAPI Thread0(LPVOID);    // thread functions
static DWORD WINAPI Thread1(LPVOID);
static DWORD WINAPI Thread2(LPVOID);
static DWORD WINAPI Thread3(LPVOID);

static int  *pa;                        // pointers to buffers
static int  *pb;

void BottomUpMergeSort(int a[], int b[], size_t n);
void BottomUpMerge(int a[], int b[], size_t ll, size_t rr, size_t ee);
void BottomUpCopy(int a[], int b[], size_t ll, size_t rr);
size_t GetPassCount(size_t n);

int main()

int *array = new int[SIZE];
int *buffer = new int[SIZE];
clock_t ctTimeStart;                    // clock values
clock_t ctTimeStop;
    pa = array;
    pb = buffer;
    for(int i = 0; i < SIZE; i++)      // generate pseudo random data
        int r;
        r  = (((int)((rand()>>4) & 0xff))<< 0);
        r += (((int)((rand()>>4) & 0xff))<< 8);
        r += (((int)((rand()>>4) & 0xff))<<16);
        r += (((int)((rand()>>4) & 0x7f))<<24);
        array[i] = r;
    

    hs0 = CreateSemaphore(NULL,0,1,NULL);
    hs1 = CreateSemaphore(NULL,0,1,NULL);
    hs2 = CreateSemaphore(NULL,0,1,NULL);
    hs3 = CreateSemaphore(NULL,0,1,NULL);
    ht1 = CreateThread(NULL, 0, Thread1, 0, 0, 0);
    ht2 = CreateThread(NULL, 0, Thread2, 0, 0, 0);
    ht3 = CreateThread(NULL, 0, Thread3, 0, 0, 0);

    ctTimeStart = clock();
    ReleaseSemaphore(hs0, 1, NULL);     // start sorts
    ReleaseSemaphore(hs1, 1, NULL);
    ReleaseSemaphore(hs2, 1, NULL);
    ReleaseSemaphore(hs3, 1, NULL);
    Thread0((LPVOID)NULL);
    WaitForSingleObject(ht2, INFINITE);
    // merge 1st and 2nd halves
    BottomUpMerge(pb, pa, 0, SIZE>>1, SIZE);
    ctTimeStop = clock();
    std::cout << "Number of ticks " << (ctTimeStop - ctTimeStart) << std::endl;

    for(int i = 1; i < SIZE; i++)      // check result 
        if(array[i-1] > array[i])
            std::cout << "failed" << std::endl;
        
    
    CloseHandle(ht3);
    CloseHandle(ht2);
    CloseHandle(ht1);
    CloseHandle(hs3);
    CloseHandle(hs2);
    CloseHandle(hs1);
    CloseHandle(hs0);
    delete[] buffer;
    delete[] array;
    return 0;


static DWORD WINAPI Thread0(LPVOID lpvoid)

    WaitForSingleObject(hs0, INFINITE); // wait for semaphore
    // sort 1st quarter
    BottomUpMergeSort(pa + 0*(SIZE>>2), pb + 0*(SIZE>>2), SIZE>>2);
    WaitForSingleObject(ht1, INFINITE); // wait for thead 1
    // merge 1st and 2nd quarter
    BottomUpMerge(pa + 0*(SIZE>>1), pb + 0*(SIZE>>1), 0, SIZE>>2, SIZE>>1);
    return 0;


static DWORD WINAPI Thread1(LPVOID lpvoid)

    WaitForSingleObject(hs1, INFINITE); // wait for semaphore
    // sort 2nd quarter
    BottomUpMergeSort(pa + 1*(SIZE>>2), pb + 1*(SIZE>>2), SIZE>>2);
    return 0;


static DWORD WINAPI Thread2(LPVOID lpvoid)

    WaitForSingleObject(hs2, INFINITE); // wait for semaphore
    // sort 3rd quarter
    BottomUpMergeSort(pa + 2*(SIZE>>2), pb + 2*(SIZE>>2), SIZE>>2);
    WaitForSingleObject(ht3, INFINITE); // wait for thread 3
    // merge 3rd and 4th quarter
    BottomUpMerge(pa + 1*(SIZE>>1), pb + 1*(SIZE>>1), 0, SIZE>>2, SIZE>>1);
    return 0;


static DWORD WINAPI Thread3(LPVOID lpvoid)

    WaitForSingleObject(hs3, INFINITE); // wait for semaphore
    // sort 4th quarter
    BottomUpMergeSort(pa + 3*(SIZE>>2), pb + 3*(SIZE>>2), SIZE>>2);
    return 0;


void BottomUpMergeSort(int a[], int b[], size_t n)

size_t s = 1;                               // run size 
    if(GetPassCount(n) & 1)                // if odd number of passes
        for(s = 1; s < n; s += 2)           // swap in place for 1st pass
            if(a[s] < a[s-1])
                std::swap(a[s], a[s-1]);
        s = 2;
    
    while(s < n)                           // while not done
        size_t ee = 0;                      // reset end index
        while(ee < n)                      // merge pairs of runs
            size_t ll = ee;                 // ll = start of left  run
            size_t rr = ll+s;               // rr = start of right run
            if(rr >= n)                    // if only left run
                rr = n;
                BottomUpCopy(a, b, ll, rr); //   copy left run
                break;                      //   end of pass
            
            ee = rr+s;                      // ee = end of right run
            if(ee > n)
                ee = n;
            BottomUpMerge(a, b, ll, rr, ee);
        
        std::swap(a, b);                    // swap a and b
        s <<= 1;                            // double the run size
    


void BottomUpMerge(int a[], int b[], size_t ll, size_t rr, size_t ee)

    size_t o = ll;                          // b[]       index
    size_t l = ll;                          // a[] left  index
    size_t r = rr;                          // a[] right index
    while(1)                               // merge data
        if(a[l] <= a[r])                   // if a[l] <= a[r]
            b[o++] = a[l++];                //   copy a[l]
            if(l < rr)                      //   if not end of left run
                continue;                   //     continue (back to while)
            do                              //   else copy rest of right run
                b[o++] = a[r++];
            while(r < ee);
            break;                          //     and return
         else                             // else a[l] > a[r]
            b[o++] = a[r++];                //   copy a[r]
            if(r < ee)                      //   if not end of right run
                continue;                   //     continue (back to while)
            do                              //   else copy rest of left run
                b[o++] = a[l++];
            while(l < rr);
            break;                          //     and return
        
    


void BottomUpCopy(int a[], int b[], size_t ll, size_t rr)

    do                                      // copy left run
        b[ll] = a[ll];
    while(++ll < rr);


size_t GetPassCount(size_t n)               // return # passes

    size_t i = 0;
    for(size_t s = 1; s < n; s <<= 1)
        i += 1;
    return(i);

【讨论】:

以上是关于高效实现多线程排序算法的关键是啥?天真的实现无法正常工作的主要内容,如果未能解决你的问题,请参考以下文章

希尔排序的介绍和希尔排序基本思想以及代码实现

排序算法——希尔排序

希尔排序

排序算法之希尔排序

图解排序算法之希尔排序

图解排序算法之希尔排序