20最后的项目:构建多线程web server

在本章中,我们将一同构建另一个项目,来展示最后几章所学,同时复习更早的章节。

如下是我们将怎样构建此 web server 的计划:

  1. 学习一些 TCP 与 HTTP 知识
  2. 在套接字(socket)上监听 TCP 请求
  3. 解析少量的 HTTP 请求
  4. 创建一个合适的 HTTP 响应
  5. 通过线程池改善 server 的吞吐量

不过在开始之前,需要提到一点细节:这里使用的方法并不是使用 Rust 构建 web server 最好的方法。crates.io 上有很多可用于生产环境的 crate,它们提供了比我们所要编写的更为完整的 web server 和线程池实现。

然而,本章的目的在于学习,而不是走捷径。因为 Rust 是一个系统编程语言,我们能够选择处理什么层次的抽象,并能够选择比其他语言可能或可用的层次更低的层次。因此我们将自己编写一个基础的 HTTP server 和线程池,以便学习将来可能用到的 crate 背后的通用理念和技术。

涉及知识点

超文本传输协议Hypertext Transfer ProtocolHTTP

传输控制协议Transmission Control ProtocolTCP

  • 都是 请求-响应request-response)协议,
  • 也就是说,有 客户端client)来初始化请求,并有 服务端server)监听请求并向客户端提供响应。请求与响应的内容由协议本身定义。

标准库提供了 std::net 模块处理这些功能。

TcpListener 用于监听 TCP 连接。

  • 具体内容可以查看API文档;
  • bind 函数返回 Result<T, E>,这表明绑定可能会失败;
  • TcpListenerincoming 方法返回一个迭代器,它提供了一系列的流(更准确的说是 TcpStream 类型的流)。

stream)代表一个客户端和服务端之间打开的连接。

连接connection)代表客户端连接服务端、服务端生成响应以及服务端关闭连接的全部请求 / 响应过程。

当客户端连接到服务端时 incoming 方法返回错误是可能的,因为我们实际上没有遍历连接,而是遍历 连接尝试connection attempts)。

stream 在循环的结尾离开作用域并被丢弃,其连接将被关闭,作为 drop 实现的一部分。

读取请求

std::io::prelude 引入作用域来获取读写流所需的特定 trait。

1
fn handle_connection(mut stream: TcpStream) {

handle_connection 中,stream 参数是可变的。这是因为 TcpStream 实例在内部记录了所返回的数据。它可能读取了多于我们请求的数据并保存它们以备下一次请求数据。因此它需要是 mut因为其内部状态可能会改变;通常我们认为 “读取” 不需要可变性,不过在这个例子中则需要 mut 关键字。

HTTP 是一个基于文本的协议,同时一个请求有如下格式:

1
2
3
Method Request-URI HTTP-Version CRLF
headers CRLF
message-body

第一行叫做 请求行request line),它存放了客户端请求了什么的信息。

  • 请求行的第一部分是所使用的 method,比如 GETPOST,这描述了客户端如何进行请求。这里客户端使用了 GET 请求。

  • 请求行接下来的部分是 /,它代表客户端请求的 统一资源标识符Uniform Resource IdentifierURI) —— URI 大体上类似,但也不完全类似于 URL(统一资源定位符Uniform Resource Locators)。URI 和 URL 之间的区别对于本章的目的来说并不重要,不过 HTTP 规范使用术语 URI,所以这里可以简单的将 URL 理解为 URI。

  • 最后一部分是客户端使用的HTTP版本,然后请求行以 CRLF序列 (CRLF代表回车和换行,carriage return line feed,这是打字机时代的术语!)结束。CRLF序列也可以写成\r\n,其中\r是回车符,\n是换行符。 CRLF序列将请求行与其余请求数据分开。 请注意,打印CRLF时,我们会看到一个新行,而不是\r\n

第二行是 headers;从 Host: 开始的其余的行是 headers;GET 请求没有 body。

编写响应

我们将实现在客户端请求的响应中发送数据的功能。响应有如下格式:

1
2
3
HTTP-Version Status-Code Reason-Phrase CRLF
headers CRLF
message-body

第一行叫做 状态行status line),它包含响应的 HTTP 版本、一个数字状态码用以总结请求的结果和一个描述之前状态码的文本原因短语。

CRLF 序列之后是任意 header,另一个 CRLF 序列,和响应的 body。

例如,一个没有body的响应

1
HTTP/1.1 200 OK\r\n\r\n

数据开头增加 b"" 字节字符串语法将其转换为字节字符串。

线程池

通道可以通道任务队列的作用

如何创建并存储空线程?

  • 使用 JoinHandle来存储创建的线程

看一下 thread::spawn的签名

1
2
3
4
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static

spawn返回 JoinHandle<T>T 是闭包返回的类型。不返回有意义的值则使用()

1
2
// thread::spawn() -> JoinHandle<T>;
let thread = thread::spawn(||{});
1
2
3
4
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static

F 是这里我们关心的参数;

要理解F的三个 trait bound的目的,

  • FnOnce() ,线程仅对闭包执行一次,
  • Send 来将闭包从一个线程转移到另一个线程(安全在线程间转移所有权);
  • 'static 是因为并不知道线程会执行多久。
1
2
3
4
5
6
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static
{

}

这里与上面 spawn 部分 FnOnce的不同有一个知识点:

  • 使用FnOnce/FnMut/Fn定义 trait bound时,就像普通定义函数签名时一样
  1. 如果有参数和返回值,需要FnOnce(类型) -> 类型2
  2. 如果没有参数和返回值,或者两者缺一,
    • 没有参数时也需要(),即FnOnce(),函数没有参数时也是这样;
    • 没有返回值时,可以省略返回值,即FnOnce(xxx)(没有返回值时其实就是返回(),不过可以省略);

Vec::with_capacity(size)Vec::new()的区别?

  • Vec::with_capacity(size) 效率要高一些,因为会预先进行分配size个元素的空间。

  • Vec::new 会随着插入元素而重新改变大小;

使用通道向线程发送请求

在线程池中,Worker 结构体能够从 ThreadPool 的队列中获取需要执行的代码,并发送到线程中执行他们。

我们要队列,在这里我们可以简单是使用通道来充当任务队列的作用。

  1. ThreadPool 会创建一个通道,其execute 方法会在通道发送端发出期望执行的任务。
  2. 每个 Worker 将会充当通道的接收端。

注意,mpsc::channel()中,Rust 提供的通道是多生产者,单消费者。

  • 在线程中的接收端是不能传递给多个worker实例的。
  • 所以只能所有worker共享单一的receiver了,同时取出任务需要修改receiver。
    • 这涉及到在多个线程间共享所有权,并允许线程修改其值得问题。
    • 需要使用Arc<Mutex<T>>Arc允许多个线程间共享所有权,Mutex 确保一次只有一个worker能从接收端获取任务并修改。
1
2
3
4
5
6
7
let receiver = Arc::new(Mutex::new(receiver));
...
for id in 0..size {

// 共享所有权需要使用clone方法
workers.push(Worker::new(id,Arc::clone(&receiver)));
}

接收闭包类型的 trait对象的类型别名

1
type Job = Box<dyn FnOnce() + Send + 'static>;

为什么选择loop而不是while let?

我们需要闭包一直循环,向通道的接收端请求任务,并在得到任务时执行他们

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();

println!("Worker {} got a job; executing.", id);

job();
}
});

Worker {
id,
thread,
}
}
}

实现上使用了loop,而不是while let,为什么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {} got a job; executing.", id);

job();
}
});

Worker {
id,
thread,
}
}
}

这段代码可以编译和运行,但是并不会产生所期望的线程行为:一个慢请求仍然会导致其他请求等待执行。– 这里主要因为获取锁的原因;

原因有些微妙:

  • Mutex 结构体没有公有 unlock 方法,因为锁的所有权依赖 lock 方法返回的 LockResult<MutexGuard<T>>MutexGuard<T> 的生命周期。
    • 因为MutexGuard<T>是个智能指针,其实现了Drop trait,当MutexGuard离开作用域时会自动释放锁(lock);
    • 所以借用检查器可以在编译时就确保不会出现被Mutex守护的资源被没有持有锁的情况下被访问。
    • 所以在处理锁的时候,需要考虑lock方法返回值的生命周期,否则可能出现持有锁的时间比预期长的情况,导致性能受损。
  • 问题来了,这与 while let 有什么关系呢?
    • let语句,当let语句结束时,表达式中等号右边的任何临时值都会被立即丢弃。
    • while letif letmatch 直到相关块的末尾才会丢弃临时值。

所以综上,

如果这里使用 while let,一次判断循环,在离开其代码块前,产生的MutexGuard临时值会一直存在,如果有慢请求,worker获取锁后,到执行完job都会一直持有锁,导致无法实现多线程并发。

而使用loop循环,因为let语句在内部,且let语句执行完,MutexGuard就会被丢弃,这样其他worker就可以争夺锁,就可以达到多个线程并发执行了。

优雅停机与清理

为 ThreadPool 实现 Drop trait

当线程池被丢弃时,应该 join 所有线程以确保他们完成其操作。这个在 Drop trait 中实现。

JoinHandler.join 的作用

  • 等待关联的thread结束,如果关联的线程结束了这个方法会立刻返回。
  • 该方法需要获取实例的所有权。
    • 为了解决这个问题,需要一个方法将 thread 移动出拥有其所有权的 Worker 实例以便 join 可以消费这个线程。
    • 如果 Worker 存放的是 Option<thread::JoinHandle<()>,就可以在 Option 上调用 take 方法将值从 Some 成员中移动出来而对 None 成员不做处理。

向线程发送信号使其停止接收任务

修改线程既监听是否有 Job 运行也要监听一个应该停止监听并退出无限循环的信号。

否则,如果没有停止退出无限循环的信号,那么线程将永远不会退出,泽县城永远阻塞在等待第一个线程结束上。

  • 所以通道将发送这个枚举的两个成员之一而不是 Job 实例。

为什么在 Drop trait 实现中不能将发送terminate的循环与获取join的循环放在一起?而是分为两个for循环?

  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2022-2023 ligongzhao
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信