如何优化间接基数排序? (又名如何优化不可预测的内存访问模式)
Posted
技术标签:
【中文标题】如何优化间接基数排序? (又名如何优化不可预测的内存访问模式)【英文标题】:How to optimize an indirect radix sort? (a.k.a. how to optimize unpredictable memory access patterns) 【发布时间】:2013-11-15 11:59:23 【问题描述】:我用 C++ 编写了一个间接基数排序算法(间接,我的意思是它返回项目的索引):
#include <algorithm>
#include <iterator>
#include <vector>
template<class It1, class It2>
void radix_ipass(
It1 begin, It1 const end,
It2 const a, size_t const i,
std::vector<std::vector<size_t> > &buckets)
size_t ncleared = 0;
for (It1 j = begin; j != end; ++j)
size_t const k = a[*j][i];
while (k >= ncleared && ncleared < buckets.size())
buckets[ncleared++].clear();
if (k >= buckets.size())
buckets.resize(k + 1);
ncleared = buckets.size();
buckets[k].push_back(size_t());
using std::swap; swap(buckets[k].back(), *j);
for (std::vector<std::vector<size_t> >::iterator
j = buckets.begin(); j != buckets.begin() + ncleared; j->clear(), ++j)
begin = std::swap_ranges(j->begin(), j->end(), begin);
template<class It, class It2>
void radix_isort(It const begin, It const end, It2 const items)
for (ptrdiff_t i = 0; i != end - begin; ++i) items[i] = i;
size_t smax = 0;
for (It i = begin; i != end; ++i)
size_t const n = i->size();
smax = n > smax ? n : smax;
std::vector<std::vector<size_t> > buckets;
for (size_t i = 0; i != smax; ++i)
radix_ipass(
items, items + (end - begin),
begin, smax - i - 1, buckets);
当我使用以下代码测试它时,它的执行速度似乎比 std::sort
快 40% 左右(3920 毫秒对比 6530 毫秒):
#include <functional>
template<class Key>
struct key_comp : public Key
explicit key_comp(Key const &key = Key()) : Key(key)
template<class T>
bool operator()(T const &a, T const &b) const
return this->Key::operator()(a) < this->Key::operator()(b);
;
template<class Key>
key_comp<Key> make_key_comp(Key const &key) return key_comp<Key>(key);
template<class T1, class T2>
struct add : public std::binary_function<T1, T2, T1>
T1 operator()(T1 a, T2 const &b) const return a += b; ;
template<class F>
struct deref : public F
deref(F const &f) : F(f)
typename std::iterator_traits<
typename F::result_type
>::value_type const
&operator()(typename F::argument_type const &a) const
return *this->F::operator()(a);
;
template<class T> deref<T> make_deref(T const &t) return deref<T>(t);
size_t xorshf96(void) // random number generator
static size_t x = 123456789, y = 362436069, z = 521288629;
x ^= x << 16;
x ^= x >> 5;
x ^= x << 1;
size_t t = x;
x = y;
y = z;
z = t ^ x ^ y;
return z;
#include <stdio.h>
#include <time.h>
#include <array>
int main(void)
typedef std::vector<std::array<size_t, 3> > Items;
Items items(1 << 24);
std::vector<size_t> ranks(items.size() * 2);
for (size_t i = 0; i != items.size(); i++)
ranks[i] = i;
for (size_t j = 0; j != items[i].size(); j++)
items[i][j] = xorshf96() & 0xFFF;
clock_t const start = clock();
if (1) radix_isort(items.begin(), items.end(), ranks.begin());
else // STL sorting
std::sort(
ranks.begin(),
ranks.begin() + items.size(),
make_key_comp(make_deref(std::bind1st(
add<Items::const_iterator, ptrdiff_t>(),
items.begin()))));
printf("%u ms\n",
(unsigned)((clock() - start) * 1000 / CLOCKS_PER_SEC),
std::min(ranks.begin(), ranks.end()));
return 0;
嗯,我想这是我能做的最好的了,我想。
但是在多次撞墙之后,我意识到在radix_ipass
开头的预取可以帮助将结果减少到1440 ms(!):
#include <xmmintrin.h>
...
for (It1 j = begin; j != end; ++j)
#if defined(_MM_TRANSPOSE4_PS) // should be defined if xmmintrin.h is included
enum N = 8 ;
if (end - j > N)
_mm_prefetch((char const *)(&a[j[N]][i]), _MM_HINT_T0);
#endif
...
显然,瓶颈是内存带宽——访问模式是不可预测的。
所以现在我的问题是:我还能做些什么来让它更快地处理类似数量的数据?
还是没有太大的改进空间?
(我希望尽可能避免损害代码的可读性,所以如果可读性受到损害,改进应该是显着的。)
【问题讨论】:
这个问题似乎是题外话,因为它应该在codereview.stackexchange.com @JoachimPileborg:不过,我并没有要求任何人“审查”我的代码。这是一个优化问题,而不是代码审查请求。 您可能想阅读what's on topic on codereview:“如果您正在从您的项目中寻找关于以下领域的特定工作代码的反馈......性能......那么你在正确的地方!” @JoachimPileborg:我认为这个问题与您在 *** 上看到的性能问题类型更相似,而不是您在 CodeReview.SE 上看到的类型。 最终的printf
似乎缺少一些输出(比格式说明符更多的参数)。另外,我发现make_key_comp(..)
很难阅读和推理。在 C++11 中,我会使用 lambda;否则为自定义函数对象。不过,这不会影响性能(我测试过)。 size_t
需要 <cstddef>
(+ using std::size_t;
)。我不太明白为什么在只存储 12 位信息时使用 size_t
作为值类型:xorshf96() & 0xFFF
(切换到 16 位 int 会稍微提高 std::sort
的性能)。
【参考方案1】:
使用结合等级和值的更紧凑的数据结构可以将std::sort
的性能提高 2-3 倍。本质上,排序现在在vector<pair<Value,Rank>>
上运行。为此,Value
数据类型 std::array<integer_type, 3>
已替换为更紧凑的 pair<uint32_t, uint8_t>
数据结构。它只有半个字节未被使用,<
比较可以分两步完成,首先使用uint32_t
s 的大概有效比较(尚不清楚std::array<..>::operator<
使用的循环是否可以优化为同样快速的代码,但是用这种数据结构替换 std::array<integer_type,3>
会产生另一个性能提升)。
不过,它的效率不如基数排序。 (也许您可以使用预取来调整自定义 QuickSort?)
除了额外的排序方法之外,我已将xorshf96
替换为mt19937
,因为我知道如何为后者提供种子;)
可以通过两个命令行参数更改种子和值的数量:首先是种子,然后是计数。
使用 g++ 4.9.0 20131022 编译,使用 -std=c++11 -march=native -O3
,用于 64 位 linux
样本运行; 重要提示在 Core2Duo 处理器 U9400 上运行(旧且慢!)
项目数:16000000 使用 std::sort 持续时间:12260 毫秒 结果排序:真 种子:5648 项目数:16000000 使用 std::sort 持续时间:12230 毫秒 结果排序:真 种子:5648 项目数:16000000 使用 std::sort 持续时间:12230 毫秒 结果排序:真 种子:5648 项目数:16000000 使用带有打包数据结构的 std::sort 持续时间:4290 毫秒 结果排序:真 种子:5648 项目数:16000000 使用带有打包数据结构的 std::sort 持续时间:4270 毫秒 结果排序:真 种子:5648 项目数:16000000 使用带有打包数据结构的 std::sort 持续时间:4280 毫秒 结果排序:真 项目数:16000000 使用基数排序 持续时间:3790 毫秒 结果排序:真 种子:5648 项目数:16000000 使用基数排序 持续时间:3820 毫秒 结果排序:真 种子:5648 项目数:16000000 使用基数排序 持续时间:3780 毫秒 结果排序:真新的或更改的代码:
template<class It>
struct fun_obj
It beg;
bool operator()(ptrdiff_t lhs, ptrdiff_t rhs)
return beg[lhs] < beg[rhs];
;
template<class It>
fun_obj<It> make_fun_obj(It beg)
return fun_obj<It>beg;
struct uint32p8_t
uint32_t m32;
uint8_t m8;
uint32p8_t(std::array<uint16_t, 3> const& a)
: m32( a[0]<<(32-3*4) | a[1]<<(32-2*3*4) | (a[2]&0xF00)>>8)
, m8( a[2]&0xFF )
operator std::array<size_t, 3>() const
return m32&0xFFF00000 >> (32-3*4), m32&0x000FFF0 >> (32-2*3*4),
(m32&0xF)<<8 | m8;
friend bool operator<(uint32p8_t const& lhs, uint32p8_t const& rhs)
if(lhs.m32 < rhs.m32) return true;
if(lhs.m32 > rhs.m32) return false;
return lhs.m8 < rhs.m8;
;
#include <stdio.h>
#include <time.h>
#include <array>
#include <iostream>
#include <iomanip>
#include <utility>
#include <algorithm>
#include <cstdlib>
#include <iomanip>
#include <random>
int main(int argc, char* argv[])
std::cout.sync_with_stdio(false);
constexpr auto items_count_default = 2<<22;
constexpr auto seed_default = 42;
uint32_t const seed = argc > 1 ? std::atoll(argv[1]) : seed_default;
std::cout << "seed: " << seed << "\n";
size_t const items_count = argc > 2 ? std::atoll(argv[2])
: items_count_default;
std::cout << "item count: " << items_count << "\n";
using Items_array_value_t =
#ifdef RADIX_SORT
size_t
#elif defined(STDSORT)
uint16_t
#elif defined(STDSORT_PACKED)
uint16_t
#endif
;
typedef std::vector<std::array<Items_array_value_t, 3> > Items;
Items items(items_count);
auto const ranks_count =
#ifdef RADIX_SORT
items.size() * 2
#elif defined(STDSORT)
items.size()
#elif defined(STDSORT_PACKED)
items.size()
#endif
;
//auto prng = xorshf96;
std::mt19937 gen(seed);
std::uniform_int_distribution<> dist;
auto prng = [&dist, &gen]return dist(gen);;
std::vector<size_t> ranks(ranks_count);
for (size_t i = 0; i != items.size(); i++)
ranks[i] = i;
for (size_t j = 0; j != items[i].size(); j++)
items[i][j] = prng() & 0xFFF;
std::cout << "using ";
clock_t const start = clock();
#ifdef RADIX_SORT
std::cout << "radix sort\n";
radix_isort(items.begin(), items.end(), ranks.begin());
#elif defined(STDSORT)
std::cout << "std::sort\n";
std::sort(ranks.begin(), ranks.begin() + items.size(),
make_fun_obj(items.cbegin())
//make_key_comp(make_deref(std::bind1st(
// add<Items::const_iterator, ptrdiff_t>(),
// items.begin())))
);
#elif defined(STDSORT_PACKED)
std::cout << "std::sort with a packed data structure\n";
using Items_ranks = std::vector< std::pair<uint32p8_t,
decltype(ranks)::value_type> >;
Items_ranks items_ranks;
size_t i = 0;
for(auto iI = items.cbegin(); iI != items.cend(); ++iI, ++i)
items_ranks.emplace_back(*iI, i);
std::sort(begin(items_ranks), end(items_ranks),
[](Items_ranks::value_type const& lhs,
Items_ranks::value_type const& rhs)
return lhs.first < rhs.first;
);
std::transform(items_ranks.cbegin(), items_ranks.cend(), begin(ranks),
[](Items_ranks::value_type const& e) return e.second;
);
#endif
auto const duration = (clock() - start) / (CLOCKS_PER_SEC / 1000);
bool const sorted = std::is_sorted(ranks.begin(), ranks.begin() + items.size(),
make_fun_obj(items.cbegin()));
std::cout << "duration: " << duration << " ms\n"
<< "result sorted: " << std::boolalpha << sorted << "\n";
return 0;
完整代码:
#include <algorithm>
#include <iterator>
#include <vector>
#include <cstddef>
using std::size_t;
using std::ptrdiff_t;
#include <xmmintrin.h>
template<class It1, class It2>
void radix_ipass(
It1 begin, It1 const end,
It2 const a, size_t const i,
std::vector<std::vector<size_t> > &buckets)
size_t ncleared = 0;
for (It1 j = begin; j != end; ++j)
#if defined(_MM_TRANSPOSE4_PS)
constexpr auto N = 8;
if(end - j > N)
_mm_prefetch((char const *)(&a[j[N]][i]), _MM_HINT_T0);
#else
#error SS intrinsic not found
#endif
size_t const k = a[*j][i];
while (k >= ncleared && ncleared < buckets.size())
buckets[ncleared++].clear();
if (k >= buckets.size())
buckets.resize(k + 1);
ncleared = buckets.size();
buckets[k].push_back(size_t());
using std::swap; swap(buckets[k].back(), *j);
for (std::vector<std::vector<size_t> >::iterator
j = buckets.begin(); j != buckets.begin() + ncleared; j->clear(), ++j)
begin = std::swap_ranges(j->begin(), j->end(), begin);
template<class It, class It2>
void radix_isort(It const begin, It const end, It2 const items)
for (ptrdiff_t i = 0; i != end - begin; ++i) items[i] = i;
size_t smax = 0;
for (It i = begin; i != end; ++i)
size_t const n = i->size();
smax = n > smax ? n : smax;
std::vector<std::vector<size_t> > buckets;
for (size_t i = 0; i != smax; ++i)
radix_ipass(
items, items + (end - begin),
begin, smax - i - 1, buckets);
#include <functional>
template<class Key>
struct key_comp : public Key
explicit key_comp(Key const &key = Key()) : Key(key)
template<class T>
bool operator()(T const &a, T const &b) const
return this->Key::operator()(a) < this->Key::operator()(b);
;
template<class Key>
key_comp<Key> make_key_comp(Key const &key) return key_comp<Key>(key);
template<class T1, class T2>
struct add : public std::binary_function<T1, T2, T1>
T1 operator()(T1 a, T2 const &b) const return a += b; ;
template<class F>
struct deref : public F
deref(F const &f) : F(f)
typename std::iterator_traits<
typename F::result_type
>::value_type const
&operator()(typename F::argument_type const &a) const
return *this->F::operator()(a);
;
template<class T> deref<T> make_deref(T const &t) return deref<T>(t);
size_t xorshf96(void) // random number generator
static size_t x = 123456789, y = 362436069, z = 521288629;
x ^= x << 16;
x ^= x >> 5;
x ^= x << 1;
size_t t = x;
x = y;
y = z;
z = t ^ x ^ y;
return z;
template<class It>
struct fun_obj
It beg;
bool operator()(ptrdiff_t lhs, ptrdiff_t rhs)
return beg[lhs] < beg[rhs];
;
template<class It>
fun_obj<It> make_fun_obj(It beg)
return fun_obj<It>beg;
struct uint32p8_t
uint32_t m32;
uint8_t m8;
uint32p8_t(std::array<uint16_t, 3> const& a)
: m32( a[0]<<(32-3*4) | a[1]<<(32-2*3*4) | (a[2]&0xF00)>>8)
, m8( a[2]&0xFF )
operator std::array<size_t, 3>() const
return m32&0xFFF00000 >> (32-3*4), m32&0x000FFF0 >> (32-2*3*4),
(m32&0xF)<<8 | m8;
friend bool operator<(uint32p8_t const& lhs, uint32p8_t const& rhs)
if(lhs.m32 < rhs.m32) return true;
if(lhs.m32 > rhs.m32) return false;
return lhs.m8 < rhs.m8;
;
#include <stdio.h>
#include <time.h>
#include <array>
#include <iostream>
#include <iomanip>
#include <utility>
#include <algorithm>
#include <cstdlib>
#include <iomanip>
#include <random>
int main(int argc, char* argv[])
std::cout.sync_with_stdio(false);
constexpr auto items_count_default = 2<<22;
constexpr auto seed_default = 42;
uint32_t const seed = argc > 1 ? std::atoll(argv[1]) : seed_default;
std::cout << "seed: " << seed << "\n";
size_t const items_count = argc > 2 ? std::atoll(argv[2]) : items_count_default;
std::cout << "item count: " << items_count << "\n";
using Items_array_value_t =
#ifdef RADIX_SORT
size_t
#elif defined(STDSORT)
uint16_t
#elif defined(STDSORT_PACKED)
uint16_t
#endif
;
typedef std::vector<std::array<Items_array_value_t, 3> > Items;
Items items(items_count);
auto const ranks_count =
#ifdef RADIX_SORT
items.size() * 2
#elif defined(STDSORT)
items.size()
#elif defined(STDSORT_PACKED)
items.size()
#endif
;
//auto prng = xorshf96;
std::mt19937 gen(seed);
std::uniform_int_distribution<> dist;
auto prng = [&dist, &gen]return dist(gen);;
std::vector<size_t> ranks(ranks_count);
for (size_t i = 0; i != items.size(); i++)
ranks[i] = i;
for (size_t j = 0; j != items[i].size(); j++)
items[i][j] = prng() & 0xFFF;
std::cout << "using ";
clock_t const start = clock();
#ifdef RADIX_SORT
std::cout << "radix sort\n";
radix_isort(items.begin(), items.end(), ranks.begin());
#elif defined(STDSORT)
std::cout << "std::sort\n";
std::sort(ranks.begin(), ranks.begin() + items.size(),
make_fun_obj(items.cbegin())
//make_key_comp(make_deref(std::bind1st(
// add<Items::const_iterator, ptrdiff_t>(),
// items.begin())))
);
#elif defined(STDSORT_PACKED)
std::cout << "std::sort with a packed data structure\n";
using Items_ranks = std::vector< std::pair<uint32p8_t,
decltype(ranks)::value_type> >;
Items_ranks items_ranks;
size_t i = 0;
for(auto iI = items.cbegin(); iI != items.cend(); ++iI, ++i)
items_ranks.emplace_back(*iI, i);
std::sort(begin(items_ranks), end(items_ranks),
[](Items_ranks::value_type const& lhs,
Items_ranks::value_type const& rhs)
return lhs.first < rhs.first;
);
std::transform(items_ranks.cbegin(), items_ranks.cend(), begin(ranks),
[](Items_ranks::value_type const& e) return e.second;
);
#endif
auto const duration = (clock() - start) / (CLOCKS_PER_SEC / 1000);
bool const sorted = std::is_sorted(ranks.begin(), ranks.begin() + items.size(),
make_fun_obj(items.cbegin()));
std::cout << "duration: " << duration << " ms\n"
<< "result sorted: " << std::boolalpha << sorted << "\n";
return 0;
【讨论】:
以上是关于如何优化间接基数排序? (又名如何优化不可预测的内存访问模式)的主要内容,如果未能解决你的问题,请参考以下文章