新 tokio 中的变动
架构更新
几个小时前, tokio 0.1.0 正式发布了
如果没有一直关注, 肯定是一脸茫然
tokio-core 呢? reactor::Core
又跑哪去了??
其实这是计划了差不多半年的一个变动
因为之前的 futures 和 tokio 都只是 "能用" 的程度, 在被大量使用后暴露出了很多设计上的问题, 并且还有不少要放弃向后兼容才能修复
所以这次就攒到一起重新设计顺便分了个新的 crate
而旧的 tokio-core 会被保留做兼容, 新项目应该全部使用 tokio
除了包名变更, 值得注意的还有以下几点:
reactor::Core
更名为reactor::Reactor
(虽然是这样改了, 不过以后一般并不需要直接用到这货, 后详)- 移除了
reactor::Remote
,reactor::Handle
被标记为Send
+Sync
取代了reactor::Remote
的功能 (毕竟本来这俩就很重复没必要) - 将
reactor::Interval
和reactor::Timeout
移出, 单独分了一个 crate 叫 tokio-timer - tokio-io 合并进 tokio, 这个虽然还没动, 不过 tokio-io 已经被放到 tokio 的 workspace 里了
- Task execution 的抽象被移到 futures 层, 可以更方便的实现 executor
- 提供一个 default global event loop. 最有意思的来了, 结合上一条 executor 的抽象, tokio 的
Reactor
可以只管 IO 并 notify 其他Task
就行
关于 default global event loop, 下面细说一下
extern crate futures;
extern crate futures_cpupool;
extern crate tokio;
extern crate tokio_io;
use std::env;
use std::net::SocketAddr;
use futures::Future;
use futures::future::Executor;
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
use tokio_io::io::copy;
use tokio::net::TcpListener;
fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse::<SocketAddr>().unwrap();
let socket = TcpListener::bind(&addr).unwrap();
println!("Listening on: {}", addr);
let pool = CpuPool::new(1);
let done = socket.incoming().for_each(move |socket| {
let (reader, writer) = socket.split();
let amt = copy(reader, writer);
let msg = amt.then(move |result| {
match result {
Ok((amt, _, _)) => println!("wrote {} bytes", amt),
Err(e) => println!("error: {}", e),
}
Ok(())
});
pool.execute(msg).unwrap();
Ok(())
});
done.wait().unwrap();
}
这是官方的 echo server example
值得注意的是 Reactor
不见了, done
这个 Future 直接调用了 .wait()
按旧的 tokio-core
的思维, 上面这个代码因为没有运行 Reactor
, 所有 IO 相关的 Task
都会被 block 住, 永远不会被 poll
如果之前对 futures 和 tokio 不是很了解的话, 这里补充一下:
一个 Future 或者 Stream 被运行后即 Task, Task NotReady 后会等待下一次 poll, 这个下一次 poll 的时间是由 Task 的 .notify()
何时被调用来决定的
而所有 IO 相关的, 在 tokio 里是运行一个 Reactor
(比如 Linux 里是 epoll), 由这个 Reactor
来调用 .notify()
那 Reactor
跑哪去了? TcpListener::bind
原本所需要的 Handle
参数又跑哪去了?
跟着源码来
impl TcpListener {
/// Create a new TCP listener associated with this event loop.
///
/// The TCP listener will bind to the provided `addr` address, if available.
/// If the result is `Ok`, the socket has successfully bound.
pub fn bind(addr: &SocketAddr) -> io::Result<TcpListener> {
let l = try!(mio::net::TcpListener::bind(addr));
TcpListener::new(l, &Handle::default())
}
哈! Handle
实现了 Default
! 看 Default
的实现又会发现它用了一个全局变量存了一个 Handle
的指针
impl Default for Handle {
fn default() -> Handle {
let mut fallback = HANDLE_FALLBACK.load(SeqCst);
// If the fallback hasn't been previously initialized then let's spin
// up a helper thread and try to initialize with that. If we can't
// actually create a helper thread then we'll just return a "defunkt"
// handle which will return errors when I/O objects are attempted to be
// associated.
if fallback == 0 {
let helper = match global::HelperThread::new() {
Ok(helper) => helper,
Err(_) => return Handle { inner: Weak::new() },
};
// If we successfully set ourselves as the actual fallback then we
// want to `forget` the helper thread to ensure that it persists
// globally. If we fail to set ourselves as the fallback that means
// that someone was racing with this call to `Handle::default`.
// They ended up winning so we'll destroy our helper thread (which
// shuts down the thread) and reload the fallback.
if set_fallback(helper.handle().clone()).is_ok() {
let ret = helper.handle().clone();
helper.forget();
return ret
}
fallback = HANDLE_FALLBACK.load(SeqCst);
}
// At this point our fallback handle global was configured so we use
// its value to reify a handle, clone it, and then forget our reified
// handle as we don't actually have an owning reference to it.
assert!(fallback != 0);
unsafe {
let handle = Handle::from_usize(fallback);
let ret = handle.clone();
drop(handle.into_usize());
return ret
}
}
}
global::HelperThread::new
更简单, 直接开了一个线程跑 Reactor
impl HelperThread {
pub fn new() -> io::Result<HelperThread> {
let reactor = Reactor::new()?;
let reactor_handle = reactor.handle().clone();
let done = Arc::new(AtomicBool::new(false));
let done2 = done.clone();
let thread = thread::Builder::new().spawn(move || run(reactor, done))?;
Ok(HelperThread {
thread: Some(thread),
reactor: reactor_handle,
done: done2,
})
}
问题解决, tokio 用了简单暴力的方法——多开一个线程
不过由于是隐式的, 某些特定环境中可能会被坑
然后由于只有一个 event loop, 极端情况下还可能会成为瓶颈
好在其实是有 TcpListener::from_std
可以手动设置 Reactor
的, 注意不要被质量差的库偷偷用 Handle::default
就行了
至于还有什么 tokio::executor::current_thread
和 CpuPool
, 就是 .wait()
的高级版, 允许 spawn 多个 Task 同时运行在单个和多个线程就像是了
这次的更新只是开始, 下一步还有 futures 0.2 和 tower 发布