Rust并發(fā)編程之使用消息傳遞進行線程間數(shù)據(jù)共享方式
一、通道(Channel)的基本概念
一個通道可以想象成一條單向水道或河流:有一個 發(fā)送端(transmitter) 和一個 接收端(receiver)。發(fā)送端好比河流上游,負責把“橡皮鴨”丟進水里;接收端在河流下游,收到這只“橡皮鴨”。在編程中,線程之間的通信即是這樣——把數(shù)據(jù)發(fā)到通道的一端,另外一個(或多個)線程在通道的另一端接收。
Rust 通過 std::sync::mpsc
(Multiple Producer, Single Consumer)來提供通道功能:
- Multiple Producer:可以有多個發(fā)送端同時發(fā)送數(shù)據(jù);
- Single Consumer:但只能有一個接收端來接收數(shù)據(jù)。
通過克隆發(fā)送端可以允許多個線程一起發(fā)送數(shù)據(jù)給同一個接收端。
二、創(chuàng)建并使用通道
1. 基礎用法
創(chuàng)建一個 mpsc::channel
use std::sync::mpsc; use std::thread; fn main() { // 創(chuàng)建一個通道 let (tx, rx) = mpsc::channel(); // 這里的 tx 是 transmitter(發(fā)送端),rx 是 receiver(接收端)。 // 我們先不發(fā)送任何數(shù)據(jù),因此代碼暫時無法編譯, // 因為編譯器不知道通道要發(fā)送什么類型的數(shù)據(jù)。 }
mpsc::channel()
函數(shù)會返回一個元組 (tx, rx)
,分別代表發(fā)送端和接收端。在之后,我們會看到如何把 tx
移動到不同線程去發(fā)送消息,rx
則留在當前線程用于接收消息。
2. 在子線程中發(fā)送消息
下面的例子中,我們在主線程創(chuàng)建了通道,然后把發(fā)送端 tx
移動(move
)到子線程中,子線程通過 tx.send()
發(fā)送一條字符串“hi”給主線程。
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); // 如果接收端已關閉,則 send 會返回錯誤,這里直接 unwrap 處理 }); // 在主線程接收消息 let received = rx.recv().unwrap(); // recv 會阻塞主線程,直到收到一條消息或者發(fā)送端被關閉 println!("Got: {}", received); }
運行后,主線程會打?。?/p>
Got: hi
這表示主線程成功地收到了子線程通過通道發(fā)送的字符串。
3. 通道與所有權
當我們調用 tx.send(val)
時,send
方法會獲取 val
的所有權。這樣做能夠避免在另一個線程修改數(shù)據(jù)后,我們在原線程又使用這段數(shù)據(jù)的潛在風險。
例如,下面這段示例代碼(示意)試圖在發(fā)送之后繼續(xù)使用 val
,就會導致編譯錯誤:
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); let val = String::from("hello"); thread::spawn(move || { tx.send(val).unwrap(); // 發(fā)送后 val 的所有權已轉移到通道 // println!("val is: {}", val); // 編譯錯誤: val 所有權已經(jīng)被移動 }); let received = rx.recv().unwrap(); println!("Got: {}", received); }
由于所有權已經(jīng)轉移,主線程可以安全地接收并處理這條消息,而子線程也不會再訪問已經(jīng)移出的數(shù)據(jù)。這種嚴格的所有權規(guī)則能有效避免數(shù)據(jù)競爭和其他并發(fā)錯誤。
4. 發(fā)送多個消息
我們可以讓子線程發(fā)送不止一條消息。下面的例子讓子線程依次發(fā)送多條字符串,并在發(fā)送之間加上 sleep
用來模擬耗時操作:
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); // 主線程中,通過 iter 的方式持續(xù)接收消息 for received in rx { println!("Got: {}", received); } }
由于接收端可以被當做迭代器來使用,當所有發(fā)送端都關閉時,for
循環(huán)會自動結束。這段程序會像下面這樣依次打印每條消息:
Got: hi
Got: from
Got: the
Got: thread
5. 多個發(fā)送端(Multiple Producer)
mpsc
的含義之一就是 Multiple Producer。如果我們希望有多個不同的子線程來發(fā)送消息給同一個接收端,只需要克隆發(fā)送端即可。如下:
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); let tx1 = tx.clone(); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("first thread"), ]; for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); // 這里使用原先的 tx thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Got: {}", received); } }
- 第一個子線程使用
tx1
; - 第二個子線程使用原本的
tx
;
所有發(fā)送過來的數(shù)據(jù)都將流向同一個 rx
(接收端)。運行結果每次可能都不一樣,因為線程的調度順序不可預測,這也正是并發(fā)編程“有趣”且需要謹慎之處。
三、總結
- 創(chuàng)建通道:使用
mpsc::channel()
創(chuàng)建通道,獲得(tx, rx)
(發(fā)送端和接收端)。 - 發(fā)送數(shù)據(jù):
tx.send(data)
會轉移data
的所有權,若接收端已關閉,send
會返回錯誤。 - 接收數(shù)據(jù):
rx.recv()
會阻塞等待數(shù)據(jù);rx.try_recv()
則不會阻塞,可用于非阻塞檢查。也可將rx
當做迭代器使用,以便持續(xù)接收數(shù)據(jù),直到通道被關閉。 - 所有權規(guī)則保障安全:發(fā)送端在
send
時會移動數(shù)據(jù)的所有權,避免了多線程中對同一數(shù)據(jù)的潛在不安全訪問。 - 多發(fā)送端:通過克隆發(fā)送端(
tx.clone()
),多個線程可以各自發(fā)送數(shù)據(jù)到同一個接收端,從而實現(xiàn)復雜的多生產(chǎn)者單消費者架構。
Rust 的通道借助所有權系統(tǒng),幫助我們輕松規(guī)避了許多并發(fā)陷阱。通過消息傳遞的思路,不再需要小心翼翼地管理鎖和共享數(shù)據(jù),編程思路也往往更加清晰簡潔。在實際項目中,若需要多個線程之間相互通信,不妨考慮一下通道(channel)方案,也許能帶來更加優(yōu)雅和可靠的并發(fā)架構。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Rust中的Iterator和IntoIterator介紹及應用小結
Iterator即迭代器,它可以用于對數(shù)據(jù)結構進行迭代,被迭代的數(shù)據(jù)結構是可迭代的(iterable),所謂的可迭代就是這個數(shù)據(jù)結構有返回迭代器的方法,這篇文章主要介紹了Rust中的Iterator和IntoIterator介紹及應用,需要的朋友可以參考下2023-07-07關于Rust編譯時報link.exe?not?found錯誤問題
這篇文章主要介紹了Rust編譯的時候報出link.exe?not?found錯誤問題,解決方法是在命令行就是CMD執(zhí)行相應的命令即可,本文給大家分解決方法,需要的朋友可以參考下2022-09-09