C++并发编程----无锁实现线程安全队列(《C++ Concurrency in Action》 读书笔记)

Posted 小丑快学习

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++并发编程----无锁实现线程安全队列(《C++ Concurrency in Action》 读书笔记)相关的知识,希望对你有一定的参考价值。

采用计数器的方式防止条件竞争。

template<typename T>
class lock_free_queue
{
private:
    struct node;
    struct counted_node_ptr
    {
        int external_count;//记录外部线程通过指针的引用
        node* ptr;
    };
    std::atomic<counted_node_ptr> head;
    std::atomic<counted_node_ptr> tail; // 1 
    //计数器
    struct node_counter
    {
        unsigned internal_count : 30;
        unsigned external_counters : 2; // 大小不会超过2,同时最多由next,和head或者tail指向。
    };
    struct node
    {
        std::atomic<T*> data;
        std::atomic<node_counter> count; // 3 
        counted_node_ptr next;
        node()
        {
            node_counter new_count;
            new_count.internal_count = 0;
            new_count.external_counters = 2; // next 和 tail 同时指向,因为新创建的节点将会放入到队列中
            count.store(new_count);
            next.ptr = nullptr;
            next.external_count = 0;

        }

        /*
            释放对当前节点的引用,释放节点引用时语言将该节点得到内部计数减一,
            如果减一后节点的外部引用以及内部引用计数为零,则删除该节点。
            释放操作由节点自己完成,所以该函数为node的成员函数。
            所以该函数释放的是该函数的计数器的外部引用节点。
        */
        void release_ref()
        {
            node_counter old_counter = count.load(std::memory_order_relaxed);
            node_counter new_counter;
            do
            {
                new_counter = old_counter;
                --new_counter.internal_count; // 内部计数减一
            } while (!count.compare_exchange_strong( // 2 
                old_counter, new_counter,
                std::memory_order_acquire, std::memory_order_relaxed));

            if (!new_counter.internal_count &&
                !new_counter.external_counters)
            {
                delete this; // 3 
            }
        }//release_ref

    };//node

    /*
        增加对对节点的外部引用计数,
        同时增加两个计数指针的引用,
        增加的是外部节点的引用,而非节点的引用计数器。
    */
    static void increase_external_count(
        std::atomic<counted_node_ptr>& counter,
        counted_node_ptr& old_counter)
    {
        counted_node_ptr new_counter;
        do
        {
            new_counter = old_counter;
            ++new_counter.external_count;
        } while (!counter.compare_exchange_strong(
            old_counter, new_counter,
            std::memory_order_acquire, std::memory_order_relaxed));
        old_counter.external_count = new_counter.external_count;
    }//increase_external_count

    /*
        释放一个节点的引用,同时将外部节点的引用计数加入到内部节点中,
        当节点的引用计数为零时将节点删除。计数器中的外部引用也会减一

    */
    static void free_external_counter(counted_node_ptr
        & old_node_ptr)
    {
        node* const ptr = old_node_ptr.ptr;
        int const count_increase = old_node_ptr.external_count - 2;
        node_counter old_counter =
            ptr->count.load(std::memory_order_relaxed);
        node_counter new_counter;
        do
        {
            new_counter = old_counter;
            --new_counter.external_counters; // 1 计数器外部节点引用减一,此时tail和next已经没有指向该节点
            new_counter.internal_count += count_increase; // 2 将外部节点引用的计数加到内部节点引用 
        } while (!ptr->count.compare_exchange_strong( // 3 
            old_counter, new_counter,
            std::memory_order_acquire, std::memory_order_relaxed));

        if (!new_counter.internal_count &&
            !new_counter.external_counters)
        {
            delete ptr; // 如果引用为零,便删除节点
        }
    }//free_external_counter

public:
    /*
        数据进对队列成功后会将计数器的外部计数减一,
        因为此时只有一个next指向节点
    */
    void push(T new_value)
    {
        std::unique_ptr<T> new_data(new T(new_value));
        counted_node_ptr new_next;
        new_next.ptr = new node;
        new_next.external_count = 1;
        counted_node_ptr old_tail = tail.load();
        for (;;)
        {
            increase_external_count(tail, old_tail); // 增加外部节点的引用(非计数器)
            T* old_data = nullptr;
            if (old_tail.ptr->data.compare_exchange_strong( // 6 
                old_data, new_data.get()))
            {
                old_tail.ptr->next = new_next;
                old_tail = tail.exchange(new_next);
                free_external_counter(old_tail); // 此时内部计数的外部计数将减一 
                new_data.release();
                break;
            }
            old_tail.ptr->release_ref();//指针离开节点则内部计数减一
        }
    }//push

    std::unique_ptr<T> pop()
    {
        counted_node_ptr
            old_head = head.load(std::memory_order_relaxed); // 1 
        for (;;)
        {
            increase_external_count(head, old_head); // 增加节点引用(非计数器引用)
            node* const ptr = old_head.ptr;
            if (ptr == tail.load().ptr)//条件满足则队列为空
            {
                ptr->release_ref(); // 3 
                return std::unique_ptr<T>();
            }
            if (head.compare_exchange_strong(old_head, ptr->next)) // 条件满足则取出数据
            {
                T* const res = ptr->data.exchange(nullptr);//取出数据
                free_external_counter(old_head); //释放计数器节点
                return std::unique_ptr<T>(res);
            }
            ptr->release_ref(); // 指针离开节点,则内部数据减一
        }
    }//pop





};

在push操作中,代码 6 ,if (old_tail.ptr->data.compare_exchange_strong( old_data, new_data.get()))条件失败就会导致一直循环,从而导致cpu一直被占用,因此,这种情况实际上类似于自旋锁,这将使得效率非常的低,因此,我们可以想办法让忙等待的线程帮助其他线程完成操作,因而减少忙等带来的浪费。

template<typename T>
class lock_free_queue
{
private:

	struct node 
	{ 
		std::atomic<T*> data; 
		std::atomic<node_counter> count; 
		std::atomic<counted_node_ptr> next; // 1 
	}; 
public: 
	std::unique_ptr<T> pop() 
	{ 
		counted_node_ptr 
		old_head=head.load(std::memory_order_relaxed); 
		for(;;) 
		{ 
			increase_external_count(head,old_head); 
			node* const ptr=old_head.ptr; 
			if(ptr==tail.load().ptr) 
			{ 
				return std::unique_ptr<T>(); 
			}
			counted_node_ptr next=ptr->next.load(); // 2 
			if(head.compare_exchange_strong(old_head,next)) 
			{ 
				T* const res=ptr->data.exchange(nullptr); 
				free_external_counter(old_head); 
				return std::unique_ptr<T>(res); 
			}
			ptr->release_ref(); 
		} 
	}

    /*
        更新尾节点
    */
    void set_new_tail(counted_node_ptr& old_tail, // 1 
        counted_node_ptr const& new_tail)
    {
        node* const current_tail_ptr = old_tail.ptr;
        while (!tail.compare_exchange_weak(old_tail, new_tail) && 
            old_tail.ptr == current_tail_ptr);//如果没有线程修改尾节点或者pop尾节点,那么修改尾节点
        if (old_tail.ptr == current_tail_ptr) // 3 
            free_external_counter(old_tail); // 4 
        else
            current_tail_ptr->release_ref(); // 5 
    }
public:
    //提高性能的push
    void push(T new_value)
    {
        std::unique_ptr<T> new_data(new T(new_value));
        counted_node_ptr new_next;
        new_next.ptr = new node;
        new_next.external_count = 1;
        counted_node_ptr old_tail = tail.load();
        for (;;)
        {
            increase_external_count(tail, old_tail);
            T* old_data = nullptr;
            if (old_tail.ptr->data.compare_exchange_strong( // 6 将数据放入到原来的节点中
                old_data, new_data.get()))
            {
                counted_node_ptr old_next = { 0 };//old_next 应该为空
                if (!old_tail.ptr->next.compare_exchange_strong( // 7 
                    old_next, new_next))//如果原来的next不空,则说明已经有其他线程已经修改尾指针
                {
                    delete new_next.ptr; // 8 此时数据已经push成功,因此释放new出的节点
                    new_next = old_next; // 9 new_next也应该置空
                }
                set_new_tail(old_tail, new_next);//修改tail指针
                new_data.release();
                break;
            }
            else // 10  说明其他线程已经修改数据
            {
                counted_node_ptr old_next = { 0 };
                if (old_tail.ptr->next.compare_exchange_strong( // 如果还未修改尾指针,则由该线程修改 
                    old_next, new_next))//此处是复制操作,非指针操作,但是ptr指针为浅拷贝
                {
                    old_next = new_next; // 12 old_next 指向的node和old_tail指向的是同一个
                    new_next.ptr = new node; // 13 因为该线程new的节点已经称为了tail,因此,需要重新new
                }
                set_new_tail(old_tail, old_next); // 更新节点指针,tail指向下一个节点
            }
        }
    }
};

以上是关于C++并发编程----无锁实现线程安全队列(《C++ Concurrency in Action》 读书笔记)的主要内容,如果未能解决你的问题,请参考以下文章

C++并发编程----实现线无锁线程安全的数据结构(《C++ Concurrency in Action》 读书笔记)

C++并发编程----利用锁实现线程安全的数据结构(《C++ Concurrency in Action》 读书笔记)

c++11 并发队列的生产方案 BlockingConcurrentQueue

多线程编程之无锁队列

Linux(程序设计):24---无锁CAS(附无锁队列的实现)

java并发编程:管程内存模型无锁并发线程池AQS原理与锁线程安全集合类并发设计模式