C++ Concurrent Programming ---- Exception-Safe Parallel Algorithms (Reading Notes on "C++ Concurrency in Action")

Posted 小丑快学习


accumulate

1. Partitioning the work across threads, with one result slot per block

#include <algorithm>
#include <functional>
#include <iterator>
#include <numeric>
#include <thread>
#include <vector>

// Function object: accumulates [first, last) into result
template<typename Iterator, typename T>
struct accumulate_block
{
    void operator()(Iterator first, Iterator last, T& result)
    {
        result = std::accumulate(first, last, result);
    }
};

template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
    unsigned long const length = std::distance(first, last);
    // Empty range: return the initial value directly
    if (!length)
        return init;

    unsigned long const min_per_thread = 25; // minimum number of elements per thread
    unsigned long const max_threads =
        (length + min_per_thread - 1) / min_per_thread;

    unsigned long const hardware_threads =
        std::thread::hardware_concurrency();

    unsigned long const num_threads =
        std::min(hardware_threads != 0 ?
            hardware_threads : 2, max_threads);

    unsigned long const block_size = length / num_threads;

    std::vector<T> results(num_threads); // one partial result per block

    std::vector<std::thread> threads(num_threads - 1); // worker threads (the main thread takes the last block)

    Iterator block_start = first;
    // Launch the worker threads and store them in the vector
    for (unsigned long i = 0; i < (num_threads - 1); ++i)
    {
        Iterator block_end = block_start;
        std::advance(block_end, block_size); // position block_end
        threads[i] = std::thread(
            accumulate_block<Iterator, T>(),
            block_start, block_end, std::ref(results[i]));
        block_start = block_end;
    }
    // The main thread computes the last block
    accumulate_block<Iterator, T>()(block_start, last, results[num_threads - 1]);
    // Wait for all workers to finish
    std::for_each(threads.begin(), threads.end(),
        std::mem_fn(&std::thread::join));
    // Combine the partial results and return
    return std::accumulate(results.begin(), results.end(), init);
}

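The code above is not exception-safe, because neither the worker threads nor the main thread handle exceptions. If the function run by a std::thread throws, the exception cannot leave that thread and std::terminate is called. If the main thread throws (for example while computing the last block) before the workers have been joined, the still-joinable std::thread objects are destroyed and std::terminate is called as well. Either way, any exception ends the program. std::future addresses the first problem. It can store the result of a task. If the task throws, for instance a task wrapped in std::packaged_task, the exception is captured in the associated future and rethrown when get() is called to retrieve the value. An exception thrown on a worker thread is therefore passed on to the main thread.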
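The mechanism can be seen in isolation. The sketch below is a minimal example in which a std::packaged_task runs on a worker thread and throws. The exception is then rethrown by get() on the calling thread. The names failing_task and run_on_worker are made up for this example.

```cpp
#include <future>
#include <stdexcept>
#include <string>
#include <thread>

// Illustrative task: throws for negative input, otherwise doubles x.
int failing_task(int x)
{
    if (x < 0)
        throw std::runtime_error("negative input");
    return x * 2;
}

// Runs failing_task(x) on a worker thread. Returns "ok:<value>" on success,
// or the message of the exception that get() rethrows on this thread.
std::string run_on_worker(int x)
{
    std::packaged_task<int(int)> task(failing_task);
    std::future<int> fut = task.get_future();
    std::thread worker(std::move(task), x); // packaged_task catches the exception
    worker.join();
    try
    {
        return "ok:" + std::to_string(fut.get()); // rethrows the stored exception
    }
    catch (std::runtime_error const& e)
    {
        return e.what();
    }
}
```

Calling run_on_worker(21) yields "ok:42", and run_on_worker(-1) yields "negative input". The worker thread itself never lets the exception escape.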

/*
    Parallel sum that carries worker exceptions back to the caller via std::future
*/
#include <algorithm>
#include <functional>
#include <future>
#include <iterator>
#include <numeric>
#include <thread>
#include <vector>

template<typename Iterator,typename T> 
struct accumulate_block 
{ 
	T operator()(Iterator first,Iterator last) // returns the block's sum instead of writing through a reference
	{ 
		return std::accumulate(first,last,T()); 
	} 
}; 

template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
    unsigned long const length = std::distance(first, last);
    if (!length)
        return init;
    unsigned long const min_per_thread = 25;
    unsigned long const max_threads =
        (length + min_per_thread - 1) / min_per_thread;
    unsigned long const hardware_threads =
        std::thread::hardware_concurrency();
    unsigned long const num_threads =
        std::min(hardware_threads != 0 ?
            hardware_threads : 2, max_threads);
    unsigned long const block_size = length / num_threads;
    std::vector<std::future<T> > futures(num_threads - 1); // each worker's result (or exception)
    std::vector<std::thread> threads(num_threads - 1); 
    Iterator block_start = first;
    for (unsigned long i = 0; i < (num_threads - 1); ++i)
    {
        Iterator block_end = block_start;
        std::advance(block_end, block_size); // move the iterator forward by block_size
        std::packaged_task<T(Iterator, Iterator)> task( // packaged_task wraps one block's computation
            accumulate_block<Iterator, T>());
        futures[i] = task.get_future(); // keep the future for this block's result
        threads[i] = std::thread(std::move(task), block_start, block_end);
        block_start = block_end;
    }
    T last_result = accumulate_block<Iterator, T>()(block_start, last); // main thread computes the last block
    std::for_each(threads.begin(), threads.end(),
        std::mem_fn(&std::thread::join));
    T result = init;
    for (unsigned long i = 0; i < (num_threads - 1); ++i)
    {
        result += futures[i].get(); // rethrows if worker i threw
    }
    result += last_result;
    return result;
}

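There is still one case to consider when joining. The main thread may throw before join() has been called on the workers, for example while computing the last block or when starting a new thread fails. In that case the std::thread objects are destroyed while still joinable, and std::terminate is called. The usual remedy is RAII. Hand the thread container to a class whose destructor joins every thread that is still joinable. This gives the final version of the parallel sum: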

#include <algorithm>
#include <future>
#include <iterator>
#include <numeric>
#include <thread>
#include <vector>

// RAII guard: joins every joinable thread when it goes out of scope
class join_threads
{
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_) :
        threads(threads_)
    {}
    ~join_threads()
    {
        for (unsigned long i = 0; i < threads.size(); ++i)
        {
            if (threads[i].joinable())
                threads[i].join();
        }
    }
};

// accumulate_block: the two-argument version from the previous listing
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
    unsigned long const length = std::distance(first, last);
    if (!length)
        return init;
    unsigned long const min_per_thread = 25;
    unsigned long const max_threads =
        (length + min_per_thread - 1) / min_per_thread;
    unsigned long const hardware_threads =
        std::thread::hardware_concurrency();
    unsigned long const num_threads =
        std::min(hardware_threads != 0 ?
            hardware_threads : 2, max_threads);
    unsigned long const block_size = length / num_threads;
    std::vector<std::future<T> > futures(num_threads - 1);
    std::vector<std::thread> threads(num_threads - 1);
    join_threads joiner(threads); // workers are joined whether the main thread returns normally or throws
    Iterator block_start = first;
    for (unsigned long i = 0; i < (num_threads - 1); ++i)
    {
        Iterator block_end = block_start;
        std::advance(block_end, block_size);
        std::packaged_task<T(Iterator, Iterator)> task(
            accumulate_block<Iterator, T>());
        futures[i] = task.get_future();
        threads[i] = std::thread(std::move(task), block_start, block_end);
        block_start = block_end;
    }
    T last_result = accumulate_block<Iterator, T>()(block_start, last);
    T result = init;
    for (unsigned long i = 0; i < (num_threads - 1); ++i)
    {
        result += futures[i].get(); // waits for worker i; rethrows its exception
    }
    result += last_result;
    return result;
}
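The guard's effect can be checked on its own. The sketch below assumes a stripped-down copy of join_threads and a hypothetical helper called workers_finished_despite_exception. It throws on the calling thread before any explicit join(). It then checks that every worker still ran to completion and that the program was not terminated.

```cpp
#include <stdexcept>
#include <thread>
#include <vector>

// Stripped-down copy of the join_threads guard, so this example compiles alone.
class join_threads
{
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_) : threads(threads_) {}
    ~join_threads()
    {
        for (auto& t : threads)
            if (t.joinable())
                t.join();
    }
};

// Hypothetical helper: starts n workers that each mark their own slot, then
// throws on the calling thread. Returns how many slots were marked.
int workers_finished_despite_exception(int n)
{
    std::vector<int> done(n, 0);
    try
    {
        std::vector<std::thread> threads;
        join_threads joiner(threads); // destroyed before `threads`, so it joins first
        for (int i = 0; i < n; ++i)
            threads.emplace_back([&done, i] { done[i] = 1; });
        // Without the guard, destroying joinable threads here would call std::terminate.
        throw std::runtime_error("main thread failed");
    }
    catch (std::runtime_error const&)
    {
    }
    int count = 0;
    for (int d : done)
        count += d;
    return count;
}
```

Declaring the guard after the vector matters. Locals are destroyed in reverse order, so the threads are joined before the vector that owns them is destroyed.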

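Using std::async for exception safety

std::async also makes the code exception-safe. An exception thrown by the task is stored in the returned std::future and rethrown by get(). Suppose the recursive call on the second half throws. first_half_result is then destroyed during stack unwinding, and the destructor of a future obtained from std::async waits for the task to finish, so no thread is left running. In addition, under the default launch policy the implementation may run a task deferred on the calling thread instead of starting a new thread. This lets the library limit how many threads are created. How well this avoids oversubscription depends on the implementation and is not guaranteed by the standard.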

#include <future>
#include <iterator>
#include <numeric>

template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
    unsigned long const length = std::distance(first, last);
    unsigned long const max_chunk_size = 25;
    if (length <= max_chunk_size)
    {
        return std::accumulate(first, last, init); // small enough: sum directly
    }
    else
    {
        Iterator mid_point = first;
        std::advance(mid_point, length / 2); // split the range in half
        std::future<T> first_half_result =
            std::async(parallel_accumulate<Iterator, T>, // first half, possibly on another thread
                first, mid_point, init);
        T second_half_result = parallel_accumulate(mid_point, last, T()); // second half on this thread
        return first_half_result.get() + second_half_result; // get() rethrows any exception from the first half
    }
}
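Whether std::async actually starts a new thread depends on the launch policy. The small sketch below uses two illustrative helper names, runs_on_caller and get_or_minus_one. It shows three behaviours:

- With std::launch::deferred, the task runs on the thread that calls get().
- With std::launch::async, it runs on a different thread.
- Under the default policy, an exception thrown by the task is rethrown by get() whichever way the task ran.

```cpp
#include <future>
#include <stdexcept>
#include <thread>

// Illustrative helper: true if a task launched with `policy` ran on the calling thread.
bool runs_on_caller(std::launch policy)
{
    std::thread::id const caller = std::this_thread::get_id();
    std::future<std::thread::id> f =
        std::async(policy, [] { return std::this_thread::get_id(); });
    return f.get() == caller; // a deferred task runs here, inside get()
}

// Illustrative helper: the task throws; get() rethrows it under the default policy.
int get_or_minus_one()
{
    std::future<int> f = std::async([]() -> int {
        throw std::runtime_error("task failed");
    });
    try
    {
        return f.get();
    }
    catch (std::runtime_error const&)
    {
        return -1;
    }
}
```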

for_each

Manual thread partitioning

#include <algorithm>
#include <future>
#include <iterator>
#include <thread>
#include <vector>

class join_threads {
private:
	std::vector<std::thread> & threads;
public:
	explicit join_threads(std::vector<std::thread> & vec): threads(vec){
	}
	~join_threads() {
		for (auto & i : threads) {
			if (i.joinable()) {i.join();}
		}
	}
};


template<typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f)
{
	unsigned long const length = std::distance(first, last);
	if (!length)
		return;
	unsigned long const min_per_thread = 25;
	unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
	unsigned long const hardware_threads = std::thread::hardware_concurrency();
	unsigned long const num_threads = std::min(hardware_threads != 0 ?
												hardware_threads : 2, max_threads);
	unsigned long const block_size = length / num_threads;
	std::vector<std::future<void> > futures(num_threads - 1); // futures carry no value, only exceptions
	std::vector<std::thread> threads(num_threads - 1);
	join_threads joiner(threads); // RAII guard for the thread group: threads are still joined if an exception occurs
	Iterator block_start = first;
	for (unsigned long i = 0; i < (num_threads - 1); ++i)
	{
		Iterator block_end = block_start;
		std::advance(block_end, block_size);
		std::packaged_task<void(void)> task( [=](){ // wrap the block's work so exceptions are captured
				std::for_each(block_start, block_end, f);
			});
		futures[i] = task.get_future();
		threads[i] = std::thread(std::move(task));
		block_start = block_end;
	}
	std::for_each(block_start, last, f);
	for (unsigned long i = 0; i < (num_threads - 1); ++i)
	{
		futures[i].get(); // no return value: get() only rethrows an exception from a worker, if any
	}
}

Using std::async

#include <algorithm>
#include <future>
#include <iterator>

template<typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f)
{
	unsigned long const length = std::distance(first, last);
	if (!length)
		return;
	unsigned long const min_per_thread = 25;
	if (length < (2 * min_per_thread))
	{
		std::for_each(first, last, f); // small range: process it on a single thread
	}
	else
	{
		Iterator mid_point = first;
		std::advance(mid_point, length / 2); // works for non-random-access iterators too
		std::future<void> first_half = // recurse on the first half asynchronously
			std::async(&parallel_for_each<Iterator, Func>,
				first, mid_point, f);
		parallel_for_each(mid_point, last, f); // process the second half on this thread
		first_half.get(); // wait, and rethrow any exception from the first half
	}
}

find

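find needs to know whether another thread has already found a match. Once one has, all threads should stop searching.
One way to interrupt the other threads is to use an atomic variable as a flag and check it after processing each element. If the flag is set, some thread has found a matching element, so the algorithm can stop and return. Interrupting threads this way leaves the unprocessed data untouched, and in many cases it can give a considerable speedup over a serial search. The drawback is that loading the atomic variable on every iteration can be relatively slow, which holds back each thread.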
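The flag idea can be reduced to a few lines. The sketch below is a simplified illustration of the technique and is separate from the full parallel_find listing. It assumes a std::vector<int>, a fixed two-way split and no exception handling, and find_with_flag is a hypothetical name. Each thread checks the shared std::atomic<bool> before every element and stops once a match has been recorded.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper: returns the index of an element equal to target, or -1.
// Simplified: fixed two-way split, no exception handling.
long find_with_flag(std::vector<int> const& v, int target)
{
    std::atomic<bool> done(false); // set once any thread finds a match
    std::atomic<long> found(-1);   // index of the match, -1 if none
    auto search = [&](std::size_t begin, std::size_t end) {
        // Check the flag before every element so a thread stops early
        // once the other one has found a match.
        for (std::size_t i = begin; i < end && !done.load(); ++i)
        {
            if (v[i] == target)
            {
                long expected = -1;
                // Only the first thread to find a match records its index.
                found.compare_exchange_strong(expected, static_cast<long>(i));
                done.store(true);
                return;
            }
        }
    };
    std::size_t const mid = v.size() / 2;
    std::thread worker(search, std::size_t(0), mid); // first half on a worker thread
    search(mid, v.size());                           // second half on this thread
    worker.join();
    return found.load();
}
```

The full listing below handles the same race differently. It stores the result in a std::promise, and a second set_value from another thread throws, which is why it nests its try blocks.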

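#include <algorithm>
#include <atomic>
#include <exception>
#include <future>
#include <iterator>
#include <thread>
#include <vector>

// Uses join_threads, the RAII guard defined in the for_each section above
template<typename Iterator, typename MatchType>
Iterator parallel_find(Iterator first, Iterator last, MatchType match)
{
	struct find_element // searches one block and handles exceptions
	{
		void operator()(Iterator begin, Iterator end,
			MatchType match,
			std::promise<Iterator>* result,
			std::atomic<bool>* done_flag)
		{
			try
			{
				// stop as soon as any thread has set the flag
				for ( ; (begin != end) && !done_flag->load(); ++begin)
				{
					if (*begin == match) // match found
					{
						result->set_value(begin); // publish the result
						done_flag->store(true);   // tell the other threads to stop
						return;
					}
				}
			}
			catch (...) // from the comparison, or from set_value if the promise is already satisfied
			{
				try
				{
					// store the exception and make every thread stop searching
					result->set_exception(std::current_exception());
					done_flag->store(true);
				}
				catch (...) // set_exception throws if another thread already set the promise
				{
				}
			}
		}
	};
	unsigned long const length = std::distance(first, last);
	if (!length)
		return last;
	unsigned long const min_per_thread = 25;
	unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
	unsigned long const hardware_threads = std::thread::hardware_concurrency();
	unsigned long const num_threads = std::min(hardware_threads != 0 ?
												hardware_threads : 2, max_threads);
	unsigned long const block_size = length / num_threads;
	std::promise<Iterator> result;
	std::atomic<bool> done_flag(false);
	std::vector<std::thread> threads(num_threads - 1);
	{
		join_threads joiner(threads); // all workers are joined when this scope ends
		Iterator block_start = first;
		for (unsigned long i = 0; i < (num_threads - 1); ++i)
		{
			Iterator block_end = block_start;
			std::advance(block_end, block_size);
			threads[i] = std::thread(find_element(), block_start, block_end,
				match, &result, &done_flag);
			block_start = block_end;
		}
		find_element()(block_start, last, match, &result, &done_flag); // main thread searches the last block
	}
	if (!done_flag.load()) // no thread found a match
		return last;
	return result.get_future().get(); // the iterator, or rethrows the stored exception
}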