C ++中多线程的安全性与速度[关闭]

Posted

技术标签:

【中文标题】C ++中多线程的安全性与速度[关闭]【英文标题】:Safety vs speed of multithreading in C++ [closed] 【发布时间】:2018-12-16 02:45:39 【问题描述】:

如果我有一个要同时由多个线程更新的数组,那么最好/最快的方法是什么?例如,假设我有以下代码:

std::vector<float> vec;
vec.push_back(0.f);
for(int i = 0; i < 10000; i++) 
    std::thread([&] 
        // SAFETY CONSTRUCTS GO HERE
        vec[0] += 1; // OR MAYBE HERE
        // AND HERE? 
    );

// wait a little while, i.e. I was too lazy to write out joins
std::cout << vec[0];

如果我希望这是安全的并最终打印值 10000,那么最好/最快的方法是什么?

【问题讨论】:

在您的示例中,在任何给定时间,只有一个线程可以处理单个向量元素。因此它不能比单线程更快(如果有必要的障碍,它会慢得多)。也因为它们中的任何一个都可能在它们退出之前启动所有 10000 个,它们都将阻塞彼此的 cpu 资源。至少如果为单个线程编译循环也可能会被编译器优化为常量。 很难用这样一个基本的例子来概括如何优化线程。大多数问题不仅仅涉及一个变量。另外,该变量将如何使用?一些线程只读取变量而其他线程写入它吗?如果是这样,读者与作者的比例是多少。等等……等等…… 在这种情况下你不能使用std::atomic 吗?我删除了我的答案,因为我不完全理解它是如何工作的。 这个问题不是关于安全,而是关于正确性。这段代码可以做各种各样的事情来获得定义明确的结果;只有其中一些实际上产生了正确的结果。 【参考方案1】:

在您给出的示例中,最好/最安全的方法是不启动线程,而只需在循环中更新 v[0]。启动和同步线程的开销可能会超过您通过并行执行某些操作所获得的任何好处。

v 是一个非原子对象 (std::vector&lt;float&gt;) 而v[0] 实际上是一个函数调用。此类对象及其非静态成员函数无法保护自己免受多个线程的并发访问。要在多个线程中使用它们,v(和v[0])的每次直接使用都必须同步。

一般而言,涉及并发执行线程的安全性是通过同步访问由多个线程更新和访问的任何变量(或更一般地说,内存)来实现的。

如果使用互斥锁,这通常意味着所有访问共享数据的线程必须首先获取互斥锁,对共享变量执行操作(例如更新v[0]),然后释放互斥锁。如果一个线程没有抓取(或抓取然后释放)互斥锁,那么它所做的所有操作都不能触及共享变量。

如果您希望通过线程来提高性能,则需要在每个线程中完成大量工作,而无需对共享变量进行任何访问。由于可以同时执行部分工作,因此该工作可能会在更少的总运行时间内执行。为了体现性能优势,收益(例如通过同时执行大量操作)需要超过成本(启动线程、同步访问由多个线程访问的任何数据)。

这在与您显示的代码类似的任何内容中都不太可能发生。

关键是,当线程共享任何数据时,速度和安全性之间总是需要权衡取舍。安全需要同步更新共享变量 - 无一例外。性能提升通常来自不需要同步的事物(即不访问线程之间共享的变量)并且可以并行执行。

【讨论】:

【参考方案2】:

没有一种神奇的技术可以对共享数据进行高性能的并行访问,但有一些您会经常看到的通用技术。

我将使用并行求和数组的示例来回答我的问题,但这些技术非常普遍地适用于许多并行算法。

1) 首先避免共享数据

这可能是最安全、最快的方法。与其让您的工作线程直接更新共享状态,不如让它们每个都使用自己的本地状态,然后让您的主线程组合结果。对于数组求和示例,这可能如下所示:

int main() 
    std::vector<int> toSum = getSomeVector();
    std::vector<int> sums(NUM_THREADS);
    std::vector<std::thread> threads;

    int chunkSize = std::ceil(toSum.size() / (float)NUM_THREADS);
    for (int i = 0; i < NUM_THREADS; ++i) 
        auto chunkBegin = toSum.begin() + (i * chunkSize);
        auto chunkEnd = chunkBegin + chunkSize;

        threads.emplace_back([chunkBegin, chunkEnd](int& result) mutable 
            for (; chunkBegin != chunkEnd; ++chunkBegin) 
                result += *chunkBegin;
            
        , std::ref(sums[i]));
    

    for (std::thread& thd : threads) 
        thd.join();
    

    int finalSum = 0;
    for (int partialSum : sums) 
        finalSum += partialSum;
    

    std::cout << finalSum << '\n';

由于每个线程只对自己的部分和进行操作,因此它们不会相互干扰,也不需要额外的同步。您必须在最后做一些额外的工作才能将所有部分总和相加,但部分结果的数量很少,所以这个开销应该很小。

2) 互斥

您可以使用锁定机制来保护共享状态,而不是让每个线程在自己的状态上运行。通常,这是一个互斥锁,但有许多不同的锁定原语,它们的作用略有不同。这里的重点是确保一次只有一个线程使用共享状态。使用此技术时要非常小心,以避免在紧密循环中访问共享状态。由于一次只有一个线程可以持有锁,因此很容易意外地将您喜欢的并行代码转换回单线程代码,方法是使一次只能有一个线程工作。

例如,考虑以下情况:

int main() 
    std::vector<int> toSum = getSomeVector();
    int sum = 0;
    std::vector<std::thread> threads;

    int chunkSize = std::ceil(toSum.size() / (float)NUM_THREADS);
    std::mutex mtx;
    for (int i = 0; i < NUM_THREADS; ++i) 
        auto chunkBegin = toSum.begin() + (i * chunkSize);
        auto chunkEnd = chunkBegin + chunkSize;

        threads.emplace_back([chunkBegin, chunkEnd, &mtx, &sum]() mutable 
            for (; chunkBegin != chunkEnd; ++chunkBegin) 
                std::lock_guard guard(mtx);
                sum += *chunkBegin;
            
        );
    

    for (std::thread& thd : threads) 
        thd.join();
    

    std::cout << sum << '\n';

由于每个线程在其循环中锁定mtx,因此一次只能有一个线程在做任何工作。这里没有并行化,由于分配线程以及锁定和解锁互斥体的额外开销,此代码可能 比等效的单线程代码。

而是尽量独立地做,并尽可能少地访问你的共享状态。对于此示例,您可以执行与 (1) 中的示例类似的操作,并在每个线程中建立部分总和,最后只将它们添加到共享总和中一次:

int main() 
    std::vector<int> toSum = getSomeVector();
    int sum = 0;
    std::vector<std::thread> threads;

    int chunkSize = std::ceil(toSum.size() / (float)NUM_THREADS);
    std::mutex mtx;
    for (int i = 0; i < NUM_THREADS; ++i) 
        auto chunkBegin = toSum.begin() + (i * chunkSize);
        auto chunkEnd = chunkBegin + chunkSize;

        threads.emplace_back([chunkBegin, chunkEnd, &mtx, &sum]() mutable 
            int partialSum = 0;
            for (; chunkBegin != chunkEnd; ++chunkBegin) 
                partialSum += *chunkBegin;
            
            
                std::lock_guard guard(mtx);
                sum += partialSum;
            
        );
    

    for (std::thread& thd : threads) 
        thd.join();
    

    std::cout << sum << '\n';

3) 原子变量

原子变量是可以在线程之间“安全”共享的变量。它们非常强大,但也很容易出错。您必须担心诸如内存排序约束之类的事情,当您弄错它们时,可能很难调试并找出您做错了什么。

在其核心,原子变量可以实现为一个简单的变量,其操作由互斥锁或类似物保护。神奇之处在于实现,它经常使用特殊的 CPU 指令来协调对 CPU 级别的变量的访问,以避免大量的锁定和解锁开销。

虽然原子不是灵丹妙药。仍然涉及开销,并且您仍然可以通过过于频繁地访问您的原子来击中自己的脚。您的 CPU 进行了大量缓存,并且让多个线程写入原子变量可能意味着将内容溢出回内存,或者至少溢出到更高级别的缓存。再一次,如果您可以避免在线程中使用紧密循环访问共享状态,您应该这样做:

int main() 
    std::vector<int> toSum = getSomeVector();
    std::atomic<int> sum(0);
    std::vector<std::thread> threads;

    int chunkSize = std::ceil(toSum.size() / (float)NUM_THREADS);
    for (int i = 0; i < NUM_THREADS; ++i) 
        auto chunkBegin = toSum.begin() + (i * chunkSize);
        auto chunkEnd = chunkBegin + chunkSize;


        threads.emplace_back([chunkBegin, chunkEnd, &sum]() mutable 
            int partialSum = 0;
            for (; chunkBegin != chunkEnd; ++chunkBegin) 
                partialSum += *chunkBegin;
            
            // Since we don't care about the order that the threads update the sum,
            // we can use memory_order_relaxed.  This is a rabbit-hole I won't get
            // too deep into here though.
            sum.fetch_add(partialSum, std::memory_order_relaxed);
        );
    

    for (std::thread& thd : threads) 
        thd.join();
    

    std::cout << sum << '\n';

【讨论】:

以上是关于C ++中多线程的安全性与速度[关闭]的主要内容,如果未能解决你的问题,请参考以下文章

java中多线程并发并行线程与进程线程调度创建线程的方式

C中多线程的同步

Java中多线程,synchronized,与 AtomicInteger的问题

C++11中多线程例子

C++11中多线程例子

python中多进程和多线程的区别