解密 Rust 异步编程

Posted 原力注入

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了解密 Rust 异步编程相关的知识,希望对你有一定的参考价值。

Future

Rust 在实现异步编程的时候,采用 Poll 模型。最为核心的就是

enum Poll<T> { Ready(T), Pending,}

我们定义了一个类型为 Poll 处于两种状态之一 Ready Pending

  • Ready: 我们已经完成了任务可以返回结果 T

  • Pending:此时我们等待一些其他依赖,这时候我们需要将自己占用的资源释放出来,基于一个 wake() 函数再次将自己唤醒


最常见的就是 Socket 编程

pub struct SocketRead<'a> { socket: &'a Socket,}
impl SimpleFuture for SocketRead<'_> { type Output = Vec<u8>; //返回值是一个 u8 的 bytes
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { if self.socket.has_data_to_read() { // 如果有数据直接返回 Ready 状态下可读的值 Poll::Ready(self.socket.read_buf()) } else { // 没有数据就直接返回 Pending,等待数据,这里有一个 wake 函数等待被唤醒 self.socket.set_readable_callback(wake); Poll::Pending } }}

对于多个 Pollable 的对象,组合在一起非常的合理,就和 Java 的 ComposeFunture 一样

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>where FutureA: SimpleFuture<Output = ()>, FutureB: SimpleFuture<Output = ()>,{ type Output = (); fn poll(&mut self, wake: fn()) -> Poll<Self::Output> { // 尝试完成 A if let Some(a) = &mut self.a { if let Poll::Ready(()) = a.poll(wake) { self.a.take(); } }
// 尝试完成 B if let Some(b) = &mut self.b { if let Poll::Ready(()) = b.poll(wake) { self.b.take(); } }
if self.a.is_none() && self.b.is_none() { // 都完成了就是 Ready Poll::Ready(()) } else { Poll::Pending } }}

Wakeup

从上面的设计中可以发现,如果我们没有 wake 的话,我们只能一直的去轮训的获得是否已经 Ready,整体的效率会比较低,因此我们可以通过 wake 函数将其唤醒。Rust 中对于 Wake 的定义如下:

pub trait ArcWake: Send + Sync { fn wake(self: Arc<Self>) { Self::wake_by_ref(&self) }
// 对于我们来说,我们需要的就是实现这个 wake_by_ref fn wake_by_ref(arc_self: &Arc<Self>);}

让我们假设一个超时的场景,在运行一段时间之后,我们需要将其唤醒。定义一个 TimerFuture 对象

impl TimerFuture { pub fn new(duration: Duration) -> Self { let shared_state = Arc::new(Mutex::new(SharedState { completed: false, waker: None, }));
// 启动一个新的线程处理我们的状态量 let thread_shared_state = shared_state.clone(); thread::spawn(move || { thread::sleep(duration); let mut shared_state = thread_shared_state.lock().unwrap(); // Thread Sleep 够了就算是超时了,这时候我们需要 wake 我们的工作对象了 shared_state.completed = true; if let Some(waker) = shared_state.waker.take() { waker.wake() } });
TimerFuture { shared_state } }}

Executor

上面只是提供了机制,我们还需要执行的本体,Rust 抽象了 Executor 出来

struct Executor { ready_queue: Receiver<Arc<Task>>,}
#[derive(Clone)]struct Spawner { task_sender: SyncSender<Arc<Task>>,}
impl Spawner { fn spawn(&self, future: impl Future<Output=()> + 'static + Send) { let future = future.boxed(); let task = Arc::new(Task { future: Mutex::new(Some(future)), task_sender: self.task_sender.clone(), }); self.task_sender.send(task).expect("too many tasks queued"); }}
impl Executor { fn run(&self) { while let Ok(task) = self.ready_queue.recv() { // 阻塞结束,完成了一部分的请求数据 let mut future_slot = task.future.lock().unwrap(); if let Some(mut future) = future_slot.take() { // 创建一个 Waker,将这个任务本体放入 let waker = waker_ref(&task); let context = &mut Context::from_waker(&*waker); // 尝试poll一次,如果是 Pending if let Poll::Pending = future.as_mut().poll(context) { *future_slot = Some(future); } } } }}

是不是有点懵,先不慌,我们看看怎么用的。

fn main() { let (executor, spawner) = new_executor_and_spawner();
// 我们向 executor 扔一个任务,其实这里就扔到了 ready_queue 的队列中 spawner.spawn(async { println!("howdy!"); // Wait for our timer future to complete after two seconds. TimerFuture::new(Duration::new(2, 0)).await; println!("done!"); });
// 这里会直接运行 executor.run();}

不过我们在此之前,我们需要定义好如下代码:

struct Task { future: Mutex<Option<BoxFuture<'static, ()>>>, task_sender: SyncSender<Arc<Task>>,}
impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { let cloned = arc_self.clone(); arc_self.task_sender.send(cloned).expect("too many tasks queued"); }}

我们会发现实际上我们最终 wake 起来的还是我们自己,我们只是在控制 wake 的方式。

How it Work

解密 Rust 异步编程

核心的逻辑是,我们接收到这个任务,将其放置于我们的 Task Queue 中

spawner.spawn(async { // 这个 socpe 就是 Task println!("howdy!"); TimerFuture::new(Duration::new(2, 0)).await; println!("done!");});

这个任务此时还没有 wake 对象,我们为其创建 Context 对象(抽象规定的,我们需要将 Wake 置于其中),并且创建一个 wake 对象,而实际上 wake 对象仅仅是是这个 Task 本身。

因此当我们进入 TimerFuture 的 Poll 函数的时候,我们就已经在等待 2 秒之后,Wake 触发如下逻辑

fn wake_by_ref(arc_self: &Arc<Self>) { let cloned = arc_self.clone(); arc_self.task_sender.send(cloned).expect("too many tasks queued");}

此时,我们只不过又将我们的任务又 Re submit 到我们的任务 Task Queue 去了,此时我们再将整个 Thread 的工作给重新唤醒进行处理即可。


线程的变化为


参考源码:

https://gist.github.com/yanickxia/270784bc004cb4c0a7b28b13ac9f2aba


Wake in real world

在真正的编程中,如果都是基于每一次启动一个线程会显得很不高效,因此一般都是基于信号来处理。
因此对于 socket 来说

impl Socket { fn set_readable_callback(&self, waker: Waker) { //这里其实就是 eventloop let local_executor = self.local_executor;
// 此socket 的id let id = self.id;
// 将需要监听的事件放入 eventloop 里面,等待 eventloop 回调 local_executor.event_map.insert(id, waker); local_executor.add_io_event_interest( &self.socket_file_descriptor, Event { id, signals: READABLE }, ); }}

值得注意的,本篇博客意在指出 Funture 的抽象机制是如何运行的,对于真正的系统比如 tokio 在实现 Executor 会比我们现在所设计的要复杂的多。

参考

  • async-book



原力注入



热点文章



以上是关于解密 Rust 异步编程的主要内容,如果未能解决你的问题,请参考以下文章

rust中的异步编程

# Rust异步网络编程

深入浅出Rust异步编程之Tokio

探索 Rust 异步简化编程

张汉东开讲啦,带你攻克 Rust 异步编程

硬核!Rust异步编程方式重大升级:新版Tokio如何提升10倍性能详解