解密 Rust 异步编程
Posted 原力注入
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了解密 Rust 异步编程相关的知识,希望对你有一定的参考价值。
Future
Rust
在实现异步编程的时候,采用 Poll
模型。最为核心的就是
enum Poll<T> {
Ready(T),
Pending,
}
我们定义了一个类型为 Poll
处于两种状态之一 Ready
Pending
Ready: 我们已经完成了任务可以返回结果 T
Pending:此时我们等待一些其他依赖,这时候我们需要将自己占用的资源释放出来,基于一个
wake()
函数再次将自己唤醒
最常见的就是 Socket
编程
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>; //返回值是一个 u8 的 bytes
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// 如果有数据直接返回 Ready 状态下可读的值
Poll::Ready(self.socket.read_buf())
} else {
// 没有数据就直接返回 Pending,等待数据,这里有一个 wake 函数等待被唤醒
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
对于多个 Pollable
的对象,组合在一起非常的合理,就和 Java
的 ComposeFunture
一样
impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
// 尝试完成 A
if let Some(a) = &mut self.a {
if let Poll::Ready(()) = a.poll(wake) {
self.a.take();
}
}
// 尝试完成 B
if let Some(b) = &mut self.b {
if let Poll::Ready(()) = b.poll(wake) {
self.b.take();
}
}
if self.a.is_none() && self.b.is_none() {
// 都完成了就是 Ready
Poll::Ready(())
} else {
Poll::Pending
}
}
}
Wakeup
从上面的设计中可以发现,如果我们没有 wake
的话,我们只能一直的去轮训的获得是否已经 Ready
,整体的效率会比较低,因此我们可以通过 wake
函数将其唤醒。Rust
中对于 Wake
的定义如下:
pub trait ArcWake: Send + Sync {
fn wake(self: Arc<Self>) {
Self::wake_by_ref(&self)
}
// 对于我们来说,我们需要的就是实现这个 wake_by_ref
fn wake_by_ref(arc_self: &Arc<Self>);
}
让我们假设一个超时的场景,在运行一段时间之后,我们需要将其唤醒。定义一个 TimerFuture
对象
impl TimerFuture {
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// 启动一个新的线程处理我们的状态量
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration);
let mut shared_state = thread_shared_state.lock().unwrap();
// Thread Sleep 够了就算是超时了,这时候我们需要 wake 我们的工作对象了
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
Executor
上面只是提供了机制,我们还需要执行的本体,Rust
抽象了 Executor
出来
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
impl Spawner {
fn spawn(&self, future: impl Future<Output=()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("too many tasks queued");
}
}
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
// 阻塞结束,完成了一部分的请求数据
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
// 创建一个 Waker,将这个任务本体放入
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
// 尝试poll一次,如果是 Pending
if let Poll::Pending = future.as_mut().poll(context) {
*future_slot = Some(future);
}
}
}
}
}
是不是有点懵,先不慌,我们看看怎么用的。
fn main() {
let (executor, spawner) = new_executor_and_spawner();
// 我们向 executor 扔一个任务,其实这里就扔到了 ready_queue 的队列中
spawner.spawn(async {
println!("howdy!");
// Wait for our timer future to complete after two seconds.
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
// 这里会直接运行
executor.run();
}
不过我们在此之前,我们需要定义好如下代码:
struct Task {
future: Mutex<Option<BoxFuture<'static, ()>>>,
task_sender: SyncSender<Arc<Task>>,
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
let cloned = arc_self.clone();
many tasks queued");
}
}
我们会发现实际上我们最终 wake
起来的还是我们自己,我们只是在控制 wake
的方式。
How it Work
核心的逻辑是,我们接收到这个任务,将其放置于我们的 Task Queue
中
spawner.spawn(async { // 这个 socpe 就是 Task
println!("howdy!");
TimerFuture::new(Duration::new(2, 0)).await;
println!("done!");
});
这个任务此时还没有 wake
对象,我们为其创建 Context
对象(抽象规定的,我们需要将 Wake 置于其中),并且创建一个 wake
对象,而实际上 wake
对象仅仅是是这个 Task
本身。
因此当我们进入 TimerFuture
的 Poll
函数的时候,我们就已经在等待 2
秒之后,Wake
触发如下逻辑
fn wake_by_ref(arc_self: &Arc<Self>) {
let cloned = arc_self.clone();
arc_self.task_sender.send(cloned).expect("too many tasks queued");
}
此时,我们只不过又将我们的任务又 Re submit
到我们的任务 Task Queue
去了,此时我们再将整个 Thread
的工作给重新唤醒进行处理即可。
线程的变化为
参考源码:
https://gist.github.com/yanickxia/270784bc004cb4c0a7b28b13ac9f2aba
Wake in real world
在真正的编程中,如果都是基于每一次启动一个线程会显得很不高效,因此一般都是基于信号来处理。
因此对于 socket
来说
impl Socket {
fn set_readable_callback(&self, waker: Waker) {
//这里其实就是 eventloop
let local_executor = self.local_executor;
// 此socket 的id
let id = self.id;
// 将需要监听的事件放入 eventloop 里面,等待 eventloop 回调
local_executor.event_map.insert(id, waker);
local_executor.add_io_event_interest(
&self.socket_file_descriptor,
Event { id, signals: READABLE },
);
}
}
值得注意的,本篇博客意在指出 Funture
的抽象机制是如何运行的,对于真正的系统比如 tokio
在实现 Executor
会比我们现在所设计的要复杂的多。
参考
async-book
原力注入
热点文章
以上是关于解密 Rust 异步编程的主要内容,如果未能解决你的问题,请参考以下文章