Rust Async: smol源码分析-Executor篇

Posted Rust语言中文社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rust Async: smol源码分析-Executor篇相关的知识,希望对你有一定的参考价值。

本文来自知乎:https://zhuanlan.zhihu.com/p/137353103


smol是一个精简高效的异步运行时,包含有Executor,Reactor和Timer的实现。本文分析其中的Executor部分,借助于async_task(之前的文章已经详细分析过了)打下的基础,executor的实现非常清晰简洁,整个代码几个小时就能分析完毕。smol实现的executor有三类:

  1. thread-local:用于执行!Send的task,由Task::local创建;

  2. work-stealing: 多线程带工作窃取,执行Task::spawn创建的Task;

  3. blocking executor: 线程池执行带阻塞的task,由Task::blocking创建;

smol::run会执行参数提供的future至结束,同时充当工作线程的角色执行thread-local,work-stealing中spawn出来的task以及推进reactor中的IO事件和定时器。另外也提供了smol::block_on方法,来执行单个future。下面分别分析各个executor的实现细节。

smol整体结构图



Thread Local Executor

该Executor的特点是spawn出来的task和spawn调用所在的线程绑定,整个task从创建到执行到销毁都不会脱离该线程,因此可以用于!Send的Future。

结构定义

为了减少跨线程同步开销,ThreadLocalExecutor采用了并发和非并发两个队列:当其他线程唤醒task时,将task压入并发队列里;当本地线程要spawn新的task或者唤醒task时,压入非并发队列里。结构定义如下:

pub(crate)struct ThreadLocalExecutor{
// 非并发的主任务队列
queue: RefCell<VecDeque<Runnable>>,
// 当其他线程唤醒task时,放入该队列,支持并发调用
injector: Arc<SegQueue<Runnable>>,
// 用于通知executor线程,这样如果其阻塞在epoll上时可以立马被唤醒
event: IoEvent,
}

ThreadLocalExecutor::new

进行字段的初始化

pubfn new()-> ThreadLocalExecutor{
ThreadLocalExecutor{
queue: RefCell::new(VecDeque::new()),
injector: Arc::new(SegQueue::new()),
event: IoEvent::new().expect("cannot create an `IoEvent`"),
}
}

ThreadLocalExecutor::enter

executor嵌套(即在executor内部又创建executor),容易引发丢失通知导致死锁等问题.为了检测这种情况,通常的做法是设置一个线程局部变量,在进入executor前设置该变量,这样就可以检测嵌套的情况。enter函数接收一个闭包,在调用该闭包前将executor设置进thread local中,再执行闭包,调用结束时将thread local变量恢复。

scoped_thread_local!{
staticEXECUTOR: ThreadLocalExecutor
}

pubfn enter<T>(&self,f: implFnOnce()-> T)-> T{
// 已经设置了表示有嵌套
ifEXECUTOR.is_set(){
panic!("cannot run an executor inside another executor");
}
EXECUTOR.set(self,f)
}

ThreadLocalExecutor::spawn

spawn用于创建并调度task,其关键是记住当前spawn线程的id,当task唤醒的时候,拿唤醒的线程id和spawn线程id进行对比,如果相等则压入主队列,不相等则压入并发队列。

pubfn spawn<T: 'static>(future: implFuture<Output=T>+'static)-> Task<T>{
if!EXECUTOR.is_set(){
panic!("cannot spawn a thread-local task if not inside an executor");
}

EXECUTOR.with(|ex|{
// 这里使用弱引用是因为:Injector队列存有task,而task的waker(包含下面
// 的schedule闭包)含有injector的引用,这样可以避免循环引用。
letinjector=Arc::downgrade(&ex.injector);
letevent=ex.event.clone();
letid=thread_id();

letschedule=move|runnable|{
ifthread_id()==id{
// 是spawn时的线程,直接放入主队列
EXECUTOR.with(|ex|ex.queue.borrow_mut().push_back(runnable));
}elseifletSome(injector)=injector.upgrade(){
// 放入并发队列
injector.push(runnable);
}

// 通知executor的线程,
event.notify();
};

// 创建task,放进队列,并返回handle
let(runnable,handle)=async_task::spawn_local(future,schedule,());
runnable.schedule();
Task(Some(handle))
})
}

ThreadLocalExecutor::execute

execute的功能就是批量地从队列中取出task并执行,不过其没有写成死循环,这是为了将该executor和其他的executor以及reactor,timer等集成在同一个线程里跑。为了公平照顾两个队列,避免饥饿,将执行分成4组,每组执行50个task。

pubfn execute(&self)-> bool {
for_in0..4{
for_in0..50{
matchself.search(){
None=>{
returnfalse;
}
Some(r)=>{
// throttle的作用是防止某个task没完没了的执行导致
// 其他task饥饿,其机制下篇再介绍。
throttle::setup(||r.run());
}
}
}

// 从并发队列里把task取出来放进主队列。
self.fetch();
}

// 提醒调用者可能还有其他的任务要跑
true
}

// 查找下一个可执行的task
fn search(&self)-> Option<Runnable>{
// 从主队列里看看有没有
ifletSome(r)=self.queue.borrow_mut().pop_front(){
returnSome(r);
}

// 从并发队列里把task取出来放进主队列。
self.fetch();
// 再检查主队列有没有
self.queue.borrow_mut().pop_front()
}

fn fetch(&self){
letmutqueue=self.queue.borrow_mut();
whileletOk(r)=self.injector.pop(){
queue.push_back(r);
}
}

Blocking Executor

该executor主要特点是可以执行阻塞的task,有一个全局的单例,可以不依赖smol::run的驱动独立运行。实现机制是在背后自适应地开多个线程执行:当处于空闲状态时,没有线程创建和资源消耗;一旦有任务时,就开启和任务相应比例的线程(当然不超过上限500个)。当开启的线程没任务跑时会等待一段时间看有没有任务派发,没有就结束线程。

结构定义

struct State{
// 空闲的线程数
idle_count: usize,
// 总线程数
thread_count: usize,
// 任务队列
queue: VecDeque<Runnable>,
}

pub(crate)struct BlockingExecutor{
state: Mutex<State>,
// 用于唤醒空闲的线程起来工作
cvar: Condvar,
}

BlockingExecutor::get

用于获取全局单例

pubfn get()-> &'staticBlockingExecutor{
staticEXECUTOR: Lazy<BlockingExecutor>=Lazy::new(||BlockingExecutor{
state: Mutex::new(State{
idle_count: 0,
thread_count: 0,
queue: VecDeque::new(),
}),
cvar: Condvar::new(),
});
&EXECUTOR
}

BlockingExecutor::main_loop

该函数是工作线程跑的,不停从队列里取出task进行执行,没有task就等500毫秒退出。

fn main_loop(&'staticself){
letmutstate=self.state.lock().unwrap();
loop{
// 线程创建时是算为空闲状态, 当前线程正在工作,所以空闲数减一。
state.idle_count-=1;

// Run tasks in the queue.
whileletSome(runnable)=state.queue.pop_front(){
// 看看要不要建新的线程跑任务,主要state是move进grow_pool的,
// 因此调用完之后是释放了锁。
// 这样其他线程可以从队列中取任务。
self.grow_pool(state);

// async_task::Task::run在执行中出现panic,Handle那边是知道的,
// 会重新抛异常,因此这里并没有偷偷吞掉异常。
let_=panic::catch_unwind(||runnable.run());

// 重新获取锁继续循环
state=self.state.lock().unwrap();
}

// 空闲中
state.idle_count+=1;

// 先睡500毫秒,有任务再来唤醒我。
lettimeout=Duration::from_millis(500);
let(s,res)=self.cvar.wait_timeout(state,timeout).unwrap();
state=s;

// 已经超时了,又没任务,退出线程。
ifres.timed_out()&&state.queue.is_empty(){
state.idle_count-=1;
state.thread_count-=1;
break;
}
}
}

BlockingExecutor::grow_pool

如果当前的任务超过了5倍的空闲线程数,同时总线程不超过500个,则增加工作线程数。

fn grow_pool(&'staticself,mutstate: MutexGuard<'static,State>){
whilestate.queue.len()>state.idle_count*5&&state.thread_count<500{
state.idle_count+=1;
state.thread_count+=1;
// 任务有点多,把全部空闲线程唤醒起来干活。
self.cvar.notify_all();

thread::spawn(move||{
context::enter(||self.main_loop())
});
}
}

BlockingExecutor::spawn

标准操作,创建task,并开始调度执行。

pubfn spawn<T: Send +'static>(
&'staticself,
future: implFuture<Output=T>+Send+'static,
)-> Task<T>{
let(runnable,handle)=async_task::spawn(future,move|r|self.schedule(r),());
runnable.schedule();
Task(Some(handle))
}

// 把task压入队列,并且唤醒一个睡眠线程起来工作。
fn schedule(&'staticself,runnable: Runnable){
letmutstate=self.state.lock().unwrap();
state.queue.push_back(runnable);
self.cvar.notify_one();
self.grow_pool(state);
}

Work Stealing Executor

这个executor的特点是由一个或多个工作线程构成,每个线程拥有一个工作队列,当队列为空时,可以从其他线程窃取task进行执行。因此task的创建执行和销毁的整个生命周期内可能由多个线程经手,因此要求对应的Future实现Send。工作窃取的方式相比于只有一个全局共享队列的好处是由于每个线程都有一个队列,因此可以避免大量的线程间同步开销,同时也能够实现线程间工作任务的负载均衡。在实现上,这个executor并没有根据cpu的核数创建固定的工作线程,而是每个工作线程必须通过主动调用smol::run来加入工作线程。

数据结构

WorkStealingExecutor结构有一个全局的变量,用于工作线程加入executor,窃取其他worker的task,以及非工作线程spawn的task。Worker有一个slot用于缓存一个task,因为有些task刚poll完又ready,先放进slot里,这样下次执行时先从这个slot里取,可以减少执行的task切换开销,提高cache利用率,也可以避免压入队列时的同步开销. 和tokio/async_std类似。

pub(crate)struct WorkStealingExecutor{
// 用于非工作线程插入task。
injector: deque::Injector<Runnable>,
// 注册了用于窃取其他worker的task的handle
stealers: ShardedLock<Slab<deque::Stealer<Runnable>>>,
// 用于通知工作线程,这样如果其阻塞在epoll上时可以立马被唤醒
event: IoEvent,
}

pub(crate)struct Worker<'a>{
// 注册为工作线程时拿到的ID
key: usize,
slot: Cell<Option<Runnable>>,
// 工作队列,其他worker可以来窃取任务
queue: deque::Worker<Runnable>,
executor: &'aWorkStealingExecutor,
}

Executor全局变量和Worker线程局部变量

全局的executor用于工作线程加入,以及非工作线程spawn新的task。worker线程局部变量和ThreadLocalExecutor作用类似。

pubfn get()-> &'staticWorkStealingExecutor{
staticEXECUTOR: Lazy<WorkStealingExecutor>=Lazy::new(||WorkStealingExecutor{
injector: deque::Injector::new(),
stealers: ShardedLock::new(Slab::new()),
event: IoEvent::new().expect("cannot create an `IoEvent`"),
});
&EXECUTOR
}

scoped_thread_local!{
staticWORKER: for<'a>&'aWorker<'a>
}

implWorker<'_>{
// 进入worker的上下文
pubfn enter<T>(&self,f: implFnOnce()-> T)-> T{
// 已经设置了表示有嵌套
ifWORKER.is_set(){
panic!("cannot run an executor inside another executor");
}
WORKER.set(self,f)
}
}

WorkStealingExecutor::spawn

spawn用于创建并调度task。当task唤醒的时候,如果是工作线程则直接压入工作线程的队列,否则通过全局变量压入。

pubfn spawn<T: Send +'static>(
&'staticself,
future: implFuture<Output=T>+Send+'static,
)-> Task<T>{
letschedule=move|runnable|{
ifWORKER.is_set(){
// 表示当前在worker线程里,那么直接将task压入worker的队列。
WORKER.with(|w|w.push(runnable));
}else{
// 非worker线程只能通过全局的executor把task压入
self.injector.push(runnable);
// 通知工作线程
self.event.notify();
}
};

// 创建task,放进队列,并返回handle
let(runnable,handle)=async_task::spawn(future,schedule,());
runnable.schedule();
Task(Some(handle))
}

WorkStealingExecutor::worker

用于注册工作线程,把stealer的Handle写到全局executor中,这样其他线程可以来窃取task。

pubfn worker(&self)-> Worker<'_>{
letmutstealers=self.stealers.write().unwrap();
letvacant=stealers.vacant_entry();

// Create a worker and put its stealer handle into the executor.
letworker=Worker{
key: vacant.key(),
slot: Cell::new(None),
queue: deque::Worker::new_fifo(),
executor: self,
};
vacant.insert(worker.queue.stealer());

worker
}

WorkStealingExecutor::execute

和上面的类似,批量地从队列中取出task并执行,将执行分成4组,每组执行50个task。

pubfn execute(&self)-> bool {
for_in0..4{
for_in0..50{
matchself.search(){
None=>{
returnfalse;
}
Some(r)=>{
// 通知其他线程可能有任务可以窃取
self.executor.event.notify();

ifthrottle::setup(||r.run()){
// slot优化其实打乱了fifo的公平性,如果task在跑的过程中又ready了,
// 手动把它从slot里刷到队列中。
// 这样yield_now才能正常工作,不会导致task不停循环执行。
self.flush_slot();
}
}
}
}

// 也是为了公平性,偶尔将slot刷到队列里。
self.flush_slot();

// 从injector队列里窃取些任务到本地队列中,避免饥饿。
ifletSome(r)=self.steal_global(){
self.push(r);
}
}

// 通知调用者可能还有其他任务要跑
true
}

smol::run

三个executor的spawn方法分别通过Task::local,Task::blocking和Task::spawn对外暴露。spawn出来的task的执行通过smol::run来驱动(blocking executor自带线程池,不需要驱动)。它依次执行future,ThreadLocalExecutor::execute, WorkStealingExecutor::execute,以及reactor(后面作详细分析)。

pubfn run<T>(future: implFuture<Output=T>)-> T{
// 创建好executor和reactor
letlocal=ThreadLocalExecutor::new();
letws_executor=WorkStealingExecutor::get();
letworker=ws_executor.worker();
letreactor=Reactor::get();

// 使用local executor的IoEvent来创建Waker
letev=local.event().clone();
letwaker=async_task::waker_fn(move||ev.notify());
letcx=&mutContext::from_waker(&waker);
futures::pin_mut!(future);

// 用于设置执行前的上下文环境,比如tokio的runtime。
letenter=context::enter;
letenter=|f|local.enter(||enter(f));
letenter=|f|worker.enter(||enter(f));

enter(||{
letio_events=[local.event(),ws_executor.event()];

letmutyields=0;

// 依次执行task,然后调用reactor,并阻塞等通知:
loop{
ifletPoll::Ready(val)=throttle::setup(||future.as_mut().poll(cx)){
returnval;
}

letmore_local=local.execute();
letmore_worker=worker.execute();
// 执行reactor
ifletSome(reactor_lock)=reactor.try_lock(){
yields=0;
// poll reactor
react(reactor_lock,&io_events,more_local||more_worker);
continue;
}

ifmore_local||more_worker{
yields=0;
continue;
}

// 任务跑完了,释放线程时间片,下次再重试
yields+=1;
ifyields<=2{
thread::yield_now();
continue;
}

// 还没找到任务,开始阻塞线程在reactor上等通知。
yields=0;

letlock=reactor.lock();
letnotified=local.event().notified();
futures::pin_mut!(lock);
futures::pin_mut!(notified);

ifletEither::Left((reactor_lock,_))=block_on(future::select(lock,notified)){
react(reactor_lock,&io_events,false);
}
}
})
}

需要注意的使用事项

async_std的运行时采用的是延迟实例化,按需自动启动的策略,因此用户可以开箱即用,不需要做特殊的配置。而目前smol采用了和tokio类似的策略,整个运行时是需要手动启用,否则会产生运行时panic,考虑到目前tokio的这个策略对新用户困扰颇多,smol估计也会有类似的问题,目前已经有人提了几个issue了。同时smol::run并不会在后台开线程来启动整个工作窃取运行环境,而是当作一个工作线程加入工作窃取运行时中,因此需要类似下面的代码才能启动整个多线程的运行时:

letnum_threads=num_cpus::get().max(1);
for_in0..num_threads{
thread::spawn(||smol::run(future::pending::<()>()));
}

总结

smol整个代码非常精炼简洁,只有一千来行,本文只对其executor作了分析,而它的Reactor的部分也非常的精彩,可以将大量现有的库,如linux-timerfd,linux-inotify,uds等直接异步化,后面有空再做总结。


以上是关于Rust Async: smol源码分析-Executor篇的主要内容,如果未能解决你的问题,请参考以下文章

Rust async 编程

android-async-http框架源码分析

sh rust-temporary-exec.sh

Rust futures: async fn 中的 thread::sleep 和阻塞调用

FileCoin 挖矿教程 基本源码分析1

RUST Ex00 Async-std