std::atomic_flag 停止多个线程

Posted

技术标签:

【中文标题】std::atomic_flag 停止多个线程【英文标题】:std::atomic_flag to stop multiple threads 【发布时间】:2016-02-27 12:32:47 【问题描述】:

我正在尝试使用std::atomic_flag 停止多个工作线程。从Issue using std::atomic_flag with worker thread开始,以下作品:

#include <iostream>
#include <atomic>
#include <chrono>
#include <thread>

std::atomic_flag continueFlag;
std::thread t;

void work()

    while (continueFlag.test_and_set(std::memory_order_relaxed)) 
        std::cout << "work ";
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    


void start()

    continueFlag.test_and_set(std::memory_order_relaxed);
    t = std::thread(&work);


void stop()

    continueFlag.clear(std::memory_order_relaxed);
    t.join();


int main()

    std::cout << "Start" << std::endl;
    start();
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    std::cout << "Stop" << std::endl;
    stop();
    std::cout << "Stopped." << std::endl;

    return 0;

尝试重写成多个工作线程:

#include <iostream>
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>
#include <memory>

struct thread_data 
    std::atomic_flag continueFlag;
    std::thread thread;
;

std::vector<thread_data> threads;

void work(int threadNum, std::atomic_flag &continueFlag)

    while (continueFlag.test_and_set(std::memory_order_relaxed)) 
        std::cout << "work" << threadNum << " ";
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    


void start()

    const unsigned int numThreads = 2;

    for (int i = 0; i < numThreads; i++) 
        ////////////////////////////////////////////////////////////////////
        //PROBLEM SECTOR
        ////////////////////////////////////////////////////////////////////
        thread_data td;
        td.continueFlag.test_and_set(std::memory_order_relaxed);

        td.thread = std::thread(&work, i, td.continueFlag);

        threads.push_back(std::move(td));
        ////////////////////////////////////////////////////////////////////
        //PROBLEM SECTOR
        ////////////////////////////////////////////////////////////////////
    


void stop()

    //Flag stop
    for (auto &data : threads) 
        data.continueFlag.clear(std::memory_order_relaxed);
    
    //Join
    for (auto &data : threads) 
        data.thread.join();
    
    threads.clear();


int main()

    std::cout << "Start" << std::endl;
    start();
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    std::cout << "Stop" << std::endl;
    stop();
    std::cout << "Stopped." << std::endl;

    return 0;

我的问题是上面的“问题部门”。即创建线程。我无法理解如何实例化线程并将变量传递给工作线程。

现在的错误是引用此行threads.push_back(std::move(td)); 错误Error C2280 'thread_data::thread_data(const thread_data &amp;)': attempting to reference a deleted function

尝试像这样使用 unique_ptr:

        auto td = std::make_unique<thread_data>();
        td->continueFlag.test_and_set(std::memory_order_relaxed);

        td->thread = std::thread(&work, i, td->continueFlag);

        threads.push_back(std::move(td));

td-&gt;thread = std::thread(&amp;work, i, td-&gt;continueFlag); 行给出错误std::atomic_flag::atomic_flag(const std::atomic_flag &amp;)': attempting to reference a deleted function。我是否从根本上误解了 std::atomic_flag 的使用?真的是既不可动又不可复制吗?

【问题讨论】:

我没有看代码,所以这个可能不适用。但是std::atomic_flag 的级别相当低,使用起来有点棘手。在我看来std::atomic&lt;bool&gt; 在这里更合适。它看起来就像一个普通的boolif(my_flag) ...my_flag = true;my_flag = false; 根据标准std::atomic_flag保证无锁。一个简单的工作线程测试增加由while(continueFlag) 包裹的计数器,std::atomic_flagstd::atomic&lt;bool&gt; 快 100% 以上。鉴于这是一个综合示例,它仍然表明如果您经常轮询标志,标志版本会好得多。我倾向于在半关键部分进行投票,以了解工作是否被取消。 除非你在一些非常时髦的系统上std::atomic&lt;bool&gt; 将是无锁的。我非常怀疑它与std::atomic_flag 之间的任何实际速度差异是否会很明显。如果您的应用程序的性能受到检查原子变量的速度的限制,那么它没有做任何实际工作,需要重新设计。 【参考方案1】:

您的第一种方法实际上更接近事实。问题是它将对本地for 循环范围内的对象的引用作为参数传递给每个线程。但是,当然,一旦循环迭代结束,该对象就会超出范围并被销毁,从而使每个线程都具有对已销毁对象的引用,从而导致未定义的行为。

没有人关心您在创建线程后将对象移动到std::vector 的事实。线程接收到对本地范围对象的引用,这就是它所知道的一切。故事结束。

首先将对象移动到向量中,然后向每个线程传递对std::vector 中对象的引用也不起作用。一旦向量在内部重新分配,作为其自然增长的一部分,您将处于同一个泡菜中。

需要先创建整个threads 数组,然后再实际启动任何std::threads。如果RAII 原则被虔诚地遵循,那就意味着只需简单地调用std::vector::resize()

然后,在第二个循环中,遍历完整的 threads 数组,并为数组中的每个元素生成一个 std::thread

【讨论】:

我不确定这是否正确,但threads.push_back(std::move(td)); 应该调用push_back 作为右值引用,因此应该调用默认的移动构造函数。因此,移动的对象是否超出范围无关紧要。但是我发现的问题是std::atomic_flag 既不能移动也不能复制(毕竟这是有道理的)。您使用 resize() 的想法也不起作用,因为它调用移动或复制分配,因为它的内存被重新定位。但是矢量的新副本有效。看我的回答。 @Sheph -- 这当然很重要,因为在原始版本中,线程在移动之前接收对移动对象的引用。无论您如何移动它,线程都不在乎。它获取对被移动对象的引用,随后移动它对引用没有影响,因此每个线程继续尝试访问被销毁的对象。未定义的行为。你的最终解决方案是正确的。【参考方案2】:

我的unique_ptr 解决方案几乎在那里。我只需要像这样将调用作为 std::ref() 传递:

std::vector<std::unique_ptr<thread_data>> threads;

void start()

    const unsigned int numThreads = 2;

    for (int i = 0; i < numThreads; i++) 
        auto td = std::make_unique<thread_data>();
        td->continueFlag.test_and_set(std::memory_order_relaxed);
        td->thread = std::thread(&work, i, std::ref(td->continueFlag));

        threads.push_back(std::move(td));
    

不过,受上述 Sam 的启发,我也想出了一种非指针方式:

std::vector<thread_data> threads;

void start()

    const unsigned int numThreads = 2;

    //create new vector, resize doesn't work as it tries to assign/copy which atomic_flag
    //does not support
    threads = std::vector<thread_data>(numThreads);
    for (int i = 0; i < numThreads; i++) 
        auto& t = threads.at(i);
        t.continueFlag.test_and_set(std::memory_order_relaxed);
        t.thread = std::thread(&work, i, std::ref(t.continueFlag));
    

【讨论】:

以上是关于std::atomic_flag 停止多个线程的主要内容,如果未能解决你的问题,请参考以下文章

互锁变量访问(在布尔值上)和 std::atomic_flag 之间的区别

将 std::atomic_flag 包装在 getter/setter 中会使其“原子性”无效吗?

第31课 std::atomic原子变量

autohotkey:停止/结束包含多个块的当前热键线程

求助python多线程,执行到100多个停止了

如何停止一个正在运行的线程?