C++環(huán)形緩沖區(qū)實(shí)踐與注意事項(xiàng)
環(huán)形緩沖區(qū)(Circular Buffer)是一種高效的數(shù)據(jù)結(jié)構(gòu),特別適用于生產(chǎn)者-消費(fèi)者場(chǎng)景、數(shù)據(jù)流處理和緩存管理。下面我將詳細(xì)介紹環(huán)形緩沖區(qū)的實(shí)現(xiàn)原理、代碼實(shí)踐和注意事項(xiàng)。
環(huán)形緩沖區(qū)核心概念
環(huán)形緩沖區(qū)通過(guò)固定大小的數(shù)組和兩個(gè)指針(讀指針和寫指針)實(shí)現(xiàn)循環(huán)使用存儲(chǔ)空間。當(dāng)指針到達(dá)數(shù)組末尾時(shí),會(huì)回到數(shù)組開(kāi)頭繼續(xù)操作。
完整實(shí)現(xiàn)代碼
#include <iostream>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <atomic>
#include <stdexcept>
template<typename T>
class CircularBuffer {
public:
// 構(gòu)造函數(shù)
explicit CircularBuffer(size_t size)
: buffer_(size), capacity_(size), read_pos_(0), write_pos_(0),
count_(0), is_full_(false) {
if (size == 0) {
throw std::invalid_argument("Buffer size must be greater than 0");
}
}
// 默認(rèn)析構(gòu)函數(shù)
~CircularBuffer() = default;
// 禁止拷貝和賦值
CircularBuffer(const CircularBuffer&) = delete;
CircularBuffer& operator=(const CircularBuffer&) = delete;
// 寫入數(shù)據(jù)(非阻塞)
bool push(const T& item) {
std::lock_guard<std::mutex> lock(mutex_);
if (is_full_) {
return false; // 緩沖區(qū)已滿
}
buffer_[write_pos_] = item;
write_pos_ = (write_pos_ + 1) % capacity_;
++count_;
is_full_ = (write_pos_ == read_pos_);
not_empty_.notify_one();
return true;
}
// 寫入數(shù)據(jù)(阻塞)
bool push_blocking(const T& item, std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
std::unique_lock<std::mutex> lock(mutex_);
if (timeout.count() > 0) {
// 帶超時(shí)的等待
if (!not_full_.wait_for(lock, timeout, [this]() { return !is_full_; })) {
return false; // 超時(shí)
}
} else {
// 無(wú)限等待
not_full_.wait(lock, [this]() { return !is_full_; });
}
buffer_[write_pos_] = item;
write_pos_ = (write_pos_ + 1) % capacity_;
++count_;
is_full_ = (write_pos_ == read_pos_);
not_empty_.notify_one();
return true;
}
// 讀取數(shù)據(jù)(非阻塞)
bool pop(T& item) {
std::lock_guard<std::mutex> lock(mutex_);
if (empty()) {
return false; // 緩沖區(qū)為空
}
item = buffer_[read_pos_];
read_pos_ = (read_pos_ + 1) % capacity_;
--count_;
is_full_ = false;
not_full_.notify_one();
return true;
}
// 讀取數(shù)據(jù)(阻塞)
bool pop_blocking(T& item, std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
std::unique_lock<std::mutex> lock(mutex_);
if (timeout.count() > 0) {
// 帶超時(shí)的等待
if (!not_empty_.wait_for(lock, timeout, [this]() { return !empty(); })) {
return false; // 超時(shí)
}
} else {
// 無(wú)限等待
not_empty_.wait(lock, [this]() { return !empty(); });
}
item = buffer_[read_pos_];
read_pos_ = (read_pos_ + 1) % capacity_;
--count_;
is_full_ = false;
not_full_.notify_one();
return true;
}
// 查看但不移除數(shù)據(jù)
bool peek(T& item) const {
std::lock_guard<std::mutex> lock(mutex_);
if (empty()) {
return false;
}
item = buffer_[read_pos_];
return true;
}
// 清空緩沖區(qū)
void clear() {
std::lock_guard<std::mutex> lock(mutex_);
read_pos_ = write_pos_;
count_ = 0;
is_full_ = false;
// 注意:這里不需要通知條件變量,因?yàn)榍蹇詹僮鞑粫?huì)讓緩沖區(qū)變滿
}
// 判斷是否為空
bool empty() const {
// 注意:當(dāng) read_pos_ == write_pos_ 且 !is_full_ 時(shí)為空
return (!is_full_ && (read_pos_ == write_pos_));
}
// 判斷是否已滿
bool full() const {
return is_full_;
}
// 獲取當(dāng)前元素?cái)?shù)量
size_t size() const {
return count_;
}
// 獲取容量
size_t capacity() const {
return capacity_;
}
// 獲取剩余空間
size_t available() const {
return capacity_ - count_;
}
private:
std::vector<T> buffer_;
size_t capacity_;
mutable std::mutex mutex_;
std::condition_variable not_empty_;
std::condition_variable not_full_;
size_t read_pos_; // 讀位置
size_t write_pos_; // 寫位置
size_t count_; // 當(dāng)前元素?cái)?shù)量
bool is_full_; // 是否已滿標(biāo)志
};
// 無(wú)鎖環(huán)形緩沖區(qū)(適用于單生產(chǎn)者單消費(fèi)者場(chǎng)景)
template<typename T, size_t Size>
class LockFreeCircularBuffer {
public:
LockFreeCircularBuffer() : read_index_(0), write_index_(0) {}
bool push(const T& item) {
size_t current_write = write_index_.load(std::memory_order_relaxed);
size_t next_write = next_index(current_write);
// 檢查是否已滿
if (next_write == read_index_.load(std::memory_order_acquire)) {
return false;
}
buffer_[current_write] = item;
write_index_.store(next_write, std::memory_order_release);
return true;
}
bool pop(T& item) {
size_t current_read = read_index_.load(std::memory_order_relaxed);
// 檢查是否為空
if (current_read == write_index_.load(std::memory_order_acquire)) {
return false;
}
item = buffer_[current_read];
read_index_.store(next_index(current_read), std::memory_order_release);
return true;
}
bool empty() const {
return read_index_.load(std::memory_order_acquire) ==
write_index_.load(std::memory_order_acquire);
}
bool full() const {
size_t next_write = next_index(write_index_.load(std::memory_order_relaxed));
return next_write == read_index_.load(std::memory_order_acquire);
}
size_t size() const {
size_t write = write_index_.load(std::memory_order_acquire);
size_t read = read_index_.load(std::memory_order_acquire);
if (write >= read) {
return write - read;
} else {
return Size - read + write;
}
}
private:
size_t next_index(size_t current) const {
return (current + 1) % Size;
}
T buffer_[Size];
std::atomic<size_t> read_index_;
std::atomic<size_t> write_index_;
};
// 演示使用示例
void demo_usage() {
std::cout << "=== 環(huán)形緩沖區(qū)演示 ===" << std::endl;
// 創(chuàng)建容量為5的環(huán)形緩沖區(qū)
CircularBuffer<int> buffer(5);
// 基本操作演示
std::cout << "緩沖區(qū)容量: " << buffer.capacity() << std::endl;
std::cout << "初始大小: " << buffer.size() << std::endl;
std::cout << "是否為空: " << buffer.empty() << std::endl;
// 寫入數(shù)據(jù)
for (int i = 1; i <= 5; ++i) {
if (buffer.push(i)) {
std::cout << "寫入: " << i << std::endl;
}
}
std::cout << "寫入5個(gè)數(shù)據(jù)后大小: " << buffer.size() << std::endl;
std::cout << "是否已滿: " << buffer.full() << std::endl;
// 嘗試寫入第6個(gè)數(shù)據(jù)(應(yīng)該失?。?
if (!buffer.push(6)) {
std::cout << "寫入失敗 - 緩沖區(qū)已滿" << std::endl;
}
// 讀取數(shù)據(jù)
int value;
while (buffer.pop(value)) {
std::cout << "讀取: " << value << std::endl;
}
std::cout << "讀取所有數(shù)據(jù)后大小: " << buffer.size() << std::endl;
std::cout << "是否為空: " << buffer.empty() << std::endl;
}
// 生產(chǎn)者-消費(fèi)者演示
void producer_consumer_demo() {
std::cout << "\n=== 生產(chǎn)者-消費(fèi)者演示 ===" << std::endl;
CircularBuffer<int> buffer(10);
std::atomic<bool> stop_producer(false);
std::atomic<bool> stop_consumer(false);
// 生產(chǎn)者線程
std::thread producer([&]() {
for (int i = 1; i <= 15; ++i) {
if (buffer.push_blocking(i, std::chrono::milliseconds(100))) {
std::cout << "生產(chǎn): " << i << std::endl;
} else {
std::cout << "生產(chǎn)超時(shí): " << i << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
stop_producer.store(true);
});
// 消費(fèi)者線程
std::thread consumer([&]() {
int value;
while (!stop_producer.load() || !buffer.empty()) {
if (buffer.pop_blocking(value, std::chrono::milliseconds(200))) {
std::cout << "消費(fèi): " << value << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
stop_consumer.store(true);
});
producer.join();
consumer.join();
std::cout << "生產(chǎn)者-消費(fèi)者演示結(jié)束" << std::endl;
}
// 性能測(cè)試
void performance_test() {
std::cout << "\n=== 性能測(cè)試 ===" << std::endl;
const int ITERATIONS = 1000000;
CircularBuffer<int> buffer(1000);
auto start = std::chrono::high_resolution_clock::now();
std::thread producer([&]() {
for (int i = 0; i < ITERATIONS; ++i) {
while (!buffer.push(i)) {
std::this_thread::yield();
}
}
});
std::thread consumer([&]() {
int value;
for (int i = 0; i < ITERATIONS; ++i) {
while (!buffer.pop(value)) {
std::this_thread::yield();
}
// 驗(yàn)證數(shù)據(jù)完整性
if (value != i) {
std::cerr << "數(shù)據(jù)損壞: 期望 " << i << ", 得到 " << value << std::endl;
}
}
});
producer.join();
consumer.join();
auto end = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
std::cout << "處理 " << ITERATIONS << " 個(gè)元素耗時(shí): "
<< duration.count() << " ms" << std::endl;
std::cout << "吞吐量: "
<< (ITERATIONS * 1000.0 / duration.count()) << " 操作/秒" << std::endl;
}
int main() {
try {
demo_usage();
producer_consumer_demo();
performance_test();
// 無(wú)鎖緩沖區(qū)演示
std::cout << "\n=== 無(wú)鎖環(huán)形緩沖區(qū)演示 ===" << std::endl;
LockFreeCircularBuffer<int, 5> lock_free_buffer;
for (int i = 1; i <= 3; ++i) {
if (lock_free_buffer.push(i)) {
std::cout << "無(wú)鎖緩沖區(qū)寫入: " << i << std::endl;
}
}
int value;
while (lock_free_buffer.pop(value)) {
std::cout << "無(wú)鎖緩沖區(qū)讀取: " << value << std::endl;
}
} catch (const std::exception& e) {
std::cerr << "錯(cuò)誤: " << e.what() << std::endl;
return 1;
}
return 0;
}
關(guān)鍵注意事項(xiàng)
1. 線程安全設(shè)計(jì)
- 互斥鎖保護(hù):使用
std::mutex保護(hù)共享數(shù)據(jù) - 條件變量:使用
std::condition_variable實(shí)現(xiàn)高效的等待通知機(jī)制 - 內(nèi)存序:無(wú)鎖版本中正確使用內(nèi)存序保證數(shù)據(jù)一致性
2. 空滿判斷策略
// 方法1:使用計(jì)數(shù)變量(推薦)
bool empty() const { return count_ == 0; }
bool full() const { return count_ == capacity_; }
// 方法2:使用標(biāo)志位
bool empty() const { return !is_full_ && (read_pos_ == write_pos_); }
bool full() const { return is_full_; }
3. 指針管理
// 指針前進(jìn) read_pos_ = (read_pos_ + 1) % capacity_; write_pos_ = (write_pos_ + 1) % capacity_;
4. 異常安全
- 構(gòu)造函數(shù)驗(yàn)證參數(shù)有效性
- 使用 RAII 管理資源
- 提供強(qiáng)異常安全保證
編譯和運(yùn)行
使用以下命令編譯:
g++ -std=c++11 -pthread -O2 circular_buffer.cpp -o circular_buffer
運(yùn)行:
./circular_buffer
性能優(yōu)化建議
- 緩存友好:確保數(shù)據(jù)連續(xù)存儲(chǔ),提高緩存命中率
- 減少鎖競(jìng)爭(zhēng):使用細(xì)粒度鎖或無(wú)鎖設(shè)計(jì)
- 批量操作:支持批量讀寫減少鎖開(kāi)銷
- 內(nèi)存預(yù)分配:避免動(dòng)態(tài)內(nèi)存分配
適用場(chǎng)景
- 數(shù)據(jù)流處理(音頻、視頻流)
- 生產(chǎn)者-消費(fèi)者模式
- 網(wǎng)絡(luò)數(shù)據(jù)包緩沖
- 實(shí)時(shí)系統(tǒng)數(shù)據(jù)交換
- 日志記錄系統(tǒng)
這個(gè)實(shí)現(xiàn)提供了完整的環(huán)形緩沖區(qū)功能,包括線程安全、阻塞/非阻塞操作、異常安全等特性,可以直接在生產(chǎn)環(huán)境中使用。
到此這篇關(guān)于C++環(huán)形緩沖區(qū)實(shí)踐與注意事項(xiàng)的文章就介紹到這了,更多相關(guān)C++環(huán)形緩沖區(qū)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
利用C++實(shí)現(xiàn)簡(jiǎn)易的狼人殺游戲
狼人殺游戲是一款非常有趣的角色扮演游戲,它需要玩家之間互相猜測(cè)身份并進(jìn)行投票,通過(guò)推理來(lái)找出真正的狼人。本文將用C++實(shí)現(xiàn)這一游戲,感興趣的可以了解一下2023-04-04
在C++中實(shí)現(xiàn)云端存儲(chǔ)變量的操作步驟
隨著云計(jì)算技術(shù)的快速發(fā)展,現(xiàn)在我們可以將數(shù)據(jù)存儲(chǔ)在云端,以便于在不同設(shè)備和地點(diǎn)訪問(wèn),在C++中,我們也可以通過(guò)一些方法來(lái)實(shí)現(xiàn)這個(gè)功能,本文將詳細(xì)介紹如何在C++中實(shí)現(xiàn)云端存儲(chǔ)變量,需要的朋友可以參考下2023-11-11
Qt實(shí)現(xiàn)圖片移動(dòng)實(shí)例(圖文教程)
這學(xué)期實(shí)訓(xùn)的時(shí)候用MFC做過(guò)一個(gè)飛機(jī)大戰(zhàn),很無(wú)聊的東西,一直想用Qt做一個(gè);首先需要解決的問(wèn)題是圖片的移動(dòng),怎么說(shuō)飛機(jī)啊子彈啊都是動(dòng)著的,圖片當(dāng)然要跑起來(lái),感興趣的你可不要走開(kāi)啊2013-01-01
C++ EnterCriticalSection簡(jiǎn)單使用
線程鎖在多線程中可以控制線程的執(zhí)行順序,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08
C++使用TinyXML2實(shí)現(xiàn)解析和生成XML數(shù)據(jù)
TinyXML2是一個(gè)輕量級(jí)的、開(kāi)源的C++庫(kù),專門用于解析和生成XML文檔,本文主要為大家介紹了如何使用TinyXML2實(shí)現(xiàn)解析和生成XML數(shù)據(jù),需要的可以參考下2024-04-04
c語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單的易語(yǔ)言
在本篇內(nèi)容里小編給大家整理了一篇關(guān)于c語(yǔ)言實(shí)現(xiàn)一個(gè)簡(jiǎn)單的易語(yǔ)言的相關(guān)知識(shí)點(diǎn),需要的朋友們參考下。2018-12-12
C++ OpenCV實(shí)現(xiàn)圖像雙三次插值算法詳解
圖像雙三次插值的原理,就是目標(biāo)圖像的每一個(gè)像素都是由原圖上相對(duì)應(yīng)點(diǎn)周圍的4x4=16個(gè)像素經(jīng)過(guò)加權(quán)之后再相加得到的。本文主要介紹了通過(guò)C++ OpenCV實(shí)現(xiàn)圖像雙三次插值算法,需要的可以參考一下2021-12-12

