C++并发编程----异常安全的并行算法(《C++ Concurrency in Action》 读书笔记)
Posted 小丑快学习
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了C++并发编程----异常安全的并行算法(《C++ Concurrency in Action》 读书笔记)相关的知识,希望对你有一定的参考价值。
accumulate
1.利用数组进行线程划分
// Function object: accumulates [first, last) into `result`, using the current
// value of `result` as the starting value.
template<typename Iterator, typename T>
struct accumulate_block
{
    void operator()(Iterator first, Iterator last, T& result)
    {
        result = std::accumulate(first, last, result); // 1
    }
};

// Splits [first, last) into roughly equal blocks, sums each block on its own
// thread (the calling thread handles the last block), then adds all partial
// sums to `init`.
//
// NOTE: this version is intentionally NOT exception safe. An exception escaping
// a worker thread calls std::terminate. If the calling thread throws after the
// workers have started, joinable std::thread objects are destroyed, which also
// calls std::terminate.
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
    unsigned long const length = std::distance(first, last); // 2
    // An empty range contributes nothing: return the initial value directly.
    if (!length)
        return init;

    unsigned long const min_per_thread = 25; // minimum elements per thread
    unsigned long const max_threads =
        (length + min_per_thread - 1) / min_per_thread;
    unsigned long const hardware_threads =
        std::thread::hardware_concurrency();
    // hardware_concurrency() may return 0 when unknown; fall back to 2.
    unsigned long const num_threads =
        std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    unsigned long const block_size = length / num_threads;

    std::vector<T> results(num_threads);               // 3 partial sum per block
    std::vector<std::thread> threads(num_threads - 1); // 4 worker threads

    Iterator block_start = first; // 5
    // Start one worker per block except the last.
    for (unsigned long i = 0; i < (num_threads - 1); ++i)
    {
        Iterator block_end = block_start; // 6
        std::advance(block_end, block_size);
        threads[i] = std::thread( // 7
            accumulate_block<Iterator, T>(),
            block_start, block_end, std::ref(results[i]));
        block_start = block_end; // 8
    }
    // The calling thread processes the final (possibly larger) block.
    // The template arguments are required: they cannot be deduced here.
    accumulate_block<Iterator, T>()(block_start, last, results[num_threads - 1]);

    // 9 Wait for every worker before reading its partial result.
    std::for_each(threads.begin(), threads.end(),
                  std::mem_fn(&std::thread::join));

    // Combine the partial sums.
    return std::accumulate(results.begin(), results.end(), init);
}
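上述代码并不是异常安全的代码,因为没有对线程抛出的异常进行处理:无论是主线程还是工作线程,都没有处理异常。工作线程中未被捕获的异常会直接导致 std::terminate;主线程若在 join 之前抛出异常,仍处于 joinable 状态的 std::thread 对象被析构,同样会调用 std::terminate。因此,一旦发生异常,程序就会终止。借助 std::packaged_task 和 std::future 可以解决这个问题:如果任务抛出异常,异常会被保存到对应的 future 中,并在调用 get() 获取结果时重新抛出。这样,工作线程抛出的异常就能传递到主线程中。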
/*
线程安全的并行求和算法
*/
template<typename Iterator,typename T>
struct accumulate_block
T operator()(Iterator first,Iterator last) // 1
return std::accumulate(first,last,T()); // 2
;
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(hardware_threads != 0 ?
hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<std::future<T> > futures(num_threads - 1); // 3 存放各个线程计算的结果
std::vector<std::thread> threads(num_threads - 1);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i)
Iterator block_end = block_start;
std::advance(block_end, block_size);//移动迭代器block_size个单位
std::packaged_task<T(Iterator, Iterator)> task( // 4 packaged绑定一个线程的运算
accumulate_block<Iterator, T>());
futures[i] = task.get_future(); // 5 将计算结果的future放置于对应的位置
threads[i] = std::thread(std::move(task), block_start, block_end);
// 6
block_start = block_end;
T last_result = accumulate_block()(block_start, last); // 7 主线程计算最后一个数据块
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join));
T result = init; // 8
for (unsigned long i = 0; i < (num_threads - 1); ++i)
result += futures[i].get(); // 9
result += last_result; // 10
return result;
实际上,对线程调用 join 时还应考虑这样一种情况:主线程在对工作线程调用 join 之前就抛出了异常。此时仍处于 joinable 状态的 std::thread 对象会被析构,而 std::thread 的析构函数会调用 std::terminate,导致程序终止。解决这种问题的惯用手段是 RAII:用一个类持有线程容器的引用,并在该类的析构函数中对所有仍可 join 的线程调用 join()。这样就得到了下面的并行求和函数。
// Scope guard: when destroyed, joins every still-joinable thread in the
// referenced vector. The destructor runs on normal scope exit and during stack
// unwinding, so no joinable std::thread is left behind. The referenced vector
// must outlive the guard.
class join_threads
{
    std::vector<std::thread>& threads;
public:
    explicit join_threads(std::vector<std::thread>& threads_) :
        threads(threads_)
    {}

    // Non-copyable: a copy would be a second guard over the same vector.
    join_threads(join_threads const&) = delete;
    join_threads& operator=(join_threads const&) = delete;

    ~join_threads()
    {
        for (std::thread& t : threads)
        {
            // Default-constructed or already-joined threads are skipped.
            if (t.joinable())
                t.join();
        }
    }
};
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
unsigned long const length = std::distance(first, last);
if (!length)
return init;
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(hardware_threads != 0 ?
hardware_threads : 2, max_threads)
unsigned long const block_size = length / num_threads;
std::vector<std::future<T> > futures(num_threads - 1);
std::vector<std::thread> threads(num_threads - 1);
join_threads joiner(threads); // 1 当主线程正常结束或者发生异常时,工作线程都可以完成回收。
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i)
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task<T(Iterator, Iterator)> task(
accumulate_block<Iterator, T>());
futures[i] = task.get_future();
threads[i] = std::thread(std::move(task), block_start, block_end);
block_start = block_end;
T last_result = accumulate_block()(block_start, last);
T result = init;
for (unsigned long i = 0; i < (num_threads - 1); ++i)
result += futures[i].get(); // 2
result += last_result;
return result;
使用 async 保证异常安全
使用 std::async 同样能够保证异常安全:任务中抛出的异常会保存在返回的 future 中,并在调用 get() 时重新抛出。此外,在默认启动策略下,是创建新线程还是延迟执行由标准库实现决定,因此实现有机会避免创建过多线程;但标准并不保证这一点,具体行为依赖于实现。
// Divide-and-conquer sum. Ranges of at most 25 elements are summed directly.
// Larger ranges are split in half: the first half (which carries `init`) is
// handed to std::async, and the second half is summed recursively on this
// thread. An exception from the asynchronous half is rethrown by get().
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
    unsigned long const count = std::distance(first, last); // 1
    unsigned long const chunk_limit = 25;

    // Base case: small ranges are not worth splitting.
    if (!(count > chunk_limit))
        return std::accumulate(first, last, init); // 2

    Iterator split = first;
    std::advance(split, count / 2); // 3

    // 4 The first half runs asynchronously and includes `init`.
    std::future<T> left =
        std::async(parallel_accumulate<Iterator, T>, first, split, init);
    // 5 The second half starts from T() so `init` is counted only once.
    T right = parallel_accumulate(split, last, T());
    return left.get() + right; // 6
}
for_each
线程划分
// Scope guard that joins every still-joinable thread in the referenced vector
// when it is destroyed, on normal scope exit or during stack unwinding. The
// referenced vector must outlive the guard.
class join_threads
{
private:
    std::vector<std::thread>& threads;
public:
    // Fully qualified types: the original unqualified vector<thread> only
    // compiled under `using namespace std`.
    explicit join_threads(std::vector<std::thread>& vec) : threads(vec) {}

    // Non-copyable: a copy would be a second guard over the same vector.
    join_threads(join_threads const&) = delete;
    join_threads& operator=(join_threads const&) = delete;

    ~join_threads()
    {
        for (auto& t : threads)
            if (t.joinable())
                t.join();
    }
};
template<typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f)
unsigned long const length = std::distance(first, last);
if (!length)
return;
unsigned long const min_per_thread = 25;
unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;
unsigned long const hardware_threads = std::thread::hardware_concurrency();
unsigned long const num_threads = std::min(hardware_threads != 0 ?
hardware_threads : 2, max_threads);
unsigned long const block_size = length / num_threads;
std::vector<std::future<void> > futures(num_threads - 1); // 1
std::vector<std::thread> threads(num_threads - 1);
join_threads joiner(threads);//joiner 是一种RAII手段,用于管理线程组,防止放生时来不及回收线程
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i)
Iterator block_end = block_start;
std::advance(block_end, block_size);
std::packaged_task<void(void)> task( [=]() //2
std::for_each(block_start, block_end, f);
);
futures[i] = task.get_future();
threads[i] = std::thread(std::move(task)); // 3
block_start = block_end;
std::for_each(block_start, last, f);
for (unsigned long i = 0; i < (num_threads - 1); ++i)
futures[i].get(); // 4 没有返回值时,该行的作用为检索是否有工作线程发生异常
利用async
// Recursive parallel for_each. Ranges shorter than 2 * 25 elements are
// processed on the calling thread. Longer ranges are split in half: the first
// half is handed to std::async and the second half is processed recursively
// here. first_half.get() rethrows any exception from the asynchronous half.
template<typename Iterator, typename Func>
void parallel_for_each(Iterator first, Iterator last, Func f)
{
    unsigned long const length = std::distance(first, last);
    if (!length)
        return;

    unsigned long const min_per_thread = 25;
    if (length < (2 * min_per_thread))
    {
        std::for_each(first, last, f); // 1 small range: single thread
    }
    else
    {
        // std::advance instead of `first + length / 2` so forward and
        // bidirectional iterators also work, like the other versions.
        Iterator mid_point = first;
        std::advance(mid_point, length / 2);
        std::future<void> first_half = // 2 first half runs asynchronously
            std::async(&parallel_for_each<Iterator, Func>,
                       first, mid_point, f);
        parallel_for_each(mid_point, last, f); // 3 second half on this thread
        first_half.get(); // 4 wait; rethrows the async half's exception
    }
}
find
find 操作需要知道是否已有其他线程找到了目标元素;一旦有线程找到,所有线程都应停止查找。
中断其他线程的一种办法是使用一个原子变量作为标志,每处理完一个元素就检查一次这个标志。如果标志已被设置,说明已有线程找到了匹配元素,算法就可以停止并返回。用这种方式中断线程,剩余未处理的数据就无需再处理,因此在很多情况下,相较于串行方式性能能提升很多。缺点是加载原子变量比普通读取更慢,会拖慢每个线程的运行。
template<typename Iterator, typename MatchType>
Iterator parallel_find(Iterator first, Iterator last, MatchType
match)
struct find_element // 1 在范围内查找,并进行异常处理
void operator()(Iterator begin, Iterator end,
MatchType match,
std::promise<Iterator>* result,
std::atomic<bool>* done_flag)
try
for ( ; (begin != end) && !done_flag->load(); ++begin) // 2
if (*begin == match)//如果查找成功
result->set_value(begin); // 3 查找成功
done_flag->store(true); // 4 设置标志
return;
catch (...) // 5
try
result->set_exception(std::current_exception()); // 6 发生异常则所有线程停止查找
done_flag->store(true);
catch (...) // 7
;
unsigned long const length = std::distance(first, last);
if (!length)
return last;
unsigned long const min以上是关于C++并发编程----异常安全的并行算法(《C++ Concurrency in Action》 读书笔记)的主要内容,如果未能解决你的问题,请参考以下文章
c++多线程编程:实现标准库accumulate函数的并行计算版本
Manning新书C++并行实战,592页pdf,C++ Concurrency in Action