《C++ Concurrency in Action》读书笔记三 同步并发操作

Posted sesiria

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《C++ Concurrency in Action》读书笔记三 同步并发操作相关的知识,希望对你有一定的参考价值。

本章要点

*等待事件

*使用futures等待一次性事件(waiting for one-off events with futures)

*等待时间限制

*使用同步操作来简化代码

这章主要描述了如何使用条件变量和futures来等待事件,以及如何使用他们来使线程同步操作更加简化.


CP4

1. 等待事件或者其他条件

a.如果一个线程需要等待另外一个线程处理的结果,可以采取不停的检测共享资源,等另外一个线程完成特定操作以后会修改共享资源。该线程检测到修改以后执行后续的操作,但这种方法需要不停的检测共享资源效率低下会浪费大量的系统资源。

b.使用std::this_thread::sleep_for()让等待线程sleep一会儿

bool flag;
std::mutex m;

void wait_for_flag()
{
    std::unique_lock<std::mutex> lk(m);
    while(!flag)
    {
        lk.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        lk.lock();
    }
}
但是这种方法有弊端,太短的sleep时间会浪费系统资源,太长的sleep时间会导致目标线程完成了工作以后等待线程依然在sleep.

c. 使用条件变量(condition variable)完成特定条件以后唤醒sleep中的线程

1) std::condition_variable  std::condition_variable_any.  需要#include <condition_variable>

前者需要配合std::mutex来使用,后者可以配合任意类mutex对象。

#include <queue>
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <sstream>


std::mutex mut;
std::queue<int> data_queue;
std::condition_variable data_cond;

//bool more_data_to_prepared();

void data_preparation_thread()
{
    for(int i = 0; i<= 99; i++)
    {
        int data = i;
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(data);
        data_cond.notify_one();
    }
}

void data_processing_thread()
{
    while(true)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(
            lk, []{return !data_queue.empty();});
        int data = data_queue.front();
        data_queue.pop();
        lk.unlock();
        std::cout<<"data poped! "<<data<<std::endl;
        if(data == 99)
            break;
    }
}

int main()
{
    std::thread t1(data_preparation_thread);
    std::thread t2(data_processing_thread);
    t1.join();
    t2.join();
    return 0;
}

使用队列(std::deque)来进行线程之间传递数据也是一种常用的方法,能减少许多同步问题和资源竞争问题。

2)使用条件变量的线程安全的队列

#include <memory>
#include <queue>
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>


template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut; //the mutex must be mutable because it will be  modified by the const function.
    std::queue<T> data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue()
    {
    }
    threadsafe_queue(const threadsafe_queue& other)
    {
        std::lock_guard<std::mutex> lk(other.mut);
        data_queue = other.data_queue;
    }
    threadsafe_queue& operator=(const threadsafe_queue&) = delete;


    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(new_value);
        data_cond.notify_one();
    }




    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return false;
        value = data_queue.pop();
        return true;
    }


    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return false;
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }


    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{return !data_queue.empty();});
        std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
        data_queue.pop();
        return res;
    }


    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};

一个使用条件变量来控制三个线程按照固定的顺序循环输出A,B,C的例子

#include <memory>
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>

class threads_rolling
{
private:
    std::mutex mut;
    std::condition_variable m_cond;
    char m_flag;
    bool m_firstrun;
public:
    threads_rolling()
        :m_flag('A'),
        m_firstrun(true)
    {
    }
    void thread_A()
    {
        while(true)
        {
            std::unique_lock<std::mutex> lk(mut);
            if(m_firstrun)
            {
                m_firstrun = false;
            }else
            {
                m_cond.wait(lk, [=]{return m_flag == 'A';});
            }
            m_flag = 'B';
            std::cout<<"Output from thread A!"<<std::endl;
            m_cond.notify_all();
        }

    }

    void thread_B()
    {
        while(true)
        {
            std::unique_lock<std::mutex> lk(mut);
            m_cond.wait(lk, [=]{return m_flag == 'B';});
            m_flag = 'C';
            std::cout<<"Output from thread B!"<<std::endl;
            m_cond.notify_all();
        }
    }

    void thread_C()
    {
        while(true)
        {
            std::unique_lock<std::mutex> lk(mut);
            m_cond.wait(lk, [=]{return m_flag == 'C';});
            m_flag = 'A';
            lk.unlock();
            std::cout<<"Output from thread C!"<<std::endl;
            m_cond.notify_all();
        }
    }

};

int main()
{
    threads_rolling threads;
    std::thread t1(&threads_rolling::thread_A, &threads);
    std::thread t2(&threads_rolling::thread_B, &threads);
    std::thread t3(&threads_rolling::thread_C, &threads);
    t1.join();
    t2.join();
    t3.join();
    //use the ctrl+c to exit the test program!
    return 0;
}

2.使用futures来等待一次性的事件

std::furute<> 仅有一个future关联一个event

std::shared_future<>  多个future可以关联同一个event

1)从后台任务返回值

a.从一个异步任务返回值std::async() 语法同std::thread()

#include <memory>
#include <queue>
#include <iostream>
#include <mutex>
#include <thread>
#include <future>
#include <condition_variable>

int find_the_answer_to_ltuae();

void do_other_stuff();

int main()
{
    std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
    do_other_stuff();
    std::cout<<"The answer is "<<the_answer.get()<<std::endl;
    return 0;
}

std:async()的基本用法

struct X
{
    void foo(int, std::string const&);
    std::string bar(std::string const&);
};

X x;
auto f1 = std::async(&X::foo, &x, 42, "Hello");
auto f2 = std::async(&X::bar, x, "goodbye");

struct Y
{
    double operator()(double);
};
Y y;
auto f3 = std::async(Y(), 3.141);
auto f4 = std::async(std::ref(y), 2.718);
X baz(X&);
std::async(baz, std::ref(X));

class move_only
{
public:
    move_only();
    move_only(move_only&&);
    move_only(move_only const&) = delete;
    move_only& operator = (move_only&&);
    move_only& operator = (move_only const&) = delete;
    
    void operator()();
};
auto f5 = std::async(move_only());


auto f6 = std::async(std::launch::async, Y(), 1.2); // run in new thread.
auto f7 = std::async(std::launch::deferred, baz, std::ref(x));  //run in wait() or get()
auto f8 = std::async(
        std::launch::deferred | std::launch::async,
        baz, std::ref(x));  //Implementation chooses
auto f9 = std::async(baz, std::ref(x));
f7.wait();  //invoke deferred function.
2)使用future来联合任务

std::packaged_task 包装一个可调用的对象,并且允许异步获取该可调用对象产生的结果,从包装可调用对象意义上来讲,std::packaged_task 与 std::function 类似,只不过 std::packaged_task 将其包装的可调用对象的执行结果传递给一个 std::future 对象(该对象通常在另外一个线程中获取 std::packaged_task 任务的执行结果)。

std::packaged_task 对象内部包含了两个最基本元素,一、被包装的任务(stored task),任务(task)是一个可调用的对象,如函数指针、成员函数指针或者函数对象,二、共享状态(shared state),用于保存任务的返回值,可以通过 std::future 对象来达到异步访问共享状态的效果。

可以通过 std::packged_task::get_future 来获取与共享状态相关联的 std::future 对象。在调用该函数之后,两个对象共享相同的共享状态,具体解释如下:

std::packaged_task 对象是异步 Provider,它在某一时刻通过调用被包装的任务来设置共享状态的值。
std::future 对象是一个异步返回对象,通过它可以获得共享状态的值,当然在必要的时候需要等待共享状态标志变为 ready.

#include <iostream>
#include <mutex>
#include <thread>
#include <future>
#include <chrono>

int countdown(int start, int end)
{
    for(int i = start; i!= end; --i)
    {
        std::cout<<i<<std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }
    std::cout<<"Finished!\n";
    return start - end;
}

int main()
{
    std::packaged_task<int(int, int)> task(countdown);
    std::future<int> ret = task.get_future();

    std::thread th(std::move(task), 10, 0);

    int value = ret.get();

    std::cout<< "The countdown lasted for " <<value <<" seconds.\n";
    th.join();

    return 0;
}


std::packaged_task<> 的模板参数是函数签名 类似 int(std::string&, double*) 标示参数类型和返回类型。例子

template<>
class packaged_task<std::string(std::vector<char>*, int)>
{
public:
    template<typename Callable>
    explicit packaged_task(Callable&& f);
    std::future<std::string> get_future();
    void operator()(std::vector<char>*, int);
};

std::packaged_task成为一个函数对象,std::function, 可以作为参数传递给std::thread

一个在线程之间传递任务的例子

#include <deque>
#include <iostream>
#include <mutex>
#include <thread>
#include <future>
#include <utility>

std::mutex m;
std::deque<std::packaged_task<void()>> tasks;

bool gui_shutdown_message_received();
void get_and_process_gui_message();

void gui_thread()
{
    while(!gui_shutdown_message_received())
    {
        get_and_process_gui_message();
        std::packaged_task<void()> task;
        {
            std::lock_guard<std::mutex> lk(m);
            if(tasks.empty())
                continue;
            task = std::move(tasks.front());
            tasks.pop_front();
        }
        task();
    }
}

std::thread gui_bg_thread(gui_thread);

template<typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
    std::packaged_task<void()> task(f);
    std::future<void> res = task.get_future();
    std::lock_guard<std::mutex> lk(m);
    tasks.push_back(std::move(task));
    return res;
}

3)使用std::promise

promise 对象可以保存某一类型 T 的值,该值可被 future 对象读取(可能在另外一个线程中),因此 promise 也提供了一种线程同步的手段。在 promise 对象构造时可以和一个共享状态(通常是std::future)相关联,并可以在相关联的共享状态(std::future)上保存一个类型为 T 的值。


可以通过 get_future 来获取与该 promise 对象相关联的 future 对象,调用该函数之后,两个对象共享相同的共享状态(shared state)


promise 对象是异步 Provider,它可以在某一时刻设置共享状态的值。
future 对象可以异步返回共享状态的值,或者在必要的情况下阻塞调用者并等待共享状态标志变为 ready,然后才能获取共享状态的值。

#include <iostream>
#include <mutex>
#include <thread>
#include <future>
#include <utility>

void print_string(std::future<std::string>& fut)
{
    std::string str = fut.get();
    std::cout<<"The string is: "<<str<<std::endl;
}

int main()
{
    std::promise<std::string> prom;
    std::future<std::string> fut = prom.get_future(); // associate with the future object.
    std::thread t(print_string, std::ref(fut));
    prom.set_value("Hello world!");
    t.join();

    return 0;
}
4)在future中保存异常

some_promise.set_exception(std::current_exception());



一个使用std::promised 单线程来管理多连接的例子

#include <iostream>
#include <mutex>
#include <thread>
#include <future>
#include <utility>

void process_connections(connection_set & connections)
{
    while(!done(connections))
    {
        for(connection_iterator connection = connections.begin(), end = connections.end();
            connection! = end;
            ++connection)
        {
            if(connection->has_incoming_data())
            {
                data_packet data = connection->incoming();
                std::promise<payload_type>& p = connection->get_promise(data.id);
                p.set_value(data.payload);
            }
            if(connection->has_outgoing_data())
            {
                outgoing_packet data = connection->top_of_outgoing_queue();
                connection->send(data.payload);
                data.promise.set_value(true);
            }

        }
    }
}

5) 等待多个线程

多个线程同时访问一个std::future 会产生资源竞争的问题因为future的get()函数只能被调用一次,只后不会再有返回对象。

如果多个线程需要等待同一个事件的话,使用std::shared_future

std::promise<int> p;
std::future<int> f(p.get_future());
std::shared_future<int> sf(std::move(f));

4. 等待一个时间限制

有的时候客户不想等待,需要设定一个时间上限。

condition variable的两个成员

wait_for()

wait_until()

1)clocks

clock是一个类并且具有以下特诊

* 当前的时间  std::chrono::system_clock::now()

* 一个用来表示时间的值 some_clock::time_point

* 时钟的触发 std::ratio<1, 25>  表示1秒触发25次

* steady clock (稳固时钟)触发频率是稳定不变不可调的

std::chrono::high_resolution_clock

2)Durations

std::chrono::duration<60,1>    // 1minute

std::chrono::duration<1, 1000>  //  1ms

std::chrono::milliseconds ms(54802);

std::chrono::seconds s = std::chrono::duration_cast<std::chrono::seconds>(ms); // convert from ms to seconds.


std::chrono::milliseconds(1234).count() = 1234


std::future<int> f = std::async(some_task);

if(f.wait_for(std::chrono::milliseconds(35)) == std::future_status::ready)

    do_something_with(f.get());

3) time point 时间点

std::chrono::time_point<> 第一个参数是clock ,第二个参数是duration

std::chrono::time_point<std::chrono::system_clock, std::chrono::minutes> 参考系统时间,根据分钟来衡量

使用时间限制来等待条件变量的例子

#include <iostream>
#include <mutex>
#include <thread>
#include <future>
#include <chrono>
#include <condition_variable>


std::condition_variable cv;
bool done;
std::mutex m;

bool wait_loop()
{
    auto const timeout = std::chrono::steady_clock::now() +
        std::chrono::milliseconds(500);
    std::unique_lock<std::mutex> lk(m);
    while(!done)
    {
        if(cv.wait_until(lk, timeout) == std::cv_status::timeout)
            break;
    }
    return done;
}

4)支持timeout的函数

4. 使用同步操作来简化代码

1)使用futures的编程

函数的返回值只和参数的类型和参数值有关,与其他任何状态无关。不发生共享资源的修改,不会参数资源竞争条件

一个快速排序的例子

template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
    if(input.empty())
    {
        return input;
    }
    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    T const* pivot = *result.begin();

    auto divide_point = std::partition(input.begin(), input.end(),
        [&](T const& t){return t<pivot;});

    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point);

    auto new_lower(sequential_quick_sort(std::move(lower_part)));
    auto new_higher(sequential_quick_sort(std::move(input)));

    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower);
    return result;
}

使用future改成了并行排序

template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
    if(input.empty())
    {
        return input;
    }
    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    T const* pivot = *result.begin();

    auto divide_point = std::partition(input.begin(), input.end(),
        [&](T const& t){return t<pivot;});

    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point);


    std::future<std::list<T>> new_lower(std::async(¶llel_quick_sort<T>, std::move(lower_part)));
    //auto new_lower(parallel_quick_sort(std::move(lower_part)));
    auto new_higher(parallel_quick_sort(std::move(input)));

    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower.get());
    return result;
}

template<typename F, typename A>
std::future<std::result_of<F(A&&)::type> spawn_task(F&& f, A&& a)
{
    typedef std::result_of<F(A&&)>::type result_type;
    std::packaged_task<result_type(A&&)> task(std::move(f));
    std::future<result_type> res(task.get_future());
    std::thread  t(std::move(task), std::move(a));
    t.detach();
    return res;
}

2)使用同步来传递消息


以上是关于《C++ Concurrency in Action》读书笔记三 同步并发操作的主要内容,如果未能解决你的问题,请参考以下文章

C++并发编程----异常安全的并行算法(《C++ Concurrency in Action》 读书笔记)

C++并发编程----无锁实现线程安全队列(《C++ Concurrency in Action》 读书笔记)

C++并发编程----无锁实现线程安全队列(《C++ Concurrency in Action》 读书笔记)

Manning新书C++并行实战,592页pdf,C++ Concurrency in Action

C++并发编程----利用锁实现线程安全的数据结构(《C++ Concurrency in Action》 读书笔记)

C++并发编程----并发代码的设计(《C++ Concurrency in Action》 读书笔记)