C++ 第三方庫 RabbitMq示例詳解
1.介紹
RabbitMQ
:消息隊列組件,實現(xiàn)兩個客戶端主機之間消息傳輸?shù)墓δ?發(fā)布&訂閱)
- 核心概念:交換機、隊列、綁定、消息
- 交換機類型:
- 廣播交換:當交換機收到消息,則將消息發(fā)布到所有綁定的隊列中
- 直接交換:根據(jù)消息中的
bkey
與綁定的rkey
對比,一致則放入隊列 - 主題交換:使用
bkey
與綁定的rkey
進行規(guī)則匹配,成功則放入隊列
2.安裝
1.RabbitMq
- 安裝:
sudo apt install rabbitmq-server
- 簡單使用:
# 安裝完成的時候默認有個用戶guest,但是權(quán)限不夠,要創(chuàng)建一個administrator用戶,才可以做為遠程登錄和發(fā)表訂閱消息 #添加用戶 sudo rabbitmqctl add_user root <PASSWORD> #設置用戶tag sudo rabbitmqctl set_user_tags root administrator #設置用戶權(quán)限 sudo rabbitmqctl set_permissions -p / root "." "." ".*" # RabbitMQ自帶了web管理界面, 執(zhí)行下面命令開啟, 默認端口15672 sudo rabbitmq-plugins enable rabbitmq_management
2.客戶端庫
sudo apt install libev-dev #libev 網(wǎng)絡庫組件 git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git cd AMQP-CPP/ make make install
如果安裝時出現(xiàn)以下報錯,則表示ssl
版本出現(xiàn)問題
/usr/include/openssl/macros.h:147:4: error: #error "OPENSSL_API_COMPAT expresses an impossible API compatibility level" 147 | # error "OPENSSL_API_COMPAT expresses an impossible API compatibility level" | ^~~~~ In file included from /usr/include/openssl/ssl.h:18, from linux_tcp/openssl.h:20, from linux_tcp/openssl.cpp:12: /usr/include/openssl/bio.h:687:1: error: expected constructor, destructor, or type conversion before ‘DEPRECATEDIN_1_1_0' 687 | DEPRECATEDIN_1_1_0(int BIO_get_port(const char *str, unsigned short *port_ptr))
解決方案:卸載當前的ssl
庫,重新進行修復安裝
dpkg -l | grep ssl sudo dpkg -P --force-all libevent-openssl-2.1-7 sudo dpkg -P --force-all openssl sudo dpkg -P --force-all libssl-dev sudo apt --fix-broken install
3.AMQP-CPP
簡單使用
1.介紹
AMQP-CPP
是用于與RabbitMq
消息中間件通信的C++庫- 它能解析從
RabbitMq
服務發(fā)送來的數(shù)據(jù),也可以生成發(fā)向RabbitMq
的數(shù)據(jù)包 AMQP-CPP
庫不會向RabbitMq
建立網(wǎng)絡連接,所有的網(wǎng)絡IO由用戶完成
- 它能解析從
AMQP-CPP
提供了可選的網(wǎng)絡層接口,它預定義了TCP
模塊,用戶就不用自己實現(xiàn)網(wǎng)絡IO,- 也可以選擇
libevent、libev、libuv、asio
等異步通信組件, 需要手動安裝對應的組件
- 也可以選擇
AMQP-CPP
完全異步,沒有阻塞式的系統(tǒng)調(diào)用,不使用線程就能夠應用在高性能應用中- 注意:它需要C++17的支持
2.使用
AMQP-CPP
的使用有兩種模式:- 使用默認的
TCP
模塊進行網(wǎng)絡通信 - 使用擴展的
libevent、libev、libuv、asio
異步通信組件進行通信
- 使用默認的
- 此處以
libev
為例,不需要自己實現(xiàn)monitor
函數(shù),可以直接使用AMQP::LibEvHandler
4.類與接口
1.Channel
channel
是一個虛擬連接,一個連接上可以建立多個通道- 并且所有的
RabbitMq
指令都是通過channel
傳輸
- 并且所有的
- 所以連接建立后的第一步,就是建立
channel
- 因為所有操作是異步的,所以在
channel
上執(zhí)行指令的返回值并不能作為操作執(zhí)行結(jié)果
- 因為所有操作是異步的,所以在
- 實際上它返回的是
Deferred
類,可以使用它安裝處理函數(shù)
namespace AMQP { /** * Generic callbacks that are used by many deferred objects */ using SuccessCallback = std::function<void()>; using ErrorCallback = std::function<void(const char *message)>; using FinalizeCallback = std::function<void()>; /** * Declaring and deleting a queue */ using QueueCallback = std::function<void(const std::string &name, uint32_t messagecount, uint32_t consumercount)>; using DeleteCallback = std::function<void(uint32_t deletedmessages)>; using MessageCallback = std::function<void(const Message &message, uint64_t deliveryTag, bool redelivered)>; // 當使用發(fā)布者確認時,當服務器確認消息已被接收和處理時,將調(diào)用AckCallback using AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>; // 使用確認包裹通道時,當消息被ack/nacked時,會調(diào)用這些回調(diào) using PublishAckCallback = std::function<void()>; using PublishNackCallback = std::function<void()>; using PublishLostCallback = std::function<void()>; // 信道類 class Channel { Channel(Connection *connection); bool connected(); /** *聲明交換機 *如果提供了一個空名稱,則服務器將分配一個名稱。 *以下flags可用于交換機: * *-durable 持久化,重啟后交換機依然有效 *-autodelete 刪除所有連接的隊列后,自動刪除交換 *-passive 僅被動檢查交換機是否存在 *-internal 創(chuàng)建內(nèi)部交換 * *@param name 交換機的名稱 *@param-type 交換類型 enum ExchangeType { fanout, 廣播交換,綁定的隊列都能拿到消息 direct, 直接交換,只將消息交給routingkey一致的隊列 topic, 主題交換,將消息交給符合bindingkey規(guī)則的隊列 headers, consistent_hash, message_deduplication }; *@param flags 交換機標志 *@param arguments其他參數(shù) * *此函數(shù)返回一個延遲處理程序??梢园惭b回調(diào) using onSuccess(), onError() and onFinalize() methods. */ Deferred &declareExchange(const std::string_view &name, ExchangeType type, int flags, const Table &arguments); /** *聲明隊列 *如果不提供名稱,服務器將分配一個名稱。 *flags可以是以下值的組合: * *-durable 持久隊列在代理重新啟動后仍然有效 *-autodelete 當所有連接的使用者都離開時,自動刪除隊列 *-passive 僅被動檢查隊列是否存在 *-exclusive 隊列僅存在于此連接,并且在連接斷開時自動刪除 * *@param name 隊列的名稱 *@param flags 標志組合 *@param arguments 可選參數(shù) * *此函數(shù)返回一個延遲處理程序??梢园惭b回調(diào) *使用onSuccess()、onError()和onFinalize()方法。 * Deferred &onError(const char *message) * *可以安裝的onSuccess()回調(diào)應該具有以下簽名: void myCallback(const std::string &name, uint32_t messageCount, uint32_t consumerCount); 例如: channel.declareQueue("myqueue").onSuccess( [](const std::string &name, uint32_t messageCount, uint32_t consumerCount) { std::cout << "Queue '" << name << "' "; std::cout << "has been declared with "; std::cout << messageCount; std::cout << " messages and "; std::cout << consumerCount; std::cout << " consumers" << std::endl; * }); */ DeferredQueue &declareQueue(const std::string_view &name, int flags, const Table &arguments); /** *將隊列綁定到交換機 * *@param exchange 源交換機 *@param queue 目標隊列 *@param routingkey 路由密鑰 *@param arguments 其他綁定參數(shù) * *此函數(shù)返回一個延遲處理程序??梢园惭b回調(diào) *使用onSuccess()、onError()和onFinalize()方法。 */ Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue, const std::string_view &routingkey, const Table &arguments); /** *將消息發(fā)布到exchange *您必須提供交換機的名稱和路由密鑰。 然后,RabbitMQ將嘗試將消息發(fā)送到一個或多個隊列。 使用可選的flags參數(shù),可以指定如果消息無法路由到隊列時應該發(fā)生的情況。 默認情況下,不可更改的消息將被靜默地丟棄。 * *如果設置了'mandatory'或'immediate'標志, 則無法處理的消息將返回到應用程序。 在開始發(fā)布之前,請確保您已經(jīng)調(diào)用了recall()-方法, 并設置了所有適當?shù)奶幚沓绦騺硖幚磉@些返回的消息。 * *可以提供以下flags: * *-mandatory 如果設置,服務器將返回未發(fā)送到隊列的消息 *-immediate 如果設置,服務器將返回無法立即轉(zhuǎn)發(fā)給使用者的消息。 *@param exchange要發(fā)布到的交易所 *@param routingkey路由密鑰 *@param envelope要發(fā)送的完整信封 *@param message要發(fā)送的消息 *@param size消息的大小 *@param flags可選標志 */ bool publish(const std::string_view &exchange, const std::string_view &routingKey, const std::string &message, int flags = 0); /** *告訴RabbitMQ服務器已準備好使用消息-也就是 訂閱隊列消息 * *調(diào)用此方法后,RabbitMQ開始向客戶端應用程序傳遞消息。 consumer tag是一個字符串標識符, 如果您以后想通過channel::cancel()調(diào)用停止它, 可以使用它來標識使用者。 *如果您沒有指定使用者tag,服務器將為您分配一個。 * *支持以下flags: * *-nolocal 如果設置了,則不會同時消耗在此通道上發(fā)布的消息 *-noack 如果設置了,則不必對已消費的消息進行確認 *-exclusive 請求獨占訪問,只有此使用者可以訪問隊列 * *@param queue 您要使用的隊列 *@param tag 將與此消費操作關(guān)聯(lián)的消費者標記 *@param flags 其他標記 *@param arguments其他參數(shù) * *此函數(shù)返回一個延遲處理程序。 可以使用onSuccess()、onError()和onFinalize()方法安裝回調(diào) 可以安裝的onSuccess()回調(diào)應該具有以下格式: void myCallback(const std::string_view&tag); 樣例: channel.consume("myqueue").onSuccess( [](const std::string_view& tag) { std::cout << "Started consuming under tag "; std::cout << tag << std::endl; }); */ DeferredConsumer &consume(const std::string_view &queue, const std::string_view &tag, int flags, const Table &arguments); /** *確認接收到的消息 * *消費者客戶端對收到的消息進行確認應答 * *當在DeferredConsumer::onReceived()方法中接收到消息時, 必須確認該消息, 以便RabbitMQ將其從隊列中刪除(除非使用noack選項消費) * *支持以下標志: * *-多條確認多條消息:之前傳遞的所有未確認消息也會得到確認 * *@param deliveryTag 消息的唯一delivery標簽 *@param flags 可選標志 *@return bool */ bool ack(uint64_t deliveryTag, int flags=0); }; class DeferredConsumer { /* 注冊一個回調(diào)函數(shù),該函數(shù)在消費者啟動時被調(diào)用 void onSuccess(const std::string &consumertag) */ DeferredConsumer &onSuccess(const ConsumeCallback& callback); /* 注冊回調(diào)函數(shù),用于接收到一個完整消息的時候被調(diào)用 void MessageCallback(const AMQP::Message &message, uint64_t deliveryTag, bool redelivered) */ DeferredConsumer &onReceived(const MessageCallback& callback); /* Alias for onReceived() */ DeferredConsumer &onMessage(const MessageCallback& callback); /* 注冊要在服務器取消消費者時調(diào)用的函數(shù) void CancelCallback(const std::string &tag) */ DeferredConsumer &onCancelled(const CancelCallback& callback); }; class Message : public Envelope { const std::string &exchange(); const std::string &routingkey(); }; class Envelope : public MetaData { const char *body(); // 獲取消息正文 uint64_t bodySize(); // 獲取消息正文大小 }; }
2.ev
typedef struct ev_async { EV_WATCHER (ev_async); EV_ATOMIC_T sent; /* private */ }ev_async; //break type enum { EVBREAK_CANCEL = 0, /* undo unloop */ EVBREAK_ONE = 1, /* unloop once */ EVBREAK_ALL = 2 /* unloop all loops */ }; // 實例化并獲取IO事件監(jiān)控接口句柄 struct ev_loop *ev_default_loop (unsigned int flags EV_CPP (= 0)); # define EV_DEFAULT ev_default_loop (0) // 開始運行IO事件監(jiān)控, 這是一個阻塞接口 int ev_run (struct ev_loop *loop); /* break out of the loop */ // 結(jié)束IO監(jiān)控 // 如果在主線程進行ev_run(), 則可以直接調(diào)用, // 如果在其他線程中進行ev_run(), 需要通過異步通知進行 void ev_break (struct ev_loop *loop, int32_t break_type) ; void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents); // 初始化異步事件結(jié)構(gòu), 并設置回調(diào)函數(shù) void ev_async_init(ev_async *w, callback cb); // 啟動事件監(jiān)控循環(huán)中的異步任務處理 void ev_async_start(struct ev_loop *loop, ev_async *w); // 發(fā)送當前異步事件到異步線程中執(zhí)行 void ev_async_send(struct ev_loop *loop, ev_async *w);
5.使用
1.publish.cc
#include <ev.h> #include <amqpcpp.h> #include <amqpcpp/libev.h> #include <openssl/ssl.h> #include <openssl/opensslv.h> int main() { // 1.實例化底層網(wǎng)絡通信框架的IO事件監(jiān)控句柄 auto *loop = EV_DEFAULT; // 2.實例化libEvHandler句柄 -> 將AMQP框架與事件監(jiān)控關(guān)聯(lián)起來 AMQP::LibEvHandler handler(loop); // 3.實例化連接對象 AMQP::Address address("amqp://root:SnowK8989@127.0.0.1:5672/"); AMQP::TcpConnection connection(&handler, address); // 4.實例化信道對象 AMQP::TcpChannel channel(&connection); // 5.聲明交換機 channel.declareExchange("test-exchange", AMQP::ExchangeType::direct) .onError([](const char *message) { std::cout << "聲明交換機失敗: " << message << std::endl; }) .onSuccess([]() { std::cout << "test-exchange 交換機創(chuàng)建成功" << std::endl; }); // 6.聲明隊列 channel.declareQueue("test-queue") .onError([](const char *message) { std::cout << "聲明隊列失敗: " << message << std::endl; }) .onSuccess([]() { std::cout << "test-queue 隊列創(chuàng)建成功" << std::endl; }); // 7.針對交換機和隊列進行綁定 channel.bindQueue("test-exchange", "test-queue", "test-queue-key") .onError([](const char *message) { std::cout << "test-exchange - test-queue 綁定失敗: " \ << message << std::endl; }) .onSuccess([]() { std::cout << "test-exchange - test-queue 綁定成功" << std::endl; }); // 8.向交換機發(fā)布消息 for (int i = 0; i < 5; ++i) { std::string msg = "Hello SnowK-" + std::to_string(i); if(channel.publish("test-exchange", "test-queue-key", msg) == false) { std::cout << "publish 失敗" << std::endl; } } // 9.啟動底層網(wǎng)絡通信框架 -> 開啟IO ev_run(loop, 0); return 0; }
2.consume.cc
#include <ev.h> #include <amqpcpp.h> #include <amqpcpp/libev.h> #include <openssl/ssl.h> #include <openssl/opensslv.h> void MessageCB(AMQP::TcpChannel* channel, const AMQP::Message& message, uint64_t deliveryTag, bool redelivered) { std::string msg; msg.assign(message.body(), message.bodySize()); // 不能這樣使用, AMQP::Message后面沒有存'\0' // std::cout << message << std::endl std::cout << msg << std::endl; channel->ack(deliveryTag); } int main() { // 1.實例化底層網(wǎng)絡通信框架的IO事件監(jiān)控句柄 auto *loop = EV_DEFAULT; // 2.實例化libEvHandler句柄 -> 將AMQP框架與事件監(jiān)控關(guān)聯(lián)起來 AMQP::LibEvHandler handler(loop); // 3.實例化連接對象 AMQP::Address address("amqp://root:SnowK8989@127.0.0.1:5672/"); AMQP::TcpConnection connection(&handler, address); // 4.實例化信道對象 AMQP::TcpChannel channel(&connection); // 5.聲明交換機 channel.declareExchange("test-exchange", AMQP::ExchangeType::direct) .onError([](const char *message) { std::cout << "聲明交換機失敗: " << message << std::endl; }) .onSuccess([]() { std::cout << "test-exchange 交換機創(chuàng)建成功" << std::endl; }); // 6.聲明隊列 channel.declareQueue("test-queue") .onError([](const char *message) { std::cout << "聲明隊列失敗: " << message << std::endl; }) .onSuccess([]() { std::cout << "test-queue 隊列創(chuàng)建成功" << std::endl; }); // 7.針對交換機和隊列進行綁定 channel.bindQueue("test-exchange", "test-queue", "test-queue-key") .onError([](const char *message) { std::cout << "test-exchange - test-queue 綁定失敗: " \ << message << std::endl; }) .onSuccess([]() { std::cout << "test-exchange - test-queue 綁定成功"; }); // 8.訂閱消息對壘 -> 設置消息處理回調(diào)函數(shù) auto callback = std::bind(MessageCB, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); channel.consume("test-queue", "consume-tag") .onReceived(callback) .onError([](const char *message) { std::cout << "訂閱 test-queue 隊列消息失敗: " << message << std::endl; exit(0); }); // 9.啟動底層網(wǎng)絡通信框架 -> 開啟IO ev_run(loop, 0); return 0; }
3.makefile
all: publish consume publish: publish.cc g++ -o $@ $^ -lamqpcpp -lev -std=c++17 consume: consume.cc g++ -o $@ $^ -lamqpcpp -lev -std=c++17 .PHONY:clean clean: rm publish consume
到此這篇關(guān)于C++ 第三方庫 RabbitMq詳細講解的文章就介紹到這了,更多相關(guān)C++ 第三方庫 RabbitMq內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
C++類中的常數(shù)據(jù)成員與靜態(tài)數(shù)據(jù)成員之間的區(qū)別
常數(shù)據(jù)成員是指在類中定義的不能修改其值的一些數(shù)據(jù)成員,類似于我們以前學過的常變量,雖然是變量,也有自己的地址,但是一經(jīng)賦初值,便不能再被修改2013-10-10C語言實現(xiàn)簡易通訊錄(靜態(tài)版本)的代碼分享
這篇文章主要為大家詳細介紹了如何錄音C語言實現(xiàn)一個簡易的通訊錄(靜態(tài)版本),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-10-10vscode使用cmake時將命令行參數(shù)傳遞給調(diào)試目標的方法
這篇文章主要介紹了vscode使用cmake時將命令行參數(shù)傳遞給調(diào)試目標,下面介紹了一個示例,將參數(shù)first_arg, second-arg和third arg傳遞給程序(此處需要注意,third arg中間雖然存在空格,但是仍然被視作一個參數(shù)),需要的朋友參考下吧2024-03-03