C++使用Mutex實(shí)現(xiàn)讀寫鎖思路
近期答辯完成了,想回頭看看之前沒做過的2PL。
實(shí)現(xiàn)2PL有4種方式:
- 死鎖檢測。本篇是為了做這個(gè)而實(shí)現(xiàn)的,做這個(gè)事情的原因是c++標(biāo)準(zhǔn)庫的shared_mutex無法從外界告知獲取鎖失敗。
- 如果需要等待,那么馬上結(jié)束txn。C++中有try_lock這樣的方式,如果上鎖失敗就返回false,這樣就可以實(shí)現(xiàn)這個(gè)了。
- 如果需要等待,那么殺死當(dāng)前已經(jīng)獲得鎖的一方。
- 在上鎖前對資源排序。
2和4是最簡單的,沒什么好說的。3比1略容易一些。
基本思路
一個(gè)讀寫鎖應(yīng)該具有以下特征:
- 多個(gè)讀者可以同時(shí)訪問
- 寫者獨(dú)占訪問
- 寫者與讀者互斥
- 避免寫者饑餓或讀者饑餓
- 鎖的遞歸使用
由于實(shí)現(xiàn)的鎖不能夠出現(xiàn)讀餓死、寫?zhàn)I死的現(xiàn)象,所以我想到一個(gè)很簡單的方法:先到先得。當(dāng)然也許會(huì)有其他方案。
先到先得的方式下,如何判斷一個(gè)線程是否該阻塞?
- 第一個(gè)寫請求之前的所有讀請求可以進(jìn)行
- 如果第一個(gè)請求是寫請求,那么只有這一個(gè)寫請求可以進(jìn)行
- 如果沒有寫請求,那么所有讀都可以進(jìn)行
- 如果沒有讀請求,那么第一個(gè)寫請求可以進(jìn)行。這實(shí)際是2的特殊情況
- 其他請求都不可以進(jìn)行
我們畫圖來說明一下。
假定某一刻有這些請求被阻塞,現(xiàn)在考慮挑出來可以執(zhí)行的線程來執(zhí)行
隊(duì)列中,第一個(gè)寫請求之前的讀都可以進(jìn)行,所以此時(shí)1,2線程是可以執(zhí)行的。它們讀完后釋放鎖,于是在這個(gè)隊(duì)列中刪除了1,2
1,2刪除后,3可以正常執(zhí)行。6在環(huán)檢測的時(shí)候被要求結(jié)束,然后線程3也結(jié)束了,所以此時(shí)所有的讀都可以進(jìn)行。
實(shí)現(xiàn)先到先得,可以通過記錄正在進(jìn)行讀的線程數(shù)量,正在進(jìn)行寫的線程數(shù)量,請求寫但是被阻塞的線程數(shù)量,請求讀但是被阻塞的線程數(shù)量,然后根據(jù)條件來分配資源給某個(gè)線程……維護(hù)的信息數(shù)量可能不止這些,比如說需要維護(hù)哪些線程的讀被阻塞了。
而環(huán)檢測的2PL,我們需要在外界通知線程鎖獲取失敗,所以選擇了使用隊(duì)列來實(shí)現(xiàn),這個(gè)隊(duì)列需要支持:
- 添加讀者、寫者(AddReader, AddWriter)
- 刪除讀者、寫者(RemoveReader, RemoveWriter,為了簡化,統(tǒng)一為一個(gè)Remove了)
- 當(dāng)可以獲得鎖的時(shí)候,提醒可以獲得鎖的線程。這個(gè)可以用condition_variable實(shí)現(xiàn)
- 確定某個(gè)線程是否應(yīng)該阻塞
然而做這樣一個(gè)隊(duì)列還是需要費(fèi)一些功夫的。
隊(duì)列實(shí)現(xiàn)
明確了功能需求后,考慮一下需要什么樣的數(shù)據(jù)結(jié)構(gòu)。普通的隊(duì)列肯定是不夠的,畢竟我們會(huì)刪除其中任意一個(gè)元素,容易想到的是map/set。然后考慮到先到先得的順序要求,可以考慮額外記錄一個(gè)邏輯時(shí)間timestamp,每當(dāng)一個(gè)請求到達(dá),就遞增timestamp。由于加入了timestamp,所以為了支持刪除,至少需要tid:timestamp的映射。而為了支持按timestamp查詢,至少需要timestamp:tid的映射。此外,需要記錄一個(gè)請求是讀還是寫,所以一共需要tid:timestamp的映射和timestamp:<tid,讀寫標(biāo)記>的映射。
timestamp:<tid,讀寫標(biāo)記>映射關(guān)系,很容易想到通過std::map這種天然自帶排序的數(shù)據(jù)結(jié)構(gòu)來實(shí)現(xiàn),即:
- 從最小到最大遍歷開頭的讀請求,這部分線程可以直接執(zhí)行。
- 如果是寫請求開頭的,那么這個(gè)寫可以直接執(zhí)行。
- 解鎖的時(shí)候刪除該線程的記錄。
筆者在此前做了CMU15445,里面的GC的watermark和這個(gè)非常顯相似。CMU15445中作者提到了可以使用unordered_map來將時(shí)間復(fù)雜度從O(logn)優(yōu)化到O(1),這種做法我想到了,所以這里的隊(duì)列使用的都是unordered_map。
#pragma once #ifndef READER_WRITER_QUEUE_H #define READER_WRITER_QUEUE_H // INSPIRED BY CMU15445 fall2023 watermark #include <cassert> #include <unordered_map> class ReaderWriterQueue { public: void AddReader(int tid) { assert(tid_ts.count(tid) == 0); ts_tt[next_timestamp] = {tid, TidType::kRead}; tid_ts[tid] = next_timestamp; next_timestamp++; } void AddWriter(int tid) { assert(tid_ts.count(tid) == 0); ts_tt[next_timestamp] = {tid, TidType::kWrite}; tid_ts[tid] = next_timestamp; next_timestamp++; } void Remove(int tid) { auto ts = tid_ts.find(tid); if (ts == tid_ts.end()) return; assert(ts_tt.count(ts->second) == 1); ts_tt.erase(ts->second); tid_ts.erase(ts); } bool ShallBlock(int tid) { ResetMinWriteTimestamp(); ResetMinTimestamp(); // 這兩個(gè)timestamp處理可以合并 auto iter = tid_ts.find(tid); assert(iter != tid_ts.end()); assert(ts_tt.count(iter->second) == 1); auto ts = iter->second; auto [_, type] = ts_tt[ts]; // 如果讀者之前有寫者,那么就需要阻塞等待 if (type == TidType::kRead) return ts > min_write_ts; // 如果寫者之前有讀者,那么就需要阻塞等待 if (min_ts < min_write_ts) return true; // 如果寫者之前有寫者,那么就需要阻塞等待 return ts_tt[min_write_ts].tid != tid; } private: void ResetMinWriteTimestamp() { for (; min_write_ts < next_timestamp; min_write_ts++) { auto iter = ts_tt.find(min_write_ts); if (iter == ts_tt.end()) { continue; } else if (iter->second.type == TidType::kWrite) { break; } else { // iter->second.type == TidType::kRead continue; } } } void ResetMinTimestamp() { for (; min_ts < next_timestamp; min_ts++) { auto iter = ts_tt.find(min_ts); if (iter != ts_tt.end()) break; } } long next_timestamp = 0; long min_write_ts = 0; long min_ts = 0; struct TidType { int tid; enum LockType {kRead, kWrite} type; bool operator==(const TidType &rhs) const { return tid == rhs.tid && type == rhs.type; } }; std::unordered_map<long, TidType> ts_tt; std::unordered_map<int, long> tid_ts; }; #endif // READER_WRITER_QUEUE_H
將隊(duì)列封裝為讀寫鎖
這一步封裝已經(jīng)非常容易了,一個(gè)請求到來,添加到隊(duì)列中。如果需要阻塞,那么就通過condition_variable等待通知。解鎖的時(shí)候,不僅僅需要在隊(duì)列中進(jìn)行移除,還需要notify_all。notify_all還可以優(yōu)化,但是這不是那么容易的事情了,不考慮。
#pragma once #ifndef SIMPLE_SHARED_MUTEX_H #define SIMPLE_SHARED_MUTEX_H #include <condition_variable> #include <ctime> #include <cstdio> #include <mutex> #include <unistd.h> #include "reader_writer_queue.h" class SimpleSharedMutex { public: void lock() { std::unique_lock lock{mtx}; auto tid = ::gettid(); queue.AddWriter(tid); while (queue.ShallBlock(tid)) cv.wait(lock); // printf("lock %d\n", tid); } void shared_lock() { std::unique_lock lock{mtx}; auto tid = ::gettid(); queue.AddReader(tid); while (queue.ShallBlock(tid)) cv.wait(lock); // printf("slock %d\n", tid); } void unlock() { std::unique_lock lock{mtx}; queue.Remove(::gettid()); cv.notify_all(); // printf("ulock %d\n", ::gettid()); } void shared_unlock() { std::unique_lock lock{mtx}; queue.Remove(::gettid()); cv.notify_all(); // printf("uslock %d\n", ::gettid()); } private: std::mutex mtx; ReaderWriterQueue queue; std::condition_variable cv; }; #endif // SIMPLE_SHARED_MUTEX_H
到此這篇關(guān)于C++用Mutex實(shí)現(xiàn)讀寫鎖的文章就介紹到這了,更多相關(guān)C++ Mutex讀寫鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!