RUST異步流處理方法詳細(xì)講解
Stream 特質(zhì)
在同步Rust 中流的核心是Iterator 提供了一種在序列中產(chǎn)生項(xiàng)的方法,并在它們之間進(jìn)行阻塞,通過(guò)迭代器傳遞給其他迭代器
在異步Rust中流的核心Stream, 允許其他任務(wù)在當(dāng)前阻塞等待時(shí)允許
Read/Write, AsyncRead/AsyncWrite
fn main() {
let f = file::create("E:\\foot.txt").await?;
f.write_all(b"hello world").await?;
let f = file::open("E:\\foot.txt").await?;
let mut buffer = Vec::new();
f.read_to_end(&mut buffer).await?;
}Stream 經(jīng)典子流
source: 可以生成數(shù)據(jù)流
Sink: 可以消費(fèi)數(shù)據(jù)流
Through: 消費(fèi)數(shù)據(jù),對(duì)其進(jìn)行操作生成新數(shù)據(jù)流
Duplex: 流可以生成數(shù)據(jù),也可以獨(dú)立消費(fèi)數(shù)據(jù)(AsyncWrite/Read)
asyncread 和 Stream 區(qū)別
這兩種對(duì)byte 進(jìn)行操作,AsyncRead 只能對(duì)byte進(jìn)行操作(生成未解析數(shù)據(jù)),Stream對(duì)任何類型的數(shù)據(jù)進(jìn)行操作(生成解析數(shù)據(jù))
使用for_each_concurrent, try_for_each_concurrent 進(jìn)行并發(fā)的處理流,進(jìn)行流的處理
yield 匿名流
在async 異步過(guò)程中使用yield 關(guān)鍵字, 類似于Python 迭代產(chǎn)生時(shí)候可以返回,下一次從上一次返回值在進(jìn)行開始跌打
try_join
如果某個(gè)發(fā)生錯(cuò)誤后會(huì)立即返回?cái)?shù)據(jù)
使用try_join 需要函數(shù)返回結(jié)果,并且錯(cuò)誤的類型,才能正常運(yùn)行
use futures;
use tokio::runtime::Runtime;
use std::io::Result;
async fn func1() -> Result<()> {
tokio::time::delay_for(tokio::time::Duration::from_secs(1)).await;
println!("func1 finished!");
Ok(())
}
async fn func2() -> Result<()> {
println!("func2 finished!");
Ok(())
}
async fn async_main() {
let f1 = func1();
let f2 = func2();
if let Err(_) = futures::try_join!(f1, f2) {
println!("Err!");
}
}
fn main() {
let mut runtime = Runtime::new().unwrap();
runtime.block_on(async_main());
println!("Hello, world!");
}select
使用場(chǎng)景 有三個(gè)運(yùn)行任務(wù) ,只要其中一個(gè)完成后立馬返回,使用select
在使用select啟動(dòng)使用pin_mut!(f1, f2), 使用select! 進(jìn)行匹配
use futures::{select, future::FutureExt, pin_mut};
use tokio::runtime::Runtime;
use std::io::Result;
async fn func1() -> Result<()> {
tokio::time::delay_for(tokio::time::Duration::from_secs(2)).await;
println!("func1 finished!");
Ok(())
}
async fn func2() -> Result<()> {
println!("func2 finished!");
Ok(())
}
async fn async_main() {
let f1 = func1().fuse();
let f2 = func2().fuse();
pin_mut!(f1, f2);
// 使用select 進(jìn)行匹配
select! {
_ = f1 => println!("func1 finished++++++!"),
_ = f2 => println!("func2 finished++++++!"),
}
}
fn main() {
// 使用tokio的runtime()
let mut runtime = Runtime::new().unwrap();
runtime.block_on(async_main());
println!("Hello, world!");
}select! y與default/complete 一起聯(lián)合使用
complete :表示兩個(gè)都已經(jīng)就緒,default表示兩個(gè)都沒(méi)有就緒
use futures::{future, select, executor};
async fn count() {
let mut a_fut = future::ready(4);
let mut b_fut = future::ready(6);
let mut total = 0;
loop {
select! {
a = a_fut => total += a,
b = b_fut => total += b,
complete => break, //表示所有的分支都已經(jīng)完成,并且不會(huì)再取得進(jìn)展的情況
default => unreachable!(), //表示沒(méi)有分支完成
}
}
assert_eq!(total, 10);
}
fn main() {
executor::block_on(count());
println!("Hello, world!");
}complete 表示所有分支都已經(jīng)完成,并且不會(huì)取得進(jìn)展的情況,如上所示,使用loop 第一次b分支準(zhǔn)備好,下一次循環(huán)可能是a分支,最后兩個(gè)分支都已經(jīng)完成后 就break退出
complete 類似讓所有分支都完成后直接退出
SELECT宏幾個(gè)條件
- select中使用Future必須首先UnPinFuture trait, Fused trait
- 必須實(shí)現(xiàn)UnpinFuture原因在于select! 不是按照值獲取,按照引用獲取,這樣能夠在不獲取future所有權(quán)條件下,未完成的future可以繼續(xù)使用
- 必須實(shí)現(xiàn)FusedFuture: select 完成后不在輪詢future,因此需要實(shí)現(xiàn)FusedFuture 跟蹤Future是否完成
- 如果select使用stream,其stream 也是需要實(shí)現(xiàn)FusedStream
async 問(wèn)號(hào)使用
如果返回類型有Result<T, E> 結(jié)果使用.await?
Send trait
在保證多線程安全時(shí)候 需要保證接口實(shí)現(xiàn)Send trait 、sync trait 才能保證多線程的安全
Send trait 表示數(shù)據(jù)能夠在線程間安全的發(fā)送,sync trait 能夠保證線程安全的引用
use std::rc::Rc;
#[derive(Default)]
struct NoSend(Rc<()>);
async fn bar() {}
async fn foo() {
NoSend::default();
//{
// let x = NoSend::default();
// //to do : xxxxx
//}
let _ = NoSend::default();
bar().await;
}
//Send trait:如果所有的子類型都是實(shí)現(xiàn)Send trait的,那么它本身也是實(shí)現(xiàn)Send Trait的
// 如果內(nèi)部沒(méi)有定義 只是使用 是一個(gè)Send Trait 主要是在 生成 匿名結(jié)構(gòu)體中 會(huì)進(jìn)行解析
not let x: impl Send Trait
//struct Foo {
// f: Future,
//}
let x: Not impl Send Trait
//struct Foo {
// x: NoSend, //not impl Send Trait
// f: Future, //impl Send Trait
//}
fn required_send(_: impl Send) {}
fn main() {
required_send(foo());
println!("Hello, world!");
}到此這篇關(guān)于RUST異步流處理方法詳細(xì)講解的文章就介紹到這了,更多相關(guān)RUST異步流處理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Rust可迭代類型迭代器正確創(chuàng)建自定義可迭代類型的方法
在 Rust 中, 如果一個(gè)類型實(shí)現(xiàn)了 Iterator, 那么它會(huì)被同時(shí)實(shí)現(xiàn) IntoIterator, 具體邏輯是返回自身, 因?yàn)樽陨砭褪堑?這篇文章主要介紹了Rust可迭代類型迭代器正確創(chuàng)建自定義可迭代類型的方法,需要的朋友可以參考下2023-12-12
Rust語(yǔ)言之Prometheus系統(tǒng)監(jiān)控工具包的使用詳解
Prometheus?是一個(gè)開源的系統(tǒng)監(jiān)控和警報(bào)工具包,最初是由SoundCloud構(gòu)建的,隨著時(shí)間的發(fā)展,Prometheus已經(jīng)具有適用于各種使用場(chǎng)景的版本,為了開發(fā)者方便開發(fā),更是有各種語(yǔ)言版本的Prometheus的開發(fā)工具包,本文主要介紹Rust版本的Prometheus開發(fā)工具包2023-10-10
使用vscode配置Rust運(yùn)行環(huán)境全過(guò)程
VS Code對(duì)Rust有著較完備的支持,這篇文章主要給大家介紹了關(guān)于使用vscode配置Rust運(yùn)行環(huán)境的相關(guān)資料,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2023-06-06
Rust語(yǔ)言之trait中的個(gè)方法可以重寫嗎
在Rust中,trait定義了一組方法,這些方法可以被一個(gè)或多個(gè)類型實(shí)現(xiàn),當(dāng)你為某個(gè)類型實(shí)現(xiàn)一個(gè)trait時(shí),你可以為該trait中的每個(gè)方法提供自己的具體實(shí)現(xiàn),本文將給大家介紹一下trait中的個(gè)方法是否可以重寫,需要的朋友可以參考下2023-10-10
MacBook Pro安裝rust編程環(huán)境的過(guò)程
rustup是一個(gè)用于管理Rust版本和工具鏈的工具,這篇文章主要介紹了MacBook Pro安裝rust編程環(huán)境的過(guò)程,感興趣的朋友跟隨小編一起看看吧2024-02-02

