C++ 實現(xiàn)線程安全的頻率限制器(推薦)
很早以前,在學(xué)習(xí)使用 Python 的deque
容器時,我寫了一篇文章python3 deque 雙向隊列創(chuàng)建與使用方法分析。最近需要壓測線上服務(wù)的性能,又不愿意總是在 QA 那邊排隊,于是需要自己寫一個壓測用的客戶端。其中一個核心需求就是要實現(xiàn) QPS 限制。
于是,終究逃不開要在 C++ 中實現(xiàn)一個線程安全的頻率限制器。
設(shè)計思路
所謂頻率限制,就是要在一個時間段(inteval)中,限制操作的次數(shù)(limit)。這又可以引出兩種強弱不同的表述:
- 強表述:在任意一個長度等于設(shè)定的時間段(interval)的滑動窗口中,頻率限制器放行的操作次數(shù)(count)都不高于限制次數(shù)(limit)。
- 弱表述:在一組長度等于設(shè)定的時間段(interval)且緊密相連的固定窗口中,每一個窗口里頻率限制器放行的操作次數(shù)(count)都不高于限制次數(shù)(limit)。
不難發(fā)現(xiàn),強表述通過「滑動窗口」的方式,不僅限制了頻率,還要求了操作在時間上的均勻性。前作的頻率限制器,實際上對應(yīng)了這里的強表述。但實際工程中,我們通常只需要實現(xiàn)弱表述的頻率限制器就足夠使用了。
對于弱表述,實現(xiàn)起來主要思路如下:
當(dāng)操作計數(shù)(count)小于限制(limit)時直接放行;
單線程實現(xiàn)
在不考慮線程安全時,不難給出這樣的實現(xiàn)。
struct ms_clock { using rep = std::chrono::milliseconds::rep; using period = std::chrono::milliseconds::period; using duration = std::chrono::duration<rep, period>; using time_point = std::chrono::time_point<ms_clock, duration>; static time_point now() noexcept { return time_point(std::chrono::duration_cast<duration>( std::chrono::steady_clock::now().time_since_epoch())); } }; } // namespace __details class RateLimiter { public: using clock = __details::ms_clock; // 1. private: const uint64_t limit_; const clock::duration interval_; const clock::rep interval_count_; mutable uint64_t count_{std::numeric_limits<uint64_t>::max()}; // 2.a. mutable clock::rep timestamp_{0}; // 2.b. public: constexpr RateLimiter(uint64_t limit, clock::duration interval) : limit_(limit), interval_(interval), interval_count_(interval_.count()) {} RateLimiter(const RateLimiter&) = delete; // 3.a. RateLimiter& operator=(const RateLimiter&) = delete; // 3.b. RateLimiter(RateLimiter&&) = delete; // 3.c. RateLimiter& operator=(RateLimiter&&) = delete; // 3.d. bool operator()() const; double qps() const { return 1000.0 * this->limit_ / this->interval_count_; } }; bool RateLimiter::operator()() const { auto orig_count = this->count_++; if (orig_count < this->limit_) { // 4. return true; } else { auto ts = this->timestamp_; auto now = clock::now().time_since_epoch().count(); if (now - ts < this->interval_count_) { // 5. return false; } this->timestamp_ = now; this->count_ = 1; return true; } }
這里,
(1) 表明頻率限制器使用單位為毫秒的時鐘。在實際使用時,也可以按需改成微妙甚至納秒。
(2) 使用mutable
修飾內(nèi)部狀態(tài)count_
和timestamp_
。這是因為兩個limit_
和interval_
相同的頻率限制器,在邏輯上是等價的,但他們的內(nèi)部狀態(tài)卻不一定相同。因此,為了讓const
成員函數(shù)能夠修改內(nèi)部狀態(tài)(而不改變邏輯等價),我們要給內(nèi)部狀態(tài)數(shù)據(jù)成員加上mutable
修飾。
(2.a) 處將count_
設(shè)置為類型可表示的最大值是為了讓
(4) 的判斷失敗,而對timestamp_
進行初始化。
(2.b) 處將timestamp_
設(shè)置為0
則是基于同樣的原因,讓 (5) 的判斷失敗。
(2.a) 和 (2.b) 處通過巧妙的初值設(shè)計,將初始化狀態(tài)與后續(xù)正常工作狀態(tài)的邏輯統(tǒng)一了起來,而無須丑陋的判斷。
(3) 禁止了對象的拷貝和移動。這是因為一個頻率限制器應(yīng)綁定一組操作,而不應(yīng)由兩組或更多組操作共享(對于拷貝的情形),或是中途失效(對于移動的情形)。
如此一來,函數(shù)調(diào)用運算符就忠實地實現(xiàn)了前述邏輯。
多線程改造
第一步改造
當(dāng)有多線程同時調(diào)用RateLimiter::operator()
時,顯而易見,在count_
和timestamp_
上會產(chǎn)生競爭。我們有兩種辦法解決這個問題:要不然加鎖,要不然把count_
和timestamp_
設(shè)為原子變量然后用原子操作解決問題。于是,對函數(shù)調(diào)用運算符,我們有如下第一步的改造。
class RateLimiter { // 其余保持不變 private: mutable std::atomic<uint64_t> count_{std::numeric_limits<uint64_t>::max()}; // 1.a. mutable std::atomic<clock::rep> timestamp_{0}; // 1.b. // 其余保持不變 }; bool RateLimiter::operator()() const { auto orig_count = this->count_.fetch_add(1UL); // 2. if (orig_count < this->limit_) { return true; } else { auto ts = this->timestamp_.load(); // 3. auto now = clock::now().time_since_epoch().count(); if (now - ts < this->interval_count_) { return false; } this->timestamp_.store(now); // 4. this->count_.store(1UL); // 5. return true; } }
這里,
- (1) 將
count_
和timestamp_
聲明為原子的,從而方便后續(xù)進行原子操作。 - (2) -- (5) 則將原有操作分別改為對應(yīng)的原子操作。
這樣看起來很完美,但其實是有 bug 的。我們重點關(guān)注 (4) 這里。(4) 的本意是更新timestamp_
,以備下一次orig_count >= this->limit_
時進行判斷。準確設(shè)置這一timestamp
是頻率限制器正確工作的基石。但是,如果有兩個(或更多)線程,同時走到了 (4)會發(fā)生什么?
- 因為原子操作的存在,兩個線程會先后執(zhí)行 (4)。于是
timestamp_
的值究竟是什么,我們完全不可預(yù)期。 - 此外,兩個線程會先后執(zhí)行 (5),即原子地將
count_
置為1
。但是你想,頻率限制器先后放行了兩次操作,但為什么count_
是1
呢?這是不是就「偷跑」了一次操作?
為此,我們要保證只有一個線程會真正設(shè)置timestamp_
,而拒絕其他同樣走到 (4) 位置的線程的操作,以避免其重復(fù)設(shè)置timestamp_
以及錯誤地將count_
再次置為1
。
第二步改進
于是有以下改進。
bool RateLimiter::operator()() const { auto orig_count = this->count_.fetch_add(1UL); // 3. if (orig_count < this->limit_) { // 4. return true; } else { auto ts = this->timestamp_.load(); auto now = clock::now().time_since_epoch().count(); if (now - ts < this->interval_count_) { // 5. return false; } if (not this->timestamp_.compare_and_exchange_strong(ts, now)) { // 1. return false; } this->count_.store(1UL); // 2. return true; } }
這里,(1) 是一個 CAS 原子操作。它會原子地比較timestamp_
和ts
的值(Compare):若他們相等,則將now
的值寫入timestamp_
(Swap),并返回true
;若他們不相等,則將timestamp_
的值寫入ts
,并返回false
。如果沒有其他線程搶先修改timestamp_
的值,那么 CAS 操作應(yīng)該成功并返回true
,繼續(xù)執(zhí)行后面的代碼;否則,說明其他線程已經(jīng)搶先修改了timestamp_
,當(dāng)前線程的操作被記入上一個周期而被頻率限制器拒絕。
現(xiàn)在要考慮 (2)。如果執(zhí)行完 (1) 之后立即立刻馬上沒有任何延遲地執(zhí)行 (2),那么當(dāng)然一切大吉。但如果這時候當(dāng)前線程被切出去,會發(fā)生什么?我們要分情況討論。
如果ts == 0
,也就是「當(dāng)前線程」是頻率限制器第一次修改timestamp_
。于是,當(dāng)前線程可能會在 (3) 處將count_
(溢出地)自增為0
,從而可能有其他線程通過 (4)。此時,當(dāng)前線程在當(dāng)前分片有可能應(yīng)當(dāng)被拒絕操作。為此,我們需要在 (1) 和 (2) 之間做額外的判斷。
if (ts == 0) { auto orig_count = this->count.fetch_add(1UL); return (orig_count < this->limit_); }
如果ts != 0
,也就是「當(dāng)前線程」并非頻率限制器第一次修改timestamp_
。那么其他線程在 (4) 處必然判斷失敗,但在 (5) 處的判斷可能成功,從而可能繼續(xù)成功執(zhí)行 (1),從而接連兩次執(zhí)行 (2)。但這不影響正確性。因為通過 (5) 表明相對當(dāng)前線程填入的timestamp_
,已經(jīng)由過了一個時間段(interval),而在這個時間段里,只有當(dāng)前線程的一次操作會被接受。
第三次改進
由此,我們得到:
bool RateLimiter::operator()() const { auto orig_count = this->count_.fetch_add(1UL); if (orig_count < this->limit_) { return true; } else { auto ts = this->timestamp_.load(); auto now = clock::now().time_since_epoch().count(); if (now - ts < this->interval_count_) { return false; } if (not this->timestamp_.compare_and_exchange_strong(ts, now)) { return false; } if (ts == 0) { auto orig_count = this->count.fetch_add(1UL); return (orig_count < this->limit_); } this->count_.store(1UL); return true; } }
至此,我們的代碼在邏輯上已經(jīng)成立了,接下來要做一些性能方面的優(yōu)化。
原子操作默認采用std::memory_order_seq_cst
的內(nèi)存模型。這是 C++ 中最嚴格的內(nèi)存模型,它有兩個保證:
- 程序指令和源碼順序一致;
- 所有線程的所有操作都有一致的順序。
為了實現(xiàn)順序一致性(sequential consistency),編譯器會使用很多對抗編譯器優(yōu)化和 CPU 亂序執(zhí)行(Out-of-Order Execution)的手段,因而性能較差。對于此處的count_
,我們無需順序一致性模型,只需要獲取釋放模型(Aquire-Release)模型就足夠了。
第四次改進
于是有第四次改進:
bool RateLimiter::operator()() const { auto orig_count = this->count_.fetch_add(1UL, std::memory_order_acq_rel); if (orig_count < this->limit_) { return true; } else { auto ts = this->timestamp_.load(); auto now = clock::now().time_since_epoch().count(); if (now - ts < this->interval_count_) { return false; } if (not this->timestamp_.compare_and_exchange_strong(ts, now)) { return false; } if (ts == 0) { auto orig_count = this->count.fetch_add(1UL, std::memory_order_acq_rel); return (orig_count < this->limit_); } this->count_.store(1UL, std::memory_order_release); return true; } }
至此,我們就完整實現(xiàn)了一個頻率限制器,可以愉快地開始拉 QPS 壓測了!
總結(jié)
到此這篇關(guān)于在 C++ 中實現(xiàn)一個線程安全的頻率限制器的文章就介紹到這了,更多相關(guān)在 C++ 中實現(xiàn)一個線程安全的頻率限制器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C/C++ Qt 自定義Dialog對話框組件應(yīng)用案例詳解
有時候我們需要一次性修改多個數(shù)據(jù),使用默認的模態(tài)對話框似乎不太夠用,此時我們需要自己創(chuàng)建一個自定義對話框。這篇文章主要介紹了Qt自定義Dialog對話框組件的應(yīng)用,感興趣的同學(xué)可以學(xué)習(xí)一下2021-11-11