Redis所實(shí)現(xiàn)的Reactor模型設(shè)計(jì)方案
寫在文章開頭
我們都知道解決C10k
問題的最好方案就是通過在IO多路復(fù)用
的基礎(chǔ)上通過reactor
模型實(shí)現(xiàn)高性能的網(wǎng)絡(luò)并發(fā)程序,借助這個(gè)設(shè)計(jì),redis的主線程也是基于IO多路復(fù)用
以reactor
模型的思路實(shí)現(xiàn)了一個(gè)高性能的單線程內(nèi)存數(shù)據(jù),本文將帶領(lǐng)讀者從源碼的角度來查看redis
關(guān)于reactor
模型的設(shè)計(jì)。
詳解Redis中的Reactor模型
Reactor模型掃盲
在此之前我們先來了解一下Reactor
模型,在高性能網(wǎng)絡(luò)并發(fā)程序的設(shè)計(jì)中,Reactor
模型通過reactor
接收用戶連接事件、讀事件、寫事件這些網(wǎng)絡(luò)事件,得到連接事件之后通過acceptor
為其分配handler
,后續(xù)的這些客戶端的讀寫事件都會(huì)交由handler
完成讀寫事件的處理,由此實(shí)現(xiàn)盡可能少的線程處理盡可能多的連接。
詳解reactor的實(shí)現(xiàn)
上文我們簡(jiǎn)單的對(duì)Reactor
模型進(jìn)行了簡(jiǎn)單的掃盲,接下來我們將從redis
的源碼來了解redis
對(duì)于Reactor
模型的實(shí)現(xiàn),我們都知道Reactor
模型是通過reactor接收連接、讀、寫三種事件的,這一點(diǎn)我們可以直接在main
方法看到aeMain
的調(diào)用,該方法內(nèi)部本質(zhì)就是通過epoll模型進(jìn)行非阻塞獲取就的網(wǎng)絡(luò)事件:
int main(int argc, char **argv) { //前置初始化步驟 //...... //事件循環(huán)輪詢前置操作 aeSetBeforeSleepProc(server.el,beforeSleep); //執(zhí)行事件驅(qū)動(dòng)框架,循環(huán)處理各種觸發(fā)的事件 aeMain(server.el); //事件循環(huán)后置操作 aeDeleteEventLoop(server.el); return 0; }
我們步入aeMain
方法,可以看到只要eventLoop
沒有停止就會(huì)無限循環(huán)調(diào)用aeProcessEvents
獲取并處理就緒的事件:
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { //...... //輪詢并處理就緒的事件 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
步入aeProcessEvents
方法,我們就可以看到redis
通過對(duì)于epoll
的封裝函數(shù)aeApiPoll
非阻塞獲取就緒的IO事件
,注意筆者所強(qiáng)調(diào)的非阻塞獲取,這也就是為什么redis僅僅用一個(gè)主線程即可實(shí)現(xiàn)Reactor模型的原因所在。
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { //...... //非阻塞獲取就緒事件 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { //...... //處理事件 processed++; } } /* Check time events */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ }
對(duì)此我們?cè)俅尾饺?code>aeApiPoll實(shí)現(xiàn)可以看到redis
對(duì)于epoll
的調(diào)用epoll_wait
,得到事件數(shù)retval
之后,直接基于retval
遍歷eventLoop
的events
這里面存儲(chǔ)的就是所有收到的事件aeFiredEvent
,redis
會(huì)根據(jù)其事件類型累加對(duì)應(yīng)的事件mask
值,例如如果是得到的事件類型是EPOLLIN
則mask值會(huì)加上AE_READABLE
(1),若是標(biāo)準(zhǔn)輸出事件EPOLLOUT
則累加AE_WRITABLE
即2:
對(duì)應(yīng)的我們給出這段基于epoll
實(shí)現(xiàn)reacor
的實(shí)現(xiàn),可以看到其reactor
通過事件輪詢獲取對(duì)應(yīng)的事件類型再將其封裝為aeFileEvent
存到事件數(shù)組eventLoop->fired
中:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { int j; numevents = retval; //遍歷事件 for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; //根據(jù)事件類型累加讀寫的mask值 if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE; //將該事件存到fired數(shù)組中 eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } //返回事件數(shù) return numevents; }
詳解事件的封裝
上文我們提到一個(gè)aeFileEvent
事件的概念,該個(gè)事件結(jié)構(gòu)如下圖所示,它通過mask
標(biāo)記當(dāng)前IO事件類型,在epoll
輪詢到事件時(shí),它并通過rfileProc
讀事件處理指針和wfileProc
寫文件處理保存針對(duì)網(wǎng)絡(luò)IO事件
的處理函數(shù),注意這個(gè)處理函數(shù)我們完全可以直接理解為reactor
模型中的handler
,最后用clientData
記錄客戶端私有數(shù)據(jù)的指針:
typedef struct aeFileEvent { //記錄事件讀寫類型,如果是讀事件READABLE則mask+1,若是寫事件WRITABLE則加2 int mask; /* one of AE_(READABLE|WRITABLE) */ //讀事件處理器指針指向讀事件處理函數(shù)handler aeFileProc *rfileProc; //寫事件處理器指針指向讀事件處理函數(shù)handler aeFileProc *wfileProc; //記錄客戶端私有數(shù)據(jù)指針 void *clientData; } aeFileEvent;
這里我們以服務(wù)端socket
初始化階段為例展示一下aeFileEvent
對(duì)應(yīng)處理器的初始化過程,我們?cè)?code>redis服務(wù)端啟動(dòng)的main
函數(shù)可以看到initServer
的調(diào)用,該方法會(huì)為當(dāng)前服務(wù)端socket套接字的文件描述符綁定讀事件的處理器acceptTcpHandler
:
對(duì)應(yīng)的我們給出這一段事件綁定handler
的邏輯的核心代碼段:
int main(int argc, char **argv) { //...... //server初始化,其內(nèi)部會(huì)完成數(shù)據(jù)結(jié)構(gòu)、鍵值對(duì)數(shù)據(jù)庫初始化、網(wǎng)絡(luò)框架初始化工作 initServer(); } void initServer(void) { //...... for (j = 0; j < server.ipfd_count; j++) { //為每一個(gè)監(jiān)聽服務(wù)端socket的讀事件綁定對(duì)應(yīng)的TCP處理器acceptTcpHandler,并將其注冊(cè)到eventLoop中 if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { redisPanic( "Unrecoverable error creating server.ipfd file event."); } } //...... }
輪詢并分發(fā)到handler
上述步驟完成redis server
的事件注冊(cè)之后,main
方法的aeMain
函數(shù)就會(huì)通過epoll
輪詢eventLoop
中是否有就緒的IO事件,如果redis server
的fd
的讀事件就緒就會(huì)交給當(dāng)前對(duì)應(yīng)的讀處理器完成redis
客戶端初始化工作,后續(xù)redis
客戶端套接字的fd
也會(huì)將讀寫事件注冊(cè)到eventLoop
中,如此一來所有的服務(wù)端和客戶端socket
的讀寫事件都會(huì)注冊(cè)到epoll
上,讓epoll
作為reactor
進(jìn)行輪詢,然后根據(jù)讀寫事件分配到各自的handler
即rfileProc/wfileProc
指針?biāo)赶虻暮瘮?shù)上。
這里我們補(bǔ)充的一下rfileProc/wfileProc
指針指向的函數(shù)列表:
rfileProc
:如果是redis
服務(wù)端則該指針指向acceptTcpHandler
處理新連接,如果是客戶端則指向readQueryFromClient
處理客戶端的命令。wfileProc
:該指針服務(wù)端和客戶端都一樣,指向sendReplyToClient
用于將響應(yīng)結(jié)果發(fā)送給客戶端。
對(duì)應(yīng)的我們給出上述描述的核心代碼段,可以看到main
方法會(huì)調(diào)用aeMain
開始事件輪詢:
int main(int argc, char **argv) { //前置初始化步驟 //...... //事件循環(huán)輪詢前置操作 aeSetBeforeSleepProc(server.el,beforeSleep); //執(zhí)行事件驅(qū)動(dòng)框架,循環(huán)處理各種觸發(fā)的事件 aeMain(server.el); //事件循環(huán)后置操作 aeDeleteEventLoop(server.el); return 0; }
步入aeMain
即可看到無限循環(huán)傳入eventLoop
查看是否有就緒的事件:
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { //...... //傳入eventLoop查看是否有socket的事件就緒 aeProcessEvents(eventLoop, AE_ALL_EVENTS); } }
繼續(xù)步入aeProcessEvents
即看到輪詢就緒事件、acceptor
調(diào)用acceptTcpHandler
分發(fā)到讀寫的處理器handler
上、后續(xù)客戶端都會(huì)基于讀寫handler
完成事件處理這樣一套核心的reactor
模型設(shè)計(jì):
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { //...... //調(diào)用epoll獲取所有就緒的socket的讀寫事件 numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) { //獲取當(dāng)前事件的讀寫類型為mask賦值 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int rfired = 0; //如果是讀事件則交給rfileProc指向的函數(shù),可以是服務(wù)端socket的連接處理器acceptTcpHandler,也可能是客戶端的命令處理器readQueryFromClient if (fe->mask & mask & AE_READABLE) { rfired = 1; fe->rfileProc(eventLoop,fd,fe->clientData,mask); } //如果是寫事件則調(diào)用wfileProc指向的sendReplyToClient將結(jié)果發(fā)送給客戶端 if (fe->mask & mask & AE_WRITABLE) { if (!rfired || fe->wfileProc != fe->rfileProc) fe->wfileProc(eventLoop,fd,fe->clientData,mask); } processed++; } } //...... }
小結(jié)
自此我們將redis單線程的reactor模型設(shè)計(jì)都分析完成了,希望對(duì)你有幫助。
到此這篇關(guān)于Redis所實(shí)現(xiàn)的Reactor模型的文章就介紹到這了,更多相關(guān)Redis Reactor模型內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis整合Spring結(jié)合使用緩存實(shí)例
這篇文章主要介紹了Redis整合Spring結(jié)合使用緩存實(shí)例,介紹了如何在Spring中配置redis,并通過Spring中AOP的思想,將緩存的方法切入到有需要進(jìn)入緩存的類或方法前面。需要的朋友可以參考下2015-12-12Redis動(dòng)態(tài)字符串SDS的實(shí)現(xiàn)
SDS在Redis中是實(shí)現(xiàn)字符串對(duì)象的工具,本文主要介紹了Redis動(dòng)態(tài)字符串SDS的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2023-11-11Redis連接池監(jiān)控(連接池是否已滿)與優(yōu)化方法
本文詳細(xì)講解了如何在Linux系統(tǒng)中監(jiān)控Redis連接池的使用情況,以及如何通過連接池參數(shù)配置、系統(tǒng)資源使用情況、Redis命令監(jiān)控、外部監(jiān)控工具等多種方法進(jìn)行檢測(cè)和優(yōu)化,以確保系統(tǒng)在高并發(fā)場(chǎng)景下的性能和穩(wěn)定性,討論了連接池的概念、工作原理、參數(shù)配置,以及優(yōu)化策略等內(nèi)容2024-09-09Redis緩存-序列化對(duì)象存儲(chǔ)亂碼問題的解決
這篇文章主要介紹了Redis緩存-序列化對(duì)象存儲(chǔ)亂碼問題的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06基于Redis的分布式鎖的簡(jiǎn)單實(shí)現(xiàn)方法
這篇文章主要介紹了基于Redis的分布式鎖的簡(jiǎn)單實(shí)現(xiàn)方法,Redis官方給出兩種思路,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-10-10redis實(shí)現(xiàn)延遲任務(wù)的項(xiàng)目實(shí)踐
本文主要介紹了redis實(shí)現(xiàn)延遲任務(wù)的項(xiàng)目實(shí)踐,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07Redis從單點(diǎn)到集群部署模式(單機(jī)模式?主從模式?哨兵模式)
這篇文章主要為大家介紹了Redis從單點(diǎn)集群部署模式(單機(jī)模式?主從模式?哨兵模式)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11Redis高并發(fā)防止秒殺超賣實(shí)戰(zhàn)源碼解決方案
本文主要介紹了Redis高并發(fā)防止秒殺超賣實(shí)戰(zhàn)源碼解決方案,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-10-10