Mini Redis Tokio Tutorial Zh
简介
本文档是 Tokio 教程中的 Mini Redis 项目的非官方中文翻译。个人英文水平有限,看看就好。
使用方法
点击左侧目录即可,然后在 github 项目中查看 Commit 历史记录,即可看到对应每一章节更改的内容。
英文 Tutorial 原链接: https://tokio.rs/tokio/tutorial
本文档最后更新日期:2024 年 10 月 21 日
Tutorial
Tokio 是 Rust 的异步运行时。提供了编写网络应用所需要的构建模块。它可以灵活地针对各种系统,从具有数十个内核的大型服务器到小型嵌入式设备。
从高层次上看,Tokio 提供了以下主要模块:
- 一个执行异步代码的多线程运行时。
- 一个标准库的异步版本。
- 一个庞大的库生态系统。
Tokio 在你的项目中扮演的角色
当用异步方式编写程序时,可以通过降低同时执行许多操作的成本,来让其能更好的扩展。然而,异步的 Rust 代码不能自动运行,所以你必须用一个运行时来执行,Tokio 就是最广泛使用的运行时,其使用量超过了其他运行时的总和。
此外,Tokio 也提供了很多好用的工具。当编写异步代码时,你不能使用 Rust 标准库提供的普通阻塞 API,你必须用它们的异步版本。这些替代版本将会由 Tokio 提供,在有意义的(make sense)地方反映了 Rust 标准库的 API。
Tokio 的优势
本节将会列出 Tokio 的一些优势。
快速
Tokio 是非常快速的,基于本身就很快的 Rust 构建,这是本着 Rust 的精神完成的,其目标是您不用通过手动编写等效的代码来提高性能。
Tokio 是可扩展的,建立在 async/await 语言功能(feature)之上,而 async/await 语言功能本身是可扩展的。在处理网络时,由于延迟,处理连接的速度是有限制的,因此唯一的扩展方法是一次处理多个连接。借助 async/await 语言功能,增加并发操作的数量变得非常容易,这将允许扩展到大量的并发任务。
可靠
Tokio 使用 Rust 构建,Rust 是一个可以使每个人都能构建可靠且高效的软件的语言。许多的研究发现,大约 ~70% 的高严重性安全漏洞是内存不安全的结果。而使用 Rust 可以消除这些应用程序中所有的该类错误。
Tokio 还非常注重提供一致性的行为,同样的代码不会导致其他结果。Tokio 的主要目标就是让用户编写可以预测行为的软件,该软件日复一夜地执行并且能够具有可靠的响应时间,不会出现不可预测的延迟峰值。
易用
借助 Rust 的 async/await 功能,编写异步程序的复杂性大大降低。与 Tokio 工具和充满活力的生态系统相结合,编写程序将会变得轻而易举。
Tokio 在合理的情况下(make sense)遵守标准库的命名约定。这使得可以轻松地将使用标准库写的代码转换为使用 Tokio 编写的代码。通过 Rust 强大的类型系统,轻松交付正确代码的能力将得到很大提升。
灵活
Tokio 提供了多种运行时变体。从多线程、work-stealing的运行时到轻量级的单线程运行时,应有尽有。这些运行时中的每一个都带有许多选项,允许用户根据自己的需要调整它们。
不该用 Tokio 的场景
纵使 Tokio 对于很多需要同时执行大量操作的项目很有用,但是也有一些情况不太适合使用 Tokio。
- 在多个线程上并行运行 CPU 密集型运算任务。Tokio 是专为 IO 密集型应用来设计的,而且其中单独任务大部分时间都在等待 IO。如果您的应用只做并行计算,那么您应该使用rayon。也就是说,仍然可以“混合搭配”,如果您两件事都要做的话。请看这篇文章了解实例。
- 读取大量文件。虽然 Tokio 似乎对于只需要读取大量文件的项目很有用,但是与普通线程池相比,Tokio 在这种情况没有任何优势。这是因为操作系统一般不提供异步文件 API。
- 发送单个 Web 请求。Tokio 的优势在于在同时做很多事情。如果需要使用用于异步 Rust 的库,例如reqwest,但是不需要同时做很多事,可以选择该库的阻塞版本,这样会使得项目更简单。当然,使用 Tokio 也行,但是与阻塞 API 相比没有真正的优势。如果该库不提供阻塞 API,请看有关于桥接同步代码的章节。
获取帮助
如果您在任何时候遇到困难,都可以在Discord或者Github discussions中获得帮助。不要羞于提问“初学者”问题,我们随时准备着乐于提供帮助
Setup 准备工作
本教程将会带您完整构建Redis客户端(client)和服务端(server)的过程。我们将从异步 Rust 的基础知识开始,并从这里开始构建。我们将会实现 Redis 命令的子集,但同时会对 Tokio 进行全面介绍。
Mini-Redis
本教程构建的项目Mini-Redis 在 GitHub 上。Mini-Redis 被设计于学习 Tokio,广受好评,但是同时也缺失了一些真正 Redis 中的功能。当然您可以在crate.io上找到可用于生产的 Redis 库。
我们将会在教程中直接使用 Mini-Redis。这让我们可以先使用 Mini-Redis 中的部分内容,之后再在本教程后面实现它们。
获取帮助
如果您在任何时候遇到困难,都可以在Discord或者Github discussions中获得帮助。不要羞于提问“初学者”问题,我们随时准备着乐于提供帮助
预先准备
读者应该已经熟悉Rust,《Rust book》是一本极好的入门资源。
虽然不是必须的,但是有使用Rust 标准库和其他语言网络编程的经验会有一些帮助。
无需提前知道关于 Redis 的知识。
Rust
在开始之前,你需要确保Rust工具链已经安装并随时可用。如果没装,使用rustup安装时最容易的。
本教程需要 Rust 最低版本是1.45.0,但是用最新稳定版是推荐的。
检查电脑上 Rust 版本,在终端中运行:
$ rustc --version
您应该看到这样的输出rustc 1.78.0 (9b00956e5 2024-04-29)
Mini-Redis 服务端
接下来,安装 Mini-Redis 服务端。这个可以用来测试我们构建的客户端。
$ cargo install mini-redis
通过运行服务端来确保安装成功:
$ mini-redis-server
然后,在另外一个终端窗口,尝试使用mini-redis-cli获取 key foo
$ mini-redis-cli get foo
你会看到(nil)。
准备好开始
就这样一切准备就绪了,转到下一页来编写您的第一个异步 Rust 应用程序。
Hello Tokio 你好 Tokio
我们将会从编写一个非常基础的 Tokio 应用开始。它将会连接到 Mini-Redis 服务器,把键(key) hello 的值(value)设置为world。然后读取 key。这将使用 Mini-Redis 客户端库(client library)来完成。
代码
建一个新 crate
让我们通过新建一个 Rust 应用开始:
$ cargo new my-redis
$ cd my-redis
添加依赖
接下来,打开Cargo.toml并且添加如下依赖到[dependencies]:
tokio = { version = "1", features = ["full"] }
mini-redis = "0.4"
编写代码
然后,打开main.rs,替换内容如下:
use mini_redis::{client, Result}; #[tokio::main] async fn main() -> Result<()> { // 对mini-redis的地址打开一个链接 let mut client = client::connect("127.0.0.1:6379").await?; // 设置key "hello" 的value为 "world" client.set("hello", "world".into()).await?; // 获取key "hello" let result = client.get("hello").await?; println!("got value from the server; result={:?}", result); Ok(()) }
确保 Mini-Redis 服务端正在运行。新建一个终端窗口,执行:
$ mini-redis-server
如果你还没安装 mini-redis,这样做:
$ cargo install mini-redis
现在,运行my-redis应用:
$ cargo run
got value from the server; result=Some(b"world")
成功了!
你可以在这里找到全部代码。
代码分解
让我们花点时间看看我们刚刚干了什么。虽然没有多少代码,但是却发生了很多事。
#![allow(unused)] fn main() { let mut client = client::connect("127.0.0.1:6379").await?; }
client::connect函数是mini-rediscrate 提供的。它与指定的远程地址异步建立 TCP 连接。连接建立后,client句柄(handle)被返回。虽然操作是异步执行的,但是我们编写的代码却看起来是同步的。该操作是异步的唯一提示就是.await操作符。
什么是异步编程?
大多数计算机程序的执行顺序与编写顺序相同。第一行执行,然后执行下一行,依此类推。同步编程时,当程序遇到无法立即完成的操作时,就会阻塞,直到操作完成。例如,建立 TCP 连接需要通过网络与对等方进行交换,这可能需要相当长的时间。在此期间,线程被阻塞。
使用异步编程时,无法立即完成的操作将在后台挂起。线程不会被阻塞,并且可以继续运行其他内容。操作完成后,任务将取消挂起,并从中断的位置继续处理。我们之前的示例只有一个任务,因此在挂起时不会发生任何事情,但异步程序通常有许多这样的任务。
尽管异步编程可以加快应用程序速度,但它通常会导致程序更加复杂。程序员需要跟踪异步操作完成后恢复工作所需的所有状态。纵观古今,这是一项繁琐且容易出错的任务。
编译时绿色线程
Rust 使用名为async/await的功能来实现异步编程。执行异步操作的函数使用async标记。在我们的例子中,connect函数的定义大概长这样:
#![allow(unused)] fn main() { use mini_redis::Result; use mini_redis::client::Client; use tokio::net::ToSocketAddrs; pub async fn connect<T: ToSocketAddrs>(addr: T) -> Result<Client> { // ... } }
async fn定义看起来像常规的同步函数,但是里面的操作是异步的。Rust 在编译时会把async fn转换为异步运行的例程(routine)。在async fn中任何调用.await时,都会把控制权返回给线程。当操作在后台进行时,该线程可以执行其他工作。
warning 尽管其他语言也实现了
async/await,但 Rust 中采用了独特的方法。主要是,Rust 中的异步操作都是懒惰的(lazy)。这会导致运行时语义与其他语言不同。
如果还是不太明白,别担心。我们将会在整个指南中探索更多async/await的内容。
使用async/await
异步函数可以像其他 Rust 函数一样被调用。然而,调用这些函数不会立刻执行函数体。相反的,调用async fn返回了一个代表操作的值。概念上类似于零参数闭包。要实际执行该操作,应该在返回值上使用.await操作符。
例如以下的程序:
async fn say_world() { println!("world"); } #[tokio::main] async fn main() { // 调用 `say_world()` 并没有执行 `say_world()` 的函数体. let op = say_world(); // 这个 println! 会先执行 println!("hello"); // 在 `op` 上调用 `.await` 才会开始执行 `say_world`. op.await; }
输出:
hello
world
async fn的返回值是实现了Future trait 的匿名类型。
异步main函数
用于启动应用的主函数(main function)与其他大多数 Rust 包中的不同。
- 是一个
async fn异步函数 - 带了
#[tokio::main]注解
async fn可以让我们进入异步上下文。然而,异步函数必须由运行时来执行。这个运行时包含了异步任务的调度程序,提供事件 I/O,计时器等等。运行时并不会自动启动,所以需要 main 函数来启动它。
#[tokio::main]是一个宏。会把async fn main()转换为同步的fn main(),初始化运行时实例,并执行异步的 main 函数。
例如接下来:
#[tokio::main] async fn main() { println!("hello"); }
可以转换为:
fn main() { let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { println!("hello"); }) }
Tokio 运行时的详细信息将会在稍后介绍。
Cargo features
当本教程依赖 Tokio 时, 将启用full feature(全部功能标志):
tokio = { version = "1", features = ["full"] }
Tokio 具有很多功能 (TCP, UDP, Unix sockets, timers, sync utilities, multiple scheduler types, etc)。并非所有程序都需要这些功能。当尝试优化编译时间或最终程序的占用空间时,应用可以决定仅使用的 feature。
Spawning 生成任务
现在我们来实现 Redis 服务端。
首先,把上一节客户端SET/GET代码移动到 example 文件中。这样我们就可以在服务器上运行它。
$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs
然后新建一个空文件src/main.rs,并继续。
接收套接字
我们 Redis 服务器要做的第一件事就是接收 TCP 套接字。这是通过绑定tokio::net::TcpListener到6379端口来完成的。
info Tokio 很多类型名与 Rust 标准库中同步类型相同。在合理情况(make sense)下,Tokio 暴露了与 std 相同的 API,但是使用
async fn。
然后在一个 loop 循环中接收套接字。每个套接字都会被处理然后关闭。现在,我们将会读取指令,将其打印到 stdout 并响应一个 error。
src/main.rs
use tokio::net::{TcpListener, TcpStream}; use mini_redis::{Connection, Frame}; #[tokio::main] async fn main() { // 把listener绑定到这个地址 let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap(); loop { // 第二个值包含了新连接的IP和端口 let (socket, _) = listener.accept().await.unwrap(); process(socket).await; } } async fn process(socket: TcpStream) { // 这个 `Connection` 让我们读/写Redis **帧(frames)** 而不是 // 比特流. 这个 `Connection` 类型是由 mini-redis 定义的。 let mut connection = Connection::new(socket); if let Some(frame) = connection.read_frame().await.unwrap() { println!("GOT: {:?}", frame); // 响应一个error let response = Frame::Error("unimplemented".to_string()); connection.write_frame(&response).await.unwrap(); } }
现在,运行这个接收循环:
$ cargo run
在独立的终端窗口,运行hello-redis例子(之前章节写的SET/GET命令):
$ cargo run --example hello-redis
输出应该是:
Error: "unimplemented"
服务端的终端窗口,输出应该是:
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])
并发
我们的服务端有个小问题(除了仅仅响应 error 之外)。它一次只能处理一个入站请求。当一个连接被接收,服务端将会停留在接收循环里,直到响应完全写入套接字。
我们希望 Redis 服务端可以处理许多并发请求。所以我们需要添加一些并发功能。
info 并发和并行不是一回事。如果你在两个任务之间交替,那你就是并发地处理两个任务,而不是并行的。如果是并行的话,你需要两个人,每个人专门负责一个任务。
使用 Tokio 的一个优点就是,异步代码允许您同时处理多个任务,而不用使用普通线程并行得处理它们。事实上,Tokio 可以在单个线程上运行多个任务!
为了同时处理连接,将会为每个入站连接生成一个新任务。连接会在此任务上进行处理。
接收循环变成下面这样:
use tokio::net::TcpListener; #[tokio::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap(); loop { let (socket, _) = listener.accept().await.unwrap(); // 每个传入的套接字都会被生成一个新任务。套接字(的所有权) // 被移动到新任务并且在那儿处理。 tokio::spawn(async move { process(socket).await; }); } }
任务
一个 Tokio 任务就是一个异步绿色线程。它们是通过async块传递给tokio::spawn来创建的。tokio::spawn函数返回一个JoinHandle,调用者可以使用它来与生成的任务进行交互。async块可以有返回值。调用者可以使用.await作用于JoinHandle上,来获取返回值。
例如:
#[tokio::main] async fn main() { let handle = tokio::spawn(async { // 做一些异步任务 "return value" }); // 做一些其他任务 let out = handle.await.unwrap(); println!("GOT {}", out); }
等待JoinHandle返回一个Result。当一个任务执行中发生错误,JoinHandle会返回一个Err。任务 Panic 或者因为运行时关闭而被强制取消时,会发生这种情况。
任务是调度器管理的执行单元。生成任务会提交给 Tokio 调度器,然后调度器会确保这个任务在有工作时执行。生成的任务可以在生成它的同一个线程上执行,也可以在不同的线程上执行。任务生成后也可以在线程间移动。
Tokio 中的任务非常轻量。在底层,它们只需要一次性分配 64 字节内存。应用应该可以直接生成数千计,甚至数百万的任务。
'static bound
当在 Tokio 运行时生成一个任务时,它的类型生命周期必须是'static的。这意味着生成的任务不能包含任何任务外的数据的引用。
info 普遍认为
'static意味着“永远存在”,但事实并非如此。仅仅因为一个'static值不意味着存在内存泄漏。你可以在Rust 生命周期误会中阅读更多。
例如,下面的代码不能通过编译:
use tokio::task; #[tokio::main] async fn main() { let v = vec![1, 2, 3]; task::spawn(async { println!("Here's a vec: {:?}", v); }); }
尝试编译会导致以下错误:
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|
这种情况是因为在默认情况下,变量不会移动move到异步块中。v vector 仍被main函数所拥有。println!行借用了v。Rust 编译器向我们解释了这一点,甚至给出了如何修改的建议!将第 7 行更改为task::spawn(async move {将会让编译器把v``移动move到生成的任务中。现在,这个任务拥有了它自己的数据,使其成为'static的。
如果必须同时从多个任务访问单个数据,那就必须使用Arc等同步原语来共享。
注意,刚刚的错误信息也提到了参数类型比'static生命周期活得更长。这种术语可能相当令人困惑,因为'static生命周期会持续到程序结束,所以如果比它活得还长,是不是出现了内存泄漏?解释是,它是个类型,不是必须比'static活得更长的值,并且这个值有可能在类型不再有效之前被摧毁。
当我们说一个值是'static时,意思是永远保持该值不是错误的。这很重要,因为编译器无能推断新生成的任务会持续多长时间。我们必须确保任务能够永远活着,这样 Tokio 就可以让任务运行它需要长的时间。
Info 框链接的文章使用了术语“受'static限制”而不是“它的类型活得比'static长”或者“值为'static,引用于T: 'static”。之前这些都是一个意思,但是与&'static T中的'static注解不同(这是个引用的生命周期,前面的不是)。
Send bound
由tokio::spawn生成的任务必须实现了Send。这使得Tokio运行时可以在线程之间移动任务,同时在.await处挂起任务。
当所有通过调用.await的数据都是Send的,任务才能是Send的。这有点微妙。当.await被调用,任务返回给调度器。下一次任务被执行时,它将会从上次返回时恢复。为了实现这种功能,任务必须保存.await之后的所有状态。如果这个状态是Send的,即可以跨线程移动,那么任务本身就可以跨线程移动。相反的,如果状态没实现Send,那任务也不是Send的。
例如,这个可以运行:
use tokio::task::yield_now; use std::rc::Rc; #[tokio::main] async fn main() { tokio::spawn(async { // 这歌代码块强制 `rc` 在 `.await` 之前 drop 了 { let rc = Rc::new("hello"); println!("{}", rc); } // `rc` 没再用。 当任务返回给(yield)调度器,它**没再**持续 yield_now().await; }); }
这个不行:
use tokio::task::yield_now; use std::rc::Rc; #[tokio::main] async fn main() { tokio::spawn(async { let rc = Rc::new("hello"); // `rc` 在 `.await` 之后使用了,它必须在任务状态中持续 yield_now().await; println!("{}", rc); }); }
尝试编译这段代码会导致:
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here
我们将会在下一章节更深入探讨此类特殊情况。
存储值
我们现在将会实现process函数来处理传入的指令。我们将会用一个HashMap来存储值。SET指令会插入到HashMap中,GET会加载它们。此外,我们会使用循环来为每个连接接收多条指令。
#![allow(unused)] fn main() { use tokio::net::TcpStream; use mini_redis::{Connection, Frame}; async fn process(socket: TcpStream) { use mini_redis::Command::{self, Get, Set}; use std::collections::HashMap; // HashMap用来存储值 let mut db = HashMap::new(); // 由 `mini-redis` 提供的 Connection ,处理解析从套接字传过来的帧 // (handles parsing frames from the socket) let mut connection = Connection::new(socket); // 使用 `read_frame` 来从连接中接收一条指令。 while let Some(frame) = connection.read_frame().await.unwrap() { let response = match Command::from_frame(frame).unwrap() { Set(cmd) => { // 值被存储为 `Vec<u8>` db.insert(cmd.key().to_string(), cmd.value().to_vec()); Frame::Simple("OK".to_string()) } Get(cmd) => { if let Some(value) = db.get(cmd.key()) { // `Frame::Bulk` 期望数据类型是 `Bytes`。 // 这种类型在本教程中稍后介绍。现在, // `&Vec<u8>` 可以使用 `into()` 转换为 `Bytes`。 Frame::Bulk(value.clone().into()) } else { Frame::Null } } cmd => panic!("unimplemented {:?}", cmd), }; // 写回响应,传回给客户端 connection.write_frame(&response).await.unwrap(); } } }
现在,启动服务端:
$ cargo run
然后在另外的终端窗口,运行hello-redis例子:
$ cargo run --example hello-redis
现在,输出应该是:
got value from the server; result=Some(b"world")
现在,我们可以获取和设置值,但还有个问题:连接之间,值不能被共享。如果另一个套接字连接,并尝试GET hellokey,它将找不到任何内容。
你可以在这里找到完整代码。
下一节,我们将会为所有套接字实现持久数据。
Shared state 共享状态
到目前为止,我们已经有了一个可正常工作的 key-value 服务端。然而,有个主要问题:状态不能跨连接共享。我们将在本文中解决。
策略
有好几种在 Tokio 中共享状态的方法。
- 使用互斥体(Mutex)保护(Guard)共享状态。
- 生成一个任务来管理状态,并使用消息传递(message passing)来操作它。
通常,您应该对简单数据使用第一种方法,对异步任务使用第二种方法(例如 I/O 原语操作)。在本章中,共享的数据是HashMap,对应的操作是insert和get。这两种操作都不是异步的,所以我们使用Mutex。
下一章将会介绍后一种方法。
添加bytes依赖
Mini-Redis 不用Vec<u8>,而是使用bytes库中的Bytes类型。Bytes的目标是为网络编程提供一种健壮的(robust)字节数组结构。它比较Vec<u8>添加的最大的特性就是浅克隆(shallow cloning)。换句话说,在Bytes实例上调用clone()不会导致底层数据被复制。相反的,Bytes实例是某些底层数据的引用计数(reference-counted)。Bytes大概是Arc<Vec<u8>>,但有些额外功能。
添加bytes库,需要在Cargo.toml中的[dependencies]添加:
bytes = "1"
初始化HashMap
HashMap将会在很多任务和潜在的许多线程中共享。为了支持这一点,需要包装在Arc<Mutex<_>>中。
首先,为了方便,在use语句后面添加一个类型别名。
#![allow(unused)] fn main() { use bytes::Bytes; use std::collections::HashMap; use std::sync::{Arc, Mutex}; type Db = Arc<Mutex<HashMap<String, Bytes>>>; }
然后,更新main函数来初始化HashMap,并且把Arc**句柄(handle)**传递给process函数。使用Arc允许Hashmap在很多任务中被引用,而这些任务可能运行在很多线程上。在整个 Tokio 中,术语句柄(handle)用来指代提供对某些共享状态访问权限的引用值。
use tokio::net::TcpListener; use std::collections::HashMap; use std::sync::{Arc, Mutex}; #[tokio::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap(); println!("Listening"); let db = Arc::new(Mutex::new(HashMap::new())); loop { let (socket, _) = listener.accept().await.unwrap(); // Clone the handle to the hash map. let db = db.clone(); println!("Accepted"); tokio::spawn(async move { process(socket, db).await; }); } }
关于使用std::sync::Mutex
注意,使用std::sync::Mutex而不是tokio::sync::Mutex来守卫(guard)HashMap。一个常见的错误,是在异步代码中全都用tokio::sync::Mutex。异步互斥体(async mutex)是在调用.await时锁定(locked)的互斥体。
同步互斥体(sync mutex)会在等待请求锁(lock)时,阻塞当前线程。这样的话,将会阻止其他任务的处理。但是,使用tokio::sync::Mutex也没啥用。因为异步互斥体内部使用了同步互斥体。
根据经验,只要数据竞争保持在较低水平并且调用.await没有持有锁,就可以在异步代码中使用同步互斥体。
更新process()
process 函数不再初始化Hashmap。相反,它会使用HashMap的共享句柄来作为参数。当然在使用之前,也需要先给HashMap上锁。记住HashMap的 value 类型现在是Bytes(可以廉价地克隆),所以这个也得改。
#![allow(unused)] fn main() { use tokio::net::TcpStream; use mini_redis::{Connection, Frame}; async fn process(socket: TcpStream, db: Db) { use mini_redis::Command::{self, Get, Set}; // 由 `mini-redis` 提供的 Connection ,处理解析从套接字传过来的帧 // (handles parsing frames from the socket) let mut connection = Connection::new(socket); while let Some(frame) = connection.read_frame().await.unwrap() { let response = match Command::from_frame(frame).unwrap() { Set(cmd) => { let mut db = db.lock().unwrap(); db.insert(cmd.key().to_string(), cmd.value().clone()); Frame::Simple("OK".to_string()) } Get(cmd) => { let db = db.lock().unwrap(); if let Some(value) = db.get(cmd.key()) { Frame::Bulk(value.clone()) } else { Frame::Null } } cmd => panic!("unimplemented {:?}", cmd), }; // 写回响应,传回给客户端 connection.write_frame(&response).await.unwrap(); } } }
任务,线程,以及数据竞争
当数据竞争最少时,使用阻塞互斥锁(blocking mutex)来守卫(guard)较小的临界区(short critical sections)是可被接受的。当锁被争用时,执行任务的线程必须阻塞,并等待互斥体解锁。这不仅仅会阻塞当前任务,也同样会阻塞这个线程上调度的其他所有任务。
默认情况下,Tokio 运行时使用多线程调度器。任务会被运行时的调度器调度到任意数量的线程上。如果大量的任务都调度执行,并且它们都需要访问同一个互斥体,就会出现数据竞争。另一方面,如果 Tokio 使用current_thread运行时(当前线程运行时),那么互斥体将永远不会发生争用。
info >
current_thread运行时是一个轻量化的,单线程的运行时。当仅生成少量任务并且打开少量套接字(socket)时,这是个好选择。例如,当提供一个同步 API 桥(synchronous API bridge)在异步客户端库之上,这个选择运行效果很好。
如果同步互斥锁上的数据竞争成为问题,那么最好的结局方法并不是切换到 Tokio 互斥锁。考虑下面的选择:
- 使用一个专用任务,来管理状态,并使用消息传递。
- 对互斥体分片。
- 重构代码,来避免互斥体。
在我们的例子中,由于每个键都是独立的,所以把互斥体分片(mutex sharding)效果很好。为此,我们将引入N个不同的实例,而不是使用单个Mutex<HashMap<_, _>>。
#![allow(unused)] fn main() { type ShardedDb = Arc<Vec<Mutex<HashMap<String, Vec<u8>>>>>; fn new_sharded_db(num_shards: usize) -> ShardedDb { let mut db = Vec::with_capacity(num_shards); for _ in 0..num_shards { db.push(Mutex::new(HashMap::new())); } Arc::new(db) } }
然后呢,找到给定的 key 对应的值就变成了两步操作。首先,key 用来识别它属于哪一个分片。然后,在HashMap中查找 key。
#![allow(unused)] fn main() { let shard = db[hash(key) % db.len()].lock().unwrap(); shard.insert(key, value); }
上面说的简单实现需要使用固定数量的分片,并且一旦创建分片 map,分片的数量就不能更改了。dashmap提供了更复杂的分片哈希图(hash map)的实现。
在调用.await时持有MutexGuard
你可能像这样写代码:
#![allow(unused)] fn main() { use std::sync::{Mutex, MutexGuard}; async fn increment_and_do_stuff(mutex: &Mutex<i32>) { let mut lock: MutexGuard<i32> = mutex.lock().unwrap(); *lock += 1; do_something_async().await; } // 锁在此离开了作用域 }
当你尝试调用此函数时,你会遇到以下错误信息:
error: future cannot be sent between threads safely
--> src/lib.rs:13:5
|
13 | tokio::spawn(async move {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, i32>`
note: future is not `Send` as this value is used across an await
--> src/lib.rs:7:5
|
4 | let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
| -------- has type `std::sync::MutexGuard<'_, i32>` which is not `Send`
...
7 | do_something_async().await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^ await occurs here, with `mut lock` maybe used later
8 | }
| - `mut lock` is later dropped here
这是因为std::sync::MutexGuard类型不是Send的。这意味着你不能发送(send)一个互斥锁到另外一个线程,这会报错,原因是 Tokio 运行时可以在任务调用.await时,在线程间移动它。为了避免这种情况,你应该重构代码,在调用.await之间,互斥锁就析构掉。
#![allow(unused)] fn main() { // 这样是正确的! async fn increment_and_do_stuff(mutex: &Mutex<i32>) { { let mut lock: MutexGuard<i32> = mutex.lock().unwrap(); *lock += 1; } // 锁在此离开了作用域 do_something_async().await; } }
注意,这样不行:
#![allow(unused)] fn main() { use std::sync::{Mutex, MutexGuard}; // This fails too. async fn increment_and_do_stuff(mutex: &Mutex<i32>) { let mut lock: MutexGuard<i32> = mutex.lock().unwrap(); *lock += 1; drop(lock); do_something_async().await; } }
这是因为编译器当前只能根据作用域来判断 future 是否是Send的。希望编译器之后能更新,来支持显式 drop,但是现在不行,必须使用作用域。
注意,此处讨论的错误在Spawning 章节的 Send bound 部分也讨论了。
你不该尝试生成不需要Send的任务来规避这个问题,因为如果 Tokio 在.await初挂起你的任务,同时这个任务持有锁,一些其他的任务可能被调度到相同的线程上,然后这些其他任务或许也会尝试锁定这个互斥体(lock that mutex),这可能导致死锁(deadlock),因为等待锁定互斥体的任务将阻止持有互斥锁的任务释放这个互斥锁(releasing the mutex)。
我们将讨论一些如何修复以下错误信息的方法:
重构代码,让它不跨.await持有锁
我们已经在上面代码片段中看到了一个例子,但是我们还有更强大的方法可以做到这一点。例如,你可以把互斥锁包装在结构体中,并且仅在该结构体的非异步方法内来给互斥体上锁。
#![allow(unused)] fn main() { use std::sync::Mutex; struct CanIncrement { mutex: Mutex<i32>, } impl CanIncrement { // This function is not marked async. fn increment(&self) { let mut lock = self.mutex.lock().unwrap(); *lock += 1; } } async fn increment_and_do_stuff(can_incr: &CanIncrement) { can_incr.increment(); do_something_async().await; } }
这种模式可以保证你不会遇到Send错误,因为互斥锁守卫(mutex guard)没有出现在异步函数中的任何位置。
生成一个任务,来管理状态,使用消息传递来操作它
这是本章节开头提到的第二种方法,当在 I/O 资源中共享资源时很常用。参阅下一章节了解更多细节。
使用 Tokio 提供的异步互斥体
也可以用 Tokio 提供的tokio::sync::Mutex类型。Tokio 互斥锁主要功能就是它可以在调用.await时保持,不会出现其他问题。另外提一下,异步互斥体(asynchronous mutex)比普通的互斥体(ordinary mutex)更昂贵(在时间空间上),所以通常最好使用其他两种方法。
#![allow(unused)] fn main() { use tokio::sync::Mutex; // 注意!这里使用了 Tokio mutex // 这可以过编译! // (但是这种情况重构代码可能更好) async fn increment_and_do_stuff(mutex: &Mutex<i32>) { let mut lock = mutex.lock().await; *lock += 1; do_something_async().await; } // 锁在此离开了作用域 }
Channels 管道
现在我们已经了解了一些关于 Tokio 并发的知识,让我们把它应用到客户端。把我们之前写的服务端代码放到独立的二进制文件中。
$ mkdir src/bin
$ mv src/main.rs src/bin/server.rs
并且创建一个新的二进制文件来存放客户端代码:
touch src/bin/client.rs
在这个文件中,我们将会编写本页的代码。每当想要运行它时,你需要先在一个独立的终端窗口启动服务器:
$ cargo run --bin server
然后运行客户端,也是在独立的终端窗口:
$ cargo run --bin client
完成之后,让我们开始 coding!
假设我们要并发地运行两个 Redis 指令。我们可以为每个指令生成一个任务。然后这两个命令将并发处理。
首先,我们可能这样写:
use mini_redis::client; #[tokio::main] async fn main() { // 与服务器建立连接 let mut client = client::connect("127.0.0.1:6379").await.unwrap(); // 产生两个任务,一个get一个key,另一个set一个key let t1 = tokio::spawn(async { let res = client.get("foo").await; }); let t2 = tokio::spawn(async { client.set("foo", "bar".into()).await; }); t1.await.unwrap(); t2.await.unwrap(); }
这个不会编译成功,因为两个任务可能在随机时刻访问client连接。但是Client并没有实现Copy,因此如果没有一些代码来让它共享,就不能编译成功。此外,Client::set参数类型是&mut self的,这意味着调用它需要独占访问权限。我们可以为每个任务单独开一个连接,但是这样并不理想。我们不能用std::sync::Mutex,因为调用.await时需要持有锁。我们可以用tokio::sync::Mutex,但是这只能让我们处理单个请求。如果客户端实现了流水线,异步锁会导致连接的利用率不足。
消息传递 Message passing
解决办法是用消息传递。该模式可以生成一个专用任务来处理client资源。希望发出请求的任务都可以发送一个消息到client任务。然后client任务代表这些发送者发出请求,并把响应返回来给发送者。
用这种方式,建立单个连接。管理client的任务能够获得独占的访问权限,以便于调用get和set。此外,这个管道还可以充当缓冲区。在client任务忙时,也可能有操作被发送到client任务。一旦client任务空闲,可以处理新的请求,它就会从管道中接收下一个请求。这可以带来更好的吞吐量,并且可以扩展支持连接池。
Tokio管道原语
Tokio提供了许多管道,每一种都有不同的用处。
- mpsc:多个生产者,单个消费者管道。可以向管道发送很多值。
- oneshot:单个生产者,单个消费者管道。一次只能发送一个值。
- broadcast:多个生产者,多个消费者管道。可以向管道发送很多值,每个接收者都能看到每个被发送到值。
- watch:多个生产者,多个消费者管道。可以向管道发送很多值,但是没有历史记录。接收者只能看到最新的值。
如果你需要一个多生产者多消费者管道,其中只能有一个消费者看到每一条消息,你可以使用async-channel库。还有一些在异步Rust以外用的库,比如std::sync::mpsc和crossbeam::channel。这些管道通过阻塞当前线程来等待消息,这在异步代码中是不允许的。
在本节课中,我们将会使用mpsc和oneshot。其他类型的消息传递通道将会在后续章节中探讨。本节的完整代码可以在这里找到。
定义消息类型
大多数情况下,当使用消息传递时,接收消息的任务会响应多个指令。在我们的例子中,此任务会对GET和SET指令做出响应。对此建立模型,我们首先定义一个Command枚举,并包含每个指令类型的变体。
#![allow(unused)] fn main() { use bytes::Bytes; #[derive(Debug)] enum Command { Get { key: String, }, Set { key: String, val: Bytes, } } }
创建管道
在main函数中,创建一个mpsc管道。
use tokio::sync::mpsc; #[tokio::main] async fn main() { // 创建一个容量为32的新管道 let (tx, mut rx) = mpsc::channel(32); // ... 其他代码一会儿在这儿写 }
mpsc管道是用来**发送(send)**指令到管理redis连接的任务的。多生产者能力可以允许消息从很多任务发送过来。创建通道会返回两个值,一个发送者(sender)和一个接收者(receiver)。这两个句柄(handle)可以分开使用。它们可以被移动(moved)到不同的任务中。
创建的管道容量是32。如果发送消息的速度比接收快,管道就会存储它们。一旦32条消息都被存储到管道里,调用send(...).await就会进入休眠状态(go to sleep),直到接收者移除了(处理了)一条消息。
从多个任务发送是由克隆发送者来完成的。例如:
use tokio::sync::mpsc; #[tokio::main] async fn main() { let (tx, mut rx) = mpsc::channel(32); let tx2 = tx.clone(); tokio::spawn(async move { tx.send("sending from first handle").await.unwrap(); }); tokio::spawn(async move { tx2.send("sending from second handle").await.unwrap(); }); while let Some(message) = rx.recv().await { println!("GOT = {}", message); } }
两条消息都被发送到单个接收者句柄。无法克隆mpsc管道的接收者。
当每个发送者都超出生命周期或其他原因被drop了,就不再能向管道中发送更多消息了。此时,接收者调用recv将返回None,这意味着所有的发送者都消失了,管道关闭了。
在我们的管理Redis连接任务的例子中,它知道一旦管道关闭了,Redis连接就可以关了,因为这个连接再也用不到了。
生成管理任务
接下来,生成一个任务处理来自管道中的消息。首先,客户端与Redis建立连接。然后,接收到的指令经由Redis连接发出。
#![allow(unused)] fn main() { use mini_redis::client; // `move` 关键字用来 **移动** `rx` 的所有权到任务中。 let manager = tokio::spawn(async move { // 与服务端建立连接 let mut client = client::connect("127.0.0.1:6379").await.unwrap(); // 开始接收消息 while let Some(cmd) = rx.recv().await { use Command::*; match cmd { Get { key } => { client.get(&key).await; } Set { key, val } => { client.set(&key, val).await; } } } }); }
现在,更新两个任务的代码来通过管道发送指令,而不是直接通过Redis连接发送。
#![allow(unused)] fn main() { // `Sender` 句柄被移动到任务里. 因为这儿有俩任务, // 我们需要另一个 `Sender` let tx2 = tx.clone(); // 生成俩任务,一个get一个key,一个set一个key let t1 = tokio::spawn(async move { let cmd = Command::Get { key: "foo".to_string(), }; tx.send(cmd).await.unwrap(); }); let t2 = tokio::spawn(async move { let cmd = Command::Set { key: "foo".to_string(), val: "bar".into(), }; tx2.send(cmd).await.unwrap(); }); }
在main函数底部,我们调用.await来等待连接句柄(join handles),以确保在进程退出之前全部完成这些指令。
#![allow(unused)] fn main() { t1.await.unwrap(); t2.await.unwrap(); manager.await.unwrap(); }
接收响应
最后一步,是接收从管理任务中返回的响应。GET指令需要获取到值,SET指令需要知道这个操作是否成功完成。
为了传递响应,使用oneshot管道。oneshot管道是单个生产者,单个消费者管道,针对发送单个值进行了优化。在我们的例子中,单个值就是响应。
与mpsc类似,oneshot::channel()返回一个发送者和一个接收者句柄。
#![allow(unused)] fn main() { use tokio::sync::oneshot; let (tx, rx) = oneshot::channel(); }
与mpsc不同的地方在于没有指定容量,因为容量就是一。此外,两个句柄都不能克隆。
为了接收从管理任务传过来的响应,在发送指令之前,需要建立一个oneshot管道。管道中的发送者这块被包含在了管理任务的指令中。接收者那块用来接收响应。
首先,更新Command枚举来包含发送者。为了方便,给发送者起个别名。
#![allow(unused)] fn main() { use tokio::sync::oneshot; use bytes::Bytes; /// 多个不同的命令在单个管道上复用 #[derive(Debug)] enum Command { Get { key: String, resp: Responder<Option<Bytes>>, }, Set { key: String, val: Bytes, resp: Responder<()>, }, } /// 由请求者提供,并由管理任务来发送指令的响应,返回给请求者 type Responder<T> = oneshot::Sender<mini_redis::Result<T>>; }
现在,更新发出命令的任务代码,来包含oneshot::Sender。
#![allow(unused)] fn main() { let t1 = tokio::spawn(async move { let (resp_tx, resp_rx) = oneshot::channel(); let cmd = Command::Get { key: "foo".to_string(), resp: resp_tx, }; // 发送GET请求 tx.send(cmd).await.unwrap(); // 等待响应 let res = resp_rx.await; println!("GOT = {:?}", res); }); let t2 = tokio::spawn(async move { let (resp_tx, resp_rx) = oneshot::channel(); let cmd = Command::Set { key: "foo".to_string(), val: "bar".into(), resp: resp_tx, }; // 发送SET请求 tx2.send(cmd).await.unwrap(); // 等待响应 let res = resp_rx.await; println!("GOT = {:?}", res); }); }
最后,更新管理任务,来通过oneshot管道发送响应。
#![allow(unused)] fn main() { while let Some(cmd) = rx.recv().await { match cmd { Command::Get { key, resp } => { let res = client.get(&key).await; // 使用 `_` 忽略错误 let _ = resp.send(res); } Command::Set { key, val, resp } => { let res = client.set(&key, val).await; // 使用 `_` 忽略错误 let _ = resp.send(res); } } } }
在onsshot::Sender上调用send会立即完成,并且不再需要.await。这是这是因为oneshot管道的send方法会总是会立即失败或成功,不需要等待。
当接收者那部分被drop了,在oneshot通道上发送值就会返回Err。这意味着接收者不再对响应作出反应。在我们的场景中,接收者对接收事件不再响应是合理的。resp.send()返回的Err不需要处理。
你可以在这儿找到完整代码。
对消息通道进行限制
每当引入并发或排队时,确保排队是有界的并且系统能够负载得起的非常重要。无界队列最终将会填满所有可用内容,并导致系统以不可预测的方式崩掉。
Tokyo会关注避免隐式排队。很大异步原因就是因为异步操作是惰性的(lazy)。考虑以下情况:
#![allow(unused)] fn main() { loop { async_op(); } }
如果异步操作都马上开始运行,那么这个循环会重复排队一个新的async_op任务,而不会确保之前的任务操作完成。这会导致隐式无界排队。基于回调的系统和基于eager future的系统很容易受到这种情况影响。
然而,Tokio和异步Rust不会让上述代码段中async_op运行。这是因为.await没有被调用。如果代码块使用了.await,那么循环在重新开始时会等待上一次操作完成。
#![allow(unused)] fn main() { loop { // 在 `async_op` 完成之前不会重复 async_op().await; } }
必须明确地引入并发和队列。执行此操作的方法有:
tokio::spawnselect!join!mpsc::channel
这样做时,注意确保总的并发量是有限的。例如,当写TCP接收循环时,确保总的打开的套接字(socket)是有限的。当使用mpsc::channel管道,选择管道的容量。具体的限定值取决于是什么应用。
小心地选择良好的边界是编写可靠Tokio应用的重要组成部分。
I/O
Tokio 中的 I/O 操作,与std中差不多,但是是异步的。又一个用来读的 trait AsyncRead 和一个用来写的 trait AsyncWrite。这些类型都实现了上述 trait(TcpStream, File, Stdout)。AsyncRead和AsyncWrite也为一些数据结构实现了,比如Vec<u8>和&[u8]。这就可以让在读写的时候使用字节数组。
本章节将展示几个例子介绍通过 Tokio 进行 I/O 读写。下一章会介绍更高级的 I/O 示例。
AsyncRead和AsyncWrite
这俩 trait 都提供了异步读写比特流的方法。这些 trait 上的方法通常不能直接调用,就像你不能直接从Futuretrait 中手动调用poll方法一样。但是,你仍然可以使用AsyncReadExt和AsyncWriteExt提供的实用方法来使用他们。
让我们简要看一下这些方法。这些函数都是异步的并且他们必须与.await搭配使用。
async fn read()
AsyncReadExt::read()提供了读取数据到缓冲区的异步方法,返回读取的字节数。
**注意:**当read()返回了Ok(0),这标志着流已经关闭了。再对read()调用都会立刻返回Ok(0)。对于TcpStream来说,这表示对套接字的读取部分已经关闭。
use tokio::fs::File; use tokio::io::{self, AsyncReadExt}; #[tokio::main] async fn main() -> io::Result<()> { let mut f = File::open("foo.txt").await?; let mut buffer = [0; 10]; // 最多读取10字节 let n = f.read(&mut buffer[..]).await?; println!("The bytes: {:?}", &buffer[..n]); Ok(()) }
async fn read_to_end()
AsyncReadExt::read_to_end读取流中所有的字节,直到 EOF。
use tokio::io::{self, AsyncReadExt}; use tokio::fs::File; #[tokio::main] async fn main() -> io::Result<()> { let mut f = File::open("foo.txt").await?; let mut buffer = Vec::new(); // 读取整个文件 f.read_to_end(&mut buffer).await?; Ok(()) }
async fn write()
AsyncWriteExt::write将缓冲区写入 writer,返回写入的字节数。
use tokio::io::{self, AsyncWriteExt}; use tokio::fs::File; #[tokio::main] async fn main() -> io::Result<()> { let mut file = File::create("foo.txt").await?; // 写入了这个字节字符串的前缀一部分,不是所有的都必须写入 let n = file.write(b"some bytes").await?; println!("Wrote the first {} bytes of 'some bytes'.", n); Ok(()) }
async fn write_all()
AsyncWriteExt::write_all将缓冲区所有字节都写入 writer。
use tokio::io::{self, AsyncWriteExt}; use tokio::fs::File; #[tokio::main] async fn main() -> io::Result<()> { let mut file = File::create("foo.txt").await?; file.write_all(b"some bytes").await?; Ok(()) }
这两个特征还有其他很多有用的方法。查看 API 文档以获取完整列表。
辅助函数 (Helper functions)
另外,就像std一样,tokio::io也提供了很多实用的辅助函数,比如处理standard input,standard output和standard error的 API。例如,tokio::io::copy可以异步地复制 reader 中的全部内容到 writer 中。
use tokio::fs::File; use tokio::io; #[tokio::main] async fn main() -> io::Result<()> { let mut reader: &[u8] = b"hello"; let mut file = File::create("foo.txt").await?; io::copy(&mut reader, &mut file).await?; Ok(()) }
注意,这里的字节数组也实现了AsyncRead。
回声服务器(Echo server)
让我们联系一下使用异步 I/O。我们将编写一个回声服务器。
回声服务器会绑定一个TcpListener并循环接收入站连接。对每个入站连接,从套接字中读取数据,然后立即写回到套接字中。客户端向服务端发送数据,并接收返回的相同数据。
我们将使用不同方式来实现回声服务器两次。
使用io::copy()
首先,我们将用io::copy()工具来实现回声逻辑。
你可以在一个新二进制文件中写代码:
$ touch src/bin/echo-server-copy.rs
然后你可以这样运行它(或仅仅是检查编译是否成功):
$ cargo run --bin echo-server-copy
你可以用标准命令行工具(比如 telnet)来测试这个服务端,或者写一个简单的客户端(比如tokio::net::TcpStream文档中的客户端)。
下面代码是一个 TCP 服务端,并有一个接收循环体。每当传入一个套接字,都会生成一个新任务。
use tokio::io; use tokio::net::TcpListener; #[tokio::main] async fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:6142").await?; loop { let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { // 在这里复制数据。 }); } }
刚刚说过,实用函数会需要一个 reader 和一个 writer,然后把数据从一个复制到另一个。然而,我们只有一个TcpStream,它同时实现了AsyncRead和AsyncWrite。因为io::copy对于 reader 和 writer 都需要&mut,所以套接字不能同时应用于两个参数。
#![allow(unused)] fn main() { // 这不能编译 io::copy(&mut socket, &mut socket).await }
把 reader + writer 分开
为了解决此问题,我们需要把套接字拆分为 reader 句柄和 writer 句柄。拆分 reader/writer 组合最好的方法取决于具体的类型。
任何 reader + writer 类型都需要使用io::split工具拆分。这个函数传入单个值,并返回拆分后的 reader 和 writer。这样两个句柄就可以单独使用,也可在单独的任务中使用。
例如,回声客户端可以这样并发处理读写:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; #[tokio::main] async fn main() -> io::Result<()> { let socket = TcpStream::connect("127.0.0.1:6142").await?; let (mut rd, mut wr) = io::split(socket); // 后台写入数据 tokio::spawn(async move { wr.write_all(b"hello\r\n").await?; wr.write_all(b"world\r\n").await?; // 有时,Rust类型推断需要点小帮助 Ok::<_, io::Error>(()) }); let mut buf = vec![0; 128]; loop { let n = rd.read(&mut buf).await?; if n == 0 { break; } println!("GOT {:?}", &buf[..n]); } Ok(()) }
由于io::split支持任何实现了AsyncRead + AsyncWrite的类型,并返回独立的句柄,这会导致在io::split内部使用Arc和Mutex。使用TcpSteam可以避免这种开销。TcpSteam提供了两个专门的分割函数。
TcpSteam::split获取了对流的引用,并返回一个 reader 和 writer 句柄。因为使用了引用,两个句柄都必须保持在和调用split()同样的任务上。这种特殊的分割是零成本的。因为不需要Arc或Mutex。TcpStream也提供了into_split,来支持可以在任务间移动的句柄,代价仅仅是一个Arc。
因为是在拥有TcpStream同一个任务上调用io::copy(),所以我们可以使用TcpStream::split。服务端中处理回声逻辑的任务应该这样写:
#![allow(unused)] fn main() { tokio::spawn(async move { let (mut rd, mut wr) = socket.split(); if io::copy(&mut rd, &mut wr).await.is_err() { eprintln!("failed to copy"); } }); }
在这儿可以找到完整代码。
手动拷贝
现在,让我们看看如何手动拷贝数据来编写回声服务器。为此,我们使用AsyncReadExt::read和AsyncWriteExt::write_all。
完整的回声服务器代码:
use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; #[tokio::main] async fn main() -> io::Result<()> { let listener = TcpListener::bind("127.0.0.1:6142").await?; loop { let (mut socket, _) = listener.accept().await?; tokio::spawn(async move { let mut buf = vec![0; 1024]; loop { match socket.read(&mut buf).await { // 返回值为 `Ok(0)` 的话说明远程关闭了 Ok(0) => return, Ok(n) => { // 拷贝数据返回到套接字中 if socket.write_all(&buf[..n]).await.is_err() { // 意料之外的套接字错误,我们不能为此干什么, // 只能把服务先关了 return; } } Err(_) => { // 意料之外的套接字错误,我们不能为此干什么, // 只能把服务先关了 return; } } } }); } }
(你可以把这些代码放到src/bin/echo-server.rs里,然后通过cargo run --bin echo-server来启动它)
让我们分析一下代码。首先,由于使用了AsyncRead和AsyncWrite,所以必须 use 一下。
#![allow(unused)] fn main() { use tokio::io::{self, AsyncReadExt, AsyncWriteExt}; }
分配缓冲区
这是为了从套接字中读取一些数据到缓冲区,然后将缓冲区的内容写回到套接字中。
#![allow(unused)] fn main() { let mut buf = vec![0; 1024]; }
避免使用在栈上的缓冲区。回想一下,注意到跨.await调用的任务数据都需要由任务来存储。这种情况下,buf是用来跨.await调用使用的。所有任务数据都必须存储在单个分配中。你可以把它看成一个枚举,其中每个枚举值都是特定调用.await时需要存储的数据。
如果缓冲区在栈上,每个传入的套接字生成的任务内部结构可能类似于:
#![allow(unused)] fn main() { struct Task { // 任务内部成员 task: enum { AwaitingRead { socket: TcpStream, buf: [BufferType], }, AwaitingWriteAll { socket: TcpStream, buf: [BufferType], } } } }
如果缓冲区在栈上,那么它将内连在任务结构体中。这会导致任务结构非常庞大。进一步说,缓冲区大小通常是页那么大。反过来,会导致任务大小很尴尬:$page-size + a-few-bytes。
相对于基本枚举来说,编译器对此也优化了异步块的布局。实际上,变量不会像枚举那样在变量变体之间移动。然而,任务结构体的大小至少与最大变量一样大。
正因如此,通常更高效的方法就是为缓冲区在堆上分配内存。
处理 EOF
当 TCP 流的读取部分关闭,调用read()会返回Ok(0)。此时退出循环非常重要。忘记在读到 EOF 时退出循环是常见的错误。
#![allow(unused)] fn main() { loop { match socket.read(&mut buf).await { // 返回了 `Ok(0)` 意味着远程连接关闭了 Ok(0) => return, // ... 这里处理其他情况 } } }
忘记从读循环中退出,通常会导致 100% CPU 无限循环。套接字关闭,socket.read()会立刻返回。然后就死循环了。
这儿可以找到完整代码。
Framing 解析帧
现在我们将为 Mini-Redis 框架层实现我们刚刚学过的 I/O 知识。获取字节流,并把它转换为帧流的过程叫解析帧。一帧就是两个对等点(peer)之间的传输单元。Redis 协议帧定义如下:
#![allow(unused)] fn main() { use bytes::Bytes; enum Frame { Simple(String), Error(String), Integer(u64), Bulk(Bytes), Null, Array(Vec<Frame>), } }
注意观察该帧仅由数据组成,没有任何语义。指令的解析和执行发生在更高的层次。
对于 HTTP 来说,一帧可能长这样:
#![allow(unused)] fn main() { enum HttpFrame { RequestHead { method: Method, uri: Uri, version: Version, headers: HeaderMap, }, ResponseHead { status: StatusCode, version: Version, headers: HeaderMap, }, BodyChunk { chunk: Bytes, }, } }
为了给 Mini-Redis 实现数据帧,我们将先实现Connection结构体,包含了TcpStream和读写mini_redis::Frame的值。
#![allow(unused)] fn main() { use tokio::net::TcpStream; use mini_redis::{Frame, Result}; struct Connection { stream: TcpStream, // ... 其他成员变量 } impl Connection { /// 从连接中读取一帧 /// /// 遇到 EOF 返回 `None` pub async fn read_frame(&mut self) -> Result<Option<Frame>> { // 在这里实现 } /// 向连接中写入一帧 pub async fn write_frame(&mut self, frame: &Frame) -> Result<()> { // 在这里实现 } } }
你可以在这儿找到有关 Redis 有线协议的详情。完整的Connection代码在这儿。
读取缓冲区
read_frame方法在返回前等待接收整个数据帧。对TcpStream::read()单次调用可能返回任意数量的数据。它可能有一整个帧,一部分帧或者多个帧。如果只接收到一部分帧,则传到缓冲区,再从套接字读取更多数据。如果接收到多个帧,则返回第一帧,其他数据存到缓冲区,直到下次调用read_frame。
如果还没创建connection.rs,这样创建:
touch src/connection.rs
为了实现这种功能,连接(Connection)需要一个读缓冲区。从套接字读取到数据会存到读缓冲区。当解析了一帧时,缓冲区中相应的数据就会删除。
我们将使用BytesMut来作为缓冲区类型,它是Bytes的可变版本。
#![allow(unused)] fn main() { use bytes::BytesMut; use tokio::net::TcpStream; pub struct Connection { stream: TcpStream, buffer: BytesMut, } impl Connection { pub fn new(stream: TcpStream) -> Connection { Connection { stream, // 为缓冲区开辟4kb的容量 buffer: BytesMut::with_capacity(4096), } } } }
接下来,我们实现read_frame()方法。
#![allow(unused)] fn main() { use tokio::io::AsyncReadExt; use bytes::Buf; use mini_redis::Result; pub async fn read_frame(&mut self) -> Result<Option<Frame>> { loop { // 尝试从缓冲区解析一个数据帧。如果缓冲区中有足够的数据, // 就返回数据帧 if let Some(frame) = self.parse_frame()? { return Ok(Some(frame)); } // 缓冲区中没有足够的数据组成一帧。 // 尝试从套接字中读更多数据。 // // 如果成功,返回字节的数量。 // 返回 `0` 表示 “读到了流的末尾” if 0 == self.stream.read_buf(&mut self.buffer).await? { // 远程连接关闭。对这情况是一种干净的关闭,在读缓冲区中应该 // 没有数据了。如果还有数据,意味着传输一帧的同时,对等点peer // 关闭了套接字。 if self.buffer.is_empty() { return Ok(None); } else { return Err("connection reset by peer".into()); } } } } }
分析一下代码。read_frame方法在循环体中运行。首先,调用self.parse_frame()。这会尝试从self.buffer中解析一个 Redis 帧。如果有足够的数据可以解析成一帧,就把该帧返回。否则,尝试从套接字中读取更多数据到缓冲区中。读取到更多数据之后,parse_frame()再次被调用。这回,如果接收到足够的数据,解析或许就会成功。
当从流中读取时,返回了0表示我们不会从对方接收到更多数据。如果缓冲区中仍有数据,说明是接收到了部分帧,但是连接突然终止了。这是一个错误情况需要返回Err。
Buf特征
从流中读取时,调用了read_buf,这个读取函数采用了实现bytescrate 中BufMut的值。
首先,考虑使用read()实现相同的读取循环。Vec<u8>可以用来替代BytesMut。
#![allow(unused)] fn main() { use tokio::net::TcpStream; pub struct Connection { stream: TcpStream, buffer: Vec<u8>, cursor: usize, } impl Connection { pub fn new(stream: TcpStream) -> Connection { Connection { stream, // 为缓冲区开辟4kb的容量 buffer: vec![0; 4096], cursor: 0, } } } }
为Connection实现read_frame():
#![allow(unused)] fn main() { use mini_redis::{Frame, Result}; pub async fn read_frame(&mut self) -> Result<Option<Frame>> { loop { if let Some(frame) = self.parse_frame()? { return Ok(Some(frame)); } // 确保buffer有足够容量 if self.buffer.len() == self.cursor { // 给buffer扩容 self.buffer.resize(self.cursor * 2, 0); } // 从流中读取到buffer中,记录读了多少字节 let n = self.stream.read( &mut self.buffer[self.cursor..]).await?; if 0 == n { if self.cursor == 0 { return Ok(None); } else { return Err("connection reset by peer".into()); } } else { // 更新指针 self.cursor += n; } } } }
当用字节数组读时,我们必须维护一个指针,跟踪已使用的缓冲区数量。必须保证把缓冲区的空部分传给read()。否则,我们会覆盖已经缓冲了的数据。如果缓冲区满了,必须扩容缓冲区才能继续读取。在parse_frame()中(不包括在内),我们需要解析包含在self.buffer[..self.cursor]中的数据。
由于将字节数组和指针搭配使用非常常见,bytecrate 提供了表示字节数组和指针的抽象。Buf特征可以被读取数据的类型实现。BufMut特征可以被写入数据的类型实现。当把T: BufMut传递给read_buf()时,缓冲区的内部指针就会由read_buf自动更新。正因如此,在我们之前写的read_frame中,我们不需要管理自己的指针。
解析
现在,让我们实现parse_frame()函数。解析由两步组成:
- 确保缓冲区中有一个完整的帧,并找到帧的末尾索引。
- 解析帧。
mini-redis crate 为每一步都提供了对应的函数:
我们将会再用Buf抽象来获取帮助。Buf被传递到Frame::check中。check会遍历缓冲区,内部指针将会前进。当check函数返回时,缓冲区内部指针指向帧的末尾。
对于Buf类型,我们使用std::io::Cursor<&[u8]>
#![allow(unused)] fn main() { use mini_redis::{Frame, Result}; use mini_redis::frame::Error::Incomplete; use bytes::Buf; use std::io::Cursor; fn parse_frame(&mut self) -> Result<Option<Frame>> { // 创建 `T: Buf` 类型. let mut buf = Cursor::new(&self.buffer[..]); // 检查是否已有一个完整的帧 match Frame::check(&mut buf) { Ok(_) => { // 获取帧的字节长度 let len = buf.position() as usize; // 重置内部指针位置,因为调用了 `parse` buf.set_position(0); // 解析帧 let frame = Frame::parse(&mut buf)?; // 丢弃缓冲区中的帧 self.buffer.advance(len); // 返回解析后的帧 Ok(Some(frame)) } // 缓冲区中没有足够数据 Err(Incomplete) => Ok(None), // 遇到了错误 Err(e) => Err(e.into()), } } }
完整的Frame::check函数可以在这儿找到。这里不会完整介绍它。
需要注意的是,相关事项使用了Buf的“字节迭代器”风格 API。它们获取数据并移动内部指针。例如,解析一帧,第一个字节被检查来确定类型。使用了Buf::get_u8函数。这会获取到当前指针位置的字节并让指针前进一次。
Buf特征也有很多其他实用方法。查看API 文档获取更多细节。
缓冲写入(Buffered writes)
解析帧的另一部分就是write_frame(frame)函数。这个函数把整个帧写入套接字。为了最小地调用write,我们使用缓冲写入。维护一个写入缓冲区,帧在写入套接字之前需要先编码到这个缓冲区。然而,不像read_frame(),整个帧并不总是缓存到字节数组中。
考虑到一些流中的帧。正在写入的值是Frame::Bulk(Bytes)。这些帧线性排列,有一个帧头,它由$符后跟了整个数据长度那么多的字节。帧大部分内容都是Bytes值。如果数据很大,把它复制到缓冲区的成本很高。
为了实现缓冲写入,我们需要BufWriter结构体。该结构体使用T: AsyncWrite初始化并实现了AsyncWrite。当在BufWriter上调用write时,写入不会直接写入writer,而是写入缓冲区。当缓冲区满了,内容将会被刷新到writer,然后缓冲区清空。还有一些特殊的优化用来绕过缓冲区,在特定的情况下。
在本教程,我们不会实现完整的write_frame()。完整实现在这儿。
首先,更新Connection结构体:
#![allow(unused)] fn main() { use tokio::io::BufWriter; use tokio::net::TcpStream; use bytes::BytesMut; pub struct Connection { stream: BufWriter<TcpStream>, buffer: BytesMut, } impl Connection { pub fn new(stream: TcpStream) -> Connection { Connection { stream: BufWriter::new(stream), buffer: BytesMut::with_capacity(4096), } } } }
接下来,实现write_frame():
#![allow(unused)] fn main() { use tokio::io::{self, AsyncWriteExt}; use mini_redis::Frame; async fn write_frame(&mut self, frame: &Frame) -> io::Result<()> { match frame { Frame::Simple(val) => { self.stream.write_u8(b'+').await?; self.stream.write_all(val.as_bytes()).await?; self.stream.write_all(b"\r\n").await?; } Frame::Error(val) => { self.stream.write_u8(b'-').await?; self.stream.write_all(val.as_bytes()).await?; self.stream.write_all(b"\r\n").await?; } Frame::Integer(val) => { self.stream.write_u8(b':').await?; self.write_decimal(*val).await?; } Frame::Null => { self.stream.write_all(b"$-1\r\n").await?; } Frame::Bulk(val) => { let len = val.len(); self.stream.write_u8(b'$').await?; self.write_decimal(len as u64).await?; self.stream.write_all(val).await?; self.stream.write_all(b"\r\n").await?; } Frame::Array(_val) => unimplemented!(), } self.stream.flush().await; Ok(()) } }
这里使用的函数由AsyncWriteExt提供。它们也能用在TcpStream,但是不建议在没有中间缓冲区的情况下处理单个字节。
write_u8向writer写入一个字节。write_all向writer写入全部。write_decimal是mini-redis实现的方法。
该函数以调用self.stream.flush().await结束。因为BufWriter向中间缓冲区存储了需要写入的内容,所以调用write不能保证数据被写入套接字。Return之前,我们想要帧被写入到套接字中。调用fluse()会将在缓冲区挂起的数据写入到套接字中。
另一种选择是不在write_frame()中调用flush()。取而代之的是,在Connection中提供flush()函数。这将允许调用者写入多个小帧到写缓冲区里,然后通过write系统调用,把他们写入到套接字中。这样会导致ConnectionAPI变得复杂。所以我们决定在fn write_frame()中调用fluse().await。
Async in depth 深入异步
至此,我们已经基本浏览了异步 Rust 和 Tokio。现在我们将深挖 Rust 的异步运行时模型。在本教程的一开始,我们就指出异步 Rust 采用了独特的方式。现在将解释这是什么意思。
Futures 未来对象/期货
快速看一下这个基本的异步函数。与本教程已经涵盖的内容相比,这不是什么新鲜的。
#![allow(unused)] fn main() { use tokio::net::TcpStream; async fn my_async_fn() { println!("hello from async"); let _socket = TcpStream::connect("127.0.0.1:3000").await.unwrap(); println!("async TCP operation complete"); } }
我们调用该函数,并返回一个值。对这个值调用.await。
#[tokio::main] async fn main() { let what_is_this = my_async_fn(); // 到这行之前,什么也没打印。 what_is_this.await; // 打印出了文字,与套接字建立连接,关闭连接。 }
my_async_fn()返回了一个 future。Future 是实现了标准库中std::future::Futuretrait 的值。它们是包含了正在异步计算的值。
std::future::Futuretrait 的定义是:
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Future { type Output; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>; } }
关联类型Output是 future 一旦完成后产生的类型。Pin类型是 Rust 能够支持在async函数中借用(borrow)的方式。查看标准库文档了解细节。
与其他语言实现 future 的方式不同,Rust 的 future 不代表正在后台发生的计算,而是 Rust future就是计算本身。Future 的拥有者负责轮询(polling)future 来推进计算过程。这是通过调用Future::poll来完成的。
实现Future
让我们来实现一个简单的 future。这个 future 将会:
- 等到特定时刻。
- 向 STDOUT 输出一些文字。
- 产生一个字符串。
use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; struct Delay { when: Instant, } impl Future for Delay { type Output = &'static str; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> { if Instant::now() >= self.when { println!("Hello world"); Poll::Ready("done") } else { // 现在忽略这行。 cx.waker().wake_by_ref(); Poll::Pending } } } #[tokio::main] async fn main() { let when = Instant::now() + Duration::from_millis(10); let future = Delay { when }; let out = future.await; assert_eq!(out, "done"); }
异步函数作为 Future
在 main 函数中,我们实例化了 future,并在其上调用.await。异步函数中,我们可以对任何实现了Future的值调用.await。确实,调用异步函数本就会返回一个实现了Future的匿名类型。async fn main()其实就会大致生成下面这样:
#![allow(unused)] fn main() { use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; enum MainFuture { // 初始化,永不轮询 State0, // 等待 `Delay`,换句话说,就是 `future.await` 那行。 State1(Delay), // future完成了。 Terminated, } impl Future for MainFuture { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { use MainFuture::*; loop { match *self { State0 => { let when = Instant::now() + Duration::from_millis(10); let future = Delay { when }; *self = State1(future); } State1(ref mut my_future) => { match Pin::new(my_future).poll(cx) { Poll::Ready(out) => { assert_eq!(out, "done"); *self = Terminated; return Poll::Ready(()); } Poll::Pending => { return Poll::Pending; } } } Terminated => { panic!("future polled after completion") } } } } } }
Rust future 是个状态机。这里,MainFuture枚举表示未来可能发生的状态。Future 开始于State0状态。当调用poll时,future 会尽可能尝试推进其内部状态。如果 future 能够完成,返回Poll:Ready,其中包含着该异步计算的输出。
如果 future 无法完成,通常是由于它正在等待未准备好的资源,则返回Poll::Pending。接收到Poll::Pending后,向调用者表明 future 会在稍后完成,调用者应该稍后再调用poll。
我们也发现 future 是由其他 future 组成的。在其他 future 上调用poll会导致调用内部 future 的poll方法。
执行器 Executors
异步 Rust 函数返回 future。Future 必须调用poll来推进它们的状态。Future 都是由 future 组成的。那么问题是,谁来对最深层的 future 调用poll呢?
回想刚才的内容,要运行异步函数,不是传递给tokio::spawn,就是在 main 函数上加上#[tokio::main]注解。这可以让外部生成的 future 提交给 Tokio 的执行器。执行器负责在外部 future 上调用Future::poll,来驱动着异步计算完成。
Mini Tokio
为了更好理解他们是怎么组合到一起的,让我们实现我们自己的最小化的 Tokio 版本!完整代码可以在这儿找到。
use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use futures::task; fn main() { let mut mini_tokio = MiniTokio::new(); mini_tokio.spawn(async { let when = Instant::now() + Duration::from_millis(10); let future = Delay { when }; let out = future.await; assert_eq!(out, "done"); }); mini_tokio.run(); } struct MiniTokio { tasks: VecDeque<Task>, } type Task = Pin<Box<dyn Future<Output = ()> + Send>>; impl MiniTokio { fn new() -> MiniTokio { MiniTokio { tasks: VecDeque::new(), } } /// 在 mini-tokio 实例上生成一个future fn spawn<F>(&mut self, future: F) where F: Future<Output = ()> + Send + 'static, { self.tasks.push_back(Box::pin(future)); } fn run(&mut self) { let waker = task::noop_waker(); let mut cx = Context::from_waker(&waker); while let Some(mut task) = self.tasks.pop_front() { if task.as_mut().poll(&mut cx).is_pending() { self.tasks.push_back(task); } } } }
这可以运行异步块。创建了一个Delay实例用于等待所需时间。然而,我们的实现存在一个重大缺陷。我们的执行器永远不会休眠。执行器持续不断地用循环轮询所有的生成的 future。但大多数时候,future 还没准备好做更多工作,然后又返回了Poll::Pending。这个过程会消耗 CPU 周期,降低效率。
理想情况下,我们想要 mini-tokio 只在 future 有进展的时候来轮询一下 future。当任务需要的被阻塞的资源,转变为可以为请求使用时,就应该轮询一下。比如任务想从 TCP 套接字中读取数据,我们只想让它在 TCP 套接字接收到数据时轮询任务。在上述代码中,任务在给定Instant(时刻)之前被阻塞。理想状况下,mini-tokio 应该只在这个时刻后来轮询任务。
为了实现这一点,当资源被轮询时,发现资源并没有准备好。一旦资源处于就绪状态,应该发送一个提醒。
唤醒者 Wakers
我们之前缺失了 wakers。这是一个当资源准备好继续某些操作时,来通知正在等待的任务的系统。
让我们再看一下Future::poll的定义:
#![allow(unused)] fn main() { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>; }
poll的Context参数中有一个waker()方法。该方法返回一个绑定到当前任务的Waker。这个Waker有一个wake()方法。调用这个方法会向执行器发出信号,应该安排执行相关的任务。当资源转变为就绪时,调用wake()来通知执行器轮询这个任务,来推进整个过程。
更新Delay的实现
我们可以使用 wakers 来更新Delay:
#![allow(unused)] fn main() { use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use std::thread; struct Delay { when: Instant, } impl Future for Delay { type Output = &'static str; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> { if Instant::now() >= self.when { println!("Hello world"); Poll::Ready("done") } else { // 从当前任务获取一个waker的句柄 let waker = cx.waker().clone(); let when = self.when; // 生成一个定时器线程 thread::spawn(move || { let now = Instant::now(); if now < when { thread::sleep(when - now); } waker.wake(); }); Poll::Pending } } } }
现在,经过了请求的时间,调用任务就会被通知,然后执行器可以确保该任务再次被调用。下一步就是更新 mini-tokio,来坚挺唤醒通知(wake notifications)。
这里我们的Delay实现还是有点问题。我们一会儿再修复。
warning 当一个 future 返回
Poll::Pending时,它必须保证 waker 之后某时可以正常被调用。如果忘了这样做,会导致任务被无限期挂起。
返回Poll::Pending后,忘记唤醒任务是常见的 bug。
回想一下我们第一次写的Delay。这是 future 的实现:
#![allow(unused)] fn main() { impl Future for Delay { type Output = &'static str; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> { if Instant::now() >= self.when { println!("Hello world"); Poll::Ready("done") } else { // Ignore this line for now. cx.waker().wake_by_ref(); Poll::Pending } } } }
在返回Poll::Pending之前,我们调用了cx.waker().wake_by_ref()。这是为了满足 future 的定义。通过返回Poll::Pending,我们可以向 waker 发出信号。因为我们还没实现定时器线程,所以我们内联地向 waker 发出信号。这样做会让 future 立刻被重新安排执行,但是这个 future 很可能此时并没有准备好去完成。
注意你可以让不必要地向 waker 发出更多信号。在本例中,即使我们还没准备好继续操作,我们也会向 waker 发出信号。除了浪费 CPU 周期之外没毛病。但是,这种特定的实现会导致忙循环(busy loop)。
更新 Mini Tokio 代码
接下来是更新 Mini Tokio 来接收 waker 通知。我们只想让任务被唤醒时来执行任务,为此,Mini Tokio 将提供自己的 waker。当调用 waker,关联的任务就会被执行。Mini-Tokio 对 future 轮询时,会把 waker 传递给 future。
更新后的 Mini Tokio 会使用管道来存储计划执行的任务队列。管道可以让任务排队执行在任何线程上。Waker 必须实现了Send和Sync。
info
Send和Sync是 Rust 提供关于并发的 trait。可以发送到不同线程的类型是Send的。大多数类型都是Send的,但是像Rc这样的就不是。可以通过不可变引用并发访问的类型是Sync的。类型可以是Send但不是Sync——一个很好的例子是Cell,它可以通过不可变引用进行修改,这在并发访问是不安全的。
了解更多细节,可以看Rust book 中这一章节。
更新 MiniTokio 结构体。
#![allow(unused)] fn main() { use std::sync::mpsc; use std::sync::Arc; struct MiniTokio { scheduled: mpsc::Receiver<Arc<Task>>, sender: mpsc::Sender<Arc<Task>>, } struct Task { // 这儿一会儿再填。 } }
Waker 是Sync的,并且可以被克隆。当调用wake时,任务必须被安排执行。为了实现,我们需要有个管道。当我们调用wake()时,任务被发送到管道。我们的Task结构体将实现唤醒逻辑。为此,它需要包含生成的 future 和管道的发送部分。我们把Poll枚举和 future 都放在TaskFuture结构体中,以跟踪最新的Future::poll()结果,这是处理虚假唤醒(spurious wake-ups)所须的。更多细节在TaskFuture的poll()中实现。
#![allow(unused)] fn main() { use std::sync::{Arc, Mutex}; /// future持有一个结构,里面有最后一次调用 `poll` 的结果。 struct TaskFuture { future: Pin<Box<dyn Future<Output = ()> + Send>>, poll: Poll<()>, } struct Task { // `Mutex` 让 `Task` 实现了 `Sync`。 // 在任何给定的时刻只有一个线程可以访问 `task_future`。 // `Mutex` 不需要在这里有正确性。真正的Tokio // 在这里没使用锁,但真正的Tokio有非常多行代码, // 放在一篇教程里面写不下。 task_future: Mutex<TaskFuture>, executor: mpsc::Sender<Arc<Task>>, } impl Task { fn schedule(self: &Arc<Self>) { self.executor.send(self.clone()); } } }
为了安排任务,Arc被克隆并通过管道发送。现在,我们需要把调度函数和std::task::Waker挂钩(hook)。标准库提供了一个低等级 API,使用manual vtable construction(手动虚表结构)。这种策略为实现者提供了最大化的灵活性,但是需要一堆 unsafe 的样板代码。我们将使用futurescrate 提供的ArcWake工具,而不是直接使用RawVakerVTable。这使得我们可以实现一个简单的 trait 就能暴露我们的Task结构体作为一个 waker。
添加以下依赖到Cargo.toml中来拉取futures。
futures = "0.3"
然后实现futures::task::ArcWake。
#![allow(unused)] fn main() { use futures::task::{self, ArcWake}; use std::sync::Arc; impl ArcWake for Task { fn wake_by_ref(arc_self: &Arc<Self>) { arc_self.schedule(); } } }
当上面的定时器线程调用waker.wake()时,任务被传到管道中。接下来,我们在MiniTokio::run()中实现接收和执行任务。
#![allow(unused)] fn main() { impl MiniTokio { fn run(&self) { while let Ok(task) = self.scheduled.recv() { task.poll(); } } /// 初始化一个新的 mini-tokio 实例。 fn new() -> MiniTokio { let (sender, scheduled) = mpsc::channel(); MiniTokio { scheduled, sender } } /// 在 mini-tokio 实例上生成一个future /// /// 给定的 future 被包含在 `Task` 中并被传到 `调度` 队列中 /// 这个 future 将在调用 `run` 时执行 fn spawn<F>(&self, future: F) where F: Future<Output = ()> + Send + 'static, { Task::spawn(future, &self.sender); } } impl TaskFuture { fn new(future: impl Future<Output = ()> + Send + 'static) -> TaskFuture { TaskFuture { future: Box::pin(future), poll: Poll::Pending, } } fn poll(&mut self, cx: &mut Context<'_>) { // 允许虚假唤醒,即使一个 future 已经返回了 `Ready`。 // 然而,轮询一个已经返回了 `Ready` 的future是*不*被允许的。 // 对此,我们需要在调用前检查 future 是否仍处于挂起状态。 // 如果不这样做可能导致 panic 。 if self.poll.is_pending() { self.poll = self.future.as_mut().poll(cx); } } } impl Task { fn poll(self: Arc<Self>) { // 从 `Task` 实例中创建waker。 // 使用上述提到的 `ArcWake` impl。 let waker = task::waker(self.clone()); let mut cx = Context::from_waker(&waker); // 没有其他线程尝试锁这个 task_future。 let mut task_future = self.task_future.try_lock().unwrap(); // 轮询内部的 future task_future.poll(&mut cx); } // 对于给定的 future 生成新任务 // // 初始化包含给定 future 的新任务结构,推给 `sender` // 管道的接收部分会获取到这个任务并执行它。 fn spawn<F>(future: F, sender: &mpsc::Sender<Arc<Task>>) where F: Future<Output = ()> + Send + 'static, { let task = Arc::new(Task { task_future: Mutex::new(TaskFuture::new(future)), executor: sender.clone(), }); let _ = sender.send(task); } } }
这里发生了很多事情。首先,实现了MiniTokio::run()。该函数在循环体中执行,不断从管道中接收计划执行的任务。让任务被唤醒时,就被推送到管道中,然后这些任务就可以被执行来取得一些进展。
此外,MiniTokio::new()和MiniTokio::spawn()函数也被调整为使用管道,而不是一个VecDeque。当新任务生成时,他们会获取管道发送者的克隆,这让任务可以在运行时上调度自己。
Task::poll()函数使用futurescrate 的ArcWake创建了 waker。这个 waker 被用来创建task::Context的,它会被传递到poll。
总结
我们已经看到异步 Rust 如何端对端地工作。Rust 的async/await功能是由 trait 提供的。这让第三方 crate,比如 Tokio,提供执行的相关细节。
- 异步 Rust 操作是惰性的,需要调用者轮询它们。
- Waker 被传递给 future,把 future 与调用的任务联系起来。
- 当资源未就绪去完成操作,返回
Poll::Pending并记录任务的 waker。 - 当资源就绪,任务的 waker 会收到通知。
- 当执行器收到通知,就会安排任务执行。
- 再次轮询任务,这次资源就绪,任务可以推进。
一些尚未解决的问题
回想当我们实现Delay的 future,我们说还有些问题需要解决。Rust 异步模型允许单个 future 在执行时夸任务迁移。考虑以下代码:
use futures::future::poll_fn; use std::future::Future; use std::pin::Pin; #[tokio::main] async fn main() { let when = Instant::now() + Duration::from_millis(10); let mut delay = Some(Delay { when }); poll_fn(move |cx| { let mut delay = delay.take().unwrap(); let res = Pin::new(&mut delay).poll(cx); assert!(res.is_pending()); tokio::spawn(async move { delay.await; }); Poll::Ready(()) }).await; }
pull_fn函数使用闭包创建一个Future实例。上述代码片段创建了一个Delay实例,并轮询了一次,然后把Delay实例发送给等待它的新任务。此例中,使用不同的Waker 实例会导致多次调用Delay::poll。当这种情况发生时,你需要保证当 Waker 传递到最近一次轮询调用时调用唤醒。
当实现 future 时,你不能认为每次调用poll都能提供不同的Waker实例。poll 函数必须使用新的 waker 来更新任何先前记录的 waker。
我们稍早前实现的Delay每次轮询时都会产生一个新线程。这没啥问题,但如果轮询非常频繁可能导致效率低下(例如,如果你select!这个 future 和一些其他 future,只要发生事件就开始轮询二者)。一种方法是记住是否你已经生成了一个线程,尽在你还没生成线程时,才生成一个新线程。但是,如果这样做,你必须保证线程的Waker在调用 poll 之后更新,否则你就不会唤醒最新的Waker
为了修复之前的实现,我们可以这样做:
#![allow(unused)] fn main() { use std::future::Future; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; use std::thread; use std::time::{Duration, Instant}; struct Delay { when: Instant, // 当我们生成了线程,这里是 Some,否则就是 None。 waker: Option<Arc<Mutex<Waker>>>, } impl Future for Delay { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { // 检查当前实例。如果经过了设定时间,就说明 // 该 future 完成了,返回 `Poll::Ready`。 if Instant::now() >= self.when { return Poll::Ready(()); } // 设定时间未完成。如果这事第一次调用 future, // 生成定时器线程。如果定时器线程已经运行了, // 确保存储的 `Waker` 匹配当前任务的waker。 if let Some(waker) = &self.waker { let mut waker = waker.lock().unwrap(); // 检查存储的 waker 是否匹配当前任务的 waker // 这对于 `Delay` 的 future 实例是必须的,因为它可能移动到 // 一个不同的任务,在两次调用 `poll`之间。如果这发生了 // waker 包含的给定 `Context` 就会不同,所以我们必须 // 更新存储的 waker ,来反映这种改变 if !waker.will_wake(cx.waker()) { *waker = cx.waker().clone(); } } else { let when = self.when; let waker = Arc::new(Mutex::new(cx.waker().clone())); self.waker = Some(waker.clone()); // 第一次调用 `poll`,生成定时器线程。 thread::spawn(move || { let now = Instant::now(); if now < when { thread::sleep(when - now); } // 经过了给定时间。通过调用waker来通知调用者 let waker = waker.lock().unwrap(); waker.wake_by_ref(); }); } // 现在,waker已经被存储,计时器线程也已经开启。 // 还没经过给定的时间(回想一下我们检查过这个事) // 因此future还没完成,我们需要返回 `Poll::Pending` // // `Future` trait 的实现需要当返回 `Pending` 时, // future 确保一旦 future 应该被再次轮询时, // 给定的 waker 已经收到信号。在我们的例子中, // 通过在这里返回 `Pending`,我们承诺一旦经过了给定的时长 // 我们将调用包含 `Context` 参数的给定的waker。 // 我们通过上面生成的计时器线程来确保这一点。 // // 如果我们忘记调用 waker,任务就会无限期挂起 Poll::Pending } } }
这有点复杂,但是想法是,每次调用poll的时候,future 会检查提供的 waker 是否与之前记录的 waker 相匹配。如果两个 waker 匹配,不用执行其他操作。如果不比配,那么记录的 waker 必须被更新。
通知工具 Notify utility
我们演示了一个Delayfuture 是如何通过使用 waker 手动实现的。Waker 是异步 Rust 工作的基础。通常情况下,没必要理解到这个层次。例如,在例子Delay中,我们可以使用tokio::sync::Notify工具来为它实现async/await。该工具提供了基本的任务通知机制。它会处理 waker 的细节,包含确保记录的 waker 与当前任务相匹配。
使用Notify,我们可以通过async/await来实现delay函数,就像这样:
#![allow(unused)] fn main() { use tokio::sync::Notify; use std::sync::Arc; use std::time::{Duration, Instant}; use std::thread; async fn delay(dur: Duration) { let when = Instant::now() + dur; let notify = Arc::new(Notify::new()); let notify_clone = notify.clone(); thread::spawn(move || { let now = Instant::now(); if now < when { thread::sleep(when - now); } notify_clone.notify_one(); }); notify.notified().await; } }
Select 选择先完成的
现在,当我们已经想要向系统添加并发时,我们可以生成新任务。现在我们将介绍一些Tokio并发执行异步代码的其他方法。
tokio::select!
tokio::select!宏允许在多个异步计算等待,并当单个计算完成时返回。
例如:
use tokio::sync::oneshot; #[tokio::main] async fn main() { let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); tokio::spawn(async { let _ = tx1.send("one"); }); tokio::spawn(async { let _ = tx2.send("two"); }); tokio::select! { val = rx1 => { println!("rx1 completed first with {:?}", val); } val = rx2 => { println!("rx2 completed first with {:?}", val); } } }
这里使用了两个oneshot管道。任一管道都可以首先完成。select!语句会在两个管道上等待,并将val绑定到任务返回到值。当tx1或tx2完成时,这个语句块将被执行。
未完成的分支将会被drop。在上面例子,计算过程等待每个管道的oneshot::Receiver。尚未完成的管道的oneshot::Receiver将会被drop。
消除 Cancellation
对于异步Rust来说,消除任务是通过drop future来进行的。回想一下“深入异步”,异步Rust操作是通过使用future实现的,而future是惰性的(lazy)。该操作仅在future被轮询的时候实现。如果future被drop,操作就无法继续了,因为有关状态已经被drop了。
这表明,有时异步操作会在生成后台任务或在后台运行其他操作。例如,在上面的示例中,生成了一个任务来发送一个消息,然后返回。通常,后台任务将执行一些计算来生成值。
Future或其他类型都可以实现Drop来清理后台资源。Tokio的oneshot::Receiver实现了Drop,通过向Sender部分发送一个关闭通知。sender部分就可以接收到这个通知,然后丢弃正在进行中的操作来drop。
use tokio::sync::oneshot; async fn some_operation() -> String { // 在这里计算一些值 } #[tokio::main] async fn main() { let (mut tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); tokio::spawn(async { // 在某些操作和 oneshot 的 `closed` 通知上选择先完成的 tokio::select! { val = some_operation() => { let _ = tx1.send(val); } _ = tx1.closed() => { // `some_operation()` 被取消了 // 任务完成并且 `tx1` 被drop } } }); tokio::spawn(async { let _ = tx2.send("two"); }); tokio::select! { val = rx1 => { println!("rx1 completed first with {:?}", val); } val = rx2 => { println!("rx2 completed first with {:?}", val); } } }
Future实现
为了更好理解select!是如何工作的,让我们看看假设的Future实现什么样。这是一个简化版本。实际上,select!包含一些其他功能,例如随机选择首先轮询的分支。
use tokio::sync::oneshot; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; struct MySelect { rx1: oneshot::Receiver<&'static str>, rx2: oneshot::Receiver<&'static str>, } impl Future for MySelect { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { if let Poll::Ready(val) = Pin::new(&mut self.rx1).poll(cx) { println!("rx1 completed first with {:?}", val); return Poll::Ready(()); } if let Poll::Ready(val) = Pin::new(&mut self.rx2).poll(cx) { println!("rx2 completed first with {:?}", val); return Poll::Ready(()); } Poll::Pending } } #[tokio::main] async fn main() { let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); // 使用 tx1 和 tx2 MySelect { rx1, rx2, }.await; }
MySelectfuture包含了每个分支的future。当MySelect被轮询时,第一个分支将被轮询。如果它就绪了,使用该值完成MySelect。.await收到future的输出之后,future被drop。这会让两个分支的future都drop。由于另外一个分支没完成,该操作实际上被消除了。
请回想一下上一节内容:
info
当一个future返回了Poll::Pending,它必须确保在将来某时给waker发出信号。忘了这样做会导致任务被无限期挂起。
MySelect实现中没有显式的使用Context参数。相反,waker要求是传递cx到内部的future。由于内部的future也得满足waker的需求,当从内部的future接收到Poll::Pending时,外部只返回Poll::Pending,这样MySelect也能满足waker的需求。
语法
select! 宏可以处理两个以上分支。当前的限制是64个分支。每个分支的结构如下:
<pattern> = <async expression> => <handler>,
<模式> = <异步表达式> => <处理句柄>,
当select!宏计算时,所有的<async expression>都会被并发执行。当某个异步表达式完成时,结果将会与<pattern>进行模式匹配。如果结果与模式匹配,则drop其他所有异步表达式,并执行<handler>。<handler>表达式可以访问由<pattern>建立的任何绑定。
<pattern>的基本例子是一个变量名,异步表达式的结果绑定到变量名,然后<handler>可以访问这个变量。这就是为什么最开始的例子中,<pattern>是val,并<handler>可以访问val。
如果<pattern>不匹配异步计算的结果,那么剩余的异步表达式将继续并发执行,直到有一个完成。此时,会对结果执行相同的逻辑。
因为select!接收任何异步表达式,所以定义更复杂的异步计算是可能的。
在这里,我们选择oneshot管道和一个TCP连接的输出,谁先输出。
use tokio::net::TcpStream; use tokio::sync::oneshot; #[tokio::main] async fn main() { let (tx, rx) = oneshot::channel(); // 生成一个任务向 oneshot 管道发送一个消息 tokio::spawn(async move { tx.send("done").unwrap(); }); tokio::select! { socket = TcpStream::connect("localhost:3465") => { println!("Socket connected {:?}", socket); } msg = rx => { println!("received message first {:?}", msg); } } }
在这里,我们选择一个oneshot和从一个TcpListener接收套接字,谁先完成。
use tokio::net::TcpListener; use tokio::sync::oneshot; use std::io; #[tokio::main] async fn main() -> io::Result<()> { let (tx, rx) = oneshot::channel(); tokio::spawn(async move { tx.send(()).unwrap(); }); let mut listener = TcpListener::bind("localhost:3465").await?; tokio::select! { _ = async { loop { let (socket, _) = listener.accept().await?; tokio::spawn(async move { process(socket) }); } // 帮帮 Rust 的类型推断 Ok::<_, io::Error>(()) } => {} _ = rx => { println!("terminating accept loop"); } } Ok(()) }
接收循环将会一直运行,直到遇到错误或rx接收到一个值。_模式表示我们忽略了异步计算的返回值。
返回值
tokio::select!宏返回<handler>表达式的结果。
async fn computation1() -> String { // .. 一些计算 } async fn computation2() -> String { // .. 一些计算 } #[tokio::main] async fn main() { let out = tokio::select! { res1 = computation1() => res1, res2 = computation2() => res2, }; println!("Got = {}", out); }
正因如此,它要求每个分支的<handler>表达式均为相同类型。如果一个select!的表达式的输出是不需要的,最后把handler表达式的输出设为()。
错误处理
使用?运算符来传播表达式中的错误。这是否能用取决于是否?被用在异步表达式或handler中。在异步表达式中使用?会将错误传播到异步表达式之外。这会让异步表达式输出一个Result。在handler中使用?将会立即传播错误到select!表达式外部。让我们再看一下接收循环例子:
use tokio::net::TcpListener; use tokio::sync::oneshot; use std::io; #[tokio::main] async fn main() -> io::Result<()> { // [初始化 `rx` oneshot 管道] let listener = TcpListener::bind("localhost:3465").await?; tokio::select! { res = async { loop { let (socket, _) = listener.accept().await?; tokio::spawn(async move { process(socket) }); } // 帮帮 Rust 的类型推断 Ok::<_, io::Error>(()) } => { res?; } _ = rx => { println!("terminating accept loop"); } } Ok(()) }
注意listener.accept().await?。?运算符传播的错误绑定到了res。发生错误时,res会变成Err(_)。然后,在handler中再用?。res?语句会把错误传播到main之外。
模式匹配
回想一下select!宏分支语法是这样定义的:
<pattern> = <async expression> => <handler>,
到目前为止,我们都只对<pattern>绑定一个变量。其实,任何Rust模式都可以用。例如,假设我们从多个MPSC管道中接收数据,我们可能这样做:
use tokio::sync::mpsc; #[tokio::main] async fn main() { let (mut tx1, mut rx1) = mpsc::channel(128); let (mut tx2, mut rx2) = mpsc::channel(128); tokio::spawn(async move { // 给 `tx1` and `tx2` 传点消息 }); tokio::select! { Some(v) = rx1.recv() => { println!("Got {:?} from rx1", v); } Some(v) = rx2.recv() => { println!("Got {:?} from rx2", v); } else => { println!("Both channels closed"); } } }
这个例子中,select!表达式会等待从rx1和rx2接收值。如果管道关闭,recv()会返回None。这不会匹配这个模式,该分支会被禁用。select!表达式会继续等待其他剩余的分支。
注意这个select!表达式包含一个else分支。这表示select!表达式必须计算出一个值。当使用模式匹配时,可能没有任何一个分支可以与其匹配。如果发生这种情况,就会计算else分支。
借用
当生成任务时,生成的异步表达式必须拥有其数据的所有权。select!宏则没有该限制。每个分支的异步表达式可以借用数据并并发操作。为了遵循Rust的借用规则,多个异步表达式可以不可变借用一条数据,或者一个异步表达式可以可变借用一条数据。
让我看一些例子。这里,我们将相同的数据发送到两个不同的TCP目的地。
#![allow(unused)] fn main() { use tokio::io::AsyncWriteExt; use tokio::net::TcpStream; use std::io; use std::net::SocketAddr; async fn race( data: &[u8], addr1: SocketAddr, addr2: SocketAddr ) -> io::Result<()> { tokio::select! { Ok(_) = async { let mut socket = TcpStream::connect(addr1).await?; socket.write_all(data).await?; Ok::<_, io::Error>(()) } => {} Ok(_) = async { let mut socket = TcpStream::connect(addr2).await?; socket.write_all(data).await?; Ok::<_, io::Error>(()) } => {} else => {} }; Ok(()) } }
data变量在两个异步表达式都被不可变借用。当其中一个操作成功完成时,另外一个被drop。因为模式匹配了Ok(_),如果一个表达式失败,另外一个会继续执行。
当涉及到每个分支的<handler>,select!会保证只有一个<handler>在运行。正因如此,每个<handler>都可以可变借用相同的数据(因为同时刻只有一个在运行)。
例如,这两个handler更改了out:
use tokio::sync::oneshot; #[tokio::main] async fn main() { let (tx1, rx1) = oneshot::channel(); let (tx2, rx2) = oneshot::channel(); let mut out = String::new(); tokio::spawn(async move { // 给 `tx1` and `tx2` 发送点数据。 }); tokio::select! { _ = rx1 => { out.push_str("rx1 completed"); } _ = rx2 => { out.push_str("rx2 completed"); } } println!("{}", out); }
循环
select!宏总是在循环中启用。本小节将通过一些实例介绍在循环中使用select!宏的常见方法。让我们通过选择多个管道谁先完成开始:
use tokio::sync::mpsc; #[tokio::main] async fn main() { let (tx1, mut rx1) = mpsc::channel(128); let (tx2, mut rx2) = mpsc::channel(128); let (tx3, mut rx3) = mpsc::channel(128); loop { let msg = tokio::select! { Some(msg) = rx1.recv() => msg, Some(msg) = rx2.recv() => msg, Some(msg) = rx3.recv() => msg, else => { break } }; println!("Got {:?}", msg); } println!("All channels have been closed."); }
这个示例选择了三个管道的接收者。当有其中一个管道接收到消息,消息会被写入STDOUT。当某个管道关闭时,recv()会返回None。通过使用模式匹配,select!宏会继续等待等待剩余的管道完成。当所有管道关闭,else分支会开始计算,然后循环终止。
select!宏会随机选择分支,来先检查是否就绪。当多个管道都有待处理值时,将随机选择一个管道接收。这是为了处理当接收循环处理消息比管道接收消息速度慢的情况。这意味着管道开始被填满。如果select!不去随机选一个分支来检查,每次的循环迭代中,rx1总是被首先检查。如果rx1总是有新消息,其他的管道将永远不会被检查。
info
如果select!计算时,多个管道有待处理的消息,只有一个管道的消息会被弹出。其他的管道都会保持不变,并且消息保留在管道中知道下一次循环迭代。没有消息会丢失。
恢复异步操作 Resuming an async operation
现在我们将展示如何跨多次调用select!来执行异步操作。在这个例子中,有一个类型为i32的MPSC管道,和一个异步函数。我们希望运行该异步函数,直到它从管道中接收到一个偶数。
async fn action() { // 一些异步逻辑 } #[tokio::main] async fn main() { let (mut tx, mut rx) = tokio::sync::mpsc::channel(128); let operation = action(); tokio::pin!(operation); loop { tokio::select! { _ = &mut operation => break, Some(v) = rx.recv() => { if v % 2 == 0 { break; } } } } }
注意,没在select!宏中调用action(),而是在循环外调用。action()的返回值给了operation,而且没调用.await。然后我们在operation上调用了tokio::pin!。
在select!循环体中,没传入operation,而是&mut operation。operation变量正在跟踪异步操作。循环的每次迭代都会执行相同的操作,而不是对action()发出新调用。
select!的另外一个分支从管道中接收消息,如果接收到了偶数,就跳出循环,否则继续再次执行select!。
这是我们第一次使用tokio::pin!。我们不打算深入讨论pin的细节。需要注意的是,对一个引用调用.await,引用的值必须是pinned或者实现了Unpin。
如果我们删掉tokio::pin!,尝试编译就会得到以下错误:
error[E0599]: no method named `poll` found for struct
`std::pin::Pin<&mut &mut impl std::future::Future>`
in the current scope
--> src/main.rs:16:9
|
16 | / tokio::select! {
17 | | _ = &mut operation => break,
18 | | Some(v) = rx.recv() => {
19 | | if v % 2 == 0 {
... |
22 | | }
23 | | }
| |_________^ method not found in
| `std::pin::Pin<&mut &mut impl std::future::Future>`
|
= note: the method `poll` exists but the following trait bounds
were not satisfied:
`impl std::future::Future: std::marker::Unpin`
which is required by
`&mut impl std::future::Future: std::future::Future`
虽然我们在上一章介绍了Future,也不能很清晰的解释这个错误。如果你在尝试对引用调用.await遇到了有关于Future未实现的错误,那么你可能需要pin future。
设定一个分支 Modifying a branch
让我们看一下稍微复杂一点的循环。我们有:
- 一管道的
i32值。 - 对
i32值的异步操作。
我们想要实现的逻辑:
- 等待从管道中接收一个偶数。
- 把这个偶数作为异步操作的输入。
- 等待操作完成,同时从管道中监听更多的偶数。
- 如果又接收到一个新偶数,但此时已存在的异步操作未完成,打断这个异步操作,并传入新偶数开始新的该异步操作。
async fn action(input: Option<i32>) -> Option<String> { // 输入输入是 `None`,返回 `None`。 // 也可以这样写 `let i = input?;` let i = match input { Some(input) => input, None => return None, }; // 这里是一些异步逻辑 } #[tokio::main] async fn main() { let (mut tx, mut rx) = tokio::sync::mpsc::channel(128); let mut done = false; let operation = action(None); tokio::pin!(operation); tokio::spawn(async move { let _ = tx.send(1).await; let _ = tx.send(3).await; let _ = tx.send(2).await; }); loop { tokio::select! { res = &mut operation, if !done => { done = true; if let Some(v) = res { println!("GOT = {}", v); return; } } Some(v) = rx.recv() => { if v % 2 == 0 { // `.set` 是 `Pin` 的一个方法。 operation.set(action(Some(v))); done = false; } } } } }
我们使用了与之前相似的策略。异步函数在循环外被调用,并且给了operation。operation变量被pinned。选择循环体作用在operation和通道的接收者上。
注意现在action接收Option<i32>参数。当我们接收到第一个偶数前,我们需要实例化operation为某些东西。我们让action接收Option并返回Option。如果传进来了None,那就返回None。第一次循环迭代operation会立刻返回None。
此示例用了一些新语法。第一个分支有一个, if !done。这是该分支的一个前提条件。在解析它是如何工作之前,让我们看看省略该前提条件会发生什么。删掉, if !done并运行该示例,会导致以下输出:
thread 'main' panicked at '`async fn` resumed after completion', src/main.rs:1:55
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
当我们尝试在它已经完成后使用operation时,发生此错误。通常来说,当使用.await时,调用了await的值会被消耗。在这个例子中,我们等待一个引用。这意味着operation完成后仍然存在。
为了避免panic,我们必须在操作完成后禁用第一个分支。done变量用于跟踪operation是否完成。一个select!分支可以包含一个前提条件。这个前提条件会在select!在该分支上调用await之前检查。如果条件是false,那么该分支被禁用。通常,done变量被初始化为false。当operation完成之后,done被设置为true。下一次循环迭代就会禁用这个operation分支。当从管道中接收到一个偶数时,operation被重置,done被设置为false。
每个任务的并发方式 Per-task concurrency
tokio::spawn和select!都可以启用并发异步操作。然而,用于运行并发操作的策略有所不同。tokio::spawn函数取得一个异步操作,然后生成一个任务来运行它。任务是Tokio运行时调度的对象。Tokio会独立安排两个不同的任务。这或许会让它们同时运行在两个不同的操作系统线程上。正因如此,生成的任务与生成的线程有相同的限制:不能借用外部的值。
select!宏会在同一个任务上并发运行所有分支。因为select!宏的所有分支在同一个任务上运行,它们永远不会同时运行。select!宏会在单个任务上多路复用异步操作。
Streams 流
流是一系列异步的值。它相当于std::iter::Iterator的异步版本,并且由Stream trait 来表示。流可以在异步函数中迭代。它们也可以通过适配器(adapter)来转换。Tokio 在StreamExt trait 上提供了许多通用的适配器。
Tokio 通过一个独立的 cratetokio-stream提供了流支持。
tokio-stream = "0.1"
info 当前,Tokio 的流工具包存在于
tokio-streamcrate 中。一旦Streamtrait 在 Rust 的标准库中稳定下来,Tokio 的流工具包将会迁移到tokiocrate。
迭代
当前,Rust 不支持异步for循环。取而代之的是,使用while let循环搭配StreamExt::next()来迭代流。
use tokio_stream::StreamExt; #[tokio::main] async fn main() { let mut stream = tokio_stream::iter(&[1, 2, 3]); while let Some(v) = stream.next().await { println!("GOT = {:?}", v); } }
就像迭代器一样,next()方法返回Option<T>,T是流的值类型。接收到None表示流迭代终止。
Mini-Redis 的广播
让我们看一下使用 Mini-Redis 的稍微复杂点的示例。
完整代码可在这里找到。
use tokio_stream::StreamExt; use mini_redis::client; async fn publish() -> mini_redis::Result<()> { let mut client = client::connect("127.0.0.1:6379").await?; // 发布一些数据 client.publish("numbers", "1".into()).await?; client.publish("numbers", "two".into()).await?; client.publish("numbers", "3".into()).await?; client.publish("numbers", "four".into()).await?; client.publish("numbers", "five".into()).await?; client.publish("numbers", "6".into()).await?; Ok(()) } async fn subscribe() -> mini_redis::Result<()> { let client = client::connect("127.0.0.1:6379").await?; let subscriber = client.subscribe(vec!["numbers".to_string()]).await?; let messages = subscriber.into_stream(); tokio::pin!(messages); while let Some(msg) = messages.next().await { println!("got = {:?}", msg); } Ok(()) } #[tokio::main] async fn main() -> mini_redis::Result<()> { tokio::spawn(async { publish().await }); subscribe().await?; println!("DONE"); Ok(()) }
生成一个任务来向 Mini-Redis 服务端发布消息到"numbers"频道上。然后,在 main 任务中,我们订阅了"numbers"频道,并且显示接收到的消息。
订阅后,into_stream()被调用,返回了订阅者(subscriber)。这会消耗订阅者,返回一个流,该流会在消息到达时生成消息。在我们开始迭代消息之前,注意流通过tokio::pin!被pin到了栈上。在流上调用next()需要这个流被 pin。into_stream()返回了一个没有pin 的流,我们必须显式 pin 它才能迭代他。
info
当一个 Rust 值在内存中无法再被移动,就说是被“pin”。一个 pinned 的值的关键属性就是指针可以指向 pinned 的数据,并且调用者可以确信指针可以一直有效。async/await使用这个特性来支持跨.await借用数据(borrowing data across.awaitpoints)。
如果我们忘了 pin 流,我们会得到以下错误:
error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned
--> streams/src/main.rs:29:36
|
29 | while let Some(msg) = messages.next().await {
| ^^^^ within `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>`
|
= note: required because it appears within the type `impl Future`
= note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 'static)>>, impl Future>`
= note: required because it appears within the type `impl Stream`
= note: required because it appears within the type `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
= note: required because it appears within the type `tokio_stream::map::_::__Origin<'_, tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
= note: required because it appears within the type `tokio_stream::take::_::__Origin<'_, tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
= note: required because of the requirements on the impl of `Unpin` for `tokio_stream::take::Take<tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
如果你得到像这样的错误信息,尝试 pin 那个值!
当你尝试运行这个,首先开启 Mini-Redis 服务端:
$ mini-redis-server
尝试运行代码。我们会在 STDOUT 得到以下输出。
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"four" })
got = Ok(Message { channel: "numbers", content: b"five" })
got = Ok(Message { channel: "numbers", content: b"6" })
由于订阅和发布之间存在竞争,一些早期的消息可能被 drop。该程序永远不会退出。只要服务端保持活动状态,对 Mini-Redis 频道的订阅就会保持活动状态。
让我们看看如何使用流扩展这个程序。
适配器
接收一个流病返回其他流的函数被叫做“流适配器”,因为它们是“适配器模式”的一种形式。常见的流适配器包含 map, take 和 filter。
让我们更新 Mini-Redis 代码来让它可以退出。在接收到三条消息之后,停止迭代消息。这是使用take完成的。这个适配器限制流最多产生n条消息。
#![allow(unused)] fn main() { let messages = subscriber .into_stream() .take(3); }
再次运行程序,我们看到以下输出:
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"two" })
got = Ok(Message { channel: "numbers", content: b"3" })
这次,程序会停止了。
现在,让我们限制流只返回一位十进制数字。我们将会通过检查消息长度来检查这一点。我们使用filter适配器来 drop 其他不匹配的消息。
#![allow(unused)] fn main() { let messages = subscriber .into_stream() .filter(|msg| match msg { Ok(msg) if msg.content.len() == 1 => true, _ => false, }) .take(3); }
再次运行程序,我们看到以下输出:
got = Ok(Message { channel: "numbers", content: b"1" })
got = Ok(Message { channel: "numbers", content: b"3" })
got = Ok(Message { channel: "numbers", content: b"6" })
注意,应用适配器的顺序很重要。先调用filter,然后take,跟先调用take,再调用filter效果是不同的。
最后,我们来整理Ok(Message { ... })输出。这是通过map完成的。因为这是在filter之后执行的,所以我们知道消息肯定是Ok的,所以我们可以用unwrap()。
#![allow(unused)] fn main() { let messages = subscriber .into_stream() .filter(|msg| match msg { Ok(msg) if msg.content.len() == 1 => true, _ => false, }) .map(|msg| msg.unwrap().content) .take(3); }
现在,输出是:
got = b"1"
got = b"3"
got = b"6"
另一种选择是组合filter和map到单次调用filter_map中。
这里还有更多可用适配器。查看这个列表。
实现流
Streamtrait 与Futuretrait 非常相似。
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; pub trait Stream { type Item; fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Option<Self::Item>>; fn size_hint(&self) -> (usize, Option<usize>) { (0, None) } } }
Stream::poll_next()函数非常像Future::poll,不同之处在于它可以重复调用从流中接收许多值。正如我们在深入异步章节中看到的那样,当一个流没准备好返回值,就会返回Poll::Pending。任务的 waker 已经注册,一旦流准备好被再次轮询,waker 就会收到通知。
size_hint()方法与迭代器一样。
通常来说,手动实现Stream时,是通过组合 future 和其他流来完成的。例如,在深入异步章节中构建Delay future 时。我们可以把它转换为一个流,间隔 10ms,生成三次()。
#![allow(unused)] fn main() { use tokio_stream::Stream; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; struct Interval { rem: usize, delay: Delay, } impl Interval { fn new() -> Self { Self { rem: 3, delay: Delay { when: Instant::now() } } } } impl Stream for Interval { type Item = (); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> { if self.rem == 0 { // 不用再延迟了 return Poll::Ready(None); } match Pin::new(&mut self.delay).poll(cx) { Poll::Ready(_) => { let when = self.delay.when + Duration::from_millis(10); self.delay = Delay { when }; self.rem -= 1; Poll::Ready(Some(())) } Poll::Pending => Poll::Pending, } } } }
async-stream异步流
但是手动实现Stream trait 可能很无聊。不幸的是,Rust 尚不支持async/await语法来定义流。这正在开发中,但并没有就绪。
async-streamcrate 可作为临时解决方案。这个 crate 提供了stream!宏来转换输入的流。通过使用这个 crate,上面的间隔返回可以这样实现:
#![allow(unused)] fn main() { use async_stream::stream; use std::time::{Duration, Instant}; stream! { let mut when = Instant::now(); for _ in 0..3 { let delay = Delay { when }; delay.await; yield (); when += Duration::from_millis(10); } } }
Topics 其他主题
本节包含编写异步程序出现的各种相关主题。
目前可用的主题文章有:
- Bridging with sync code 异步与同步代码共存
- Graceful shutdown 如何优雅地结束程序
- Getting started with Tracing 开始使用Tracing日志
- Next steps with Tracing 更进一步使用Tracing日志
- Unit Testing 单元测试
Bridging with sync code 异步与同步代码共存
使用 Tokio 的大多数例子中,我们使用#[tokio::main]注解标记 main 函数,并让整个项目是异步的。
但某些时候,你可能只需要执行一小部分异步代码。详细信息可以看:spawn_blocking。
其他情况下,把应用程序构建为大多数是同步,具有小部分或逻辑上不同的异步部分可能会更容易一些。例如,一个 GUI 应用可能需要在 main 线程运行 GUI 代码,并在另外一个线程运行 Tokio 运行时。
本节将介绍你该如何把 async/await 隔离到你的项目中的一小部分。
#[tokio::main]是什么东西
#[tokio::main]宏会用一个非异步的 main 函数来替换你的 main 函数,当这个函数启动了运行时,之后就可以调用你的代码。比如:
#[tokio::main] async fn main() { println!("Hello world"); }
可以通过宏转换为:
fn main() { tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap() .block_on(async { println!("Hello world"); }) }
为了在我们项目中使用 async/await,我们可以做类似的操作,在适当的情况下利用block_on方法,来进入异步上下文。
mini-redis 的同步接口
本小节中,我们将会介绍如何通过存储Runtime对象并使用block_on方法来为 mini-redis 构建一个同步接口。在下面,我们会讨论一些替代方法,和何时使用这些方法。
我们将会包装的接口是一个异步的Client类型。它有以下几个方法,并且我们会实现这些方法的阻塞版本:
为此,我们创建一个新文件,叫src/clients/blocking_client.rs并通过包装异步Client类型的结构体来初始化。
#![allow(unused)] fn main() { use tokio::net::ToSocketAddrs; use tokio::runtime::Runtime; pub use crate::clients::client::Message; /// 与 Redis server 建立连接。 pub struct BlockingClient { /// 异步的 `Client`. inner: crate::clients::Client, /// 一个 `current_thread` 运行时,用来在 /// 一个阻塞的环境下对异步 client 执行操作 rt: Runtime, } impl BlockingClient { pub fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<BlockingClient> { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; // 通过运行时调用异步的 connect method let inner = rt.block_on(crate::clients::Client::connect(addr))?; Ok(BlockingClient { inner, rt }) } } }
这里,我们把包含构造函数的 impl 作为我们第一个示例,来展示如何在非异步上下文中执行异步方法。我们通过在 Tokio 的Runtime类型上使用block_on方法,这可以执行一个异步方法,并返回结果。
一个很重要的细节,我们使用了current_thread运行时。通常当我们使用 Tokio 时,你可能使用默认的multi_thread运行时,当它运行时,会生成一堆后台线程,以便于它可以有效地同时运行很多事情。但在我们使用情况中,我们只一次做一件事,所以使用多线程没有任何好处。这让current_thread运行时非常适合,因为它不会生成任何线程。
enable_all在 Tokio 运行时上调用了 IO 和定时器驱动程序。如果没启用,运行时就不会执行 IO 和定时器。
warning
因为current_thread运行时不会生成新线程,只会等待block_on调用。一旦block_on返回,这个运行时上所有生成的任务就会冻结,直到你再次调用block_on。如果生成的任务必须在没调用block_on时保持运行,使用multi_threaded运行时。
一旦我们有了这个结构体,大多数方法就很容易实现了:
#![allow(unused)] fn main() { use bytes::Bytes; use std::time::Duration; impl BlockingClient { pub fn get(&mut self, key: &str) -> crate::Result<Option<Bytes>> { self.rt.block_on(self.inner.get(key)) } pub fn set(&mut self, key: &str, value: Bytes) -> crate::Result<()> { self.rt.block_on(self.inner.set(key, value)) } pub fn set_expires( &mut self, key: &str, value: Bytes, expiration: Duration, ) -> crate::Result<()> { self.rt.block_on(self.inner.set_expires(key, value, expiration)) } pub fn publish(&mut self, channel: &str, message: Bytes) -> crate::Result<u64> { self.rt.block_on(self.inner.publish(channel, message)) } } }
Client::subscribe方法更有趣,因为它可以转换 Client 变成 Subscriber 对象。我们可以通过以下方式实现:
#![allow(unused)] fn main() { /// 已进入 发布/订阅 模式的客户端. /// /// 一旦客户端订阅了一个频道,它就只能处理 发布/订阅 /// 相关的指令。`BlockingClient` 类型是用来转换 /// 为一个 `BlockingSubscriber` 类型,这样才能 /// 阻止调用 非发布/订阅 的指令。 pub struct BlockingSubscriber { /// 异步的 `Subscriber`. inner: crate::clients::Subscriber, /// 一个 `current_thread` 运行时,用来在 /// 一个阻塞的环境下对异步 client 执行操作 rt: Runtime, } impl BlockingClient { pub fn subscribe(self, channels: Vec<String>) -> crate::Result<BlockingSubscriber> { let subscriber = self.rt.block_on(self.inner.subscribe(channels))?; Ok(BlockingSubscriber { inner: subscriber, rt: self.rt, }) } } impl BlockingSubscriber { pub fn get_subscribed(&self) -> &[String] { self.inner.get_subscribed() } pub fn next_message(&mut self) -> crate::Result<Option<Message>> { self.rt.block_on(self.inner.next_message()) } pub fn subscribe(&mut self, channels: &[String]) -> crate::Result<()> { self.rt.block_on(self.inner.subscribe(channels)) } pub fn unsubscribe(&mut self, channels: &[String]) -> crate::Result<()> { self.rt.block_on(self.inner.unsubscribe(channels)) } } }
这样,subscribe方法就可以首先使用运行时转换异步Client到一个异步的Subscriber。然后,它会把Subscribe和运行时存储在一起,并使用block_on实现各种方法。
其他方法
上面小节解释了实现同步包装器的最简单方法,但不是唯一的方法。下面的方法有:
- 创建一个运行时,在异步代码上调用
block_on。 - 创建一个运行时,在上面
spawn任务。 - 在独立的线程中运行一个运行时,给它发送消息。
我们已经了解第一种方法了,剩余的两种在下面。
在一个运行时上生成任务
运行时对象有一个方法,叫spawn。当你调用这个方法,你可以创建一个跑在这个运行时的新后台任务。例如:
use tokio::runtime::Builder; use tokio::time::{sleep, Duration}; fn main() { let runtime = Builder::new_multi_thread() .worker_threads(1) .enable_all() .build() .unwrap(); let mut handles = Vec::with_capacity(10); for i in 0..10 { handles.push(runtime.spawn(my_bg_task(i))); } // 做一些在后台任务执行时消耗时间的操作 std::thread::sleep(Duration::from_millis(750)); println!("Finished time-consuming task."); // 等待所有任务完成 for handle in handles { // `spawn` 方法返回了 `JoinHandle`。`JoinHandle`是 // 一个 future,所以我们可以使用 `block_on` 来等待。 runtime.block_on(handle).unwrap(); } } async fn my_bg_task(i: u64) { // 通过减法,i 值较大的任务会休眠更短的时间 let millis = 1000 - 50 * i; println!("Task {} sleeping for {} ms.", i, millis); sleep(Duration::from_millis(millis)).await; println!("Task {} stopping.", i); }
Task 0 sleeping for 1000 ms.
Task 1 sleeping for 950 ms.
Task 2 sleeping for 900 ms.
Task 3 sleeping for 850 ms.
Task 4 sleeping for 800 ms.
Task 5 sleeping for 750 ms.
Task 6 sleeping for 700 ms.
Task 7 sleeping for 650 ms.
Task 8 sleeping for 600 ms.
Task 9 sleeping for 550 ms.
Task 9 stopping.
Task 8 stopping.
Task 7 stopping.
Task 6 stopping.
Finished time-consuming task.
Task 5 stopping.
Task 4 stopping.
Task 3 stopping.
Task 2 stopping.
Task 1 stopping.
Task 0 stopping.
在上述示例中,我们在运行时上生成了 10 个后台任务,然后等待它们完成。例如,这可能是在图形应用程序中实现后台网络请求任务的好方法,因为网络请求很耗时,无法在主 GUI 线程上运行它们。所以,你可以在后台运行的 Tokio 运行时上生成请求,并当任务请求完成时,将消息发送回到 GUI 代码中,甚至如果你想实现一个进度条,可以让它们返回增量消息。
在这个例子中,将运行时配置为multi_thread非常重要。如果你更改为current_thread运行时,你就会发现耗时的任务会在任何后台任务开始前完成了。这是因为后台任务在current_thread运行时上生成,只有当在运行时上调用block_on期间才会执行,否则运行时没有其他任何地方运行它们。
这个例子通过在spawn生成的JoinHandle上调用block_on来等待生成的任务完成,但这不是唯一的方法,下面有一些替代方案:
- 使用消息传递管道,例如
tokio::sync::mpsc - 更改一个共享的值,例如一个
Mutex。这是一个好方法,对于一个 GUI 中的进度条来说,因为 GUI 需要每一帧都读取共享值。
spawn方法也在Handle类型上可用。Handle类型可以被克隆来拿到很多运行时的 handle,并且每一个Handle都可以用于在运行时上生成新任务。
消息传递
第三种方法是生成一个运行时,并使用消息传递与其通信。对比前两种方法,它是最灵活的,你可以在下面看到一个最基本的示例:
#![allow(unused)] fn main() { use tokio::runtime::Builder; use tokio::sync::mpsc; pub struct Task { name: String, // 描述任务的信息 } async fn handle_task(task: Task) { println!("Got task {}", task.name); } #[derive(Clone)] pub struct TaskSpawner { spawn: mpsc::Sender<Task>, } impl TaskSpawner { pub fn new() -> TaskSpawner { // 建立通信管道。 let (send, mut recv) = mpsc::channel(16); // 为新线程建立运行时 // // 在生成新线程之后就创建运行时,这样可以更清晰的追踪error // 如果 `unwrap()` panic了。 let rt = Builder::new_current_thread() .enable_all() .build() .unwrap(); std::thread::spawn(move || { rt.block_on(async move { while let Some(task) = recv.recv().await { tokio::spawn(handle_task(task)); } // 一旦所有的sender都已经走出作用域 // `.recv()` 返回 None 并从循环中退出 // 之后关闭线程 }); }); TaskSpawner { spawn: send, } } pub fn spawn_task(&self, task: Task) { match self.spawn.blocking_send(task) { Ok(()) => {}, Err(_) => panic!("The shared runtime has shut down."), } } } }
该示例可以通过多种方式配置。例如,你可以使用Semaphore (信号量)来限制处于活动状态的任务,或者你可以使用相反方向的管道来发送回一个响应对生成器(spawner)这儿。当你用这种方法生成一个运行时时,这是一个actor类型。
Graceful shutdown 如何优雅地结束程序
这一页的目的是概述如何在异步应用中正确地关闭程序。
实现优雅地结束程序分为三部分:
- 搞明白何时关闭
- 告知程序每一部分程序关闭
- 等待应用其他部分关闭
本文其余部分将介绍这些。此处描述的方法实现可以在mini-redis中找到,尤其是src/server.rs和src/shutdown.rs文件中。
搞清何时关闭
这取决于应用程序,当应用接收到从操作系统的信号是一种常见的关闭情况。这种情况会发生,比如当你程序运行时在终端中按下 ctrl+c 时。为了检测这种,Tokio 提供了一个tokio::signal::ctrl_c函数,该函数会休眠,直到收到这样的信号。你可以这样使用它:
use tokio::signal; #[tokio::main] async fn main() { // ... 在单独的任务上生成应用 ... match signal::ctrl_c().await { Ok(()) => {}, Err(err) => { eprintln!("Unable to listen for shutdown signal: {}", err); // 当发生 error 我们也结束程序 }, } // 向应用发送关机信号,并等待 }
如果你有多钟关闭条件,你可以使用mpsc 管道来发送关机信号到某个地方。你可以在ctrl_c和管道之间进行选择,例如:
use tokio::signal; use tokio::sync::mpsc; #[tokio::main] async fn main() { let (shutdown_send, mut shutdown_recv) = mpsc::unbounded_channel(); // ... 在单独的任务上生成应用 ... // // 只要从程序内部发出了关闭信号,应用使用 shutdown_send tokio::select! { _ = signal::ctrl_c() => {}, _ = shutdown_recv.recv() => {}, } // 向应用发送关机信号,并等待 }
告知程序每一部分程序关闭
当你想要告知更多任务来关闭,你可以使用Cancellation Tokens。这些 token 允许你来通知任务,它们需要终止它们自己来响应这个取消请求,从而轻松实现正常关闭。
为了在数个任务之间共享CancellationToken,你需要克隆它。这是由于单一的所有权规则要求每一个值只能有一个所有者。当克隆一个 token,你会得到一个与原来 token 一样的 token;如果其中一个取消了,那么其他的也会取消。你可以克隆你需要那么多数量的 token,并当你在其中一个 token 上调用cnacel,它们全部都会被取消掉。
这里是在多个任务中使用CancellationToken的步骤:
- 首先,创建新的
CancellationToken。 - 然后,通过
clone方法创建CancellationToken的克隆。这会创建新的 token 并可以用于其他任务上。 - 传递原始或者克隆的 token 到应该响应取消请求的任务上。
- 当你想要优雅地关闭任务时,在原始或者克隆的 token 上调用
cancel方法。任何任务侦测到从原始或克隆的 token 上的取消请求,将会被通知关闭。
这里是上述方法步骤的代码片段:
#![allow(unused)] fn main() { // Step 1: Create a new CancellationToken let token = CancellationToken::new(); // Step 2: Clone the token for use in another task let cloned_token = token.clone(); // Task 1 - Wait for token cancellation or a long time let task1_handle = tokio::spawn(async move { tokio::select! { // Step 3: Using cloned token to listen to cancellation requests _ = cloned_token.cancelled() => { // The token was cancelled, task can shut down } _ = tokio::time::sleep(std::time::Duration::from_secs(9999)) => { // Long work has completed } } }); // Task 2 - Cancel the original token after a small delay tokio::spawn(async move { tokio::time::sleep(std::time::Duration::from_millis(10)).await; // Step 4: Cancel the original or cloned token to notify other tasks about shutting down gracefully token.cancel(); }); // Wait for tasks to complete task1_handle.await.unwrap() }
当使用 Cancellation Token,你不必在 token 取消时立刻去关闭任务。相反,您可以在终止任务之前运行关机流程,比如刷新数据到一个文件或者数据库中,或发送一个关闭消息到一个连接中。
等待应用其他部分关闭
一旦您告知任务关闭,你需要等待它们完成关机流程。一个简单的方法是使用任务追踪(task tracker)。一个任务追踪器是任务的集合。任务追踪器的wait方法提供了一个 future,只有在所有任务的 future 都已经解析,并任务追踪器已经关闭后,才会进行解析。
下面的示例会生成 10 个任务,然后使用任务追踪器来等待它们关机。
use std::time::Duration; use tokio::time::sleep; use tokio_util::task::TaskTracker; #[tokio::main] async fn main() { let tracker = TaskTracker::new(); for i in 0..10 { tracker.spawn(some_operation(i)); } // 一旦我们已经生成了所有任务,我们关闭追踪器。 tracker.close(); // 等待所有任务完成。 tracker.wait().await; println!("This is printed after all of the tasks."); } async fn some_operation(i: u64) { sleep(Duration::from_millis(100 * i)).await; println!("Task {} shutting down.", i); }
Getting started with Tracing 开始使用 Tracing 日志
tracingcrate 是一个用来检测 Rust 程序收集结构化的,基于事件的诊断信息的框架。
在像 Tokio 这样的异步系统中,解释传统的日志信息可能非常具有挑战性。由于多个任务在同一线程上运行,因此关联的事件和日志行混合在一起,使得跟踪逻辑运行变得困难。tracing扩展了日志式诊断,允许库和应用来记录结构化的事件,以及具有时间性和因果关系的附加信息——与日志消息不同的是,在tracing中的一个Span具有开始和结束时间,可以通过执行流进入和退出,并可以存在于相似跨度的潜逃树中。为了表示某一时刻发生的事情,tracing提供了事件的补充概念。Span和Event都是结构化的,可以记录键入的数据和文本信息。
你可以使用tracing来:
安装
开始,我们需要添加tracing和tracing-subscriber作为依赖:
[dependencies]
tracing = "0.1"
tracing-subscriber = "0.3"
tracingcrate 提供了使用发出追踪的 API。tracing-subscribercrate 提供了一些基本的实用程序,用于将这些追踪转发到外部监听器(比如stdout)。
订阅追踪
如果您正在编写一个可执行文件(而不是库),你需要注册一个追踪订阅者(tracing subscriber)。订阅者是处理应用和依赖发出的跟踪的类型,并可以执行例如计算指标,监视错误以及向外界重新发送跟踪(例如jurnald,stdout,或open-telemetry守护进程)等任务。
大多数情况下,你应该在main函数中注册你的跟踪订阅者越早越好。例如,由tracing-subscriber提供的FmtSubscriber类型会打印格式化的跟踪日志和事件到stdout,并可以像这样注册:
#[tokio::main] pub async fn main() -> mini_redis::Result<()> { // 构造一个订阅者,以向 stdout 打印格式化跟踪日志 let subscriber = tracing_subscriber::FmtSubscriber::new(); // 在此之后,使用这个订阅者来进行触发日志 tracing::subscriber::set_global_default(subscriber)?; ... }
如果你运行这个程序,你可能会看到一些被 Tokio 触发的跟踪事件,但是你需要修改自己的应用来触发跟踪,以充分利用tracing。
订阅者设置
在上面的示例中,我们已经使用默认设置设置了FmtSubscriber。其实,tracing-subscriber也提供了很多方法来设置FmtSubscriber,比如自定义输出格式,包括一些额外信息(比如线程 ID 或源代码位置)在日志中,并把日志写到不是stdout的其他地方。
例如:
#![allow(unused)] fn main() { // 开始设置 `fmt` 订阅者 let subscriber = tracing_subscriber::fmt() // 使用更紧凑的日志格式 .compact() // 显示源代码文件路径 .with_file(true) // 显示源代码所在行数 .with_line_number(true) // 显示我们记录事件发生的线程ID .with_thread_ids(true) // 不要显示事件的目标路径(模块路径) .with_target(false) // 生成订阅者 .finish(); }
有关于配置选项的详细信息,可以看tracing_subscriber::fmt的文档。
除了tracing-subscriber中的FmtSubscriber,其他的Subscriber也可以实现它们自己记录tracing数据的方式。这包含替代输出格式,分析和聚合,以及与其他系统集成,例如分布式跟踪或日志聚合服务。许多 crate 都提供了Subscriber的实现。看这里来获取其他Subscriber的实现(不完整)列表。
最后,某些情况下,将记录跟踪的多种方式组合到一起,构建实现多种行为的Subscriber可能很有用。例如,tracing-subscribercrate 提供了Layertrait,可以表示与其他层组合在一起的订阅者组件。看这里了解使用Layer的细节。
触发 Span 和事件。
最简单触发 span 的方法是使用tracing提供的instrument预处理宏。,这可以重写函数体来触发 span,每次调用的时候都会;例如:
#![allow(unused)] fn main() { #[tracing::instrument] fn trace_me(a: u32, b: u32) -> u32 { a + b } }
每次调用trace_me都会触发tracingSpan:
- 具有详细的
info级别(“中间立场”的程度) - 被命名为
trace_me - 有
a和b的值,是trace_me的参数
instrument属性是可高度配置的;例如,跟踪mini-redis-server中处理每个连接的方法:
#![allow(unused)] fn main() { use tracing::instrument; impl Handler { /// 处理单个连接 #[instrument( name = "Handler::run", skip(self), fields( // `%` 序列化了对等IP地址通过 `Display` trait peer_addr = %self.connection.peer_addr().unwrap() ), )] async fn run(&mut self) -> mini_redis::Result<()> { ... } } }
mini-redis-server现在会出发tracing Span,对于每个传入连接:
- 具有详细的
info级别(“中间立场”的程度) - 被命名为
Hanler::run - 有一些结构化的数据。
fields(...)指示发出 span应该在名为peer_addr字段中包含连接的SocketAddr的fmt::Display表示形式。skip(self)指示发出 span应该不记录Hanler的调试形式。
你还可以通过调用span!宏来手动构建Span,或任何其他级别的宏(error_span!, warn_span!, info_span!, debug_span!, trace_span)。
要触发事件,使用event!宏,或者任何其他级别的宏(error!, warn!, info!, debug!, trace!)。例如,记录客户端发送了格式错误的命令的日志:
#![allow(unused)] fn main() { // 转换 redis frame 到一个指令结构体。 // 如果 frame 不是一个可用的 redis 指令会返回错误 let cmd = match Command::from_frame(frame) { Ok(cmd) => cmd, Err(cause) => { // frame格式不正确无法解析 // 这可能表明客户端有问题 (相对服务端来说) // 所以我们(1)触发一个warning // // 这里的宏语法是由 `tracing` crate 提供的 // 它可被认为类似于 // tracing::warn! { // cause = format!("{}", cause), // "failed to parse command from frame" // }; // `tracing` 提供了结构化的日志, // 所以信息是通过key-value对“记录“的 tracing::warn! { %cause, "failed to parse command from frame" }; // ...然后 (2) 给客户端发回了error响应 Command::from_error(cause) } }; }
如果运行应用,你会看到为其处理的每个传入连接触发的 span 上下文装饰的事件。
Next steps with Tracing 更进一步使用 Tracing 日志
Tokio-console Tokio 控制台
tokio-console是一个类似于 htop 的实用工具,让你可以看到应用程序的 span 和事件的实时视图。它还可以表示 Tokio 运行时创建的“资源”,例如任务。这对于理解开发过程中的性能问题至关重要。
例如,要在mini-redis 项目中使用 Tokio 控制台,你需要在 Tokio 包开启tracingfeature:
# Update the tokio import in your Cargo.toml
tokio = { version = "1", features = ["full", "tracing"] }
注意,full没启用tracing。
你还需要添加console-subscriber包依赖。这个 crate 提供了一个Subscriber实现,可以替换掉 mini-redis 中使用的实现:
# Add this to the dependencies section of your Cargo.toml
console-subscriber = "0.1.5"
最后,在src/bin/server.rs,替换调用tracing_subscriber为console-subscriber:
替换:
#![allow(unused)] fn main() { tracing_subscriber::fmt::try_init()?; }
为:
#![allow(unused)] fn main() { console_subscriber::init(); }
这将会启用console_subscriber,这意味着任何与tokio-console相关的追踪都将被记录。日志仍会被输出到 stdout(基于RUST_LOG环境变量的值)。
现在我们应该准备好再次启动 mini-redis,这次使用tokio_unstableflag(这是启用日志追踪的必要操作):
RUSTFLAGS="--cfg tokio_unstable" cargo run --bin mini-redis-server
tokio_unstableflag 让我们可以使用 Tokio 提供的额外 API,这些 API 目前没有保证稳定性(换句话说,这些 API 允许对它进行重大更改)。
cargo install --locked tokio-console
之后运行:
tokio-console
你将会看到的初始时图是正在运行的 tokio 任务。
例如:

它还可以在任务完成后展示一段时间(这些任务将被显示为灰色)。您可以通过运行 mini-redis hello world 示例(这在mini-redis 存储库中可以找到):
cargo run --example hello_world
如果你按下r,你会进入资源视图。这展示了信号量,锁,和其他属于 TOkio 运行时的结构。例如:

每当你需要查看 Tokio 运行时内部来更好地理解你的应用程序性能时,你可以使用 tokio-console 来查看正在发生什么,帮助你发现死锁和其他的状况。
了解更多如何使用 tokio-console,请查看文档页面。
与 OpenTelemetry 集成
OpenTelemetry (OTel)有多种含义;首先,这是一个开放规范,定义满足大多数用户的日志跟踪和指标的数据类型。这也是一系列特定语言的 SDK,提供检测,便于从应用中发出日志跟踪和指标。第三点,还有 OpenTelemetry Collector,一个与你的应用一起运行的二进制文件,用于收集日志跟踪和指标,最后将其推送给 telemetry vendor(遥测供应商),比如 DataDog,Honeycomb 或者 AWS X-Ray,它也可以将数据发送到 Prometheus 等工具。
Opentelemetry crate提供了 Rust 的 OpenTelemetry SDK,也就是我们本教程中使用的。
本教程中,我们将设置 mini-redis 将数据发送到Jaeger,这是一个用于可视化追踪的 UI。‘
这是一个 Jaeger 的运行实例,你可以使用 Docker:
docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 jaegertracing/all-in-one:latest
你可以通过http://localhost:16686来访问 Jaeger 页面。它启动后长这样:

生成并发送一些跟踪数据后,我们将返回此页面。
要设置 mini-redis,我们首先需要添加一些依赖项。使用以下内容更新你的 Cargo.toml:
# Implements the types defined in the Otel spec
opentelemetry = "0.17.0"
# Integration between the tracing crate and the opentelemetry crate
tracing-opentelemetry = "0.17.2"
# Allows you to export data to Jaeger
opentelemetry-jaeger = "0.16.0"
现在,在src/bin/server.rs中添加以下导入:
#![allow(unused)] fn main() { use opentelemetry::global; use tracing_subscriber::{ fmt, layer::SubscriberExt, util::SubscriberInitExt, }; }
我们稍后来看它们各自的作用。
接下来使用 OTel 替换tracing_subscriber。
替换:
#![allow(unused)] fn main() { tracing_subscriber::fmt::try_init()?; }
为:
#![allow(unused)] fn main() { // Allows you to pass along context (i.e., trace IDs) across services global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); // Sets up the machinery needed to export data to Jaeger // There are other OTel crates that provide pipelines for the vendors // mentioned earlier. let tracer = opentelemetry_jaeger::new_pipeline() .with_service_name("mini-redis") .install_simple()?; // Create a tracing layer with the configured tracer let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); // The SubscriberExt and SubscriberInitExt traits are needed to extend the // Registry to accept `opentelemetry (the OpenTelemetryLayer type). tracing_subscriber::registry() .with(opentelemetry) // Continue logging to stdout .with(fmt::Layer::default()) .try_init()?; }
现在你可以启动 mini-redis:
cargo run --bin mini-redis-server
在另外一个终端,运行 hello world 例子(这在mini-redis 存储库中可以找到):
cargo run --example hello_world
现在,刷新打开的 Jaeger UI,然后在主搜索页面上,找到“mini-redis”服务下拉列表的选项/
选择该选项,点击“Find Traces”按钮。将会展示我们通过运行示例发出的请求。

单击跟踪会显示在处理 hello world 示例期间发出的 span 的详细视图。

就是这样!你可以通过发送更多请求,为 mini-redis 添加额外的工具或遥测供应商(而不是我们在本地运行的 Jaeger 实例)设置 OTel 来进一步探索,对于最后一个,你可能需要引入一个额外的 crate(例如,为了把数据发送到 OTel Collector,你需要opentelemetry-tolp crate)。在opentelemetry-rust 存储库中有很多可用的实例。
注意 mini-redis 仓库已经包含了使用 AWS X-Ray 的 OpenTelemetry 的完整实例,详细信息可以在README,以及Cargo.toml和src/bin/server.rs文件中找到。
Unit Testing 单元测试
本页的目的是提供有关如何在异步应用中写单元测试的建议。
测试中暂停和恢复时间
有时,异步代码可以通过tokio::time::sleep或等待tokio::time::Interval::tick来显式等待。当单元测试开始运行的非常缓慢时,基于时间的测试(例如,指数避退)可能变得非常麻烦。然而,在内部 tokio 的时间相关功能支持暂停和恢复时间。任何提前准备好的时间相关的 future 都有暂停时间的效果。与时间相关的 future 的提前解决的条件是没有更多其他可能提前就绪的 future。当唯一的 future 与时间相关时,这在本质上是时间的快进:
#![allow(unused)] fn main() { #[tokio::test] async fn paused_time() { tokio::time::pause(); let start = std::time::Instant::now(); tokio::time::sleep(Duration::from_millis(500)).await; println!("{:?}ms", start.elapsed().as_millis()); } }
这代码在正常的机器上会输出0ms。
对于单元测试来说,整个过程中保持时间暂停通常很有用。可以通过调用start_paused宏设置为true来实现:
#![allow(unused)] fn main() { #[tokio::test(start_paused = true)] async fn paused_time() { let start = std::time::Instant::now(); tokio::time::sleep(Duration::from_millis(500)).await; println!("{:?}ms", start.elapsed().as_millis()); } }
查看 tokio::test "设置运行时来让时间暂停" 以获取详细信息。
当然,即使使用不同时间相关 future,未来解析的时间顺序也不会改变:
#![allow(unused)] fn main() { #[tokio::test(start_paused = true)] async fn interval_with_paused_time() { let mut interval = interval(Duration::from_millis(300)); let _ = timeout(Duration::from_secs(1), async move { loop { interval.tick().await; println!("Tick!"); } }) .await; } }
这段代码正好打印 4 次Tick!。
使用AsyncRead和AsyncWrite进行模拟
异步读写的通用 trait(AsyncRead和AsyncWrite)是用来被实现的,比如,套接字。它们可以用于模拟套接字执行的 I/O。
让我们考虑一个 TCP 服务端循环:
use tokio::net::TcpListener; #[tokio::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap(); loop { let Ok((mut socket, _)) = listener.accept().await else { eprintln!("Failed to accept client"); continue; }; tokio::spawn(async move { let (reader, writer) = socket.split(); // 运行一些客户端连接句柄, 比如: // handle_connection(reader, writer) // .await // .expect("Failed to handle connection"); }); } }
这里,每个 TCP 客户端连接都由专用的 tokio 任务提供了服务。该任务有一个 reader 和一个 writer,它们是从TcpStream分离(split)出来的。
考虑实际客户端句柄任务,尤其是函数签名的where句:
#![allow(unused)] fn main() { use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; async fn handle_connection<Reader, Writer>( reader: Reader, mut writer: Writer, ) -> std::io::Result<()> where Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin, { let mut line = String::new(); let mut reader = BufReader::new(reader); loop { if let Ok(bytes_read) = reader.read_line(&mut line).await { if bytes_read == 0 { break Ok(()); } writer .write_all(format!("Thanks for your message.\r\n").as_bytes()) .await .unwrap(); } line.clear(); } } }
本质上,实现AsyncRead和AsyncWrite的 reader 和 writer 都是按顺序提供服务的。对于每个给定的行,句柄都会回复"Thanks for your message"。
要对客户端的连接句柄进行单元测试,我们需要使用tokio_test::io::Builder进行模拟:
#![allow(unused)] fn main() { #[tokio::test] async fn client_handler_replies_politely() { let reader = tokio_test::io::Builder::new() .read(b"Hi there\r\n") .read(b"How are you doing?\r\n") .build(); let writer = tokio_test::io::Builder::new() .write(b"Thanks for your message.\r\n") .write(b"Thanks for your message.\r\n") .build(); let _ = handle_connection(reader, writer).await; } }