Redis所實(shí)現(xiàn)的Reactor模型設(shè)計(jì)方案
寫在文章開(kāi)頭
我們都知道解決C10k問(wèn)題的最好方案就是通過(guò)在IO多路復(fù)用的基礎(chǔ)上通過(guò)reactor模型實(shí)現(xiàn)高性能的網(wǎng)絡(luò)并發(fā)程序,借助這個(gè)設(shè)計(jì),redis的主線程也是基于IO多路復(fù)用以reactor模型的思路實(shí)現(xiàn)了一個(gè)高性能的單線程內(nèi)存數(shù)據(jù),本文將帶領(lǐng)讀者從源碼的角度來(lái)查看redis關(guān)于reactor模型的設(shè)計(jì)。
詳解Redis中的Reactor模型
Reactor模型掃盲
在此之前我們先來(lái)了解一下Reactor模型,在高性能網(wǎng)絡(luò)并發(fā)程序的設(shè)計(jì)中,Reactor模型通過(guò)reactor接收用戶連接事件、讀事件、寫事件這些網(wǎng)絡(luò)事件,得到連接事件之后通過(guò)acceptor為其分配handler,后續(xù)的這些客戶端的讀寫事件都會(huì)交由handler完成讀寫事件的處理,由此實(shí)現(xiàn)盡可能少的線程處理盡可能多的連接。
詳解reactor的實(shí)現(xiàn)
上文我們簡(jiǎn)單的對(duì)Reactor模型進(jìn)行了簡(jiǎn)單的掃盲,接下來(lái)我們將從redis的源碼來(lái)了解redis對(duì)于Reactor模型的實(shí)現(xiàn),我們都知道Reactor模型是通過(guò)reactor接收連接、讀、寫三種事件的,這一點(diǎn)我們可以直接在main方法看到aeMain的調(diào)用,該方法內(nèi)部本質(zhì)就是通過(guò)epoll模型進(jìn)行非阻塞獲取就的網(wǎng)絡(luò)事件:
int main(int argc, char **argv) {
//前置初始化步驟
//......
//事件循環(huán)輪詢前置操作
aeSetBeforeSleepProc(server.el,beforeSleep);
//執(zhí)行事件驅(qū)動(dòng)框架,循環(huán)處理各種觸發(fā)的事件
aeMain(server.el);
//事件循環(huán)后置操作
aeDeleteEventLoop(server.el);
return 0;
}我們步入aeMain方法,可以看到只要eventLoop沒(méi)有停止就會(huì)無(wú)限循環(huán)調(diào)用aeProcessEvents獲取并處理就緒的事件:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
//......
//輪詢并處理就緒的事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}步入aeProcessEvents方法,我們就可以看到redis通過(guò)對(duì)于epoll的封裝函數(shù)aeApiPoll非阻塞獲取就緒的IO事件,注意筆者所強(qiáng)調(diào)的非阻塞獲取,這也就是為什么redis僅僅用一個(gè)主線程即可實(shí)現(xiàn)Reactor模型的原因所在。
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
//......
//非阻塞獲取就緒事件
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
//......
//處理事件
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}對(duì)此我們?cè)俅尾饺?code>aeApiPoll實(shí)現(xiàn)可以看到redis對(duì)于epoll的調(diào)用epoll_wait,得到事件數(shù)retval 之后,直接基于retval遍歷eventLoop的events這里面存儲(chǔ)的就是所有收到的事件aeFiredEvent,redis會(huì)根據(jù)其事件類型累加對(duì)應(yīng)的事件mask值,例如如果是得到的事件類型是EPOLLIN則mask值會(huì)加上AE_READABLE(1),若是標(biāo)準(zhǔn)輸出事件EPOLLOUT則累加AE_WRITABLE即2:

對(duì)應(yīng)的我們給出這段基于epoll實(shí)現(xiàn)reacor的實(shí)現(xiàn),可以看到其reactor通過(guò)事件輪詢獲取對(duì)應(yīng)的事件類型再將其封裝為aeFileEvent存到事件數(shù)組eventLoop->fired中:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
//遍歷事件
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
//根據(jù)事件類型累加讀寫的mask值
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
//將該事件存到fired數(shù)組中
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
//返回事件數(shù)
return numevents;
}詳解事件的封裝
上文我們提到一個(gè)aeFileEvent 事件的概念,該個(gè)事件結(jié)構(gòu)如下圖所示,它通過(guò)mask標(biāo)記當(dāng)前IO事件類型,在epoll輪詢到事件時(shí),它并通過(guò)rfileProc讀事件處理指針和wfileProc寫文件處理保存針對(duì)網(wǎng)絡(luò)IO事件的處理函數(shù),注意這個(gè)處理函數(shù)我們完全可以直接理解為reactor模型中的handler,最后用clientData記錄客戶端私有數(shù)據(jù)的指針:
typedef struct aeFileEvent {
//記錄事件讀寫類型,如果是讀事件READABLE則mask+1,若是寫事件WRITABLE則加2
int mask; /* one of AE_(READABLE|WRITABLE) */
//讀事件處理器指針指向讀事件處理函數(shù)handler
aeFileProc *rfileProc;
//寫事件處理器指針指向讀事件處理函數(shù)handler
aeFileProc *wfileProc;
//記錄客戶端私有數(shù)據(jù)指針
void *clientData;
} aeFileEvent;這里我們以服務(wù)端socket初始化階段為例展示一下aeFileEvent對(duì)應(yīng)處理器的初始化過(guò)程,我們?cè)?code>redis服務(wù)端啟動(dòng)的main函數(shù)可以看到initServer的調(diào)用,該方法會(huì)為當(dāng)前服務(wù)端socket套接字的文件描述符綁定讀事件的處理器acceptTcpHandler:

對(duì)應(yīng)的我們給出這一段事件綁定handler的邏輯的核心代碼段:
int main(int argc, char **argv) {
//......
//server初始化,其內(nèi)部會(huì)完成數(shù)據(jù)結(jié)構(gòu)、鍵值對(duì)數(shù)據(jù)庫(kù)初始化、網(wǎng)絡(luò)框架初始化工作
initServer();
}
void initServer(void) {
//......
for (j = 0; j < server.ipfd_count; j++) {
//為每一個(gè)監(jiān)聽(tīng)服務(wù)端socket的讀事件綁定對(duì)應(yīng)的TCP處理器acceptTcpHandler,并將其注冊(cè)到eventLoop中
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
//......
}輪詢并分發(fā)到handler
上述步驟完成redis server的事件注冊(cè)之后,main方法的aeMain函數(shù)就會(huì)通過(guò)epoll輪詢eventLoop中是否有就緒的IO事件,如果redis server的fd的讀事件就緒就會(huì)交給當(dāng)前對(duì)應(yīng)的讀處理器完成redis客戶端初始化工作,后續(xù)redis客戶端套接字的fd也會(huì)將讀寫事件注冊(cè)到eventLoop中,如此一來(lái)所有的服務(wù)端和客戶端socket的讀寫事件都會(huì)注冊(cè)到epoll上,讓epoll作為reactor進(jìn)行輪詢,然后根據(jù)讀寫事件分配到各自的handler即rfileProc/wfileProc 指針?biāo)赶虻暮瘮?shù)上。
這里我們補(bǔ)充的一下rfileProc/wfileProc指針指向的函數(shù)列表:
rfileProc:如果是redis服務(wù)端則該指針指向acceptTcpHandler處理新連接,如果是客戶端則指向readQueryFromClient處理客戶端的命令。wfileProc:該指針?lè)?wù)端和客戶端都一樣,指向sendReplyToClient用于將響應(yīng)結(jié)果發(fā)送給客戶端。

對(duì)應(yīng)的我們給出上述描述的核心代碼段,可以看到main方法會(huì)調(diào)用aeMain開(kāi)始事件輪詢:
int main(int argc, char **argv) {
//前置初始化步驟
//......
//事件循環(huán)輪詢前置操作
aeSetBeforeSleepProc(server.el,beforeSleep);
//執(zhí)行事件驅(qū)動(dòng)框架,循環(huán)處理各種觸發(fā)的事件
aeMain(server.el);
//事件循環(huán)后置操作
aeDeleteEventLoop(server.el);
return 0;
}步入aeMain即可看到無(wú)限循環(huán)傳入eventLoop查看是否有就緒的事件:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
//......
//傳入eventLoop查看是否有socket的事件就緒
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}繼續(xù)步入aeProcessEvents即看到輪詢就緒事件、acceptor調(diào)用acceptTcpHandler分發(fā)到讀寫的處理器handler上、后續(xù)客戶端都會(huì)基于讀寫handler完成事件處理這樣一套核心的reactor模型設(shè)計(jì):
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
//......
//調(diào)用epoll獲取所有就緒的socket的讀寫事件
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
//獲取當(dāng)前事件的讀寫類型為mask賦值
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
//如果是讀事件則交給rfileProc指向的函數(shù),可以是服務(wù)端socket的連接處理器acceptTcpHandler,也可能是客戶端的命令處理器readQueryFromClient
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
//如果是寫事件則調(diào)用wfileProc指向的sendReplyToClient將結(jié)果發(fā)送給客戶端
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
//......
}小結(jié)
自此我們將redis單線程的reactor模型設(shè)計(jì)都分析完成了,希望對(duì)你有幫助。
到此這篇關(guān)于Redis所實(shí)現(xiàn)的Reactor模型的文章就介紹到這了,更多相關(guān)Redis Reactor模型內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis整合Spring結(jié)合使用緩存實(shí)例
這篇文章主要介紹了Redis整合Spring結(jié)合使用緩存實(shí)例,介紹了如何在Spring中配置redis,并通過(guò)Spring中AOP的思想,將緩存的方法切入到有需要進(jìn)入緩存的類或方法前面。需要的朋友可以參考下2015-12-12
Redis動(dòng)態(tài)字符串SDS的實(shí)現(xiàn)
SDS在Redis中是實(shí)現(xiàn)字符串對(duì)象的工具,本文主要介紹了Redis動(dòng)態(tài)字符串SDS的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2023-11-11
Redis連接池監(jiān)控(連接池是否已滿)與優(yōu)化方法
本文詳細(xì)講解了如何在Linux系統(tǒng)中監(jiān)控Redis連接池的使用情況,以及如何通過(guò)連接池參數(shù)配置、系統(tǒng)資源使用情況、Redis命令監(jiān)控、外部監(jiān)控工具等多種方法進(jìn)行檢測(cè)和優(yōu)化,以確保系統(tǒng)在高并發(fā)場(chǎng)景下的性能和穩(wěn)定性,討論了連接池的概念、工作原理、參數(shù)配置,以及優(yōu)化策略等內(nèi)容2024-09-09
Redis緩存-序列化對(duì)象存儲(chǔ)亂碼問(wèn)題的解決
這篇文章主要介紹了Redis緩存-序列化對(duì)象存儲(chǔ)亂碼問(wèn)題的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06
基于Redis的分布式鎖的簡(jiǎn)單實(shí)現(xiàn)方法
這篇文章主要介紹了基于Redis的分布式鎖的簡(jiǎn)單實(shí)現(xiàn)方法,Redis官方給出兩種思路,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-10-10
redis實(shí)現(xiàn)延遲任務(wù)的項(xiàng)目實(shí)踐
本文主要介紹了redis實(shí)現(xiàn)延遲任務(wù)的項(xiàng)目實(shí)踐,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07
Redis從單點(diǎn)到集群部署模式(單機(jī)模式?主從模式?哨兵模式)
這篇文章主要為大家介紹了Redis從單點(diǎn)集群部署模式(單機(jī)模式?主從模式?哨兵模式)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11
Redis高并發(fā)防止秒殺超賣實(shí)戰(zhàn)源碼解決方案
本文主要介紹了Redis高并發(fā)防止秒殺超賣實(shí)戰(zhàn)源碼解決方案,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-10-10

