iovxw

Rust 的 async/await

仍需努力

今天, Rust 的 Generator 支持终于被合并了!

这代表那拖了很久又很久的 Coroutine 也终于可以在 Nightly(2017-08-29) 里使用了

可喜可贺, 可喜可贺

那以后就可以愉快的在 Rust 里写异步代码啦?

想得美

坑在哪里我下面慢慢说


Rust 有一个库叫 futures 用来写异步程序, 这个库实现了 Future/Promise 模型里的各种组合子(其实就是把回调换个写法)

但是它写起来很别扭, 需要一大串 .and_then 之类的, 并且导致流程控制变得很复杂

所以人们迫切的需要一个更加同步的方式来写异步代码

在人类目前科技水平下, 这个方式叫 Coroutine


Rust 的 Coroutine 实现, 具体来说叫做 Stackless Coroutine(或者叫 Generator), 如其名, 就是不用额外栈的 Coroutine, 可以直接翻译为状态机

与之相对的还有 Stackful Coroutine, 需要在运行时为每个 Coroutine 弄个栈, 而这个栈一般是开在系统堆上面的, 慢

Rust 为了 Zero-cost Abstraction 理所当然的选择了 Stackless Coroutine (那些用 Stackful Coroutine 的语言, 为追求极致性能只能搞 Coroutine 复用, 感觉微妙)

(肯定有人要问了, 那干嘛其他语言不都用 Stackless 的呢? 傻孩子, 那是因为人家可以在任意嵌套调用中暂停, 你见过 Go 需要 await 吗? (当然这肯定不系统级 (笑)) 以及某些有 call/cc 的语言根本不在乎这点开销)


Coroutine/Generator 在 Rust 中写起来像 Closure, 但不接受任何参数

内部提供 yield 语句, 它可以 "暂停" 当前 Generator, 并允许之后继续执行

#![feature(generators, generator_trait)]

use std::ops::{Generator, GeneratorState};

fn main() {
    let mut generator = || {
        yield 1;
        return "foo"
    };

    match generator.resume() {
        GeneratorState::Yielded(1) => {}
        _ => panic!("unexpected value from resume"),
    }
    match generator.resume() {
        GeneratorState::Complete("foo") => {}
        _ => panic!("unexpected value from resume"),
    }
}

Rust 中所有的 Generator 都会被翻译为状态机来达到 Zero-cost Abstraction

#![feature(generators, generator_trait)]

use std::ops::Generator;

fn main() {
    let ret = "foo";
    let mut generator = move || {
        yield 1;
        return ret
    };

    generator.resume();
    generator.resume();
}

会被翻译成类似下面这样:

#![feature(generators, generator_trait)]

use std::ops::{Generator, GeneratorState};

fn main() {
    let ret = "foo";
    let mut generator = {
        enum __Generator {
            Start(&'static str),
            Yield1(&'static str),
            Done,
        }

        impl Generator for __Generator {
            type Yield = i32;
            type Return = &'static str;

            fn resume(&mut self) -> GeneratorState<i32, &'static str> {
                use std::mem;
                match mem::replace(self, __Generator::Done) {
                    __Generator::Start(s) => {
                        *self = __Generator::Yield1(s);
                        GeneratorState::Yielded(1)
                    }

                    __Generator::Yield1(s) => {
                        *self = __Generator::Done;
                        GeneratorState::Complete(s)
                    }

                    __Generator::Done => {
                        panic!("generator resumed after completion")
                    }
                }
            }
        }

        __Generator::Start(ret)
    };

    generator.resume();
    generator.resume();
}

总之就是一个很好用的特性, 而且还没任何额外开销, 不过只是这样用处并不大


futures 提供的 Future 代表一个将会在未来某个时间可用的值

这个值可能因为网络, 或其他什么原因 Block 住了, 现在无法得到

用户可以用它提供的各种 Combinator 写完后续的逻辑, 当值可用的时候会自动调用 (像不像回调?)

但也只比回调好了一点点而已, 逻辑一旦出现分支, 痛不欲生 (想知道有多痛, 请看这里)

而我们现在有了 Generator, 终于可以 "近乎" 完美的解决它了

每个 Future 都有一个 .poll 方法 (实现 Future 时唯一需要手动实现的方法)

调度器 (比如 tokio 提供的) 会在收到关联事件时调用它

它返回处理好的值或者 NotReady 表示值还未准备好 (此时调度器会在下一次事件后重试)

一个 Future.poll 大概长这样:

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    if !self.step1_done {
        self.do_step1();
        if !self.step2_ready() {
            return Ok(Async::NotReady)
        }
    }
    if !self.step2_done {
        self.do_step2();
        if !self.step3_ready() {
            return Ok(Async::NotReady)
        }
    }
    if !self.step3_done {
        let result = self.do_step3();
        Ok(Async::Ready(result))
    } else {
        panic!("future already completed");
    }
}

充满了状态判断, 还容易不小心写错

有了 Generator 之后是这样的:

let generator = || {
    self.do_step1();
    if !self.step2_ready() {
        yield;
    }
    self.do_step2();
    if !self.step3_ready() {
        yield;
    }
    let result = self.do_step3();
    Ok(result)
};
GenFuture(generator)

yield 用于返回 NotReady (见下面), 免去了维护状态

GenFuture 用于将 Generator 转换为 Future, 实现非常简单:

struct GenFuture<T>(T);

impl<T> Future for GenFuture<T>
    where T: Generator<Yield = ()>,
          T::Return: Try,
{
    type Item = <T::Return as Try>::Ok;
    type Error = <T::Return as Try>::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.0.resume() {
            GeneratorState::Yielded(()) => Ok(Async::NotReady),
            GeneratorState::Complete(e) => e.into_result().map(Async::Ready),
        }
    }
}

同理, 在 Future 中使用 Future 也可以这么干

fn work() -> impl Future<Item = (), Error = Error> {
    get("http://exampl.com")
        .and_then(|response| {
            do_something(response)
        })
        .and_then(|result| {
            if result > 7 {
                Either::A(Ok(DEFAULT_VALUE))
            } else {
                Either::B(blahblah())
            }
        })
        .and_then(|result| {
            post(result)
        })
}
fn work() -> impl Future<Item = (), Error = Error> {
    let generator = || {
        let response = await!(get("http://example.com"))?;
        let result = if await!(do_something(response))? > 7 {
            DEFAULT_VALUE
        } else {
            await!(blahblah())?
        };
        await!(post(result))
    };
    GenFuture(generator)
}

await 的实现更加简单, 只需要一个宏

macro_rules! await {
    ($e:expr) => ({
        let mut future = $e;
        loop {
            match ::futures::Future::poll(&mut future) {
                ::futures::__rt::Ok(::futures::Async::Ready(e)) => {
                    break ::futures::__rt::Ok(e)
                }
                ::futures::__rt::Ok(::futures::Async::NotReady) => {}
                ::futures::__rt::Err(e) => {
                    break ::futures::__rt::Err(e)
                }
            }
            yield
        }
    })
}

神奇不神奇? Rust 里就这么有了 await

而这个 RFC 的提出者 Alex Crichton 还专门写了一个库 futures-await, 除了 await 还用 Procedural macro 实现了 async 自动修改函数签名以及异步的 Stream 迭代

感觉好像没什么毛病, 甚至相当完美


然而

事情并没有那么美好

目前这个 Coroutine/Generator 只是一个 eRFC(experimental RFC) 的实现, 永远不会 stabilize, 也就进不了 Rust stable, 除非有新的 RFC 完善它

然后 futures-await 现在的支持其实非常有限, 最简单的例子, #[async] 的函数并不能有借用的参数 (包括 self), 因为所有 lifetime 默认是 'static 的, 当然他们已经在解决了

同样由于上面的原因, trait 里用 async 并不靠谱

首先是 impl Trait 当前不支持 trait 里使用(需要等他们搞定高阶类型的实现, 猴年马月吧……), 但这个可以用 #[async(boxed)] 把返回值放 Box 里解决, 虽然进行了不必要的堆分配

然后就是 self, 借用肯定是不行了, 直接用总可以吧?

嗯……一半

self 在 trait 里不是对象安全的, 因为动态分发时无法推断类型大小, 就只有静态分发时可用

Box<Self> 又会导致重复的堆分配, 也坑

唯一的解决方案是 Rc<Self>, 但这……丑炸了


到这里总有种写了一半的感觉, 但我不想写了, 就这样吧