Rust中多線程?Web?服務器的項目實戰(zhàn)
前情提要:http://www.dbjr.com.cn/program/34427748j.htm
單線程 Web 服務器將依次處理每個請求,這意味著在第一個連接完成處理之前,它不會處理第二個連接。如果服務器接收到越來越多的請求,那么串行執(zhí)行將越來越不理想。如果服務器接收到一個需要很長時間來處理的請求,則后續(xù)請求將不得不等待,直到長請求完成,即使新請求可以快速處理。我們需要解決這個問題,但首先我們要看看實際的問題。
模擬慢速請求
我們將了解處理緩慢的請求如何影響對單線程 Web 服務器實現的其他請求。
我們使用模擬的緩慢響應實現了對 /sleep 的請求處理,該響應將導致服務器在響應之前休眠 5 s。
use std::{ fs, io::{BufReader, prelude::*}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn handle_connection(mut stream: TcpStream) { // --snip-- let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- }
新增了一種對 /sleep 請求的響應,當接收到該請求時,服務器將在呈現 hello.html 之前休眠 5 s。
使用 cargo run 啟動服務器。然后打開兩個瀏覽器窗口:一個用于 127.0.0.1:7878,另一個用于 127.0.0.1:7878/sleep。如果像以前一樣多次輸入 / URI,您將看到它快速響應。但是如果你輸入 /sleep,然后加載 /,你會看到 / 等待,直到 sleep 了整整 5 s 才加載。
我們要實現一個線程池,避免慢速請求后面的請求等待。
使用線程池提高吞吐量
線程池是一組正在等待并準備處理任務的派生線程。當程序接收到一個新任務時,它將池中的一個線程分配給該任務,該線程將處理該任務。池中的剩余線程可用于處理在第一個線程正在處理時進入的任何其他任務。當第一個線程完成其任務的處理后,它將返回到空閑線程池,準備處理新任務。線程池允許您并發(fā)地處理連接,從而提高服務器的吞吐量。
我們將限制池中的線程數量,因為服務器的資源是有限的,也保護我們免受 DoS 攻擊。進入的請求被發(fā)送到池中進行處理,線程池將維護一個傳入請求隊列,池中的每個線程將從這個隊列中彈出一個請求,處理該請求,然后向隊列請求另一個請求。使用這種設計,我們最多可以并發(fā)處理 N 個請求,其中 N 是線程數。
這種技術只是提高 Web 服務器吞吐量的眾多方法之一。其他選項包括 fork/join 模型、單線程異步 I/O 模型和多線程異步 I/O 模型,等等。
初步嘗試:為每個請求生成一個線程
首先,讓我們探索一下,如果為每個連接創(chuàng)建一個新線程,我們的代碼會是什么樣子。正如前面提到的,這不是我們的最終計劃,因為可能會產生無限數量的線程,但這是一個起點,可以首先獲得一個工作的多線程服務器。然后我們將添加線程池作為改進,并且比較兩種解決方案會更容易。
在單線程 Web 服務器的 main 函數中進行修改:
use std::thread; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } }
thread::spawn 將創(chuàng)建一個新線程,然后在新線程中運行閉包中的代碼。
如果運行這段代碼并在瀏覽器中加載 /sleep,然后在另外兩個瀏覽器選項卡中加載 /,對 / 的請求不必等待 /sleep 完成。然而,正如我們所提到的,這最終將使系統(tǒng)不堪重負,因為你將無限制地創(chuàng)建新線程。
現在,是時候讓 async 和 await 真正發(fā)揮作用了!
實現線程池的定義和函數聲明
我們的線程池的實現將獨立于我們的 Web 服務器正在做的工作。
創(chuàng)建一個src/lib.rs,先實現一個 ThreadPool 結構體的定義,以及 ThreadPool::new 函數,其參數 size 表示線程池內線程的最大數量。
pub struct ThreadPool; impl ThreadPool { pub fn new(size: usize) -> ThreadPool { ThreadPool } }
我們將實現 execute 函數,它接受給定的閉包,并將其交給池中的空閑線程運行。該函數類似于標準庫 thread::spawn 函數。
我們可以將閉包作為具有三個不同特征的參數:Fn、FnMut 和 FnOnce。我們需要決定在這里使用哪種閉包。我們可以看看 thread::spawn 的簽名對它的參數有什么限制。文檔向我們展示了以下內容:
impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } }
F 類型參數是我們關心的,T 類型參數與返回值有關,我們不關心這個。我們可以看到 spawn 使用 FnOnce 作為 f 上的 trait 約束。因為我們最終將在 execute 中獲得的參數傳遞給 spawn,并且運行請求的線程只會執(zhí)行該請求的閉包一次,所以 FnOnce 是我們想要使用的 trait。
F 類型參數也有 Send trait 約束和 static 生命周期約束,這在我們的情況下很有用:我們需要 Send 來將閉包從一個線程轉移到另一個線程,而需要 static 是因為我們不知道線程執(zhí)行需要多長時間。讓我們在 ThreadPool 上創(chuàng)建一個execute方法,它將接受 F 類型的泛型參數,并具有以下約束:
impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } }
我們仍然在 FnOnce 之后使用 (),因為這個 FnOnce 表示一個閉包,它不接受參數,返回單元類型 ()。就像函數定義一樣,返回類型可以從簽名中省略,但即使沒有參數,仍然需要括號。
ThreadPool 結構體的定義和兩個函數的聲明已經完成,使用 ThreadPool 結構體代替 thread::spawn 的假設接口。
修改 main.rs 中的代碼:
use multi_thread_web_server::ThreadPool; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } }
我們使用 ThreadPool::new 創(chuàng)建一個新的線程池,可配置的線程數為 4 個。然后,在 for 循環(huán)中,pool.execute 有一個類似 thread::spawn 的接口,因為它接受一個閉包,處理每一個 stream。
運行 cargo build,編譯通過了。
驗證 new 中的線程數
前面我們?yōu)?size 參數選擇了 unsigned 類型,因為線程數為負數的池沒有意義。然而,一個沒有線程的池也沒有意義,所以在返回 ThreadPool 實例之前,我們將添加代碼來檢查 size 是否大于 0,并通過 assert 讓程序在接收到 0 時 panic。
impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); ThreadPool } // --snip-- }
我們還為 ThreadPool 添加了一些文檔和文檔注釋。運行 cargo doc --open,在打開的 HTML 文檔中點擊 ThreadPool 就能查看它的一些介紹信息。
我們也可以將 new 更改為 build 并返回一個 Result,就像下面的定義一樣:
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
但是在這種情況下,我們嘗試創(chuàng)建一個沒有任何線程的線程池是不合理的,我們希望在錯誤時 panic。
創(chuàng)建存儲線程的空間
既然我們有辦法知道池中存儲了有效數量的線程,我們就可以創(chuàng)建這些線程,并在返回結構體之前將它們存儲在 ThreadPool 結構體中。
但是我們如何“存儲”一個線程呢?讓我們再看一下 thread::spawn 簽名:
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static,
spawn 函數返回一個 JoinHandle<T>,其中 T 是閉包返回的類型。我們也使用 JoinHandle,因為我們傳遞給線程池的閉包將處理連接而不返回任何東西,因此 T 將是單元類型 ()。
修改 ThreadPool 的定義,使其包含 thread::JoinHandle<()> 實例的 vector。
use std::thread; pub struct ThreadPool { threads: Vec<thread::JoinHandle<()>>, }
再修改 new 函數,初始化 vector 的容量為 size,設置 for 循環(huán),運行一些代碼來創(chuàng)建線程,并返回一個包含它們的 ThreadPool 實例。
pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut threads = Vec::with_capacity(size); for _ in 0..size { // create some threads and store them in the vector } ThreadPool { threads } }
再次運行 cargo build,編譯成功。
負責將代碼從線程池發(fā)送到線程的 Worker 結構體
標準庫的 thread::spawn 期望得到一些代碼,這些代碼應該在線程創(chuàng)建后立即運行。然而,在本例中,我們希望創(chuàng)建線程并讓它們等待稍后發(fā)送的代碼。
我們將通過在 ThreadPool 和管理這種新行為的線程之間引入一個新的數據結構來實現這種行為。我們將此數據結構稱為 Worker,這是池實現中的一個常用術語。Worker 獲取需要運行的代碼,并在 Worker 的線程中運行這些代碼。
我們將存儲 Worker 結構的實例,而不是在線程池中存儲 JoinHandle<()> 實例的 vector。每個 Worker 將存儲一個 JoinHandle<()> 實例。然后,我們將在 Worker 上實現一個方法,該方法將接受代碼的閉包來運行,并將其發(fā)送到已經運行的線程中執(zhí)行。我們還將為每個 Worker 提供一個 id,以便在進行日志記錄或調試時區(qū)分池中 Worker 的不同實例。
總結一下,我們要實現這四件事:
- 定義一個 Worker 結構體,它包含一個 id 和一個 JoinHandle<()>.
- 更改 ThreadPool 的定義,包含一個Worker 實例的 vector,而不是 Vec<thread::JoinHandle<()>>。
- 定義一個 Worker::new 函數,該函數接受一個 id 號,并返回一個包含該 id 的 Worker 實例和一個由空閉包派生的線程。
- 在 ThreadPool::new 函數中,使用 for 循環(huán)計數器生成一個 id,用該 id 創(chuàng)建一個新的 Worker,并將該 Worker 存儲在 vector 中。
use std::thread; pub struct ThreadPool { workers: Vec<Worker>, } impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers } } // --snip-- } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize) -> Worker { let thread = thread::spawn(|| {}); Worker { id, thread } } }
外部代碼不需要知道在 ThreadPool 中使用 Worke r結構體的實現細節(jié),所以我們把 Worker 結構體及其函數設為 private。Worker::new 函數使用我們給它的 id,并存儲一個 JoinHandle<()> 實例,該實例是通過使用空閉包生成一個新線程創(chuàng)建的。
我們將 ThreadPool 上的字段名稱從 threads 更改為 workers,因為它現在保存 Worker 實例而不是 JoinHandle<()> 實例。
注意,如果操作系統(tǒng)因為沒有足夠的系統(tǒng)資源而無法創(chuàng)建線程,thread::spawn 將出現 panic,這在實際生產環(huán)境中很危險。實際情況下,我們可以使用 std::thread::Builder 及其派生方法。
這段代碼將編譯并存儲作為 ThreadPool::new 參數指定的 Worker 實例的數量。但是我們仍然沒有處理在 execute 中得到的閉包。讓我們看看接下來該怎么做。
通過通道向線程發(fā)送請求
我們希望剛剛創(chuàng)建的 Worker 結構體從 ThreadPool 中保存的隊列中獲取要運行的代碼,并將該代碼發(fā)送到其線程中運行。
我們將使用通道作為作業(yè)隊列,execute 將把作業(yè)從 ThreadPool 發(fā)送到 Worker 實例,后者將把作業(yè)發(fā)送到它的線程。計劃如下:
- ThreadPool 將創(chuàng)建一個通道并保持發(fā)送端。
- 每個 Worker 獲取 receiver,作為接收端。
- 我們將創(chuàng)建一個新的 Job 結構體來保存我們想要發(fā)送到通道中的閉包。
- execute 方法將通過發(fā)送端發(fā)送它想要執(zhí)行的作業(yè)。
- 在它的線程中,Worker 將遍歷它的接收者,并執(zhí)行它接收到的所有作業(yè)的閉包。
讓我們首先在 ThreadPool::new 中創(chuàng)建一個通道,并在 ThreadPool 實例中保存發(fā)送端。Job 結構現在還沒有保存任何東西,但它將是我們發(fā)送到通道的項的類型。
use std::{sync::mpsc, thread}; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers, sender } } // --snip-- }
在 ThreadPool::new 中,我們創(chuàng)建了一個通道,并讓 ThreadPool 包含 sender。這將成功編譯。
讓我們嘗試在線程池創(chuàng)建通道時將通道的接收器傳遞給每個 Worker。我們知道我們想要在 Worker 實例產生的線程中使用 receiver,所以我們將在閉包中引用 receiver 參數。
impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, receiver)); } ThreadPool { workers, sender } } // --snip-- } // --snip-- impl Worker { fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { let thread = thread::spawn(|| { receiver; }); Worker { id, thread } } }
代碼試圖將 receiver 傳遞給多個 Worker 實例,這是行不通的,因為 Rust 提供的通道實現是多個生產者,單個消費者。這意味著我們不能僅僅克隆通道的消費端(接收端)來修復此代碼。我們也不想多次向多個消費者發(fā)送消息。我們想要一個包含多個 Worker 實例的消息列表,這樣每個消息都會被處理一次。
此外,從通道隊列中取出作業(yè)涉及到改變 receiver,因此線程需要一種安全的方式來共享和修改 receiver。
為了在多個線程之間共享所有權并允許線程改變值,我們需要使用 Arc<Mutex<T>> 。Arc 類型將允許多個 Worker 實例擁有 receiver,Mutex 將確保一次只有一個 Worker 從接收器獲得作業(yè)。
use std::{ sync::{Arc, Mutex, mpsc}, thread, }; // --snip-- impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } // --snip-- } // --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { // --snip-- } }
在 ThreadPool::new 中,我們將接收者置于 Arc<Mutex<>> 中。對于每個新的 Worker,我們克隆 Arc 來增加引用計數,這樣 Worker 實例就可以共享 receiver 的所有權。
有了這些修改,代碼就可以編譯了。
實現 execute 方法
最后讓我們實現 ThreadPool::execute 方法。我們還將 Job 從結構體更改為 trait 對象的類型別名,該 trait 對象保存 execute 接收的閉包類型。
// --snip-- type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } }
在使用獲得的閉包創(chuàng)建新 Job 實例之后,我們將該作業(yè)發(fā)送到通道中。在 send 失敗的情況下我們調用 unwrap。
如果我們停止執(zhí)行所有線程,這意味著接收端已經停止接收新消息,就可能發(fā)生這種情況。目前,我們不能停止線程的執(zhí)行:只要池存在,線程就會繼續(xù)執(zhí)行。我們使用 unwrap 的原因是我們知道失敗情況不會發(fā)生,但是編譯器不知道。
但我們還沒有完全完成!在 Worker 中,傳遞給 thread::spawn 的閉包仍然只引用通道的接收端。相反,我們需要閉包永遠循環(huán),向通道的接收端請求作業(yè),并在獲得作業(yè)時運行作業(yè)。讓我們對 Worker::new 函數進行如下的更改。
// --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); } }); Worker { id, thread } } }
在這里,我們首先在 receiver 上調用 lock 來獲取互斥鎖,然后調用 unwrap 來在出現錯誤時發(fā)出警報。
如果互斥鎖處于鎖定狀態(tài),獲取鎖可能會失敗,如果其他線程在持有鎖而不是釋放鎖時 panic,就會發(fā)生這種情況。在這種情況下,調用 unwrap 使該線程 panic 是正確的操作。你也可以將此 unwrap 更改為 expect,并顯示有意義的錯誤消息。
如果我們獲得了互斥鎖,我們調用 recv 從通道接收 Job。如果持有發(fā)送方的線程已經關閉,那么最后的 unwrap 也會跳過這里的任何錯誤,類似于如果接收方關閉,則 send 方法返回 Err。
對 recv 的調用會阻塞當前線程,直到有作業(yè)可用。Mutex<T> 確保一次只有一個 Worker 線程試圖請求作業(yè)。
至此,多線程 Web 服務器已經能成功運行了。現在我們有了一個異步執(zhí)行連接的線程池。創(chuàng)建的線程永遠不會超過 4 個,所以如果服務器接收到大量請求,我們的系統(tǒng)也不會過載,但也不會停止。在瀏覽器打開多個網頁,程序輸出一些執(zhí)行的信息:
Worker 0 got a job; executing. Worker 1 got a job; executing. Worker 2 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing. Worker 2 got a job; executing. Worker 1 got a job; executing. ...
你可能想知道為什么不按照下面所示的方式編寫工作線程代碼。
// --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {id} got a job; executing."); job(); } }); Worker { id, thread } } }
這段代碼可以編譯和運行,但不會產生期望的線程行為:緩慢的請求仍然會導致其他請求等待處理。原因有些微妙:Mutex 沒有公共解鎖方法,因為鎖的所有權是基于鎖方法返回的 LockResult<MutexGuard<T>> 中的 MutexGuard<T> 的生命周期。在編譯時,借用檢查器可以強制執(zhí)行由互斥鎖保護的資源不能被訪問的規(guī)則,除非我們持有該鎖。但是,如果我們不注意 MutexGuard<T>的生命周期,這種實現也會導致鎖被持有的時間比預期的要長。
之前的代碼使用 let job = receiver.lock().unwrap().recv().unwrap();
之所以有效,是因為使用 let 時,在等號右側的表達式中使用的任何臨時值都會在 let 語句結束時立即刪除。然而,while let(以及 if let 和 match)在相關塊結束之前不會刪除臨時值。在使用 while let 的代碼中,鎖在調用 job() 期間保持持有,這意味著其他 Worker 實例不能接收作業(yè)。
正常關機和清理
接下來,我們將實現 Drop trait,在池中的每個線程上調用 join,這樣它們就可以在關閉之前完成正在處理的請求。然后我們將實現一種方法來告訴線程它們應該停止接受新請求并關閉。要查看這段代碼的實際效果,我們將修改服務器,使其在優(yōu)雅地關閉線程池之前只接受兩個請求。
在 ThreadPool 上實現D rop trait
讓我們從在線程池上實現 Drop 開始。當池被刪除時,我們的線程都應該連接起來,以確保它們完成自己的工作。
impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } }
我們循環(huán)遍歷線程池的每個 worker。我們使用 &mut 是因為 self 是一個可變引用,而且我們還需要能夠改變 worker。對于每個 worker,我們打印一條消息,表示這個特定的 worker 實例正在關閉,然后我們在該 worker 實例的線程上調用 join。如果 join 調用失敗,我們使用 unwrap 使 Rust 陷入 panic,并進入不正常的關閉狀態(tài)。
然而,程序并不能成功編譯:
這個錯誤告訴我們不能調用 join,因為我們只有每個 worker 的可變借用,join 擁有其參數的所有權。為了解決這個問題,我們需要將線程移出擁有線程的 Worker 實例,以便 join 可以使用線程。
一種解決方法是使用 Option。如果 Worker 持有 Option<thread::JoinHandle<()>>,我們可以調用 Optio n的 take 方法將值從 Some 變體中移出,并在其位置留下 None 變體。換句話說,正在運行的 Worker 在線程中會有一個 Some 變量,當我們想要清理 Worker 時,我們將 Some 替換為 None,這樣 Worker 就不會有線程要運行了。
然而,只有在丟棄 Worker 時才會出現這種情況。使用 Option 之后,我們必須在訪問 worker.thread 的任何地方處理 Option<thread::JoinHandle<()>>,這很繁瑣。
在這種情況下,存在一個更好的替代方法:Vec::drain 方法。它接受一個 range 參數來指定要從Vec中刪除哪些項,并返回這些項的迭代器。傳遞 .. 將從 Vec 中刪除所有值。
所以我們需要像這樣更新 ThreadPool 的 Drop 實現:
impl Drop for ThreadPool { fn drop(&mut self) { for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } }
這將解決編譯器錯誤,并且不需要對代碼進行任何其他更改。
向線程發(fā)出停止監(jiān)聽作業(yè)的信號
程序還沒有按照我們想要的方式運行。關鍵是由 Worker 實例的線程運行的閉包中的邏輯:目前,我們調用 join,但這不會關閉線程,因為它們永遠在循環(huán)尋找作業(yè)。如果我們嘗試使用當前的 Drop 實現來刪除 ThreadPool,主線程將永遠阻塞,等待第一個線程完成。
為了解決這個問題,我們需要改變 ThreadPool Drop 的實現,在等待線程完成之前顯式地刪除 sender。然后再改變 Worker 中的 loop。
pub struct ThreadPool { workers: Vec<Worker>, sender: Option<mpsc::Sender<Job>>, } // --snip-- impl ThreadPool { pub fn new(size: usize) -> ThreadPool { // --snip-- ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take()); for worker in self.workers.drain(..) { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } }
與線程不同,這里我們需要使用 Option::take 來將 sender 移出 ThreadPool。
刪除 sender 將關閉通道,這表明將不再發(fā)送消息。當這種情況發(fā)生時, Worker 實例在 loop 中對 recv 的所有調用都會返回一個錯誤。在這種情況下,我們應該優(yōu)雅地退出循環(huán),這意味著線程將在 ThreadPool Drop 實現調用 join 時結束。
impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { loop { let message = receiver.lock().unwrap().recv(); match message { Ok(job) => { println!("Worker {id} got a job; executing."); job(); } Err(_) => { println!("Worker {id} disconnected; shutting down."); break; } } } }); Worker { id, thread } } }
要查看這段代碼的實際效果,讓我們修改 main 函數,使其在優(yōu)雅地關閉服務器之前只接受兩個請求。
fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } println!("Shutting down."); }
take 方法是在 Iterator trait 中定義的,它將迭代最多限制在前兩項。 ThreadPool 將在 main 函數結束時超出作用域,并運行 Drop實現。
啟動裝載運行的服務器,并發(fā)出三個請求。第三個請求應該出錯,程序輸出為:
Worker 0 got a job; executing. Shutting down. Worker 1 got a job; executing. Shutting down worker 0 Worker 3 disconnected; shutting down. Worker 2 disconnected; shutting down. Worker 0 disconnected; shutting down. Shutting down worker 1 Worker 1 disconnected; shutting down. Shutting down worker 2 Shutting down worker 3
打印的 Worker id 和消息可能有不同順序。我們可以從消息中看到這段代碼是如何工作的:Worker 實例 0 和 1 獲得了前兩個請求。服務器在第二個連接之后停止接受連接,線程池上的 Drop 實現甚至在 Worker 1 開始它的工作之前就開始執(zhí)行。刪除發(fā)送器將斷開所有 Worker 實例的連接,并告訴它們關閉。每個 Worker 實例在斷開連接時打印一條消息,然后線程池調用 join 來等待每個 Worker 線程完成。
注意這個特殊執(zhí)行的一個有趣的方面:ThreadPool 丟棄了 sender,并且在任何 Worker 接收到錯誤之前,我們嘗試加入 Worker 0。工作線程 0 還沒有從 recv 獲得錯誤,所以主線程阻塞等待工作線程 0 完成。同時,Worker 1 收到了一個作業(yè),然后所有線程都收到了一個錯誤。當 Worker 0 完成時,主線程等待其余的 Worker 實例完成。在這一點上,他們都退出了循環(huán),停止了。
我們現在已經完成了我們的項目;我們有一個基本的 Web 服務器,它使用線程池進行異步響應。我們能夠執(zhí)行服務器的優(yōu)雅關閉,這將清理池中的所有線程。
項目地址
GitHub:UestcXiye / Multi-Thread-Web-Server-based-on-Rust
到此這篇關于Rust中多線程 Web 服務器的項目實戰(zhàn)的文章就介紹到這了,更多相關Rust 多線程Web服務器內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Rust-使用dotenvy加載和使用環(huán)境變量的過程詳解
系統(tǒng)的開發(fā),測試和部署離不開環(huán)境變量,今天分享在Rust的系統(tǒng)開發(fā)中,使用dotenvy來讀取和使用環(huán)境變量,感興趣的朋友跟隨小編一起看看吧2023-11-11