欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Rust 通過(guò)異步實(shí)現(xiàn)并發(fā)的方法示例

 更新時(shí)間:2025年06月26日 10:39:48   作者:UestcXiye  
本文主要介紹了Rust 通過(guò)異步實(shí)現(xiàn)并發(fā)的方法示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

在本文中,我們將重點(diǎn)討論線程和 future 之間的區(qū)別。

在許多情況下,使用異步處理并發(fā)性的 API 與使用線程的 API 非常相似,但它們通常具有不同的行為,并且它們幾乎總是具有不同的性能特征。

用 spawn_task 創(chuàng)建一個(gè)新任務(wù)

使用 thread::spawn 可以創(chuàng)建一個(gè)新線程,我們編寫的第一個(gè)程序是在兩個(gè)單獨(dú)的線程上進(jìn)行計(jì)數(shù)。

讓我們使用 async 做同樣的事情。trpl crate 提供了一個(gè)看起來(lái)與 thread::spawn API 非常相似的 spawn_task 函數(shù),以及一個(gè) sleep 函數(shù),它是 thread::sleep API 的異步版本。我們可以一起使用它們來(lái)實(shí)現(xiàn)計(jì)數(shù)示例:

use std::time::Duration;

fn main() {
    trpl::run(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}

我們使用 trpl::run 設(shè)置 main 函數(shù),以便我們的頂級(jí)函數(shù)可以是異步的。

然后我們?cè)谠搲K中編寫兩個(gè)循環(huán),每個(gè)循環(huán)都包含一個(gè) trpl::sleep 調(diào)用,該調(diào)用在發(fā)送下一條消息之前等待半秒。我們將一個(gè)循環(huán)放在 trpl::spawn_task 中,另一個(gè)放在頂層的 for 循環(huán)中。我們還在 sleep 調(diào)用之后添加了一個(gè) await。

這段代碼的行為類似于基于線程的實(shí)現(xiàn)——包括當(dāng)你運(yùn)行它時(shí),你可能會(huì)看到消息以不同的順序出現(xiàn)在你自己的終端上。

在這里插入圖片描述

這個(gè)版本在主異步塊體中的 for 循環(huán)完成后立即停止,因?yàn)樵谥骱瘮?shù)結(jié)束時(shí),由 spawn_task 生成的任務(wù)被關(guān)閉。如果希望它一直運(yùn)行到任務(wù)完成,則需要使用連接句柄來(lái)等待第一個(gè)任務(wù)完成。對(duì)于線程,我們使用 join 方法來(lái)“阻塞”,直到線程完成運(yùn)行。我們可以使用 await 來(lái)做同樣的事情,因?yàn)槿蝿?wù)句柄本身就是一個(gè) future。它的輸出類型是 Result,所以我們也在等待它之后展開(kāi)它。

use std::time::Duration;

fn main() {
    trpl::run(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}

這個(gè)更新的版本運(yùn)行直到兩個(gè)循環(huán)結(jié)束。

在這里插入圖片描述

到目前為止,看起來(lái) async 和線程給出了相同的基本結(jié)果,只是語(yǔ)法不同:使用 await 而不是在連接句柄上調(diào)用 join,并等待 sleep 調(diào)用。

更大的區(qū)別在于,我們不需要生成另一個(gè)操作系統(tǒng)線程來(lái)執(zhí)行此操作。實(shí)際上,我們甚至不需要在這里生成任務(wù)。由于 async 塊編譯為匿名的 future,我們可以將每個(gè)循環(huán)放在 async 塊中,并讓運(yùn)行時(shí)使用 trpl::join 函數(shù)將它們運(yùn)行到完成。

我們之前展示了如何在調(diào)用 std::thread::spawn 時(shí)對(duì)返回的 JoinHandle 類型使用 join 方法。trpl::join 函數(shù)與此類似,但用于 future。當(dāng)你給它兩個(gè) future 時(shí),它會(huì)產(chǎn)生一個(gè)新的 future,它的輸出是一個(gè)元組,其中包含你傳入的每個(gè) future 完成后的輸出。

我們使用 trpl::join 來(lái)等待 fut1 和 fut2 完成。我們不等待 fut1 和 fut2,而是等待 trpl::join 生成的新 future。

use std::time::Duration;

fn main() {
    trpl::run(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}

編譯運(yùn)行,我們看到兩個(gè) future 都運(yùn)行到完成:

在這里插入圖片描述

現(xiàn)在,每次運(yùn)行的結(jié)果的順序都完全相同,這與我們?cè)诰€程中看到的非常不同。

這是因?yàn)?trpl::join 函數(shù)是公平的,這意味著它同樣頻繁地檢查每個(gè) future,在它們之間交替,如果另一個(gè)準(zhǔn)備好了,它永遠(yuǎn)不會(huì)讓一個(gè)搶先。對(duì)于線程,操作系統(tǒng)決定檢查哪個(gè)線程以及讓它運(yùn)行多長(zhǎng)時(shí)間。對(duì)于異步 Rust,運(yùn)行時(shí)決定檢查哪個(gè)任務(wù)。

在實(shí)踐中,細(xì)節(jié)變得復(fù)雜,因?yàn)楫惒竭\(yùn)行時(shí)可能會(huì)在后臺(tái)使用操作系統(tǒng)線程作為管理并發(fā)性的一部分,因此保證公平性對(duì)運(yùn)行時(shí)來(lái)說(shuō)可能需要更多的工作。

運(yùn)行時(shí)不必保證任何給定操作的公平性,它們通常提供不同的 API,讓你選擇是否需要公平性。

使用消息傳遞計(jì)算兩個(gè)任務(wù)

我們使用消息傳遞的異步版本在 future 之間共享數(shù)據(jù)。

我們將采用與使用消息傳遞在線程之間傳輸數(shù)據(jù)略有不同的方法來(lái)說(shuō)明基于線程的并發(fā)和基于 future 的并發(fā)之間的一些關(guān)鍵區(qū)別。

在 trpl::run 的 async 塊中創(chuàng)建通道:

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("Got: {received}");
    });
}

這里,我們使用 trpl::channel,這是 std::mpsc::channel(多生產(chǎn)者、單消費(fèi)者通道)的異步版本。異步版本的 API 與基于線程的版本只有一點(diǎn)不同:它使用一個(gè)可變的接收端 rx,它的 recv 方法產(chǎn)生一個(gè)我們需要等待的 future,而不是直接產(chǎn)生值。現(xiàn)在我們可以將消息從發(fā)送者發(fā)送到接收者。注意,我們不需要生成一個(gè)單獨(dú)的線程或任務(wù),我們只需要等待 rx.recv 調(diào)用。

在 std::mpsc::channel 中的 Receiver::recv 方法阻塞線程,直到它接收到消息。trpl::Receiver::recv 方法是異步的,它不阻塞,而是將控制權(quán)交還給運(yùn)行時(shí),直到接收到消息或通道的發(fā)送端關(guān)閉為止。相比之下,我們不等待 send 調(diào)用,因?yàn)樗粫?huì)阻塞。

注意:由于所有這些異步代碼都在 trpl::run 調(diào)用中的異步塊中運(yùn)行,因此其中的所有代碼都可以避免阻塞。但是,它外面的代碼將在運(yùn)行函數(shù)返回時(shí)阻塞。這就是 trpl::run 函數(shù)的全部意義:它允許你選擇在哪里阻塞某些異步代碼集,以及在哪里在同步代碼和異步代碼之間轉(zhuǎn)換。在大多數(shù)異步運(yùn)行時(shí),run 實(shí)際上被命名為 block_on 正是出于這個(gè)原因。

關(guān)于這個(gè)例子,請(qǐng)注意兩點(diǎn)。首先,消息會(huì)馬上到達(dá)。第二,雖然我們?cè)谶@里使用了 future,但是還沒(méi)有并發(fā)。程序的一切都是按順序進(jìn)行的,就像不涉及 future 一樣。

讓我們通過(guò)發(fā)送一系列消息并在它們之間休眠來(lái)解決第一部分:

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}

Rust 還沒(méi)有一種方法可以在一系列異步項(xiàng)上編寫 for 循環(huán),因此我們需要使用 while let 條件循環(huán),只要循環(huán)指定的模式繼續(xù)匹配該值,循環(huán)就會(huì)繼續(xù)執(zhí)行。

rx.recv() 產(chǎn)生一個(gè)我們等待的 future。運(yùn)行時(shí)將暫停 future,直到它準(zhǔn)備好。一旦消息到達(dá),future 將解析為 Some(message)。當(dāng)通道關(guān)閉時(shí),無(wú)論是否有消息到達(dá),future 都將解析為 None,表示沒(méi)有更多的值,因此我們應(yīng)該停止輪詢——也就是說(shuō),停止 await。

while let 循環(huán)將所有這些組合在一起。如果調(diào)用 rx.recv().await 的結(jié)果是Some(message),則可以訪問(wèn)該消息,并可以在循環(huán)體中使用它。如果結(jié)果為 None,則循環(huán)結(jié)束。每次循環(huán)完成時(shí),它都會(huì)再次到達(dá)等待點(diǎn),因此運(yùn)行時(shí)將再次暫停它,直到另一條消息到達(dá)。

代碼現(xiàn)在成功地發(fā)送和接收了所有消息:

在這里插入圖片描述

不幸的是,仍然存在一些問(wèn)題。首先,消息不會(huì)以半秒的間隔到達(dá),它們?cè)谖覀儐?dòng)程序后 2 秒同時(shí)到達(dá)。其次,這個(gè)程序永遠(yuǎn)不會(huì)退出!相反,它會(huì)永遠(yuǎn)等待新的消息。

因?yàn)槌绦蛑兄挥幸粋€(gè)異步塊,因此其中的所有內(nèi)容都是線性運(yùn)行的,仍然沒(méi)有并發(fā)性。所有的 tx.send 調(diào)用都會(huì)發(fā)生,并與所有的 trpl::sleep 調(diào)用及其相關(guān)的等待點(diǎn)穿插在一起。只有這樣,while let 循環(huán)才能通過(guò) recv 調(diào)用上的任何等待點(diǎn)。

為了獲得我們想要的行為,即在每個(gè)消息之間發(fā)生睡眠延遲,我們需要將 tx 和 rx 操作放在各自的異步塊中,然后運(yùn)行時(shí)可以使用 trpl::join 分別執(zhí)行它們中的每一個(gè)。同樣,我們等待調(diào)用 trpl::join 的結(jié)果,而不是單個(gè)的 future。

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

消息以 500 ms 的間隔打印,而不是在 2 s 后匆忙打印。

然而,由于 while let 循環(huán)與 trpl::join 的交互方式,程序仍然不會(huì)退出:

  • 只有當(dāng)傳遞給它的兩個(gè) future 都完成后,從 trpl::join 返回的 future 才會(huì)完成。
  • 在發(fā)送 vals 中的最后一條消息后,一旦結(jié)束 sleep,tx future 就完成了。
  • 直到 while let 循環(huán)結(jié)束,rx future 才會(huì)完成。
  • while let 循環(huán)直到等待 rx.recv 產(chǎn)生 None 才會(huì)結(jié)束。
  • 等待 rx.recv 只會(huì)在通道的另一端關(guān)閉時(shí)返回 None。
  • 只有當(dāng)我們調(diào)用 rx.close 或當(dāng)發(fā)送端 tx 被丟棄時(shí),通道才會(huì)關(guān)閉。
  • 我們不會(huì)在任何地方調(diào)用 rx.close,并且在傳遞給 trpl::run 的最外層異步塊結(jié)束之前,tx 不會(huì)被丟棄。
  • 這個(gè)塊不能結(jié)束,因?yàn)樗?trpl::join 完成時(shí)被阻塞了,這將我們帶回到列表的頂部。

我們可以通過(guò)在某處調(diào)用 rx.close 來(lái)手動(dòng)關(guān)閉 rx,但這沒(méi)有多大意義。在處理任意數(shù)量的消息后停止將使程序關(guān)閉,但我們可能會(huì)錯(cuò)過(guò)消息。我們需要一些其他的方法來(lái)確保 tx 在函數(shù)結(jié)束前被刪除。

現(xiàn)在,我們發(fā)送消息的異步塊只借用 tx,因?yàn)榘l(fā)送消息不需要所有權(quán),但是如果我們可以將 tx 移動(dòng)到異步塊中,那么一旦該塊結(jié)束,它就會(huì)被丟棄。move 關(guān)鍵字對(duì)異步塊的作用就像對(duì)閉包的作用一樣,將數(shù)據(jù)轉(zhuǎn)移到異步塊中。

我們將用于發(fā)送消息的塊從 async 更改為 async move。當(dāng)我們運(yùn)行這個(gè)版本的代碼時(shí),它會(huì)在發(fā)送和接收最后一條消息后優(yōu)雅地關(guān)閉。

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

因?yàn)?tx 所有權(quán)被轉(zhuǎn)移到 async 塊內(nèi),在該塊執(zhí)行完也就是發(fā)送作業(yè)結(jié)束之后,tx 隨之被銷毀,觸發(fā)通道關(guān)閉,接收端返回 None。

這個(gè)異步通道也是一個(gè)多生產(chǎn)者通道,所以如果我們想從多個(gè) future 發(fā)送消息,我們可以在 tx 上調(diào)用 clone。

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}

克隆 tx,在第一個(gè)異步塊之外創(chuàng)建 tx1,我們將 tx1 移動(dòng)到該塊中。然后將原始 tx 移動(dòng)到一個(gè)新的異步塊中,在那里我們以稍慢的延遲發(fā)送更多消息。

用于發(fā)送消息的兩個(gè)異步塊都需要是 async move 塊,以便在這些塊完成時(shí)丟棄 tx 和 tx1。最后,我們從 trpl::join 切換到 trpl::join3 來(lái)處理額外的 future。

現(xiàn)在我們看到了來(lái)自兩個(gè)發(fā)送 future 的所有消息,由于發(fā)送 future 在發(fā)送后使用的延遲略有不同,因此接收消息的間隔也不同。

在這里插入圖片描述

這是一個(gè)良好的開(kāi)端,但它限制了我們的 future 數(shù)量:兩個(gè)對(duì)應(yīng) join,或三個(gè)對(duì)應(yīng) join3。

到此這篇關(guān)于Rust 通過(guò)異步實(shí)現(xiàn)并發(fā)的方法示例的文章就介紹到這了,更多相關(guān)Rust  異步并發(fā)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 一文學(xué)會(huì)Rust語(yǔ)言如何操作JSON

    一文學(xué)會(huì)Rust語(yǔ)言如何操作JSON

    JSON在Web開(kāi)發(fā)中被廣泛應(yīng)用于數(shù)據(jù)交換,本文主要介紹了Rust語(yǔ)言操作JSON,包括序列化、反序列化、JSON創(chuàng)建等多個(gè)方面,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-03-03
  • Windows系統(tǒng)下安裝Rust環(huán)境超詳細(xì)教程

    Windows系統(tǒng)下安裝Rust環(huán)境超詳細(xì)教程

    這篇文章主要介紹了如何在Windows系統(tǒng)上安裝mingw64和Rust,mingw64是一個(gè)輕便的C語(yǔ)言編譯環(huán)境,可以替代Rust默認(rèn)使用的Visual?Studio,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下
    2025-02-02
  • libbpf和Rust開(kāi)發(fā)ebpf程序?qū)崙?zhàn)示例

    libbpf和Rust開(kāi)發(fā)ebpf程序?qū)崙?zhàn)示例

    這篇文章主要為大家介紹了libbpf和Rust開(kāi)發(fā)ebpf程序?qū)崙?zhàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-12-12
  • 如何使用Rust的向量存儲(chǔ)值列表

    如何使用Rust的向量存儲(chǔ)值列表

    本文介紹了在Rust中使用向量存儲(chǔ)值列表的方法,包括創(chuàng)建、更新、讀取、遍歷、存儲(chǔ)多種類型以及內(nèi)存釋放等方面,向量是Rust中常用且強(qiáng)大的集合類型,熟練掌握其用法有助于編寫高效且安全的代碼
    2025-02-02
  • 深入理解Rust中Cargo的使用

    深入理解Rust中Cargo的使用

    本文主要介紹了深入理解Rust中Cargo的使用,Cargo簡(jiǎn)化了項(xiàng)目的構(gòu)建過(guò)程,提供了依賴項(xiàng)管理,以及一系列方便的工作流程工具,下面就來(lái)具體的介紹一下如何使用,感興趣的可以了解一下
    2024-04-04
  • Rust常用特型之ToOwned特型示例詳解

    Rust常用特型之ToOwned特型示例詳解

    在Rust中,假定某類型實(shí)現(xiàn)了Clone特型,如果給你一個(gè)對(duì)它引用,那我們得到它指向內(nèi)容的備份的最常見(jiàn)方式是調(diào)用其clone()函數(shù),這篇文章主要介紹了Rust常用特型之ToOwned特型,需要的朋友可以參考下
    2024-04-04
  • Rust+React創(chuàng)建富文本編輯器

    Rust+React創(chuàng)建富文本編輯器

    這篇文章主要為大家介紹了Rust+React創(chuàng)建富文本編輯器示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-07-07
  • Rust日期與時(shí)間的操作方法

    Rust日期與時(shí)間的操作方法

    Rust的時(shí)間操作主要用到chrono庫(kù),接下來(lái)我將簡(jiǎn)單選一些常用的操作進(jìn)行介紹,感興趣的朋友跟隨小編一起看看吧
    2023-09-09
  • Rust實(shí)現(xiàn)一個(gè)表達(dá)式Parser小結(jié)

    Rust實(shí)現(xiàn)一個(gè)表達(dá)式Parser小結(jié)

    這篇文章主要為大家介紹了Rust實(shí)現(xiàn)一個(gè)表達(dá)式Parser小結(jié),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-11-11
  • Rust之模式與模式匹配的實(shí)現(xiàn)

    Rust之模式與模式匹配的實(shí)現(xiàn)

    Rust中的模式匹配功能強(qiáng)大且靈活,它極大地提高了代碼的表達(dá)力和可讀性,本文主要介紹了Rust之模式與模式匹配,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-03-03

最新評(píng)論