Framing 解析帧
现在我们将为 Mini-Redis 框架层实现我们刚刚学过的 I/O 知识。获取字节流,并把它转换为帧流的过程叫解析帧。一帧就是两个对等点(peer)之间的传输单元。Redis 协议帧定义如下:
#![allow(unused)] fn main() { use bytes::Bytes; enum Frame { Simple(String), Error(String), Integer(u64), Bulk(Bytes), Null, Array(Vec<Frame>), } }
注意观察该帧仅由数据组成,没有任何语义。指令的解析和执行发生在更高的层次。
对于 HTTP 来说,一帧可能长这样:
#![allow(unused)] fn main() { enum HttpFrame { RequestHead { method: Method, uri: Uri, version: Version, headers: HeaderMap, }, ResponseHead { status: StatusCode, version: Version, headers: HeaderMap, }, BodyChunk { chunk: Bytes, }, } }
为了给 Mini-Redis 实现数据帧,我们将先实现Connection结构体,包含了TcpStream和读写mini_redis::Frame的值。
#![allow(unused)] fn main() { use tokio::net::TcpStream; use mini_redis::{Frame, Result}; struct Connection { stream: TcpStream, // ... 其他成员变量 } impl Connection { /// 从连接中读取一帧 /// /// 遇到 EOF 返回 `None` pub async fn read_frame(&mut self) -> Result<Option<Frame>> { // 在这里实现 } /// 向连接中写入一帧 pub async fn write_frame(&mut self, frame: &Frame) -> Result<()> { // 在这里实现 } } }
你可以在这儿找到有关 Redis 有线协议的详情。完整的Connection代码在这儿。
读取缓冲区
read_frame方法在返回前等待接收整个数据帧。对TcpStream::read()单次调用可能返回任意数量的数据。它可能有一整个帧,一部分帧或者多个帧。如果只接收到一部分帧,则传到缓冲区,再从套接字读取更多数据。如果接收到多个帧,则返回第一帧,其他数据存到缓冲区,直到下次调用read_frame。
如果还没创建connection.rs,这样创建:
touch src/connection.rs
为了实现这种功能,连接(Connection)需要一个读缓冲区。从套接字读取到数据会存到读缓冲区。当解析了一帧时,缓冲区中相应的数据就会删除。
我们将使用BytesMut来作为缓冲区类型,它是Bytes的可变版本。
#![allow(unused)] fn main() { use bytes::BytesMut; use tokio::net::TcpStream; pub struct Connection { stream: TcpStream, buffer: BytesMut, } impl Connection { pub fn new(stream: TcpStream) -> Connection { Connection { stream, // 为缓冲区开辟4kb的容量 buffer: BytesMut::with_capacity(4096), } } } }
接下来,我们实现read_frame()方法。
#![allow(unused)] fn main() { use tokio::io::AsyncReadExt; use bytes::Buf; use mini_redis::Result; pub async fn read_frame(&mut self) -> Result<Option<Frame>> { loop { // 尝试从缓冲区解析一个数据帧。如果缓冲区中有足够的数据, // 就返回数据帧 if let Some(frame) = self.parse_frame()? { return Ok(Some(frame)); } // 缓冲区中没有足够的数据组成一帧。 // 尝试从套接字中读更多数据。 // // 如果成功,返回字节的数量。 // 返回 `0` 表示 “读到了流的末尾” if 0 == self.stream.read_buf(&mut self.buffer).await? { // 远程连接关闭。对这情况是一种干净的关闭,在读缓冲区中应该 // 没有数据了。如果还有数据,意味着传输一帧的同时,对等点peer // 关闭了套接字。 if self.buffer.is_empty() { return Ok(None); } else { return Err("connection reset by peer".into()); } } } } }
分析一下代码。read_frame方法在循环体中运行。首先,调用self.parse_frame()。这会尝试从self.buffer中解析一个 Redis 帧。如果有足够的数据可以解析成一帧,就把该帧返回。否则,尝试从套接字中读取更多数据到缓冲区中。读取到更多数据之后,parse_frame()再次被调用。这回,如果接收到足够的数据,解析或许就会成功。
当从流中读取时,返回了0表示我们不会从对方接收到更多数据。如果缓冲区中仍有数据,说明是接收到了部分帧,但是连接突然终止了。这是一个错误情况需要返回Err。
Buf特征
从流中读取时,调用了read_buf,这个读取函数采用了实现bytescrate 中BufMut的值。
首先,考虑使用read()实现相同的读取循环。Vec<u8>可以用来替代BytesMut。
#![allow(unused)] fn main() { use tokio::net::TcpStream; pub struct Connection { stream: TcpStream, buffer: Vec<u8>, cursor: usize, } impl Connection { pub fn new(stream: TcpStream) -> Connection { Connection { stream, // 为缓冲区开辟4kb的容量 buffer: vec![0; 4096], cursor: 0, } } } }
为Connection实现read_frame():
#![allow(unused)] fn main() { use mini_redis::{Frame, Result}; pub async fn read_frame(&mut self) -> Result<Option<Frame>> { loop { if let Some(frame) = self.parse_frame()? { return Ok(Some(frame)); } // 确保buffer有足够容量 if self.buffer.len() == self.cursor { // 给buffer扩容 self.buffer.resize(self.cursor * 2, 0); } // 从流中读取到buffer中,记录读了多少字节 let n = self.stream.read( &mut self.buffer[self.cursor..]).await?; if 0 == n { if self.cursor == 0 { return Ok(None); } else { return Err("connection reset by peer".into()); } } else { // 更新指针 self.cursor += n; } } } }
当用字节数组读时,我们必须维护一个指针,跟踪已使用的缓冲区数量。必须保证把缓冲区的空部分传给read()。否则,我们会覆盖已经缓冲了的数据。如果缓冲区满了,必须扩容缓冲区才能继续读取。在parse_frame()中(不包括在内),我们需要解析包含在self.buffer[..self.cursor]中的数据。
由于将字节数组和指针搭配使用非常常见,bytecrate 提供了表示字节数组和指针的抽象。Buf特征可以被读取数据的类型实现。BufMut特征可以被写入数据的类型实现。当把T: BufMut传递给read_buf()时,缓冲区的内部指针就会由read_buf自动更新。正因如此,在我们之前写的read_frame中,我们不需要管理自己的指针。
解析
现在,让我们实现parse_frame()函数。解析由两步组成:
- 确保缓冲区中有一个完整的帧,并找到帧的末尾索引。
- 解析帧。
mini-redis crate 为每一步都提供了对应的函数:
我们将会再用Buf抽象来获取帮助。Buf被传递到Frame::check中。check会遍历缓冲区,内部指针将会前进。当check函数返回时,缓冲区内部指针指向帧的末尾。
对于Buf类型,我们使用std::io::Cursor<&[u8]>
#![allow(unused)] fn main() { use mini_redis::{Frame, Result}; use mini_redis::frame::Error::Incomplete; use bytes::Buf; use std::io::Cursor; fn parse_frame(&mut self) -> Result<Option<Frame>> { // 创建 `T: Buf` 类型. let mut buf = Cursor::new(&self.buffer[..]); // 检查是否已有一个完整的帧 match Frame::check(&mut buf) { Ok(_) => { // 获取帧的字节长度 let len = buf.position() as usize; // 重置内部指针位置,因为调用了 `parse` buf.set_position(0); // 解析帧 let frame = Frame::parse(&mut buf)?; // 丢弃缓冲区中的帧 self.buffer.advance(len); // 返回解析后的帧 Ok(Some(frame)) } // 缓冲区中没有足够数据 Err(Incomplete) => Ok(None), // 遇到了错误 Err(e) => Err(e.into()), } } }
完整的Frame::check函数可以在这儿找到。这里不会完整介绍它。
需要注意的是,相关事项使用了Buf的“字节迭代器”风格 API。它们获取数据并移动内部指针。例如,解析一帧,第一个字节被检查来确定类型。使用了Buf::get_u8函数。这会获取到当前指针位置的字节并让指针前进一次。
Buf特征也有很多其他实用方法。查看API 文档获取更多细节。
缓冲写入(Buffered writes)
解析帧的另一部分就是write_frame(frame)函数。这个函数把整个帧写入套接字。为了最小地调用write,我们使用缓冲写入。维护一个写入缓冲区,帧在写入套接字之前需要先编码到这个缓冲区。然而,不像read_frame(),整个帧并不总是缓存到字节数组中。
考虑到一些流中的帧。正在写入的值是Frame::Bulk(Bytes)。这些帧线性排列,有一个帧头,它由$符后跟了整个数据长度那么多的字节。帧大部分内容都是Bytes值。如果数据很大,把它复制到缓冲区的成本很高。
为了实现缓冲写入,我们需要BufWriter结构体。该结构体使用T: AsyncWrite初始化并实现了AsyncWrite。当在BufWriter上调用write时,写入不会直接写入writer,而是写入缓冲区。当缓冲区满了,内容将会被刷新到writer,然后缓冲区清空。还有一些特殊的优化用来绕过缓冲区,在特定的情况下。
在本教程,我们不会实现完整的write_frame()。完整实现在这儿。
首先,更新Connection结构体:
#![allow(unused)] fn main() { use tokio::io::BufWriter; use tokio::net::TcpStream; use bytes::BytesMut; pub struct Connection { stream: BufWriter<TcpStream>, buffer: BytesMut, } impl Connection { pub fn new(stream: TcpStream) -> Connection { Connection { stream: BufWriter::new(stream), buffer: BytesMut::with_capacity(4096), } } } }
接下来,实现write_frame():
#![allow(unused)] fn main() { use tokio::io::{self, AsyncWriteExt}; use mini_redis::Frame; async fn write_frame(&mut self, frame: &Frame) -> io::Result<()> { match frame { Frame::Simple(val) => { self.stream.write_u8(b'+').await?; self.stream.write_all(val.as_bytes()).await?; self.stream.write_all(b"\r\n").await?; } Frame::Error(val) => { self.stream.write_u8(b'-').await?; self.stream.write_all(val.as_bytes()).await?; self.stream.write_all(b"\r\n").await?; } Frame::Integer(val) => { self.stream.write_u8(b':').await?; self.write_decimal(*val).await?; } Frame::Null => { self.stream.write_all(b"$-1\r\n").await?; } Frame::Bulk(val) => { let len = val.len(); self.stream.write_u8(b'$').await?; self.write_decimal(len as u64).await?; self.stream.write_all(val).await?; self.stream.write_all(b"\r\n").await?; } Frame::Array(_val) => unimplemented!(), } self.stream.flush().await; Ok(()) } }
这里使用的函数由AsyncWriteExt提供。它们也能用在TcpStream,但是不建议在没有中间缓冲区的情况下处理单个字节。
write_u8向writer写入一个字节。write_all向writer写入全部。write_decimal是mini-redis实现的方法。
该函数以调用self.stream.flush().await结束。因为BufWriter向中间缓冲区存储了需要写入的内容,所以调用write不能保证数据被写入套接字。Return之前,我们想要帧被写入到套接字中。调用fluse()会将在缓冲区挂起的数据写入到套接字中。
另一种选择是不在write_frame()中调用flush()。取而代之的是,在Connection中提供flush()函数。这将允许调用者写入多个小帧到写缓冲区里,然后通过write系统调用,把他们写入到套接字中。这样会导致ConnectionAPI变得复杂。所以我们决定在fn write_frame()中调用fluse().await。