rust?創(chuàng)建多線程web?server的詳細過程
創(chuàng)建一個 http server,處理 http 請求。
創(chuàng)建一個單線程的 web 服務(wù)
web server 中主要的兩個協(xié)議是 http 和 tcp。tcp 是底層協(xié)議,http 是構(gòu)建在 tcp 之上的。
通過std::net
庫創(chuàng)建一個 tcp 連接的監(jiān)聽對象,監(jiān)聽地址為127.0.0.1:8080
.
use std::net::TcpListener; fn main() { let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); println!("connected!"); } }
運行cargo run
,在瀏覽器中訪問http://127.0.0.1:8080
,可以看到控制臺輸出。
瀏覽器中顯示鏈接被重置,無法被訪問,因為沒有響應(yīng)任何數(shù)據(jù)。通過listener.incoming()
方法返回一個迭代器,它是客戶端與服務(wù)端之間打開的連接。稱之為stream
流,可以用來處理請求、響應(yīng)。
首先處理請求,需要讀取請求的參數(shù),通過std::io
庫處理流信息,引入std::io::prelude::*
包含一些讀寫流需要的特定 trait。
use std::io::{prelude::*, BufReader}; use std::net::TcpListener; fn main() { let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); for stream in listener.incoming() { let mut stream = stream.unwrap(); // 處理請求 let buf_reader = BufReader::new(&stream); let http_request: Vec<_> = buf_reader .lines() .map(|result| result.unwrap()) .take_while(|line| !line.is_empty()) .collect(); println!("requrest:{:#?}", http_request); } }
BufReader
實現(xiàn)了BufRead
trait,提供了lines
方法,通過換行符切割數(shù)據(jù)流返回一個Result<String,std::io::Error>
迭代器。通過map
獲取到每一個結(jié)果值,take_while
處理值直到為空結(jié)束,然后collect
收集結(jié)果值。
http_request
必須指定類型Vec<_>
來收集。在閉包那一節(jié)中,迭代器適配器,必須調(diào)用消費適配器獲取結(jié)果。
現(xiàn)在嘗試給請求作出一個響應(yīng),響應(yīng)狀態(tài)碼200
表示成功響應(yīng)。一個簡單的響應(yīng)頭包括了協(xié)議、協(xié)議版本、響應(yīng)狀態(tài)、狀態(tài)語句。
let res = "HTTP/1.1 200 OK\r\n\r\n"; stream.write_all(res.as_bytes()).unwrap();
重新啟動,再次瀏覽器訪問地址,可以看到空白頁面,F12
查看網(wǎng)絡(luò)請求,可以看到請求成功
可以增加請求路徑http://127.0.0.1:8080/home
或增加參數(shù)看看請求信息的不同。將請求處理、響應(yīng)處理放到一個函數(shù)中handle_request
接著可以返回一個html
文件,這樣頁面就有了基礎(chǔ)的展示效果。新建一個index.html
文件
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8" /> <meta name="viewport" content="width=<device-width>, initial-scale=1.0" /> <title>Document</title> </head> <body> <p>hello world</p> </body> </html>
讀取index.html
文件,并將文件內(nèi)容作為響應(yīng)返回
let res_status = "HTTP/1.1 200 OK\r\n"; let contents = fs::read_to_string("index.html").unwrap(); let len = contents.len(); let res = format!("{res_status}Content-Length:{len}\r\n\r\n{contents}"); stream.write_all(res.as_bytes()).unwrap();
再次運行,瀏覽器訪問可以看到頁面上已經(jīng)展示信息。現(xiàn)在只要是所有的請求訪問都會返回index.html
文件,通常我們會根據(jù)訪問路徑來處理響應(yīng),比如http://127.0.0.1:8080/home
限制如果有請求路徑或者是參數(shù),則響應(yīng)一個404.html
頁面,獲取http_request
第一個元素匹配GET / HTTP/1.1
,響應(yīng) 200,其他訪問都是返回 404.
fn handle_request(mut stream: TcpStream) { // 處理請求 let buf_reader = BufReader::new(&stream); let http_request: Vec<_> = buf_reader .lines() .map(|result| result.unwrap()) .take_while(|line| !line.is_empty()) .collect(); if http_request[0] == "GET / HTTP/1.1" { let res_status = "HTTP/1.1 200 OK\r\n"; let contents = fs::read_to_string("index.html").unwrap(); let len = contents.len(); let res = format!("{res_status}Content-Length:{len}\r\n\r\n{contents}"); stream.write_all(res.as_bytes()).unwrap(); } else { let res_status = "HTTP/1.1 404 NOT FOUND\r\n"; let contents = fs::read_to_string("404.html").unwrap(); let len = contents.len(); let res = format!("{res_status}Content-Length:{len}\r\n\r\n{contents}"); stream.write_all(res.as_bytes()).unwrap(); } }
優(yōu)化一下if else
里的代碼,只有響應(yīng)狀態(tài)、響應(yīng)的文件不一樣,其他邏輯都一樣。
let (res_status, file_name) = if http_request[0] == "GET / HTTP/1.1" { ("HTTP/1.1 200 OK\r\n", "index.html") } else { ("HTTP/1.1 404 NOT FOUND\r\n", "404.html") }; let contents = fs::read_to_string(file_name).unwrap(); let len = contents.len(); let res = format!("{res_status}Content-Length:{len}\r\n\r\n{contents}"); stream.write_all(res.as_bytes()).unwrap();
main
方法中的簡化,調(diào)用處理請求的函數(shù)。
fn main() { let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); for stream in listener.incoming() { let mut stream = stream.unwrap(); // 處理請求 handle_request(stream); } }
現(xiàn)在一個簡易的 web 服務(wù)就好了,可以處理請求、可以處理響應(yīng)。在這過程出現(xiàn)的錯誤我們都用unwrap
方法處理,只要遇到錯誤,直接停止程序,而在真實環(huán)境中,需要處理這些錯誤,避免程序的不可訪問。
創(chuàng)建多線程 server 服務(wù)
已經(jīng)構(gòu)建了單線程的服務(wù),但是它每次只能處理一個請求,只要完成上一個請求之后才能處理下一個連接。如果請求很多,則需要等待,這種表現(xiàn)使得服務(wù)性能很差。
首先,來模擬演示一下單線程的堵塞行為,通過線程休眠模擬慢請求
use std::thread::{self}; use std::time::Duration; fn handle_request(mut stream: TcpStream) { // ... // 將if部分改為match匹配,增加/sleep 路徑匹配,用以堵塞線程 let (res_status, file_name) = match &http_request[0][..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK\r\n", "index.html"), "GET /sleep HTTP/1.1" => { // 線程堵塞5s thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "index.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // ... }
然后我們打開兩個瀏覽器的 tab 頁,訪問不同的地址帶路徑/sleep
和不帶路徑/
的,先訪問帶路徑的,可以看到瀏覽器正在加載,再訪問不帶路徑的也發(fā)現(xiàn)瀏覽器正在加載。等 5 秒過后,全部加載完成,如果直接訪問不帶路徑/
則瞬間訪問成功。
為了處理這種情況,我們嘗試為每一個請求都分配一個線程獨立去處理請求任務(wù)。
構(gòu)建一個線程池,當(dāng)程序每收到新請求時,分配一個線程去處理該請求;其余線程等待處理其他接收到的請求,當(dāng)線程處理完請求后,返回到線程池等待處理新的請求。這樣我們就可以并發(fā)處理請求,這樣就是服務(wù)的吞吐量。
線程池的線程數(shù)不易過多,以固有數(shù)量的線程等待處理請求。這可以防止拒絕式服務(wù)攻擊
DOS
除了多線程處理服務(wù),還有其他方法改善服務(wù)吞吐量,fork/join
模型、單線程異步 I/O 模型、多線程異步 I/O 模型。
修改main
方法,thread::spawn
會創(chuàng)建一個新線程并運行閉包里的代碼。
fn main() { let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_request(stream); }); // handle_request(stream); } }
現(xiàn)在可以再次嘗試請求/sleep
和/
,可以發(fā)現(xiàn)/
瞬間就響應(yīng)了,/sleep
還需要等待 5s。如果有上千、上萬個請求,我們就要開同等數(shù)量的線程,在占用完所有資源后,就會使系統(tǒng)奔潰。
通過線程池,創(chuàng)建有限的線程數(shù)量。在處理請求時,內(nèi)部執(zhí)行的方法execute
會檢測空閑的線程并執(zhí)行之后的請求任務(wù),如果請求超過線程池線程數(shù)量,則排隊等待。
fn main() { let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); // 創(chuàng)建線程池 let threadPool = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); // thread::spawn(|| { // handle_request(stream); // }); threadPool.execute(|| { handle_request(stream); }) // handle_request(stream); } }
實現(xiàn)ThreadPool
線程池類型
ThreadPool
類型并不存在于 rust 庫中,需要我們自己實現(xiàn)ThreadPool
。
在rust-lib
項目中,新建庫thread_pool
, 在src/lib.rs
中,通過new
函數(shù)實現(xiàn)創(chuàng)建ThreadPool
實例,它接受一個參數(shù)size
為線程的數(shù)量;通過定義execute
函數(shù)接受一個閉包參數(shù),閉包作為參數(shù)可以使用三個不同的 traitFn\FnMut\FnOnce
,要決定用哪個取決于最終的調(diào)用,最終是要調(diào)用thread::spawn()
的,它是使用了FnOnce
的,還需要Send
來將閉包從一個線程轉(zhuǎn)移到另一個線程,綁定生命周期'static
是因為不知道線程會執(zhí)行多久。
pub struct ThreadPool; impl ThreadPool { /// 創(chuàng)建線程池 /// /// 線程池中線程的數(shù)量 pub fn new(size: usize) -> ThreadPool { ThreadPool } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } }
定義完之后,回到項目rust-web
項目,引入依賴,在Cargo.toml
,
[dependencies] thread_pool = {path="../rust-lib/thread_pool"}
然后在src/main.rs
使用依賴use thread_pool::ThreadPool;
, 運行程序cargo run
,沒有報錯正常運行。
在new
方法中要保證初始化的線程數(shù)是一個有效的值,即size
不能為分?jǐn)?shù)或等于 0.這沒有意義。然后初始化 vector 實例來存儲線程實例,thread::spawn()
執(zhí)行后返回的類型為thread::JoinHandle
,它可以管理并等待創(chuàng)建的線程完成任務(wù)。
use std::thread; pub struct ThreadPool { threads: Vec<thread::JoinHandle<()>>, } impl ThreadPool { /// 創(chuàng)建線程池 /// /// 線程池中線程的數(shù)量 /// /// # Panics /// /// `new`函數(shù)在size為0 時panicthread_pool pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut threads = Vec::with_capacity(size); for _ in 0..size { // 創(chuàng)建對應(yīng)數(shù)量的線程,并把它們存儲到vec中 } ThreadPool { threads } } // ... }
之前一直在使用thread::spawn()
來創(chuàng)建線程,并執(zhí)行任務(wù)?,F(xiàn)在在線程池中,需要提前創(chuàng)建線程,等待任務(wù)傳入后再執(zhí)行。標(biāo)準(zhǔn)的 rust 庫中沒有這樣的定義,仍需要自己實現(xiàn),可以稱之為Worker
數(shù)據(jù)結(jié)構(gòu),這樣我們在ThreadPool
存儲的是Worker
實例,在 worker 實例中存儲一個單獨的JoinHandle<()>
實例,并賦予該實例一個唯一的id
,方便日志和調(diào)用棧區(qū)分。
同樣的,在ThreadPool
src/lib.rs 定義結(jié)構(gòu)體Worker
類型,對于外部 worker 類型是私有的,不需要pub
定義。
use std::thread; pub struct ThreadPool { // threads: Vec<thread::JoinHandle<()>>, workers: Vec<Worker>, } impl ThreadPool { /// 創(chuàng)建線程池 /// /// 線程池中線程的數(shù)量 pub fn new(size: usize) -> ThreadPool { assert!(size > 0); // let mut threads = Vec::with_capacity(size); let mut workers = Vec::with_capacity(size); for id in 0..size { // 創(chuàng)建對應(yīng)數(shù)量的線程,并把它們存儲到vec中 workers.push(Worker::new(id)) } ThreadPool { workers } } // ... } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize) -> Worker { let thread = thread::spawn(|| {}); Worker { id, thread } } }
運行我們的代碼,正常運行。現(xiàn)在需要解決的是向創(chuàng)建的線程傳遞要處理的請求任務(wù),通過之前文章中學(xué)過的channel
信道來傳遞信息.
在ThreadPool
中存在一個信道實例充當(dāng)發(fā)送者;并新建一個Job
結(jié)構(gòu)體存放用于向信道發(fā)送的閉包;execute
方法會發(fā)送期望執(zhí)行的任務(wù)。
use std::{sync::mpsc, thread}; pub struct ThreadPool { // threads: Vec<thread::JoinHandle<()>>, workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { pub fn new(size: usize) -> ThreadPool { assert!(size > 0); // let mut threads = Vec::with_capacity(size); let mut workers = Vec::with_capacity(size); // 創(chuàng)建信道實例,提供一個發(fā)送者、接收者 let (sender, receiver) = mpsc::channel(); for id in 0..size { // 創(chuàng)建對應(yīng)數(shù)量的線程,并把它們存儲到vec中 workers.push(Worker::new(id,receiver)) } ThreadPool { workers, sender } } }
ThreadPool
實例存儲信道發(fā)送者對象sender
,需要將接受者實例receiver
傳遞給Worker
用于接收傳遞的信息。
impl Worker { fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { let thread = thread::spawn(|| { receiver; }); Worker { id, thread } } }
這會有一個錯誤信息,因為在 rust 中信道實現(xiàn)是多生產(chǎn)者、單消費者,不能將receiver
接受者傳遞多個 work 實例。我們希望有一個任務(wù)列表,每個任務(wù)只允許處理一次。這在之前的文章中
rust 自動化測試、迭代器與閉包、智能指針、無畏并發(fā)
已經(jīng)解決過在線程間共享狀態(tài),通過線程安全智能指針Arc<Mutex<T>>
,多個線程共享所有權(quán)并允許線程修改其值。Arc
使得多個 worker 擁有接受端,而Mutex
確保一次只有一個 worker 能接收到任務(wù)。
use std::{ sync::{mpsc, Arc, Mutex}, thread, }; impl ThreadPool { pub fn new(size: usize) -> ThreadPool { assert!(size > 0); // let mut threads = Vec::with_capacity(size); let mut workers = Vec::with_capacity(size); let (sender, receiver) = mpsc::channel(); // 通過`Arc<T>`創(chuàng)建多所有者,Mutex<T>共享數(shù)據(jù) let receiver = Arc::new(Mutex::new(receiver)); for id in 0..size { // 創(chuàng)建對應(yīng)數(shù)量的線程,并把它們存儲到vec中 workers.push(Worker::new(id, Arc::clone(&receiver))) } ThreadPool { workers, sender } } } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { // ... } }
最后處理execute
方法,它接受的閉包需要分配給空閑的線程并執(zhí)行,修改Job
結(jié)構(gòu)體,它不是一個結(jié)構(gòu)體,是接受execute
方法接受的閉包類型的類型別名。
// struct Job; type Job = Box<dyn FnOnce() + Send + 'static>;
在execute
方法被調(diào)用后,新建Job
實例,將任務(wù)從信道發(fā)送端發(fā)出,因為發(fā)送可能會失敗,所以需要unwrap
處理錯誤的發(fā)生。
impl ThreadPool { pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } }
繼續(xù)優(yōu)化接受端執(zhí)行任務(wù)的邏輯,在接收到任務(wù)后,通過lock
獲取互斥器來鎖定資源,防止其他地方使用資源。通過unwrap
處理錯誤時的情況,在獲取了互斥器鎖定了資源后,調(diào)用recv()
方法接受任務(wù)Job
,這會阻塞當(dāng)前線程,所有如果當(dāng)前線程沒有任務(wù),則會一直等待直到有用的任務(wù)。Mutex<T>
可以確保一次只有一個 Worker 線程請求任務(wù)。
impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("開始執(zhí)行任務(wù){(diào)id}"); job(); }); Worker { id, thread } } }
通過loop
循環(huán)執(zhí)行閉包,一直向信道的接受端請求任務(wù),并在得到任務(wù)時執(zhí)行它們。
現(xiàn)在執(zhí)行cargo run
,并在瀏覽器中打開多個 tab 請求地址,可以看到打印輸出
不能使用其他循環(huán),比如while let \ if let \ match
是因為它們循環(huán)時相關(guān)的代碼塊結(jié)束都不會丟棄臨時值,導(dǎo)致鎖守護的資源不能釋放,不能被訪問。
程序停止與清理
當(dāng)我們終止程序后,如何去處理未執(zhí)行完的任務(wù),如何清理資源。
為ThreadPool
實現(xiàn)Drop
,當(dāng)線程池被丟棄時,應(yīng)該join
所有線程以確保任務(wù)完成。
impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("stop worker {}", worker.id); worker.thread.join().unwrap(); } } }
這里會有一個錯誤,不能編譯,提示沒有 worker 所有權(quán),因為我們只得到了一個可變借用,不能調(diào)用join
來消費線程。通過修改來使得thread
實例成為一個Option
值,這樣就可以通過take
方法來獲取到其中Some
成員值進行處理。清理時可以直接將thread
賦值為None
struct Worker { id: usize, // thread: thread::JoinHandle<()>, thread: Option<thread::JoinHandle<()>>, }
通過 rust 代碼檢測提示信息來修改其他需要調(diào)整的地方。Worker
new 方法創(chuàng)建實例時,接收thread
使用Some(thread)
在停止程序,清理時,通過take()
獲取到成員值后,再調(diào)用join()
方法等待線程執(zhí)行結(jié)束。
impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("stop worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } }
正常邏輯來說調(diào)用了join()
之后會關(guān)閉線程,但是由于之前的線程邏輯是循環(huán)閉包調(diào)用等待接受任務(wù),也就是會導(dǎo)致線程一直不會執(zhí)行完畢,導(dǎo)致阻塞。一直阻塞在第一個線程結(jié)束上。
通過修改ThreadPool
的Drop
方法來顯式丟棄sender
。為了轉(zhuǎn)移sender
所有權(quán),同樣的使用Option
類型來傳遞
pub struct ThreadPool { // threads: Vec<thread::JoinHandle<()>>, workers: Vec<Worker>, sender: Option<mpsc::Sender<Job>>, } impl ThreadPool { pub fn new(size: usize) -> ThreadPool { // ... // ... ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); // self.sender.send(job).unwrap(); self.sender.as_ref().unwrap().send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { // 顯示的丟棄sender drop(self.sender.take()); for worker in &mut self.workers { println!("stop worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } }
Drop()
方法調(diào)用顯示的丟棄sender
后,這會關(guān)閉信道,表明了后續(xù)不會有消息發(fā)送,這時在Worker
中無限循環(huán)調(diào)用接受消息的方法都會返回錯誤,此時可以修改邏輯在遭遇錯誤后退出循環(huán)。
impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let message = receiver.lock().unwrap().recv(); match message { Ok(job) => { println!("開始執(zhí)行任務(wù){(diào)id}"); job(); } Err(_) => { println!("worker {id} disconnected"); break; } } }); Worker { id, thread: Some(thread), } } }
現(xiàn)在可以正常清理、停機了,如果希望在服務(wù)停止前再處理幾個請求,通過take()
方法模擬只兩個請求進行處理,來驗證停機的邏輯。它是Iterator
trait
fn main() { let listener = TcpListener::bind("127.0.0.1:8080").unwrap(); // 創(chuàng)建線程池 let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_request(stream); }) } }
現(xiàn)在運行程序cargo run
,同時在瀏覽器請求三次,看看控制臺如何打印信息,第三個請求不會被執(zhí)行。
可以看到只執(zhí)行完了兩次請求,在第一次請求處理完成后,調(diào)用了Drop
方法顯示的丟棄了信道發(fā)送者sender
,這樣整個就導(dǎo)致所有 worker 關(guān)閉連接。
到此這篇關(guān)于rust 創(chuàng)建多線程web server的詳細過程的文章就介紹到這了,更多相關(guān)rust 多線程web server內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Rust調(diào)用tree-sitter支持自定義語言解析
使用Rust語言結(jié)合tree-sitter庫解析自定義語言需要定義語法、生成C解析器,并在Rust項目中集成,具體步驟包括創(chuàng)建grammar.js定義語法,使用tree-sitter-cli工具生成C解析器,以及在Rust項目中編寫代碼調(diào)用解析器,這一過程涉及到對tree-sitter的深入理解和Rust語言的應(yīng)用技巧2024-09-09使用Rust采集天氣預(yù)報信息并實現(xiàn)實時更新數(shù)據(jù)功能
Rust作為一種高效、安全的編程語言,可以用于開發(fā)各種應(yīng)用,包括天氣預(yù)報采集系統(tǒng),本文將探討如何使用Rust來采集天氣預(yù)報信息,并實現(xiàn)實時更新數(shù)據(jù)的功能,文中通過代碼示例給大家介紹的非常詳細,需要的朋友可以參考下2024-01-01