iovxw

新 tokio 中的变动

架构更新

几个小时前, tokio 0.1.0 正式发布了

如果没有一直关注, 肯定是一脸茫然

tokio-core 呢? reactor::Core 又跑哪去了??

其实这是计划了差不多半年的一个变动

因为之前的 futurestokio 都只是 "能用" 的程度, 在被大量使用后暴露出了很多设计上的问题, 并且还有不少要放弃向后兼容才能修复

所以这次就攒到一起重新设计顺便分了个新的 crate

而旧的 tokio-core 会被保留做兼容, 新项目应该全部使用 tokio


除了包名变更, 值得注意的还有以下几点:

关于 default global event loop, 下面细说一下

extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;

use std::env;
use std::net::SocketAddr;

use futures::Future;
use futures::future::Executor;
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use tokio_io::io::copy;
use tokio::net::TcpListener;

fn main() {
    let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
    let addr = addr.parse::<SocketAddr>().unwrap();

    let socket = TcpListener::bind(&addr).unwrap();
    println!("Listening on: {}", addr);

    let pool = CpuPool::new(1);

    let done = socket.incoming().for_each(move |socket| {
        let (reader, writer) = socket.split();
        let amt = copy(reader, writer);

        let msg = amt.then(move |result| {
            match result {
                Ok((amt, _, _)) => println!("wrote {} bytes", amt),
                Err(e) => println!("error: {}", e),
            }

            Ok(())
        });

        pool.execute(msg).unwrap();

        Ok(())
    });

    done.wait().unwrap();
}

这是官方的 echo server example

值得注意的是 Reactor 不见了, done 这个 Future 直接调用了 .wait()

按旧的 tokio-core 的思维, 上面这个代码因为没有运行 Reactor, 所有 IO 相关的 Task 都会被 block 住, 永远不会被 poll

如果之前对 futurestokio 不是很了解的话, 这里补充一下:

一个 Future 或者 Stream 被运行后即 Task, Task NotReady 后会等待下一次 poll, 这个下一次 poll 的时间是由 Task 的 .notify() 何时被调用来决定的

而所有 IO 相关的, 在 tokio 里是运行一个 Reactor (比如 Linux 里是 epoll), 由这个 Reactor 来调用 .notify()

Reactor 跑哪去了? TcpListener::bind 原本所需要的 Handle 参数又跑哪去了?

跟着源码来

impl TcpListener {
    /// Create a new TCP listener associated with this event loop.
    ///
    /// The TCP listener will bind to the provided `addr` address, if available.
    /// If the result is `Ok`, the socket has successfully bound.
    pub fn bind(addr: &SocketAddr) -> io::Result<TcpListener> {
        let l = try!(mio::net::TcpListener::bind(addr));
        TcpListener::new(l, &Handle::default())
    }

哈! Handle 实现了 Default! 看 Default 的实现又会发现它用了一个全局变量存了一个 Handle 的指针

impl Default for Handle {
    fn default() -> Handle {
        let mut fallback = HANDLE_FALLBACK.load(SeqCst);

        // If the fallback hasn't been previously initialized then let's spin
        // up a helper thread and try to initialize with that. If we can't
        // actually create a helper thread then we'll just return a "defunkt"
        // handle which will return errors when I/O objects are attempted to be
        // associated.
        if fallback == 0 {
            let helper = match global::HelperThread::new() {
                Ok(helper) => helper,
                Err(_) => return Handle { inner: Weak::new() },
            };

            // If we successfully set ourselves as the actual fallback then we
            // want to `forget` the helper thread to ensure that it persists
            // globally. If we fail to set ourselves as the fallback that means
            // that someone was racing with this call to `Handle::default`.
            // They ended up winning so we'll destroy our helper thread (which
            // shuts down the thread) and reload the fallback.
            if set_fallback(helper.handle().clone()).is_ok() {
                let ret = helper.handle().clone();
                helper.forget();
                return ret
            }
            fallback = HANDLE_FALLBACK.load(SeqCst);
        }

        // At this point our fallback handle global was configured so we use
        // its value to reify a handle, clone it, and then forget our reified
        // handle as we don't actually have an owning reference to it.
        assert!(fallback != 0);
        unsafe {
            let handle = Handle::from_usize(fallback);
            let ret = handle.clone();
            drop(handle.into_usize());
            return ret
        }
    }
}

global::HelperThread::new 更简单, 直接开了一个线程跑 Reactor

impl HelperThread {
    pub fn new() -> io::Result<HelperThread> {
        let reactor = Reactor::new()?;
        let reactor_handle = reactor.handle().clone();
        let done = Arc::new(AtomicBool::new(false));
        let done2 = done.clone();
        let thread = thread::Builder::new().spawn(move || run(reactor, done))?;

        Ok(HelperThread {
            thread: Some(thread),
            reactor: reactor_handle,
            done: done2,
        })
    }

问题解决, tokio 用了简单暴力的方法——多开一个线程

不过由于是隐式的, 某些特定环境中可能会被坑

然后由于只有一个 event loop, 极端情况下还可能会成为瓶颈

好在其实是有 TcpListener::from_std 可以手动设置 Reactor 的, 注意不要被质量差的库偷偷用 Handle::default 就行了


至于还有什么 tokio::executor::current_threadCpuPool, 就是 .wait() 的高级版, 允许 spawn 多个 Task 同时运行在单个和多个线程就像是了


这次的更新只是开始, 下一步还有 futures 0.2 和 tower 发布