C++ 并行就地基数排序
Posted
技术标签:
【中文标题】C++ 并行就地基数排序
【英文标题】C++ parallel in place radix sort
【发布时间】2016-03-07 13:30:07
【问题描述】
我正在尝试使用基数 256 实现并行就地基数排序。在我看来，函数 srt 在单线程中运行良好。但是，当对随机数据使用更多线程时，有时会出现错误：“访问冲突读取位置”（Access violation reading location），后跟函数 srt 中 `marker[index]` 的地址。它在函数 srt 的第 15 行中断，即 `tmp = *marker[index]`，此时 index 的值为 63。谁能解释一下发生了什么？
// Add the leading byte (value >> 24) of each of the `size` values starting at
// `list` into `histogram`, which must hold 256 counters. The counters are
// accumulated into, not cleared first.
inline
void count(unsigned* list, int size, int* histogram)
{
    int pos = 0;
    while (pos < size)
    {
        const unsigned top_byte = list[pos] >> 24;
        histogram[top_byte] += 1;
        ++pos;
    }
}
// In-place permutation ("American flag" style) of values into 256 buckets
// keyed by their leading byte (value >> 24).
//
// Precondition: marker[b] points at the start of bucket b. The buckets are laid
// out back to back, so bucket b ends where bucket b+1 starts. histogram[b] is
// the number of values in those slots whose leading byte is b. On return,
// marker[b] points one past the end of bucket b.
//
// Every slot of a bucket is visited once. While the value in it belongs
// elsewhere, it is swapped into the next free slot of its own bucket.
void srt(int* histogram, unsigned** marker)
{
    static const int kBins = 256;
    for (int bin = 0; bin < kBins; ++bin)
    {
        // Slots of this bucket not yet filled. For bin >= 1, marker[bin - 1]
        // now marks the end of the previous bucket, i.e. the start of this one.
        int remaining = (bin == 0)
            ? histogram[0]
            : histogram[bin] - (marker[bin] - marker[bin - 1]);
        for (; remaining > 0; --remaining)
        {
            unsigned*& slot = marker[bin];
            for (int dest = static_cast<int>(*slot >> 24); dest != bin;
                 dest = static_cast<int>(*slot >> 24))
            {
                const unsigned displaced = *marker[dest];
                *marker[dest]++ = *slot;
                *slot = displaced;
            }
            ++slot;
        }
    }
}
void parallel_sort(unsigned* list, int size)
//Build histogram
static const int bin_size = 256;
int histogram[bin_size] = 0 ;
int histogram1[bin_size] = 0 ;
int histogram2[bin_size] = 0 ;
int histogram3[bin_size] = 0 ;
const int partial_size = size / 4;
count(list, partial_size, histogram);
count(&list[partial_size], partial_size, histogram1);
count(&list[2 * partial_size], partial_size, histogram2);
count(&list[3 * partial_size], partial_size + (size % 4), histogram3);
unsigned int* marker[bin_size];
unsigned int* marker1[bin_size];
unsigned int* marker2[bin_size];
unsigned int* marker3[bin_size];
unsigned int* previous = list;
for ( int i = 0; i < bin_size; ++i )
marker[i] = previous;
marker1[i] = marker[i] + histogram[i];
marker2[i] = marker1[i] + histogram1[i];
marker3[i] = marker2[i] + histogram2[i];
previous = marker3[i] + histogram3[i];
//Breaks in srt in any of those threads
thread t21(srt, histogram1, marker1);
thread t22(srt, histogram2, marker2);
thread t23(srt, histogram3, marker3);
srt(histogram, marker);
t21.join();
t22.join();
t23.join();
//TODO
int main()
const int size = 100000;
unsigned list[size];
srand(time(NULL));
for (int i = 0;i < size;++i)
list[i] = rand()*rand();
parallel_sort(list, size);
我正在使用 i3 dell、Windows 10、Visual Studio 2015 和以下选项
/MP /GS /analyze- /W3 /Zc:wchar_t /ZI /Gm /Od /sdl /Fd"Debug\vc140.pdb" /Zc:inline /fp:precise /D "_MBCS" /errorReport:prompt /WX- /Zc:forScope /RTC1 /Gd /Oy- /MDd /Fa"Debug\" /EHsc /nologo /Fo"Debug\" /Fp"
【问题讨论】:
如果你可以调试你的代码（带断点），会更容易找到它崩溃的地方。
在函数 srt 中，崩溃发生在 `tmp = *marker[index]`，此时 index 为 63。
嗯，您正在用 `list` 初始化您的 `marker`。那么你在哪里声明 `list`？我认为问题出在 `marker2[i] = marker1[i] + histogram1[i];` 和类似的行。想象一下，如果 `histogram1[i]` 的值非常大，使 `marker2[i]` 指向 `list` 之外的地方，会怎样？
感谢帮助。这个带有标记的想法是错误的。
你可以使用这种基数排序并忘记它codereview.stackexchange.com/questions/255443/…
【参考方案1】:
问题出在这里:
// Quoted from the question's driver: fills `list` with rand()*rand() values and sorts them.
// NOTE(review): the sort buckets values by (value >> 24). A value larger than `size`
// is therefore not, by itself, out of range for the 256-entry marker arrays.
int main()
const int size = 100000;
unsigned list[size];
srand(time(NULL));
for (int i = 0;i < size;++i)
// NOTE(review): rand()*rand() is int arithmetic; it overflows (undefined behaviour) when RAND_MAX exceeds 46340.
list[i] = rand()*rand();
parallel_sort(list, size);
如果 `list[i] = rand()*rand();` 产生的数字大于 `size`（100,000）怎么办？您的 `marker` 数组指针绝对会超出 `list` 的界限，这就是您的程序崩溃的原因。
总而言之,您应该确保所有要排序的值都在数组最大大小以下,因为您使用的是基数排序。
【讨论】:
请注意，这只是基数排序的第一遍，它基于前导 8 位构建直方图（它使用 `>> 24` 去掉其余的低 24 位），这就是每个直方图的大小为 256 的原因。
【参考方案2】:
尝试下面的并行基数排序代码：
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <assert.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/time.h>
// Process exit codes returned by main on failure. Numbering starts at 1 so
// that no failure is reported as 0, which the shell treats as success.
// Previously INVALID_USAGE was implicitly 0.
enum errors
{
    INVALID_USAGE = 1,
    ERROR_OPENING_INPUT,
    ERROR_OPENING_OUTPUT,
    INVALID_FORMAT,
    MEMORY_ERROR,
    THREAD_CREATE_ERROR,
    THREAD_JOIN_ERROR
};
// Each key has KEY_LENGTH characters. In memory it occupies CHARKEY_LENGTH
// bytes, because the extra byte holds a terminating '\0'.
const unsigned char KEY_LENGTH(7);
const unsigned char CHARKEY_LENGTH(KEY_LENGTH + 1);
// Number of distinct values one key byte can take (the radix).
const unsigned short INPUT_SPACE(256);
// Arguments handed to each concurrent_radix worker.
struct thread_info
{
    // The two key-pointer arrays. Each pass reads one and scatters into the
    // other, then the roles are swapped.
    unsigned char **input;
    unsigned char **output;
    unsigned short thread_idx;     // this worker's index, 0 .. thread_count-1
    unsigned int first_idx;        // first key of this worker's slice
    unsigned int last_idx;         // one past the last key of the slice
    pthread_barrier_t *barrier;    // shared by all workers
    // Flat array of thread_count rows, each with INPUT_SPACE counters; row t
    // belongs to worker t.
    unsigned int *local_counters;
    unsigned int thread_count;
};
inline unsigned int coordinate(const unsigned int line)
return (line * CHARKEY_LENGTH);
// map each string of the memory array input to a pointer in output
inline void map_strings(unsigned char *const input, unsigned char
**output, const unsigned int size)
for(unsigned int i = 0; i < size; ++i)
output[i] = &(input[coordinate(i)]);
// Number of worker threads to use: the number of online processors, clamped
// to [1, 65535]. sysconf() returns -1 on failure; the plain cast turned that
// into 65535 threads.
inline unsigned short core_count()
{
    // Linux, Solaris, AIX, etc:
    const long online = sysconf(_SC_NPROCESSORS_ONLN);
    if (online < 1)
        return 1;
    if (online > 0xFFFF)
        return 0xFFFF;
    return (unsigned short)online;
}
// Print the command-line synopsis to stderr.
// Fixes the "ouput_file" typo in the message.
void print_usage(const char *const prog_name)
{
    fprintf(stderr, "Usage: "
            "%s input_file output_file\n", prog_name);
}
/*
counter the occurences of each char in input at radix i
first_idx and last_idx define the range of the input where the count
is needed
the result is stored in counter
*/
inline void count_char(const unsigned char *const *const input,
const unsigned int first_idx,
const unsigned int last_idx,
const unsigned int radix,
unsigned int *counter)
memset(counter, 0, INPUT_SPACE*sizeof(unsigned int));
for(unsigned int i = first_idx; i < last_idx; ++i)
const unsigned char c = input[i][radix];
++(counter[c]);
/* compute the offset of the current thread
local_counters: array of all coutners
thread_idx: current thread idx
thread_count: number of thread
offset: the array which is populated with the offset
*/
inline void compute_offset(const unsigned int *const local_counters,
const unsigned int thread_idx,
const unsigned int thread_count,
unsigned int *const offset)
unsigned int local_offset[INPUT_SPACE];
unsigned int global_counter[INPUT_SPACE];
for(unsigned int i = 0; i < INPUT_SPACE; ++i)
global_counter[i] = 0;
for(unsigned int thread_i = 0; thread_i < thread_count; ++thread_i)
if (thread_idx == thread_i)
local_offset[i] = global_counter[i];
const unsigned int value = local_counters[thread_i*INPUT_SPACE +
i];
global_counter[i] += value;
unsigned int previous_offset = 0;
for(unsigned int i = 1; i < INPUT_SPACE; ++i)
previous_offset += global_counter[i-1];
offset[i] = previous_offset + local_offset[i];
// Scatter keys input[first_idx, last_idx) into `output` by their byte at
// `radix`. Each key goes to output[offset[byte]], and that offset is then
// incremented, so the relative order of equal bytes is kept.
inline void sort_input(unsigned char *const *const input,
                       unsigned int *const offset,
                       const unsigned int first_idx,
                       const unsigned int last_idx,
                       const unsigned int radix,
                       unsigned char **const output)
{
    for (unsigned int k = first_idx; k < last_idx; ++k)
    {
        unsigned char *const key = input[k];
        unsigned int &slot = offset[key[radix]];
        output[slot] = key;
        ++slot;
    }
}
void *concurrent_radix(void *arg)
thread_info *const info = (thread_info*)arg;
const unsigned short thread_idx = info->thread_idx;
const unsigned int first_idx = info->first_idx;
const unsigned int last_idx = info->last_idx;
const unsigned int thread_count = info->thread_count;
unsigned int *const local_counters = info->local_counters;
unsigned char **input = info->input;
unsigned char **output = info->output;
pthread_barrier_t *const barrier = info->barrier;
unsigned int *const counter = &(local_counters[thread_idx *
INPUT_SPACE]);
int radix = KEY_LENGTH - 1;
do
count_char(input, first_idx, last_idx, radix, counter);
pthread_barrier_wait(barrier);
unsigned int offset[INPUT_SPACE];
compute_offset(local_counters, thread_idx, thread_count, offset);
sort_input(input, offset, first_idx, last_idx, radix, output);
pthread_barrier_wait(barrier);
unsigned char **const temp = input;
input = output;
output = temp;
--radix;
while(radix >= 0);
return NULL;
// sort the radix index
inline void threaded_radix (unsigned char **input, unsigned char
**output, const unsigned int nb_keys)
const unsigned short nb_core = core_count();
pthread_t threads[nb_core];
thread_info threads_arg[nb_core];
unsigned int local_counters[nb_core * INPUT_SPACE];
pthread_barrier_t barrier;
pthread_barrier_init(&barrier, NULL, nb_core);
const unsigned int range = nb_keys / nb_core;
unsigned int last_idx = 0;
for (unsigned short i = 0; i < nb_core; ++i)
const unsigned int first_idx = last_idx;
last_idx = last_idx + range;
thread_info &info = threads_arg[i];
info.input = input;
info.output = output;
info.first_idx = first_idx;
info.last_idx = last_idx;
info.thread_idx = i;
info.thread_count = nb_core;
info.local_counters = local_counters;
info.barrier = &barrier;
threads_arg[nb_core-1].last_idx = nb_keys;
for (unsigned short i = 1; i < nb_core; ++i)
pthread_create(&threads[i], NULL, concurrent_radix, (void *)&
(threads_arg[i]));
concurrent_radix((void *)&(threads_arg[0]));
for (unsigned short i = 1; i < nb_core; ++i)
pthread_join(threads[i], NULL);
pthread_barrier_destroy(&barrier);
inline void radix_sort(unsigned char *input, unsigned char **output,
const unsigned int nb_keys)
unsigned char **buffer = (unsigned char **)malloc(nb_keys *
sizeof(unsigned char*));
map_strings(input, buffer, nb_keys);
threaded_radix(buffer, output, nb_keys);
free(buffer);
int main(const int argc, const char *const argv[])
if(argc < 3)
print_usage(argv[0]);
return INVALID_USAGE;
// import the data in a table
FILE *input = fopen(argv[1], "r");
if(!input)
const int errsv = errno;
fprintf(stderr, "%s: %s\n", argv[1], strerror(errsv));
print_usage(argv[0]);
return ERROR_OPENING_INPUT;
unsigned int input_size;
int converted = fscanf(input, "%d\n", &input_size);
if(converted != 1)
fprintf(stderr, "Invalid file format");
return INVALID_FORMAT;
unsigned char *input_table = (unsigned char *)malloc(input_size *
CHARKEY_LENGTH * sizeof(unsigned char));
unsigned char **output_table = (unsigned char **)malloc(input_size *
sizeof(unsigned char*));
if(!input_table || !output_table)
fprintf(stderr, "Error: not enough memory\n");
return MEMORY_ERROR;
for(unsigned int i = 0; i<input_size; ++i)
unsigned char * key = &(input_table[coordinate(i)]);
size_t size_read = fread(key, 1, CHARKEY_LENGTH, input);
key[KEY_LENGTH] = '\0';
if(size_read != CHARKEY_LENGTH)
if(feof(input))
assert(i == (input_size - 1));
else
fprintf(stderr, "Invalid file format");
return INVALID_FORMAT;
fclose(input);
// sort
struct timeval tick1, tick2;
gettimeofday(&tick1, NULL);
radix_sort(input_table, output_table, input_size);
gettimeofday(&tick2, NULL);
const double ellapsed = (tick2.tv_sec + tick2.tv_usec/1000000.0) -
(tick1.tv_sec + tick1.tv_usec/1000000.0);
printf("time for action = %g seconds\n", ellapsed);
// write the output
FILE *output = fopen(argv[2], "w+");
if(!input)
const int errsv = errno;
fprintf(stderr, "%s: %s\n", argv[2], strerror(errsv));
print_usage(argv[0]);
return ERROR_OPENING_OUTPUT;
fprintf(output, "%d\n", input_size);
for(unsigned int i = 0; i < input_size;++i)
fprintf(output, "%s\n", output_table[i]);
fclose(output);
free(input_table);
free(output_table);
return 0;
不要忘记添加选项 -pthread 来编译它。
【讨论】:
> -pthread
> Visual Studio 2015
……我还需要解释为什么这对 OP 完全没用吗？
以上是关于 C++ 并行就地基数排序的主要内容，如果未能解决你的问题，请参考以下文章