Boost Shared_lock / Unique_lock,给写者优先权?

Posted

技术标签:

【中文标题】Boost Shared_lock / Unique_lock,给写者优先权?【英文标题】:Boost Shared_lock / Unique_lock, giving the writer priority? 【发布时间】:2018-01-10 11:36:44 【问题描述】:

我有一个使用 Boost Thread 锁定编写的多线程应用程序。

在这种情况下,有一个写者和多个读者。正如我现在所拥有的,作者似乎要等待所有读者完成才能再次写作。

我想要的是给作者优先权,这样如果它想再写一次,无论如何它都会这样做。读者围绕它工作。 例如:

现在:

Writer;
reader1;
reader2;
reader3;
reader4;

我想要的是:

Writer;
reader1;
reader2;
Writer(if ready);
reader3;
reader4;

这可能吗?我的代码复制如下:

typedef boost::shared_mutex Lock;
typedef boost::unique_lock< Lock > WriteLock;
typedef boost::shared_lock< Lock > ReadLock;
Lock frameLock;

cv::Mat currentFrame;
bool frameOk;

void writer()

    while (true)
    
        cv::Mat frame;
        cv::Mat src = cv::imread("C:\\grace_17.0001.jpg");
        cv::resize(src, frame, cv::Size(src.cols / 4, src.rows / 4));

        int64 t0 = cv::getTickCount();


        WriteLock w_lock(frameLock);
        frame.copyTo(currentFrame);
        frameLock.unlock();

        frameOk = true; // tells read we have at least one frame

        int64 t1 = cv::getTickCount();
        double secs = (t1 - t0) / cv::getTickFrequency();
        std::cout << "wait time WRITE: " << secs * 1000 << std::endl;
    


void readerTwo(int wait)

    while (true)
    
        if (frameOk) // if first frame is written
        
            static cv::Mat readframe;

            int64 t0 = cv::getTickCount();


            //gets frame
            ReadLock r_lockz(frameLock);
            currentFrame.copyTo(readframe);
            r_lockz.unlock();

            std::cout << "READ: " << std::to_string(wait)<< std::endl;

            cv::imshow(std::to_string(wait), readframe);
            cv::waitKey(1);
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
        
    




void main()

    const int readerthreadcount = 50;

    std::vector<boost::thread*> readerthread;

    boost::thread* wThread = new boost::thread(writer);


    for (int i = 0; i<readerthreadcount; i++) 
        ostringstream id;  id << "reader" << i + 1;
        readerthread.push_back(new boost::thread(readerTwo, (i)));
    

    wThread->join(); delete wThread;

    for (int i = 0; i<readerthreadcount; i++) 
        readerthread[i]->join(); delete readerthread[i];
    


谢谢。

【问题讨论】:

Boost 文档说“注意 shared_mutex 中缺少读写器优先级策略”,这表明仅使用 boost::shared_mutex 可能无法做到这一点,因为它使用了一个公平的算法旨在防止读取器或写入器饥饿。 谢谢。我将考虑使用其他一些技术来获得我需要的结果。 写入器总是必须等待所有读取器完成后才能再次写入。想想看——所有读者都读过锁,因为它是可共享的。没有办法强制他们释放读锁,读锁和写锁不能同时存在。所以作者必须等到所有读者释放他们的锁。 【参考方案1】:

写入器饥饿是读取器/写入器锁的典型问题。

不幸的是,必须针对每个算法和每个体系结构调整读取器/写入器锁。 (至少在开发出更智能的东西之前。)

这可能吗?

是的,这是可能的。带条件变量。有一个计数等待作家。当写入器进入时,它获取互斥锁,增加 waitingWriters,然后在条件 readerCount == 0 时等待。当读取器线程结束时,它获取互斥锁,减少 readerCount,如果 readerCount == 0 则向写入器条件发出信号。阅读器线程进来,它获取互斥锁。如果 waitingWriters == 0,则增加 readerCount 并释放互斥锁。否则等待条件 waitingWriters == 0。当写入线程完成时,它会获取互斥锁。如果 waitingWriters == 0,则表示读取器条件。否则,它会发出下一个作家的信号。

注意我刚刚给你的这个算法:

    现在优先于读取写入。这是另一个极端 读取可以饥饿而不是写入。 仅使用 1 个互斥锁。 (不是一个 读取器互斥体和写入器互斥体) 不适用于快速读取,即读取操作短于一个调度时间片的读取。为此,您需要使用自旋锁(查看 Big Reader)

调整取决于许多因素,其中最重要的是读取线程与写入线程的比率以及关键部分的长度。

【讨论】:

【参考方案2】:

这是我的高效写入优先共享互斥锁。在最佳情况下,它只需要一个原子交换来锁定和解锁 - 与其他需要两个原子交换的实现相反。

#pragma once
#include <cstdint>
#include <cassert>
#include <thread>
#include <new>
#include <atomic>
#include "semaphore.h"

static_assert(std::atomic<std::uint64_t>::is_always_lock_free, "std::uint64_t must be lock-free");

class alignas(std::hardware_constructive_interference_size) wprio_shared_mutex

public:
         wprio_shared_mutex();
         wprio_shared_mutex( wprio_shared_mutex const & ) = delete;
         ~wprio_shared_mutex();
    void lock_shared();
    void unlock_shared();
    void shared_to_write();
    void lock_writer();
    void write_to_shared();
    void unlock_writer();
    bool we_are_writer();

private:
    std::atomic<std::uint64_t> m_atomic; // bit  0 - 20: readers
                                         // bit 21 - 41: waiting readers
                                         // bit 42 - 62: waiting writers
                                         // bit      61: writer-flag
    std::thread::id            m_writerId;
    std::uint32_t              m_writerRecursionCount;
    semaphore                  m_releaseReadersSem,
                               m_releaseWriterSem;

    static unsigned const      WAITING_READERS_BASE   = 21,
                               WAITING_WRITERS_BASE   = 42,
                               WRITER_FLAG_BASE       = 63;
    static std::uint64_t const MASK21                 = 0x1FFFFFu;
    static std::uint64_t const READERS_MASK           = MASK21,
                               WAITING_READERS_MASK   = MASK21           << WAITING_READERS_BASE,
                               WAITING_WRITERS_MASK   = MASK21           << WAITING_WRITERS_BASE,
                               WRITER_FLAG_MASK       = (std::uint64_t)1 << WRITER_FLAG_BASE;
    static std::uint64_t const READER_VALUE           = (std::uint64_t)1,
                               WAITING_READERS_VALUE  = (std::uint64_t)1 << WAITING_READERS_BASE,
                               WAITING_WRITERS_VALUE  = (std::uint64_t)1 << WAITING_WRITERS_BASE;

    static bool check( std::uint64_t flags );
;

inline
bool wprio_shared_mutex::check( std::uint64_t flags )

    unsigned readers        = (unsigned)(flags                           & MASK21),
             waitingReaders = (unsigned)((flags >> WAITING_READERS_BASE) & MASK21),
             waitingWriters = (unsigned)((flags >> WAITING_WRITERS_BASE) & MASK21),
             writerFlag     = (unsigned)((flags >> WRITER_FLAG_BASE)     & 1);
    if( readers && (waitingReaders || writerFlag) )
        return false;
    if( waitingReaders && (readers || !writerFlag) )
        return false;
    if( waitingWriters && !(writerFlag || readers) )
        return false;
    if( writerFlag && readers )
        return false;
    return true;


wprio_shared_mutex::wprio_shared_mutex()

    m_atomic.store( 0, std::memory_order_relaxed );


wprio_shared_mutex::~wprio_shared_mutex()

    assert(m_atomic == 0);


void wprio_shared_mutex::lock_shared()

    using namespace std;
    for( uint64_t cmp = m_atomic.load( std::memory_order_relaxed ); ; )
    
        assert(check( cmp ));
        if( (cmp & WRITER_FLAG_MASK) == 0 )
        [[likely]]
        
            if( m_atomic.compare_exchange_weak( cmp, cmp + READER_VALUE, memory_order_acquire, memory_order_relaxed ) )
                [[likely]]
                return;
        
        else
            if( m_atomic.compare_exchange_weak( cmp, cmp + WAITING_READERS_VALUE, memory_order_relaxed, memory_order_relaxed ) )
            [[likely]]
            
                m_releaseReadersSem.forced_wait();
                return;
            
    


void wprio_shared_mutex::unlock_shared()

    using namespace std;
    for( uint64_t cmp = m_atomic.load( std::memory_order_relaxed ); ; )
    
        assert(check( cmp ));
        assert((cmp & READERS_MASK) >= READER_VALUE);
        if( (cmp & READERS_MASK) != READER_VALUE || (cmp & WAITING_WRITERS_MASK) == 0 )
        [[likely]]
        
            if( m_atomic.compare_exchange_weak( cmp, cmp - READER_VALUE, memory_order_relaxed, memory_order_relaxed ) )
                [[likely]]
                return;
        
        else
        
            assert(!(cmp & WRITER_FLAG_MASK));
            if( m_atomic.compare_exchange_weak( cmp, (cmp - READER_VALUE - WAITING_WRITERS_VALUE) | WRITER_FLAG_MASK, memory_order_relaxed, memory_order_relaxed ) )
            [[likely]]
            
                m_releaseWriterSem.forced_release( 1 );
                return;
            
        
    


void wprio_shared_mutex::shared_to_write()

    using namespace std;
    for( uint64_t cmp = m_atomic.load( std::memory_order_relaxed ); ; )
    
        assert(check( cmp ));
        assert((cmp & READERS_MASK) >= READER_VALUE);
        if( (cmp & READERS_MASK) == READER_VALUE )
        [[likely]]
        
            assert(!(cmp & WRITER_FLAG_MASK));
            if( m_atomic.compare_exchange_weak( cmp, (cmp - READER_VALUE) | WRITER_FLAG_MASK, memory_order_acquire, memory_order_relaxed ) )
            [[likely]]
            
                m_writerId             = this_thread::get_id();
                m_writerRecursionCount = 0;
                return;
            
        
        else
        
            assert((cmp & READERS_MASK) > READER_VALUE);
            if( m_atomic.compare_exchange_weak( cmp, cmp - READER_VALUE + WAITING_WRITERS_VALUE, memory_order_relaxed, memory_order_relaxed ) )
            [[likely]]
            
                m_releaseWriterSem.forced_wait();
                m_writerId             = this_thread::get_id();
                m_writerRecursionCount = 0;
                return;
            
        
    


void wprio_shared_mutex::lock_writer()

    using namespace std;
    uint64_t cmp = m_atomic.load( std::memory_order_acquire );
    if( (cmp & WRITER_FLAG_MASK) && m_writerId == this_thread::get_id() )
    
        ++m_writerRecursionCount;
        return;
    
    for( ; ; )
    
        assert(check( cmp ));
        if( (cmp & (WRITER_FLAG_MASK | READERS_MASK)) == 0 )
        [[likely]
        
            if( m_atomic.compare_exchange_weak( cmp, cmp | WRITER_FLAG_MASK, memory_order_acquire, memory_order_relaxed ) )
            [[likely]
            
                m_writerId             = this_thread::get_id();
                m_writerRecursionCount = 0;
                return;
            
        
        else
            if( m_atomic.compare_exchange_weak( cmp, cmp + WAITING_WRITERS_VALUE, memory_order_relaxed, memory_order_relaxed ) )
            [[likely]]
            
                m_releaseWriterSem.forced_wait();
                m_writerId             = this_thread::get_id();
                m_writerRecursionCount = 0;
                return;
            
    


void wprio_shared_mutex::unlock_writer()

    using namespace std;
    uint64_t cmp = m_atomic.load( std::memory_order_relaxed );
    if( (cmp & WRITER_FLAG_MASK) && m_writerRecursionCount && m_writerId == this_thread::get_id() )
    
        --m_writerRecursionCount;
        return;
    
    m_writerId = thread::id();
    for( ; ; )
    
        assert(cmp & WRITER_FLAG_MASK && !(cmp & READERS_MASK));
        assert(check( cmp ));
        if( (cmp & WAITING_WRITERS_MASK) != 0 )
            [[unlikely]]
            if( m_atomic.compare_exchange_weak( cmp, cmp - WAITING_WRITERS_VALUE, memory_order_release, memory_order_relaxed ) )
            [[likely]]
            
                m_releaseWriterSem.forced_release( 1 );
                return;
            
            else
                continue;
        if( (cmp & WAITING_READERS_MASK) != 0 )
        [[unlikely]]
        
            uint64_t wakeups = (cmp & WAITING_READERS_MASK) >> WAITING_READERS_BASE;
            if( m_atomic.compare_exchange_weak( cmp, (cmp & ~WRITER_FLAG_MASK) - (cmp & WAITING_READERS_MASK) + wakeups, memory_order_release, memory_order_relaxed ) )
            [[likely]]
            
                m_releaseReadersSem.forced_release( (unsigned)wakeups );
                return;
            
            else
                continue;
        
        if( m_atomic.compare_exchange_weak( cmp, 0, memory_order_release, memory_order_relaxed ) )
            [[likely]]
            return;
    


bool wprio_shared_mutex::we_are_writer()

    return (m_atomic.load( std::memory_order_relaxed ) & WRITER_FLAG_MASK) && m_writerId == std::this_thread::get_id();

该算法允许继续读取,但是一旦写入者注册写入,更多的读取者被排入队列并等待当前读取者完成;而这一切都是通过一个 64 位原子值完成的!

代码允许读者递归和作者递归。但是当你多次阅读时,你不应该做 shared_to_write();那你就会陷入僵局。对于共享阅读来说,递归的能力是天生的,没有额外的开销。但是对于写作,还有一个额外的递归计数器以及一个 thread::id。

我不打算在这里包含我的信号量类,因为它应该是不言自明的。在我的信号量类中,我有强制等待和强制释放;这是两个函数,如果失败,会重复等待或释放。

[[likely]]- 和 [[unlikely]]- 标签是 C++20 优化提示。您可以使用早期的编译器删除它们。 we_are_writer 方法检查当前线程是否具有写所有权。这可以用于 f.e.使用 assert() 进行调试。

共享互斥体通过 alignas() 指令与缓存线对齐。但是由于对象末尾的两个信号量,整个对象本身可能比缓存线大。但是短锁定路径的数据位于适合缓存行的标头中。如果对象末尾的信号量不适合同一个缓存行,这应该不会有什么坏处,因为休眠锁定无论如何都很慢。

对象既不能复制,也不能移动,因为信号量可能也不能。这可能是 f.e.因为 POSIX 信号量依赖于不可复制的 sem_t-datatype,它可能直接嵌入到 C++ 信号量数据类型中,从而使其不可复制或可移动。

【讨论】:

以上是关于Boost Shared_lock / Unique_lock,给写者优先权?的主要内容,如果未能解决你的问题,请参考以下文章

boost::unique_lock/upgrade_to_unique_lock && boost::shared_lock 可以同时存在吗?它让我担心

C++ 多线程 读锁之后不能跟写锁 shared_lock之后不能跟unique_lock 死锁 解决方案

Boost Shared_lock / Unique_lock,给写者优先权?

C++ boost::upgrade_lock boost::upgrade_to_unique_lock如何使用 例子

C++ boost::upgrade_lock upgrade_to_unique_lock 升级锁 是什么 怎么用

C++ boost锁的概述