C++實(shí)現(xiàn)高并發(fā)異步定時(shí)器
在C++高并發(fā)場景,定時(shí)功能的實(shí)現(xiàn)有三大難題:高效、精準(zhǔn)、原子性。
除了定時(shí)任務(wù)隨時(shí)可能到期、而進(jìn)程隨時(shí)可能要退出之外,最近Workflow甚至為定時(shí)任務(wù)增加了取消功能,導(dǎo)致任務(wù)可能被框架調(diào)起之前被用戶取消,或者創(chuàng)建之后不想執(zhí)行直接刪除等情況,而這些情況大部分來說都是由不同線程執(zhí)行的,因此其中的并發(fā)處理可謂教科書級(jí)別!
那么就和大家一起看看Workflow在定時(shí)器的設(shè)計(jì)上做了哪些考慮,深扒細(xì)節(jié),體驗(yàn)并發(fā)架構(gòu)之美~
https://github.com/sogou/workflow
1. 高效的數(shù)據(jù)結(jié)構(gòu)與timerfd
舉個(gè)例子:實(shí)現(xiàn)一個(gè)server,收到請(qǐng)求之后,隔1s再回復(fù)給用戶。
聰明的讀者肯定知道,在server的執(zhí)行函數(shù)中用sleep(1)是不行的,sleep()
這個(gè)系統(tǒng)調(diào)用是會(huì)阻塞當(dāng)前線程的,而異步編程里阻塞線程是高效的大忌!
所以我們可以使用timerfd,顧名思義就是用特定的fd來通知定時(shí)事件,把定時(shí)事件響應(yīng)和網(wǎng)絡(luò)事件響應(yīng)都一起處理,用epoll管理就是一把梭。
現(xiàn)在離高效還差一點(diǎn)。回到例子,我們不可能每次收到一個(gè)請(qǐng)求都創(chuàng)建一個(gè)timerfd,因?yàn)楦卟l(fā)場景下一個(gè)server通常要抗上百萬的QPS。
目前Workflow的超時(shí)算法做法是:一個(gè)poller有一個(gè)timerfd,內(nèi)部利用了鏈表+紅黑樹的數(shù)據(jù)結(jié)構(gòu),時(shí)間復(fù)雜度在O(1)和O(logn)之間,其中n為poller線程的fd數(shù)量。
2. 精準(zhǔn)的響應(yīng)
這樣的數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)有什么好處呢?
- 寫得快(放入一個(gè)新節(jié)點(diǎn))
- 讀得快(響應(yīng)已超時(shí)的節(jié)點(diǎn))
- 精度高(超時(shí)時(shí)間無精度損失)
Workflow源碼在kernel和factory目錄中都有對(duì)應(yīng)的實(shí)現(xiàn),kernel層是主要負(fù)責(zé)timerfd的地方,當(dāng)前factory層還比較薄。我們重點(diǎn)看看上述數(shù)據(jù)結(jié)構(gòu)。
寫:由用戶發(fā)起異步任務(wù),將這個(gè)任務(wù)加到上述的鏈表+紅黑樹的數(shù)據(jù)結(jié)構(gòu)中,如果這個(gè)超時(shí)是當(dāng)前最小的超時(shí)時(shí)間,還會(huì)更新一下timerfd。
讀:框架的網(wǎng)絡(luò)線程每次會(huì)從epoll拿出事件,如果響應(yīng)到超時(shí)事件,會(huì)把數(shù)據(jù)結(jié)構(gòu)中已經(jīng)超時(shí)的全部節(jié)點(diǎn)都拿出來,并調(diào)用任務(wù)的handle。
以下是從epoll處理超時(shí)事件的關(guān)鍵函數(shù):
/*** poller響應(yīng)timerfd的到時(shí)事件,并處理所有到時(shí)的定時(shí)任務(wù) ***/ static void __poller_handle_timeout(const struct __poller_node *time_node, poller_t *poller) { ... // 鎖里,把list與rbtree上時(shí)間已到的節(jié)點(diǎn)都從數(shù)據(jù)結(jié)構(gòu)里刪除,臨時(shí)放到一個(gè)局部變量上 list_for_each_safe(pos, tmp, &poller->timeo_list) { ... node->removed = 1; // 標(biāo)志位:【removed】 ... } if (poller->tree_first) { ... } // 鎖外,設(shè)置state和error,并回調(diào)Task的handle()函數(shù) while (!list_empty(&timeo_list)) { node = list_entry(timeo_list.next, struct __poller_node, list); list_del(&node->list); node->error = ETIMEDOUT; node->state = PR_ST_ERROR; free(node->res); poller->cb((struct poller_result *)node, poller->ctx); } }
由于timerfd使用的超時(shí)時(shí)間是所有節(jié)點(diǎn)中最早超時(shí)的時(shí)間,而所有節(jié)點(diǎn)都在rbtree和list上按序排好,我們從前到后找的都是已超時(shí)的節(jié)點(diǎn)。因此利用了timerfd的精準(zhǔn)性可以非常準(zhǔn)確地叫醒當(dāng)前已經(jīng)超時(shí)的全部節(jié)點(diǎn),無精度損失。
由于實(shí)際使用中,用戶使用的超時(shí)時(shí)長總是傾向于固定的,比如上述例子都是1s,因此超時(shí)的絕對(duì)時(shí)間一般來說都是遞增的,使用這個(gè)數(shù)據(jù)結(jié)構(gòu)寫入會(huì)非???,設(shè)計(jì)特別適合定時(shí)器的實(shí)際需求。
3. 原子性
上述有提到,用戶的回調(diào)需要調(diào)且只調(diào)一次,Workflow可以保證在進(jìn)程退出時(shí)立刻全部立刻到時(shí)結(jié)束。進(jìn)程退出又是另一個(gè)話題,感興趣的讀者先自行去看代碼,回頭再細(xì)說~
4.允許取消(新功能)
看到這里,已經(jīng)可以感受到優(yōu)雅的數(shù)據(jù)結(jié)構(gòu)如何實(shí)現(xiàn)高效精準(zhǔn)的定時(shí)器了~
但是當(dāng)我們打開poller.h,感受一下它的接口,總覺得差了點(diǎn)什么:
是的!一個(gè)timer可以add,但是卻不可以delete!
Workflow中許多結(jié)構(gòu)的實(shí)現(xiàn)都是非常完備和對(duì)稱的,因此取消一個(gè)定時(shí)任務(wù)這件事在Workflow開源的第一天就一直想要實(shí)現(xiàn)。
但是多加一個(gè)取消cancel會(huì)有很大的問題:如果一個(gè)timer已經(jīng)timeout,用戶再去cancel的生命周期是沒有辦法保證的,它可能已經(jīng)析構(gòu)了!
最近終于找到了一個(gè)非常好的解決辦法:使用命名timer,交給全局map管,cancel的時(shí)候帶名字去操作,就可以解決生命周期問題。
我們?cè)黾恿藥值腡imer :__WFNamedTimerTask
,通過全局的map可以找到它,從而進(jìn)行刪除。刪除實(shí)際上就是從poller中刪除一個(gè)timer。
所以從底向上,為孤獨(dú)的poller_add_time增加一個(gè)小伙伴:poller_del_timer。
/*** 取消一個(gè)定時(shí)任務(wù)時(shí),從poller刪除它 ***/ int poller_del_timer(void *timer, poller_t *poller) { ... // 鎖內(nèi):如果這個(gè)標(biāo)志位還是0,表示stop還沒把它拿走,這里就可以去刪除這個(gè)timer if (!node->removed) { node->removed = 1; // 可以讓cancel和stop互斥,保證只調(diào)一次 if (node->in_rbtree) // 從定時(shí)器的數(shù)據(jù)結(jié)構(gòu)中刪掉 __poller_tree_erase(node, poller); else list_del(&node->list); node->error = 0; node->state = PR_ST_DELETED; stopped = poller->stopped; if (!stopped) // 標(biāo)志位【stopped】,如果當(dāng)時(shí)沒有進(jìn)程退出,把timer事件交出去處理 write(poller->pipe_wr, &node, sizeof (void *)); } ... // 鎖外:標(biāo)志位【stopped】如果進(jìn)程要退出了,立刻處理timer事件的handle if (stopped) poller->cb((struct poller_result *)node, poller->ctx); return -!node; }
剛才講述過的timeout(時(shí)間到)、stop(進(jìn)程退出)、cancel(用戶取消)三者可能由三個(gè)線程分別發(fā)起,于是我們看到的并發(fā)狀態(tài),簡單來說是這樣的:
定時(shí)器到期(timeout)、進(jìn)程退出(stop)、任務(wù)取消(cancel)三者隨時(shí)可能發(fā)生!
5. 精妙的并發(fā)狀態(tài)分析
cancel和另外兩個(gè)行為有著本質(zhì)上的不同:
- timeout和stop的觸發(fā)順序是先poller層、再到Task層,最后到用戶的callbback;
- cancel的觸發(fā)先Task層,Task層的命名map中先刪掉它觸發(fā)、再到poller層,最后到用戶的callback;
因此先討論第一類情況。
我們以timeout為例:
- poller層面的
__poller_handle_timeout()
會(huì)把上述的removed標(biāo)志位用上,與poller_del_timer()
互斥:誰第一個(gè)搶到removed標(biāo)志位并置為1,就代表了timer結(jié)束于哪個(gè)狀態(tài)。如果是timeout,那么用戶拿到的state為SS_STATE_COMPLETE; - 互斥鎖
poller->mutex
保證從poller的數(shù)據(jù)結(jié)構(gòu)中刪掉這個(gè)節(jié)點(diǎn)并調(diào)Task層回調(diào),從而可以保證stop的時(shí)候無需重復(fù)處理它; - timeout調(diào)用的Task層回調(diào),實(shí)際上是__WFNamedTimerTask::handle():
(1) 它會(huì)置一個(gè)標(biāo)志位node_.task,表示此任務(wù)已經(jīng)處理過;
(2) 并且把這個(gè)節(jié)點(diǎn)從全局map中刪除:這樣就保證了用戶自頂向下cancel就不會(huì)刪到它了;
/*** 處理定時(shí)器到期,由poller調(diào)用 ***/ void __WFNamedTimerTask::handle(int state, int error) { if (node_.task) // 由于不想先加鎖再處理,所以先判斷任務(wù)沒有被cancel處理過 { std::lock_guard<std::mutex> lock(__timer_map.mutex_); if (node_.task) // 鎖內(nèi)再檢查一下,入門技能 { timers_->del(&node_, &__timer_map.root_); // 從map中刪除 node_.task = NULL; // 標(biāo)志位:表示任務(wù)生命周期結(jié)束了 } } mutex_.lock(); // 這里是為了等待dispatch流程保證exchange已經(jīng)結(jié)束, mutex_.unlock(); // 否則資源被釋放就不能訪問成員變量了。也是非常常用的技巧! this->__WFTimerTask::handle(state, error); // 里邊會(huì)釋放資源,并發(fā)起任務(wù)流下一個(gè)任務(wù) }
第二類情況,如果用戶調(diào)用cancel先發(fā)生呢?
- 自頂向下先由factory層找到這個(gè)節(jié)點(diǎn),再調(diào)到poller的
poller_del_timer()
。期間需要記錄一些狀態(tài),因?yàn)槲覀冃枰ǔS卸鄠€(gè)poller。然后內(nèi)部會(huì)先置上removed
,并從定時(shí)器數(shù)據(jù)結(jié)構(gòu)中刪掉,以保證和timeout流程互斥; - 在Task層面,需要由cancel流程負(fù)責(zé)調(diào)用用戶的callback(),同時(shí)回收timer的資源;
void __NamedTimerMap::cancel(const std::string& name, size_t max) { ... // 鎖內(nèi),拿出命名為name的timer隊(duì)列 timers = __get_object_list<TimerList>(name, &root_, false); if (timers) { do { if (max == 0) // 從map中刪除最多max個(gè) return; // 從該名字對(duì)應(yīng)的隊(duì)列中刪除該timer node = list_entry(timers->head.next, struct __timer_node, list); list_del(&node->list); // 標(biāo)識(shí)位:exchange。如果是第二次exchange,會(huì)調(diào)到task自身的cancel()從poller中刪掉它 if (node->task->flag_.exchange(true)) node->task->cancel(); // 標(biāo)志位:表示生命周期正確結(jié)束,資源已經(jīng)被回收,否則timeout流程或析構(gòu)函數(shù)需要做回收 node->task = NULL; max--; } while (!timers->empty()); rb_erase(&timers->rb, &root_); delete timers; } }
6. 異步任務(wù)的發(fā)起時(shí)機(jī)是個(gè)謎
上面那張圖,我們假設(shè)的是任務(wù)先創(chuàng)建好,再被發(fā)起。那如果任務(wù)還沒有被發(fā)起,甚至我們不想發(fā)起呢?
我們假設(shè)的是任務(wù)先創(chuàng)建好,再被發(fā)起。那如果任務(wù)還沒有被發(fā)起,甚至我們不想發(fā)起呢?
1、任務(wù)可以在被發(fā)起前取消
實(shí)際上我們把一個(gè)timer放到一個(gè)任務(wù)流圖中,我們并不能確定它被發(fā)起的準(zhǔn)確的時(shí)機(jī),但我們依然允許先cancel它。
這時(shí)候我們就需要上述的標(biāo)志位exchange
來做互斥了。exchange
是個(gè)std::atomic<tool>
,初始化為false,用戶已經(jīng)手動(dòng)cancel過之后,任務(wù)可能在任務(wù)流中才會(huì)被發(fā)起dispatch。
因此即使先取消,沒關(guān)系,但必須保證dispatch過才能釋放這個(gè)timer的資源:
/*** 由任務(wù)流發(fā)起、或者用戶手動(dòng)start起來 ***/ void __WFNamedTimerTask::dispatch() { int ret; mutex_.lock(); ret = this->scheduler->sleep(this); // 先把定時(shí)任務(wù)交給poller if (ret >= 0 && flag_.exchange(true)) // exchange一下。如果是第二個(gè)調(diào)用exchange的人,會(huì)拿到true this->cancel(); // 說明發(fā)起之前已經(jīng)有人cancel過了,立刻從poller中刪除即可 mutex_.unlock(); if (ret < 0) this->handle(SS_STATE_ERROR, errno); }
這里有兩個(gè)要注意的點(diǎn):
- 通過
sleep()
交給poller之后,用戶的callback可以在網(wǎng)絡(luò)線程中處理,而不是當(dāng)前線程立刻處理,這樣可以遞歸爆棧問題; - 如果過期時(shí)間非常短,
sleep(
)之后是隨時(shí)有可能到期并回到__WFNamedTimerTask::handle()
的!因此前面對(duì)mutex_.lock()和mutex_.unlock()等的就是這里的flag_.exchange(true)
執(zhí)行結(jié)束;
2. 任務(wù)甚至可以不發(fā)起
而如果創(chuàng)建完之后不想發(fā)起,Workflow統(tǒng)一的接口是需要調(diào)用一下task->dismiss()
,以釋放task資源,調(diào)用析構(gòu)函數(shù)等。
/*** 命名定時(shí)任務(wù)的析構(gòu)函數(shù),異步任務(wù)需要注意處理各種情況的資源回收 ***/ virtual ~__WFNamedTimerTask() { if (node_.task) // 標(biāo)志位:如果沒有置空,說明任務(wù)沒有發(fā)起過。需要從全局map中刪掉這個(gè)timer { std::lock_guard<std::mutex> lock(__timer_map.mutex_); if (node_.task) // 鎖內(nèi)再檢查一次 timers_->del(&node_, &__timer_map.root_); } }
7. 總結(jié)
最后貼一段代碼看看,一個(gè)高并發(fā)1s定時(shí)返回的server代碼,用Workflow實(shí)現(xiàn)可以多簡單:
int main() { WFHttpServer server([](WFHttpTask * task) { task->get_resp()->append_output_body("<html>will response after 1s</html>"); auto timer = WFTaskFactory::create_timer_task(1, 0, nullptr); series_of(task)->push_back(timer); }); if (server.start(1412) == 0) { getchar(); server.stop(); } return 0; }
本篇介紹了如何優(yōu)雅地處理異步任務(wù)的:創(chuàng)建、發(fā)起、回調(diào)、取消、進(jìn)程退出多種并發(fā)行為,其中包含了許多常用的技巧!不記得的小伙伴自行翻回去再看一遍._.
當(dāng)然這在并發(fā)的世界中還只是冰山一角,因?yàn)楹苡锌赡軐懴履骋痪湓挼臅r(shí)機(jī)不對(duì),任務(wù)一結(jié)束,程序就GG了。
以上就是C++實(shí)現(xiàn)高并發(fā)異步定時(shí)器的詳細(xì)內(nèi)容,更多關(guān)于C++定時(shí)器的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
C++實(shí)現(xiàn)簡單酒店管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了C++實(shí)現(xiàn)簡單酒店管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-08-08C++實(shí)現(xiàn)LeetCode(152.求最大子數(shù)組乘積)
這篇文章主要介紹了C++實(shí)現(xiàn)LeetCode(152.求最大子數(shù)組乘積),本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-07-07Windows下ncnn環(huán)境配置教程詳解(VS2019)
這篇文章主要介紹了Windows下ncnn環(huán)境配置(VS2019),本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-03-03C語言數(shù)據(jù)結(jié)構(gòu)不掛科指南之隊(duì)列詳解
這篇博客主要介紹一下隊(duì)列的概念,并且采用 C 語言,編寫兩種存儲(chǔ)實(shí)現(xiàn)方式:順序存儲(chǔ)和鏈?zhǔn)酱鎯?chǔ),當(dāng)然還有常規(guī)的隊(duì)列基本操作的實(shí)現(xiàn)算法2022-09-09C語言?詳解如何刪除有序數(shù)組中的重復(fù)項(xiàng)
數(shù)組不擅長插入(添加)和刪除元素。數(shù)組的優(yōu)點(diǎn)在于它是連續(xù)的,所以查找數(shù)據(jù)速度很快。但這也是它的一個(gè)缺點(diǎn)。正因?yàn)樗沁B續(xù)的,所以當(dāng)插入一個(gè)元素時(shí),插入點(diǎn)后所有的元素全部都要向后移;而刪除一個(gè)元素時(shí),刪除點(diǎn)后所有的元素全部都要向前移2022-03-03