欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

C++無鎖隊(duì)列的原理與實(shí)現(xiàn)示例

 更新時(shí)間:2024年01月22日 10:41:42   作者:流星雨愛編程  
C++無鎖隊(duì)列是一種多線程編程技術(shù),它可以在不使用鎖的情況下實(shí)現(xiàn)線程安全的隊(duì)列,本文就來詳細(xì)的介紹一下C++無鎖隊(duì)列的原理與實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的可以了解一下

1.無鎖隊(duì)列原理

1.1.隊(duì)列操作模型

隊(duì)列是一種非常重要的數(shù)據(jù)結(jié)構(gòu),其特性是先進(jìn)先出(FIFO),符合流水線業(yè)務(wù)流程。在進(jìn)程間通信、網(wǎng)絡(luò)通信間經(jīng)常采用隊(duì)列做緩存,緩解數(shù)據(jù)處理壓力。根據(jù)操作隊(duì)列的場景分為:單生產(chǎn)者——單消費(fèi)者、多生產(chǎn)者——單消費(fèi)者、單生產(chǎn)者——多消費(fèi)者、多生產(chǎn)者——多消費(fèi)者四大模型。根據(jù)隊(duì)列中數(shù)據(jù)分為:隊(duì)列中的數(shù)據(jù)是定長的、隊(duì)列中的數(shù)據(jù)是變長的。

(1)單生產(chǎn)者——單消費(fèi)者

(2)多生產(chǎn)者——單消費(fèi)者

(3)單生產(chǎn)者——多消費(fèi)者

(4)多生產(chǎn)者——多消費(fèi)者

(5)數(shù)據(jù)定長隊(duì)列

(6)數(shù)據(jù)變長隊(duì)列

1.2.無鎖隊(duì)列簡介

生產(chǎn)環(huán)境中廣泛使用生產(chǎn)者和消費(fèi)者模型,要求生產(chǎn)者在生產(chǎn)的同時(shí),消費(fèi)者可以進(jìn)行消費(fèi),通常使用互斥鎖保證數(shù)據(jù)同步。但線程互斥鎖的開銷仍然比較大,因此在要求高性能、低延時(shí)場景中,推薦使用無鎖隊(duì)列。

1.3.CAS操作

CAS即Compare and Swap,是所有CPU指令都支持CAS的原子操作(X86中CMPXCHG匯編指令),用于實(shí)現(xiàn)實(shí)現(xiàn)各種無鎖(lock free)數(shù)據(jù)結(jié)構(gòu)。

CAS操作的C語言實(shí)現(xiàn)如下:

bool compare_and_swap ( int *memory_location, int expected_value, int new_value)
{
    if (*memory_location == expected_value)
    {
        *memory_location = new_value;
        return true;
    }
    return false;
}

CAS用于檢查一個(gè)內(nèi)存位置是否包含預(yù)期值,如果包含,則把新值復(fù)賦值到內(nèi)存位置。成功返回true,失敗返回false。

(1)GGC對CAS支持

GCC4.1+版本中支持CAS原子操作。

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);
 
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);

(2)Windows對CAS支持

Windows中使用Windows API支持CAS。

LONG InterlockedCompareExchange(
  LONG volatile *Destination,
  LONG          ExChange,
  LONG          Comperand
);

(3)C++11對CAS支持

   C++11 STL中atomic函數(shù)支持CAS并可以跨平臺(tái)。

template< class T >
bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired );
 
template< class T >
bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );

其它原子操作如下:

Fetch-And-Add:一般用來對變量做+1的原子操作

Test-and-set:寫值到某個(gè)內(nèi)存位置并傳回其舊值

2.無鎖隊(duì)列方案

2.1.boost方案

boost提供了三種無鎖方案,分別適用不同使用場景。

boost::lockfree::queue是支持多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者線程的無鎖隊(duì)列。

boost::lockfree::stack是支持多個(gè)生產(chǎn)者和多個(gè)消費(fèi)者線程的無鎖棧。

boost::lockfree::spsc_queue是僅支持單個(gè)生產(chǎn)者和單個(gè)消費(fèi)者線程的無鎖隊(duì)列,比boost::lockfree::queue性能更好。

Boost無鎖數(shù)據(jù)結(jié)構(gòu)的API通過輕量級原子鎖實(shí)現(xiàn)lock-free,不是真正意義的無鎖。

Boost提供的queue可以設(shè)置初始容量,添加新元素時(shí)如果容量不夠,則總?cè)萘孔詣?dòng)增長;但對于無鎖數(shù)據(jù)結(jié)構(gòu),添加新元素時(shí)如果容量不夠,總?cè)萘坎粫?huì)自動(dòng)增長。

2.2.ConcurrentQueue

ConcurrentQueue是基于C++實(shí)現(xiàn)的工業(yè)級無鎖隊(duì)列方案。

GitHub:https://github.com/cameron314/concurrentqueue

ReaderWriterQueue是基于C++實(shí)現(xiàn)的單生產(chǎn)者單消費(fèi)者場景的無鎖隊(duì)列方案。

GitHub:https://github.com/cameron314/readerwriterqueue

2.3.Disruptor

Disruptor是英國外匯交易公司LMAX基于JAVA開發(fā)的一個(gè)高性能隊(duì)列。

GitHub:https://github.com/LMAX-Exchange/disruptor

3.無鎖隊(duì)列實(shí)現(xiàn)

3.1.環(huán)形緩沖區(qū)

RingBuffer是生產(chǎn)者和消費(fèi)者模型中常用的數(shù)據(jù)結(jié)構(gòu),生產(chǎn)者將數(shù)據(jù)追加到數(shù)組尾端,當(dāng)達(dá)到數(shù)組的尾部時(shí),生產(chǎn)者繞回到數(shù)組的頭部;消費(fèi)者從數(shù)組頭端取走數(shù)據(jù),當(dāng)?shù)竭_(dá)數(shù)組的尾部時(shí),消費(fèi)者繞回到數(shù)組頭部。

如果只有一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者,環(huán)形緩沖區(qū)可以無鎖訪問,環(huán)形緩沖區(qū)的寫入index只允許生產(chǎn)者訪問并修改,只要生產(chǎn)者在更新index前將新的值保存到緩沖區(qū)中,則消費(fèi)者將始終看到一致的數(shù)據(jù)結(jié)構(gòu);讀取index也只允許消費(fèi)者訪問并修改,消費(fèi)者只要在取走數(shù)據(jù)后更新讀index,則生產(chǎn)者將始終看到一致的數(shù)據(jù)結(jié)構(gòu)。

空隊(duì)列時(shí),front與rear相等;當(dāng)有元素進(jìn)隊(duì),則rear后移;有元素出隊(duì),則front后移。

空隊(duì)列時(shí),rear等于front;滿隊(duì)列時(shí),隊(duì)列尾部空一個(gè)位置,因此判斷循環(huán)隊(duì)列滿時(shí)使用(rear-front+maxn)%maxn。

入隊(duì)操作:

data[rear] = x;
 
rear = (rear+1)%maxn;

出隊(duì)操作:

x = data[front];
 
front = (front+1)%maxn;

3.2.單生產(chǎn)者單消費(fèi)者

對于單生產(chǎn)者和單消費(fèi)者場景,由于read_index和write_index都只會(huì)有一個(gè)線程寫,因此不需要加鎖也不需要原子操作,直接修改即可,但讀寫數(shù)據(jù)時(shí)需要考慮遇到數(shù)組尾部的情況。

線程對write_index和read_index的讀寫操作如下:

(1)寫操作。先判斷隊(duì)列時(shí)否為滿,如果隊(duì)列未滿,則先寫數(shù)據(jù),寫完數(shù)據(jù)后再修改write_index。

(2)讀操作。先判斷隊(duì)列是否為空,如果隊(duì)列不為空,則先讀數(shù)據(jù),讀完再修改read_index。

3.3.多生產(chǎn)者單消費(fèi)者

多生產(chǎn)者和單消費(fèi)者場景中,由于多個(gè)生產(chǎn)者都會(huì)修改write_index,所以在不加鎖的情況下必須使用原子操作。

3.4.RingBuffer實(shí)現(xiàn)

RingBuffer.hpp文件:

#pragma once
 
template <class T>
class RingBuffer
{
public:
    RingBuffer(unsigned size): m_size(size), m_front(0), m_rear(0)
    {
        m_data = new T[size];
    }
 
    ~RingBuffer()
    {
        delete [] m_data;
        m_data = NULL;
    }
 
    inline bool isEmpty() const
    {
        return m_front == m_rear;
    }
 
    inline bool isFull() const
    {
        return m_front == (m_rear + 1) % m_size;
    }
 
    bool push(const T& value)
    {
        if(isFull())
        {
            return false;
        }
        m_data[m_rear] = value;
        m_rear = (m_rear + 1) % m_size;
        return true;
    }
 
    bool push(const T* value)
    {
        if(isFull())
        {
            return false;
        }
        m_data[m_rear] = *value;
        m_rear = (m_rear + 1) % m_size;
        return true;
    }
 
    inline bool pop(T& value)
    {
        if(isEmpty())
        {
            return false;
        }
        value = m_data[m_front];
        m_front = (m_front + 1) % m_size;
        return true;
    }
 
    inline unsigned int front()const
    {
        return m_front;
    }
 
    inline unsigned int rear()const
    {
        return m_rear;
    }
 
    inline unsigned int size()const 
    {
        return m_size;
    }
private:
    unsigned int m_size;// 隊(duì)列長度
    int m_front;// 隊(duì)列頭部索引
    int m_rear;// 隊(duì)列尾部索引
    T* m_data;// 數(shù)據(jù)緩沖區(qū)
};

RingBufferTest.cpp測試代碼:

#include <stdio.h>
#include <thread>
#include <unistd.h>
#include <sys/time.h>
#include "RingBuffer.hpp"
 
 
class Test
{
public:
   Test(int id = 0, int value = 0)
   {
        this->id = id;
        this->value = value;
        sprintf(data, "id = %d, value = %d\n", this->id, this->value);
   }
 
   void display()
   {
      printf("%s", data);
   }
private:
   int id;
   int value;
   char data[128];
};
 
double getdetlatimeofday(struct timeval *begin, struct timeval *end)
{
    return (end->tv_sec + end->tv_usec * 1.0 / 1000000) -
           (begin->tv_sec + begin->tv_usec * 1.0 / 1000000);
}
 
RingBuffer<Test> queue(1 << 12);2u000
 
#define N (10 * (1 << 20))
 
void produce()
{
    struct timeval begin, end;
    gettimeofday(&begin, NULL);
    unsigned int i = 0;
    while(i < N)
    {
        if(queue.push(Test(i % 1024, i)))
        {
           i++;
        }
    }
 
    gettimeofday(&end, NULL);
    double tm = getdetlatimeofday(&begin, &end);
    printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}
 
void consume()
{
    sleep(1);
    Test test;
    struct timeval begin, end;
    gettimeofday(&begin, NULL);
    unsigned int i = 0;
    while(i < N)
    {
        if(queue.pop(test))
        {
           // test.display();
           i++;
        }
    }
    gettimeofday(&end, NULL);
    double tm = getdetlatimeofday(&begin, &end);
    printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f, size=%u \n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}
 
int main(int argc, char const *argv[])
{
    std::thread producer1(produce);
    std::thread consumer(consume);
    producer1.join();
    consumer.join();
    return 0;
}

編譯:

g++ --std=c++11  RingBufferTest.cpp -o test -pthread

單生產(chǎn)者單消費(fèi)者場景下,消息吞吐量為350萬條/秒左右。

3.5.LockFreeQueue實(shí)現(xiàn)

LockFreeQueue.hpp:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdbool.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/mman.h>
 
#define SHM_NAME_LEN 128
#define MIN(a, b) ((a) > (b) ? (b) : (a))
#define IS_POT(x) ((x) && !((x) & ((x)-1)))
#define MEMORY_BARRIER __sync_synchronize()
 
template <class T>
class LockFreeQueue
{
protected:
    typedef struct
    {
        int m_lock;
        inline void spinlock_init()
        {
            m_lock = 0;
        }
 
        inline void spinlock_lock()
        {
            while(!__sync_bool_compare_and_swap(&m_lock, 0, 1)) {}
        }
 
        inline void spinlock_unlock()
        {
            __sync_lock_release(&m_lock);
        }
    } spinlock_t;
 
public:
    // size:隊(duì)列大小
    // name:共享內(nèi)存key的路徑名稱,默認(rèn)為NULL,使用數(shù)組作為底層緩沖區(qū)。
    LockFreeQueue(unsigned int size, const char* name = NULL)
    {
        memset(shm_name, 0, sizeof(shm_name));
        createQueue(name, size);
    }
 
    ~LockFreeQueue()
    {
        if(shm_name[0] == 0)
        {
            delete [] m_buffer;
            m_buffer = NULL;
        }
        else
        {
            if (munmap(m_buffer, m_size * sizeof(T)) == -1) {
                perror("munmap");
            }
            if (shm_unlink(shm_name) == -1) {
                perror("shm_unlink");
            }
        }
    }
 
    bool isFull()const
    {
#ifdef USE_POT
        return m_head == (m_tail + 1) & (m_size - 1);
#else
        return m_head == (m_tail + 1) % m_size;
#endif
    }
 
    bool isEmpty()const
    {
        return m_head == m_tail;
    }
 
    unsigned int front()const
    {
        return m_head;
    }
 
    unsigned int tail()const
    {
        return m_tail;
    }
 
    bool push(const T& value)
    {
#ifdef USE_LOCK
        m_spinLock.spinlock_lock();
#endif
        if(isFull())
        {
#ifdef USE_LOCK
            m_spinLock.spinlock_unlock();
#endif
            return false;
        }
        memcpy(m_buffer + m_tail, &value, sizeof(T));
#ifdef USE_MB
        MEMORY_BARRIER;
#endif
 
#ifdef USE_POT
        m_tail = (m_tail + 1) & (m_size - 1);
#else
        m_tail = (m_tail + 1) % m_size;
#endif
 
#ifdef USE_LOCK
        m_spinLock.spinlock_unlock();
#endif
        return true;
    }
 
    bool pop(T& value)
    {
#ifdef USE_LOCK
        m_spinLock.spinlock_lock();
#endif
        if (isEmpty())
        {
#ifdef USE_LOCK
            m_spinLock.spinlock_unlock();
#endif
            return false;
        }
        memcpy(&value, m_buffer + m_head, sizeof(T));
#ifdef USE_MB
        MEMORY_BARRIER;
#endif
 
#ifdef USE_POT
        m_head = (m_head + 1) & (m_size - 1);
#else
        m_head = (m_head + 1) % m_size;
#endif
 
#ifdef USE_LOCK
        m_spinLock.spinlock_unlock();
#endif
        return true;
    }
 
protected:
    virtual void createQueue(const char* name, unsigned int size)
    {
#ifdef USE_POT
        if (!IS_POT(size))
        {
            size = roundup_pow_of_two(size);
        }
#endif
        m_size = size;
        m_head = m_tail = 0;
        if(name == NULL)
        {
            m_buffer = new T[m_size];
        }
        else
        {
            int shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666);
            if (shm_fd < 0)
            {
                perror("shm_open");
            }
 
            if (ftruncate(shm_fd, m_size * sizeof(T)) < 0)
            {
                perror("ftruncate");
                close(shm_fd);
            }
 
            void *addr = mmap(0, m_size * sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
            if (addr == MAP_FAILED)
            {
                perror("mmap");
                close(shm_fd);
            }
            if (close(shm_fd) == -1)
            {
                perror("close");
                exit(1);
            }
 
            m_buffer = static_cast<T*>(addr);
            memcpy(shm_name, name, SHM_NAME_LEN - 1);
        }
#ifdef USE_LOCK
    spinlock_init(m_lock);
#endif
    }
    inline unsigned int roundup_pow_of_two(size_t size)
    {
        size |= size >> 1;
        size |= size >> 2;
        size |= size >> 4;
        size |= size >> 8;
        size |= size >> 16;
        size |= size >> 32;
        return size + 1;
    }
protected:
    char shm_name[SHM_NAME_LEN];
    volatile unsigned int m_head;
    volatile unsigned int m_tail;
    unsigned int m_size;
#ifdef USE_LOCK
    spinlock_t m_spinLock;
#endif
    T* m_buffer;
};

#define USE_LOCK

開啟spinlock鎖,多生產(chǎn)者多消費(fèi)者場景

#define USE_MB

開啟Memory Barrier

#define USE_POT

開啟隊(duì)列大小的2的冪對齊

LockFreeQueueTest.cpp測試文件:

#include "LockFreeQueue.hpp"
#include <thread>
 
//#define USE_LOCK
 
class Test
{
public:
   Test(int id = 0, int value = 0)
   {
        this->id = id;
        this->value = value;
        sprintf(data, "id = %d, value = %d\n", this->id, this->value);
   }
 
   void display()
   {
      printf("%s", data);
   }
private:
   int id;
   int value;
   char data[128];
};
 
double getdetlatimeofday(struct timeval *begin, struct timeval *end)
{
    return (end->tv_sec + end->tv_usec * 1.0 / 1000000) -
           (begin->tv_sec + begin->tv_usec * 1.0 / 1000000);
}
 
LockFreeQueue<Test> queue(1 << 10, "/shm");
 
#define N ((1 << 20))
 
void produce()
{
    struct timeval begin, end;
    gettimeofday(&begin, NULL);
    unsigned int i = 0;
    while(i < N)
    {
        if(queue.push(Test(i >> 10, i)))
            i++;
    }
    gettimeofday(&end, NULL);
    double tm = getdetlatimeofday(&begin, &end);
    printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}
 
void consume()
{
    Test test;
    struct timeval begin, end;
    gettimeofday(&begin, NULL);
    unsigned int i = 0;
    while(i < N)
    {
        if(queue.pop(test))
        {
           //test.display();
           i++;
        }
    }
    gettimeofday(&end, NULL);
    double tm = getdetlatimeofday(&begin, &end);
    printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}
 
int main(int argc, char const *argv[])
{
    std::thread producer1(produce);
    //std::thread producer2(produce);
    std::thread consumer(consume);
    producer1.join();
    //producer2.join();
    consumer.join();
 
    return 0;
}

多線程場景下,需要定義USE_LOCK宏,開啟鎖保護(hù)。

編譯:

g++ --std=c++11 -O3 LockFreeQueueTest.cpp -o test -lrt -pthread

4.kfifo內(nèi)核隊(duì)列

4.1.kfifo內(nèi)核隊(duì)列簡介

kfifo是Linux內(nèi)核的一個(gè)FIFO數(shù)據(jù)結(jié)構(gòu),采用環(huán)形循環(huán)隊(duì)列的數(shù)據(jù)結(jié)構(gòu)來實(shí)現(xiàn),提供一個(gè)無邊界的字節(jié)流服務(wù),并且使用并行無鎖編程技術(shù),即單生產(chǎn)者單消費(fèi)者場景下兩個(gè)線程可以并發(fā)操作,不需要任何加鎖行為就可以保證kfifo線程安全。

4.2.kfifo內(nèi)核隊(duì)列實(shí)現(xiàn)

kfifo數(shù)據(jù)結(jié)構(gòu)定義如下:

struct kfifo
{
    unsigned char *buffer;
    unsigned int size;
    unsigned int in;
    unsigned int out;
    spinlock_t *lock;
};
 
// 創(chuàng)建隊(duì)列
struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
    struct kfifo *fifo;
    // 判斷是否為2的冪
    BUG_ON(!is_power_of_2(size));
    fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
    if (!fifo)
        return ERR_PTR(-ENOMEM);
    fifo->buffer = buffer;
    fifo->size = size;
    fifo->in = fifo->out = 0;
    fifo->lock = lock;
 
    return fifo;
}
 
// 分配空間
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
    unsigned char *buffer;
    struct kfifo *ret;
    // 判斷是否為2的冪
    if (!is_power_of_2(size))
    {
        BUG_ON(size > 0x80000000);
        // 向上擴(kuò)展成2的冪
        size = roundup_pow_of_two(size);
    }
 
    buffer = kmalloc(size, gfp_mask);
    if (!buffer)
        return ERR_PTR(-ENOMEM);
    ret = kfifo_init(buffer, size, gfp_mask, lock);
 
    if (IS_ERR(ret))
        kfree(buffer);
    return ret;
}
 
void kfifo_free(struct kfifo *fifo)
{
    kfree(fifo->buffer);
    kfree(fifo);
}
 
// 入隊(duì)操作
static inline unsigned int kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len)
{
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_put(fifo, buffer, len);
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
}
 
// 出隊(duì)操作
static inline unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
{
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_get(fifo, buffer, len);
    //當(dāng)fifo->in == fifo->out時(shí),buufer為空
    if (fifo->in == fifo->out)
        fifo->in = fifo->out = 0;
 
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
}
 
// 入隊(duì)操作
unsigned int __kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len)
{
    unsigned int l;
    //buffer中空的長度
    len = min(len, fifo->size - fifo->in + fifo->out);
    // 內(nèi)存屏障:smp_mb(),smp_rmb(), smp_wmb()來保證對方觀察到的內(nèi)存操作順序
    smp_mb();
    // 將數(shù)據(jù)追加到隊(duì)列尾部
    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
    memcpy(fifo->buffer, buffer + l, len - l);
 
    smp_wmb();
    //每次累加,到達(dá)最大值后溢出,自動(dòng)轉(zhuǎn)為0
    fifo->in += len;
    return len;
}
 
// 出隊(duì)操作
unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
{
    unsigned int l;
    //有數(shù)據(jù)的緩沖區(qū)的長度
    len = min(len, fifo->in - fifo->out);
    smp_rmb();
    l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
    memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
    memcpy(buffer + l, fifo->buffer, len - l);
    smp_mb();
    fifo->out += len; //每次累加,到達(dá)最大值后溢出,自動(dòng)轉(zhuǎn)為0
 
    return len;
}
 
static inline void __kfifo_reset(struct kfifo *fifo)
{
    fifo->in = fifo->out = 0;
}
 
static inline void kfifo_reset(struct kfifo *fifo)
{
    unsigned long flags;
    spin_lock_irqsave(fifo->lock, flags);
    __kfifo_reset(fifo);
    spin_unlock_irqrestore(fifo->lock, flags);
}
 
static inline unsigned int __kfifo_len(struct kfifo *fifo)
{
    return fifo->in - fifo->out;
}
 
static inline unsigned int kfifo_len(struct kfifo *fifo)
{
    unsigned long flags;
    unsigned int ret;
    spin_lock_irqsave(fifo->lock, flags);
    ret = __kfifo_len(fifo);
    spin_unlock_irqrestore(fifo->lock, flags);
    return ret;
}

4.3.kfifo設(shè)計(jì)要點(diǎn)

(1)保證buffer size為2的冪

kfifo->size值在調(diào)用者傳遞參數(shù)size的基礎(chǔ)上向2的冪擴(kuò)展,目的是使kfifo->size取模運(yùn)算可以轉(zhuǎn)化為位與運(yùn)算(提高運(yùn)行效率)。kfifo->in % kfifo->size轉(zhuǎn)化為 kfifo->in & (kfifo->size – 1)

保證size是2的冪可以通過位運(yùn)算的方式求余,在頻繁操作隊(duì)列的情況下可以大大提高效率。

(2)使用spin_lock_irqsave與spin_unlock_irqrestore 實(shí)現(xiàn)同步。

Linux內(nèi)核中有spin_lock、spin_lock_irq和spin_lock_irqsave保證同步。

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
    preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
 
static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
{
    local_irq_disable();
    preempt_disable();
    spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

spin_lock比spin_lock_irq速度快,但并不是線程安全的。spin_lock_irq增加調(diào)用local_irq_disable函數(shù),即禁止本地中斷,是線程安全的,既禁止本地中斷,又禁止內(nèi)核搶占。

spin_lock_irqsave是基于spin_lock_irq實(shí)現(xiàn)的一個(gè)輔助接口,在進(jìn)入和離開臨界區(qū)后,不會(huì)改變中斷的開啟、關(guān)閉狀態(tài)。

如果自旋鎖在中斷處理函數(shù)中被用到,在獲取自旋鎖前需要關(guān)閉本地中斷,spin_lock_irqsave實(shí)現(xiàn)如下:

A、保存本地中斷狀態(tài);

B、關(guān)閉本地中斷;

C、獲取自旋鎖。

解鎖時(shí)通過 spin_unlock_irqrestore完成釋放鎖、恢復(fù)本地中斷到原來狀態(tài)等工作。

(3)線性代碼結(jié)構(gòu)

代碼中沒有任何if-else分支來判斷是否有足夠的空間存放數(shù)據(jù),kfifo每次入隊(duì)或出隊(duì)只是簡單的 +len 判斷剩余空間,并沒有對kfifo->size 進(jìn)行取模運(yùn)算,所以kfifo->in和kfifo->out總是一直增大,直到unsigned in超過最大值時(shí)繞回到0這一起始端,但始終滿足:kfifo->in - kfifo->out <= kfifo->size。

(4)使用Memory Barrier

mb():適用于多處理器和單處理器的內(nèi)存屏障。

rmb():適用于多處理器和單處理器的讀內(nèi)存屏障。

wmb():適用于多處理器和單處理器的寫內(nèi)存屏障。

smp_mb():適用于多處理器的內(nèi)存屏障。

smp_rmb():適用于多處理器的讀內(nèi)存屏障。

smp_wmb():適用于多處理器的寫內(nèi)存屏障。

Memory Barrier使用場景如下:

A、實(shí)現(xiàn)同步原語(synchronization primitives)

B、實(shí)現(xiàn)無鎖數(shù)據(jù)結(jié)構(gòu)(lock-free data structures)

C、驅(qū)動(dòng)程序

程序在運(yùn)行時(shí)內(nèi)存實(shí)際訪問順序和程序代碼編寫的訪問順序不一定一致,即內(nèi)存亂序訪問。內(nèi)存亂序訪問行為出現(xiàn)是為了提升程序運(yùn)行時(shí)的性能。內(nèi)存亂序訪問主要發(fā)生在兩個(gè)階段:

A、編譯時(shí),編譯器優(yōu)化導(dǎo)致內(nèi)存亂序訪問(指令重排)。

B、運(yùn)行時(shí),多CPU間交互引起內(nèi)存亂序訪問。

Memory Barrier能夠讓CPU或編譯器在內(nèi)存訪問上有序。Memory barrier前的內(nèi)存訪問操作必定先于其后的完成。Memory Barrier包括兩類:

A、編譯器Memory Barrier。

B、CPU Memory Barrier。

通常,編譯器和CPU引起內(nèi)存亂序訪問不會(huì)帶來問題,但如果程序邏輯的正確性依賴于內(nèi)存訪問順序,內(nèi)存亂序訪問會(huì)帶來邏輯上的錯(cuò)誤。

在編譯時(shí),編譯器對代碼做出優(yōu)化時(shí)可能改變實(shí)際執(zhí)行指令的順序(如GCC的O2或O3都會(huì)改變實(shí)際執(zhí)行指令的順序)。

在運(yùn)行時(shí),CPU雖然會(huì)亂序執(zhí)行指令,但在單個(gè)CPU上,硬件能夠保證程序執(zhí)行時(shí)所有的內(nèi)存訪問操作都是按程序代碼編寫的順序執(zhí)行的,Memory Barrier沒有必要使用(不考慮編譯器優(yōu)化)。為了更快執(zhí)行指令,CPU采取流水線的執(zhí)行方式,編譯器在編譯代碼時(shí)為了使指令更適合CPU的流水線執(zhí)行方式以及多CPU執(zhí)行,原本指令就會(huì)出現(xiàn)亂序的情況。在亂序執(zhí)行時(shí),CPU真正執(zhí)行指令的順序由可用的輸入數(shù)據(jù)決定,而非程序員編寫的順序。

到此這篇關(guān)于C++無鎖隊(duì)列的原理與實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)C++ 無鎖隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • VS2022連接sqlserver數(shù)據(jù)庫教程

    VS2022連接sqlserver數(shù)據(jù)庫教程

    本文主要介紹了VS2022連接sqlserver數(shù)據(jù)庫教程,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-07-07
  • 用C語言編寫推箱子游戲

    用C語言編寫推箱子游戲

    這篇文章主要為大家詳細(xì)介紹了用C語言編寫推箱子游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-10-10
  • C語言實(shí)現(xiàn)音樂播放器的示例代碼

    C語言實(shí)現(xiàn)音樂播放器的示例代碼

    這篇文章主要和大家分享了一個(gè)C語言的小DEMO,可以實(shí)現(xiàn)音樂播放器功能,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,需要的可以參考一下
    2023-02-02
  • 深入理解C++中常見的關(guān)鍵字含義

    深入理解C++中常見的關(guān)鍵字含義

    本篇文章是對C++中常見關(guān)鍵字的含義進(jìn)行了詳細(xì)的分析介紹,需要的朋友參考下
    2013-05-05
  • 使用OpenGL創(chuàng)建窗口的示例詳解

    使用OpenGL創(chuàng)建窗口的示例詳解

    OpenGL,也就是Open?Graphics?Library。其主要就是用于我們?nèi)ヤ秩?D、3D矢量圖形的一種跨語言、跨平臺(tái)的應(yīng)用程序編程接口,這篇文章主要介紹了使用OpenGL創(chuàng)建窗口,需要的朋友可以參考下
    2022-04-04
  • C++ 重載與重寫的區(qū)別與實(shí)現(xiàn)

    C++ 重載與重寫的區(qū)別與實(shí)現(xiàn)

    在面向?qū)ο笳Z言中,經(jīng)常提到重載與重寫,本文主要介紹了C++ 重載與重寫的區(qū)別與實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-01-01
  • C++實(shí)現(xiàn)LeetCode(162.求數(shù)組的局部峰值)

    C++實(shí)現(xiàn)LeetCode(162.求數(shù)組的局部峰值)

    這篇文章主要介紹了C++實(shí)現(xiàn)LeetCode(162.求數(shù)組的局部峰值),本篇文章通過簡要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下
    2021-07-07
  • 一個(gè)win32窗口創(chuàng)建示例

    一個(gè)win32窗口創(chuàng)建示例

    這篇文章主要介紹了一個(gè)win32窗口創(chuàng)建示例,需要的朋友可以參考下
    2014-04-04
  • VC讀配置文件實(shí)例

    VC讀配置文件實(shí)例

    這篇文章主要介紹了VC讀配置文件的方法,實(shí)例講述了VC針對文件操作的技巧,需要的朋友可以參考下
    2014-10-10
  • C語言報(bào)錯(cuò)Use of Uninitialized Variable的原因及解決方案

    C語言報(bào)錯(cuò)Use of Uninitialized Variable的原因及解決方案

    Use of Uninitialized Variable是C語言中常見且危險(xiǎn)的錯(cuò)誤之一,它通常在程序試圖使用一個(gè)未初始化的變量時(shí)發(fā)生,本文將詳細(xì)介紹Use of Uninitialized Variable的產(chǎn)生原因,提供多種解決方案,并通過實(shí)例代碼演示如何有效避免和解決此類錯(cuò)誤,需要的朋友可以參考下
    2024-06-06

最新評論