Rust语言通过定义了Future Trait , 奠定了异步语法的基石,而Rust的异步代码时惰性的,必须有一个运行时来驱动,Rust本身还没提供这样的实现,社区中有不少开源方案,比如tokio等。
Tokio的运行时是一个事件循环,利用了不同平台的异步非阻塞特性,比如kqueue,epoll等。
我一直想要弄清楚runtime是怎么调度Future,而Future完成时又是怎么通知runtime,extreme 实现了一个最小运行时,可以让一窥究竟。- use std::sync::{Arc, Condvar, Mutex};
- use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
- #[derive(Default)]
- struct Park(Mutex<bool>, Condvar);
- fn unpark(park: &Park) {
- *park.0.lock().unwrap() = true;
- park.1.notify_one();
- }
- static VTABLE: RawWakerVTable = RawWakerVTable::new(
- |clone_me| unsafe {
- let arc = Arc::from_raw(clone_me as *const Park);
- std::mem::forget(arc.clone());
- RawWaker::new(Arc::into_raw(arc) as *const (), &VTABLE)
- },
- |wake_me| unsafe { unpark(&Arc::from_raw(wake_me as *const Park)) },
- |wake_by_ref_me| unsafe { unpark(&*(wake_by_ref_me as *const Park)) },
- |drop_me| unsafe { drop(Arc::from_raw(drop_me as *const Park)) },
- );
- /// Run a `Future`.
- pub fn run<F: std::future::Future>(mut f: F) -> F::Output {
- let mut f = unsafe { std::pin::Pin::new_unchecked(&mut f) };
- let park = Arc::new(Park::default());
- let sender = Arc::into_raw(park.clone());
- let raw_waker = RawWaker::new(sender as *const _, &VTABLE);
- let waker = unsafe { Waker::from_raw(raw_waker) };
- let mut cx = Context::from_waker(&waker);
- loop {
- match f.as_mut().poll(&mut cx) {
- Poll::Pending => {
- let mut runnable = park.0.lock().unwrap();
- while !*runnable {
- runnable = park.1.wait(runnable).unwrap();
- }
- *runnable = false;
- }
- Poll::Ready(val) => return val,
- }
- }
- }
复制代码 这个简短的例子表达了实现一个运行时的最低需求
- 实现RawWakerVTable
- 如何通过Waker唤醒runtime继续调度,这里用了信号量
本质上运行时可以抽象成一个不断运行的循环体,在循环体内不断调用Future的poll方法。
(当Future返回Poll: ending时,此处简化为使用信号量的等待操作)
这个例子也说明了Future的调用能返回时,需要调用存储在ctx里面的Waker::waker()方法,唤醒运行时继续执行阻塞的异步任务
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |