欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

C++ 第三方庫 RabbitMq示例詳解

 更新時間:2025年04月28日 11:12:57   作者:DieSnowK  
這篇文章主要介紹了C++ 第三方庫 RabbitMq示例詳解,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧

1.介紹

RabbitMQ消息隊列組件,實現(xiàn)兩個客戶端主機之間消息傳輸?shù)墓δ?發(fā)布&訂閱)

  • 核心概念:交換機、隊列、綁定、消息
  • 交換機類型
    • 廣播交換:當交換機收到消息,則將消息發(fā)布到所有綁定的隊列中
    • 直接交換:根據(jù)消息中的bkey與綁定的rkey對比,一致則放入隊列
    • 主題交換:使用bkey與綁定的rkey進行規(guī)則匹配,成功則放入隊列

2.安裝

1.RabbitMq

  • 安裝:sudo apt install rabbitmq-server
  • 簡單使用:
# 安裝完成的時候默認有個用戶guest,但是權(quán)限不夠,要創(chuàng)建一個administrator用戶,才可以做為遠程登錄和發(fā)表訂閱消息
#添加用戶 
sudo rabbitmqctl add_user root <PASSWORD>
#設置用戶tag 
sudo rabbitmqctl set_user_tags root administrator 
#設置用戶權(quán)限 
sudo rabbitmqctl set_permissions -p / root "." "." ".*" 
# RabbitMQ自帶了web管理界面, 執(zhí)行下面命令開啟, 默認端口15672
sudo rabbitmq-plugins enable rabbitmq_management 

2.客戶端庫

C語言庫

C++庫

sudo apt install libev-dev #libev 網(wǎng)絡庫組件
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make
make install

如果安裝時出現(xiàn)以下報錯,則表示ssl版本出現(xiàn)問題

/usr/include/openssl/macros.h:147:4: error: #error 
"OPENSSL_API_COMPAT expresses an impossible API compatibility 
level" 
  147 | #  error "OPENSSL_API_COMPAT expresses an impossible API 
compatibility level" 
      |    ^~~~~ 
In file included from /usr/include/openssl/ssl.h:18, 
                 from linux_tcp/openssl.h:20, 
                 from linux_tcp/openssl.cpp:12: 
/usr/include/openssl/bio.h:687:1: error: expected constructor, 
destructor, or type conversion before ‘DEPRECATEDIN_1_1_0' 
  687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, 
unsigned short *port_ptr))

解決方案:卸載當前的ssl庫,重新進行修復安裝

dpkg -l | grep ssl
sudo dpkg -P --force-all libevent-openssl-2.1-7
sudo dpkg -P --force-all openssl
sudo dpkg -P --force-all libssl-dev
sudo apt --fix-broken install

3.AMQP-CPP

簡單使用

1.介紹

  • AMQP-CPP是用于與RabbitMq消息中間件通信的C++庫
    • 它能解析從RabbitMq服務發(fā)送來的數(shù)據(jù),也可以生成發(fā)向RabbitMq的數(shù)據(jù)包
    • AMQP-CPP庫不會向RabbitMq建立網(wǎng)絡連接,所有的網(wǎng)絡IO由用戶完成
  • AMQP-CPP提供了可選的網(wǎng)絡層接口,它預定義了TCP模塊,用戶就不用自己實現(xiàn)網(wǎng)絡IO,
    • 也可以選擇libevent、libev、libuv、asio等異步通信組件, 需要手動安裝對應的組件
  • AMQP-CPP完全異步,沒有阻塞式的系統(tǒng)調(diào)用,不使用線程就能夠應用在高性能應用中
  • 注意:它需要C++17的支持

2.使用

  • AMQP-CPP的使用有兩種模式:
    • 使用默認的TCP模塊進行網(wǎng)絡通信
    • 使用擴展的libevent、libev、libuv、asio異步通信組件進行通信
  • 此處以libev為例,不需要自己實現(xiàn)monitor函數(shù),可以直接使用AMQP::LibEvHandler

4.類與接口

1.Channel

  • channel是一個虛擬連接,一個連接上可以建立多個通道
    • 并且所有的RabbitMq指令都是通過channel傳輸
  • 所以連接建立后的第一步,就是建立channel
    • 因為所有操作是異步的,所以在channel上執(zhí)行指令的返回值并不能作為操作執(zhí)行結(jié)果
  • 實際上它返回的是Deferred類,可以使用它安裝處理函數(shù)
namespace AMQP 
{ 
    /** 
     *  Generic callbacks that are used by many deferred objects 
     */ 
    using SuccessCallback = std::function<void()>; 
    using ErrorCallback = std::function<void(const char *message)>; 
    using FinalizeCallback = std::function<void()>;
    /** 
     *  Declaring and deleting a queue 
     */ 
    using QueueCallback = std::function<void(const std::string &name, 
                                                   uint32_t messagecount, 
                                                   uint32_t consumercount)>;
    using DeleteCallback = std::function<void(uint32_t deletedmessages)>; 
    using MessageCallback = std::function<void(const Message &message, 
                                                   uint64_t deliveryTag, 
                                                   bool redelivered)>; 
    // 當使用發(fā)布者確認時,當服務器確認消息已被接收和處理時,將調(diào)用AckCallback 
    using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;
    // 使用確認包裹通道時,當消息被ack/nacked時,會調(diào)用這些回調(diào) 
    using PublishAckCallback = std::function<void()>; 
    using PublishNackCallback = std::function<void()>; 
    using PublishLostCallback = std::function<void()>; 
	// 信道類
    class Channel 
    { 
        Channel(Connection *connection); 
        bool connected();
        /** 
            *聲明交換機 
            *如果提供了一個空名稱,則服務器將分配一個名稱。 
            *以下flags可用于交換機: 
            * 
            *-durable     持久化,重啟后交換機依然有效 
            *-autodelete  刪除所有連接的隊列后,自動刪除交換 
            *-passive     僅被動檢查交換機是否存在 
            *-internal    創(chuàng)建內(nèi)部交換 
            * 
            *@param name    交換機的名稱 
            *@param-type    交換類型 
                enum ExchangeType 
                { 
                    fanout,  廣播交換,綁定的隊列都能拿到消息 
                    direct,  直接交換,只將消息交給routingkey一致的隊列 
                    topic,   主題交換,將消息交給符合bindingkey規(guī)則的隊列 
                    headers, 
                    consistent_hash, 
                    message_deduplication 
                }; 
            *@param flags    交換機標志 
            *@param arguments其他參數(shù) 
            * 
            *此函數(shù)返回一個延遲處理程序??梢园惭b回調(diào) 
            using onSuccess(), onError() and onFinalize() methods. 
        */ 
        Deferred &declareExchange(const std::string_view &name,
                                  ExchangeType type,
                                  int flags,
                                  const Table &arguments);
        /** 
            *聲明隊列 
            *如果不提供名稱,服務器將分配一個名稱。 
            *flags可以是以下值的組合: 
            * 
            *-durable 持久隊列在代理重新啟動后仍然有效 
            *-autodelete 當所有連接的使用者都離開時,自動刪除隊列 
            *-passive 僅被動檢查隊列是否存在 
            *-exclusive 隊列僅存在于此連接,并且在連接斷開時自動刪除 
            * 
            *@param name        隊列的名稱 
            *@param flags       標志組合 
            *@param arguments  可選參數(shù) 
            * 
            *此函數(shù)返回一個延遲處理程序??梢园惭b回調(diào) 
            *使用onSuccess()、onError()和onFinalize()方法。 
            * 
            Deferred &onError(const char *message) 
            * 
            *可以安裝的onSuccess()回調(diào)應該具有以下簽名: 
            void myCallback(const std::string &name,  
                uint32_t messageCount,  
                uint32_t consumerCount); 
            例如: 
            channel.declareQueue("myqueue").onSuccess( 
                [](const std::string &name,  
                    uint32_t messageCount, 
                    uint32_t consumerCount) { 
                       std::cout << "Queue '" << name << "' "; 
                       std::cout << "has been declared with "; 
                       std::cout << messageCount; 
                       std::cout << " messages and "; 
                       std::cout << consumerCount; 
                       std::cout << " consumers" << std::endl; 
         *  }); 
        */ 
        DeferredQueue &declareQueue(const std::string_view &name,
                                    int flags,
                                    const Table &arguments);
        /** 
            *將隊列綁定到交換機 
            * 
            *@param exchange     源交換機 
            *@param queue        目標隊列 
            *@param routingkey   路由密鑰 
            *@param arguments    其他綁定參數(shù) 
            * 
            *此函數(shù)返回一個延遲處理程序??梢园惭b回調(diào) 
            *使用onSuccess()、onError()和onFinalize()方法。 
        */ 
        Deferred &bindQueue(const std::string_view &exchange,
                            const std::string_view &queue,
                            const std::string_view &routingkey,
                            const Table &arguments);
        /** 
            *將消息發(fā)布到exchange
            *您必須提供交換機的名稱和路由密鑰。 
            然后,RabbitMQ將嘗試將消息發(fā)送到一個或多個隊列。 
            使用可選的flags參數(shù),可以指定如果消息無法路由到隊列時應該發(fā)生的情況。
            默認情況下,不可更改的消息將被靜默地丟棄。 
            *  
            *如果設置了'mandatory'或'immediate'標志, 
            則無法處理的消息將返回到應用程序。 
            在開始發(fā)布之前,請確保您已經(jīng)調(diào)用了recall()-方法, 
            并設置了所有適當?shù)奶幚沓绦騺硖幚磉@些返回的消息。 
            *  
            *可以提供以下flags: 
            *  
            *-mandatory 如果設置,服務器將返回未發(fā)送到隊列的消息 
            *-immediate 如果設置,服務器將返回無法立即轉(zhuǎn)發(fā)給使用者的消息。 
            *@param exchange要發(fā)布到的交易所 
            *@param routingkey路由密鑰 
            *@param envelope要發(fā)送的完整信封 
            *@param message要發(fā)送的消息 
            *@param size消息的大小 
            *@param flags可選標志 
        */ 
        bool publish(const std::string_view &exchange,
                     const std::string_view &routingKey,
                     const std::string &message,
                     int flags = 0);
        /** 
            *告訴RabbitMQ服務器已準備好使用消息-也就是 訂閱隊列消息 
            * 
            *調(diào)用此方法后,RabbitMQ開始向客戶端應用程序傳遞消息。 
            consumer tag是一個字符串標識符, 
            如果您以后想通過channel::cancel()調(diào)用停止它, 
            可以使用它來標識使用者。 
            *如果您沒有指定使用者tag,服務器將為您分配一個。 
            * 
            *支持以下flags: 
            * 
            *-nolocal    如果設置了,則不會同時消耗在此通道上發(fā)布的消息 
            *-noack      如果設置了,則不必對已消費的消息進行確認 
            *-exclusive  請求獨占訪問,只有此使用者可以訪問隊列 
            * 
            *@param queue    您要使用的隊列 
            *@param tag      將與此消費操作關(guān)聯(lián)的消費者標記 
            *@param flags    其他標記 
            *@param arguments其他參數(shù) 
            * 
            *此函數(shù)返回一個延遲處理程序。 
            可以使用onSuccess()、onError()和onFinalize()方法安裝回調(diào)
            可以安裝的onSuccess()回調(diào)應該具有以下格式: 
                void myCallback(const std::string_view&tag); 
            樣例: 
            channel.consume("myqueue").onSuccess( 
                [](const std::string_view& tag) { 
                    std::cout << "Started consuming under tag "; 
                    std::cout << tag << std::endl; 
            }); 
        */ 
        DeferredConsumer &consume(const std::string_view &queue,
                                  const std::string_view &tag,
                                  int flags,
                                  const Table &arguments);
        /** 
            *確認接收到的消息 
            *
            *消費者客戶端對收到的消息進行確認應答
            *
            *當在DeferredConsumer::onReceived()方法中接收到消息時, 
            必須確認該消息, 
            以便RabbitMQ將其從隊列中刪除(除非使用noack選項消費)
            * 
            *支持以下標志: 
            * 
            *-多條確認多條消息:之前傳遞的所有未確認消息也會得到確認 
            * 
            *@param deliveryTag    消息的唯一delivery標簽 
            *@param flags          可選標志 
            *@return bool 
        */ 
        bool ack(uint64_t deliveryTag, int flags=0);
    };
    class DeferredConsumer 
    { 
        /* 
            注冊一個回調(diào)函數(shù),該函數(shù)在消費者啟動時被調(diào)用
            void onSuccess(const std::string &consumertag) 
        */ 
        DeferredConsumer &onSuccess(const ConsumeCallback& callback);
        /* 
            注冊回調(diào)函數(shù),用于接收到一個完整消息的時候被調(diào)用 
            void MessageCallback(const AMQP::Message &message,  
                uint64_t deliveryTag, bool redelivered) 
        */ 
        DeferredConsumer &onReceived(const MessageCallback& callback);
        /* Alias for onReceived() */ 
        DeferredConsumer &onMessage(const MessageCallback& callback);
        /* 
            注冊要在服務器取消消費者時調(diào)用的函數(shù) 
            void CancelCallback(const std::string &tag) 
        */ 
        DeferredConsumer &onCancelled(const CancelCallback& callback);
    };
    class Message : public Envelope
    { 
        const std::string &exchange();
        const std::string &routingkey();
    };
    class Envelope : public MetaData
    { 
        const char *body();  // 獲取消息正文
        uint64_t bodySize(); // 獲取消息正文大小
    };
}

2.ev

typedef struct ev_async 
{ 
    EV_WATCHER (ev_async);
    EV_ATOMIC_T sent; /* private */ 
}ev_async; 
//break type 
enum 
{ 
    EVBREAK_CANCEL = 0, /* undo unloop */ 
    EVBREAK_ONE    = 1, /* unloop once */ 
    EVBREAK_ALL    = 2  /* unloop all loops */ 
}; 
// 實例化并獲取IO事件監(jiān)控接口句柄
struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0));
# define EV_DEFAULT  ev_default_loop (0) 
// 開始運行IO事件監(jiān)控, 這是一個阻塞接口
int  ev_run (struct ev_loop *loop);
/* break out of the loop */
// 結(jié)束IO監(jiān)控
// 如果在主線程進行ev_run(), 則可以直接調(diào)用,
// 如果在其他線程中進行ev_run(), 需要通過異步通知進行
void ev_break (struct ev_loop *loop, int32_t break_type) ;  
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
// 初始化異步事件結(jié)構(gòu), 并設置回調(diào)函數(shù)
void ev_async_init(ev_async *w, callback cb);
// 啟動事件監(jiān)控循環(huán)中的異步任務處理
void ev_async_start(struct ev_loop *loop, ev_async *w); 
// 發(fā)送當前異步事件到異步線程中執(zhí)行
void ev_async_send(struct ev_loop *loop, ev_async *w);

5.使用

1.publish.cc

#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
int main()
{
    // 1.實例化底層網(wǎng)絡通信框架的IO事件監(jiān)控句柄
    auto *loop = EV_DEFAULT;
    // 2.實例化libEvHandler句柄 -> 將AMQP框架與事件監(jiān)控關(guān)聯(lián)起來
    AMQP::LibEvHandler handler(loop);
    // 3.實例化連接對象
    AMQP::Address address("amqp://root:SnowK8989@127.0.0.1:5672/");
    AMQP::TcpConnection connection(&handler, address);
    // 4.實例化信道對象
    AMQP::TcpChannel channel(&connection);
    // 5.聲明交換機
    channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
        .onError([](const char *message)
                { std::cout << "聲明交換機失敗: " << message << std::endl; })
        .onSuccess([]()
                { std::cout << "test-exchange 交換機創(chuàng)建成功" << std::endl; });
    // 6.聲明隊列
    channel.declareQueue("test-queue")
        .onError([](const char *message)
                 { std::cout << "聲明隊列失敗: " << message << std::endl; })
        .onSuccess([]()
                 { std::cout << "test-queue 隊列創(chuàng)建成功" << std::endl; });
    // 7.針對交換機和隊列進行綁定
    channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
        .onError([](const char *message)
                 { std::cout << "test-exchange - test-queue 綁定失敗: " \
                 << message << std::endl; })
        .onSuccess([]()
                 { std::cout << "test-exchange - test-queue 綁定成功" 
                 << std::endl; });
    // 8.向交換機發(fā)布消息
    for (int i = 0; i < 5; ++i)
    {
        std::string msg = "Hello SnowK-" + std::to_string(i);
        if(channel.publish("test-exchange", "test-queue-key", msg) == false)
        {
            std::cout << "publish 失敗" << std::endl;
        }
    }
    // 9.啟動底層網(wǎng)絡通信框架 -> 開啟IO
    ev_run(loop, 0);
    return 0;
}

2.consume.cc

#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
void MessageCB(AMQP::TcpChannel* channel, const AMQP::Message& message, 
               uint64_t deliveryTag, bool redelivered)
{
    std::string msg;
    msg.assign(message.body(), message.bodySize());
    // 不能這樣使用, AMQP::Message后面沒有存'\0'
    // std::cout << message << std::endl 
    std::cout << msg << std::endl;
    channel->ack(deliveryTag);
}
int main()
{
    // 1.實例化底層網(wǎng)絡通信框架的IO事件監(jiān)控句柄
    auto *loop = EV_DEFAULT;
    // 2.實例化libEvHandler句柄 -> 將AMQP框架與事件監(jiān)控關(guān)聯(lián)起來
    AMQP::LibEvHandler handler(loop);
    // 3.實例化連接對象
    AMQP::Address address("amqp://root:SnowK8989@127.0.0.1:5672/");
    AMQP::TcpConnection connection(&handler, address);
    // 4.實例化信道對象
    AMQP::TcpChannel channel(&connection);
    // 5.聲明交換機
    channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
        .onError([](const char *message)
                 { std::cout << "聲明交換機失敗: " << message << std::endl; })
        .onSuccess([]()
                 { std::cout << "test-exchange 交換機創(chuàng)建成功" << std::endl; });
    // 6.聲明隊列
    channel.declareQueue("test-queue")
        .onError([](const char *message)
                 { std::cout << "聲明隊列失敗: " << message << std::endl; })
        .onSuccess([]()
                 { std::cout << "test-queue 隊列創(chuàng)建成功" << std::endl; });
    // 7.針對交換機和隊列進行綁定
    channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
        .onError([](const char *message)
                 { std::cout << "test-exchange - test-queue 綁定失敗: " \
				 << message << std::endl; })
        .onSuccess([]()
                 { std::cout << "test-exchange - test-queue 綁定成功"; });
    // 8.訂閱消息對壘 -> 設置消息處理回調(diào)函數(shù)
    auto callback = std::bind(MessageCB, &channel, std::placeholders::_1, 
                              std::placeholders::_2, std::placeholders::_3);
    channel.consume("test-queue", "consume-tag")
        .onReceived(callback)
        .onError([](const char *message)
        { 
            std::cout << "訂閱 test-queue 隊列消息失敗: " << message << std::endl;
            exit(0); 
        });
    // 9.啟動底層網(wǎng)絡通信框架 -> 開啟IO
    ev_run(loop, 0);
    return 0;
}

3.makefile

all: publish consume
publish: publish.cc
	g++ -o $@ $^ -lamqpcpp -lev -std=c++17
consume: consume.cc
	g++ -o $@ $^ -lamqpcpp -lev -std=c++17
.PHONY:clean
clean:
	rm publish consume

到此這篇關(guān)于C++ 第三方庫 RabbitMq詳細講解的文章就介紹到這了,更多相關(guān)C++ 第三方庫 RabbitMq內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:

相關(guān)文章

最新評論