如何在一个范围内无锁更新索引到最大值?
Posted
技术标签:
【中文标题】如何在一个范围内无锁更新索引到最大值?【英文标题】:How to lock-free update index to maximum over a range? 【发布时间】:2017-07-05 15:24:57 【问题描述】:这个问题最好用一些简单的代码来解释。
struct foo
static constexpr auto N=8;
double data[N]; // initialised at construction
int max; // index to maximum: data[max] is largest value
// if value < data[index]:
// – update data[index] = value
// - update max
void update(int index, double value)
if(value >= data[index])
return;
data[index] = value;
if(index==max) // max unaffected if index!=max
for(index=0; index!=N; ++index)
if(data[index] > data[max])
max = index;
;
现在,我想让foo::update()
线程安全,即允许来自不同线程的并发调用,其中参与的线程不能使用相同的index
调用。一种方法是向foo
添加一个互斥锁或简单的自旋锁(可以假定争用较低):
struct foo
static constexpr auto N=8;
std::atomic_flag lock = ATOMIC_FLAG_INIT;
double data[N];
int max;
// index is unique to each thread
// if value < data[index]:
// – update data[index] = value
// - update max
void update(int index, double value)
if(value >= data[index])
return;
while(lock.test_and_set(std::memory_order_acquire)); // aquire spinlock
data[index] = value;
if(index==max)
for(index=0; index!=N; ++index)
if(data[index] > data[max])
max = index;
lock.clear(std::memory_order_release); // release spinlock
;
但是,我怎样才能实现foo::update()
无锁(您可以将data
和max
视为atomic
)?
注意:这是原始帖子的更简单版本,与树结构无关。
【问题讨论】:
这是一个难题;无锁树本身就是一个研究课题 @LWimsey 我已经更改/简化了问题以避免树引起的任何并发症。value = data[max=index];
应该是max = index;
,不是吗?
@Tsyvarev 不在最初发布的代码中:value
也必须更新(因为它在上一行中使用过)。我将编辑帖子以避免这种情况并使其更清晰。
您的手动锁不安全:您通常需要 seq_cst 来获取锁,以防止关键部分中的负载在实际获取锁的存储之前可见。请记住,acquire
仅适用于测试和设置的加载部分,而不适用于存储。在 x86 上,它将编译为 lock xchg
或 lock bts
或其他东西,因此您只能在架构上看到问题,其中弱于 seq_cst
的原子 RMW 实际上可以在 asm 中以这种方式编译。 (这可能不是完全正确的推理,但我很确定acquire
太弱了。)
【参考方案1】:
所以,IIUC,数组只有在低于现有值时才会获得新值(我不会担心初始值是如何到达那里的),如果当前最大值降低,则找到一个新的最大值.
其中一些并不太难。 但有些……更难。
所以“if value
auto oldval = data[index].load(memory_order_relaxed);
do
if (value <= oldval) return;
while ( ! data[index].compare_exchange_weak(oldval, value) );
// (note that oldval is updated to data[index] each time comp-exch fails)
所以现在 data[index] 有了新的较低值。惊人的。而且相对容易。 现在大约是最大值。
第一个问题 - max 可以出错吗?因为它目前可能是错误的(在我们的场景中,我们在处理 max 之前更新 data[index])。
可能在某些方面是错误的,而不是其他方面吗?即假设我们的数据只有两个条目:
data[2] = 3, 7 ;
我们想做update(1, 2)
即将7
更改为2
。 (因此更新最大值!)
场景A:先设置数据,再设置最大值:
data[1] = 2;
pause(); // ie scheduler pauses this thread
max = 0; // data[0]==3 is now max
如果另一个线程在pause()
进来,那么data[max]
是错误的:2
而不是3
:-(
场景 B:先设置最大值:
max = 0; // it will be "shortly"?
pause();
data[1] = 2;
现在线程可以将 data[max] 读取为 3 而 7 仍在数据中。但是 7 会“很快”变成 2,这样可以吗?它是否比方案 A“错误更少”?取决于用途? (即,如果重要的是“哪个是最大值”,我们有这个权利。但如果最大值是唯一重要的事情,为什么要存储所有数据?)
问“是错的好吗”似乎很奇怪,但在某些无锁情况下,这实际上是一个有效的问题。对我来说,B 有机会在某些用途上没问题,而 A 则不然。
还有,这很重要:
data[max] 总是错误的,即使是在完美的算法中
我的意思是你需要意识到 data[max],你一读到它就已经过时了——如果你生活在一个无锁的世界里。因为它可能在您阅读后立即发生了变化。 (也因为数据和最大值是独立变化的。但即使你有一个函数 getMaxValue() ,它一返回就会过时。)
这样好吗?因为,如果没有,你显然需要一把锁。但如果没问题,我们可以利用它来发挥我们的优势——我们可能会返回一个我们知道有些不正确/过时的答案,但不会比你从外面看到的更不正确。
如果两种情况都不行,那么您必须同时更新 max 和 data[index]。这很难,因为它们不适合无锁大小的块。
因此,您可以添加一个间接层:
struct DataAndMax double data[N]; int max; ;
DataAndMax * ptr;
每当您需要更新 max 时,您需要创建一个全新的 DataAndMax 结构(即分配一个新结构),以某种方式将其全部填充好,然后自动将 ptr 交换到新结构。 如果在您准备新数据时其他线程更改了 ptr,那么您需要重新开始,因为您的数据中需要他们的新数据。
如果 ptr 已经改变了两次,那么它可能看起来就像它没有改变,但实际上它确实改变了:假设 ptr 当前具有值 0xA000
并且第二个线程分配了一个新的 DataAndStruct在0xB000
,并将ptr 设置为0xB000
,并在0xA000
释放旧的。现在又一个线程(第 3 个)进来了,分配了另一个 DataAndStruct - 并且看到分配器返回给你0xA000
(为什么不呢,它刚刚被释放了!)。所以这第三个线程将 ptr 设置为 0xA000
。
当您尝试将 ptr 设置为 0xC000
时,这一切都会发生。你所看到的只是 ptr 是 0xA000
,后来仍然是 0xA000
,所以你认为它(及其数据)没有改变。然而它有——它从0xA000
到0xB000
(当你不看的时候)又回到0xA000
——地址相同,但数据不同。这称为 ABA 问题。
现在,如果您知道最大线程数,您可以预先分配:
DataAndMax dataBufs[NUM_THREADS];
DataAndMax * ptr; // current DataAndMax
然后永远不会分配/删除,也永远不会出现 ABA 问题。或者还有其他方法可以避免 ABA。
让我们回过头来想想我们将如何——无论如何——返回一个可能已经过时的最大值。我们可以使用它吗?
所以你进来,首先检查你要写的索引是不是重要的:
if (index != max)
// we are not touching max,
// so nothing fancy here!
data[index] value;
return;
// else do it the hard way:
//...
但这已经是错误的了。 在 if 之后和 set 之前,max 可能已经改变了。 每个集都需要更新max!?!?
因此,如果 N 很小,您可以线性搜索 以获得最大值。如果有人在搜索时进行更新可能是错误的,但请记住 - 如果有人在搜索后立即进行更新 或在“在此处插入魔法”之后进行更新也可能是错误的。因此,除了可能很慢之外,搜索与任何算法一样正确。你会发现有一段时间,最大的东西。
如果 N == 8,我会使用搜索。确实。
您可以使用 memory_order_relaxed
搜索 8 个条目,这比尝试使用更强大的原子操作来维护任何内容要快。
我还有其他想法:
更多簿记?单独存储maxValue?
double data[N];
double maxValue;
int indexOfMax;
bool wasMax = false;
if (index == indexOfMax)
wasMax = true;
data[index] = value;
if (wasMax || index == indexOfMax)
findMax(&indexOfMax, &maxValue); // linear search
这可能需要在某个地方使用 CAS 循环。仍然是线性搜索,但可能不那么频繁了?
也许您在每个条目中都需要额外的数据?还不确定。
嗯嗯。
这并不简单。因此,如果有一个正确的算法(我认为有,在某些限制范围内)它不太可能没有错误。即一个正确的算法可能实际上存在,但你没有找到它——你找到的是一个看起来正确的算法。
【讨论】:
atomic<double>
现在存在。 Compilers currently suck at it, but C++11 exposes pretty much everything that x86 supports,TSX(事务性内存)除外。在内存中的 double
上没有对 atomic +=
或 atomic max()
的硬件支持,因此您(或编译器)必须使用 CAS(x86 cmpxchg
,它完全能够在 64 -bit double
位模式)。你认为 C++20 可以做什么?
关于max
立即过时的好处。存储最大值和索引可能会有所帮助。也许在 int32 index
和 float val
的 8B atomic<struct>
中,您可以使用单个原子加载进行加载。然后,您也许可以控制错误发生的方式。 (例如,也许您可以安排更新的顺序,以便 maxval
始终
Hrm,然后存入数组后重新检查最大值?不,这不好,我不认为当另一个线程将最大值降低到您刚刚存储的值以下时,您不能拥有轻量级的快速路径(无需等待,甚至不是原子 RMW)而不会出现错误。
我想如果你知道不同的线程不能有相同的index
,你可能不想要一个连续的数组。让多个线程写入同一个缓存行会导致错误共享,您可以通过让它们各自写入自己的索引来避免这种情况。读取你写的缓存行的其他线程比他们写它的糟糕得多,因为它只需要成为共享的,而不是无效的;您可以将其恢复为已修改,而无需重新加载。 (MESI)。
您指出了 CAS 的一个问题 - 最初的 load
通常是一个普通的问题,因此您首先浪费时间(通常)将线路置于 S 状态,然后几乎立即尝试当cmpxchg
通过时使其处于 M 状态,这可能会增加线路被盗的窗口,并且通常会增加缓存和一致性流量。我想知道是否首先在同一行上盲写相邻的、未使用的位置,以便最初获得 M 中的行,在某些情况下实际上是一种优化。 LL/SC 没有这个问题,因为 LL 已经很特殊了。以上是关于如何在一个范围内无锁更新索引到最大值?的主要内容,如果未能解决你的问题,请参考以下文章