Rust中多線程?Web?服務(wù)器的項目實戰(zhàn)
前情提要:http://www.dbjr.com.cn/program/34427748j.htm
單線程 Web 服務(wù)器將依次處理每個請求,這意味著在第一個連接完成處理之前,它不會處理第二個連接。如果服務(wù)器接收到越來越多的請求,那么串行執(zhí)行將越來越不理想。如果服務(wù)器接收到一個需要很長時間來處理的請求,則后續(xù)請求將不得不等待,直到長請求完成,即使新請求可以快速處理。我們需要解決這個問題,但首先我們要看看實際的問題。
模擬慢速請求
我們將了解處理緩慢的請求如何影響對單線程 Web 服務(wù)器實現(xiàn)的其他請求。
我們使用模擬的緩慢響應(yīng)實現(xiàn)了對 /sleep 的請求處理,該響應(yīng)將導(dǎo)致服務(wù)器在響應(yīng)之前休眠 5 s。
use std::{
fs,
io::{BufReader, prelude::*},
net::{TcpListener, TcpStream},
thread,
time::Duration,
};
// --snip--
fn handle_connection(mut stream: TcpStream) {
// --snip--
let (status_line, filename) = match &request_line[..] {
"GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
"GET /sleep HTTP/1.1" => {
thread::sleep(Duration::from_secs(5));
("HTTP/1.1 200 OK", "hello.html")
}
_ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
};
// --snip--
}
新增了一種對 /sleep 請求的響應(yīng),當(dāng)接收到該請求時,服務(wù)器將在呈現(xiàn) hello.html 之前休眠 5 s。
使用 cargo run 啟動服務(wù)器。然后打開兩個瀏覽器窗口:一個用于 127.0.0.1:7878,另一個用于 127.0.0.1:7878/sleep。如果像以前一樣多次輸入 / URI,您將看到它快速響應(yīng)。但是如果你輸入 /sleep,然后加載 /,你會看到 / 等待,直到 sleep 了整整 5 s 才加載。
我們要實現(xiàn)一個線程池,避免慢速請求后面的請求等待。
使用線程池提高吞吐量
線程池是一組正在等待并準(zhǔn)備處理任務(wù)的派生線程。當(dāng)程序接收到一個新任務(wù)時,它將池中的一個線程分配給該任務(wù),該線程將處理該任務(wù)。池中的剩余線程可用于處理在第一個線程正在處理時進入的任何其他任務(wù)。當(dāng)?shù)谝粋€線程完成其任務(wù)的處理后,它將返回到空閑線程池,準(zhǔn)備處理新任務(wù)。線程池允許您并發(fā)地處理連接,從而提高服務(wù)器的吞吐量。
我們將限制池中的線程數(shù)量,因為服務(wù)器的資源是有限的,也保護我們免受 DoS 攻擊。進入的請求被發(fā)送到池中進行處理,線程池將維護一個傳入請求隊列,池中的每個線程將從這個隊列中彈出一個請求,處理該請求,然后向隊列請求另一個請求。使用這種設(shè)計,我們最多可以并發(fā)處理 N 個請求,其中 N 是線程數(shù)。
這種技術(shù)只是提高 Web 服務(wù)器吞吐量的眾多方法之一。其他選項包括 fork/join 模型、單線程異步 I/O 模型和多線程異步 I/O 模型,等等。
初步嘗試:為每個請求生成一個線程
首先,讓我們探索一下,如果為每個連接創(chuàng)建一個新線程,我們的代碼會是什么樣子。正如前面提到的,這不是我們的最終計劃,因為可能會產(chǎn)生無限數(shù)量的線程,但這是一個起點,可以首先獲得一個工作的多線程服務(wù)器。然后我們將添加線程池作為改進,并且比較兩種解決方案會更容易。
在單線程 Web 服務(wù)器的 main 函數(shù)中進行修改:
use std::thread;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
for stream in listener.incoming() {
let stream = stream.unwrap();
thread::spawn(|| {
handle_connection(stream);
});
}
}
thread::spawn 將創(chuàng)建一個新線程,然后在新線程中運行閉包中的代碼。
如果運行這段代碼并在瀏覽器中加載 /sleep,然后在另外兩個瀏覽器選項卡中加載 /,對 / 的請求不必等待 /sleep 完成。然而,正如我們所提到的,這最終將使系統(tǒng)不堪重負,因為你將無限制地創(chuàng)建新線程。
現(xiàn)在,是時候讓 async 和 await 真正發(fā)揮作用了!
實現(xiàn)線程池的定義和函數(shù)聲明
我們的線程池的實現(xiàn)將獨立于我們的 Web 服務(wù)器正在做的工作。
創(chuàng)建一個src/lib.rs,先實現(xiàn)一個 ThreadPool 結(jié)構(gòu)體的定義,以及 ThreadPool::new 函數(shù),其參數(shù) size 表示線程池內(nèi)線程的最大數(shù)量。
pub struct ThreadPool;
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
ThreadPool
}
}
我們將實現(xiàn) execute 函數(shù),它接受給定的閉包,并將其交給池中的空閑線程運行。該函數(shù)類似于標(biāo)準(zhǔn)庫 thread::spawn 函數(shù)。
我們可以將閉包作為具有三個不同特征的參數(shù):Fn、FnMut 和 FnOnce。我們需要決定在這里使用哪種閉包。我們可以看看 thread::spawn 的簽名對它的參數(shù)有什么限制。文檔向我們展示了以下內(nèi)容:
impl ThreadPool {
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
F 類型參數(shù)是我們關(guān)心的,T 類型參數(shù)與返回值有關(guān),我們不關(guān)心這個。我們可以看到 spawn 使用 FnOnce 作為 f 上的 trait 約束。因為我們最終將在 execute 中獲得的參數(shù)傳遞給 spawn,并且運行請求的線程只會執(zhí)行該請求的閉包一次,所以 FnOnce 是我們想要使用的 trait。
F 類型參數(shù)也有 Send trait 約束和 static 生命周期約束,這在我們的情況下很有用:我們需要 Send 來將閉包從一個線程轉(zhuǎn)移到另一個線程,而需要 static 是因為我們不知道線程執(zhí)行需要多長時間。讓我們在 ThreadPool 上創(chuàng)建一個execute方法,它將接受 F 類型的泛型參數(shù),并具有以下約束:
impl ThreadPool {
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
}
}
我們?nèi)匀辉?FnOnce 之后使用 (),因為這個 FnOnce 表示一個閉包,它不接受參數(shù),返回單元類型 ()。就像函數(shù)定義一樣,返回類型可以從簽名中省略,但即使沒有參數(shù),仍然需要括號。
ThreadPool 結(jié)構(gòu)體的定義和兩個函數(shù)的聲明已經(jīng)完成,使用 ThreadPool 結(jié)構(gòu)體代替 thread::spawn 的假設(shè)接口。
修改 main.rs 中的代碼:
use multi_thread_web_server::ThreadPool;
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming() {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
}
我們使用 ThreadPool::new 創(chuàng)建一個新的線程池,可配置的線程數(shù)為 4 個。然后,在 for 循環(huán)中,pool.execute 有一個類似 thread::spawn 的接口,因為它接受一個閉包,處理每一個 stream。
運行 cargo build,編譯通過了。
驗證 new 中的線程數(shù)
前面我們?yōu)?size 參數(shù)選擇了 unsigned 類型,因為線程數(shù)為負數(shù)的池沒有意義。然而,一個沒有線程的池也沒有意義,所以在返回 ThreadPool 實例之前,我們將添加代碼來檢查 size 是否大于 0,并通過 assert 讓程序在接收到 0 時 panic。
impl ThreadPool {
/// Create a new ThreadPool.
///
/// The size is the number of threads in the pool.
///
/// # Panics
///
/// The `new` function will panic if the size is zero.
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
ThreadPool
}
// --snip--
}
我們還為 ThreadPool 添加了一些文檔和文檔注釋。運行 cargo doc --open,在打開的 HTML 文檔中點擊 ThreadPool 就能查看它的一些介紹信息。

我們也可以將 new 更改為 build 并返回一個 Result,就像下面的定義一樣:
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
但是在這種情況下,我們嘗試創(chuàng)建一個沒有任何線程的線程池是不合理的,我們希望在錯誤時 panic。
創(chuàng)建存儲線程的空間
既然我們有辦法知道池中存儲了有效數(shù)量的線程,我們就可以創(chuàng)建這些線程,并在返回結(jié)構(gòu)體之前將它們存儲在 ThreadPool 結(jié)構(gòu)體中。
但是我們?nèi)绾?ldquo;存儲”一個線程呢?讓我們再看一下 thread::spawn 簽名:
pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static,
spawn 函數(shù)返回一個 JoinHandle<T>,其中 T 是閉包返回的類型。我們也使用 JoinHandle,因為我們傳遞給線程池的閉包將處理連接而不返回任何東西,因此 T 將是單元類型 ()。
修改 ThreadPool 的定義,使其包含 thread::JoinHandle<()> 實例的 vector。
use std::thread;
pub struct ThreadPool {
threads: Vec<thread::JoinHandle<()>>,
}
再修改 new 函數(shù),初始化 vector 的容量為 size,設(shè)置 for 循環(huán),運行一些代碼來創(chuàng)建線程,并返回一個包含它們的 ThreadPool 實例。
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut threads = Vec::with_capacity(size);
for _ in 0..size {
// create some threads and store them in the vector
}
ThreadPool { threads }
}
再次運行 cargo build,編譯成功。
負責(zé)將代碼從線程池發(fā)送到線程的 Worker 結(jié)構(gòu)體
標(biāo)準(zhǔn)庫的 thread::spawn 期望得到一些代碼,這些代碼應(yīng)該在線程創(chuàng)建后立即運行。然而,在本例中,我們希望創(chuàng)建線程并讓它們等待稍后發(fā)送的代碼。
我們將通過在 ThreadPool 和管理這種新行為的線程之間引入一個新的數(shù)據(jù)結(jié)構(gòu)來實現(xiàn)這種行為。我們將此數(shù)據(jù)結(jié)構(gòu)稱為 Worker,這是池實現(xiàn)中的一個常用術(shù)語。Worker 獲取需要運行的代碼,并在 Worker 的線程中運行這些代碼。
我們將存儲 Worker 結(jié)構(gòu)的實例,而不是在線程池中存儲 JoinHandle<()> 實例的 vector。每個 Worker 將存儲一個 JoinHandle<()> 實例。然后,我們將在 Worker 上實現(xiàn)一個方法,該方法將接受代碼的閉包來運行,并將其發(fā)送到已經(jīng)運行的線程中執(zhí)行。我們還將為每個 Worker 提供一個 id,以便在進行日志記錄或調(diào)試時區(qū)分池中 Worker 的不同實例。
總結(jié)一下,我們要實現(xiàn)這四件事:
- 定義一個 Worker 結(jié)構(gòu)體,它包含一個 id 和一個 JoinHandle<()>.
- 更改 ThreadPool 的定義,包含一個Worker 實例的 vector,而不是 Vec<thread::JoinHandle<()>>。
- 定義一個 Worker::new 函數(shù),該函數(shù)接受一個 id 號,并返回一個包含該 id 的 Worker 實例和一個由空閉包派生的線程。
- 在 ThreadPool::new 函數(shù)中,使用 for 循環(huán)計數(shù)器生成一個 id,用該 id 創(chuàng)建一個新的 Worker,并將該 Worker 存儲在 vector 中。
use std::thread;
pub struct ThreadPool {
workers: Vec<Worker>,
}
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers }
}
// --snip--
}
struct Worker {
id: usize,
thread: thread::JoinHandle<()>,
}
impl Worker {
fn new(id: usize) -> Worker {
let thread = thread::spawn(|| {});
Worker { id, thread }
}
}
外部代碼不需要知道在 ThreadPool 中使用 Worke r結(jié)構(gòu)體的實現(xiàn)細節(jié),所以我們把 Worker 結(jié)構(gòu)體及其函數(shù)設(shè)為 private。Worker::new 函數(shù)使用我們給它的 id,并存儲一個 JoinHandle<()> 實例,該實例是通過使用空閉包生成一個新線程創(chuàng)建的。
我們將 ThreadPool 上的字段名稱從 threads 更改為 workers,因為它現(xiàn)在保存 Worker 實例而不是 JoinHandle<()> 實例。
注意,如果操作系統(tǒng)因為沒有足夠的系統(tǒng)資源而無法創(chuàng)建線程,thread::spawn 將出現(xiàn) panic,這在實際生產(chǎn)環(huán)境中很危險。實際情況下,我們可以使用 std::thread::Builder 及其派生方法。
這段代碼將編譯并存儲作為 ThreadPool::new 參數(shù)指定的 Worker 實例的數(shù)量。但是我們?nèi)匀粵]有處理在 execute 中得到的閉包。讓我們看看接下來該怎么做。
通過通道向線程發(fā)送請求
我們希望剛剛創(chuàng)建的 Worker 結(jié)構(gòu)體從 ThreadPool 中保存的隊列中獲取要運行的代碼,并將該代碼發(fā)送到其線程中運行。
我們將使用通道作為作業(yè)隊列,execute 將把作業(yè)從 ThreadPool 發(fā)送到 Worker 實例,后者將把作業(yè)發(fā)送到它的線程。計劃如下:
- ThreadPool 將創(chuàng)建一個通道并保持發(fā)送端。
- 每個 Worker 獲取 receiver,作為接收端。
- 我們將創(chuàng)建一個新的 Job 結(jié)構(gòu)體來保存我們想要發(fā)送到通道中的閉包。
- execute 方法將通過發(fā)送端發(fā)送它想要執(zhí)行的作業(yè)。
- 在它的線程中,Worker 將遍歷它的接收者,并執(zhí)行它接收到的所有作業(yè)的閉包。
讓我們首先在 ThreadPool::new 中創(chuàng)建一個通道,并在 ThreadPool 實例中保存發(fā)送端。Job 結(jié)構(gòu)現(xiàn)在還沒有保存任何東西,但它將是我們發(fā)送到通道的項的類型。
use std::{sync::mpsc, thread};
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
struct Job;
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id));
}
ThreadPool { workers, sender }
}
// --snip--
}
在 ThreadPool::new 中,我們創(chuàng)建了一個通道,并讓 ThreadPool 包含 sender。這將成功編譯。
讓我們嘗試在線程池創(chuàng)建通道時將通道的接收器傳遞給每個 Worker。我們知道我們想要在 Worker 實例產(chǎn)生的線程中使用 receiver,所以我們將在閉包中引用 receiver 參數(shù)。
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, receiver));
}
ThreadPool { workers, sender }
}
// --snip--
}
// --snip--
impl Worker {
fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
let thread = thread::spawn(|| {
receiver;
});
Worker { id, thread }
}
}
代碼試圖將 receiver 傳遞給多個 Worker 實例,這是行不通的,因為 Rust 提供的通道實現(xiàn)是多個生產(chǎn)者,單個消費者。這意味著我們不能僅僅克隆通道的消費端(接收端)來修復(fù)此代碼。我們也不想多次向多個消費者發(fā)送消息。我們想要一個包含多個 Worker 實例的消息列表,這樣每個消息都會被處理一次。
此外,從通道隊列中取出作業(yè)涉及到改變 receiver,因此線程需要一種安全的方式來共享和修改 receiver。
為了在多個線程之間共享所有權(quán)并允許線程改變值,我們需要使用 Arc<Mutex<T>> 。Arc 類型將允許多個 Worker 實例擁有 receiver,Mutex 將確保一次只有一個 Worker 從接收器獲得作業(yè)。
use std::{
sync::{Arc, Mutex, mpsc},
thread,
};
// --snip--
impl ThreadPool {
// --snip--
pub fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
// --snip--
}
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
// --snip--
}
}
在 ThreadPool::new 中,我們將接收者置于 Arc<Mutex<>> 中。對于每個新的 Worker,我們克隆 Arc 來增加引用計數(shù),這樣 Worker 實例就可以共享 receiver 的所有權(quán)。
有了這些修改,代碼就可以編譯了。
實現(xiàn) execute 方法
最后讓我們實現(xiàn) ThreadPool::execute 方法。我們還將 Job 從結(jié)構(gòu)體更改為 trait 對象的類型別名,該 trait 對象保存 execute 接收的閉包類型。
// --snip--
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
// --snip--
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).unwrap();
}
}
在使用獲得的閉包創(chuàng)建新 Job 實例之后,我們將該作業(yè)發(fā)送到通道中。在 send 失敗的情況下我們調(diào)用 unwrap。
如果我們停止執(zhí)行所有線程,這意味著接收端已經(jīng)停止接收新消息,就可能發(fā)生這種情況。目前,我們不能停止線程的執(zhí)行:只要池存在,線程就會繼續(xù)執(zhí)行。我們使用 unwrap 的原因是我們知道失敗情況不會發(fā)生,但是編譯器不知道。
但我們還沒有完全完成!在 Worker 中,傳遞給 thread::spawn 的閉包仍然只引用通道的接收端。相反,我們需要閉包永遠循環(huán),向通道的接收端請求作業(yè),并在獲得作業(yè)時運行作業(yè)。讓我們對 Worker::new 函數(shù)進行如下的更改。
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
在這里,我們首先在 receiver 上調(diào)用 lock 來獲取互斥鎖,然后調(diào)用 unwrap 來在出現(xiàn)錯誤時發(fā)出警報。
如果互斥鎖處于鎖定狀態(tài),獲取鎖可能會失敗,如果其他線程在持有鎖而不是釋放鎖時 panic,就會發(fā)生這種情況。在這種情況下,調(diào)用 unwrap 使該線程 panic 是正確的操作。你也可以將此 unwrap 更改為 expect,并顯示有意義的錯誤消息。
如果我們獲得了互斥鎖,我們調(diào)用 recv 從通道接收 Job。如果持有發(fā)送方的線程已經(jīng)關(guān)閉,那么最后的 unwrap 也會跳過這里的任何錯誤,類似于如果接收方關(guān)閉,則 send 方法返回 Err。
對 recv 的調(diào)用會阻塞當(dāng)前線程,直到有作業(yè)可用。Mutex<T> 確保一次只有一個 Worker 線程試圖請求作業(yè)。
至此,多線程 Web 服務(wù)器已經(jīng)能成功運行了?,F(xiàn)在我們有了一個異步執(zhí)行連接的線程池。創(chuàng)建的線程永遠不會超過 4 個,所以如果服務(wù)器接收到大量請求,我們的系統(tǒng)也不會過載,但也不會停止。在瀏覽器打開多個網(wǎng)頁,程序輸出一些執(zhí)行的信息:
Worker 0 got a job; executing. Worker 1 got a job; executing. Worker 2 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing. Worker 2 got a job; executing. Worker 1 got a job; executing. ...
你可能想知道為什么不按照下面所示的方式編寫工作線程代碼。
// --snip--
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
while let Ok(job) = receiver.lock().unwrap().recv() {
println!("Worker {id} got a job; executing.");
job();
}
});
Worker { id, thread }
}
}
這段代碼可以編譯和運行,但不會產(chǎn)生期望的線程行為:緩慢的請求仍然會導(dǎo)致其他請求等待處理。原因有些微妙:Mutex 沒有公共解鎖方法,因為鎖的所有權(quán)是基于鎖方法返回的 LockResult<MutexGuard<T>> 中的 MutexGuard<T> 的生命周期。在編譯時,借用檢查器可以強制執(zhí)行由互斥鎖保護的資源不能被訪問的規(guī)則,除非我們持有該鎖。但是,如果我們不注意 MutexGuard<T>的生命周期,這種實現(xiàn)也會導(dǎo)致鎖被持有的時間比預(yù)期的要長。
之前的代碼使用 let job = receiver.lock().unwrap().recv().unwrap(); 之所以有效,是因為使用 let 時,在等號右側(cè)的表達式中使用的任何臨時值都會在 let 語句結(jié)束時立即刪除。然而,while let(以及 if let 和 match)在相關(guān)塊結(jié)束之前不會刪除臨時值。在使用 while let 的代碼中,鎖在調(diào)用 job() 期間保持持有,這意味著其他 Worker 實例不能接收作業(yè)。
正常關(guān)機和清理
接下來,我們將實現(xiàn) Drop trait,在池中的每個線程上調(diào)用 join,這樣它們就可以在關(guān)閉之前完成正在處理的請求。然后我們將實現(xiàn)一種方法來告訴線程它們應(yīng)該停止接受新請求并關(guān)閉。要查看這段代碼的實際效果,我們將修改服務(wù)器,使其在優(yōu)雅地關(guān)閉線程池之前只接受兩個請求。
在 ThreadPool 上實現(xiàn)D rop trait
讓我們從在線程池上實現(xiàn) Drop 開始。當(dāng)池被刪除時,我們的線程都應(yīng)該連接起來,以確保它們完成自己的工作。
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
我們循環(huán)遍歷線程池的每個 worker。我們使用 &mut 是因為 self 是一個可變引用,而且我們還需要能夠改變 worker。對于每個 worker,我們打印一條消息,表示這個特定的 worker 實例正在關(guān)閉,然后我們在該 worker 實例的線程上調(diào)用 join。如果 join 調(diào)用失敗,我們使用 unwrap 使 Rust 陷入 panic,并進入不正常的關(guān)閉狀態(tài)。
然而,程序并不能成功編譯:

這個錯誤告訴我們不能調(diào)用 join,因為我們只有每個 worker 的可變借用,join 擁有其參數(shù)的所有權(quán)。為了解決這個問題,我們需要將線程移出擁有線程的 Worker 實例,以便 join 可以使用線程。
一種解決方法是使用 Option。如果 Worker 持有 Option<thread::JoinHandle<()>>,我們可以調(diào)用 Optio n的 take 方法將值從 Some 變體中移出,并在其位置留下 None 變體。換句話說,正在運行的 Worker 在線程中會有一個 Some 變量,當(dāng)我們想要清理 Worker 時,我們將 Some 替換為 None,這樣 Worker 就不會有線程要運行了。
然而,只有在丟棄 Worker 時才會出現(xiàn)這種情況。使用 Option 之后,我們必須在訪問 worker.thread 的任何地方處理 Option<thread::JoinHandle<()>>,這很繁瑣。
在這種情況下,存在一個更好的替代方法:Vec::drain 方法。它接受一個 range 參數(shù)來指定要從Vec中刪除哪些項,并返回這些項的迭代器。傳遞 .. 將從 Vec 中刪除所有值。
所以我們需要像這樣更新 ThreadPool 的 Drop 實現(xiàn):
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
這將解決編譯器錯誤,并且不需要對代碼進行任何其他更改。
向線程發(fā)出停止監(jiān)聽作業(yè)的信號
程序還沒有按照我們想要的方式運行。關(guān)鍵是由 Worker 實例的線程運行的閉包中的邏輯:目前,我們調(diào)用 join,但這不會關(guān)閉線程,因為它們永遠在循環(huán)尋找作業(yè)。如果我們嘗試使用當(dāng)前的 Drop 實現(xiàn)來刪除 ThreadPool,主線程將永遠阻塞,等待第一個線程完成。
為了解決這個問題,我們需要改變 ThreadPool Drop 的實現(xiàn),在等待線程完成之前顯式地刪除 sender。然后再改變 Worker 中的 loop。
pub struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
// --snip--
impl ThreadPool {
pub fn new(size: usize) -> ThreadPool {
// --snip--
ThreadPool {
workers,
sender: Some(sender),
}
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in self.workers.drain(..) {
println!("Shutting down worker {}", worker.id);
worker.thread.join().unwrap();
}
}
}
與線程不同,這里我們需要使用 Option::take 來將 sender 移出 ThreadPool。
刪除 sender 將關(guān)閉通道,這表明將不再發(fā)送消息。當(dāng)這種情況發(fā)生時, Worker 實例在 loop 中對 recv 的所有調(diào)用都會返回一個錯誤。在這種情況下,我們應(yīng)該優(yōu)雅地退出循環(huán),這意味著線程將在 ThreadPool Drop 實現(xiàn)調(diào)用 join 時結(jié)束。
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || {
loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {id} got a job; executing.");
job();
}
Err(_) => {
println!("Worker {id} disconnected; shutting down.");
break;
}
}
}
});
Worker { id, thread }
}
}
要查看這段代碼的實際效果,讓我們修改 main 函數(shù),使其在優(yōu)雅地關(guān)閉服務(wù)器之前只接受兩個請求。
fn main() {
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
let pool = ThreadPool::new(4);
for stream in listener.incoming().take(2) {
let stream = stream.unwrap();
pool.execute(|| {
handle_connection(stream);
});
}
println!("Shutting down.");
}
take 方法是在 Iterator trait 中定義的,它將迭代最多限制在前兩項。 ThreadPool 將在 main 函數(shù)結(jié)束時超出作用域,并運行 Drop實現(xiàn)。
啟動裝載運行的服務(wù)器,并發(fā)出三個請求。第三個請求應(yīng)該出錯,程序輸出為:
Worker 0 got a job; executing. Shutting down. Worker 1 got a job; executing. Shutting down worker 0 Worker 3 disconnected; shutting down. Worker 2 disconnected; shutting down. Worker 0 disconnected; shutting down. Shutting down worker 1 Worker 1 disconnected; shutting down. Shutting down worker 2 Shutting down worker 3
打印的 Worker id 和消息可能有不同順序。我們可以從消息中看到這段代碼是如何工作的:Worker 實例 0 和 1 獲得了前兩個請求。服務(wù)器在第二個連接之后停止接受連接,線程池上的 Drop 實現(xiàn)甚至在 Worker 1 開始它的工作之前就開始執(zhí)行。刪除發(fā)送器將斷開所有 Worker 實例的連接,并告訴它們關(guān)閉。每個 Worker 實例在斷開連接時打印一條消息,然后線程池調(diào)用 join 來等待每個 Worker 線程完成。
注意這個特殊執(zhí)行的一個有趣的方面:ThreadPool 丟棄了 sender,并且在任何 Worker 接收到錯誤之前,我們嘗試加入 Worker 0。工作線程 0 還沒有從 recv 獲得錯誤,所以主線程阻塞等待工作線程 0 完成。同時,Worker 1 收到了一個作業(yè),然后所有線程都收到了一個錯誤。當(dāng) Worker 0 完成時,主線程等待其余的 Worker 實例完成。在這一點上,他們都退出了循環(huán),停止了。
我們現(xiàn)在已經(jīng)完成了我們的項目;我們有一個基本的 Web 服務(wù)器,它使用線程池進行異步響應(yīng)。我們能夠執(zhí)行服務(wù)器的優(yōu)雅關(guān)閉,這將清理池中的所有線程。
項目地址
GitHub:UestcXiye / Multi-Thread-Web-Server-based-on-Rust
到此這篇關(guān)于Rust中多線程 Web 服務(wù)器的項目實戰(zhàn)的文章就介紹到這了,更多相關(guān)Rust 多線程Web服務(wù)器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Rust中的Box<T>之堆上的數(shù)據(jù)與遞歸類型詳解
本文介紹了Rust中的Box<T>類型,包括其在堆與棧之間的內(nèi)存分配,性能優(yōu)勢,以及如何利用Box<T>來實現(xiàn)遞歸類型和處理大小未知類型,通過Box<T>,Rust程序員可以更靈活地管理內(nèi)存,避免編譯時大小不確定的問題,并提高代碼的效率和靈活性2025-02-02
Rust中類型轉(zhuǎn)換在錯誤處理中的應(yīng)用小結(jié)
隨著項目的進展,關(guān)于Rust的故事又翻開了新的一頁,今天來到了服務(wù)器端的開發(fā)場景,發(fā)現(xiàn)錯誤處理中的錯誤類型轉(zhuǎn)換有必要分享一下,對Rust錯誤處理相關(guān)知識感興趣的朋友一起看看吧2023-09-09
如何使用bindgen將C語言頭文件轉(zhuǎn)換為Rust接口代碼
這篇文章主要介紹了使用bindgen將C語言頭文件轉(zhuǎn)換為Rust接口代碼,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-01-01
Rust-使用dotenvy加載和使用環(huán)境變量的過程詳解
系統(tǒng)的開發(fā),測試和部署離不開環(huán)境變量,今天分享在Rust的系統(tǒng)開發(fā)中,使用dotenvy來讀取和使用環(huán)境變量,感興趣的朋友跟隨小編一起看看吧2023-11-11

