如何优化间接基数排序? (又名如何优化不可预测的内存访问模式)

Posted

技术标签:

【中文标题】如何优化间接基数排序? (又名如何优化不可预测的内存访问模式)【英文标题】:How to optimize an indirect radix sort? (a.k.a. how to optimize unpredictable memory access patterns) 【发布时间】:2013-11-15 11:59:23 【问题描述】:

我用 C++ 编写了一个间接基数排序算法(间接,我的意思是它返回项目的索引):

#include <algorithm>
#include <iterator>
#include <vector>

template<class It1, class It2>
void radix_ipass(
    It1 begin, It1 const end,
    It2 const a, size_t const i,
    std::vector<std::vector<size_t> > &buckets)

    size_t ncleared = 0;
    for (It1 j = begin; j != end; ++j)
    
        size_t const k = a[*j][i];
        while (k >= ncleared && ncleared < buckets.size())
         buckets[ncleared++].clear(); 
        if (k >= buckets.size())
        
            buckets.resize(k + 1);
            ncleared = buckets.size();
        
        buckets[k].push_back(size_t());
        using std::swap; swap(buckets[k].back(), *j);
    
    for (std::vector<std::vector<size_t> >::iterator
        j = buckets.begin(); j != buckets.begin() + ncleared; j->clear(), ++j)
    
        begin = std::swap_ranges(j->begin(), j->end(), begin);
    


template<class It, class It2>
void radix_isort(It const begin, It const end, It2 const items)

    for (ptrdiff_t i = 0; i != end - begin; ++i)  items[i] = i; 
    size_t smax = 0;
    for (It i = begin; i != end; ++i)
    
        size_t const n = i->size();
        smax = n > smax ? n : smax;
    
    std::vector<std::vector<size_t> > buckets;
    for (size_t i = 0; i != smax; ++i)
    
        radix_ipass(
            items, items + (end - begin),
            begin, smax - i - 1, buckets);
    

当我使用以下代码测试它时,它的执行速度似乎比 std::sort 快​​ 40% 左右(3920 毫秒对比 6530 毫秒):

#include <functional>

template<class Key>
struct key_comp : public Key

    explicit key_comp(Key const &key = Key()) : Key(key)  
    template<class T>
    bool operator()(T const &a, T const &b) const
     return this->Key::operator()(a) < this->Key::operator()(b); 
;

template<class Key>
key_comp<Key> make_key_comp(Key const &key)  return key_comp<Key>(key); 

template<class T1, class T2>
struct add : public std::binary_function<T1, T2, T1>
 T1 operator()(T1 a, T2 const &b) const  return a += b;  ;

template<class F>
struct deref : public F

    deref(F const &f) : F(f)  
    typename std::iterator_traits<
        typename F::result_type
    >::value_type const
        &operator()(typename F::argument_type const &a) const
     return *this->F::operator()(a); 
;

template<class T> deref<T> make_deref(T const &t)  return deref<T>(t); 

size_t xorshf96(void)  // random number generator

    static size_t x = 123456789, y = 362436069, z = 521288629;
    x ^= x << 16;
    x ^= x >> 5;
    x ^= x << 1;
    size_t t = x;
    x = y;
    y = z;
    z = t ^ x ^ y;
    return z;


#include <stdio.h>
#include <time.h>

#include <array>

int main(void)

    typedef std::vector<std::array<size_t, 3> > Items;
    Items items(1 << 24);
    std::vector<size_t> ranks(items.size() * 2);
    for (size_t i = 0; i != items.size(); i++)
    
        ranks[i] = i;
        for (size_t j = 0; j != items[i].size(); j++)
         items[i][j] = xorshf96() & 0xFFF; 
    
    clock_t const start = clock();
    if (1)  radix_isort(items.begin(), items.end(), ranks.begin()); 
    else  // STL sorting
    
        std::sort(
            ranks.begin(),
            ranks.begin() + items.size(),
            make_key_comp(make_deref(std::bind1st(
                add<Items::const_iterator, ptrdiff_t>(),
                items.begin()))));
    
    printf("%u ms\n",
        (unsigned)((clock() - start) * 1000 / CLOCKS_PER_SEC),
        std::min(ranks.begin(), ranks.end()));
    return 0;

嗯,我想这是我能做的最好的了,我想。

但是在多次撞墙之后,我意识到在radix_ipass 开头的预取可以帮助将结果减少到1440 ms(!):

#include <xmmintrin.h>

...

    for (It1 j = begin; j != end; ++j)
    
#if defined(_MM_TRANSPOSE4_PS)  // should be defined if xmmintrin.h is included
        enum  N = 8 ;
        if (end - j > N)
         _mm_prefetch((char const *)(&a[j[N]][i]), _MM_HINT_T0); 
#endif
        ...
    

显然,瓶颈是内存带宽——访问模式是不可预测的。

所以现在我的问题是:我还能做些什么来让它更快地处理类似数量的数据?

还是没有太大的改进空间?

(我希望尽可能避免损害代码的可读性,所以如果可读性受到损害,改进应该是显着的。)

【问题讨论】:

这个问题似乎是题外话,因为它应该在codereview.stackexchange.com @JoachimPileborg:不过,我并没有要求任何人“审查”我的代码。这是一个优化问题,而不是代码审查请求。 您可能想阅读what's on topic on codereview:“如果您正在从您的项目中寻找关于以下领域的特定工作代码的反馈......性能......那么你在正确的地方!” @JoachimPileborg:我认为这个问题与您在 *** 上看到的性能问题类型更相似,而不是您在 CodeReview.SE 上看到的类型。 最终的printf 似乎缺少一些输出(比格式说明符更多的参数)。另外,我发现make_key_comp(..) 很难阅读和推理。在 C++11 中,我会使用 lambda;否则为自定义函数对象。不过,这不会影响性能(我测试过)。 size_t 需要 &lt;cstddef&gt; (+ using std::size_t;)。我不太明白为什么在只存储 12 位信息时使用 size_t 作为值类型:xorshf96() &amp; 0xFFF(切换到 16 位 int 会稍微提高 std::sort 的性能)。 【参考方案1】:

使用结合等级和值的更紧凑的数据结构可以将std::sort 的性能提高 2-3 倍。本质上,排序现在在vector&lt;pair&lt;Value,Rank&gt;&gt; 上运行。为此,Value 数据类型 std::array&lt;integer_type, 3&gt; 已替换为更紧凑的 pair&lt;uint32_t, uint8_t&gt; 数据结构。它只有半个字节未被使用,&lt; 比较可以分两步完成,首先使用uint32_ts 的大概有效比较(尚不清楚std::array&lt;..&gt;::operator&lt; 使用的循环是否可以优化为同样快速的代码,但是用这种数据结构替换 std::array&lt;integer_type,3&gt; 会产生另一个性能提升)。

不过,它的效率不如基数排序。 (也许您可以使用预取来调整自定义 QuickSort?)

除了额外的排序方法之外,我已将xorshf96 替换为mt19937,因为我知道如何为后者提供种子;)

可以通过两个命令行参数更改种子和值的数量:首先是种子,然后是计数。

使用 g++ 4.9.0 20131022 编译,使用 -std=c++11 -march=native -O3,用于 64 位 linux

样本运行; 重要提示在 Core2Duo 处理器 U9400 上运行(旧且慢!)

项目数:16000000 使用 std::sort 持续时间:12260 毫秒 结果排序:真 种子:5648 项目数:16000000 使用 std::sort 持续时间:12230 毫秒 结果排序:真 种子:5648 项目数:16000000 使用 std::sort 持续时间:12230 毫秒 结果排序:真 种子:5648 项目数:16000000 使用带有打包数据结构的 std::sort 持续时间:4290 毫秒 结果排序:真 种子:5648 项目数:16000000 使用带有打包数据结构的 std::sort 持续时间:4270 毫秒 结果排序:真 种子:5648 项目数:16000000 使用带有打包数据结构的 std::sort 持续时间:4280 毫秒 结果排序:真 项目数:16000000 使用基数排序 持续时间:3790 毫秒 结果排序:真 种子:5648 项目数:16000000 使用基数排序 持续时间:3820 毫秒 结果排序:真 种子:5648 项目数:16000000 使用基数排序 持续时间:3780 毫秒 结果排序:真

新的或更改的代码:

template<class It>
struct fun_obj

        It beg;
        bool operator()(ptrdiff_t lhs, ptrdiff_t rhs)
        
                return beg[lhs] < beg[rhs];
        
;

template<class It>
fun_obj<It> make_fun_obj(It beg)

        return fun_obj<It>beg;


struct uint32p8_t

        uint32_t m32;
        uint8_t m8;

        uint32p8_t(std::array<uint16_t, 3> const& a)
                : m32( a[0]<<(32-3*4) | a[1]<<(32-2*3*4) | (a[2]&0xF00)>>8)
                , m8( a[2]&0xFF )
        
        

        operator std::array<size_t, 3>() const
        
                return m32&0xFFF00000 >> (32-3*4), m32&0x000FFF0 >> (32-2*3*4),
                         (m32&0xF)<<8 | m8;
        

        friend bool operator<(uint32p8_t const& lhs, uint32p8_t const& rhs)
        
                if(lhs.m32 < rhs.m32) return true;
                if(lhs.m32 > rhs.m32) return false;
                return lhs.m8 < rhs.m8;
        
;

#include <stdio.h>
#include <time.h>

#include <array>
#include <iostream>
#include <iomanip>
#include <utility>
#include <algorithm>
#include <cstdlib>
#include <iomanip>
#include <random>

int main(int argc, char* argv[])

    std::cout.sync_with_stdio(false);

    constexpr auto items_count_default = 2<<22;
    constexpr auto seed_default = 42;

    uint32_t const seed = argc > 1 ? std::atoll(argv[1]) : seed_default;
    std::cout << "seed: " << seed << "\n";

    size_t const items_count = argc > 2 ? std::atoll(argv[2])
                                        : items_count_default;
    std::cout << "item count: " << items_count << "\n";

    using Items_array_value_t =
        #ifdef RADIX_SORT
            size_t
        #elif defined(STDSORT)
            uint16_t
        #elif defined(STDSORT_PACKED)
            uint16_t
        #endif
    ;

    typedef std::vector<std::array<Items_array_value_t, 3> > Items;
    Items items(items_count);

    auto const ranks_count =
        #ifdef RADIX_SORT
            items.size() * 2
        #elif defined(STDSORT)
            items.size()
        #elif defined(STDSORT_PACKED)
            items.size()
    #endif
    ;

    //auto prng = xorshf96;
    std::mt19937 gen(seed);
    std::uniform_int_distribution<> dist;
    auto prng = [&dist, &gen]return dist(gen);;

    std::vector<size_t> ranks(ranks_count);
    for (size_t i = 0; i != items.size(); i++)
    
        ranks[i] = i;

        for (size_t j = 0; j != items[i].size(); j++)
         items[i][j] = prng() & 0xFFF; 
    

    std::cout << "using ";
    clock_t const start = clock();
    #ifdef RADIX_SORT
        std::cout << "radix sort\n";
        radix_isort(items.begin(), items.end(), ranks.begin());
    #elif defined(STDSORT)
        std::cout << "std::sort\n";
        std::sort(ranks.begin(), ranks.begin() + items.size(),
                  make_fun_obj(items.cbegin())
                  //make_key_comp(make_deref(std::bind1st(
                  //    add<Items::const_iterator, ptrdiff_t>(),
                  //    items.begin())))
                 );
    #elif defined(STDSORT_PACKED)
        std::cout << "std::sort with a packed data structure\n";
        using Items_ranks = std::vector< std::pair<uint32p8_t,
                                         decltype(ranks)::value_type> >;
        Items_ranks items_ranks;

        size_t i = 0;
        for(auto iI = items.cbegin(); iI != items.cend(); ++iI, ++i)
        
            items_ranks.emplace_back(*iI, i);
        

        std::sort(begin(items_ranks), end(items_ranks),
                  [](Items_ranks::value_type const& lhs,
                     Items_ranks::value_type const& rhs)
                   return lhs.first < rhs.first; 
                 );
        std::transform(items_ranks.cbegin(), items_ranks.cend(), begin(ranks),
                       [](Items_ranks::value_type const& e)  return e.second; 
                      );
    #endif
    auto const duration = (clock() - start) / (CLOCKS_PER_SEC / 1000);

    bool const sorted = std::is_sorted(ranks.begin(), ranks.begin() + items.size(),
                                       make_fun_obj(items.cbegin()));

    std::cout << "duration: " << duration << " ms\n"
              << "result sorted: " << std::boolalpha << sorted << "\n";

    return 0;

完整代码:

#include <algorithm>
#include <iterator>
#include <vector>

#include <cstddef>
using std::size_t;
using std::ptrdiff_t;

#include <xmmintrin.h>

template<class It1, class It2>
void radix_ipass(
    It1 begin, It1 const end,
    It2 const a, size_t const i,
    std::vector<std::vector<size_t> > &buckets)

    size_t ncleared = 0;
    for (It1 j = begin; j != end; ++j)
    
        #if defined(_MM_TRANSPOSE4_PS)
            constexpr auto N = 8;
            if(end - j > N)
             _mm_prefetch((char const *)(&a[j[N]][i]), _MM_HINT_T0); 
        #else
            #error SS intrinsic not found
        #endif

        size_t const k = a[*j][i];
        while (k >= ncleared && ncleared < buckets.size())
         buckets[ncleared++].clear(); 
        if (k >= buckets.size())
        
            buckets.resize(k + 1);
            ncleared = buckets.size();
        
        buckets[k].push_back(size_t());
        using std::swap; swap(buckets[k].back(), *j);
    
    for (std::vector<std::vector<size_t> >::iterator
        j = buckets.begin(); j != buckets.begin() + ncleared; j->clear(), ++j)
    
        begin = std::swap_ranges(j->begin(), j->end(), begin);
    


template<class It, class It2>
void radix_isort(It const begin, It const end, It2 const items)

    for (ptrdiff_t i = 0; i != end - begin; ++i)  items[i] = i; 
    size_t smax = 0;
    for (It i = begin; i != end; ++i)
    
        size_t const n = i->size();
        smax = n > smax ? n : smax;
    
    std::vector<std::vector<size_t> > buckets;
    for (size_t i = 0; i != smax; ++i)
    
        radix_ipass(
            items, items + (end - begin),
            begin, smax - i - 1, buckets);
    


#include <functional>

template<class Key>
struct key_comp : public Key

    explicit key_comp(Key const &key = Key()) : Key(key)  
    template<class T>
    bool operator()(T const &a, T const &b) const
     return this->Key::operator()(a) < this->Key::operator()(b); 
;

template<class Key>
key_comp<Key> make_key_comp(Key const &key)  return key_comp<Key>(key); 

template<class T1, class T2>
struct add : public std::binary_function<T1, T2, T1>
 T1 operator()(T1 a, T2 const &b) const  return a += b;  ;

template<class F>
struct deref : public F

    deref(F const &f) : F(f)  
    typename std::iterator_traits<
        typename F::result_type
    >::value_type const
        &operator()(typename F::argument_type const &a) const
     return *this->F::operator()(a); 
;

template<class T> deref<T> make_deref(T const &t)  return deref<T>(t); 

size_t xorshf96(void)  // random number generator

    static size_t x = 123456789, y = 362436069, z = 521288629;
    x ^= x << 16;
    x ^= x >> 5;
    x ^= x << 1;
    size_t t = x;
    x = y;
    y = z;
    z = t ^ x ^ y;
    return z;


template<class It>
struct fun_obj

        It beg;
        bool operator()(ptrdiff_t lhs, ptrdiff_t rhs)
        
                return beg[lhs] < beg[rhs];
        
;

template<class It>
fun_obj<It> make_fun_obj(It beg)

        return fun_obj<It>beg;


struct uint32p8_t

        uint32_t m32;
        uint8_t m8;

        uint32p8_t(std::array<uint16_t, 3> const& a)
                : m32( a[0]<<(32-3*4) | a[1]<<(32-2*3*4) | (a[2]&0xF00)>>8)
                , m8( a[2]&0xFF )
        
        

        operator std::array<size_t, 3>() const
        
                return m32&0xFFF00000 >> (32-3*4), m32&0x000FFF0 >> (32-2*3*4),
                         (m32&0xF)<<8 | m8;
        

        friend bool operator<(uint32p8_t const& lhs, uint32p8_t const& rhs)
        
                if(lhs.m32 < rhs.m32) return true;
                if(lhs.m32 > rhs.m32) return false;
                return lhs.m8 < rhs.m8;
        
;

#include <stdio.h>
#include <time.h>

#include <array>
#include <iostream>
#include <iomanip>
#include <utility>
#include <algorithm>
#include <cstdlib>
#include <iomanip>
#include <random>

int main(int argc, char* argv[])

    std::cout.sync_with_stdio(false);

    constexpr auto items_count_default = 2<<22;
    constexpr auto seed_default = 42;

    uint32_t const seed = argc > 1 ? std::atoll(argv[1]) : seed_default;
    std::cout << "seed: " << seed << "\n";
    size_t const items_count = argc > 2 ? std::atoll(argv[2]) : items_count_default;
    std::cout << "item count: " << items_count << "\n";

    using Items_array_value_t =
        #ifdef RADIX_SORT
            size_t
        #elif defined(STDSORT)
            uint16_t
        #elif defined(STDSORT_PACKED)
            uint16_t
        #endif
    ;

    typedef std::vector<std::array<Items_array_value_t, 3> > Items;
    Items items(items_count);

    auto const ranks_count =
        #ifdef RADIX_SORT
            items.size() * 2
        #elif defined(STDSORT)
            items.size()
        #elif defined(STDSORT_PACKED)
            items.size()
    #endif
    ;

    //auto prng = xorshf96;
    std::mt19937 gen(seed);
    std::uniform_int_distribution<> dist;
    auto prng = [&dist, &gen]return dist(gen);;

    std::vector<size_t> ranks(ranks_count);
    for (size_t i = 0; i != items.size(); i++)
    
        ranks[i] = i;

        for (size_t j = 0; j != items[i].size(); j++)
         items[i][j] = prng() & 0xFFF; 
    

    std::cout << "using ";
    clock_t const start = clock();
    #ifdef RADIX_SORT
        std::cout << "radix sort\n";
        radix_isort(items.begin(), items.end(), ranks.begin());
    #elif defined(STDSORT)
        std::cout << "std::sort\n";
        std::sort(ranks.begin(), ranks.begin() + items.size(),
                  make_fun_obj(items.cbegin())
                  //make_key_comp(make_deref(std::bind1st(
                  //    add<Items::const_iterator, ptrdiff_t>(),
                  //    items.begin())))
                 );
    #elif defined(STDSORT_PACKED)
        std::cout << "std::sort with a packed data structure\n";
        using Items_ranks = std::vector< std::pair<uint32p8_t,
                                         decltype(ranks)::value_type> >;
        Items_ranks items_ranks;

        size_t i = 0;
        for(auto iI = items.cbegin(); iI != items.cend(); ++iI, ++i)
        
            items_ranks.emplace_back(*iI, i);
        

        std::sort(begin(items_ranks), end(items_ranks),
                  [](Items_ranks::value_type const& lhs,
                     Items_ranks::value_type const& rhs)
                   return lhs.first < rhs.first; 
                 );
        std::transform(items_ranks.cbegin(), items_ranks.cend(), begin(ranks),
                       [](Items_ranks::value_type const& e)  return e.second; 
                      );
    #endif
    auto const duration = (clock() - start) / (CLOCKS_PER_SEC / 1000);

    bool const sorted = std::is_sorted(ranks.begin(), ranks.begin() + items.size(),
                                       make_fun_obj(items.cbegin()));

    std::cout << "duration: " << duration << " ms\n"
              << "result sorted: " << std::boolalpha << sorted << "\n";

    return 0;

【讨论】:

以上是关于如何优化间接基数排序? (又名如何优化不可预测的内存访问模式)的主要内容,如果未能解决你的问题,请参考以下文章

快速排序及优化

基数排序和更改基数

排序优化

如何优化指针间接层

网站内部链接应该如何进行优化

基数排序