创建原子引用计数的尝试因死锁而失败。这是正确的方法吗?

Posted

技术标签:

【中文标题】创建原子引用计数的尝试因死锁而失败。这是正确的方法吗?【英文标题】:An attempt to create atomic reference counting is failing with deadlock. Is this the right approach? 【发布时间】:2014-07-17 01:27:52 【问题描述】:

因此,我正在尝试创建写时复制映射,该映射尝试在读取端进行原子引用计数以不锁定。

有些地方不太对劲。我看到一些引用过度增加,一些下降为负数,所以有些东西并不是真正的原子。在我的测试中,我有 10 个读取线程循环 100 次,每个线程执行 get(),1 个写入线程执行 100 次写入。

它卡在编写器中,因为某些引用永远不会降为零,即使它们应该。

我正在尝试使用 explained by this blog 铺设的 128 位 DCAS 技术。

这有什么明显的错误,或者有没有更简单的方法来调试它而不是在调试器中使用它?

typedef std::unordered_map<std::string, std::string> StringMap;

static const int zero = 0;  //provides an l-value for asm code

class NonBlockingReadMapCAS 

public:

    class OctaWordMapWrapper 
    public:
        StringMap* fStringMap;
        //std::atomic<int> fCounter;
        int64_t fCounter;

        OctaWordMapWrapper(OctaWordMapWrapper* copy) : fStringMap(new StringMap(*copy->fStringMap)), fCounter(0)  

        OctaWordMapWrapper() : fStringMap(new StringMap), fCounter(0)  

        ~OctaWordMapWrapper() 
            delete fStringMap;
        

        /**
         * Does a compare and swap on an octa-word - in this case, our two adjacent class members fStringMap 
         * pointer and fCounter.
         */
        static bool inline doubleCAS(OctaWordMapWrapper* target, StringMap* compareMap, int64_t compareCounter, StringMap* swapMap, int64_t swapCounter ) 
            bool cas_result;
            __asm__ __volatile__
            (
             "lock cmpxchg16b %0;"    // cmpxchg16b sets ZF on success
             "setz       %3;"         // if ZF set, set cas_result to 1

             : "+m" (*target),
               "+a" (compareMap),     //compare target's stringmap pointer to compareMap
               "+d" (compareCounter), //compare target's counter to compareCounter
               "=q" (cas_result)      //results
             : "b"  (swapMap),        //swap target's stringmap pointer with swapMap
               "c"  (swapCounter)     //swap target's counter with swapCounter
             : "cc", "memory"
             );
            return cas_result;
        



    OctaWordMapWrapper* atomicIncrementAndGetPointer()
    

        if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter +1))
            return this;
        else
            return NULL;
    


        OctaWordMapWrapper* atomicDecrement()
        
            while(true) 
                if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter -1))
                    break;
            
            return this;
        

        bool atomicSwapWhenNotReferenced(StringMap* newMap)
        
            return doubleCAS(this, this->fStringMap, zero, newMap, 0);
        
    
    __attribute__((aligned(16)));

    std::atomic<OctaWordMapWrapper*> fReadMapReference;
    pthread_mutex_t fMutex;


    NonBlockingReadMapCAS()  
        fReadMapReference = new OctaWordMapWrapper();
    

    ~NonBlockingReadMapCAS() 
       delete fReadMapReference;
    

    bool contains(const char* key) 
        std::string keyStr(key);
        return contains(keyStr);
    

    bool contains(std::string &key) 
        OctaWordMapWrapper *map;
        do 
            map = fReadMapReference.load()->atomicIncrementAndGetPointer();
         while (!map);
        bool result = map->fStringMap->count(key) != 0;
        map->atomicDecrement();
        return result;
    

    std::string get(const char* key) 
        std::string keyStr(key);
        return get(keyStr);
    

    std::string get(std::string &key) 
        OctaWordMapWrapper *map;
        do 
            map = fReadMapReference.load()->atomicIncrementAndGetPointer();
         while (!map);
        //std::cout << "inc " << map->fStringMap << " cnt " << map->fCounter << "\n";
        std::string value = map->fStringMap->at(key);
        map->atomicDecrement();
        return value;
    

    void put(const char* key, const char* value) 
        std::string keyStr(key);
        std::string valueStr(value);
        put(keyStr, valueStr);
    

    void put(std::string &key, std::string &value) 
        pthread_mutex_lock(&fMutex);
        OctaWordMapWrapper *oldWrapper = fReadMapReference;
        OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper);
        std::pair<std::string, std::string> kvPair(key, value);
        newWrapper->fStringMap->insert(kvPair);
        fReadMapReference.store(newWrapper);
        std::cout << oldWrapper->fCounter << "\n";
        while (oldWrapper->fCounter > 0);
        delete oldWrapper;
        pthread_mutex_unlock(&fMutex);

    

    void clear() 
        pthread_mutex_lock(&fMutex);
        OctaWordMapWrapper *oldWrapper = fReadMapReference;
        OctaWordMapWrapper *newWrapper = new OctaWordMapWrapper(oldWrapper);
        fReadMapReference.store(newWrapper);
        while (oldWrapper->fCounter > 0);
        delete oldWrapper;
        pthread_mutex_unlock(&fMutex);

    

;

【问题讨论】:

快速浏览一下:如果 doubleCAS 失败,atomicIncrementAndGetPointer 会发生什么?虽然它会再次调用,但是会为 compareCounter 发送什么值?您正在传递 this->fCounter,但由于该元素不是 volatile,旧值不会被传递吗? @DavidWohlferd 感谢您的浏览。我重新排列了代码,以便在 CAS 失败时重新加载并重新发送 fReadMapReference。仍然悬而未决,但我认为你指出的仍然是一个问题。 你不使用__sync_val_compare_and_swap()有什么原因吗? 看起来肯定会导致灾难的是,您没有正确控制复制和分配,即“三法则”。这和恕我直言,动态分配的过度使用使得这段代码很难正确。首先清理该代码,如果运气好的话,你会在某些你没有预料到的地方以及它导致错误行为的地方遇到编译器错误。 貌似有不少无用的代码,比如void *p2 = map-&gt;fStringMap,或者函数atomicDecrement返回值。尝试进行一些清理,只留下相关的内容。它可能会帮助那些试图帮助你的用户...... 【参考方案1】:

也许不是答案,但这对我来说似乎很可疑:

while (oldWrapper->fCounter > 0);
delete oldWrapper;

当计数器为 0 时,您可以让一个阅读器线程进入 atomicIncrementAndGetPointer(),从而通过删除包装器将地毯拉到阅读器线程下方。

编辑总结下面的 cmets 以获得潜在的解决方案:

我知道的最佳实现是将fCounterOctaWordMapWrapper 移动到fReadMapReference(实际上你根本不需要OctaWordMapWrapper 类)。当计数器为零时,交换写入器中的指针。因为您可能会遇到读取器线程的高竞争,这实际上会无限期地阻塞写入器,所以您可以为读取器锁定分配最高位 fCounter,即,当该位被设置时,读取器会旋转直到该位被清除。编写器在即将更改指针时设置此位 (__sync_fetch_and_or()),等待计数器降至零(即现有读取器完成其​​工作),然后交换指针并清除该位。

这种方法应该是防水的,尽管它显然会在写入时阻止读者。我不知道这在您的情况下是否可以接受,理想情况下您希望它是非阻塞的。

代码看起来像这样(未经测试!):

class NonBlockingReadMapCAS

public:
  NonBlockingReadMapCAS() :m_ptr(0), m_counter(0) 

private:
  StringMap *acquire_read()
  
    while(1)
    
      uint32_t counter=atom_inc(m_counter);
      if(!(counter&0x80000000))
        return m_ptr;
      atom_dec(m_counter);
      while(m_counter&0x80000000);
    
    return 0;
  

  void release_read()
  
    atom_dec(m_counter);
  

  void acquire_write()
  
    uint32_t counter=atom_or(m_counter, 0x80000000);
    assert(!(counter&0x80000000));
    while(m_counter&0x7fffffff);
  

  void release_write()
  
    atom_and(m_counter, uint32_t(0x7fffffff));
  

  StringMap *volatile m_ptr;
  volatile uint32_t m_counter;
;

只需在访问指针以进行读/写之前和之后调用acquire/release_read/write()。将atom_inc/dec/or/and() 分别替换为__sync_fetch_and_add()__sync_fetch_and_sub()__sync_fetch_and_or()__sync_fetch_and_and()。你实际上不需要doubleCAS()

正如@Quuxplusone 在下面的评论中正确指出的那样,这是单个生产者和多个消费者的实现。我修改了代码以正确断言以执行此操作。

【讨论】:

虽然我同意“忙于等待”while 循环 - 通知/等待模式可能会更可取,但我认为读者访问 oldWrapper 时没有任何风险987654337@ 在put() 方法的早期到达0 b/c,fReadMapReference 成员被换成了一个新成员,这样之后的所有后续读取器线程都将使用newWrapper 而不是oldWrapper。 while 循环的作用是在删除oldWrapper 之前等待任何现有的 读取器线程(在交换之前在那里)完成,从而避免从它们中拉出地毯。跨度> @Turix 我知道这是意图,但它不起作用。一旦读取器线程加载了fReadMapReference,就会有一个窗口,写入器线程可以通过删除指针来使其无效。考虑将fReadMapReference.load()-&gt;atomicIncrementAndGetPointer() 分成两行并将 Sleep(1000) 放在 fReadMapReference.load() 之后但在 atomicIncrementAndGetPointer() 之前。 @Turix 我认为正确的方法是将计数器添加到fReadMapReference,并在计数器为 0 时尝试在编写器线程中交换它。一旦交换,您可以安全地删除指针。 @marathon 也许有更好的方法,但您可以为“读者锁定”保留fReadMapReference.fCounter 的最高位,即如果该位设置为读者自旋。该位由编写器在即将更改指针时设置,并在交换指针之前等待计数器下降到 0。这至少可以防止编写器无限锁定。 请注意(至少截至 2014 年 7 月 24 日)上面的 NonBlockingReadMapCAS 假设只有一个写入器线程。如果任何线程尝试调用NonBlockingReadMapCAS::acquire_write,而任何其他线程已经持有写入者锁,那么一切都会中断。 (您可以在acquire_write 的顶部添加assert 来验证此前提条件。)【参考方案2】:

嗯,可能有很多问题,但这里是明显的两个。

最微不足道的错误在atomicIncrementAndGetPointer。你写道:

if (doubleCAS(this, this->fStringMap, this->fCounter, this->fStringMap, this->fCounter +1))

也就是说,您正试图以无锁方式增加this-&gt;fCounter。但这不起作用,因为您要两次获取旧值,而不能保证每次都读取相同的值。考虑以下事件序列:

线程 A 获取 this-&gt;fCounter(值为 0)并将参数 5 计算为 this-&gt;fCounter +1 = 1。 线程 B 成功递增计数器。 线程 A 获取 this-&gt;fCounter(值为 1)并将参数 3 计算为 this-&gt;fCounter = 1。 线程 A 执行doubleCAS(this, this-&gt;fStringMap, 1, this-&gt;fStringMap, 1)。当然,它成功了,但我们失去了我们试图做的“增量”。

你想要的更像

StringMap* oldMap = this->fStringMap;
int64_t oldCounter = this->fCounter;
if (doubleCAS(this, oldMap, oldValue, oldMap, oldValue+1))
    ...

另一个明显的问题是getput 之间存在数据竞争。考虑以下事件序列:

线程 A 开始执行 get:它获取 fReadMapReference.load() 并准备在该内存地址上执行 atomicIncrementAndGetPointer。 线程 B 完成执行put:它删除了那个内存地址。 (这样做是在它的权利范围内,因为包装器的引用计数仍然为零。) 线程 A 开始在已删除的内存地址上执行atomicIncrementAndGetPointer。如果你很幸运,你会出现段错误,但当然在实践中你可能不会。

如博文中所述:

垃圾收集接口被省略,但在实际应用中,您需要在删除节点之前扫描危险指针。

【讨论】:

【参考方案3】:

另一位用户提出了类似的方法,但如果您使用 gcc(也可能使用 clang)进行编译,您可以使用内部的 __sync_add_and_fetch_4 执行与您的汇编代码类似的操作,并且可能更具可移植性。 我在 Ada 库中实现引用计数时使用过它(但算法保持不变)。

int __sync_add_and_fetch_4 (int* ptr, int value);
// increments the value pointed to by ptr by value, and returns the new value

【讨论】:

【参考方案4】:

虽然我不确定您的阅读器线程是如何工作的,但我怀疑您的问题是您没有在您的 get() 方法中捕获和处理可能的 out_of_range 异常,这些异常可能来自以下行:std::string value = map-&gt;fStringMap-&gt;at(key);。请注意,如果在地图中找不到key,这将抛出并退出函数而不递减计数器,这将导致您描述的条件(在等待时卡在编写器线程中的 while 循环中)递减计数器)。

无论如何,无论这是否是您看到的问题的原因,您肯定需要处理此异常(以及任何其他异常)或修改您的代码,以免出现抛出异常的风险。对于at() 方法,我可能只使用find(),然后检查它返回的迭代器。但是,更一般地说,我建议使用 RAII 模式来确保在不解锁/递减的情况下不会让任何意外异常逃逸。例如,您可以检查 boost::scoped_lock 以包装您的 fMutex,然后为 OctaWordMapWrapper 增量/减量编写类似这样的简单内容:

class ScopedAtomicMapReader

public:
    explicit ScopedAtomicMapReader(std::atomic<OctaWordMapWrapper*>& map) : fMap(NULL)  
        do 
            fMap = map.load()->atomicIncrementAndGetPointer();
         while (NULL == fMap);
    

    ~ScopedAtomicMapReader() 
        if (NULL != fMap)
            fMap->atomicDecrement();
    

    OctaWordMapWrapper* map(void) 
        return fMap;
    

private:
    OctaWordMapWrapper* fMap;

; // class ScopedAtomicMapReader

有了类似的东西,例如,您的 contains()get() 方法将简化为(并且不受异常影响):

bool contains(std::string &key) 
    ScopedAtomicMapReader mapWrapper(fReadMapReference);
    return (mapWrapper.map()->fStringMap->count(key) != 0);


std::string get(std::string &key) 
    ScopedAtomicMapReader mapWrapper(fReadMapReference);
    return mapWrapper.map()->fStringMap->at(key);    // Now it's fine if this throws...

最后,虽然我认为您不应该这样做,但您也可以尝试将fCounter 声明为volatile(假设您在while循环中可以访问它在put() 方法中将位于与读取器线程上的写入不同的线程上。

希望这会有所帮助!

顺便说一句,还有一件小事:fReadMapReference 正在泄漏。我认为你应该在你的析构函数中删除它。

【讨论】:

谢谢。这些都是好点。我不允许在我的测试中使用不存在的键调用 at()。但正如你所说,我应该处理异常情况并修复 dtor。

以上是关于创建原子引用计数的尝试因死锁而失败。这是正确的方法吗?的主要内容,如果未能解决你的问题,请参考以下文章

npm install vercel 因错误的节点版本而失败,尽管它是正确的

Redis原子计数器incr,防止并发请求

如果一个 I/O 函数不能因 EINTR 而失败,这是不是意味着它永远不会阻塞?

死锁编码及定位分析

Vue CLI 3 - 构建不会因 lint 错误而失败

CodedUI 测试因缺少程序集引用而失败?