如何在一个范围内无锁更新索引到最大值?

Posted

技术标签:

【中文标题】如何在一个范围内无锁更新索引到最大值?【英文标题】:How to lock-free update index to maximum over a range? 【发布时间】:2017-07-05 15:24:57 【问题描述】:

这个问题最好用一些简单的代码来解释。

struct foo

    static constexpr auto N=8;
    double data[N];            //  initialised at construction
    int max;                   //  index to maximum: data[max] is largest value

    // if value < data[index]:
    //   – update data[index] = value
    //   - update max
    void update(int index, double value)
    
        if(value >= data[index])
            return;
        data[index] = value;
        if(index==max)         // max unaffected if index!=max
            for(index=0; index!=N; ++index)
                if(data[index] > data[max])
                    max = index;
    
;

现在,我想让foo::update() 线程安全,即允许来自不同线程的并发调用,其中参与的线程不能使用相同的index 调用。一种方法是向foo 添加一个互斥锁或简单的自旋锁(可以假定争用较低):

struct foo

    static constexpr auto N=8;
    std::atomic_flag lock = ATOMIC_FLAG_INIT;
    double data[N];
    int max;

    // index is unique to each thread
    // if value < data[index]:
    //   – update data[index] = value
    //   - update max
    void update(int index, double value)
    
        if(value >= data[index])
            return;
        while(lock.test_and_set(std::memory_order_acquire));  // aquire spinlock
          data[index] = value;
          if(index==max)
              for(index=0; index!=N; ++index)
                  if(data[index] > data[max])
                      max = index;
        lock.clear(std::memory_order_release);                // release spinlock
    
;

但是,我怎样才能实现foo::update()无锁(您可以将datamax 视为atomic)?


注意:这是原始帖子的更简单版本,与树结构无关。

【问题讨论】:

这是一个难题;无锁树本身就是一个研究课题 @LWimsey 我已经更改/简化了问题以避免树引起的任何并发症。 value = data[max=index]; 应该是max = index;,不是吗? @Tsyvarev 不在最初发布的代码中:value 也必须更新(因为它在上一行中使用过)。我将编辑帖子以避免这种情况并使其更清晰。 您的手动锁不安全:您通常需要 seq_cst 来获取锁,以防止关键部分中的负载在实际获取锁的存储之前可见。请记住,acquire 仅适用于测试和设置的加载部分,而不适用于存储。在 x86 上,它将编译为 lock xchglock bts 或其他东西,因此您只能在架构上看到问题,其中弱于 seq_cst 的原子 RMW 实际上可以在 asm 中以这种方式编译。 (这可能不是完全正确的推理,但我很确定acquire 太弱了。) 【参考方案1】:

所以,IIUC,数组只有在低于现有值时才会获得新值(我不会担心初始值是如何到达那里的),如果当前最大值降低,则找到一个新的最大值.

其中一些并不太难。 但有些……更难。

所以“if value

auto oldval = data[index].load(memory_order_relaxed);
do
   if (value <= oldval) return;
while ( ! data[index].compare_exchange_weak(oldval, value) );
// (note that oldval is updated to data[index] each time comp-exch fails)

所以现在 data[index] 有了新的较低值。惊人的。而且相对容易。 现在大约是最大值。

第一个问题 - max 可以出错吗?因为它目前可能是错误的(在我们的场景中,我们在处理 max 之前更新 data[index])。

可能在某些方面是错误的,而不是其他方面吗?即假设我们的数据只有两个条目:

data[2] =  3, 7 ;

我们想做update(1, 2) 即将7 更改为2。 (因此更新最大值!)

场景A:先设置数据,再设置最大值:

data[1] = 2;
pause(); // ie scheduler pauses this thread
max = 0; // data[0]==3 is now max

如果另一个线程在pause() 进来,那么data[max] 是错误的:2 而不是3 :-(

场景 B:先设置最大值:

max = 0; // it will be "shortly"?
pause();
data[1] = 2;

现在线程可以将 data[max] 读取为 3 而 7 仍在数据中。但是 7 会“很快”变成 2,这样可以吗?它是否比方案 A“错误更少”?取决于用途? (即,如果重要的是“哪个是最大值”,我们有这个权利。但如果最大值是唯一重要的事情,为什么要存储所有数据?)

问“是错的好吗”似乎很奇怪,但在某些无锁情况下,这实际上是一个有效的问题。对我来说,B 有机会在某些用途上没问题,而 A 则不然。

还有,这很重要:

data[max] 总是错误的,即使是在完美的算法中

我的意思是你需要意识到 data[max],你一读到它就已经过时了——如果你生活在一个无锁的世界里。因为它可能在您阅读后立即发生了变化。 (也因为数据和最大值是独立变化的。但即使你有一个函数 getMaxValue() ,它一返回就会过时。)

这样好吗?因为,如果没有,你显然需要一把锁。但如果没问题,我们可以利用它来发挥我们的优势——我们可能会返回一个我们知道有些不正确/过时的答案,但不会比你从外面看到的更不正确。

如果两种情况都不行,那么您必须同时更新 max 和 data[index]。这很难,因为它们不适合无锁大小的块。

因此,您可以添加一个间接层:

struct DataAndMax  double data[N]; int max; ;
DataAndMax * ptr;

每当您需要更新 max 时,您需要创建一个全新的 DataAndMax 结构(即分配一个新结构),以某种方式将其全部填充好,然后自动将 ptr 交换到新结构。 如果在您准备新数据时其他线程更改了 ptr,那么您需要重新开始,因为您的数据中需要他们的新数据。

如果 ptr 已经改变了两次,那么它可能看起来就像它没有改变,但实际上它确实改变了:假设 ptr 当前具有值 0xA000 并且第二个线程分配了一个新的 DataAndStruct在0xB000,并将ptr 设置为0xB000,并在0xA000 释放旧的。现在又一个线程(第 3 个)进来了,分配了另一个 DataAndStruct - 并且看到分配器返回给你0xA000(为什么不呢,它刚刚被释放了!)。所以这第三个线程将 ptr 设置为 0xA000

当您尝试将 ptr 设置为 0xC000 时,这一切都会发生。你所看到的只是 ptr 0xA000,后来仍然 0xA000,所以你认为它(及其数据)没有改变。然而它有——它从0xA0000xB000(当你不看的时候)又回到0xA000——地址相同,但数据不同。这称为 ABA 问题。

现在,如果您知道最大线程数,您可以预先分配:

DataAndMax dataBufs[NUM_THREADS];
DataAndMax * ptr; // current DataAndMax

然后永远不会分配/删除,也永远不会出现 ABA 问题。或者还有其他方法可以避免 ABA。

让我们回过头来想想我们将如何——无论如何——返回一个可能已经过时的最大值。我们可以使用它吗?

所以你进来,首先检查你要写的索引是不是重要的:

if (index != max) 
    // we are not touching max,
    // so nothing fancy here!
    data[index] value;
    return;

// else do it the hard way:
//...

但这已经是错误的了。 在 if 之后和 set 之前,max 可能已经改变了。 每个集都需要更新max!?!?

因此,如果 N 很小,您可以线性搜索 以获得最大值。如果有人在搜索时进行更新可能是错误的,但请记住 - 如果有人在搜索后立即进行更新 或在“在此处插入魔法”之后进行更新也可能是错误的。因此,除了可能很慢之外,搜索与任何算法一样正确。你会发现有一段时间,最大的东西。 如果 N == 8,我会使用搜索。确实。 您可以使用 memory_order_relaxed 搜索 8 个条目,这比尝试使用更强大的原子操作来维护任何内容要快。

我还有其他想法:

更多簿记?单独存储maxValue?

double data[N];
double maxValue;
int indexOfMax;


bool wasMax = false;
if (index == indexOfMax)
    wasMax = true;
data[index] = value; 
if (wasMax || index == indexOfMax)
    findMax(&indexOfMax, &maxValue); // linear search

这可能需要在某个地方使用 CAS 循环。仍然是线性搜索,但可能不那么频繁了?

也许您在每个条目中都需要额外的数据?还不确定。

嗯嗯。

这并不简单。因此,如果有一个正确的算法(我认为有,在某些限制范围内)它不太可能没有错误。即一个正确的算法可能实际上存在,但你没有找到它——你找到的是一个看起来正确的算法。

【讨论】:

atomic&lt;double&gt; 现在存在。 Compilers currently suck at it, but C++11 exposes pretty much everything that x86 supports,TSX(事务性内存)除外。在内存中的 double 上没有对 atomic += 或 atomic max() 的硬件支持,因此您(或编译器)必须使用 CAS(x86 cmpxchg,它完全能够在 64 -bit double 位模式)。你认为 C++20 可以做什么? 关于max 立即过时的好处。存储最大值和索引可能会有所帮助。也许在 int32 indexfloat val 的 8B atomic&lt;struct&gt; 中,您可以使用单个原子加载进行加载。然后,您也许可以控制错误发生的方式。 (例如,也许您可​​以安排更新的顺序,以便 maxval 始终 Hrm,然后存入数组后重新检查最大值?不,这不好,我不认为当另一个线程将最大值降低到您刚刚存储的值以下时,您不能拥有轻量级的快速路径(无需等待,甚至不是原子 RMW)而不会出现错误。 我想如果你知道不同的线程不能有相同的index,你可能不想要一个连续的数组。让多个线程写入同一个缓存行会导致错误共享,您可以通过让它们各自写入自己的索引来避免这种情况。读取你写的缓存行的其他线程比他们写它的糟糕得多,因为它只需要成为共享的,而不是无效的;您可以将其恢复为已修改,而无需重新加载。 (MESI)。 您指出了 CAS 的一个问题 - 最初的 load 通常是一个普通的问题,因此您首先浪费时间(通常)将线路置于 S 状态,然后几乎立即尝试当cmpxchg 通过时使其处于 M 状态,这可能会增加线路被盗的窗口,并且通常会增加缓存和一致性流量。我想知道是否首先在同一行上盲写相邻的、未使用的位置,以便最初获得 M 中的行,在某些情况下实际上是一种优化。 LL/SC 没有这个问题,因为 LL 已经很特殊了。

以上是关于如何在一个范围内无锁更新索引到最大值?的主要内容,如果未能解决你的问题,请参考以下文章

dynamoDB 中全局二级索引范围键的最大值

如何在反应中映射索引范围?

SwiftUI 滑块值不尊重范围

如何从最小索引到最大索引对 SQL 结果进行排序?

Java入门_一维数组_第二题_随机生成数

如何限制 QwtPlot 可以缩小到的最大范围?