C++ RabbitMq消息隊(duì)列組件詳解
1. RabbitMq介紹
RabbitMq - 消息隊(duì)列組件:實(shí)現(xiàn)兩個(gè)客戶端主機(jī)之間消息傳輸?shù)墓δ埽òl(fā)布&訂閱)。
一端發(fā)布消息,一端訂閱消息,消息就會被推送到訂閱消息那一端然后進(jìn)行處理。
RabbitMq遵守AMQP協(xié)議(標(biāo)準(zhǔn)的高級消息隊(duì)列協(xié)議)
AMQP協(xié)議核心概念:交換機(jī)(交換機(jī)類型)、隊(duì)列,綁定,消息。
兩個(gè)客戶端之間進(jìn)行消息傳輸,一端產(chǎn)生消息另一端接收消息然后處理。按照以前的思想就是兩個(gè)客戶端直接進(jìn)行網(wǎng)絡(luò)通信socket,通過網(wǎng)絡(luò)消息將一條消息發(fā)送給對方讓對方進(jìn)行處理,這是一種最基礎(chǔ)數(shù)據(jù)傳輸過程。
但是這種消息傳輸是存在缺陷的!如果有一端連接斷開了,那另一端消息到底還發(fā)不發(fā),是等,還是將這條消息丟棄掉。如果一直等,新產(chǎn)生的消息又該怎么辦,總不能一直存著。所以這種安全性是很低的。而且一對一這種客戶端里面,通常數(shù)據(jù)的產(chǎn)生和數(shù)據(jù)的處理所消耗的時(shí)間是不成正比的。通常消息的處理消耗時(shí)間更多。
基于兩端消息進(jìn)行安全傳輸?shù)男枨?,所以高級消息?duì)列組件就產(chǎn)生了。兩端不直接進(jìn)行消息傳輸了。而是通過消息隊(duì)列服務(wù)器來進(jìn)行一個(gè)中間的數(shù)據(jù)轉(zhuǎn)發(fā)功能。發(fā)布消息客戶端將信息發(fā)布到服務(wù)器上,服務(wù)器在將這條消息推送給訂閱消息隊(duì)列客戶端讓它來進(jìn)行處理。
但是針對一個(gè)高級消息隊(duì)列設(shè)計(jì)的話,單純一個(gè)只是做中間數(shù)據(jù)轉(zhuǎn)發(fā)其實(shí)是不夠的。我們希望它能在做中間數(shù)據(jù)轉(zhuǎn)發(fā)更加靈活,在不同場景提供不同的功能。這個(gè)時(shí)候就有了AMQP的核心概念(交換機(jī)、隊(duì)列、綁定、消息)。
消息隊(duì)列服務(wù)器里面首先有一個(gè)交換機(jī),它是用來處理數(shù)據(jù)轉(zhuǎn)發(fā)邏輯功能模塊。然后還有隊(duì)列。訂閱客戶端連接服務(wù)器告訴服務(wù)器訂閱那個(gè)隊(duì)列。發(fā)布客戶端進(jìn)行消息發(fā)布并不是直接把消息發(fā)布到某個(gè)隊(duì)列中,而是把信息發(fā)布到交換機(jī),由交換機(jī)來決定把這條消息放到那個(gè)隊(duì)列。決定了這條消息推送到那個(gè)訂閱客戶端哪里去進(jìn)行處理。
交換機(jī)該把消息放到那一個(gè)隊(duì)列中呢?這個(gè)時(shí)候就有了不同的交換機(jī)類型:
廣播交換:當(dāng)交換機(jī)收到消息,則將消息發(fā)布到所有綁定的隊(duì)列中
交換機(jī)和隊(duì)列都創(chuàng)建好了之后,會把交換機(jī)和隊(duì)列進(jìn)行關(guān)系綁定,也就是交換機(jī)和隊(duì)列建立一個(gè)關(guān)聯(lián)關(guān)系。而且會設(shè)置一個(gè)routing key(路由密鑰:一定規(guī)則的字符串)用來標(biāo)識這是一個(gè)放置什么類型消息的隊(duì)列。
直接交換:根據(jù)消息中的binding_key與綁定的routing_key對比,一致則放到隊(duì)列中
主題交換:使用binding_key與綁定的routing_key進(jìn)行規(guī)則匹配,成功則放入隊(duì)列
2. 安裝RabbitMQ
sudo apt install rabbitmq-server
# 啟動服務(wù) sudo systemctl start rabbitmq-server.service # 查看服務(wù)狀態(tài) sudo systemctl status rabbitmq-server.service # 安裝完成的時(shí)候默認(rèn)有個(gè)用戶 guest ,但是權(quán)限不夠,要?jiǎng)?chuàng)建一個(gè) # administrator 用戶,才可以做為遠(yuǎn)程登錄和發(fā)表訂閱消息: #添加用戶 sudo rabbitmqctl add_user root 123456 #設(shè)置用戶 tag sudo rabbitmqctl set_user_tags root administrator #設(shè)置用戶權(quán)限 sudo rabbitmqctl set_permissions -p / root "." "." ".*" # RabbitMQ 自帶了 web 管理界面,執(zhí)行下面命令開啟 sudo rabbitmq-plugins enable rabbitmq_management
訪問 webUI 界面, 默認(rèn)端口為 15672
至此RabbitMQ安裝成功。
3. 安裝 RabbitMQ 的 C++客戶端庫
我們這里使用 AMQP-CPP 庫來編寫客戶端程序。
先安裝libev網(wǎng)絡(luò)通信庫。在搭建RabbitMQ客戶端的時(shí)候需要進(jìn)行一個(gè)網(wǎng)絡(luò)通信的事件監(jiān)控。事件監(jiān)控我們可以自己寫poll,epoll但是太麻煩了。這里我們使用第三方網(wǎng)絡(luò)通信框架。RabbitMQ對libevent、libev等等這些都支持。這里我們選擇的是libvev。
sudo apt install libev-dev #libev 網(wǎng)絡(luò)庫組件
git clone https://gitee.com/iOceanPlus_Forked/AMQP-CPP.git cd AMQP-CPP/ make make install
至此可以通過 AMQP-CPP 來操作 rabbitmq
4. AMQP-CPP 庫的簡單使用
AMQP-CPP 是用于與 RabbitMq 消息中間件通信的 c++庫。它能解析從 RabbitMq
服務(wù)發(fā)送來的數(shù)據(jù),也可以生成發(fā)向 RabbitMq 的數(shù)據(jù)包。AMQP-CPP 庫不會向
RabbitMq 建立網(wǎng)絡(luò)連接,所有的網(wǎng)絡(luò)I/O由用戶完成。
- 當(dāng)然,AMQP-CPP 提供了可選的網(wǎng)絡(luò)層接口,它預(yù)定義了 TCP 模塊,用戶就不用自己實(shí)現(xiàn)網(wǎng)絡(luò)IO,我們也可以選擇 libevent、libev、libuv、asio 等異步通信組件,需要手動安裝對應(yīng)的組件。
- AMQP-CPP 完全異步,沒有阻塞式的系統(tǒng)調(diào)用,不使用線程就能夠應(yīng)用在高性能
- 應(yīng)用中。
- 注意:它需要 c++17 的支持
4.1 使用
AMQP-CPP 的使用有兩種模式:
- 使用默認(rèn)的 TCP 模塊進(jìn)行網(wǎng)絡(luò)通信
- 使用擴(kuò)展的 libevent、libev、libuv、asio 異步通信組件進(jìn)行通信
4.1.1 TCP 模式
- 實(shí)現(xiàn)一個(gè)類繼承自 AMQP::TcpHandler 類, 它負(fù)責(zé)網(wǎng)絡(luò)層的 TCP 連接
- 重寫相關(guān)函數(shù), 其中必須重寫 monitor 函數(shù)
- 在 monitor 函數(shù)中需要實(shí)現(xiàn)的是將 fd 放入 eventloop(select、epoll)中監(jiān)控, 當(dāng) fd可寫可讀就緒之后, 調(diào)用 AMQP-CPP 的 connection->process(fd, flags)方法
#include <amqpcpp.h> #include <amqpcpp/linux_tcp.h> class MyTcpHandler : public AMQP::TcpHandler { /** *AMQP 庫在創(chuàng)建新連接時(shí)調(diào)用的方法 *與處理程序相關(guān)聯(lián)。這是對處理程序的第一次調(diào)用 *@param connection 附加到處理程序的連接 */ virtual void onAttached(AMQP::TcpConnection *connection) override { //@todo // 添加您自己的實(shí)現(xiàn),例如初始化事物 // 以處理連接。 } /** *當(dāng) TCP 連接時(shí)由 AMQP 庫調(diào)用的方法 *已經(jīng)建立。調(diào)用此方法后,庫 *仍然需要設(shè)置可選的 TLS 層和 *在 TCP 層的頂部建立 AMQP 連接。,這種方法 *總是與稍后對 onLost()的調(diào)用配對。 *@param connection 現(xiàn)在可以使用的連接 */ virtual void onConnected(AMQP::TcpConnection *connection) override { //@todo // 添加您自己的實(shí)現(xiàn)(可能不需要) } /** *在建立安全 TLS 連接時(shí)調(diào)用的方法。 *這只對 amqps://連接調(diào)用。它允許您檢查連接是否足夠安全,以滿足 您的喜好 *(例如,您可以檢查服務(wù)器證書)。AMQP 協(xié)議仍然需要啟動。 *@param connection 已被保護(hù)的連接 *@param ssl 來自 openssl 庫的 ssl 結(jié)構(gòu) *@return bool 如果可以使用連接,則為 True */ virtual bool onSecured(AMQP::TcpConnection *connection, const SSL *ssl) override { //@todo // 添加您自己的實(shí)現(xiàn),例如讀取證書并檢查它是否確實(shí)是您的 return true; } /** *當(dāng)?shù)卿泧L試成功時(shí)由 AMQP 庫調(diào)用的方法。在此之后,連接就可以使用 了。 *@param connection 現(xiàn)在可以使用的連接 */ virtual void onReady(AMQP::TcpConnection *connection) override { //@todo // 添加您自己的實(shí)現(xiàn),例如通過創(chuàng)建一個(gè)通道實(shí)例,然后開始發(fā)布或使用 } /** *該方法在服務(wù)器嘗試協(xié)商檢測信號間隔時(shí)調(diào)用, *并被覆蓋以擺脫默認(rèn)實(shí)現(xiàn)(否決建議的檢測信號間隔),轉(zhuǎn)而接受該間 隔。 *@param connection 發(fā)生錯(cuò)誤的連接 *@param interval 建議的間隔(秒) */ virtual uint16_t onNegotiate(AMQP::TcpConnection *connection,uint16_t interval) { // 我們接受服務(wù)器的建議,但如果間隔小于一分鐘,我們將使用一分鐘的間隔 if (interval < 60) interval = 60; //@todo // 在事件循環(huán)中設(shè)置一個(gè)計(jì)時(shí)器, // 如果在這段時(shí)間內(nèi)沒有發(fā)送其他指令, // 請確保每隔 interval 秒調(diào)用 connection->heartbeat()。 // 返回我們要使用的間隔 return interval; } /** * *發(fā)生致命錯(cuò)誤時(shí)由 AMQP 庫調(diào)用的方法 例如,因?yàn)闊o法識別從 RabbitMQ 接收的數(shù)據(jù),或者基礎(chǔ)連接丟失。 此調(diào)用之后通常會調(diào)用 onLost()(如果錯(cuò)誤發(fā)生在 TCP 連接建立之 后)和 onDetached()。 *@param connection 發(fā)生錯(cuò)誤的連接 *@param message 一條人類可讀的錯(cuò)誤消息 */ virtual void onError(AMQP::TcpConnection *connection, const char *message) override { //@todo // 添加您自己的實(shí)現(xiàn),例如,通過向程序的用戶報(bào)告錯(cuò)誤并記錄錯(cuò)誤 } /** *該方法在 AMQP 協(xié)議結(jié)束時(shí)調(diào)用的方法。這是調(diào)用 connection.close ()以正常關(guān)閉連接的計(jì)數(shù)器部分。請注意,TCP 連接此時(shí)仍處于活動狀態(tài),您還 將收到對 onLost()和 onDetached()的調(diào)用 @param connection AMQP 協(xié)議結(jié)束的連接 */ virtual void onClosed(AMQP::TcpConnection *connection) override { //@todo // 添加您自己的實(shí)現(xiàn), 可能沒有必要, // 但如果您想在 amqp 連接結(jié)束后立即執(zhí)行某些操作, // 又不想等待 tcp 連接關(guān)閉,則這可能會很有用 } /** *當(dāng) TCP 連接關(guān)閉或丟失時(shí)調(diào)用的方法。 *如果同時(shí)調(diào)用了 onConnected(),則始終調(diào)用此方法 *@param connection 已關(guān)閉但現(xiàn)在無法使用的連接 */ virtual void onLost(AMQP::TcpConnection *connection) override { //@todo // 添加您自己的實(shí)現(xiàn)(可能沒有必要) } /** *調(diào)用的最終方法。這表示將不再對處理程序進(jìn)行有關(guān)連接的進(jìn)一步調(diào) 用。 *@param connection 可以被破壞的連接 */ virtual void onDetached(AMQP::TcpConnection *connection) override { //@todo // 添加您自己的實(shí)現(xiàn),如清理資源或退出應(yīng)用程序 } /** *當(dāng) AMQP-CPP 庫想要與主事件循環(huán)交互時(shí),它會調(diào)用該方法。 *AMQP-CPP 庫是完全不阻塞的, *并且只有在事先知道這些調(diào)用不會阻塞時(shí)才進(jìn)行“write()”或“read()”系統(tǒng) 調(diào)用。 *要在事件循環(huán)中注冊文件描述符,它會調(diào)用這個(gè)“monitor()”方法, *該方法帶有一個(gè)文件描述符和指示是否該檢查文件描述符的可讀性或可寫性 的標(biāo)志。 * *@param connection 想要與事件循環(huán)交互的連接 *@param fd 應(yīng)該檢查的文件描述符 *@param 標(biāo)記位或 AMQP::可讀和/或 AMQP::可寫 */ virtual void monitor(AMQP::TcpConnection *connection, int fd,int flags) override { //@todo // 添加您自己的實(shí)現(xiàn), // 例如將文件描述符添加到主應(yīng)用程序事件循環(huán)(如 select()或 poll()循環(huán))。 // 當(dāng)事件循環(huán)報(bào)告描述符變?yōu)榭勺x和或可寫時(shí), // 由您通過調(diào)用 connection->process(fd,flags)方法 // 通知 AMQP-CPP 庫文件描述符處于活動狀態(tài)。 } };
4.1.2 擴(kuò)展模式
以 libev 為例, 我們不必要自己實(shí)現(xiàn) monitor 函數(shù), 可以直接使用AMQP::LibEvHandler
4.2 常用類與接口介紹
4.2.1 Channel
channel(信道類) 是一個(gè)虛擬連接,大佬認(rèn)為一個(gè)socket只用于一個(gè)連接太浪費(fèi)了,所有在socket之上又做了封裝,一個(gè)連接上可以建立多個(gè)信道。每個(gè)信道都可以支持一個(gè)客戶端和服務(wù)器進(jìn)行通信。并且所有的 RabbitMq 指令都是通過 channel 傳輸,所以連接建立后的第一步,就是建立 channel。因?yàn)樗胁僮魇钱惒降?,所以?channel 上執(zhí)行指令的返回值并不能作為操作執(zhí)行結(jié)果,實(shí)際上它返回的是 Deferred 類,可以使用它安裝處理函數(shù)。
namespace AMQP { /** * Generic callbacks that are used by many deferred objects */ using SuccessCallback = std::function<void()>; using ErrorCallback = std::function<void(const char* message)>; using FinalizeCallback = std::function<void()>; /** * Declaring and deleting a queue */ using QueueCallback = std::function<void(const std::string& name, uint32_t messagecount, uint32_t consumercount)>; using DeleteCallback = std::function<void(uint32_t deletedmessages)>; using MessageCallback = std::function<void( const Message &message, uint64_t deliveryTag, bool redelivered)>; // 當(dāng)使用發(fā)布者確認(rèn)時(shí),當(dāng)服務(wù)器確認(rèn)消息已被接收和處理時(shí),將調(diào)用 AckCallback using AckCallback = std::function<void( uint64_t deliveryTag, bool multiple)>; // 使用確認(rèn)包裹通道時(shí),當(dāng)消息被 ack/nacked 時(shí),會調(diào)用這些回調(diào) using PublishAckCallback = std::function<void()>; using PublishNackCallback = std::function<void()>; using PublishLostCallback = std::function<void()>; class Channel { //構(gòu)造函數(shù) Channel(Connection *connection); //判斷是否連接成功 bool connected(); /** *聲明交換機(jī),交換機(jī)已經(jīng)存在就ok,不存在就創(chuàng)建 *如果提供了一個(gè)空名稱,則服務(wù)器將分配一個(gè)名稱。 *以下 flags 可用于交換機(jī): * *-durable 持久化,重啟后交換機(jī)依然有效 *-autodelete 刪除所有連接的隊(duì)列后,自動刪除交換 *-passive 僅被動檢查交換機(jī)是否存在 *-internal 創(chuàng)建內(nèi)部交換 * *@param name 交換機(jī)的名稱 *@param-type 交換類型 enum ExchangeType { fanout, 廣播交換,綁定的隊(duì)列都能拿到消息 direct, 直接交換,只將消息交給 routingkey 一致的隊(duì)列 topic, 主題交換,將消息交給符合 bindingkey 規(guī)則的隊(duì)列 headers, consistent_hash, message_deduplication }; *@param flags 交換機(jī)標(biāo)志 *@param arguments 其他參數(shù) * *此函數(shù)返回一個(gè)延遲處理程序Deferred類??梢栽O(shè)置回調(diào)函數(shù) using onSuccess(), onError() and onFinalize() methods. */ Deferred &declareExchange( const std::string_view &name, ExchangeType type, int flags, const Table &arguments) /** *聲明隊(duì)列 *如果不提供名稱,服務(wù)器將分配一個(gè)名稱。 *flags 可以是以下值的組合: * *-durable 持久隊(duì)列在代理重新啟動后仍然有效 *-autodelete 當(dāng)所有連接的使用者都離開時(shí),自動刪除隊(duì)列 *-passive 僅被動檢查隊(duì)列是否存在 *-exclusive 隊(duì)列僅存在于此連接,并且在連接斷開時(shí)自動刪除 * *@param name 隊(duì)列的名稱 *@param flags 標(biāo)志組合 *@param arguments 可選參數(shù) * *此函數(shù)返回一個(gè)延遲處理程序DeferredQueue類。可以設(shè)置回調(diào)函數(shù) *使用 onSuccess()、onError()和 onFinalize()方法。 * Deferred &onError(const char *message) * *可以安裝的 onSuccess()回調(diào)應(yīng)該具有以下簽名: void myCallback(const std::string &name, uint32_t messageCount, uint32_t consumerCount); 例如: channel.declareQueue("myqueue").onSuccess( [](const std::string &name, uint32_t messageCount, uint32_t consumerCount) { std::cout << "Queue '" << name << "' "; std::cout << "has been declared with "; std::cout << messageCount; std::cout << " messages and "; std::cout << consumerCount; std::cout << " consumers" << std::endl; * }); */ DeferredQueue &declareQueue( const std::string_view &name, int flags, const Table &arguments) /** *將隊(duì)列綁定到交換機(jī) * *@param exchange 源交換機(jī) *@param queue 目標(biāo)隊(duì)列 *@param routingkey 路由密鑰 *@param arguments 其他綁定參數(shù) * *此函數(shù)返回一個(gè)延遲處理程序??梢园惭b回調(diào) *使用 onSuccess()、onError()和 onFinalize()方法。 */ Deferred &bindQueue( const std::string_view &exchange, const std::string_view &queue, const std::string_view &routingkey, const Table &arguments) /** *將消息發(fā)布到 exchange *您必須提供交換機(jī)的名稱和路由密鑰。 然后,RabbitMQ 將嘗試將消息發(fā)送到一個(gè)或多個(gè)隊(duì)列。 使用可選的 flags 參數(shù),可以指定如果消息無法路由到隊(duì)列時(shí)應(yīng)該發(fā)生的情況。 默認(rèn)情況下,不可更改的消息將被靜默地丟棄。 * *如果設(shè)置了'mandatory'或'immediate'標(biāo)志, 則無法處理的消息將返回到應(yīng)用程序。 在開始發(fā)布之前,請確保您已經(jīng)調(diào)用了 recall()-方法, 并設(shè)置了所有適當(dāng)?shù)奶幚沓绦騺硖幚磉@些返回的消息。 * *可以提供以下 flags: * *-mandatory 如果設(shè)置,服務(wù)器將返回未發(fā)送到隊(duì)列的消息 *-immediate 如果設(shè)置,服務(wù)器將返回?zé)o法立即轉(zhuǎn)發(fā)給使用者的消息。 *@param exchange 要發(fā)布到的交易所 *@param routingkey 路由密鑰 *@param envelope 要發(fā)送的完整信封 *@param message 要發(fā)送的消息 *@param size 消息的大小 *@param flags 可選標(biāo)志 */ bool publish( const std::string_view &exchange, const std::string_view &routingKey, const std::string &message, int flags = 0) /** *告訴 RabbitMQ 服務(wù)器我們已準(zhǔn)備好使用消息-也就是訂閱那個(gè)隊(duì)列消息 * *調(diào)用此方法后,RabbitMQ 開始向客戶端應(yīng)用程序傳遞消息。 consumer tag 是一個(gè)字符串標(biāo)識符, 如果您以后想通過 channel::cancel()調(diào)用停止它, 可以使用它來標(biāo)識使用者。 *如果您沒有指定使用者 tag,服務(wù)器將為您分配一個(gè)。 * *支持以下 flags: * *-nolocal 如果設(shè)置了,則不會同時(shí)消耗在此通道上發(fā)布的消息 *-noack 如果設(shè)置了,則不必對已消費(fèi)的消息進(jìn)行確認(rèn) *-exclusive 請求獨(dú)占訪問,只有此使用者可以訪問隊(duì)列 * *@param queue 您要使用的隊(duì)列 *@param tag 將與此消費(fèi)操作關(guān)聯(lián)的消費(fèi)者標(biāo)記 *@param flags 其他標(biāo)記 *@param arguments 其他參數(shù) * *此函數(shù)返回一個(gè)延遲處理程序。 可以使用 onSuccess()、onError()和 onFinalize()方法安裝回 調(diào)。 可以安裝的 onSuccess()回調(diào)應(yīng)該具有以下格式: void myCallback(const std::string_view&tag); 樣例: channel.consume("myqueue").onSuccess( [](const std::string_view& tag) { std::cout << "Started consuming under tag "; std::cout << tag << std::endl; }); */ DeferredConsumer &consume( const std::string_view &queue, const std::string_view &tag, int flags, const Table &arguments) /** *確認(rèn)接收到的消息 * *當(dāng)在 DeferredConsumer::onReceived()方法中接收到消息進(jìn)行處理之后, 必須確認(rèn)該消息, 以便 RabbitMQ 將其從隊(duì)列中刪除(除非使用 noack 選項(xiàng)消費(fèi))。 * *支持以下標(biāo)志: * *-多條確認(rèn)多條消息:之前傳遞的所有未確認(rèn)消息也會得到確認(rèn) * *@param deliveryTag 消息的唯一 delivery 標(biāo)簽 *@param flags 可選標(biāo)志 *@return bool */ bool ack(uint64_t deliveryTag, int flags = 0) } class DeferredConsumer { /* 注冊一個(gè)回調(diào)函數(shù),該函數(shù)在消費(fèi)者啟動時(shí)被調(diào)用。 void onSuccess(const std::string &consumertag) */ DeferredConsumer &onSuccess(const ConsumeCallback &callback) /* 注冊回調(diào)函數(shù),用于接收到一個(gè)完整消息的時(shí)候被調(diào)用 void MessageCallback(const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) */ DeferredConsumer &onReceived(const MessageCallback &callback) /* Alias for onReceived() */ DeferredConsumer &onMessage(const MessageCallback &callback) /* 注冊要在服務(wù)器取消消費(fèi)者時(shí)調(diào)用的函數(shù) void CancelCallback(const std::string &tag) */ DeferredConsumer &onCancelled(const CancelCallback &callback) } class Message : public Envelope { const std::string &exchange() const std::string &routingkey():q } class Envelope : public MetaData { const char *body() uint64_t bodySize() } }
類與接口的介紹總結(jié):
AMQP::Channel:信道類
- Channel(Connection *connection) 構(gòu)造
- bool connected() 判斷連接
- Deferred &declareExchange() 聲明交換機(jī)
- DeferredQueue &declareQueue() 聲明隊(duì)列
- Deferred& bindQueue)() 將交換機(jī)與隊(duì)列進(jìn)行關(guān)系綁定的功能
- bool publish() 發(fā)布消息
- DeferredConsumer&consume() 定訂閱隊(duì)列消息
- bool ack() 消費(fèi)者客戶端對收到的消息進(jìn)行確認(rèn)應(yīng)答
class Message:消息類
- const char* body() 獲取消息正文
- uint64_t bodySize() 獲取消息正文大小
4.3.2 ev
typedef struct ev_async { EV_WATCHER(ev_async) EV_ATOMIC_T sent; /* private */ } ev_async; // break type enum { EVBREAK_CANCEL = 0, /* undo unloop */ EVBREAK_ONE = 1, /* unloop once */ EVBREAK_ALL = 2 /* unloop all loops */ }; //實(shí)例化并獲取I/O事件監(jiān)控結(jié)構(gòu)句柄 struct ev_loop *ev_default_loop(unsigned int flags EV_CPP(= 0)) #define EV_DEFAULT ev_default_loop(0)(使用宏獲取上面結(jié)構(gòu)) //開始運(yùn)行I/O事件監(jiān)控,這是一個(gè)阻塞接口(創(chuàng)建一個(gè)線程執(zhí)行該接口) int ev_run(struct ev_loop *loop); /* break out of the loop */ //結(jié)束I/O監(jiān)控 void ev_break(struct ev_loop *loop, int32_t break_type); void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents) //如果在當(dāng)前線程進(jìn)行ev_run則可以直接調(diào)用,如果在其他線程中進(jìn)行ev_run需要通過異步通知進(jìn)行 void ev_async_init(ev_async *w, callback cb);//初始化異步事件結(jié)構(gòu),并設(shè)置回調(diào)函數(shù) void ev_async_start(struct ev_loop *loop, ev_async *w);//啟動事件監(jiān)控循環(huán)中的異步任務(wù)處理 void ev_async_send(struct ev_loop *loop, ev_async *w);//發(fā)送當(dāng)前異步事件到異步線程中執(zhí)行
第三方庫鏈接
g++ -o example example.cpp -lamqpcpp -lev
5. RabbitMQ樣例編寫
5.1 發(fā)布消息
#include <ev.h> #include <amqpcpp.h> #include <amqpcpp/libev.h> #include <openssl/ssl.h> #include <openssl/opensslv.h> int main() { //1. 實(shí)例化底層網(wǎng)絡(luò)通信框架的I/O事件監(jiān)控句柄 auto *loop = EV_DEFAULT; //2. 實(shí)例化libEvHandler句柄 --- 將AMQP框架與事件監(jiān)控關(guān)聯(lián)起來 AMQP::LibEvHandler handler(loop); //3. 實(shí)例化連接對象 AMQP::Address address("amqp://root:123456@127.0.0.1:5672/"); AMQP::TcpConnection connection(&handler, address); //4. 實(shí)例化信道對象 AMQP::TcpChannel channel(&connection); //5. 聲明交換機(jī) channel.declareExchange("test-exchange", AMQP::ExchangeType::direct) .onError([](const char *message) { std::cout << "聲明交換機(jī)失?。? << message << std::endl; exit(0); }) .onSuccess([](){ std::cout << "test-exchange 交換機(jī)創(chuàng)建成功!" << std::endl; }); //6. 聲明隊(duì)列 channel.declareQueue("test-queue") .onError([](const char *message) { std::cout << "聲明隊(duì)列失?。? << message << std::endl; exit(0); }) .onSuccess([](){ std::cout << "test-queue 隊(duì)列創(chuàng)建成功!" << std::endl; }); //7. 針對交換機(jī)和隊(duì)列進(jìn)行綁定 channel.bindQueue("test-exchange", "test-queue", "test-queue-key") .onError([](const char *message) { std::cout << "test-exchange - test-queue 綁定失?。? << message << std::endl; exit(0); }) .onSuccess([](){ std::cout << "test-exchange - test-queue 綁定成功!" << std::endl; }); //8. 向交換機(jī)發(fā)布消息 for (int i = 0; i < 10; i++) { std::string msg = "Hello Bite-" + std::to_string(i); bool ret = channel.publish("test-exchange", "test-queue-key", msg); if (ret == false) { std::cout << "publish 失??!\n"; } } //啟動底層網(wǎng)絡(luò)通信框架--開啟I/O ev_run(loop, 0); return 0; }
5.2 訂閱消息
#include <ev.h> #include <amqpcpp.h> #include <amqpcpp/libev.h> #include <openssl/ssl.h> #include <openssl/opensslv.h> //消息回調(diào)處理函數(shù)的實(shí)現(xiàn) void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) { std::string msg; msg.assign(message.body(), message.bodySize()); std::cout << msg << std::endl; channel->ack(deliveryTag); // 對消息進(jìn)行確認(rèn) } int main() { //1. 實(shí)例化底層網(wǎng)絡(luò)通信框架的I/O事件監(jiān)控句柄 auto *loop = EV_DEFAULT; //2. 實(shí)例化libEvHandler句柄 --- 將AMQP框架與事件監(jiān)控關(guān)聯(lián)起來 AMQP::LibEvHandler handler(loop); //2.5. 實(shí)例化連接對象 AMQP::Address address("amqp://root:123456@127.0.0.1:5672/"); AMQP::TcpConnection connection(&handler, address); //3. 實(shí)例化信道對象 AMQP::TcpChannel channel(&connection); //4. 聲明交換機(jī) channel.declareExchange("test-exchange", AMQP::ExchangeType::direct) .onError([](const char *message) { std::cout << "聲明交換機(jī)失?。? << message << std::endl; exit(0); }) .onSuccess([](){ std::cout << "test-exchange 交換機(jī)創(chuàng)建成功!" << std::endl; }); //5. 聲明隊(duì)列 channel.declareQueue("test-queue") .onError([](const char *message) { std::cout << "聲明隊(duì)列失?。? << message << std::endl; exit(0); }) .onSuccess([](){ std::cout << "test-queue 隊(duì)列創(chuàng)建成功!" << std::endl; }); //6. 針對交換機(jī)和隊(duì)列進(jìn)行綁定 channel.bindQueue("test-exchange", "test-queue", "test-queue-key") .onError([](const char *message) { std::cout << "test-exchange - test-queue 綁定失?。? << message << std::endl; exit(0); }) .onSuccess([](){ std::cout << "test-exchange - test-queue 綁定成功!" << std::endl; }); //7. 訂閱隊(duì)列消息 -- 設(shè)置消息處理回調(diào)函數(shù) auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); channel.consume("test-queue", "consume-tag") //返回值 DeferredConsumer .onReceived(callback) .onError([](const char *message){ std::cout << "訂閱 test-queue 隊(duì)列消息失敗:" << message << std::endl; exit(0); }); // 返回值是 AMQP::Deferred //8. 啟動底層網(wǎng)絡(luò)通信框架--開啟I/O ev_run(loop, 0); return 0; }
all : publish consume publish : publish.cc g++ -std=c++17 $^ -o $@ -lamqpcpp -lev consume : consume.cc g++ -std=c++17 $^ -o $@ -lamqpcpp -lev
到此這篇關(guān)于C++ RabbitMq消息隊(duì)列組件的文章就介紹到這了,更多相關(guān)C++ RabbitMq消息隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C語言函數(shù)傳遞數(shù)組和傳遞地址的區(qū)別你知道嗎
這篇文章主要介紹了C語言中數(shù)組作為函數(shù)的參數(shù)以及返回值的使用簡單入門,這里以一維數(shù)組作為基本條件進(jìn)行例子講解,需要的朋友可以參考下2021-09-09圖解C++的STL之stack和queue,輕松理解數(shù)據(jù)結(jié)構(gòu)
聚焦?C++?的?STL?中的?stack?和?queue,讓數(shù)據(jù)結(jié)構(gòu)變得簡單有趣!?通過圖解的方式,我們將輕松理解這兩個(gè)重要的數(shù)據(jù)結(jié)構(gòu),準(zhǔn)備好開啟?STL?學(xué)習(xí)之旅了嗎?讓我們一起探索?stack?和?queue?的奧秘吧!2024-03-03詳解C語言中的ttyname()函數(shù)和isatty()函數(shù)的用法
這篇文章主要介紹了C語言中的ttyname()函數(shù)和isatty()函數(shù)的用法,是C語言入門學(xué)習(xí)中的基礎(chǔ)知識,需要的朋友可以參考下2015-09-09C語言用fun函數(shù)實(shí)現(xiàn)兩個(gè)數(shù)的交換方式
這篇文章主要介紹了C語言用fun函數(shù)實(shí)現(xiàn)兩個(gè)數(shù)的交換方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12