到现在,我们完成了异步Rust和Tokio相当全面的介绍. 现在我们将更深入的研究Rust异步运行时模型. 在本教程最开始,我们暗示过Rust的异步采用了一种独特的方式. 现在我们将解释其含义.
作为回顾,让我们写一个非常基础的异步函数. 与到目前为止的教程相比,这并不是什么新东西.
use tokio::net::TcpStream;
async fn my_async_fn() {
println!("hello from async");
let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap();
println!("async TCP operation complete");
}
我们调用这个函数并得到一些返回值. 我们调用.await
得到这个值.
#[tokio::main]
async fn main() {
let what_is_this = my_async_fn();
// 上面调用后,这里并没有任何打印内容
what_is_this.await;
// 文本被打印了,且socket链接建立和关闭
}
my_async_fn()
返回的是一个future值. 此Future它是一个实现了标准库中 std::future::Future trait 的值. 它们是包含正在进行异步计算的值.
std::future::Future trait定义如下:
use std::pin::Pin;
use std::task::{Context, Poll};
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx:&mut Context) -> Poll<Self::Output>;
}
associated type Output
是一个future完成后产生的类型.
Pin 类型是Rust在 async
函数中如何支持借用. 查看 standard library 了解更多的细节.
与其它语言实现的future不一样, 一个Rust的future不代表在后台发生的计算,而是Rust的future就是计算本身. future的所有者通过轮询future来推进计算. 这是通过调用 Future::poll
来完成的.
让我们来实现一个非常简单的future. 它有以下几个特点:
- 等待到一个特定的时间点.
- 输出一些文本到STDOUT.
- 产生一个String.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// 现在忽略这一行
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
}
在main函数中,我们实例化一个future并在它上面调用 .await
. 在异步函数中,我们可以在任何实现了 Future
的值上调用 .await
.
反过来说, 调用一个 async
函数会返回一个实现了 Future
的匿名类型. 在 async fn main()
中,生成的future大致为:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
enum MainFuture {
// 初始化时,从未轮询过
State0,
// 等待 `延迟` , 比如. `future.await` 这一行.
State1(Delay),
// future已经完成.
Terminated,
}
impl Future for MainFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<()>
{
use MainFuture::*;
loop {
match *self {
State0 => {
let when = Instant::now() +
Duration::from_millis(10);
let future = Delay { when };
*self = State1(future);
}
State1(ref mut my_future) => {
match Pin::new(my_future).poll(cx) {
Poll::Ready(out) => {
assert_eq!(out, "done");
*self = Terminated;
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
}
}
}
Terminated => {
panic!("future polled after completion")
}
}
}
}
}
Rust的Future是一种状态机. 这里 MainFuture
代表future可能的状态枚举. future开始于State0
状态. 当调用poll
时, future会尝试尽可能的推进其内部的状态.如果future能够完成,则返回包含异步计算输出的Poll::Ready
.
如果future不能够完成, 通常是由于资源不够而等待,这个时候返回Poll::Pending
. 接收到Poll::Pending
会向调用者表明future会在将来某个时刻完成,并且调用者应该稍候再次调用poll
函数.
我们还看到future由其它future组合. 在外部future上调用poll
会导致在内部future上调用poll
函数.
异步Rust函数返回future. 必须在Future上调用poll
来推进其状态. Future可以被其它Future组合. 因此,问题来了,调用最外部的future的poll
意味着什么?
回想一下,要运行异步函数,必须将它们传递给tokio::spawn
或者使用#[tokio::main]
注解main函数. 这样的结果是生成的外部future提交给Tokio的执行器.执行器负责在外部Future上调用Future::poll
,来驱动异步计算的完成.
为了更好的理解这一切是如何融合的,让我们实现自己的迷你版本的Tokio!完整的代码在 这里 .
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use futures::task;
fn main() {
let mut mini_tokio = MiniTokio::new();
mini_tokio.spawn(async {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
});
mini_tokio.run();
}
struct MiniTokio {
tasks: VecDeque<Task>,
}
type Task = Pin<Box<dyn Future<Output = ()> + Send>>;
impl MiniTokio {
fn new() -> MiniTokio {
MiniTokio {
tasks: VecDeque::new(),
}
}
/// 在 mini-tokio 实例之上产生一个future
fn spawn<F>(&mut self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
self.tasks.push_back(Box::pin(future));
}
fn run(&mut self) {
let waker = task::noop_waker();
let mut cx = Context::from_waker(&waker);
while let Some(mut task) = self.tasks.pop_front() {
if task.as_mut().poll(&mut cx).is_pending() {
self.tasks.push_back(task);
}
}
}
}
这将运行异步块. 使用请求延迟来创建一个Delay
实例并等待它. 然而,我们的实现到目前为止有一个重大的缺陷. 我们的执行器绝不会休眠. 执行器不断
循环所有产生的future并对其进行轮询. 大多时候,future不准备执行更多的工作,并会返回Poll::pending
. 这一过程会消耗CPU并且通常没有效率.
理想的情况下,我们仅仅想让mini-tokio在future在有进展的时候去轮询future. 当阻塞任务的资源准备好执行请求操作的时候,就会发生这种情况. 如果
任务想从TCP socket中读取数据,那么我们只想在TCP socket接收到数据时轮询任务. 在我们的方案中,任务在到达指定的Instant
时被阻塞. 理想情况下
mini-tokio只会在该时间过去后再轮询任务.
为了达到这一目的,在对一个资源进行轮询且资源未准备好时,资源转换为就绪状态后将发送一个通知.
通过该系统(译者注: 指唤醒),资源在通知等待它的任务时表明已经准备就绪,可以继续进行一些其它的操作了.
让我们再次看看Future::poll
的定义:
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
poll
函数中的Context
参数有一个waker()
方法. 此方法返回一个绑定到当前任务的Waker .
Waker
中又有一个wake()
方法. 调用这个方法会向执行器发出信息,说明应该安排相关任务的执行计划. 当资源的状态转换到就绪状态时,它们会调用wake()
方法,来通知执行者轮询任务来推进资源的状态.
我们能更新Delay
来使用唤醒(wakers):
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::thread;
struct Delay {
when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// 在当前任务上获取一个waker句柄
let waker = cx.waker().clone();
let when = self.when;
// 生产一个定时器线程
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
waker.wake();
});
Poll::Pending
}
}
}
现在,一旦请求的持续时间过去后,调用任务就会得到通知,执行器可以确保再次安排任务. 下一步就是更新mini-tokio来监听唤醒通知.
我们的Delay
实现仍然有一些其它的问题. 我们将在后面修复它.
当一个future返回 Poll::Pending时,它必须确保waker能在某个时刻发出信息. 忘记此操作的结果就是任务会无限的挂起.
返回 Poll::Pending后忘记唤醒任务是常见bug的来源.
回忆一下Delay
的第一个迭代版本. 下面是future的实现:
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
}else {
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
在返回Poll::Pending
之前,我们调用了cx.waker().wake_by_ref()
. 这是为了满足future的组合操作. 通过返回Poll::Pending
,我们负责
发信号给waker. 因为我们没有实现计时器(timer)线程,所以我们向唤醒程序(waker)发送了内联信息. 这样做的结果是为了future能重新被调度,再次执行,
并且可能还没有完全准备好.
请注意,你可以向waker发送更多的不是必须的信号. 在这种特殊的情况下,即使我们没有准备好继续操作,我们也会向waker发出信号. 除了浪费一个CPU时钟周期外,并没有什么问题. 但是,这种特殊的实现将导致非常繁忙的循环.
接下来就是更新Mini Tokio 来接收waker的通知. 我们想让执行器在仅当任务被唤醒时才运行它们,为了做到这一点,Mini Tokio将提供自己的waker. 当waker被调用时,与它相关的任务会被排队执行. Mini Tokio在轮询future时将waker传递给future.
更新后的Mini Tokio将使用channel来存储计划任务. 通道(Channel)允许任务以队列的方式从任意线程执行. Wakers必须是 Send
和Sync
类型的,
因此我们使用 crossbeam 包中的channel,因为标准库中的channel不是Sync
的.
Send
与 Sync
是Rust提供的与并发相关的一种标记trait. Send 类型可以在不同的线程中传递. 大多数类型都是 Send ,但是像 Rc 却不是. 可以通过不可变引用并发访问的类型是 Sync. 一个类型是 Send 但不是 Sync ---- 一个很好的例子是 Cell,可以通过不可变引用对其修改,因此它不能并发的共享访问.
更多关于 Send
与 Sync
的细节可以参考chapter in the Rust book.
在 Cargo.toml
中添加如下依赖:
crossbeam = "0.7"
然后,更新MiniTokio
结构体:
use crossbeam::channel;
use std::sync::Arc;
struct MiniTokio {
scheduled: channel::Receiver<Arc<Task>>,
sender: channel::Sender<Arc<Task>>,
}
struct Task {
// 这一块后面再填写
}
Wakers是Sync
类型的并且它可以被克隆(Clone). 当调用wake
时,必须安排任务来执行. 为了实现这个,我们使用channel. 当在waker上调用wake()
时,
任务被推送到channel的发送方. 我们的Task
结构体将实现wake的逻辑. 为了做到这一点,它需要包含生成的Future和channel发送方.
use std::sync::{Arc, Mutex};
struct Task {
// Mutex能使用任务实现'同步(sync)'效果,在任意时刻仅能有一个线程能够访问future.
// Mutex (在此场景下)不需要非常正确,真实的tokio没有在这里使用Mutex,但是真实的tokio
// 使用了更多行的代码来实现这一点.
future: Mutex<Pin<Box<dyn Future<Output = ()> + Send>>>,
executor: channel::Sender<Arc<Task>>,
}
impl Task {
fn schedule(self: &Arc<Self>) {
self.executor.send(self.clone());
}
}
为了安排任务,Arc
将会被clone,并将它通过channel发送. 现在,我们需要将schedule
函数与 std::task::Waker 挂钩.
标准库提供了一套低级别的API manual vtable construction 来做这个. 这种策略为实现者提供了最大的灵活性,
但是需要大量的unsafe(不安全)的样板代码. 取而代之的是,我们可以直接使用RawWakerVTable,
我们使用futures包提供的ArcWake 工具.
这可以使我们能实现一个简单的trait,来将我们的Task
结构暴露为一个waker.
在Cargo.toml
中添加如下依赖来拉取futures
.
futures = "0.3"
然后实现futures::task::ArcWake .
use futures::task::ArcWake;
use std::sync::Arc;
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.schedule();
}
}
当上面的计时器(timer)线程调用waker.wake()
时,任务被发送到channel通道中去. 下一步我们在MiniTokio::run()
函数中实现接收和执行任务的功能.
impl MiniTokio {
fn run(&self) {
while let Ok(task) = self.scheduled.recv() {
task.poll();
}
}
/// 初始化一个新的 mini-tokio 实例.
fn new() -> MiniTokio {
let (sender, scheduled) = channel::unbounded();
MiniTokio { scheduled, sender }
}
/// 在 mini-tokio 实例上产生一个 future
///
/// 给future包装task并推其推送到 `scheduled` 队列中.
fn spawn<F>(&self, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
Task::spawn(future, &self.sender);
}
}
impl Task {
fn poll(self: Arc<Self>) {
// 从task实例上创建一个waker. 它使用 ArcWake
let waker = task::waker(self.clone());
let mut cx = Context::from_waker(&waker);
// 没有其它线程试图锁住future
let mut future = self.future.try_lock().unwrap();
// 轮询future
let _ = future.as_mut().poll(&mut cx);
}
// 使用指定的future产生一个新的task.
//
// 初始化一个新的包含了指定future的task,并将其它推送给 sender. channel另外一半的receiver将接收到它并执行.
fn spawn<F>(future: F, sender: &channel::Sender<Arc<Task>>)
where
F: Future<Output = ()> + Send + 'static,
{
let task = Arc::new(Task {
future: Mutex::new(Box::pin(future)),
executor: sender.clone(),
});
let _ = sender.send(task);
}
}
这里发生了多件事. 首先,实现了MiniTokio::run()
. 这个函数循环运行,从通道中接收计划任务. 当任务被唤醒时,任务被推送到channel中,这些
任务在执行时能够取得进展(译者注: 指poll后任务本身的状态能得到推进).
另外,MiniTokio::new()
与MiniTokio::spawn()
函数也使用channel来调整了一下,而不是使用VecDeque
. 当一个新的任务产生时,为它们分配
channel 发送部分的副本,任务可以在运行时使用该副本来调度本身.
Task::poll()
函数使用来自futures
包中的ArcWake
工具创建一个waker. 此waker用来创建一个task::Context
. task::Context
传递给poll
.
我们现在已经看到了异步Rust的端到端原理示例. Rust的async/await
特性背后由trait支持. 这就允许使用第三方包,像tokio来提供执行细节.
- Rust的异步操作是惰性的,需要调用者对其进行轮询.
- Wakers被传递给future,以将future与调用它的任务联系起来.
- 当一个资源没有准备好完成时,
Poll::Pending
被返回并记录任务的唤醒程序(waker). - 当一个资源变为就绪状态时,就会通知任务的唤醒程序(waker).
- 执行器接收到通知并安排任务来执行.
- 任务再一次被轮询,这一次资源是就绪状态并且任务能够取得进展.
回顾一下,当我们实现Delay
时,我们说过还要更多的问题要修复. Rust的异步模型允许单个future在执行时跨任务移动. 考虑一下如下代码:
use futures::future::poll_fn;
use std::future::Future;
use std::pin::Pin;
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let mut delay = Some(Delay { when });
poll_fn(move |cx| {
let mut delay = delay.take().unwrap();
let res = Pin::new(&mut delay).poll(cx);
assert!(res.is_pending());
tokio::spawn(async move {
delay.await;
});
Poll::Ready(())
}).await;
}
poll_fn
函数使用闭包来创建一个Future
实例. 上面的代码片段创建了一个Delay
实例,并将其轮询一次,然后将Delay
实例发送给一个新的任务,
再等待它. 在这个示例中,使用不同的Waker
实例多次调用Delay::poll
. 我们早期的实现中无法处理这种情况,并且由于通知了错误的任务,因此产生的
任务会永远处于休眠状态.
为了修复我们早期的实现,我们可以像下面这样做:
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread;
use std::time::{Duration, Instant};
struct Delay {
when: Instant,
waker: Option<Arc<Mutex<Waker>>>,
}
impl Future for Delay {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// 首时,如果第一次future被调用,产生一个计时器线程
// 如果计时器线程已经在运行了,要确保存储的 waker能匹配上当前任务的 waker
if let Some(waker) = &self.waker {
let mut waker = waker.lock().unwrap();
// 检查存储的waker能否匹配上当前任务的waker
// 这是必须的,因为 Delay future实例可能在调用poll时移动在不同的task中去
// 如果发生了这种情况, 给定的“Context”中包含的waker将有所不同,我们必须要更新存储的waker以反映此更改.
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}
} else {
let when = self.when;
let waker = Arc::new(Mutex::new(cx.waker().clone()));
self.waker = Some(waker.clone());
// 这是第一次 poll 被调用时产生一个计时器线程
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
// 持续时间过去后,通过激活waker来通知调用者.
let waker = waker.lock().unwrap();
waker.wake_by_ref();
});
}
// 一旦waker被存储且计时器已经开始,就是检查delay是否完成的时候了.
// 通过检查当前时刻来完成.
//
// 如果持续时间过了后, future已经完成 Poll::Ready就会返回
if Instant::now() >= self.when {
Poll::Ready(())
} else {
// 持续时间没有过去,future没有完成就返回 PollPending
//
// Future trait 要求当返回 Pending 时,future将确保一旦再次对future进行轮询,就会发出指定唤醒信息.
//
// 在我们的例子中,通过这里返回的 Pending 我们可以保证一旦请求的持续时间过去后,我们将调用包含在 Context 参数中的指定waker
// 我们通过产生一个计时器线程来确保这一点.
//
// 如果我们忘记激活waker,任务将会无限的持起.
Poll::Pending
}
}
}
它涉及到一点,但是这个想法是,在每次轮询时,future都会检查所提供的waker是否与先前记录的waker相匹配. 如果两个waker匹配,则什么也不发生. 如果它们不匹配,则原来记录的waker必须被更新.
我们演示了如何使用waker手动实现Delay
future. Wakers是异步Rust能工作的基础. 通常,不需要降低到该级别. 比如说,在Delay
的案例中,
我们可以使用tokio::sync::Notify 工具完全使用async/await
来实现它.
这个实用工具提供了基础的任务通知机制. 它处理了waker的一些细节,包括确保记录的waker与当前任务的waker匹配.
使用Notify,我们可以像下面这样,使用 async/await
实现Delay
功能:
use tokio::sync::Notify;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::thread;
async fn delay(dur: Duration) {
let when = Instant::now() + dur;
let notify = Arc::new(Notify::new());
let notify2 = notify.clone();
thread::spawn(move || {
let now = Instant::now();
if now < when {
thread::sleep(when - now);
}
notify2.notify_one();
});
notify.notified().await;
}
← Framing
→ Select