找回密码
 立即注册
首页 业界区 安全 Rust异步运行时最小实现 - extreme 分享

Rust异步运行时最小实现 - extreme 分享

史华乐 13 小时前
Rust语言通过定义了Future Trait , 奠定了异步语法的基石,而Rust的异步代码时惰性的,必须有一个运行时来驱动,Rust本身还没提供这样的实现,社区中有不少开源方案,比如tokio等。
Tokio的运行时是一个事件循环,利用了不同平台的异步非阻塞特性,比如kqueue,epoll等。
我一直想要弄清楚runtime是怎么调度Future,而Future完成时又是怎么通知runtime,extreme 实现了一个最小运行时,可以让一窥究竟。
  1. use std::sync::{Arc, Condvar, Mutex};
  2. use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
  3. #[derive(Default)]
  4. struct Park(Mutex<bool>, Condvar);
  5. fn unpark(park: &Park) {
  6.     *park.0.lock().unwrap() = true;
  7.     park.1.notify_one();
  8. }
  9. static VTABLE: RawWakerVTable = RawWakerVTable::new(
  10.     |clone_me| unsafe {
  11.         let arc = Arc::from_raw(clone_me as *const Park);
  12.         std::mem::forget(arc.clone());
  13.         RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
  14.     },
  15.     |wake_me| unsafe { unpark(&Arc::from_raw(wake_me as *const Park)) },
  16.     |wake_by_ref_me| unsafe { unpark(&*(wake_by_ref_me as *const Park)) },
  17.     |drop_me| unsafe { drop(Arc::from_raw(drop_me as *const Park)) },
  18. );
  19. /// Run a `Future`.
  20. pub fn run<F: std::future::Future>(mut f: F) -> F::Output {
  21.     let mut f = unsafe { std::pin::Pin::new_unchecked(&mut f) };
  22.     let park = Arc::new(Park::default());
  23.     let sender = Arc::into_raw(park.clone());
  24.     let raw_waker = RawWaker::new(sender as *const _, &VTABLE);
  25.     let waker = unsafe { Waker::from_raw(raw_waker) };
  26.     let mut cx = Context::from_waker(&waker);
  27.     loop {
  28.         match f.as_mut().poll(&mut cx) {
  29.             Poll::Pending => {
  30.                 let mut runnable = park.0.lock().unwrap();
  31.                 while !*runnable {
  32.                     runnable = park.1.wait(runnable).unwrap();
  33.                 }
  34.                 *runnable = false;
  35.             }
  36.             Poll::Ready(val) => return val,
  37.         }
  38.     }
  39. }
复制代码
这个简短的例子表达了实现一个运行时的最低需求

  • 实现RawWakerVTable
  • 如何通过Waker唤醒runtime继续调度,这里用了信号量
本质上运行时可以抽象成一个不断运行的循环体,在循环体内不断调用Future的poll方法。
(当Future返回Poll:ending时,此处简化为使用信号量的等待操作)
这个例子也说明了Future的调用能返回时,需要调用存储在ctx里面的Waker::waker()方法,唤醒运行时继续执行阻塞的异步任务

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册