RUST異步流處理方法詳細(xì)講解
Stream 特質(zhì)
在同步Rust 中流的核心是Iterator
提供了一種在序列中產(chǎn)生項(xiàng)的方法,并在它們之間進(jìn)行阻塞,通過迭代器傳遞給其他迭代器
在異步Rust中流的核心Stream, 允許其他任務(wù)在當(dāng)前阻塞等待時允許
Read/Write, AsyncRead/AsyncWrite
fn main() { let f = file::create("E:\\foot.txt").await?; f.write_all(b"hello world").await?; let f = file::open("E:\\foot.txt").await?; let mut buffer = Vec::new(); f.read_to_end(&mut buffer).await?; }
Stream 經(jīng)典子流
source: 可以生成數(shù)據(jù)流
Sink: 可以消費(fèi)數(shù)據(jù)流
Through: 消費(fèi)數(shù)據(jù),對其進(jìn)行操作生成新數(shù)據(jù)流
Duplex: 流可以生成數(shù)據(jù),也可以獨(dú)立消費(fèi)數(shù)據(jù)(AsyncWrite/Read)
asyncread 和 Stream 區(qū)別
這兩種對byte 進(jìn)行操作,AsyncRead 只能對byte進(jìn)行操作(生成未解析數(shù)據(jù)),Stream對任何類型的數(shù)據(jù)進(jìn)行操作(生成解析數(shù)據(jù))
使用for_each_concurrent, try_for_each_concurrent 進(jìn)行并發(fā)的處理流,進(jìn)行流的處理
yield 匿名流
在async 異步過程中使用yield 關(guān)鍵字, 類似于Python 迭代產(chǎn)生時候可以返回,下一次從上一次返回值在進(jìn)行開始跌打
try_join
如果某個發(fā)生錯誤后會立即返回?cái)?shù)據(jù)
使用try_join 需要函數(shù)返回結(jié)果,并且錯誤的類型,才能正常運(yùn)行
use futures; use tokio::runtime::Runtime; use std::io::Result; async fn func1() -> Result<()> { tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await; println!("func1 finished!"); Ok(()) } async fn func2() -> Result<()> { println!("func2 finished!"); Ok(()) } async fn async_main() { let f1 = func1(); let f2 = func2(); if let Err(_) = futures::try_join!(f1, f2) { println!("Err!"); } } fn main() { let mut runtime = Runtime::new().unwrap(); runtime.block_on(async_main()); println!("Hello, world!"); }
select
使用場景 有三個運(yùn)行任務(wù) ,只要其中一個完成后立馬返回,使用select
在使用select啟動使用pin_mut!(f1, f2),
使用select! 進(jìn)行匹配
use futures::{select, future::FutureExt, pin_mut}; use tokio::runtime::Runtime; use std::io::Result; async fn func1() -> Result<()> { tokio::time::delay_for(tokio::time::Duration::from_secs(2)).await; println!("func1 finished!"); Ok(()) } async fn func2() -> Result<()> { println!("func2 finished!"); Ok(()) } async fn async_main() { let f1 = func1().fuse(); let f2 = func2().fuse(); pin_mut!(f1, f2); // 使用select 進(jìn)行匹配 select! { _ = f1 => println!("func1 finished++++++!"), _ = f2 => println!("func2 finished++++++!"), } } fn main() { // 使用tokio的runtime() let mut runtime = Runtime::new().unwrap(); runtime.block_on(async_main()); println!("Hello, world!"); }
select! y與default/complete 一起聯(lián)合使用
complete :表示兩個都已經(jīng)就緒,default表示兩個都沒有就緒
use futures::{future, select, executor}; async fn count() { let mut a_fut = future::ready(4); let mut b_fut = future::ready(6); let mut total = 0; loop { select! { a = a_fut => total += a, b = b_fut => total += b, complete => break, //表示所有的分支都已經(jīng)完成,并且不會再取得進(jìn)展的情況 default => unreachable!(), //表示沒有分支完成 } } assert_eq!(total, 10); } fn main() { executor::block_on(count()); println!("Hello, world!"); }
complete 表示所有分支都已經(jīng)完成,并且不會取得進(jìn)展的情況,如上所示,使用loop 第一次b分支準(zhǔn)備好,下一次循環(huán)可能是a分支,最后兩個分支都已經(jīng)完成后 就break退出
complete 類似讓所有分支都完成后直接退出
SELECT宏幾個條件
- select中使用Future必須首先UnPinFuture trait, Fused trait
- 必須實(shí)現(xiàn)UnpinFuture原因在于select! 不是按照值獲取,按照引用獲取,這樣能夠在不獲取future所有權(quán)條件下,未完成的future可以繼續(xù)使用
- 必須實(shí)現(xiàn)FusedFuture: select 完成后不在輪詢future,因此需要實(shí)現(xiàn)FusedFuture 跟蹤Future是否完成
- 如果select使用stream,其stream 也是需要實(shí)現(xiàn)FusedStream
async 問號使用
如果返回類型有Result<T, E> 結(jié)果使用.await?
Send trait
在保證多線程安全時候 需要保證接口實(shí)現(xiàn)Send trait 、sync trait 才能保證多線程的安全
Send trait 表示數(shù)據(jù)能夠在線程間安全的發(fā)送,sync trait 能夠保證線程安全的引用
use std::rc::Rc; #[derive(Default)] struct NoSend(Rc<()>); async fn bar() {} async fn foo() { NoSend::default(); //{ // let x = NoSend::default(); // //to do : xxxxx //} let _ = NoSend::default(); bar().await; } //Send trait:如果所有的子類型都是實(shí)現(xiàn)Send trait的,那么它本身也是實(shí)現(xiàn)Send Trait的 // 如果內(nèi)部沒有定義 只是使用 是一個Send Trait 主要是在 生成 匿名結(jié)構(gòu)體中 會進(jìn)行解析 not let x: impl Send Trait //struct Foo { // f: Future, //} let x: Not impl Send Trait //struct Foo { // x: NoSend, //not impl Send Trait // f: Future, //impl Send Trait //} fn required_send(_: impl Send) {} fn main() { required_send(foo()); println!("Hello, world!"); }
到此這篇關(guān)于RUST異步流處理方法詳細(xì)講解的文章就介紹到這了,更多相關(guān)RUST異步流處理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Rust可迭代類型迭代器正確創(chuàng)建自定義可迭代類型的方法
在 Rust 中, 如果一個類型實(shí)現(xiàn)了 Iterator, 那么它會被同時實(shí)現(xiàn) IntoIterator, 具體邏輯是返回自身, 因?yàn)樽陨砭褪堑?這篇文章主要介紹了Rust可迭代類型迭代器正確創(chuàng)建自定義可迭代類型的方法,需要的朋友可以參考下2023-12-12Rust語言之Prometheus系統(tǒng)監(jiān)控工具包的使用詳解
Prometheus?是一個開源的系統(tǒng)監(jiān)控和警報(bào)工具包,最初是由SoundCloud構(gòu)建的,隨著時間的發(fā)展,Prometheus已經(jīng)具有適用于各種使用場景的版本,為了開發(fā)者方便開發(fā),更是有各種語言版本的Prometheus的開發(fā)工具包,本文主要介紹Rust版本的Prometheus開發(fā)工具包2023-10-10使用vscode配置Rust運(yùn)行環(huán)境全過程
VS Code對Rust有著較完備的支持,這篇文章主要給大家介紹了關(guān)于使用vscode配置Rust運(yùn)行環(huán)境的相關(guān)資料,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下2023-06-06MacBook Pro安裝rust編程環(huán)境的過程
rustup是一個用于管理Rust版本和工具鏈的工具,這篇文章主要介紹了MacBook Pro安裝rust編程環(huán)境的過程,感興趣的朋友跟隨小編一起看看吧2024-02-02