C和Java沒(méi)那么香了,Serverless時(shí)代Rust即將稱(chēng)王?
高并發(fā)模式初探
在這個(gè)高并發(fā)時(shí)代最重要的設(shè)計(jì)模式無(wú)疑是生產(chǎn)者、消費(fèi)者模式,比如著名的消息隊(duì)列kafka其實(shí)就是一個(gè)生產(chǎn)者消費(fèi)者模式的典型實(shí)現(xiàn)。其實(shí)生產(chǎn)者消費(fèi)者問(wèn)題,也就是有限緩沖問(wèn)題,可以用以下場(chǎng)景進(jìn)行簡(jiǎn)要描述,生產(chǎn)者生成一定量的產(chǎn)品放到庫(kù)房,并不斷重復(fù)此過(guò)程;與此同時(shí),消費(fèi)者也在緩沖區(qū)消耗這些數(shù)據(jù),但由于庫(kù)房大小有限,所以生產(chǎn)者和消費(fèi)者之間步調(diào)協(xié)調(diào),生產(chǎn)者不會(huì)在庫(kù)房滿(mǎn)的情況放入端口,消費(fèi)者也不會(huì)在庫(kù)房空時(shí)消耗數(shù)據(jù)。詳見(jiàn)下圖:
而如果在生產(chǎn)者與消費(fèi)者之間完美協(xié)調(diào)并保持高效,這就是高并發(fā)要解決的本質(zhì)問(wèn)題。
C語(yǔ)言的高并發(fā)案例
筆者在前文曾經(jīng)介紹過(guò)TDEngine的相關(guān)代碼,其中Sheduler模塊的相關(guān)調(diào)度算法就使用了生產(chǎn)、消費(fèi)者模式進(jìn)行消息傳遞功能的實(shí)現(xiàn),也就是有多個(gè)生產(chǎn)者(producer)生成并不斷向隊(duì)列中傳遞消息,也有多個(gè)消費(fèi)者(consumer)不斷從隊(duì)列中取消息。
后面我們也會(huì)說(shuō)明類(lèi)型功能在Go、Java等高級(jí)語(yǔ)言中類(lèi)似的功能已經(jīng)被封裝好了,但是在C語(yǔ)言中你就必須要用好互斥體( mutex)和信號(hào)量(semaphore)并協(xié)調(diào)他們之間的關(guān)系。由于C語(yǔ)言的實(shí)現(xiàn)是最復(fù)雜的,先來(lái)看結(jié)構(gòu)體設(shè)計(jì)和他的注釋?zhuān)?/p>
typedef struct { char label[16];//消息內(nèi)容 sem_t emptySem;//此信號(hào)量代表隊(duì)列的可寫(xiě)狀態(tài) sem_t fullSem;//此信號(hào)量代表隊(duì)列的可讀狀態(tài) pthread_mutex_t queueMutex;//此互斥體為保證消息不會(huì)被誤修改,保證線(xiàn)程程安全 int fullSlot;//隊(duì)尾位置 int emptySlot;//隊(duì)頭位置 int queueSize;#隊(duì)列長(zhǎng)度 int numOfThreads;//同時(shí)操作的線(xiàn)程數(shù)量 pthread_t * qthread;//線(xiàn)程指針 SSchedMsg * queue;//隊(duì)列指針 } SSchedQueue;
再來(lái)看Shceduler初始化函數(shù),這里需要特別說(shuō)明的是,兩個(gè)信號(hào)量的創(chuàng)建,其中emptySem是隊(duì)列的可寫(xiě)狀態(tài),初始化時(shí)其值為queueSize,即初始時(shí)隊(duì)列可寫(xiě),可接受消息長(zhǎng)度為隊(duì)列長(zhǎng)度,fullSem是隊(duì)列的可讀狀態(tài),初始化時(shí)其值為0,即初始時(shí)隊(duì)列不可讀。具體代碼及我的注釋如下:
void *taosInitScheduler(int queueSize, int numOfThreads, char *label) { pthread_attr_t attr; SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue)); memset(pSched, 0, sizeof(SSchedQueue)); pSched->queueSize = queueSize; pSched->numOfThreads = numOfThreads; strcpy(pSched->label, label); if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) { pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno)); goto _error; } //emptySem是隊(duì)列的可寫(xiě)狀態(tài),初始化時(shí)其值為queueSize,即初始時(shí)隊(duì)列可寫(xiě),可接受消息長(zhǎng)度為隊(duì)列長(zhǎng)度。 if (sem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) { pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno)); goto _error; } //fullSem是隊(duì)列的可讀狀態(tài),初始化時(shí)其值為0,即初始時(shí)隊(duì)列不可讀 if (sem_init(&pSched->fullSem, 0, 0) != 0) { pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno)); goto _error; } if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) { pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno)); goto _error; } memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg)); pSched->fullSlot = 0;//實(shí)始化時(shí)隊(duì)列為空,故隊(duì)頭和隊(duì)尾的位置都是0 pSched->emptySlot = 0;//實(shí)始化時(shí)隊(duì)列為空,故隊(duì)頭和隊(duì)尾的位置都是0 pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads); pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); for (int i = 0; i < pSched->numOfThreads; ++i) { if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) { pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno)); goto _error; } } pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads); return (void *)pSched; _error: taosCleanUpScheduler(pSched); return NULL; }
再來(lái)看讀消息的taosProcessSchedQueue函數(shù)這其實(shí)是消費(fèi)者一方的實(shí)現(xiàn),這個(gè)函數(shù)的主要邏輯是
1.使用無(wú)限循環(huán),只要隊(duì)列可讀即sem_wait(&pSched->fullSem)不再阻塞就繼續(xù)向下處理
2.在操作msg前,加入互斥體防止msg被誤用。
3.讀操作完畢后修改fullSlot的值,注意這為避免fullSlot溢出,需要對(duì)于queueSize取余。同時(shí)退出互斥體。
4.對(duì)emptySem進(jìn)行post操作,即把emptySem的值加1,如emptySem原值為5,讀取一個(gè)消息后,emptySem的值為6,即可寫(xiě)狀態(tài),且能接受的消息數(shù)量為6
具體代碼及注釋如下:
void *taosProcessSchedQueue(void *param) { SSchedMsg msg; SSchedQueue *pSched = (SSchedQueue *)param; //注意這里是個(gè)無(wú)限循環(huán),只要隊(duì)列可讀即sem_wait(&pSched->fullSem)不再阻塞就繼續(xù)處理 while (1) { if (sem_wait(&pSched->fullSem) != 0) { pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno)); if (errno == EINTR) { /* sem_wait is interrupted by interrupt, ignore and continue */ continue; } } //加入互斥體防止msg被誤用。 if (pthread_mutex_lock(&pSched->queueMutex) != 0) pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); msg = pSched->queue[pSched->fullSlot]; memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg)); //讀取完畢修改fullSlot的值,注意這為避免fullSlot溢出,需要對(duì)于queueSize取余。 pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize; //讀取完畢修改退出互斥體 if (pthread_mutex_unlock(&pSched->queueMutex) != 0) pError("unlock %s queueMutex failed, reason:%s\n", pSched->label, strerror(errno)); //讀取完畢對(duì)emptySem進(jìn)行post操作,即把emptySem的值加1,如emptySem原值為5,讀取一個(gè)消息后,emptySem的值為6,即可寫(xiě)狀態(tài),且能接受的消息數(shù)量為6 if (sem_post(&pSched->emptySem) != 0) pError("post %s emptySem failed, reason:%s\n", pSched->label, strerror(errno)); if (msg.fp) (*(msg.fp))(&msg); else if (msg.tfp) (*(msg.tfp))(msg.ahandle, msg.thandle); } }
最后寫(xiě)消息的taosScheduleTask函數(shù)也就是生產(chǎn)的實(shí)現(xiàn),其基本邏輯是
1.寫(xiě)隊(duì)列前先對(duì)emptySem進(jìn)行減1操作,如emptySem原值為1,那么減1后為0,也就是隊(duì)列已滿(mǎn),必須在讀取消息后,即emptySem進(jìn)行post操作后,隊(duì)列才能進(jìn)行可寫(xiě)狀態(tài)。
2.加入互斥體防止msg被誤操作,寫(xiě)入完成后退出互斥體
3.寫(xiě)隊(duì)列完成后對(duì)fullSem進(jìn)行加1操作,如fullSem原值為0,那么加1后為1,也就是隊(duì)列可讀,咱們上面介紹的讀取taosProcessSchedQueue中sem_wait(&pSched->fullSem)不再阻塞就繼續(xù)向下。
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) { SSchedQueue *pSched = (SSchedQueue *)qhandle; if (pSched == NULL) { pError("sched is not ready, msg:%p is dropped", pMsg); return 0; } //在寫(xiě)隊(duì)列前先對(duì)emptySem進(jìn)行減1操作,如emptySem原值為1,那么減1后為0,也就是隊(duì)列已滿(mǎn),必須在讀取消息后,即emptySem進(jìn)行post操作后,隊(duì)列才能進(jìn)行可寫(xiě)狀態(tài)。 if (sem_wait(&pSched->emptySem) != 0) pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno)); //加入互斥體防止msg被誤操作 if (pthread_mutex_lock(&pSched->queueMutex) != 0) pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); pSched->queue[pSched->emptySlot] = *pMsg; pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize; if (pthread_mutex_unlock(&pSched->queueMutex) != 0) pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno)); //在寫(xiě)隊(duì)列前先對(duì)fullSem進(jìn)行加1操作,如fullSem原值為0,那么加1后為1,也就是隊(duì)列可讀,咱們上面介紹的讀取函數(shù)可以進(jìn)行處理。 if (sem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno)); return 0; }
Java的高并發(fā)實(shí)現(xiàn)
從并發(fā)模型來(lái)看,Go和Rust都有channel這個(gè)概念,也都是通過(guò)Channel來(lái)實(shí)現(xiàn)線(xiàn)(協(xié))程間的同步,由于channel帶有讀寫(xiě)狀態(tài)且保證數(shù)據(jù)順序,而且channel的封裝程度和效率明顯可以做的更高,因此Go和Rust官方都會(huì)建議使用channel(通信)來(lái)共享內(nèi)存,而不是使用共享內(nèi)存來(lái)通信。
為了讓幫助大家找到區(qū)別,我們先以Java為例來(lái),看一下沒(méi)有channel的高級(jí)語(yǔ)言Java,生產(chǎn)者消費(fèi)者該如何實(shí)現(xiàn),代碼及注釋如下:
public class Storage { // 倉(cāng)庫(kù)最大存儲(chǔ)量 private final int MAX_SIZE = 10; // 倉(cāng)庫(kù)存儲(chǔ)的載體 private LinkedList<Object> list = new LinkedList<Object>(); // 鎖 private final Lock lock = new ReentrantLock(); // 倉(cāng)庫(kù)滿(mǎn)的信號(hào)量 private final Condition full = lock.newCondition(); // 倉(cāng)庫(kù)空的信號(hào)量 private final Condition empty = lock.newCondition(); public void produce() { // 獲得鎖 lock.lock(); while (list.size() + 1 > MAX_SIZE) { System.out.println("【生產(chǎn)者" + Thread.currentThread().getName() + "】倉(cāng)庫(kù)已滿(mǎn)"); try { full.await(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(new Object()); System.out.println("【生產(chǎn)者" + Thread.currentThread().getName() + "】生產(chǎn)一個(gè)產(chǎn)品,現(xiàn)庫(kù)存" + list.size()); empty.signalAll(); lock.unlock(); } public void consume() { // 獲得鎖 lock.lock(); while (list.size() == 0) { System.out.println("【消費(fèi)者" + Thread.currentThread().getName() + "】倉(cāng)庫(kù)為空"); try { empty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } list.remove(); System.out.println("【消費(fèi)者" + Thread.currentThread().getName() + "】消費(fèi)一個(gè)產(chǎn)品,現(xiàn)庫(kù)存" + list.size()); full.signalAll(); lock.unlock(); } }
在Java、C#這種面向?qū)ο螅菦](méi)有channel語(yǔ)言中,生產(chǎn)者、消費(fèi)者模式至少要借助一個(gè)lock和兩個(gè)信號(hào)量共同完成。其中鎖的作用是保證同是時(shí)間,倉(cāng)庫(kù)中只有一個(gè)用戶(hù)進(jìn)行數(shù)據(jù)的修改,而還需要表示倉(cāng)庫(kù)滿(mǎn)的信號(hào)量,一旦達(dá)到倉(cāng)庫(kù)滿(mǎn)的情況則將此信號(hào)量置為阻塞狀態(tài),從而阻止其它生產(chǎn)者再向倉(cāng)庫(kù)運(yùn)商品了,反之倉(cāng)庫(kù)空的信號(hào)量也是一樣,一旦倉(cāng)庫(kù)空了,也要阻其它消費(fèi)者再前來(lái)消費(fèi)了。
Go的高并發(fā)實(shí)現(xiàn)
我們剛剛也介紹過(guò)了Go語(yǔ)言中官方推薦使用channel來(lái)實(shí)現(xiàn)協(xié)程間通信,所以不需要再添加lock和信號(hào)量就能實(shí)現(xiàn)模式了,以下代碼中我們通過(guò)子goroutine完成了生產(chǎn)者的功能,在在另一個(gè)子goroutine中實(shí)現(xiàn)了消費(fèi)者的功能,注意要阻塞主goroutine以確保子goroutine能夠執(zhí)行,從而輕而易舉的就這完成了生產(chǎn)者消費(fèi)者模式。下面我們就通過(guò)具體實(shí)踐中來(lái)看一下生產(chǎn)者消費(fèi)者模型的實(shí)現(xiàn)。
package main import ( "fmt" "time" ) func Product(ch chan<- int) { //生產(chǎn)者 for i := 0; i < 3; i++ { fmt.Println("Product produceed", i) ch <- i //由于channel是goroutine安全的,所以此處沒(méi)有必要必須加鎖或者加lock操作. } } func Consumer(ch <-chan int) { for i := 0; i < 3; i++ { j := <-ch //由于channel是goroutine安全的,所以此處沒(méi)有必要必須加鎖或者加lock操作. fmt.Println("Consmuer consumed ", j) } } func main() { ch := make(chan int) go Product(ch)//注意生產(chǎn)者與消費(fèi)者放在不同goroutine中 go Consumer(ch)//注意生產(chǎn)者與消費(fèi)者放在不同goroutine中 time.Sleep(time.Second * 1)//防止主goroutine退出 /*運(yùn)行結(jié)果并不確定,可能為 Product produceed 0 Product produceed 1 Consmuer consumed 0 Consmuer consumed 1 Product produceed 2 Consmuer consumed 2 */ }
可以看到和Java比起來(lái)使用GO來(lái)實(shí)現(xiàn)并發(fā)式的生產(chǎn)者消費(fèi)者模式的確是更為清爽了。
Rust的高并發(fā)實(shí)現(xiàn)
不得不說(shuō)Rust的難度實(shí)在太高了,雖然筆者之前在匯編、C、Java等方面的經(jīng)驗(yàn)可以幫助我快速掌握Go語(yǔ)言。但是假期看了兩天Rust真想大呼告辭,這尼瑪也太勸退了。在Rust官方提供的功能中,其實(shí)并不包括多生產(chǎn)者、多消費(fèi)者的channel,std:sync空間下只有一個(gè)多生產(chǎn)者單消費(fèi)者(mpsc)的channel。其樣例實(shí)現(xiàn)如下:
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); let tx1 = mpsc::Sender::clone(&tx); let tx2 = mpsc::Sender::clone(&tx); thread::spawn(move || { let vals = vec![ String::from("1"), String::from("3"), String::from("5"), String::from("7"), ]; for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { let vals = vec![ String::from("11"), String::from("13"), String::from("15"), String::from("17"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { let vals = vec![ String::from("21"), String::from("23"), String::from("25"), String::from("27"), ]; for val in vals { tx2.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for rec in rx { println!("Got: {}", rec); } }
可以看到在Rust下實(shí)現(xiàn)生產(chǎn)者消費(fèi)者是不難的,但是生產(chǎn)者可以clone多個(gè),不過(guò)消費(fèi)者卻只能有一個(gè),究其原因是因?yàn)镽ust下沒(méi)有GC也就是垃圾回收功能,而想保證安全Rust就必須要對(duì)于變更使用權(quán)限進(jìn)行嚴(yán)格管理。在Rust下使用move關(guān)鍵字進(jìn)行變更的所有權(quán)轉(zhuǎn)移,但是按照Rust對(duì)于變更生產(chǎn)周期的管理規(guī)定,線(xiàn)程間權(quán)限轉(zhuǎn)移的所有權(quán)接收者在同一時(shí)間只能有一個(gè),這也是Rust官方只提供MPSC的原因,
use std::thread; fn main() { let s = "hello"; let handle = thread::spawn(move || { println!("{}", s); }); handle.join().unwrap(); }
當(dāng)然Rust下有一個(gè)API比較貼心就是join,他可以所有子線(xiàn)程都執(zhí)行結(jié)束再退出主線(xiàn)程,這比Go中要手工阻塞還是要有一定的提高。而如果你想用多生產(chǎn)者、多消費(fèi)者的功能,就要入手crossbeam模塊了,這個(gè)模塊掌握起來(lái)難度也真的不低。
總結(jié)
通過(guò)上面的比較我們可以用一張表格來(lái)說(shuō)明幾種主流語(yǔ)言的情況對(duì)比:
語(yǔ)言 | 安全性 | 運(yùn)行速度 | 進(jìn)程啟動(dòng)速度 | 學(xué)習(xí)難度 |
C | 低 | 極快 | 極快 | 困難 |
Java | 高 | 一般 | 一般 | 一般 |
Go | 高 | 較快 | 較快 | 一般 |
Rust | 高 | 極快(基本比肩C) | 極快(基本比肩C) | 極困難 |
可以看到Rust以其高安全性、基本比肩C的運(yùn)行及啟動(dòng)速度必將在Serverless的時(shí)代獨(dú)占鰲頭,Go基本也能緊隨其后,而C語(yǔ)言程序中難以避免的野指針,Java相對(duì)較低的運(yùn)行及啟動(dòng)速度,可能都不太適用于函數(shù)式運(yùn)算的場(chǎng)景,Java在企業(yè)級(jí)開(kāi)發(fā)的時(shí)代打敗各種C#之類(lèi)的對(duì)手,但是在云時(shí)代好像還真沒(méi)有之前統(tǒng)治力那么強(qiáng)了,真可謂是打敗你的往往不是你的對(duì)手,而是其它空間的降維打擊。
這篇文章的內(nèi)容就到這了,希望能給你帶來(lái)幫助,也希望您可以多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
使用Rust采集天氣預(yù)報(bào)信息并實(shí)現(xiàn)實(shí)時(shí)更新數(shù)據(jù)功能
Rust作為一種高效、安全的編程語(yǔ)言,可以用于開(kāi)發(fā)各種應(yīng)用,包括天氣預(yù)報(bào)采集系統(tǒng),本文將探討如何使用Rust來(lái)采集天氣預(yù)報(bào)信息,并實(shí)現(xiàn)實(shí)時(shí)更新數(shù)據(jù)的功能,文中通過(guò)代碼示例給大家介紹的非常詳細(xì),需要的朋友可以參考下2024-01-01Rust實(shí)現(xiàn)一個(gè)表達(dá)式Parser小結(jié)
這篇文章主要為大家介紹了Rust實(shí)現(xiàn)一個(gè)表達(dá)式Parser小結(jié),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-11-11Rust中的Cargo構(gòu)建、運(yùn)行、調(diào)試
Cargo是rustup安裝后自帶的,Cargo?是?Rust?的構(gòu)建系統(tǒng)和包管理器,這篇文章主要介紹了Rust之Cargo構(gòu)建、運(yùn)行、調(diào)試,需要的朋友可以參考下2022-09-09在Rust中編寫(xiě)自定義Error的詳細(xì)代碼
Result<T, E> 類(lèi)型可以方便地用于錯(cuò)誤傳導(dǎo),Result<T, E>是模板類(lèi)型,實(shí)例化后可以是各種類(lèi)型,但 Rust 要求傳導(dǎo)的 Result 中的 E 是相同類(lèi)型的,所以我們需要編寫(xiě)自己的 Error 類(lèi)型,本文給大家介紹了在Rust中編寫(xiě)自定義Error的詳細(xì)代碼,需要的朋友可以參考下2024-01-01