欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

C++  RabbitMq消息隊(duì)列組件詳解

 更新時(shí)間:2025年05月29日 15:38:58   作者:LuckyRich1  
這篇文章主要介紹了C++ RabbitMq消息隊(duì)列組件的相關(guān)知識,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧

1. RabbitMq介紹

RabbitMq - 消息隊(duì)列組件:實(shí)現(xiàn)兩個(gè)客戶端主機(jī)之間消息傳輸?shù)墓δ埽òl(fā)布&訂閱)。

一端發(fā)布消息,一端訂閱消息,消息就會被推送到訂閱消息那一端然后進(jìn)行處理。

RabbitMq遵守AMQP協(xié)議(標(biāo)準(zhǔn)的高級消息隊(duì)列協(xié)議)

AMQP協(xié)議核心概念:交換機(jī)(交換機(jī)類型)、隊(duì)列,綁定,消息。

兩個(gè)客戶端之間進(jìn)行消息傳輸,一端產(chǎn)生消息另一端接收消息然后處理。按照以前的思想就是兩個(gè)客戶端直接進(jìn)行網(wǎng)絡(luò)通信socket,通過網(wǎng)絡(luò)消息將一條消息發(fā)送給對方讓對方進(jìn)行處理,這是一種最基礎(chǔ)數(shù)據(jù)傳輸過程。

但是這種消息傳輸是存在缺陷的!如果有一端連接斷開了,那另一端消息到底還發(fā)不發(fā),是等,還是將這條消息丟棄掉。如果一直等,新產(chǎn)生的消息又該怎么辦,總不能一直存著。所以這種安全性是很低的。而且一對一這種客戶端里面,通常數(shù)據(jù)的產(chǎn)生和數(shù)據(jù)的處理所消耗的時(shí)間是不成正比的。通常消息的處理消耗時(shí)間更多。

基于兩端消息進(jìn)行安全傳輸?shù)男枨?,所以高級消息?duì)列組件就產(chǎn)生了。兩端不直接進(jìn)行消息傳輸了。而是通過消息隊(duì)列服務(wù)器來進(jìn)行一個(gè)中間的數(shù)據(jù)轉(zhuǎn)發(fā)功能。發(fā)布消息客戶端將信息發(fā)布到服務(wù)器上,服務(wù)器在將這條消息推送給訂閱消息隊(duì)列客戶端讓它來進(jìn)行處理。

但是針對一個(gè)高級消息隊(duì)列設(shè)計(jì)的話,單純一個(gè)只是做中間數(shù)據(jù)轉(zhuǎn)發(fā)其實(shí)是不夠的。我們希望它能在做中間數(shù)據(jù)轉(zhuǎn)發(fā)更加靈活,在不同場景提供不同的功能。這個(gè)時(shí)候就有了AMQP的核心概念(交換機(jī)、隊(duì)列、綁定、消息)。

消息隊(duì)列服務(wù)器里面首先有一個(gè)交換機(jī),它是用來處理數(shù)據(jù)轉(zhuǎn)發(fā)邏輯功能模塊。然后還有隊(duì)列。訂閱客戶端連接服務(wù)器告訴服務(wù)器訂閱那個(gè)隊(duì)列。發(fā)布客戶端進(jìn)行消息發(fā)布并不是直接把消息發(fā)布到某個(gè)隊(duì)列中,而是把信息發(fā)布到交換機(jī),由交換機(jī)來決定把這條消息放到那個(gè)隊(duì)列。決定了這條消息推送到那個(gè)訂閱客戶端哪里去進(jìn)行處理。

交換機(jī)該把消息放到那一個(gè)隊(duì)列中呢?這個(gè)時(shí)候就有了不同的交換機(jī)類型:

廣播交換:當(dāng)交換機(jī)收到消息,則將消息發(fā)布到所有綁定的隊(duì)列中

交換機(jī)和隊(duì)列都創(chuàng)建好了之后,會把交換機(jī)和隊(duì)列進(jìn)行關(guān)系綁定,也就是交換機(jī)和隊(duì)列建立一個(gè)關(guān)聯(lián)關(guān)系。而且會設(shè)置一個(gè)routing key(路由密鑰:一定規(guī)則的字符串)用來標(biāo)識這是一個(gè)放置什么類型消息的隊(duì)列。

直接交換:根據(jù)消息中的binding_key與綁定的routing_key對比,一致則放到隊(duì)列中

主題交換:使用binding_key與綁定的routing_key進(jìn)行規(guī)則匹配,成功則放入隊(duì)列

2. 安裝RabbitMQ

sudo apt install rabbitmq-server
# 啟動服務(wù)
sudo systemctl start rabbitmq-server.service
# 查看服務(wù)狀態(tài)
sudo systemctl status rabbitmq-server.service
# 安裝完成的時(shí)候默認(rèn)有個(gè)用戶 guest ,但是權(quán)限不夠,要?jiǎng)?chuàng)建一個(gè)
# administrator 用戶,才可以做為遠(yuǎn)程登錄和發(fā)表訂閱消息:
 #添加用戶
sudo rabbitmqctl add_user root 123456
 #設(shè)置用戶 tag
sudo rabbitmqctl set_user_tags root administrator
 #設(shè)置用戶權(quán)限
sudo rabbitmqctl set_permissions -p / root "." "." ".*"
# RabbitMQ 自帶了 web 管理界面,執(zhí)行下面命令開啟
sudo rabbitmq-plugins enable rabbitmq_management

訪問 webUI 界面, 默認(rèn)端口為 15672

至此RabbitMQ安裝成功。

3. 安裝 RabbitMQ 的 C++客戶端庫

我們這里使用 AMQP-CPP 庫來編寫客戶端程序。

先安裝libev網(wǎng)絡(luò)通信庫。在搭建RabbitMQ客戶端的時(shí)候需要進(jìn)行一個(gè)網(wǎng)絡(luò)通信的事件監(jiān)控。事件監(jiān)控我們可以自己寫poll,epoll但是太麻煩了。這里我們使用第三方網(wǎng)絡(luò)通信框架。RabbitMQ對libevent、libev等等這些都支持。這里我們選擇的是libvev。

sudo apt install libev-dev #libev 網(wǎng)絡(luò)庫組件
git clone https://gitee.com/iOceanPlus_Forked/AMQP-CPP.git
cd AMQP-CPP/
make 
make install

至此可以通過 AMQP-CPP 來操作 rabbitmq

4. AMQP-CPP 庫的簡單使用

AMQP-CPP 是用于與 RabbitMq 消息中間件通信的 c++庫。它能解析從 RabbitMq
服務(wù)發(fā)送來的數(shù)據(jù),也可以生成發(fā)向 RabbitMq 的數(shù)據(jù)包。AMQP-CPP 庫不會向
RabbitMq 建立網(wǎng)絡(luò)連接,所有的網(wǎng)絡(luò)I/O由用戶完成。

  • 當(dāng)然,AMQP-CPP 提供了可選的網(wǎng)絡(luò)層接口,它預(yù)定義了 TCP 模塊,用戶就不用自己實(shí)現(xiàn)網(wǎng)絡(luò)IO,我們也可以選擇 libevent、libev、libuv、asio 等異步通信組件,需要手動安裝對應(yīng)的組件。
  • AMQP-CPP 完全異步,沒有阻塞式的系統(tǒng)調(diào)用,不使用線程就能夠應(yīng)用在高性能
  • 應(yīng)用中。
  • 注意:它需要 c++17 的支持

4.1 使用

AMQP-CPP 的使用有兩種模式:

  • 使用默認(rèn)的 TCP 模塊進(jìn)行網(wǎng)絡(luò)通信
  • 使用擴(kuò)展的 libevent、libev、libuv、asio 異步通信組件進(jìn)行通信

4.1.1 TCP 模式

  • 實(shí)現(xiàn)一個(gè)類繼承自 AMQP::TcpHandler 類, 它負(fù)責(zé)網(wǎng)絡(luò)層的 TCP 連接
  • 重寫相關(guān)函數(shù), 其中必須重寫 monitor 函數(shù)
  • 在 monitor 函數(shù)中需要實(shí)現(xiàn)的是將 fd 放入 eventloop(select、epoll)中監(jiān)控, 當(dāng) fd可寫可讀就緒之后, 調(diào)用 AMQP-CPP 的 connection->process(fd, flags)方法
#include <amqpcpp.h>
#include <amqpcpp/linux_tcp.h>
class MyTcpHandler : public AMQP::TcpHandler
{
    /**
     *AMQP 庫在創(chuàng)建新連接時(shí)調(diào)用的方法
     *與處理程序相關(guān)聯(lián)。這是對處理程序的第一次調(diào)用
     *@param connection 附加到處理程序的連接
     */
    virtual void onAttached(AMQP::TcpConnection *connection) override
    {
        //@todo
        // 添加您自己的實(shí)現(xiàn),例如初始化事物
        // 以處理連接。
    }
    /**
     *當(dāng) TCP 連接時(shí)由 AMQP 庫調(diào)用的方法
     *已經(jīng)建立。調(diào)用此方法后,庫
     *仍然需要設(shè)置可選的 TLS 層和
     *在 TCP 層的頂部建立 AMQP 連接。,這種方法
     *總是與稍后對 onLost()的調(diào)用配對。
     *@param connection 現(xiàn)在可以使用的連接
     */
    virtual void onConnected(AMQP::TcpConnection *connection) override
    {
        //@todo
        // 添加您自己的實(shí)現(xiàn)(可能不需要)
    }
    /**
    *在建立安全 TLS 連接時(shí)調(diào)用的方法。
    *這只對 amqps://連接調(diào)用。它允許您檢查連接是否足夠安全,以滿足
   您的喜好
    *(例如,您可以檢查服務(wù)器證書)。AMQP 協(xié)議仍然需要啟動。
    *@param connection 已被保護(hù)的連接
    *@param ssl 來自 openssl 庫的 ssl 結(jié)構(gòu)
    *@return bool 如果可以使用連接,則為 True
    */
    virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override
    {
        //@todo
        // 添加您自己的實(shí)現(xiàn),例如讀取證書并檢查它是否確實(shí)是您的
        return true;
    }
    /**
    *當(dāng)?shù)卿泧L試成功時(shí)由 AMQP 庫調(diào)用的方法。在此之后,連接就可以使用
   了。
    *@param connection 現(xiàn)在可以使用的連接
    */
    virtual void onReady(AMQP::TcpConnection *connection) override
    {
        //@todo
        // 添加您自己的實(shí)現(xiàn),例如通過創(chuàng)建一個(gè)通道實(shí)例,然后開始發(fā)布或使用
    }
    /**
    *該方法在服務(wù)器嘗試協(xié)商檢測信號間隔時(shí)調(diào)用,
    *并被覆蓋以擺脫默認(rèn)實(shí)現(xiàn)(否決建議的檢測信號間隔),轉(zhuǎn)而接受該間
   隔。
    *@param connection 發(fā)生錯(cuò)誤的連接
    *@param interval 建議的間隔(秒)
    */
    virtual uint16_t onNegotiate(AMQP::TcpConnection *connection,uint16_t interval)
    {
        // 我們接受服務(wù)器的建議,但如果間隔小于一分鐘,我們將使用一分鐘的間隔
        if (interval < 60)
            interval = 60;
        //@todo
        // 在事件循環(huán)中設(shè)置一個(gè)計(jì)時(shí)器,
        // 如果在這段時(shí)間內(nèi)沒有發(fā)送其他指令,
        // 請確保每隔 interval 秒調(diào)用 connection->heartbeat()。
        // 返回我們要使用的間隔
        return interval;
    }
    /**
     * *發(fā)生致命錯(cuò)誤時(shí)由 AMQP 庫調(diào)用的方法
例如,因?yàn)闊o法識別從 RabbitMQ 接收的數(shù)據(jù),或者基礎(chǔ)連接丟失。
此調(diào)用之后通常會調(diào)用 onLost()(如果錯(cuò)誤發(fā)生在 TCP 連接建立之
后)和 onDetached()。
*@param connection 發(fā)生錯(cuò)誤的連接
*@param message 一條人類可讀的錯(cuò)誤消息
*/
    virtual void onError(AMQP::TcpConnection *connection, const char *message) override
    {
        //@todo
        // 添加您自己的實(shí)現(xiàn),例如,通過向程序的用戶報(bào)告錯(cuò)誤并記錄錯(cuò)誤
    }
    /**
    *該方法在 AMQP 協(xié)議結(jié)束時(shí)調(diào)用的方法。這是調(diào)用 connection.close
   ()以正常關(guān)閉連接的計(jì)數(shù)器部分。請注意,TCP 連接此時(shí)仍處于活動狀態(tài),您還
   將收到對 onLost()和 onDetached()的調(diào)用
    @param connection AMQP 協(xié)議結(jié)束的連接
    */
    virtual void onClosed(AMQP::TcpConnection *connection) override
    {
        //@todo
        // 添加您自己的實(shí)現(xiàn), 可能沒有必要,
        // 但如果您想在 amqp 連接結(jié)束后立即執(zhí)行某些操作,
        // 又不想等待 tcp 連接關(guān)閉,則這可能會很有用
    }
    /**
     *當(dāng) TCP 連接關(guān)閉或丟失時(shí)調(diào)用的方法。
     *如果同時(shí)調(diào)用了 onConnected(),則始終調(diào)用此方法
     *@param connection 已關(guān)閉但現(xiàn)在無法使用的連接
     */
    virtual void onLost(AMQP::TcpConnection *connection) override
    {
        //@todo
        // 添加您自己的實(shí)現(xiàn)(可能沒有必要)
    }
    /**
    *調(diào)用的最終方法。這表示將不再對處理程序進(jìn)行有關(guān)連接的進(jìn)一步調(diào)
   用。
   *@param connection 可以被破壞的連接
*/
    virtual void onDetached(AMQP::TcpConnection *connection) override
    {
        //@todo
        // 添加您自己的實(shí)現(xiàn),如清理資源或退出應(yīng)用程序
    }
    /**
    *當(dāng) AMQP-CPP 庫想要與主事件循環(huán)交互時(shí),它會調(diào)用該方法。
    *AMQP-CPP 庫是完全不阻塞的,
    *并且只有在事先知道這些調(diào)用不會阻塞時(shí)才進(jìn)行“write()”或“read()”系統(tǒng)
   調(diào)用。
    *要在事件循環(huán)中注冊文件描述符,它會調(diào)用這個(gè)“monitor()”方法,
    *該方法帶有一個(gè)文件描述符和指示是否該檢查文件描述符的可讀性或可寫性
   的標(biāo)志。
    *
    *@param connection 想要與事件循環(huán)交互的連接
    *@param fd 應(yīng)該檢查的文件描述符
    *@param 標(biāo)記位或 AMQP::可讀和/或 AMQP::可寫
    */
    virtual void monitor(AMQP::TcpConnection *connection, int fd,int flags) override
    {
        //@todo
        // 添加您自己的實(shí)現(xiàn),
        // 例如將文件描述符添加到主應(yīng)用程序事件循環(huán)(如 select()或 poll()循環(huán))。
		 // 當(dāng)事件循環(huán)報(bào)告描述符變?yōu)榭勺x和或可寫時(shí),
		 // 由您通過調(diào)用 connection->process(fd,flags)方法
		 // 通知 AMQP-CPP 庫文件描述符處于活動狀態(tài)。
    }
};

4.1.2 擴(kuò)展模式

以 libev 為例, 我們不必要自己實(shí)現(xiàn) monitor 函數(shù), 可以直接使用AMQP::LibEvHandler

4.2 常用類與接口介紹

4.2.1 Channel

channel(信道類) 是一個(gè)虛擬連接,大佬認(rèn)為一個(gè)socket只用于一個(gè)連接太浪費(fèi)了,所有在socket之上又做了封裝,一個(gè)連接上可以建立多個(gè)信道。每個(gè)信道都可以支持一個(gè)客戶端和服務(wù)器進(jìn)行通信。并且所有的 RabbitMq 指令都是通過 channel 傳輸,所以連接建立后的第一步,就是建立 channel。因?yàn)樗胁僮魇钱惒降?,所以?channel 上執(zhí)行指令的返回值并不能作為操作執(zhí)行結(jié)果,實(shí)際上它返回的是 Deferred 類,可以使用它安裝處理函數(shù)。

namespace AMQP
{
    /**
     * Generic callbacks that are used by many deferred objects
     */
    using SuccessCallback = std::function<void()>;
    using ErrorCallback = std::function<void(const char* message)>;
    using FinalizeCallback = std::function<void()>;
    /**
     * Declaring and deleting a queue
     */
    using QueueCallback = std::function<void(const std::string& name,
        uint32_t messagecount, uint32_t consumercount)>;
    using DeleteCallback = std::function<void(uint32_t deletedmessages)>;
    using MessageCallback = std::function<void(
        const Message &message,
        uint64_t deliveryTag,
        bool redelivered)>;
    // 當(dāng)使用發(fā)布者確認(rèn)時(shí),當(dāng)服務(wù)器確認(rèn)消息已被接收和處理時(shí),將調(diào)用
    AckCallback using AckCallback = std::function<void(
        uint64_t deliveryTag,
        bool multiple)>;
    // 使用確認(rèn)包裹通道時(shí),當(dāng)消息被 ack/nacked 時(shí),會調(diào)用這些回調(diào)
    using PublishAckCallback = std::function<void()>;
    using PublishNackCallback = std::function<void()>;
    using PublishLostCallback = std::function<void()>;
    class Channel
    {
    	//構(gòu)造函數(shù)
        Channel(Connection *connection);
        //判斷是否連接成功
        bool connected();
        /**
          *聲明交換機(jī),交換機(jī)已經(jīng)存在就ok,不存在就創(chuàng)建
          *如果提供了一個(gè)空名稱,則服務(wù)器將分配一個(gè)名稱。
          *以下 flags 可用于交換機(jī):
          *
          *-durable 持久化,重啟后交換機(jī)依然有效
          *-autodelete 刪除所有連接的隊(duì)列后,自動刪除交換
          *-passive 僅被動檢查交換機(jī)是否存在
          *-internal 創(chuàng)建內(nèi)部交換
          *
          *@param name 交換機(jī)的名稱
          *@param-type 交換類型
          enum ExchangeType
          {
              fanout, 廣播交換,綁定的隊(duì)列都能拿到消息
              direct, 直接交換,只將消息交給 routingkey 一致的隊(duì)列
              topic, 主題交換,將消息交給符合 bindingkey 規(guī)則的隊(duì)列
              headers,
              consistent_hash,
              message_deduplication
          };
          *@param flags 交換機(jī)標(biāo)志
          *@param arguments 其他參數(shù)
          *
          *此函數(shù)返回一個(gè)延遲處理程序Deferred類??梢栽O(shè)置回調(diào)函數(shù)
          using onSuccess(), onError() and onFinalize() methods.
          */
        Deferred &declareExchange(
            const std::string_view &name,
            ExchangeType type,
            int flags,
            const Table &arguments)
          /**
          *聲明隊(duì)列
          *如果不提供名稱,服務(wù)器將分配一個(gè)名稱。
          *flags 可以是以下值的組合:
          *
          *-durable 持久隊(duì)列在代理重新啟動后仍然有效
          *-autodelete 當(dāng)所有連接的使用者都離開時(shí),自動刪除隊(duì)列
          *-passive 僅被動檢查隊(duì)列是否存在
          *-exclusive 隊(duì)列僅存在于此連接,并且在連接斷開時(shí)自動刪除
          *
          *@param name 隊(duì)列的名稱
          *@param flags 標(biāo)志組合
          *@param arguments 可選參數(shù)
          *
          *此函數(shù)返回一個(gè)延遲處理程序DeferredQueue類。可以設(shè)置回調(diào)函數(shù)
          *使用 onSuccess()、onError()和 onFinalize()方法。
          *
          Deferred &onError(const char *message)
          *
          *可以安裝的 onSuccess()回調(diào)應(yīng)該具有以下簽名:
          void myCallback(const std::string &name,
          uint32_t messageCount,
          uint32_t consumerCount);
          例如:
          channel.declareQueue("myqueue").onSuccess(
          [](const std::string &name,
          uint32_t messageCount,
          uint32_t consumerCount) {
          std::cout << "Queue '" << name << "' ";
          std::cout << "has been declared with ";
          std::cout << messageCount;
          std::cout << " messages and ";
          std::cout << consumerCount;
          std::cout << " consumers" << std::endl;
          * });
          */
        DeferredQueue &declareQueue(
            const std::string_view &name,
            int flags,
            const Table &arguments)
          /**
           *將隊(duì)列綁定到交換機(jī)
           *
           *@param exchange 源交換機(jī)
           *@param queue 目標(biāo)隊(duì)列
           *@param routingkey 路由密鑰
           *@param arguments 其他綁定參數(shù)
           *
           *此函數(shù)返回一個(gè)延遲處理程序??梢园惭b回調(diào)
           *使用 onSuccess()、onError()和 onFinalize()方法。
           */
        Deferred &bindQueue(
            const std::string_view &exchange,
            const std::string_view &queue,
            const std::string_view &routingkey,
            const Table &arguments)
          /**
          *將消息發(fā)布到 exchange
          *您必須提供交換機(jī)的名稱和路由密鑰。
          然后,RabbitMQ 將嘗試將消息發(fā)送到一個(gè)或多個(gè)隊(duì)列。
          使用可選的 flags 參數(shù),可以指定如果消息無法路由到隊(duì)列時(shí)應(yīng)該發(fā)生的情況。
          默認(rèn)情況下,不可更改的消息將被靜默地丟棄。
          *
          *如果設(shè)置了'mandatory'或'immediate'標(biāo)志,
          則無法處理的消息將返回到應(yīng)用程序。
          在開始發(fā)布之前,請確保您已經(jīng)調(diào)用了 recall()-方法,
          并設(shè)置了所有適當(dāng)?shù)奶幚沓绦騺硖幚磉@些返回的消息。
          *
          *可以提供以下 flags:
          *
          *-mandatory 如果設(shè)置,服務(wù)器將返回未發(fā)送到隊(duì)列的消息
          *-immediate 如果設(shè)置,服務(wù)器將返回?zé)o法立即轉(zhuǎn)發(fā)給使用者的消息。
          *@param exchange 要發(fā)布到的交易所
          *@param routingkey 路由密鑰
          *@param envelope 要發(fā)送的完整信封
          *@param message 要發(fā)送的消息
          *@param size 消息的大小
          *@param flags 可選標(biāo)志
          */
        bool publish(
            const std::string_view &exchange,
            const std::string_view &routingKey,
            const std::string &message,
            int flags = 0)
       /**
       *告訴 RabbitMQ 服務(wù)器我們已準(zhǔn)備好使用消息-也就是訂閱那個(gè)隊(duì)列消息
       *
       *調(diào)用此方法后,RabbitMQ 開始向客戶端應(yīng)用程序傳遞消息。
       consumer tag 是一個(gè)字符串標(biāo)識符,
       如果您以后想通過 channel::cancel()調(diào)用停止它,
       可以使用它來標(biāo)識使用者。
       *如果您沒有指定使用者 tag,服務(wù)器將為您分配一個(gè)。
       *
       *支持以下 flags:
       *
       *-nolocal 如果設(shè)置了,則不會同時(shí)消耗在此通道上發(fā)布的消息
       *-noack 如果設(shè)置了,則不必對已消費(fèi)的消息進(jìn)行確認(rèn)
       *-exclusive 請求獨(dú)占訪問,只有此使用者可以訪問隊(duì)列
       *
       *@param queue 您要使用的隊(duì)列
       *@param tag 將與此消費(fèi)操作關(guān)聯(lián)的消費(fèi)者標(biāo)記
       *@param flags 其他標(biāo)記
       *@param arguments 其他參數(shù)
       *
       *此函數(shù)返回一個(gè)延遲處理程序。
       可以使用 onSuccess()、onError()和 onFinalize()方法安裝回
      調(diào)。
       可以安裝的 onSuccess()回調(diào)應(yīng)該具有以下格式:
       void myCallback(const std::string_view&tag);
       樣例:
       channel.consume("myqueue").onSuccess(
       [](const std::string_view& tag) {
       std::cout << "Started consuming under tag ";
       std::cout << tag << std::endl;
       });
       */
        DeferredConsumer &consume(
            const std::string_view &queue,
            const std::string_view &tag,
            int flags,
            const Table &arguments)
       /**
        *確認(rèn)接收到的消息
        *
        *當(dāng)在 DeferredConsumer::onReceived()方法中接收到消息進(jìn)行處理之后,
        必須確認(rèn)該消息,
        以便 RabbitMQ 將其從隊(duì)列中刪除(除非使用 noack 選項(xiàng)消費(fèi))。
        *
        *支持以下標(biāo)志:
        *
        *-多條確認(rèn)多條消息:之前傳遞的所有未確認(rèn)消息也會得到確認(rèn)
        *
     *@param deliveryTag 消息的唯一 delivery 標(biāo)簽
     *@param flags 可選標(biāo)志
     *@return bool
     */
     bool ack(uint64_t deliveryTag, int flags = 0)
    } 
    class DeferredConsumer
    {
        /*
	        注冊一個(gè)回調(diào)函數(shù),該函數(shù)在消費(fèi)者啟動時(shí)被調(diào)用。
	        void onSuccess(const std::string &consumertag)
        */
        DeferredConsumer &onSuccess(const ConsumeCallback &callback)
        /*
	        注冊回調(diào)函數(shù),用于接收到一個(gè)完整消息的時(shí)候被調(diào)用
	        void MessageCallback(const AMQP::Message &message,
	        uint64_t deliveryTag, bool redelivered)
        */
        DeferredConsumer &onReceived(const MessageCallback &callback)
        /* Alias for onReceived() */
        DeferredConsumer &onMessage(const MessageCallback &callback)
        /*
	        注冊要在服務(wù)器取消消費(fèi)者時(shí)調(diào)用的函數(shù)
	        void CancelCallback(const std::string &tag)
        */
        DeferredConsumer &onCancelled(const CancelCallback &callback)
    } 
    class Message : public Envelope
    {
        const std::string &exchange()
            const std::string &routingkey():q
    } 
    class Envelope : public MetaData
    {
        const char *body()
            uint64_t bodySize()
    }
}

類與接口的介紹總結(jié):

AMQP::Channel:信道類

  • Channel(Connection *connection) 構(gòu)造
  • bool connected() 判斷連接
  • Deferred &declareExchange() 聲明交換機(jī)
  • DeferredQueue &declareQueue() 聲明隊(duì)列
  • Deferred& bindQueue)() 將交換機(jī)與隊(duì)列進(jìn)行關(guān)系綁定的功能
  • bool publish() 發(fā)布消息
  • DeferredConsumer&consume() 定訂閱隊(duì)列消息
  • bool ack() 消費(fèi)者客戶端對收到的消息進(jìn)行確認(rèn)應(yīng)答

class Message:消息類

  • const char* body() 獲取消息正文
  • uint64_t bodySize() 獲取消息正文大小

4.3.2 ev

typedef struct ev_async
{
    EV_WATCHER(ev_async)
    EV_ATOMIC_T sent; /* private */
} ev_async;
// break type
enum
{
    EVBREAK_CANCEL = 0, /* undo unloop */
    EVBREAK_ONE = 1,    /* unloop once */
    EVBREAK_ALL = 2     /* unloop all loops */
};
//實(shí)例化并獲取I/O事件監(jiān)控結(jié)構(gòu)句柄
struct ev_loop *ev_default_loop(unsigned int flags EV_CPP(= 0))
#define EV_DEFAULT ev_default_loop(0)(使用宏獲取上面結(jié)構(gòu))
//開始運(yùn)行I/O事件監(jiān)控,這是一個(gè)阻塞接口(創(chuàng)建一個(gè)線程執(zhí)行該接口)
int ev_run(struct ev_loop *loop);
/* break out of the loop */
//結(jié)束I/O監(jiān)控
void ev_break(struct ev_loop *loop, int32_t break_type);
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents) 
//如果在當(dāng)前線程進(jìn)行ev_run則可以直接調(diào)用,如果在其他線程中進(jìn)行ev_run需要通過異步通知進(jìn)行
void ev_async_init(ev_async *w, callback cb);//初始化異步事件結(jié)構(gòu),并設(shè)置回調(diào)函數(shù)
void ev_async_start(struct ev_loop *loop, ev_async *w);//啟動事件監(jiān)控循環(huán)中的異步任務(wù)處理
void ev_async_send(struct ev_loop *loop, ev_async *w);//發(fā)送當(dāng)前異步事件到異步線程中執(zhí)行

第三方庫鏈接

g++ -o example example.cpp -lamqpcpp -lev

5. RabbitMQ樣例編寫

5.1 發(fā)布消息

#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
int main()
{
    //1. 實(shí)例化底層網(wǎng)絡(luò)通信框架的I/O事件監(jiān)控句柄
    auto *loop = EV_DEFAULT;
    //2. 實(shí)例化libEvHandler句柄 --- 將AMQP框架與事件監(jiān)控關(guān)聯(lián)起來
    AMQP::LibEvHandler handler(loop);
    //3. 實(shí)例化連接對象
    AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
    AMQP::TcpConnection connection(&handler, address);
    //4. 實(shí)例化信道對象
    AMQP::TcpChannel channel(&connection);
    //5. 聲明交換機(jī)
    channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
        .onError([](const char *message) {
            std::cout << "聲明交換機(jī)失?。? << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-exchange 交換機(jī)創(chuàng)建成功!" << std::endl;
        });
    //6. 聲明隊(duì)列
    channel.declareQueue("test-queue")
        .onError([](const char *message) {
            std::cout << "聲明隊(duì)列失?。? << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-queue 隊(duì)列創(chuàng)建成功!" << std::endl;
        });
    //7. 針對交換機(jī)和隊(duì)列進(jìn)行綁定
    channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
        .onError([](const char *message) {
            std::cout << "test-exchange - test-queue 綁定失?。? << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-exchange - test-queue 綁定成功!" << std::endl;
        });
    //8. 向交換機(jī)發(fā)布消息
    for (int i = 0; i < 10; i++) {
        std::string msg = "Hello Bite-" + std::to_string(i);
        bool ret = channel.publish("test-exchange", "test-queue-key", msg);
        if (ret == false) {
            std::cout << "publish 失??!\n";
        }
    }
    //啟動底層網(wǎng)絡(luò)通信框架--開啟I/O
    ev_run(loop, 0);
    return 0;
}

5.2 訂閱消息

#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>
//消息回調(diào)處理函數(shù)的實(shí)現(xiàn)
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)
{
    std::string msg;
    msg.assign(message.body(), message.bodySize());
    std::cout << msg << std::endl;
    channel->ack(deliveryTag); // 對消息進(jìn)行確認(rèn)
}
int main()
{
    //1. 實(shí)例化底層網(wǎng)絡(luò)通信框架的I/O事件監(jiān)控句柄
    auto *loop = EV_DEFAULT;
    //2. 實(shí)例化libEvHandler句柄 --- 將AMQP框架與事件監(jiān)控關(guān)聯(lián)起來
    AMQP::LibEvHandler handler(loop);
    //2.5. 實(shí)例化連接對象
    AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");
    AMQP::TcpConnection connection(&handler, address);
    //3. 實(shí)例化信道對象
    AMQP::TcpChannel channel(&connection);
    //4. 聲明交換機(jī)
    channel.declareExchange("test-exchange", AMQP::ExchangeType::direct)
        .onError([](const char *message) {
            std::cout << "聲明交換機(jī)失?。? << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-exchange 交換機(jī)創(chuàng)建成功!" << std::endl;
        });
    //5. 聲明隊(duì)列
    channel.declareQueue("test-queue")
        .onError([](const char *message) {
            std::cout << "聲明隊(duì)列失?。? << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-queue 隊(duì)列創(chuàng)建成功!" << std::endl;
        });
    //6. 針對交換機(jī)和隊(duì)列進(jìn)行綁定
    channel.bindQueue("test-exchange", "test-queue", "test-queue-key")
        .onError([](const char *message) {
            std::cout << "test-exchange - test-queue 綁定失?。? << message << std::endl;
            exit(0);
        })
        .onSuccess([](){
            std::cout << "test-exchange - test-queue 綁定成功!" << std::endl;
        });
    //7. 訂閱隊(duì)列消息 -- 設(shè)置消息處理回調(diào)函數(shù)
    auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);
    channel.consume("test-queue", "consume-tag")  //返回值 DeferredConsumer
        .onReceived(callback)
        .onError([](const char *message){
            std::cout << "訂閱 test-queue 隊(duì)列消息失敗:" << message << std::endl;
            exit(0);
        }); // 返回值是 AMQP::Deferred
    //8. 啟動底層網(wǎng)絡(luò)通信框架--開啟I/O
    ev_run(loop, 0);
    return 0;
}
all : publish consume
publish : publish.cc
	g++ -std=c++17 $^ -o $@ -lamqpcpp -lev
consume : consume.cc
	g++ -std=c++17 $^ -o $@ -lamqpcpp -lev

到此這篇關(guān)于C++ RabbitMq消息隊(duì)列組件的文章就介紹到這了,更多相關(guān)C++ RabbitMq消息隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 利用Matlab繪制優(yōu)美的k線圖

    利用Matlab繪制優(yōu)美的k線圖

    本期又是一個(gè)花里胡哨的數(shù)據(jù)可視化,前兩天刷到了耐克的視覺設(shè)計(jì)師Gladys Orteza繪制的k線圖作品,把沉悶的股票圖變成了精彩的風(fēng)景,但是那些大部分是真的完全看不清,我這里挑選了幾個(gè)能看清的k線圖風(fēng)格將其用MATLAB進(jìn)行了實(shí)現(xiàn)
    2022-10-10
  • C++中命名空間(namespace)詳解及其作用介紹

    C++中命名空間(namespace)詳解及其作用介紹

    考慮一種情況,當(dāng)我們有兩個(gè)同名的人,Zara,在同一個(gè)班里。當(dāng)我們需要對它們進(jìn)行區(qū)分我們必須使用一些額外的信息和它們的名字,比如它們生活在不同的區(qū)域或者興趣愛好什么的,在C++程序中也會遇到同樣的情況,所以命名空間就此產(chǎn)生
    2022-08-08
  • C語言函數(shù)傳遞數(shù)組和傳遞地址的區(qū)別你知道嗎

    C語言函數(shù)傳遞數(shù)組和傳遞地址的區(qū)別你知道嗎

    這篇文章主要介紹了C語言中數(shù)組作為函數(shù)的參數(shù)以及返回值的使用簡單入門,這里以一維數(shù)組作為基本條件進(jìn)行例子講解,需要的朋友可以參考下
    2021-09-09
  • 圖解C++的STL之stack和queue,輕松理解數(shù)據(jù)結(jié)構(gòu)

    圖解C++的STL之stack和queue,輕松理解數(shù)據(jù)結(jié)構(gòu)

    聚焦?C++?的?STL?中的?stack?和?queue,讓數(shù)據(jù)結(jié)構(gòu)變得簡單有趣!?通過圖解的方式,我們將輕松理解這兩個(gè)重要的數(shù)據(jù)結(jié)構(gòu),準(zhǔn)備好開啟?STL?學(xué)習(xí)之旅了嗎?讓我們一起探索?stack?和?queue?的奧秘吧!
    2024-03-03
  • Qt中互斥鎖QMutex和QMutexLocker的使用

    Qt中互斥鎖QMutex和QMutexLocker的使用

    本文主要介紹了Qt中互斥鎖QMutex和QMutexLocker的使用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-05-05
  • C語言全方位講解指針的使用

    C語言全方位講解指針的使用

    指針是C語言中一個(gè)非常重要的概念,也是C語言的特色之一。使用指針可以對復(fù)雜數(shù)據(jù)進(jìn)行處理,能對計(jì)算機(jī)的內(nèi)存分配進(jìn)行控制,在函數(shù)調(diào)用中使用指針還可以返回多個(gè)值
    2022-04-04
  • C語言游戲之猜數(shù)字

    C語言游戲之猜數(shù)字

    這篇文章主要為大家詳細(xì)介紹了C語言游戲之猜數(shù)字,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2020-02-02
  • 淺談C++ 虛函數(shù)

    淺談C++ 虛函數(shù)

    這篇文章主要介紹了C++ 虛函數(shù)的相關(guān)資料,幫助大家更好的理解和學(xué)習(xí)c++,感興趣的朋友可以了解下
    2020-09-09
  • 詳解C語言中的ttyname()函數(shù)和isatty()函數(shù)的用法

    詳解C語言中的ttyname()函數(shù)和isatty()函數(shù)的用法

    這篇文章主要介紹了C語言中的ttyname()函數(shù)和isatty()函數(shù)的用法,是C語言入門學(xué)習(xí)中的基礎(chǔ)知識,需要的朋友可以參考下
    2015-09-09
  • C語言用fun函數(shù)實(shí)現(xiàn)兩個(gè)數(shù)的交換方式

    C語言用fun函數(shù)實(shí)現(xiàn)兩個(gè)數(shù)的交換方式

    這篇文章主要介紹了C語言用fun函數(shù)實(shí)現(xiàn)兩個(gè)數(shù)的交換方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-12-12

最新評論