Channels 管道

现在我们已经了解了一些关于 Tokio 并发的知识,让我们把它应用到客户端。把我们之前写的服务端代码放到独立的二进制文件中。

$ mkdir src/bin
$ mv src/main.rs src/bin/server.rs

并且创建一个新的二进制文件来存放客户端代码:

touch src/bin/client.rs

在这个文件中,我们将会编写本页的代码。每当想要运行它时,你需要先在一个独立的终端窗口启动服务器:

$ cargo run --bin server

然后运行客户端,也是在独立的终端窗口:

$ cargo run --bin client

完成之后,让我们开始 coding!

假设我们要并发地运行两个 Redis 指令。我们可以为每个指令生成一个任务。然后这两个命令将并发处理。

首先,我们可能这样写:

use mini_redis::client;

#[tokio::main]
async fn main() {
    // 与服务器建立连接
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // 产生两个任务,一个get一个key,另一个set一个key
    let t1 = tokio::spawn(async {
        let res = client.get("foo").await;
    });

    let t2 = tokio::spawn(async {
        client.set("foo", "bar".into()).await;
    });

    t1.await.unwrap();
    t2.await.unwrap();
}

这个不会编译成功,因为两个任务可能在随机时刻访问client连接。但是Client并没有实现Copy,因此如果没有一些代码来让它共享,就不能编译成功。此外,Client::set参数类型是&mut self的,这意味着调用它需要独占访问权限。我们可以为每个任务单独开一个连接,但是这样并不理想。我们不能用std::sync::Mutex,因为调用.await时需要持有锁。我们可以用tokio::sync::Mutex,但是这只能让我们处理单个请求。如果客户端实现了流水线,异步锁会导致连接的利用率不足。

消息传递 Message passing

解决办法是用消息传递。该模式可以生成一个专用任务来处理client资源。希望发出请求的任务都可以发送一个消息到client任务。然后client任务代表这些发送者发出请求,并把响应返回来给发送者。

用这种方式,建立单个连接。管理client的任务能够获得独占的访问权限,以便于调用getset。此外,这个管道还可以充当缓冲区。在client任务忙时,也可能有操作被发送到client任务。一旦client任务空闲,可以处理新的请求,它就会从管道中接收下一个请求。这可以带来更好的吞吐量,并且可以扩展支持连接池。

Tokio管道原语

Tokio提供了许多管道,每一种都有不同的用处。

  • mpsc:多个生产者,单个消费者管道。可以向管道发送很多值。
  • oneshot:单个生产者,单个消费者管道。一次只能发送一个值。
  • broadcast:多个生产者,多个消费者管道。可以向管道发送很多值,每个接收者都能看到每个被发送到值。
  • watch:多个生产者,多个消费者管道。可以向管道发送很多值,但是没有历史记录。接收者只能看到最新的值。

如果你需要一个多生产者多消费者管道,其中只能有一个消费者看到每一条消息,你可以使用async-channel库。还有一些在异步Rust以外用的库,比如std::sync::mpsccrossbeam::channel。这些管道通过阻塞当前线程来等待消息,这在异步代码中是不允许的。

在本节课中,我们将会使用mpsconeshot。其他类型的消息传递通道将会在后续章节中探讨。本节的完整代码可以在这里找到。

定义消息类型

大多数情况下,当使用消息传递时,接收消息的任务会响应多个指令。在我们的例子中,此任务会对GETSET指令做出响应。对此建立模型,我们首先定义一个Command枚举,并包含每个指令类型的变体。

#![allow(unused)]
fn main() {
use bytes::Bytes;

#[derive(Debug)]
enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        val: Bytes,
    }
}
}

创建管道

main函数中,创建一个mpsc管道。

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // 创建一个容量为32的新管道
    let (tx, mut rx) = mpsc::channel(32);

    // ... 其他代码一会儿在这儿写
}

mpsc管道是用来**发送(send)**指令到管理redis连接的任务的。多生产者能力可以允许消息从很多任务发送过来。创建通道会返回两个值,一个发送者(sender)和一个接收者(receiver)。这两个句柄(handle)可以分开使用。它们可以被移动(moved)到不同的任务中。

创建的管道容量是32。如果发送消息的速度比接收快,管道就会存储它们。一旦32条消息都被存储到管道里,调用send(...).await就会进入休眠状态(go to sleep),直到接收者移除了(处理了)一条消息。

从多个任务发送是由克隆发送者来完成的。例如:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(32);
    let tx2 = tx.clone();

    tokio::spawn(async move {
        tx.send("sending from first handle").await.unwrap();
    });

    tokio::spawn(async move {
        tx2.send("sending from second handle").await.unwrap();
    });

    while let Some(message) = rx.recv().await {
        println!("GOT = {}", message);
    }
}

两条消息都被发送到单个接收者句柄。无法克隆mpsc管道的接收者。

当每个发送者都超出生命周期或其他原因被drop了,就不再能向管道中发送更多消息了。此时,接收者调用recv将返回None,这意味着所有的发送者都消失了,管道关闭了。

在我们的管理Redis连接任务的例子中,它知道一旦管道关闭了,Redis连接就可以关了,因为这个连接再也用不到了。

生成管理任务

接下来,生成一个任务处理来自管道中的消息。首先,客户端与Redis建立连接。然后,接收到的指令经由Redis连接发出。

#![allow(unused)]
fn main() {
use mini_redis::client;
// `move` 关键字用来 **移动** `rx` 的所有权到任务中。
let manager = tokio::spawn(async move {
    // 与服务端建立连接
    let mut client = client::connect("127.0.0.1:6379").await.unwrap();

    // 开始接收消息
    while let Some(cmd) = rx.recv().await {
        use Command::*;

        match cmd {
            Get { key } => {
                client.get(&key).await;
            }
            Set { key, val } => {
                client.set(&key, val).await;
            }
        }
    }
});
}

现在,更新两个任务的代码来通过管道发送指令,而不是直接通过Redis连接发送。

#![allow(unused)]
fn main() {
// `Sender` 句柄被移动到任务里. 因为这儿有俩任务,
// 我们需要另一个 `Sender`
let tx2 = tx.clone();

// 生成俩任务,一个get一个key,一个set一个key
let t1 = tokio::spawn(async move {
    let cmd = Command::Get {
        key: "foo".to_string(),
    };

    tx.send(cmd).await.unwrap();
});

let t2 = tokio::spawn(async move {
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
    };

    tx2.send(cmd).await.unwrap();
});
}

main函数底部,我们调用.await来等待连接句柄(join handles),以确保在进程退出之前全部完成这些指令。

#![allow(unused)]
fn main() {
t1.await.unwrap();
t2.await.unwrap();
manager.await.unwrap();
}

接收响应

最后一步,是接收从管理任务中返回的响应。GET指令需要获取到值,SET指令需要知道这个操作是否成功完成。

为了传递响应,使用oneshot管道。oneshot管道是单个生产者,单个消费者管道,针对发送单个值进行了优化。在我们的例子中,单个值就是响应。

mpsc类似,oneshot::channel()返回一个发送者和一个接收者句柄。

#![allow(unused)]
fn main() {
use tokio::sync::oneshot;

let (tx, rx) = oneshot::channel();
}

mpsc不同的地方在于没有指定容量,因为容量就是一。此外,两个句柄都不能克隆。

为了接收从管理任务传过来的响应,在发送指令之前,需要建立一个oneshot管道。管道中的发送者这块被包含在了管理任务的指令中。接收者那块用来接收响应。

首先,更新Command枚举来包含发送者。为了方便,给发送者起个别名。

#![allow(unused)]
fn main() {
use tokio::sync::oneshot;
use bytes::Bytes;

/// 多个不同的命令在单个管道上复用
#[derive(Debug)]
enum Command {
    Get {
        key: String,
        resp: Responder<Option<Bytes>>,
    },
    Set {
        key: String,
        val: Bytes,
        resp: Responder<()>,
    },
}

/// 由请求者提供,并由管理任务来发送指令的响应,返回给请求者
type Responder<T> = oneshot::Sender<mini_redis::Result<T>>;
}

现在,更新发出命令的任务代码,来包含oneshot::Sender

#![allow(unused)]
fn main() {
let t1 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Get {
        key: "foo".to_string(),
        resp: resp_tx,
    };

    // 发送GET请求
    tx.send(cmd).await.unwrap();

    // 等待响应
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});

let t2 = tokio::spawn(async move {
    let (resp_tx, resp_rx) = oneshot::channel();
    let cmd = Command::Set {
        key: "foo".to_string(),
        val: "bar".into(),
        resp: resp_tx,
    };

    // 发送SET请求
    tx2.send(cmd).await.unwrap();

    // 等待响应
    let res = resp_rx.await;
    println!("GOT = {:?}", res);
});
}

最后,更新管理任务,来通过oneshot管道发送响应。

#![allow(unused)]
fn main() {
while let Some(cmd) = rx.recv().await {
    match cmd {
        Command::Get { key, resp } => {
            let res = client.get(&key).await;
            // 使用 `_` 忽略错误
            let _ = resp.send(res);
        }
        Command::Set { key, val, resp } => {
            let res = client.set(&key, val).await;
            // 使用 `_` 忽略错误
            let _ = resp.send(res);
        }
    }
}
}

onsshot::Sender上调用send会立即完成,并且不再需要.await。这是这是因为oneshot管道的send方法会总是会立即失败或成功,不需要等待。 当接收者那部分被drop了,在oneshot通道上发送值就会返回Err。这意味着接收者不再对响应作出反应。在我们的场景中,接收者对接收事件不再响应是合理的。resp.send()返回的Err不需要处理。

你可以在这儿找到完整代码。

对消息通道进行限制

每当引入并发或排队时,确保排队是有界的并且系统能够负载得起的非常重要。无界队列最终将会填满所有可用内容,并导致系统以不可预测的方式崩掉。

Tokyo会关注避免隐式排队。很大异步原因就是因为异步操作是惰性的(lazy)。考虑以下情况:

#![allow(unused)]
fn main() {
loop {
    async_op();
}
}

如果异步操作都马上开始运行,那么这个循环会重复排队一个新的async_op任务,而不会确保之前的任务操作完成。这会导致隐式无界排队。基于回调的系统和基于eager future的系统很容易受到这种情况影响。

然而,Tokio和异步Rust不会让上述代码段中async_op运行。这是因为.await没有被调用。如果代码块使用了.await,那么循环在重新开始时会等待上一次操作完成。

#![allow(unused)]
fn main() {
loop {
    // 在 `async_op` 完成之前不会重复
    async_op().await;
}
}

必须明确地引入并发和队列。执行此操作的方法有:

  • tokio::spawn
  • select!
  • join!
  • mpsc::channel

这样做时,注意确保总的并发量是有限的。例如,当写TCP接收循环时,确保总的打开的套接字(socket)是有限的。当使用mpsc::channel管道,选择管道的容量。具体的限定值取决于是什么应用。

小心地选择良好的边界是编写可靠Tokio应用的重要组成部分。