键/值数组的双调排序

Posted

技术标签:

【中文标题】键/值数组的双调排序【英文标题】:Bitonic sort for key/value array 【发布时间】:2016-07-25 15:34:43 【问题描述】:

我正在尝试修改对cl_ints 数组进行排序的the Intel's Bitonic Sorting 算法,以对cl_int2s 数组进行排序(基于键——即cl_int2.x)。

英特尔的示例由一个简单的主机代码和一个 OpenCL 内核组成,该内核在一次排序操作(多通道)期间被多次调用。 内核一次加载 4 个数组项为 cl_int4 并对其进行操作。

我没有修改主机代码算法,只修改了设备代码。 核函数变化列表:

将第一个内核的参数类型从int4*修改为int8*(加载四个键值对) 仅使用 theArray 元素的 .even 组件来比较值 (<) 创建 "pseudomask" (int4) 并在此基础上创建 maskpseudomask.xxyyzzww(以捕获值)

虽然我修改过的内核的输出是由第一个组件 (cl_int2.x) 完美排序的 cl_int2 数组,但值 (cl_int2.y) 不正确 – 一个项目的值在下一个重复4 或 8 个项目,然后使用新值并重复...

我确定有一个小错误,但我找不到它。

Diff of the original Intel code and my modified version.

编辑:当每个键 (cl_int2.x) 都是唯一的时,cl_int2 数组的排序完美无缺。


示例输入:http://pastebin.com/92qB1csT

示例输出:http://pastebin.com/dsU97Npn

(正确排序的数组:http://pastebin.com/Nb56BuQK)

修改后的内核代码(已注释):

// Copyright (c) 2009-2011 Intel Corporation
// https://software.intel.com/en-us/articles/bitonic-sorting

// Modified to sort int2 key-value array

__kernel void BitonicSort(__global int8* theArray,
                         const uint stage,
                         const uint passOfStage,
                         const uint dir)

    size_t i = get_global_id(0);
    int8 srcLeft, srcRight, mask;
    int4 pseudomask;
    int4 imask10 = (int4)(0,  0, -1, -1);
    int4 imask11 = (int4)(0, -1,  0, -1);

    if(stage > 0)
    
        if(passOfStage > 0)    // upper level pass, exchange between two fours,
        
            size_t r = 1 << (passOfStage - 1);
            size_t lmask = r - 1;
            size_t left = ((i>>(passOfStage-1)) << passOfStage) + (i & lmask);
            size_t right = left + r;

            srcLeft = theArray[left];
            srcRight = theArray[right];
            pseudomask = srcLeft.even < srcRight.even;
            mask = pseudomask.xxyyzzww;

            int8 imin = (srcLeft & mask) | (srcRight & ~mask);
            int8 imax = (srcLeft & ~mask) | (srcRight & mask);

            if( ((i>>(stage-1)) & 1) ^ dir )
            
                theArray[left]  = imin;
                theArray[right] = imax;
            
            else
            
                theArray[right] = imin;
                theArray[left]  = imax;
            
        
        else    // last pass, sort inside one four
        
            srcLeft = theArray[i];
            srcRight = srcLeft.s45670123;
            pseudomask = (srcLeft.even < srcRight.even) ^ imask10;
            mask = pseudomask.xxyyzzww;

            if(((i >> stage) & 1) ^ dir)
            
                srcLeft = (srcLeft & mask) | (srcRight & ~mask);

                srcRight = srcLeft.s23016745;
                pseudomask = (srcLeft.even < srcRight.even) ^ imask11;
                mask = pseudomask.xxyyzzww;

                theArray[i] = (srcLeft & mask) | (srcRight & ~mask);
            
            else
            
                srcLeft = (srcLeft & ~mask) | (srcRight & mask);

                srcRight = srcLeft.s23016745;
                pseudomask = (srcLeft.even < srcRight.even) ^ imask11;
                mask = pseudomask.xxyyzzww;

                theArray[i] = (srcLeft & ~mask) | (srcRight & mask);
            
        
    
    else    // first stage, sort inside one four
    
        /*
         *  To convert this code to int2 sorter, do this:
         *      1. instead of loading int4, load int8 (key,value, key,value, ...)
         *      2. when there is a vector swizzling, replace component index with two consecutive indices:
         *           srcLeft.yxwz  ->  srcLeft.s23016745
         *         use this rewrite rule:
         *           x  y  z  w
         *           01 23 45 67
         *      3. replace comparison operands with only their keys swizzled:
         *           mask = srcLeft < srcRight;    ->    pseudomask = srcLeft.even < srcRight.even; mask = pseudomask.xxyyzzww;
         */

        //  make bitonic sequence out of 4.
        int4 imask0 = (int4)(0, -1, -1,  0); // -1 in comparison = true (all bits set - two's complement)
        srcLeft = theArray[i];
        srcRight = srcLeft.s23016745;

        /*
         * This XOR mask flips bits, so that in `mask` are the following
         * results (remember that srcRight is srcLeft with swapped component pairs):
         *
         *      [ left.x<left.y, left.x<left.y,    left.w<left.z, left.w<left.z  ]
         *  or: [ left.x<left.y, left.x<left.y,    left.z>left.w, left.z>left.w  ]
         */
        pseudomask = (srcLeft.even < srcRight.even) ^ imask0;
        mask = pseudomask.xxyyzzww;

        if( dir )
            srcLeft = (srcLeft & mask) | (srcRight & ~mask);  // make sure the numbers are sorted like this:
        else
            srcLeft = (srcLeft & ~mask) | (srcRight & mask);

        /*
         *  Now the pairs of numbers in `srcLeft` are sorted according to the specified `dir`ection.
         *  If dir == true, then
         *    The components `x` and `y` are swapped so that `x` < `y`. Moreover `z` and `w` are swapped so that `z` > `w`. This resembles up-hill: /\
         *  else
         *    The components `x` and `y` are swapped so that `x` > `y`. Moreover `z` and `w` are swapped so that `z` < `w`. This resembles down-hill: \/
         *
         *  This swapping is achieved by creating `srcLeft`, which is in normal order, and `srcRight`, which has component pairs switched (xyzw -> yxwz).
         *  Then the `mask` is created. The mask bits are redundant because it applies to vector component pairs (so in order to implement key-value sorting,
         *  I have to increase the length of masks!).
         *
         *  The non-ordered component pairs in `srcLeft` are masked out by `mask` while the inverted `mask` is applied to the (pair-wise switched) `srcRight`.
         *
         *  This (the previous) first flipping just makes a 4-bitonic sequence.
         */


        /*
         *  This second step just sorts the bitonic sequence
         */
        srcRight = srcLeft.s45670123; // inverts the bitonic sequence

        // [ left.a<left.c, left.b<left.d,    left.a<left.c, left.b<left.d ]
        pseudomask = (srcLeft.even < srcRight.even) ^ imask10;  // imask10 = (noflip, noflip,  flip, flip)
        mask = pseudomask.xxyyzzww;

        // even or odd (The output of this thread is sorted monotonic sequence. The monotonicity changes and thus preparing bitonic sequence for the next pass.).
        if((i & 1) ^ dir)
        
            // this sorts the bitonic sequence, hence splitting it
            srcLeft = (srcLeft & mask) | (srcRight & ~mask);

            srcRight = srcLeft.s23016745;
            pseudomask = (srcLeft.even < srcRight.even) ^ imask11;
            mask = pseudomask.xxyyzzww;

            theArray[i] = (srcLeft & mask) | (srcRight & ~mask);
        
        else
        
            srcLeft = (srcLeft & ~mask) | (srcRight & mask);

            srcRight = srcLeft.s23016745;
            pseudomask = (srcLeft.even < srcRight.even) ^ imask11;
            mask = pseudomask.xxyyzzww;

            theArray[i] = (srcLeft & ~mask) | (srcRight & mask);
        
    

主机端代码:

void ExecuteSortKernel(cl_kernel kernel, cl_command_queue queue, cl_mem cl_input_buffer, cl_int arraySize, cl_uint sortAscending)

    cl_int numStages = 0;

    cl_int stage;
    cl_int passOfStage;

    for (cl_int temp = arraySize; temp > 2; temp >>= 1)
        numStages++;

    clSetKernelArg(kernel, 0, sizeof(cl_mem), (void *) &cl_input_buffer);
    clSetKernelArg(kernel, 3, sizeof(cl_uint), (void *) &sortAscending);

    for (stage = 0; stage < numStages; stage++) 
        clSetKernelArg(kernel, 1, sizeof(cl_uint), (void *) &stage);

        for (passOfStage = stage; passOfStage >= 0; passOfStage--) 
            clSetKernelArg(kernel, 2, sizeof(cl_uint), (void *) &passOfStage);

            // set work-item dimensions
            size_t gsz = arraySize / (2*4);
            size_t global_work_size[1] =  passOfStage ? gsz : gsz << 1 ;    //number of quad items in input array

            // execute kernel
            clEnqueueNDRangeKernel(queue, kernel, 1, NULL, global_work_size, NULL, 0, NULL, NULL);
        
    

【问题讨论】:

int4 伪掩码;你是说 int8 伪掩码吗? @SamerTufail 我认为应该是int4,因为我在其中存储了int4 值(例如pseudomask = srcLeft.even &lt; srcRight.evensrcLeft.evenint4)。此外,如果我尝试将类型更改为int8,代码将无法编译。 抱歉,我只将它读为 srcLeft,它是 int8 @SamerTufail 没什么好道歉的 :) 我很高兴有人正在阅读我的问题! 【参考方案1】:

我终于解决了这个问题!

棘手的部分在于原始英特尔代码处理加载的 4 元组中相邻对的相等值的方式 - 它没有明确处理它

这些错误存在于第一个 stage 在每个其他 stages 的最后一个 passOfStage(即passOfStage = 0)中。这些代码部分在一个 4 元组(由 cl_int8 数组 theArray 表示)中交换单独的 2 元组。

让我们以这段摘录为例(对于 4 元组中相等的相邻 2 元组,它不能正常工作):

imask0     = (int4)(0, -1, -1,  0);
srcLeft    = theArray[i];  // int8
srcRight   = srcLeft.s23016745;
pseudomask = (srcLeft.even < srcRight.even) ^ imask0;
mask       = pseudomask.xxyyzzww;
result     = (srcLeft & mask) | (srcRight & ~mask);

想象一下当我们使用这个(未修复的)代码和srcLeft.even = (int4)(7,7, 5,5) 时会发生什么。操作srcLeft.even &lt; srcRight.even 会产生(int4)(0,0,0,0),然后我们用imask0 屏蔽这个结果,我们会得到……pseudomask = (int4)(0,-1,-1,0)——即imask 本身。然而,这是错误的。

pseudomask 的值是形成此模式所必需的:(int4)(a,a, b,b)(其中ab 可以是0-1)。这意味着只需进行以下比较即可形成正确的maskquasimask = srcLeft.s07 &lt; srcRight.s07。然后正确的掩码将被创建为mask = quasimask.xxxxyyyy。前 2 个xes 屏蔽了 4 元组的第一个 2 元组中的第一个键值对(4 元组 = theArray 中的一个元素)。由于我们想要对相应的 2 元组进行位掩码(由 imask0 指定为 0-1 对),我们添加另一个 xx。我们对 4 元组中的第二个 2 元组进行类似的位掩码,这给我们留下了yyyy

imask11 移位的视觉示例

srcLeft:                        x  y  z  w
                                <  <  <  <
srcRight [relative to srcLeft]: y  x  w  z
^ imask0:                       0 -1  0  1
------------------------------------------
(srcLeft<srcRight)^imask0:      x  x  z  z

固定的、功能齐全的版本(我已经评论了固定的部分):

__kernel void BitonicSort(__global int8* theArray,
                         const uint stage,
                         const uint passOfStage,
                         const uint dir)

    size_t i = get_global_id(0);
    int8 srcLeft, srcRight, mask;
    int4 pseudomask;
    int4 imask10 = (int4)(0,  0, -1, -1);
    int4 imask11 = (int4)(0, -1,  0, -1);

    if(stage > 0)
    
        if(passOfStage > 0)    // upper level pass, exchange between two fours
        
            size_t r = 1 << (passOfStage - 1);
            size_t lmask = r - 1;
            size_t left = ((i>>(passOfStage-1)) << passOfStage) + (i & lmask);
            size_t right = left + r;

            srcLeft = theArray[left];
            srcRight = theArray[right];
            pseudomask = srcLeft.even < srcRight.even;
            mask = pseudomask.xxyyzzww; // here we interchange individual components, so no mask is applied and hence no 2 pairs must contain the same bit-pattern

            int8 imin = (srcLeft & mask) | (srcRight & ~mask);
            int8 imax = (srcLeft & ~mask) | (srcRight & mask);

            if( ((i>>(stage-1)) & 1) ^ dir )
            
                theArray[left]  = imin;
                theArray[right] = imax;
            
            else
            
                theArray[right] = imin;
                theArray[left]  = imax;
            
        
        else    // last pass, sort inside one four
        
            srcLeft = theArray[i];
            srcRight = srcLeft.s45670123;
            pseudomask = (srcLeft.even < srcRight.even) ^ imask10;
            mask = pseudomask.xxyyxxyy;

            if(((i >> stage) & 1) ^ dir)
            
                srcLeft = (srcLeft & mask) | (srcRight & ~mask);

                srcRight = srcLeft.s23016745;
                pseudomask = (srcLeft.even < srcRight.even) ^ imask11;
                mask = pseudomask.xxxxzzzz; // the 0th and 1st elements must contain the exact same value (as well as 2nd and 3rd)

                theArray[i] = (srcLeft & mask) | (srcRight & ~mask);
            
            else
            
                srcLeft = (srcLeft & ~mask) | (srcRight & mask);

                srcRight = srcLeft.s23016745;
                pseudomask = (srcLeft.even < srcRight.even) ^ imask11;
                mask = pseudomask.xxxxzzzz; // the 0th and 1st elements must contain the exact same value (as well as 2nd and 3rd)

                theArray[i] = (srcLeft & ~mask) | (srcRight & mask);
            
        
    
    else    // first stage, sort inside one four
    
        int4 imask0 = (int4)(0, -1, -1,  0);
        srcLeft = theArray[i];
        srcRight = srcLeft.s23016745;

        pseudomask = (srcLeft.even < srcRight.even) ^ imask0;
        mask = pseudomask.xxxxwwww; // the 0th and 1st elements must contain the exact same value (as well as 2nd and 3rd)

        if( dir )
            srcLeft = (srcLeft & mask) | (srcRight & ~mask);
        else
            srcLeft = (srcLeft & ~mask) | (srcRight & mask);


        srcRight = srcLeft.s45670123;
        pseudomask = (srcLeft.even < srcRight.even) ^ imask10;
        mask = pseudomask.xxyyxxyy; // the 0th and 2nd elements must contain the exact same value (as well as 1st and 3rd)

        if((i & 1) ^ dir)
        
            srcLeft = (srcLeft & mask) | (srcRight & ~mask);

            srcRight = srcLeft.s23016745;
            pseudomask = (srcLeft.even < srcRight.even) ^ imask11;
            mask = pseudomask.xxxxzzzz; // the 0th and 1st elements must contain the exact same value (as well as 2nd and 3rd)

            theArray[i] = (srcLeft & mask) | (srcRight & ~mask);
        
        else
        
            srcLeft = (srcLeft & ~mask) | (srcRight & mask);

            srcRight = srcLeft.s23016745;
            pseudomask = (srcLeft.even < srcRight.even) ^ imask11;
            mask = pseudomask.xxxxzzzz; // the 0th and 1st elements must contain the exact same value (as well as 2nd and 3rd)

            theArray[i] = (srcLeft & ~mask) | (srcRight & mask);
        
    

【讨论】:

以上是关于键/值数组的双调排序的主要内容,如果未能解决你的问题,请参考以下文章

OpenCL 双调排序 CPU 版

三十分钟理解:双调排序Bitonic Sort,适合并行计算的排序算法

OpenCL 双调排序 GPU 版

基数排序及其并行化

php如何按数组键值排序?

如何对 Perl 哈希值进行排序并相应地对键进行排序(可能在两个数组中)?