Spawning 生成任务
现在我们来实现 Redis 服务端。
首先,把上一节客户端SET/GET代码移动到 example 文件中。这样我们就可以在服务器上运行它。
$ mkdir -p examples
$ mv src/main.rs examples/hello-redis.rs
然后新建一个空文件src/main.rs,并继续。
接收套接字
我们 Redis 服务器要做的第一件事就是接收 TCP 套接字。这是通过绑定tokio::net::TcpListener到6379端口来完成的。
info Tokio 很多类型名与 Rust 标准库中同步类型相同。在合理情况(make sense)下,Tokio 暴露了与 std 相同的 API,但是使用
async fn。
然后在一个 loop 循环中接收套接字。每个套接字都会被处理然后关闭。现在,我们将会读取指令,将其打印到 stdout 并响应一个 error。
src/main.rs
use tokio::net::{TcpListener, TcpStream}; use mini_redis::{Connection, Frame}; #[tokio::main] async fn main() { // 把listener绑定到这个地址 let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap(); loop { // 第二个值包含了新连接的IP和端口 let (socket, _) = listener.accept().await.unwrap(); process(socket).await; } } async fn process(socket: TcpStream) { // 这个 `Connection` 让我们读/写Redis **帧(frames)** 而不是 // 比特流. 这个 `Connection` 类型是由 mini-redis 定义的。 let mut connection = Connection::new(socket); if let Some(frame) = connection.read_frame().await.unwrap() { println!("GOT: {:?}", frame); // 响应一个error let response = Frame::Error("unimplemented".to_string()); connection.write_frame(&response).await.unwrap(); } }
现在,运行这个接收循环:
$ cargo run
在独立的终端窗口,运行hello-redis例子(之前章节写的SET/GET命令):
$ cargo run --example hello-redis
输出应该是:
Error: "unimplemented"
服务端的终端窗口,输出应该是:
GOT: Array([Bulk(b"set"), Bulk(b"hello"), Bulk(b"world")])
并发
我们的服务端有个小问题(除了仅仅响应 error 之外)。它一次只能处理一个入站请求。当一个连接被接收,服务端将会停留在接收循环里,直到响应完全写入套接字。
我们希望 Redis 服务端可以处理许多并发请求。所以我们需要添加一些并发功能。
info 并发和并行不是一回事。如果你在两个任务之间交替,那你就是并发地处理两个任务,而不是并行的。如果是并行的话,你需要两个人,每个人专门负责一个任务。
使用 Tokio 的一个优点就是,异步代码允许您同时处理多个任务,而不用使用普通线程并行得处理它们。事实上,Tokio 可以在单个线程上运行多个任务!
为了同时处理连接,将会为每个入站连接生成一个新任务。连接会在此任务上进行处理。
接收循环变成下面这样:
use tokio::net::TcpListener; #[tokio::main] async fn main() { let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap(); loop { let (socket, _) = listener.accept().await.unwrap(); // 每个传入的套接字都会被生成一个新任务。套接字(的所有权) // 被移动到新任务并且在那儿处理。 tokio::spawn(async move { process(socket).await; }); } }
任务
一个 Tokio 任务就是一个异步绿色线程。它们是通过async块传递给tokio::spawn来创建的。tokio::spawn函数返回一个JoinHandle,调用者可以使用它来与生成的任务进行交互。async块可以有返回值。调用者可以使用.await作用于JoinHandle上,来获取返回值。
例如:
#[tokio::main] async fn main() { let handle = tokio::spawn(async { // 做一些异步任务 "return value" }); // 做一些其他任务 let out = handle.await.unwrap(); println!("GOT {}", out); }
等待JoinHandle返回一个Result。当一个任务执行中发生错误,JoinHandle会返回一个Err。任务 Panic 或者因为运行时关闭而被强制取消时,会发生这种情况。
任务是调度器管理的执行单元。生成任务会提交给 Tokio 调度器,然后调度器会确保这个任务在有工作时执行。生成的任务可以在生成它的同一个线程上执行,也可以在不同的线程上执行。任务生成后也可以在线程间移动。
Tokio 中的任务非常轻量。在底层,它们只需要一次性分配 64 字节内存。应用应该可以直接生成数千计,甚至数百万的任务。
'static bound
当在 Tokio 运行时生成一个任务时,它的类型生命周期必须是'static的。这意味着生成的任务不能包含任何任务外的数据的引用。
info 普遍认为
'static意味着“永远存在”,但事实并非如此。仅仅因为一个'static值不意味着存在内存泄漏。你可以在Rust 生命周期误会中阅读更多。
例如,下面的代码不能通过编译:
use tokio::task; #[tokio::main] async fn main() { let v = vec![1, 2, 3]; task::spawn(async { println!("Here's a vec: {:?}", v); }); }
尝试编译会导致以下错误:
error[E0373]: async block may outlive the current function, but
it borrows `v`, which is owned by the current function
--> src/main.rs:7:23
|
7 | task::spawn(async {
| _______________________^
8 | | println!("Here's a vec: {:?}", v);
| | - `v` is borrowed here
9 | | });
| |_____^ may outlive borrowed value `v`
|
note: function requires argument type to outlive `'static`
--> src/main.rs:7:17
|
7 | task::spawn(async {
| _________________^
8 | | println!("Here's a vector: {:?}", v);
9 | | });
| |_____^
help: to force the async block to take ownership of `v` (and any other
referenced variables), use the `move` keyword
|
7 | task::spawn(async move {
8 | println!("Here's a vec: {:?}", v);
9 | });
|
这种情况是因为在默认情况下,变量不会移动move到异步块中。v vector 仍被main函数所拥有。println!行借用了v。Rust 编译器向我们解释了这一点,甚至给出了如何修改的建议!将第 7 行更改为task::spawn(async move {将会让编译器把v``移动move到生成的任务中。现在,这个任务拥有了它自己的数据,使其成为'static的。
如果必须同时从多个任务访问单个数据,那就必须使用Arc等同步原语来共享。
注意,刚刚的错误信息也提到了参数类型比'static生命周期活得更长。这种术语可能相当令人困惑,因为'static生命周期会持续到程序结束,所以如果比它活得还长,是不是出现了内存泄漏?解释是,它是个类型,不是必须比'static活得更长的值,并且这个值有可能在类型不再有效之前被摧毁。
当我们说一个值是'static时,意思是永远保持该值不是错误的。这很重要,因为编译器无能推断新生成的任务会持续多长时间。我们必须确保任务能够永远活着,这样 Tokio 就可以让任务运行它需要长的时间。
Info 框链接的文章使用了术语“受'static限制”而不是“它的类型活得比'static长”或者“值为'static,引用于T: 'static”。之前这些都是一个意思,但是与&'static T中的'static注解不同(这是个引用的生命周期,前面的不是)。
Send bound
由tokio::spawn生成的任务必须实现了Send。这使得Tokio运行时可以在线程之间移动任务,同时在.await处挂起任务。
当所有通过调用.await的数据都是Send的,任务才能是Send的。这有点微妙。当.await被调用,任务返回给调度器。下一次任务被执行时,它将会从上次返回时恢复。为了实现这种功能,任务必须保存.await之后的所有状态。如果这个状态是Send的,即可以跨线程移动,那么任务本身就可以跨线程移动。相反的,如果状态没实现Send,那任务也不是Send的。
例如,这个可以运行:
use tokio::task::yield_now; use std::rc::Rc; #[tokio::main] async fn main() { tokio::spawn(async { // 这歌代码块强制 `rc` 在 `.await` 之前 drop 了 { let rc = Rc::new("hello"); println!("{}", rc); } // `rc` 没再用。 当任务返回给(yield)调度器,它**没再**持续 yield_now().await; }); }
这个不行:
use tokio::task::yield_now; use std::rc::Rc; #[tokio::main] async fn main() { tokio::spawn(async { let rc = Rc::new("hello"); // `rc` 在 `.await` 之后使用了,它必须在任务状态中持续 yield_now().await; println!("{}", rc); }); }
尝试编译这段代码会导致:
error: future cannot be sent between threads safely
--> src/main.rs:6:5
|
6 | tokio::spawn(async {
| ^^^^^^^^^^^^ future created by async block is not `Send`
|
::: [..]spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ---- required by this bound in
| `tokio::task::spawn::spawn`
|
= help: within `impl std::future::Future`, the trait
| `std::marker::Send` is not implemented for
| `std::rc::Rc<&str>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:10:9
|
7 | let rc = Rc::new("hello");
| -- has type `std::rc::Rc<&str>` which is not `Send`
...
10 | yield_now().await;
| ^^^^^^^^^^^^^^^^^ await occurs here, with `rc` maybe
| used later
11 | println!("{}", rc);
12 | });
| - `rc` is later dropped here
我们将会在下一章节更深入探讨此类特殊情况。
存储值
我们现在将会实现process函数来处理传入的指令。我们将会用一个HashMap来存储值。SET指令会插入到HashMap中,GET会加载它们。此外,我们会使用循环来为每个连接接收多条指令。
#![allow(unused)] fn main() { use tokio::net::TcpStream; use mini_redis::{Connection, Frame}; async fn process(socket: TcpStream) { use mini_redis::Command::{self, Get, Set}; use std::collections::HashMap; // HashMap用来存储值 let mut db = HashMap::new(); // 由 `mini-redis` 提供的 Connection ,处理解析从套接字传过来的帧 // (handles parsing frames from the socket) let mut connection = Connection::new(socket); // 使用 `read_frame` 来从连接中接收一条指令。 while let Some(frame) = connection.read_frame().await.unwrap() { let response = match Command::from_frame(frame).unwrap() { Set(cmd) => { // 值被存储为 `Vec<u8>` db.insert(cmd.key().to_string(), cmd.value().to_vec()); Frame::Simple("OK".to_string()) } Get(cmd) => { if let Some(value) = db.get(cmd.key()) { // `Frame::Bulk` 期望数据类型是 `Bytes`。 // 这种类型在本教程中稍后介绍。现在, // `&Vec<u8>` 可以使用 `into()` 转换为 `Bytes`。 Frame::Bulk(value.clone().into()) } else { Frame::Null } } cmd => panic!("unimplemented {:?}", cmd), }; // 写回响应,传回给客户端 connection.write_frame(&response).await.unwrap(); } } }
现在,启动服务端:
$ cargo run
然后在另外的终端窗口,运行hello-redis例子:
$ cargo run --example hello-redis
现在,输出应该是:
got value from the server; result=Some(b"world")
现在,我们可以获取和设置值,但还有个问题:连接之间,值不能被共享。如果另一个套接字连接,并尝试GET hellokey,它将找不到任何内容。
你可以在这里找到完整代码。
下一节,我们将会为所有套接字实现持久数据。