Rust 通過(guò)異步實(shí)現(xiàn)并發(fā)的方法示例
在本文中,我們將重點(diǎn)討論線程和 future 之間的區(qū)別。
在許多情況下,使用異步處理并發(fā)性的 API 與使用線程的 API 非常相似,但它們通常具有不同的行為,并且它們幾乎總是具有不同的性能特征。
用 spawn_task 創(chuàng)建一個(gè)新任務(wù)
使用 thread::spawn 可以創(chuàng)建一個(gè)新線程,我們編寫的第一個(gè)程序是在兩個(gè)單獨(dú)的線程上進(jìn)行計(jì)數(shù)。
讓我們使用 async 做同樣的事情。trpl crate 提供了一個(gè)看起來(lái)與 thread::spawn API 非常相似的 spawn_task 函數(shù),以及一個(gè) sleep 函數(shù),它是 thread::sleep API 的異步版本。我們可以一起使用它們來(lái)實(shí)現(xiàn)計(jì)數(shù)示例:
use std::time::Duration; fn main() { trpl::run(async { trpl::spawn_task(async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } }); }
我們使用 trpl::run 設(shè)置 main 函數(shù),以便我們的頂級(jí)函數(shù)可以是異步的。
然后我們?cè)谠搲K中編寫兩個(gè)循環(huán),每個(gè)循環(huán)都包含一個(gè) trpl::sleep 調(diào)用,該調(diào)用在發(fā)送下一條消息之前等待半秒。我們將一個(gè)循環(huán)放在 trpl::spawn_task 中,另一個(gè)放在頂層的 for 循環(huán)中。我們還在 sleep 調(diào)用之后添加了一個(gè) await。
這段代碼的行為類似于基于線程的實(shí)現(xiàn)——包括當(dāng)你運(yùn)行它時(shí),你可能會(huì)看到消息以不同的順序出現(xiàn)在你自己的終端上。
這個(gè)版本在主異步塊體中的 for 循環(huán)完成后立即停止,因?yàn)樵谥骱瘮?shù)結(jié)束時(shí),由 spawn_task 生成的任務(wù)被關(guān)閉。如果希望它一直運(yùn)行到任務(wù)完成,則需要使用連接句柄來(lái)等待第一個(gè)任務(wù)完成。對(duì)于線程,我們使用 join 方法來(lái)“阻塞”,直到線程完成運(yùn)行。我們可以使用 await 來(lái)做同樣的事情,因?yàn)槿蝿?wù)句柄本身就是一個(gè) future。它的輸出類型是 Result,所以我們也在等待它之后展開(kāi)它。
use std::time::Duration; fn main() { trpl::run(async { let handle = trpl::spawn_task(async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }); for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } handle.await.unwrap(); }); }
這個(gè)更新的版本運(yùn)行直到兩個(gè)循環(huán)結(jié)束。
到目前為止,看起來(lái) async 和線程給出了相同的基本結(jié)果,只是語(yǔ)法不同:使用 await 而不是在連接句柄上調(diào)用 join,并等待 sleep 調(diào)用。
更大的區(qū)別在于,我們不需要生成另一個(gè)操作系統(tǒng)線程來(lái)執(zhí)行此操作。實(shí)際上,我們甚至不需要在這里生成任務(wù)。由于 async 塊編譯為匿名的 future,我們可以將每個(gè)循環(huán)放在 async 塊中,并讓運(yùn)行時(shí)使用 trpl::join 函數(shù)將它們運(yùn)行到完成。
我們之前展示了如何在調(diào)用 std::thread::spawn 時(shí)對(duì)返回的 JoinHandle 類型使用 join 方法。trpl::join 函數(shù)與此類似,但用于 future。當(dāng)你給它兩個(gè) future 時(shí),它會(huì)產(chǎn)生一個(gè)新的 future,它的輸出是一個(gè)元組,其中包含你傳入的每個(gè) future 完成后的輸出。
我們使用 trpl::join 來(lái)等待 fut1 和 fut2 完成。我們不等待 fut1 和 fut2,而是等待 trpl::join 生成的新 future。
use std::time::Duration; fn main() { trpl::run(async { let fut1 = async { for i in 1..10 { println!("hi number {i} from the first task!"); trpl::sleep(Duration::from_millis(500)).await; } }; let fut2 = async { for i in 1..5 { println!("hi number {i} from the second task!"); trpl::sleep(Duration::from_millis(500)).await; } }; trpl::join(fut1, fut2).await; }); }
編譯運(yùn)行,我們看到兩個(gè) future 都運(yùn)行到完成:
現(xiàn)在,每次運(yùn)行的結(jié)果的順序都完全相同,這與我們?cè)诰€程中看到的非常不同。
這是因?yàn)?trpl::join 函數(shù)是公平的,這意味著它同樣頻繁地檢查每個(gè) future,在它們之間交替,如果另一個(gè)準(zhǔn)備好了,它永遠(yuǎn)不會(huì)讓一個(gè)搶先。對(duì)于線程,操作系統(tǒng)決定檢查哪個(gè)線程以及讓它運(yùn)行多長(zhǎng)時(shí)間。對(duì)于異步 Rust,運(yùn)行時(shí)決定檢查哪個(gè)任務(wù)。
在實(shí)踐中,細(xì)節(jié)變得復(fù)雜,因?yàn)楫惒竭\(yùn)行時(shí)可能會(huì)在后臺(tái)使用操作系統(tǒng)線程作為管理并發(fā)性的一部分,因此保證公平性對(duì)運(yùn)行時(shí)來(lái)說(shuō)可能需要更多的工作。
運(yùn)行時(shí)不必保證任何給定操作的公平性,它們通常提供不同的 API,讓你選擇是否需要公平性。
使用消息傳遞計(jì)算兩個(gè)任務(wù)
我們使用消息傳遞的異步版本在 future 之間共享數(shù)據(jù)。
我們將采用與使用消息傳遞在線程之間傳輸數(shù)據(jù)略有不同的方法來(lái)說(shuō)明基于線程的并發(fā)和基于 future 的并發(fā)之間的一些關(guān)鍵區(qū)別。
在 trpl::run 的 async 塊中創(chuàng)建通道:
fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let val = String::from("hi"); tx.send(val).unwrap(); let received = rx.recv().await.unwrap(); println!("Got: {received}"); }); }
這里,我們使用 trpl::channel,這是 std::mpsc::channel(多生產(chǎn)者、單消費(fèi)者通道)的異步版本。異步版本的 API 與基于線程的版本只有一點(diǎn)不同:它使用一個(gè)可變的接收端 rx,它的 recv 方法產(chǎn)生一個(gè)我們需要等待的 future,而不是直接產(chǎn)生值。現(xiàn)在我們可以將消息從發(fā)送者發(fā)送到接收者。注意,我們不需要生成一個(gè)單獨(dú)的線程或任務(wù),我們只需要等待 rx.recv 調(diào)用。
在 std::mpsc::channel 中的 Receiver::recv 方法阻塞線程,直到它接收到消息。trpl::Receiver::recv 方法是異步的,它不阻塞,而是將控制權(quán)交還給運(yùn)行時(shí),直到接收到消息或通道的發(fā)送端關(guān)閉為止。相比之下,我們不等待 send 調(diào)用,因?yàn)樗粫?huì)阻塞。
注意:由于所有這些異步代碼都在 trpl::run 調(diào)用中的異步塊中運(yùn)行,因此其中的所有代碼都可以避免阻塞。但是,它外面的代碼將在運(yùn)行函數(shù)返回時(shí)阻塞。這就是 trpl::run 函數(shù)的全部意義:它允許你選擇在哪里阻塞某些異步代碼集,以及在哪里在同步代碼和異步代碼之間轉(zhuǎn)換。在大多數(shù)異步運(yùn)行時(shí),run 實(shí)際上被命名為 block_on 正是出于這個(gè)原因。
關(guān)于這個(gè)例子,請(qǐng)注意兩點(diǎn)。首先,消息會(huì)馬上到達(dá)。第二,雖然我們?cè)谶@里使用了 future,但是還沒(méi)有并發(fā)。程序的一切都是按順序進(jìn)行的,就像不涉及 future 一樣。
讓我們通過(guò)發(fā)送一系列消息并在它們之間休眠來(lái)解決第一部分:
use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } while let Some(value) = rx.recv().await { println!("received '{value}'"); } }); }
Rust 還沒(méi)有一種方法可以在一系列異步項(xiàng)上編寫 for 循環(huán),因此我們需要使用 while let 條件循環(huán),只要循環(huán)指定的模式繼續(xù)匹配該值,循環(huán)就會(huì)繼續(xù)執(zhí)行。
rx.recv() 產(chǎn)生一個(gè)我們等待的 future。運(yùn)行時(shí)將暫停 future,直到它準(zhǔn)備好。一旦消息到達(dá),future 將解析為 Some(message)。當(dāng)通道關(guān)閉時(shí),無(wú)論是否有消息到達(dá),future 都將解析為 None,表示沒(méi)有更多的值,因此我們應(yīng)該停止輪詢——也就是說(shuō),停止 await。
while let 循環(huán)將所有這些組合在一起。如果調(diào)用 rx.recv().await 的結(jié)果是Some(message),則可以訪問(wèn)該消息,并可以在循環(huán)體中使用它。如果結(jié)果為 None,則循環(huán)結(jié)束。每次循環(huán)完成時(shí),它都會(huì)再次到達(dá)等待點(diǎn),因此運(yùn)行時(shí)將再次暫停它,直到另一條消息到達(dá)。
代碼現(xiàn)在成功地發(fā)送和接收了所有消息:
不幸的是,仍然存在一些問(wèn)題。首先,消息不會(huì)以半秒的間隔到達(dá),它們?cè)谖覀儐?dòng)程序后 2 秒同時(shí)到達(dá)。其次,這個(gè)程序永遠(yuǎn)不會(huì)退出!相反,它會(huì)永遠(yuǎn)等待新的消息。
因?yàn)槌绦蛑兄挥幸粋€(gè)異步塊,因此其中的所有內(nèi)容都是線性運(yùn)行的,仍然沒(méi)有并發(fā)性。所有的 tx.send 調(diào)用都會(huì)發(fā)生,并與所有的 trpl::sleep 調(diào)用及其相關(guān)的等待點(diǎn)穿插在一起。只有這樣,while let 循環(huán)才能通過(guò) recv 調(diào)用上的任何等待點(diǎn)。
為了獲得我們想要的行為,即在每個(gè)消息之間發(fā)生睡眠延遲,我們需要將 tx 和 rx 操作放在各自的異步塊中,然后運(yùn)行時(shí)可以使用 trpl::join 分別執(zhí)行它們中的每一個(gè)。同樣,我們等待調(diào)用 trpl::join 的結(jié)果,而不是單個(gè)的 future。
use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx_fut = async { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; trpl::join(tx_fut, rx_fut).await; }); }
消息以 500 ms 的間隔打印,而不是在 2 s 后匆忙打印。
然而,由于 while let 循環(huán)與 trpl::join 的交互方式,程序仍然不會(huì)退出:
- 只有當(dāng)傳遞給它的兩個(gè) future 都完成后,從 trpl::join 返回的 future 才會(huì)完成。
- 在發(fā)送 vals 中的最后一條消息后,一旦結(jié)束 sleep,tx future 就完成了。
- 直到 while let 循環(huán)結(jié)束,rx future 才會(huì)完成。
- while let 循環(huán)直到等待 rx.recv 產(chǎn)生 None 才會(huì)結(jié)束。
- 等待 rx.recv 只會(huì)在通道的另一端關(guān)閉時(shí)返回 None。
- 只有當(dāng)我們調(diào)用 rx.close 或當(dāng)發(fā)送端 tx 被丟棄時(shí),通道才會(huì)關(guān)閉。
- 我們不會(huì)在任何地方調(diào)用 rx.close,并且在傳遞給 trpl::run 的最外層異步塊結(jié)束之前,tx 不會(huì)被丟棄。
- 這個(gè)塊不能結(jié)束,因?yàn)樗?trpl::join 完成時(shí)被阻塞了,這將我們帶回到列表的頂部。
我們可以通過(guò)在某處調(diào)用 rx.close 來(lái)手動(dòng)關(guān)閉 rx,但這沒(méi)有多大意義。在處理任意數(shù)量的消息后停止將使程序關(guān)閉,但我們可能會(huì)錯(cuò)過(guò)消息。我們需要一些其他的方法來(lái)確保 tx 在函數(shù)結(jié)束前被刪除。
現(xiàn)在,我們發(fā)送消息的異步塊只借用 tx,因?yàn)榘l(fā)送消息不需要所有權(quán),但是如果我們可以將 tx 移動(dòng)到異步塊中,那么一旦該塊結(jié)束,它就會(huì)被丟棄。move 關(guān)鍵字對(duì)異步塊的作用就像對(duì)閉包的作用一樣,將數(shù)據(jù)轉(zhuǎn)移到異步塊中。
我們將用于發(fā)送消息的塊從 async 更改為 async move。當(dāng)我們運(yùn)行這個(gè)版本的代碼時(shí),它會(huì)在發(fā)送和接收最后一條消息后優(yōu)雅地關(guān)閉。
use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; trpl::join(tx_fut, rx_fut).await; }); }
因?yàn)?tx 所有權(quán)被轉(zhuǎn)移到 async 塊內(nèi),在該塊執(zhí)行完也就是發(fā)送作業(yè)結(jié)束之后,tx 隨之被銷毀,觸發(fā)通道關(guān)閉,接收端返回 None。
這個(gè)異步通道也是一個(gè)多生產(chǎn)者通道,所以如果我們想從多個(gè) future 發(fā)送消息,我們可以在 tx 上調(diào)用 clone。
use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_millis(500)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_millis(1500)).await; } }; trpl::join3(tx1_fut, tx_fut, rx_fut).await; }); }
克隆 tx,在第一個(gè)異步塊之外創(chuàng)建 tx1,我們將 tx1 移動(dòng)到該塊中。然后將原始 tx 移動(dòng)到一個(gè)新的異步塊中,在那里我們以稍慢的延遲發(fā)送更多消息。
用于發(fā)送消息的兩個(gè)異步塊都需要是 async move 塊,以便在這些塊完成時(shí)丟棄 tx 和 tx1。最后,我們從 trpl::join 切換到 trpl::join3 來(lái)處理額外的 future。
現(xiàn)在我們看到了來(lái)自兩個(gè)發(fā)送 future 的所有消息,由于發(fā)送 future 在發(fā)送后使用的延遲略有不同,因此接收消息的間隔也不同。
這是一個(gè)良好的開(kāi)端,但它限制了我們的 future 數(shù)量:兩個(gè)對(duì)應(yīng) join,或三個(gè)對(duì)應(yīng) join3。
到此這篇關(guān)于Rust 通過(guò)異步實(shí)現(xiàn)并發(fā)的方法示例的文章就介紹到這了,更多相關(guān)Rust 異步并發(fā)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
一文學(xué)會(huì)Rust語(yǔ)言如何操作JSON
JSON在Web開(kāi)發(fā)中被廣泛應(yīng)用于數(shù)據(jù)交換,本文主要介紹了Rust語(yǔ)言操作JSON,包括序列化、反序列化、JSON創(chuàng)建等多個(gè)方面,具有一定的參考價(jià)值,感興趣的可以了解一下2024-03-03Windows系統(tǒng)下安裝Rust環(huán)境超詳細(xì)教程
這篇文章主要介紹了如何在Windows系統(tǒng)上安裝mingw64和Rust,mingw64是一個(gè)輕便的C語(yǔ)言編譯環(huán)境,可以替代Rust默認(rèn)使用的Visual?Studio,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2025-02-02libbpf和Rust開(kāi)發(fā)ebpf程序?qū)崙?zhàn)示例
這篇文章主要為大家介紹了libbpf和Rust開(kāi)發(fā)ebpf程序?qū)崙?zhàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12Rust實(shí)現(xiàn)一個(gè)表達(dá)式Parser小結(jié)
這篇文章主要為大家介紹了Rust實(shí)現(xiàn)一個(gè)表達(dá)式Parser小結(jié),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-11-11