C++實(shí)現(xiàn)高并發(fā)異步定時(shí)器
在C++高并發(fā)場(chǎng)景,定時(shí)功能的實(shí)現(xiàn)有三大難題:高效、精準(zhǔn)、原子性。
除了定時(shí)任務(wù)隨時(shí)可能到期、而進(jìn)程隨時(shí)可能要退出之外,最近Workflow甚至為定時(shí)任務(wù)增加了取消功能,導(dǎo)致任務(wù)可能被框架調(diào)起之前被用戶取消,或者創(chuàng)建之后不想執(zhí)行直接刪除等情況,而這些情況大部分來(lái)說(shuō)都是由不同線程執(zhí)行的,因此其中的并發(fā)處理可謂教科書(shū)級(jí)別!
那么就和大家一起看看Workflow在定時(shí)器的設(shè)計(jì)上做了哪些考慮,深扒細(xì)節(jié),體驗(yàn)并發(fā)架構(gòu)之美~
https://github.com/sogou/workflow
1. 高效的數(shù)據(jù)結(jié)構(gòu)與timerfd
舉個(gè)例子:實(shí)現(xiàn)一個(gè)server,收到請(qǐng)求之后,隔1s再回復(fù)給用戶。
聰明的讀者肯定知道,在server的執(zhí)行函數(shù)中用sleep(1)是不行的,sleep()這個(gè)系統(tǒng)調(diào)用是會(huì)阻塞當(dāng)前線程的,而異步編程里阻塞線程是高效的大忌!
所以我們可以使用timerfd,顧名思義就是用特定的fd來(lái)通知定時(shí)事件,把定時(shí)事件響應(yīng)和網(wǎng)絡(luò)事件響應(yīng)都一起處理,用epoll管理就是一把梭。
現(xiàn)在離高效還差一點(diǎn)?;氐嚼?,我們不可能每次收到一個(gè)請(qǐng)求都創(chuàng)建一個(gè)timerfd,因?yàn)楦卟l(fā)場(chǎng)景下一個(gè)server通常要抗上百萬(wàn)的QPS。
目前Workflow的超時(shí)算法做法是:一個(gè)poller有一個(gè)timerfd,內(nèi)部利用了鏈表+紅黑樹(shù)的數(shù)據(jù)結(jié)構(gòu),時(shí)間復(fù)雜度在O(1)和O(logn)之間,其中n為poller線程的fd數(shù)量。

2. 精準(zhǔn)的響應(yīng)
這樣的數(shù)據(jù)結(jié)構(gòu)設(shè)計(jì)有什么好處呢?
- 寫(xiě)得快(放入一個(gè)新節(jié)點(diǎn))
- 讀得快(響應(yīng)已超時(shí)的節(jié)點(diǎn))
- 精度高(超時(shí)時(shí)間無(wú)精度損失)
Workflow源碼在kernel和factory目錄中都有對(duì)應(yīng)的實(shí)現(xiàn),kernel層是主要負(fù)責(zé)timerfd的地方,當(dāng)前factory層還比較薄。我們重點(diǎn)看看上述數(shù)據(jù)結(jié)構(gòu)。
寫(xiě):由用戶發(fā)起異步任務(wù),將這個(gè)任務(wù)加到上述的鏈表+紅黑樹(shù)的數(shù)據(jù)結(jié)構(gòu)中,如果這個(gè)超時(shí)是當(dāng)前最小的超時(shí)時(shí)間,還會(huì)更新一下timerfd。
讀:框架的網(wǎng)絡(luò)線程每次會(huì)從epoll拿出事件,如果響應(yīng)到超時(shí)事件,會(huì)把數(shù)據(jù)結(jié)構(gòu)中已經(jīng)超時(shí)的全部節(jié)點(diǎn)都拿出來(lái),并調(diào)用任務(wù)的handle。
以下是從epoll處理超時(shí)事件的關(guān)鍵函數(shù):
/*** poller響應(yīng)timerfd的到時(shí)事件,并處理所有到時(shí)的定時(shí)任務(wù) ***/
static void __poller_handle_timeout(const struct __poller_node *time_node, poller_t *poller)
{
...
// 鎖里,把list與rbtree上時(shí)間已到的節(jié)點(diǎn)都從數(shù)據(jù)結(jié)構(gòu)里刪除,臨時(shí)放到一個(gè)局部變量上
list_for_each_safe(pos, tmp, &poller->timeo_list)
{
...
node->removed = 1; // 標(biāo)志位:【removed】
...
}
if (poller->tree_first)
{ ... }
// 鎖外,設(shè)置state和error,并回調(diào)Task的handle()函數(shù)
while (!list_empty(&timeo_list))
{
node = list_entry(timeo_list.next, struct __poller_node, list);
list_del(&node->list);
node->error = ETIMEDOUT;
node->state = PR_ST_ERROR;
free(node->res);
poller->cb((struct poller_result *)node, poller->ctx);
}
}由于timerfd使用的超時(shí)時(shí)間是所有節(jié)點(diǎn)中最早超時(shí)的時(shí)間,而所有節(jié)點(diǎn)都在rbtree和list上按序排好,我們從前到后找的都是已超時(shí)的節(jié)點(diǎn)。因此利用了timerfd的精準(zhǔn)性可以非常準(zhǔn)確地叫醒當(dāng)前已經(jīng)超時(shí)的全部節(jié)點(diǎn),無(wú)精度損失。
由于實(shí)際使用中,用戶使用的超時(shí)時(shí)長(zhǎng)總是傾向于固定的,比如上述例子都是1s,因此超時(shí)的絕對(duì)時(shí)間一般來(lái)說(shuō)都是遞增的,使用這個(gè)數(shù)據(jù)結(jié)構(gòu)寫(xiě)入會(huì)非常快,設(shè)計(jì)特別適合定時(shí)器的實(shí)際需求。
3. 原子性
上述有提到,用戶的回調(diào)需要調(diào)且只調(diào)一次,Workflow可以保證在進(jìn)程退出時(shí)立刻全部立刻到時(shí)結(jié)束。進(jìn)程退出又是另一個(gè)話題,感興趣的讀者先自行去看代碼,回頭再細(xì)說(shuō)~
4.允許取消(新功能)
看到這里,已經(jīng)可以感受到優(yōu)雅的數(shù)據(jù)結(jié)構(gòu)如何實(shí)現(xiàn)高效精準(zhǔn)的定時(shí)器了~
但是當(dāng)我們打開(kāi)poller.h,感受一下它的接口,總覺(jué)得差了點(diǎn)什么:

是的!一個(gè)timer可以add,但是卻不可以delete!
Workflow中許多結(jié)構(gòu)的實(shí)現(xiàn)都是非常完備和對(duì)稱(chēng)的,因此取消一個(gè)定時(shí)任務(wù)這件事在Workflow開(kāi)源的第一天就一直想要實(shí)現(xiàn)。
但是多加一個(gè)取消cancel會(huì)有很大的問(wèn)題:如果一個(gè)timer已經(jīng)timeout,用戶再去cancel的生命周期是沒(méi)有辦法保證的,它可能已經(jīng)析構(gòu)了!
最近終于找到了一個(gè)非常好的解決辦法:使用命名timer,交給全局map管,cancel的時(shí)候帶名字去操作,就可以解決生命周期問(wèn)題。
我們?cè)黾恿藥值腡imer :__WFNamedTimerTask,通過(guò)全局的map可以找到它,從而進(jìn)行刪除。刪除實(shí)際上就是從poller中刪除一個(gè)timer。
所以從底向上,為孤獨(dú)的poller_add_time增加一個(gè)小伙伴:poller_del_timer。
/*** 取消一個(gè)定時(shí)任務(wù)時(shí),從poller刪除它 ***/
int poller_del_timer(void *timer, poller_t *poller)
{
...
// 鎖內(nèi):如果這個(gè)標(biāo)志位還是0,表示stop還沒(méi)把它拿走,這里就可以去刪除這個(gè)timer
if (!node->removed)
{
node->removed = 1; // 可以讓cancel和stop互斥,保證只調(diào)一次
if (node->in_rbtree) // 從定時(shí)器的數(shù)據(jù)結(jié)構(gòu)中刪掉
__poller_tree_erase(node, poller);
else
list_del(&node->list);
node->error = 0;
node->state = PR_ST_DELETED;
stopped = poller->stopped;
if (!stopped) // 標(biāo)志位【stopped】,如果當(dāng)時(shí)沒(méi)有進(jìn)程退出,把timer事件交出去處理
write(poller->pipe_wr, &node, sizeof (void *));
}
...
// 鎖外:標(biāo)志位【stopped】如果進(jìn)程要退出了,立刻處理timer事件的handle
if (stopped)
poller->cb((struct poller_result *)node, poller->ctx);
return -!node;
}剛才講述過(guò)的timeout(時(shí)間到)、stop(進(jìn)程退出)、cancel(用戶取消)三者可能由三個(gè)線程分別發(fā)起,于是我們看到的并發(fā)狀態(tài),簡(jiǎn)單來(lái)說(shuō)是這樣的:

定時(shí)器到期(timeout)、進(jìn)程退出(stop)、任務(wù)取消(cancel)三者隨時(shí)可能發(fā)生!
5. 精妙的并發(fā)狀態(tài)分析
cancel和另外兩個(gè)行為有著本質(zhì)上的不同:
- timeout和stop的觸發(fā)順序是先poller層、再到Task層,最后到用戶的callbback;
- cancel的觸發(fā)先Task層,Task層的命名map中先刪掉它觸發(fā)、再到poller層,最后到用戶的callback;
因此先討論第一類(lèi)情況。
我們以timeout為例:
- poller層面的
__poller_handle_timeout()會(huì)把上述的removed標(biāo)志位用上,與poller_del_timer()互斥:誰(shuí)第一個(gè)搶到removed標(biāo)志位并置為1,就代表了timer結(jié)束于哪個(gè)狀態(tài)。如果是timeout,那么用戶拿到的state為SS_STATE_COMPLETE; - 互斥鎖
poller->mutex保證從poller的數(shù)據(jù)結(jié)構(gòu)中刪掉這個(gè)節(jié)點(diǎn)并調(diào)Task層回調(diào),從而可以保證stop的時(shí)候無(wú)需重復(fù)處理它; - timeout調(diào)用的Task層回調(diào),實(shí)際上是__WFNamedTimerTask::handle():
(1) 它會(huì)置一個(gè)標(biāo)志位node_.task,表示此任務(wù)已經(jīng)處理過(guò);
(2) 并且把這個(gè)節(jié)點(diǎn)從全局map中刪除:這樣就保證了用戶自頂向下cancel就不會(huì)刪到它了;
/*** 處理定時(shí)器到期,由poller調(diào)用 ***/
void __WFNamedTimerTask::handle(int state, int error)
{
if (node_.task) // 由于不想先加鎖再處理,所以先判斷任務(wù)沒(méi)有被cancel處理過(guò)
{
std::lock_guard<std::mutex> lock(__timer_map.mutex_);
if (node_.task) // 鎖內(nèi)再檢查一下,入門(mén)技能
{
timers_->del(&node_, &__timer_map.root_); // 從map中刪除
node_.task = NULL; // 標(biāo)志位:表示任務(wù)生命周期結(jié)束了
}
}
mutex_.lock(); // 這里是為了等待dispatch流程保證exchange已經(jīng)結(jié)束,
mutex_.unlock(); // 否則資源被釋放就不能訪問(wèn)成員變量了。也是非常常用的技巧!
this->__WFTimerTask::handle(state, error); // 里邊會(huì)釋放資源,并發(fā)起任務(wù)流下一個(gè)任務(wù)
}第二類(lèi)情況,如果用戶調(diào)用cancel先發(fā)生呢?
- 自頂向下先由factory層找到這個(gè)節(jié)點(diǎn),再調(diào)到poller的
poller_del_timer()。期間需要記錄一些狀態(tài),因?yàn)槲覀冃枰ǔS卸鄠€(gè)poller。然后內(nèi)部會(huì)先置上removed,并從定時(shí)器數(shù)據(jù)結(jié)構(gòu)中刪掉,以保證和timeout流程互斥; - 在Task層面,需要由cancel流程負(fù)責(zé)調(diào)用用戶的callback(),同時(shí)回收timer的資源;

void __NamedTimerMap::cancel(const std::string& name, size_t max)
{
...
// 鎖內(nèi),拿出命名為name的timer隊(duì)列
timers = __get_object_list<TimerList>(name, &root_, false);
if (timers)
{
do
{
if (max == 0) // 從map中刪除最多max個(gè)
return;
// 從該名字對(duì)應(yīng)的隊(duì)列中刪除該timer
node = list_entry(timers->head.next, struct __timer_node, list);
list_del(&node->list);
// 標(biāo)識(shí)位:exchange。如果是第二次exchange,會(huì)調(diào)到task自身的cancel()從poller中刪掉它
if (node->task->flag_.exchange(true))
node->task->cancel();
// 標(biāo)志位:表示生命周期正確結(jié)束,資源已經(jīng)被回收,否則timeout流程或析構(gòu)函數(shù)需要做回收
node->task = NULL;
max--;
} while (!timers->empty());
rb_erase(&timers->rb, &root_);
delete timers;
}
}6. 異步任務(wù)的發(fā)起時(shí)機(jī)是個(gè)謎
上面那張圖,我們假設(shè)的是任務(wù)先創(chuàng)建好,再被發(fā)起。那如果任務(wù)還沒(méi)有被發(fā)起,甚至我們不想發(fā)起呢?
我們假設(shè)的是任務(wù)先創(chuàng)建好,再被發(fā)起。那如果任務(wù)還沒(méi)有被發(fā)起,甚至我們不想發(fā)起呢?
1、任務(wù)可以在被發(fā)起前取消
實(shí)際上我們把一個(gè)timer放到一個(gè)任務(wù)流圖中,我們并不能確定它被發(fā)起的準(zhǔn)確的時(shí)機(jī),但我們依然允許先cancel它。

這時(shí)候我們就需要上述的標(biāo)志位exchange來(lái)做互斥了。exchange是個(gè)std::atomic<tool>,初始化為false,用戶已經(jīng)手動(dòng)cancel過(guò)之后,任務(wù)可能在任務(wù)流中才會(huì)被發(fā)起dispatch。
因此即使先取消,沒(méi)關(guān)系,但必須保證dispatch過(guò)才能釋放這個(gè)timer的資源:
/*** 由任務(wù)流發(fā)起、或者用戶手動(dòng)start起來(lái) ***/
void __WFNamedTimerTask::dispatch()
{
int ret;
mutex_.lock();
ret = this->scheduler->sleep(this); // 先把定時(shí)任務(wù)交給poller
if (ret >= 0 && flag_.exchange(true)) // exchange一下。如果是第二個(gè)調(diào)用exchange的人,會(huì)拿到true
this->cancel(); // 說(shuō)明發(fā)起之前已經(jīng)有人cancel過(guò)了,立刻從poller中刪除即可
mutex_.unlock();
if (ret < 0)
this->handle(SS_STATE_ERROR, errno);
}這里有兩個(gè)要注意的點(diǎn):
- 通過(guò)
sleep()交給poller之后,用戶的callback可以在網(wǎng)絡(luò)線程中處理,而不是當(dāng)前線程立刻處理,這樣可以遞歸爆棧問(wèn)題; - 如果過(guò)期時(shí)間非常短,
sleep()之后是隨時(shí)有可能到期并回到__WFNamedTimerTask::handle()的!因此前面對(duì)mutex_.lock()和mutex_.unlock()等的就是這里的flag_.exchange(true)執(zhí)行結(jié)束;
2. 任務(wù)甚至可以不發(fā)起
而如果創(chuàng)建完之后不想發(fā)起,Workflow統(tǒng)一的接口是需要調(diào)用一下task->dismiss(),以釋放task資源,調(diào)用析構(gòu)函數(shù)等。
/*** 命名定時(shí)任務(wù)的析構(gòu)函數(shù),異步任務(wù)需要注意處理各種情況的資源回收 ***/
virtual ~__WFNamedTimerTask()
{
if (node_.task) // 標(biāo)志位:如果沒(méi)有置空,說(shuō)明任務(wù)沒(méi)有發(fā)起過(guò)。需要從全局map中刪掉這個(gè)timer
{
std::lock_guard<std::mutex> lock(__timer_map.mutex_);
if (node_.task) // 鎖內(nèi)再檢查一次
timers_->del(&node_, &__timer_map.root_);
}
}7. 總結(jié)
最后貼一段代碼看看,一個(gè)高并發(fā)1s定時(shí)返回的server代碼,用Workflow實(shí)現(xiàn)可以多簡(jiǎn)單:
int main()
{
WFHttpServer server([](WFHttpTask * task)
{
task->get_resp()->append_output_body("<html>will response after 1s</html>");
auto timer = WFTaskFactory::create_timer_task(1, 0, nullptr);
series_of(task)->push_back(timer);
});
if (server.start(1412) == 0)
{
getchar();
server.stop();
}
return 0;
}本篇介紹了如何優(yōu)雅地處理異步任務(wù)的:創(chuàng)建、發(fā)起、回調(diào)、取消、進(jìn)程退出多種并發(fā)行為,其中包含了許多常用的技巧!不記得的小伙伴自行翻回去再看一遍._.
當(dāng)然這在并發(fā)的世界中還只是冰山一角,因?yàn)楹苡锌赡軐?xiě)下某一句話的時(shí)機(jī)不對(duì),任務(wù)一結(jié)束,程序就GG了。
以上就是C++實(shí)現(xiàn)高并發(fā)異步定時(shí)器的詳細(xì)內(nèi)容,更多關(guān)于C++定時(shí)器的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
C++實(shí)現(xiàn)簡(jiǎn)單酒店管理系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了C++實(shí)現(xiàn)簡(jiǎn)單酒店管理系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-08-08
C++實(shí)現(xiàn)LeetCode(152.求最大子數(shù)組乘積)
這篇文章主要介紹了C++實(shí)現(xiàn)LeetCode(152.求最大子數(shù)組乘積),本篇文章通過(guò)簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-07-07
Windows下ncnn環(huán)境配置教程詳解(VS2019)
這篇文章主要介紹了Windows下ncnn環(huán)境配置(VS2019),本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-03-03
C++ 實(shí)現(xiàn)自定義類(lèi)型的迭代器操作
這篇文章主要介紹了C++ 實(shí)現(xiàn)自定義類(lèi)型的迭代器操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12
C語(yǔ)言數(shù)據(jù)結(jié)構(gòu)不掛科指南之隊(duì)列詳解
這篇博客主要介紹一下隊(duì)列的概念,并且采用 C 語(yǔ)言,編寫(xiě)兩種存儲(chǔ)實(shí)現(xiàn)方式:順序存儲(chǔ)和鏈?zhǔn)酱鎯?chǔ),當(dāng)然還有常規(guī)的隊(duì)列基本操作的實(shí)現(xiàn)算法2022-09-09
C語(yǔ)言?詳解如何刪除有序數(shù)組中的重復(fù)項(xiàng)
數(shù)組不擅長(zhǎng)插入(添加)和刪除元素。數(shù)組的優(yōu)點(diǎn)在于它是連續(xù)的,所以查找數(shù)據(jù)速度很快。但這也是它的一個(gè)缺點(diǎn)。正因?yàn)樗沁B續(xù)的,所以當(dāng)插入一個(gè)元素時(shí),插入點(diǎn)后所有的元素全部都要向后移;而刪除一個(gè)元素時(shí),刪除點(diǎn)后所有的元素全部都要向前移2022-03-03

