高效实现多线程排序算法的关键是啥?天真的实现无法正常工作
Posted
技术标签:
【中文标题】高效实现多线程排序算法的关键是啥?天真的实现无法正常工作【英文标题】:What is the key to implement multi thread sorting algorithm efficient? The naive implementations doesn't work properly高效实现多线程排序算法的关键是什么?天真的实现无法正常工作 【发布时间】:2016-07-10 18:48:36 【问题描述】:我的第一个版本的双线程选择排序为每次迭代启动一个新线程。
一些比较(左 - 一个线程,右 - 两个线程版本,时间 - 毫秒):
Size: 500 time 1 time 5
Size: 1000 time 1 time 9
Size: 1500 time 4 time 12
Size: 2000 time 5 time 16
Size: 2500 time 10 time 22
Size: 3000 time 14 time 26
Size: 3500 time 19 time 30
Size: 4000 time 24 time 36
Size: 4500 time 30 time 43
Size: 5000 time 37 time 49
Size: 5500 time 46 time 57
Size: 6000 time 55 time 66
Size: 6500 time 63 time 76
Size: 7000 time 74 time 80
Size: 7500 time 85 time 92
Size: 8000 time 96 time 102
Size: 8500 time 108 time 109
Size: 9000 time 122 time 124
Size: 9500 time 135 time 132
Size: 10000 time 150 time 144
Size: 10500 time 165 time 156
Size: 11000 time 181 time 174
Size: 11500 time 200 time 177
Size: 12000 time 218 time 195
Size: 12500 time 235 time 205
Size: 13000 time 255 time 214
Size: 13500 time 273 time 226
Size: 14000 time 296 time 245
在 9500 大小的数组之后,两个线程工作得更快。 在我的第二个实现中,一个线程启动一次。但它的性能令人难以置信。
我的 CPU 有 4 个核心。
Size: 0 time 0 time 0
Size: 50 time 0 time 151
Size: 100 time 0 time 1276
Size: 150 time 0 time 2089
Size: 200 time 0 time 3925
Size: 250 time 0 time 5303
代码:
//one thread
template<class ItType>
void selectionSortThreadsHelper2(ItType beg, ItType end)
//sorting element by element
for (auto it = beg; it != end; ++it)
ItType middleIt = it + std::distance(it, end) / 2;
auto search = [&] return std::min_element(it, middleIt); ;
//search
std::future<ItType> minFirstHalfResult(std::async(std::launch::async , search));
//wait searching
ItType minSecondHalfIt = std::min_element(middleIt, end);
ItType minFirstHalfIt = minFirstHalfResult.get();
//swap if
ItType minIt = *minFirstHalfIt < *minSecondHalfIt ? minFirstHalfIt : minSecondHalfIt;
if (minIt != it)
std::iter_swap(minIt, it);
//two thread
template<class ItType>
void selectionSortThreadsHelper3(ItType beg, ItType end)
bool quit = false;
bool readyFlag = false;
bool processed = false;
std::mutex readyMutex;
std::condition_variable readyCondVar;
ItType it;
ItType middleIt;
ItType minFirstHalfIt;
auto search = [&]()
while (true)
std::unique_lock<std::mutex> ul(readyMutex);
readyCondVar.wait(ul, [&] return readyFlag; );
if (quit)
return;
minFirstHalfIt = std::min_element(it, middleIt);
processed = true;
ul.unlock();
readyCondVar.notify_one();
;
std::future<void> f(std::async(std::launch::async, search));
//sorting element by element
for (it = beg; it != end; ++it)
middleIt = it + std::distance(it, end) / 2;
//say second thread to start searching
std::lock_guard<std::mutex> lg(readyMutex);
readyFlag = true;
readyCondVar.notify_one();
//std::this_thread::yield();
ItType minSecondHalfIt = std::min_element(middleIt, end);
//wait second thread
std::unique_lock<std::mutex> ul(readyMutex);
readyCondVar.wait(ul, [&] return processed; );
processed = false;
readyFlag = false;
readyCondVar.notify_all();
//swap if
ItType minIt = *minFirstHalfIt < *minSecondHalfIt ? minFirstHalfIt : minSecondHalfIt;
if (minIt != it)
std::iter_swap(minIt, it);
//quit thread
std::lock_guard<std::mutex> lg(readyMutex);
readyFlag = true;
quit = true;
readyCondVar.notify_all();
f.get();
【问题讨论】:
关键是正确地平衡可用内核的数量。 您的代码实际上根本没有使用线程。您调用std::async
来创建未来,然后立即等待未来完成并交付结果。这与没有线程或期货的顺序搜索没有什么不同。
哦,我明白了。我解决了它,两个线程版本在 9000 大小的数组和更多的情况下工作得更快。但是第二个实现呢?
您可能对std::experimental::parallel::sort感兴趣
【参考方案1】:
有很多方法可以在不编写自己的情况下获得并行排序。首先,有一个实验性的并行命名空间,可以让你说sort(par, data.begin(), data.end())
:http://en.cppreference.com/w/cpp/experimental/parallelism/existing#sort
该命名空间正在被合并到 C++17 中的标准中,因此它应该在某个时候位于 std::
命名空间中 (https://parallelstl.codeplex.com/)。还有一个基于 OpenMP 的较旧的非标准 GNU g++ 并行排序实现:https://gcc.gnu.org/onlinedocs/libstdc++/manual/parallel_mode.html
最后,网上有很多页面描述了如何在 C++11 中编写自己的并行排序。尝试搜索。这是一个非常全面的页面:https://software.intel.com/en-us/articles/a-parallel-stable-sort-using-c11-for-tbb-cilk-plus-and-openmp
【讨论】:
谢谢,但为了学习目的自己编写代码很有趣。我明白这似乎并不容易:(【参考方案2】:使用 Windows 线程接口的多线程自下而上合并排序示例,在这种情况下,4 个线程意味着具有 4 个(或更多)内核的处理器。根据数组的大小,它的速度大约是单线程合并排序的 3 倍,这主要是由于每个内核的本地 L1 和 L2 缓存中发生的操作。信号量用于同时启动所有线程以进行基准测试。在我的系统(Intel 2600K 3.4ghz)上,排序 1600 万个 32 位整数大约需要 0.5 秒,而单线程合并排序大约需要 1.5 秒。
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <windows.h>
#define SIZE (16*1024*1024) // must be multiple of 4
static HANDLE hs0; // semaphore handles
static HANDLE hs1;
static HANDLE hs2;
static HANDLE hs3;
static HANDLE ht1; // thread handles
static HANDLE ht2;
static HANDLE ht3;
static DWORD WINAPI Thread0(LPVOID); // thread functions
static DWORD WINAPI Thread1(LPVOID);
static DWORD WINAPI Thread2(LPVOID);
static DWORD WINAPI Thread3(LPVOID);
static int *pa; // pointers to buffers
static int *pb;
void BottomUpMergeSort(int a[], int b[], size_t n);
void BottomUpMerge(int a[], int b[], size_t ll, size_t rr, size_t ee);
void BottomUpCopy(int a[], int b[], size_t ll, size_t rr);
size_t GetPassCount(size_t n);
int main()
int *array = new int[SIZE];
int *buffer = new int[SIZE];
clock_t ctTimeStart; // clock values
clock_t ctTimeStop;
pa = array;
pb = buffer;
for(int i = 0; i < SIZE; i++) // generate pseudo random data
int r;
r = (((int)((rand()>>4) & 0xff))<< 0);
r += (((int)((rand()>>4) & 0xff))<< 8);
r += (((int)((rand()>>4) & 0xff))<<16);
r += (((int)((rand()>>4) & 0x7f))<<24);
array[i] = r;
hs0 = CreateSemaphore(NULL,0,1,NULL);
hs1 = CreateSemaphore(NULL,0,1,NULL);
hs2 = CreateSemaphore(NULL,0,1,NULL);
hs3 = CreateSemaphore(NULL,0,1,NULL);
ht1 = CreateThread(NULL, 0, Thread1, 0, 0, 0);
ht2 = CreateThread(NULL, 0, Thread2, 0, 0, 0);
ht3 = CreateThread(NULL, 0, Thread3, 0, 0, 0);
ctTimeStart = clock();
ReleaseSemaphore(hs0, 1, NULL); // start sorts
ReleaseSemaphore(hs1, 1, NULL);
ReleaseSemaphore(hs2, 1, NULL);
ReleaseSemaphore(hs3, 1, NULL);
Thread0((LPVOID)NULL);
WaitForSingleObject(ht2, INFINITE);
// merge 1st and 2nd halves
BottomUpMerge(pb, pa, 0, SIZE>>1, SIZE);
ctTimeStop = clock();
std::cout << "Number of ticks " << (ctTimeStop - ctTimeStart) << std::endl;
for(int i = 1; i < SIZE; i++) // check result
if(array[i-1] > array[i])
std::cout << "failed" << std::endl;
CloseHandle(ht3);
CloseHandle(ht2);
CloseHandle(ht1);
CloseHandle(hs3);
CloseHandle(hs2);
CloseHandle(hs1);
CloseHandle(hs0);
delete[] buffer;
delete[] array;
return 0;
static DWORD WINAPI Thread0(LPVOID lpvoid)
WaitForSingleObject(hs0, INFINITE); // wait for semaphore
// sort 1st quarter
BottomUpMergeSort(pa + 0*(SIZE>>2), pb + 0*(SIZE>>2), SIZE>>2);
WaitForSingleObject(ht1, INFINITE); // wait for thead 1
// merge 1st and 2nd quarter
BottomUpMerge(pa + 0*(SIZE>>1), pb + 0*(SIZE>>1), 0, SIZE>>2, SIZE>>1);
return 0;
static DWORD WINAPI Thread1(LPVOID lpvoid)
WaitForSingleObject(hs1, INFINITE); // wait for semaphore
// sort 2nd quarter
BottomUpMergeSort(pa + 1*(SIZE>>2), pb + 1*(SIZE>>2), SIZE>>2);
return 0;
static DWORD WINAPI Thread2(LPVOID lpvoid)
WaitForSingleObject(hs2, INFINITE); // wait for semaphore
// sort 3rd quarter
BottomUpMergeSort(pa + 2*(SIZE>>2), pb + 2*(SIZE>>2), SIZE>>2);
WaitForSingleObject(ht3, INFINITE); // wait for thread 3
// merge 3rd and 4th quarter
BottomUpMerge(pa + 1*(SIZE>>1), pb + 1*(SIZE>>1), 0, SIZE>>2, SIZE>>1);
return 0;
static DWORD WINAPI Thread3(LPVOID lpvoid)
WaitForSingleObject(hs3, INFINITE); // wait for semaphore
// sort 4th quarter
BottomUpMergeSort(pa + 3*(SIZE>>2), pb + 3*(SIZE>>2), SIZE>>2);
return 0;
void BottomUpMergeSort(int a[], int b[], size_t n)
size_t s = 1; // run size
if(GetPassCount(n) & 1) // if odd number of passes
for(s = 1; s < n; s += 2) // swap in place for 1st pass
if(a[s] < a[s-1])
std::swap(a[s], a[s-1]);
s = 2;
while(s < n) // while not done
size_t ee = 0; // reset end index
while(ee < n) // merge pairs of runs
size_t ll = ee; // ll = start of left run
size_t rr = ll+s; // rr = start of right run
if(rr >= n) // if only left run
rr = n;
BottomUpCopy(a, b, ll, rr); // copy left run
break; // end of pass
ee = rr+s; // ee = end of right run
if(ee > n)
ee = n;
BottomUpMerge(a, b, ll, rr, ee);
std::swap(a, b); // swap a and b
s <<= 1; // double the run size
void BottomUpMerge(int a[], int b[], size_t ll, size_t rr, size_t ee)
size_t o = ll; // b[] index
size_t l = ll; // a[] left index
size_t r = rr; // a[] right index
while(1) // merge data
if(a[l] <= a[r]) // if a[l] <= a[r]
b[o++] = a[l++]; // copy a[l]
if(l < rr) // if not end of left run
continue; // continue (back to while)
do // else copy rest of right run
b[o++] = a[r++];
while(r < ee);
break; // and return
else // else a[l] > a[r]
b[o++] = a[r++]; // copy a[r]
if(r < ee) // if not end of right run
continue; // continue (back to while)
do // else copy rest of left run
b[o++] = a[l++];
while(l < rr);
break; // and return
void BottomUpCopy(int a[], int b[], size_t ll, size_t rr)
do // copy left run
b[ll] = a[ll];
while(++ll < rr);
size_t GetPassCount(size_t n) // return # passes
size_t i = 0;
for(size_t s = 1; s < n; s <<= 1)
i += 1;
return(i);
【讨论】:
以上是关于高效实现多线程排序算法的关键是啥?天真的实现无法正常工作的主要内容,如果未能解决你的问题,请参考以下文章