Cartographer中的线程池操作
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Cartographer中的线程池操作相关的知识,希望对你有一定的参考价值。
0. 简介
在设计复杂的运行程序时,我们经常需要创建一定数量的线程,然而很多时候线程不都是一直执行的,会存在一些线程处于空闲状态。所以通过线程池的方式,可以有效的对线程进行分配。若线程池中有空闲线程,则从线程池中取出一个空闲的线程处理该任务,任务处理完后,该线程被放到线程池中;若线程池中无空闲线程,则将任务放入任务队列等待线程池中有线程空闲,这样的处理方式可以避免线程在建立与销毁时存在的开销。
1. 基础知识
- 多线程
- 原子操作
- 智能指针
2. Cartographer中的线程池
关键参数
- 任务队列(task_queue_):每个线程的DoWork()线程空闲时都会通过反复读取该队列来获得任务,各线程通过互斥锁防止同时读取。
- 等待队列(tasks_not_ready_):尚未ready的任务队列,其其依赖的任务(dependent_tasks_)还没准备好,直到dependent_tasks_都完成了,主任务(Task)才能执行将tasks_not_ready转为task_queue_。
- 任务依赖(dependent_tasks_):dependent_tasks_都完成了,主任务(Task)才能执行
核心函数
创建线程池:
ThreadPool::ThreadPool(int num_threads)
absl::MutexLock locker(&mutex_);
for (int i = 0; i != num_threads; ++i)
pool_.emplace_back([this]() ThreadPool::DoWork(); );
thread_pool::DoWork():
初始化的时候每个线程都执行该死循环函数ThreadPool::DoWork(),并直到析构才返回,只要task_queue有任务,就执行操作。
void ThreadPool::DoWork()
#ifdef __linux__
// This changes the per-thread nice level of the current thread on Linux. We
// do this so that the background work done by the thread pool is not taking
// away CPU resources from more important foreground threads.
CHECK_NE(nice(10), -1);
#endif
const auto predicate = [this]() EXCLUSIVE_LOCKS_REQUIRED(mutex_)
return !task_queue_.empty() || !running_;
;
for (;;)
std::shared_ptr<Task> task;
absl::MutexLock locker(&mutex_);
mutex_.Await(absl::Condition(&predicate));
if (!task_queue_.empty())
task = std::move(task_queue_.front());
task_queue_.pop_front();
else if (!running_)
return;
CHECK(task);
CHECK_EQ(task->GetState(), common::Task::DEPENDENCIES_COMPLETED);
Execute(task.get());
在初始化成功并拿到task_queue_后,该Task将会按以下几个状态顺序执行:
enum State NEW, DISPATCHED, DEPENDENCIES_COMPLETED, RUNNING, COMPLETED ;
- NEW(新建任务,还未schedule到线程池)
- DISPATCHED(任务已经schedule到线程池)
- DEPENDENCIES_COMPLETED(任务依赖已经执行完成)
- RUNNING(任务执行中)
- COMPLETED(任务完成)
task->SetWorkItem(task)
新建task实例,状态默认为NEW,然后通过task->SetWorkItem设置任务(示例中运行的函数为DrainWorkQueue)
auto scan_matcher_task = absl::make_unique<common::Task>();
scan_matcher_task->SetWorkItem(
[&submap_scan_matcher, &scan_matcher_options]()
submap_scan_matcher.fast_correlative_scan_matcher =
absl::make_unique<scan_matching::FastCorrelativeScanMatcher2D>(
*submap_scan_matcher.grid, scan_matcher_options);
);
submap_scan_matcher.creation_task_handle =
thread_pool_->Schedule(std::move(scan_matcher_task));
task2->AddDependency(task1)
该函数是可选的,有依赖的任务才需添加,其含义是task2依赖于task1,只有在task1执行完后,task2才能执行。
auto constraint_task = absl::make_unique<common::Task>();
constraint_task->SetWorkItem([=]() LOCKS_EXCLUDED(mutex_)
ComputeConstraint(submap_id, submap, node_id, true, /* match_full_submap */
constant_data, transform::Rigid2d::Identity(),
*scan_matcher, constraint);
);
constraint_task->AddDependency(scan_matcher->creation_task_handle);
auto constraint_task_handle =
thread_pool_->Schedule(std::move(constraint_task));
thread_pool->Schedule(task)
将task赋给tasks_not_ready_并将task状态变为DISPATCHED
,判断其依赖的任务是否加载完成,若完成则将状态置为DEPENDENCIES_COMPLETED
,然后task加入task_queue_并从tasks_not_ready_移除等待线程执行任务;若依赖未完成,则等待依赖的task执行完。在cartographer中存在有不同的数据用于动态加载,可能存在有依赖,此时Schedule起到了层级的作用。
std::weak_ptr<Task> ThreadPool::Schedule(std::unique_ptr<Task> task)
std::shared_ptr<Task> shared_task;
std::lock_guard<std::mutex> lock(mutex_);
auto insert_result =
tasks_not_ready_.insert(std::make_pair(task.get(), std::move(task)));
if (!insert_result.second)
return insert_result.first->second;
// throw std::runtime_error("a task has been scheduled twice");
shared_task = insert_result.first->second;
SetThreadPool(shared_task.get());
return shared_task;
以上是关于Cartographer中的线程池操作的主要内容,如果未能解决你的问题,请参考以下文章
cartographer线程池ThreadPool::DoWork()
京东二面:线程池中的线程抛出了异常,该如何处理?大部分人都会答错!