C++高性能服務(wù)器框架之線程模塊
線程模塊概述
該模塊基于pthread
實現(xiàn)。sylar說,由于c++11中的thread也是由pthread封裝實現(xiàn)的,并且沒有提供讀寫互斥量,讀寫鎖,自旋鎖等,所以自己封裝了pthread。
鎖模塊實現(xiàn)了信號量、互斥量、讀寫鎖、自旋鎖、原子鎖的封裝
- class Semaphore:信號量封裝
- class Mutex:互斥量封裝
- class RWMutex:讀寫鎖封裝
- class Spinlock:自旋鎖封裝
- class CASLock:原子鎖封裝
線程模塊主要由Thread類實現(xiàn)
- class Thread:實現(xiàn)線程的封裝
關(guān)于線程id的問題,在獲取線程id時使用syscall獲得唯一的線程id
進(jìn)程pid: getpid() ? ? ? ? ? ? ? ? 線程tid: pthread_self() ? ? //進(jìn)程內(nèi)唯一,但是在不同進(jìn)程則不唯一。 線程pid: syscall(SYS_gettid) ? ? //系統(tǒng)內(nèi)是唯一的
鎖模塊詳解
class Semaphore(信號量)
mumber(成員函數(shù))
// 信號量,它本質(zhì)上是一個長整型的數(shù) sem_t m_semaphore;
Semaphore(構(gòu)造函數(shù))
初始化信號量。函數(shù)原型:int sem_init(sem_t *sem, int pshared, unsigned int value);
其中,參數(shù) sem
是指向要初始化的信號量的指針;參數(shù) pshared
指定了信號量是進(jìn)程內(nèi)共享還是跨進(jìn)程共享,如果值為 0,則表示進(jìn)程內(nèi)共享;參數(shù) value
是信號量的初始值。該函數(shù)成功時返回 0,否則返回 -1,并設(shè)置適當(dāng)?shù)腻e誤碼。
Semaphore::Semaphore(uint32_t count) { ? ?if (sem_init(&m_semaphore, 0, count)) { ? ? ? ?throw std::logic_error("sem_init error"); ? } }
~Semaphore(析構(gòu)函數(shù))
銷毀信號量。函數(shù)原型:int sem_destroy(sem_t *sem);
注意,只有在確保沒有任何線程或進(jìn)程正在使用該信號量時,才應(yīng)該調(diào)用 sem_destroy()
函數(shù)。否則,可能會導(dǎo)致未定義的行為。此外,如果在調(diào)用 sem_destroy()
函數(shù)之前,沒有使用 sem_post()
函數(shù)將信號量的值增加到其初始值,則可能會導(dǎo)致在銷毀信號量時出現(xiàn)死鎖情況。
Semaphore::~Semaphore() { ? ?sem_destroy(&m_semaphore); }
wait(獲取信號量)
函數(shù)原型:int sem_wait(sem_t *sem);
其中,參數(shù) sem
是指向要獲取的信號量的指針。如果在調(diào)用此函數(shù)時信號量的值大于零,則該值將遞減并立即返回。如果信號量的值為零,則當(dāng)前線程將被阻塞,直到信號量的值大于零或者被信號中斷。
當(dāng)線程成功獲取信號量時,可以執(zhí)行相應(yīng)的操作來使用資源。使用完資源后,可以通過調(diào)用 sem_post()
函數(shù)來增加信號量的值以釋放資源,并使其他等待線程得以繼續(xù)執(zhí)行。
void Semaphore::wait() { ? ?if (sem_wait(&m_semaphore)) { ? ? ? ?throw std::logic_error("sem_wait error"); ? } }
notify(釋放信號量
函數(shù)原型:int sem_post(sem_t *sem);
用于向指定的命名或未命名信號量發(fā)送信號,使其計數(shù)器加1。如果有進(jìn)程或線程正在等待該信號量,那么其中一個將被喚醒以繼續(xù)執(zhí)行。
參數(shù):sem
:指向要增加計數(shù)器的信號量的指針。
返回值:成功時返回0,失敗時返回-1,并設(shè)置errno來指示錯誤原因。
void Semaphore::notify() { ? ?if (sem_post(&m_semaphore)) { ? ? ? ?throw std::logic_error("sem_post error"); ? } }
鎖
為方便封裝各種鎖,這里定義了3個結(jié)構(gòu)體,都在構(gòu)造函數(shù)時自動lock
,在析構(gòu)時自動unlock
,這樣可以簡化鎖的操作,避免忘記解鎖導(dǎo)致死鎖。
- ScopedLockImpl:用來分裝互斥量,自旋鎖,原子鎖
- ReadScopedLockImpl && WriteScopedLockImpl:用來封裝讀寫鎖
class Mutex(互斥量)
mumber(成員函數(shù))
// 互斥量 pthread_mutex_t m_mutex;
Mutex(構(gòu)造函數(shù))
初始化互斥鎖對象。函數(shù)原型:int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
參數(shù)說明:
mutex
:指向要初始化的互斥鎖對象的指針。attr
:指向互斥鎖屬性對象的指針,可以為NULL以使用默認(rèn)屬性。
返回值:
- 成功時返回0,失敗時返回錯誤碼并設(shè)置errno變量。
Mutex () { pthread_mutex_init(&m_mutex, nullptr); }
~Mutex(析構(gòu)函數(shù))
銷毀已初始化的互斥鎖對象。函數(shù)原型:int pthread_mutex_destroy(pthread_mutex_t *mutex);
參數(shù)說明:
mutex
:指向要銷毀的互斥鎖對象的指針。
返回值:
- 成功時返回0,失敗時返回錯誤碼并設(shè)置errno變量。
~Mutex () { pthread_mutex_destroy(&m_mutex); }
lock(加鎖)
函數(shù)原型:int pthread_mutex_lock(pthread_mutex_t *mutex);
參數(shù)說明:
mutex
:指向要加鎖的互斥鎖對象的指針。
返回值
- 成功時返回0,失敗時返回錯誤碼并設(shè)置errno變量。
當(dāng)一個線程調(diào)用pthread_mutex_lock()
時,如果當(dāng)前該互斥鎖沒有被其它線程持有,則該線程會獲得該互斥鎖,并將其標(biāo)記為已被持有;如果該互斥鎖已經(jīng)被其它線程持有,則當(dāng)前線程會被阻塞,直到該互斥鎖被釋放并重新嘗試加鎖。
void lock() { pthread_mutex_lock(&m_mutex); }
unlock(解鎖)
函數(shù)原型:int pthread_mutex_unlock(pthread_mutex_t *mutex);
參數(shù)說明:
mutex
:指向要解鎖的互斥鎖對象的指針。
返回值:
- 成功時返回0,失敗時返回錯誤碼并設(shè)置errno變量。
當(dāng)一個線程調(diào)用pthread_mutex_unlock()
時,該互斥鎖將被標(biāo)記為未被持有,并且如果有其它線程正在等待該鎖,則其中一個線程將被喚醒以繼續(xù)執(zhí)行。
void unlock() { pthread_mutex_unlock(&m_mutex); }
class RWMutex(讀寫鎖)
mumber(成員變量)
// 讀寫鎖 pthread_rwlock_t m_lock;
RWMutex(構(gòu)造函數(shù))
初始化一個讀寫鎖對象。函數(shù)原型:int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr);
參數(shù)說明:
rwlock
:指向要初始化的讀寫鎖對象的指針。attr
:指向讀寫鎖屬性對象的指針,可以為NULL以使用默認(rèn)屬性。
返回值:
- 成功時返回0,失敗時返回錯誤碼并設(shè)置errno變量。
讀寫鎖是一種同步機(jī)制,用于在多線程環(huán)境下對共享資源進(jìn)行訪問控制。與互斥鎖不同,讀寫鎖允許多個線程同時讀取共享資源,但只允許一個線程寫入共享資源。這樣可以提高程序的性能和效率,但需要注意避免讀寫鎖死鎖等問題。
RWMutex() { pthread_rwlock_init(&m_lock, nullptr); }
~RWMutex(析構(gòu)函數(shù))
銷毀一個讀寫鎖。
~RWMutex() { pthread_rwlock_destroy(&m_lock); }
rdlock(加讀鎖)
pthread_rwlock_rdlock()
用于獲取讀取鎖(pthread_rwlock_t)上的共享讀取訪問權(quán)限。它允許多個線程同時讀取共享資源,但不能寫入它。如果有線程已經(jīng)持有寫入鎖,則其他線程將被阻塞直到寫入鎖被釋放。調(diào)用此函數(shù)時,如果另一個線程已經(jīng)持有寫入鎖,則該線程將被阻塞,直到寫入鎖被釋放。
void rdlock() { pthread_rwlock_rdlock(&m_lock); }
wrlock(加寫鎖)
pthread_rwlock_wrlock()
用于獲取寫入鎖(pthread_rwlock_t)上的排他寫訪問權(quán)限。它阻止其他線程讀取或?qū)懭牍蚕碣Y源,直到該線程釋放寫入鎖。如果有其他線程已經(jīng)持有讀取或?qū)懭腈i,則調(diào)用此函數(shù)的線程將被阻塞,直到所有的讀取和寫入鎖都被釋放。
void wrlock() { pthread_rwlock_wrlock(&m_lock); }
unlock(解鎖)
pthread_rwlock_unlock()
用于釋放讀取或?qū)懭腈i(pthread_rwlock_t)。它允許其他線程獲取相應(yīng)的鎖來訪問共享資源。如果當(dāng)前線程沒有持有讀取或?qū)懭腈i,則調(diào)用pthread_rwlock_unlock將導(dǎo)致未定義的行為。此外,如果已經(jīng)銷毀了讀寫鎖,則再次調(diào)用pthread_rwlock_unlock也會導(dǎo)致未定義的行為。在使用pthread_rwlock_t時,需要注意正確地獲取和釋放讀取或?qū)懭腈i,以確保多個線程可以正確地訪問共享資源。
void unlock() { pthread_rwlock_unlock(&m_lock); }
class Spinlock(自旋鎖)
與mutex不同,自旋鎖不會使線程進(jìn)入睡眠狀態(tài),而是在獲取鎖時進(jìn)行忙等待,直到鎖可用。當(dāng)鎖被釋放時,等待獲取鎖的線程將立即獲取鎖,從而避免了線程進(jìn)入和退出睡眠狀態(tài)的額外開銷。
mumber(成員變量)
// 自旋鎖 pthread_spinlock_t m_mutex;
Spinlock(構(gòu)造函數(shù))
函數(shù)pthread_spin_init(&m_mutex, 0)
是用于對自旋鎖進(jìn)行初始化的函數(shù),其中第一個參數(shù)&m_mutex
表示要初始化的自旋鎖變量,第二個參數(shù)0
表示使用默認(rèn)的屬性。在調(diào)用pthread_spin_init
函數(shù)之前,必須先分配內(nèi)存空間來存儲自旋鎖變量。與pthread_rwlock_t
類似,需要在使用自旋鎖前先進(jìn)行初始化才能正確使用。
Spinlock() { pthread_spin_init(&m_mutex, 0); }
~Spinlock(析構(gòu)函數(shù))
pthread_spin_destroy()
用于銷毀自旋鎖(pthread_spinlock_t)。在不再需要自旋鎖時,可以使用pthread_spin_destroy函數(shù)將其銷毀。該函數(shù)確保在銷毀自旋鎖之前所有等待的線程都被解除阻塞并返回適當(dāng)?shù)腻e誤碼。如果自旋鎖已經(jīng)被銷毀,則再次調(diào)用pthread_spin_destroy將導(dǎo)致未定義的行為。
~Spinlock() { pthread_spin_destroy(&m_mutex); }
lock(加鎖)
pthread_spin_lock()
用于獲取自旋鎖(pthread_spinlock_t)上的排他訪問權(quán)限。與mutex
不同,自旋鎖在獲取鎖時忙等待,即不斷地檢查鎖狀態(tài)是否可用,如果不可用則一直循環(huán)等待,直到鎖可用。當(dāng)鎖被其他線程持有時,調(diào)用pthread_spin_lock()
的線程將在自旋等待中消耗CPU時間,直到鎖被釋放并獲取到鎖。
void lock() { pthread_spin_lock(&m_mutex); }
unlock(解鎖)
pthread_spin_unlock()
用于釋放自旋鎖(pthread_spinlock_t)。調(diào)用該函數(shù)可以使其他線程獲取相應(yīng)的鎖來訪問共享資源。與mutex
不同,自旋鎖在釋放鎖時并不會導(dǎo)致線程進(jìn)入睡眠狀態(tài),而是立即釋放鎖并允許等待獲取鎖的線程快速地獲取鎖來訪問共享資源,從而避免了線程進(jìn)入和退出睡眠狀態(tài)的額外開銷。
void unlock() { pthread_spin_unlock(&m_mutex); }
class CASLock(原子鎖)
mumber(成員變量)
// m_mutex是一個原子布爾類型,具有特殊的原子性質(zhì),可以用于實現(xiàn)線程間同步和互斥。 // volatile關(guān)鍵字表示該變量可能會被異步修改,因此編譯器不會對其進(jìn)行優(yōu)化,而是每次都從內(nèi)存中讀取該變量的值。 volatile std::atomic_flag m_mutex;
CASLock(構(gòu)造函數(shù))
atomic_flag.clear()是C++標(biāo)準(zhǔn)庫中的一個原子操作函數(shù),用于將給定的原子標(biāo)志位(atomic flag)清除或重置為未設(shè)置狀態(tài)。
在多線程編程中,原子標(biāo)志位通常用于實現(xiàn)簡單的鎖機(jī)制,以確保對共享資源的訪問是互斥的。使用atomic_flag.clear()可以輕松地重置標(biāo)志位,使之再次可用于控制對共享資源的訪問。需要注意的是,由于該函數(shù)是一個原子操作,因此可以安全地在多個線程之間使用,而無需擔(dān)心競態(tài)條件和數(shù)據(jù)競爭等問題。
CASLock () { m_mutex.clear(); }
lock(加鎖)
std::atomic_flag_test_and_set_explicit()
是C++標(biāo)準(zhǔn)庫中的一個原子操作函數(shù),用于測試給定的原子標(biāo)志位(atomic flag)是否被設(shè)置,并在測試后將其設(shè)置為已設(shè)置狀態(tài)。該函數(shù)接受一個指向原子標(biāo)志位對象的指針作為參數(shù),并返回一個布爾值,表示在調(diào)用函數(shù)前該標(biāo)志位是否已經(jīng)被設(shè)置。第二個可選參數(shù)order用于指定內(nèi)存序,以控制原子操作的內(nèi)存順序和同步行為。通過循環(huán)等待實現(xiàn)了互斥鎖的效果。
std::memory_order_acquire
是C++中的一種內(nèi)存序,用于指定原子操作的同步和內(nèi)存順序。具體來說,使用std::memory_order_acquire
可以確保在當(dāng)前線程獲取鎖之前,所有該線程之前發(fā)生的寫操作都被完全同步到主內(nèi)存中。這樣可以防止編譯器或硬件對寫操作進(jìn)行重排序或延遲,從而確保其他線程可以正確地讀取共享資源的最新值。
void lock() { while (std::atomic_flag_test_and_set_explicit(&m_mutex, std::memory_order_acquire)); }
unlock(解鎖)
atomic_flag_clear_explicit()
是C++標(biāo)準(zhǔn)庫中的一個原子操作函數(shù),用于將給定的原子標(biāo)志位(atomic flag)清除或重置為未設(shè)置狀態(tài)。該函數(shù)接受一個指向原子標(biāo)志位對象的指針作為參數(shù),并使用可選的第二個參數(shù)order來指定內(nèi)存序,以控制原子操作的同步和內(nèi)存順序。
void unlock() { std::atomic_flag_clear_explicit(&m_mutex, std::memory_order_release); }
線程模塊詳解
class Thread
定義了兩個線程局部變量用于指向當(dāng)前線程以及線程的名稱。
static thread_local
是C++中的一個關(guān)鍵字組合,用于定義靜態(tài)線程本地存儲變量。具體來說,當(dāng)一個變量被聲明為static thread_local
時,它會在每個線程中擁有自己獨立的靜態(tài)實例,并且對其他線程不可見。這使得變量可以跨越多個函數(shù)調(diào)用和代碼塊,在整個程序運(yùn)行期間保持其狀態(tài)和值不變。
需要注意的是,由于靜態(tài)線程本地存儲變量是線程特定的,因此它們的初始化和銷毀時機(jī)也與普通靜態(tài)變量不同。具體來說,在每個線程首次訪問該變量時會進(jìn)行初始化,在線程結(jié)束時才會進(jìn)行銷毀,而不是在程序啟動或運(yùn)行期間進(jìn)行一次性初始化或銷毀。
// 指向當(dāng)前線程 static thread_local Thread *t_thread = nullptr; // 指向線程名稱 static thread_local std::string t_thread_name = "UNKNOW";
mumber(成員變量)
// 指向當(dāng)前線程 static thread_local Thread *t_thread = nullptr; // 指向線程名稱 static thread_local std::string t_thread_name = "UNKNOW";
Thread(構(gòu)造函數(shù))
初始化線程執(zhí)行函數(shù)、線程名稱,創(chuàng)建新線程。
函數(shù)原型:int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
該函數(shù)接受四個參數(shù):
thread
:指向pthread_t類型的指針,用于返回新線程的ID。attr
:指向pthread_attr_t類型的指針,該結(jié)構(gòu)體包含一些有關(guān)新線程屬性的信息。可以將其設(shè)置為NULL以使用默認(rèn)值。start_routine
:是指向新線程函數(shù)的指針,該函數(shù)將在新線程中運(yùn)行。該函數(shù)必須采用一個void類型的指針作為參數(shù),并返回一個void類型的指針。arg
:是指向新線程函數(shù)的參數(shù)的指針。如果不需要傳遞參數(shù),則可以將其設(shè)置為NULL。
調(diào)用pthread_create函數(shù)后,將會創(chuàng)建一個新線程,并開始執(zhí)行通過start_routine傳遞給它的函數(shù)。新線程的ID將存儲在thread指向的變量中。請注意,新線程將在與調(diào)用pthread_create函數(shù)的線程并發(fā)執(zhí)行的情況下運(yùn)行。
Thread::Thread(std::function<void()> cb, const std::string &name) :m_cb(cb) ,m_name(name) { if (m_name.empty()) { m_name = "UNKNOW"; } // 創(chuàng)建新線程,并將其與Thread::run方法關(guān)聯(lián),創(chuàng)建的新線程對象this作為參數(shù)傳給run方法 int rt = pthread_create(&m_thread, nullptr, &Thread::run, this); if (rt) { SYLAR_LOG_ERROR(g_logger) << "pthread_creat thread fail, rt = " << rt << "name = " << name; throw std::logic_error("pthread_creat error"); } // 在出構(gòu)造函數(shù)之前,確保線程先跑起來, 保證能夠初始化id m_semaphore.wait(); }
~Thread(析構(gòu)函數(shù))
首先檢查m_thread
是否存在,如果存在,則調(diào)用pthread_detach(m_thread)
函數(shù)來分離已經(jīng)結(jié)束的線程。pthread_detach
函數(shù)用于釋放與線程關(guān)聯(lián)的資源,并確保線程可以安全地終止。通過在析構(gòu)函數(shù)中分離線程,可以避免在主線程退出時出現(xiàn)懸掛線程,從而防止內(nèi)存泄漏和其他問題。
Thread::~Thread() { if (m_thread) { pthread_detach(m_thread); } }
join(等待線程執(zhí)行完成)
pthread_join
用于等待指定線程的終止,并獲取該線程的返回值。它的原型為:
#include <pthread.h> int pthread_join(pthread_t thread, void **retval);
該函數(shù)接受兩個參數(shù):
thread
:要等待的線程ID。retval
:指向指針的指針,用于存儲線程返回的值。如果不需要獲取返回值,則可以將其設(shè)置為NULL。
當(dāng)調(diào)用 pthread_join() 時,當(dāng)前線程會阻塞,直到指定的線程完成執(zhí)行。一旦線程結(jié)束,當(dāng)前線程就會恢復(fù)執(zhí)行,并且可以通過 retval
參數(shù)來獲取線程的返回值。如果不關(guān)心線程的返回值,也可以將 retval
參數(shù)設(shè)置為 NULL
。成功:返回 0 表示線程成功退出。
總之,pthread_join
函數(shù)是一種阻塞機(jī)制,用于等待指定線程的終止并獲取其返回值,同時負(fù)責(zé)回收線程所使用的資源。
void Thread::join() { if (m_thread) { int rt = pthread_join(m_thread, nullptr); if (rt) { SYLAR_LOG_ERROR(g_logger) << "pthread_join thread fail, rt = " << rt << "name = " << m_name; throw std::logic_error("pthread_join error"); } m_thread = 0; } }
run(線程執(zhí)行函數(shù))
通過信號量,能夠確保構(gòu)造函數(shù)在創(chuàng)建線程之后會一直阻塞,直到run
方法運(yùn)行并通知信號量,構(gòu)造函數(shù)才會返回。
在構(gòu)造函數(shù)中完成線程的啟動和初始化操作,可能會導(dǎo)致線程還沒有完全啟動就被調(diào)用,從而導(dǎo)致一些未知的問題。因此,在出構(gòu)造函數(shù)之前,確保線程先跑起來,保證能夠初始化id,可以避免這種情況的發(fā)生。同時,這也可以保證線程的安全性和穩(wěn)定性。
總結(jié)
- 對日志系統(tǒng)的臨界資源進(jìn)行互斥訪問時,使用自旋鎖而不是互斥鎖。
- mutex使用系統(tǒng)調(diào)用將線程阻塞,并等待其他線程釋放鎖后再喚醒它,這種方式適用于長時間持有鎖的情況。而spinlock在獲取鎖時忙等待,即不斷地檢查鎖狀態(tài)是否可用,如果不可用則一直循環(huán)等待,因此適用于短時間持有鎖的情況。
- 由于mutex會將線程阻塞,因此在高并發(fā)情況下可能會出現(xiàn)線程頻繁地進(jìn)入和退出睡眠狀態(tài),導(dǎo)致系統(tǒng)開銷大。而spinlock雖然不會使線程進(jìn)入睡眠狀態(tài),但會消耗大量的CPU時間,在高并發(fā)情況下也容易導(dǎo)致性能問題。
- 另外,當(dāng)一個線程嘗試獲取已經(jīng)被其他線程持有的鎖時,mutex會將該線程阻塞,而spinlock則會在自旋等待中消耗CPU時間。如果鎖的持有時間較短,則spinlock比mutex更適合使用;如果鎖的持有時間較長,則mutex比spinlock
- 在構(gòu)造函數(shù)中創(chuàng)建子進(jìn)程并等待其完成執(zhí)行是一種常見的技術(shù),可以通過信號量(Semaphore)來實現(xiàn)主線程等待子線程完成。
- 首先,在主線程中創(chuàng)建一個Semaphore對象并初始化為0。然后,在構(gòu)造函數(shù)中創(chuàng)建子線程,并將Semaphore對象傳遞給子線程。子線程將執(zhí)行所需的操作,并在最后使用Semaphore對象發(fā)出信號通知主線程它已經(jīng)完成了工作。
- 主線程在構(gòu)造函數(shù)中調(diào)用Semaphore對象的wait方法,這會使主線程阻塞直到收到信號并且Semaphore對象的計數(shù)器值大于0。當(dāng)子線程發(fā)出信號時,Semaphore對象的計數(shù)器值增加1,因此主線程可以繼續(xù)執(zhí)行構(gòu)造函數(shù)的剩余部分。
以上就是C++高性能服務(wù)器框架之線程模塊的詳細(xì)內(nèi)容,更多關(guān)于C++ 線程模塊的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
VS2019安裝配置MFC(安裝vs2019時沒有安裝mfc)
這篇文章主要介紹了VS2019安裝配置MFC(安裝vs2019時沒有安裝mfc),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-03-03詳解C/C++中const關(guān)鍵字的用法及其與宏常量的比較
簡單的說const關(guān)鍵字修飾的變量具有常屬性,也就是說它所修飾的變量不能被修改,下文給大家介紹C/C++中const關(guān)鍵字的用法及其與宏常量的比較,需要的朋友可以參考下2017-07-07