詳解C/C++如何發(fā)送與接收Kafka消息
一、背景
在實(shí)際工程中,難免會(huì)遇到不通系統(tǒng)之間通信,如何進(jìn)行系統(tǒng)之間通信呢?(作為一個(gè)“全棧工程師”,必須要解決它!)。
系統(tǒng)之間通信方式很多如:系統(tǒng)之間調(diào)用(http/rpc等),異步間接調(diào)用如發(fā)送消息、公共存儲(chǔ)等。目前,本人從事的項(xiàng)目中遇到web業(yè)務(wù)工程(Java)依賴與算法工程(C++) 處理的視頻/圖片分類與標(biāo)記結(jié)果。兩個(gè)系統(tǒng)之前數(shù)據(jù)通信采用了kafka消息方式。
算法工程為C/C++工程,本文將介紹如何在C/C++中如何發(fā)送與接收Kakfa消息(包含:Kafka的SASL認(rèn)證方式),并提供了詳細(xì)的源碼和講解。(至于Java中如何發(fā)送與接收Kakfa消息如有需要,可留言或私聊?。?/p>
二、環(huán)境依賴安裝
# 下載librdkafka git clone https://github.com/edenhill/librdkafka.git # 編譯 cd librdkafka ./configure --prefix=/usr/local # 安裝 sudo make install # 驗(yàn)證:查看/usr/local/lib目錄下是否有l(wèi)ibrdkafka文件 ls /usr/local/lib | grep kafka
三、編寫kakfa生產(chǎn)者消費(fèi)者
3.1 生產(chǎn)者
#include <rdkafka.h> // 包含C API頭文件 #include <iostream> #include <cstring> #include <cerrno> int main() { const char *brokers = "xx.xx.xx.xx:7091"; // Kafka broker地址 const char *topic_name = "kafka_msg_topic_test"; const char *payload = "Hello, Kafka from librdkafka!"; size_t len = strlen(payload); // 創(chuàng)建配置對(duì)象 rd_kafka_conf_t *conf = rd_kafka_conf_new(); if (!conf) { std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } // 設(shè)置broker地址 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 創(chuàng)建生產(chǎn)者實(shí)例 rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0); if (!rk) { std::cerr << "Failed to create producer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 創(chuàng)建topic句柄(可選,但推薦) rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL); if (!rkt) { std::cerr << "Failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_destroy(rk); // rd_kafka_conf_destroy(conf); return 1; } // 發(fā)送消息 int32_t partition = RD_KAFKA_PARTITION_UA; // 自動(dòng)選擇分區(qū) int err = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, const_cast<char *>(payload), len, NULL, 0, NULL); if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { std::cerr << "Failed to produce to topic " << topic_name << ": " << err << std::endl; } else { std::cout << "Produced " << len << " bytes to topic " << topic_name << std::endl; } // 等待所有消息發(fā)送完成(可選,但推薦) // 在實(shí)際生產(chǎn)代碼中,您可能需要更復(fù)雜的邏輯來(lái)處理消息的發(fā)送和確認(rèn) int msgs_sent = 0; while (rd_kafka_outq_len(rk) > 0) { rd_kafka_poll(rk, 100); // 輪詢Kafka隊(duì)列,直到所有消息都發(fā)送出去 msgs_sent += rd_kafka_outq_len(rk); } // 銷毀topic句柄 rd_kafka_topic_destroy(rkt); // 銷毀生產(chǎn)者實(shí)例 rd_kafka_destroy(rk); // 銷毀配置對(duì)象 // rd_kafka_conf_destroy(conf); return 0; }
3.2 消費(fèi)者
#include <rdkafka.h> #include <iostream> #include <cerrno> #include <cstring> #include <cstdlib> void error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) { // 錯(cuò)誤處理回調(diào) std::cerr << "Kafka error: " << err << ": " << reason << std::endl; } int main() { std::cerr << "start " << std::endl; const char *brokers = "xx.xx.xx.xx:7091"; // Kafka broker地址 const char *group_id = "kafka_msg_topic_test"; // 消費(fèi)者組ID const char *topic_name = "kafka_msg_topic_test"; // Kafka topic名稱 // 創(chuàng)建配置對(duì)象 rd_kafka_conf_t *conf = rd_kafka_conf_new(); if (!conf) { std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } // 設(shè)置broker地址 if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 設(shè)置消費(fèi)者組ID if (rd_kafka_conf_set(conf, "group.id", group_id, NULL, 0) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set group.id: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 設(shè)置錯(cuò)誤處理回調(diào)(可選) rd_kafka_conf_set_error_cb(conf, error_cb); // 創(chuàng)建消費(fèi)者實(shí)例 rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0); if (!rk) { std::cerr << "Failed to create consumer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } // 創(chuàng)建一個(gè)topic分區(qū)列表 rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1); if (!topics) { std::cerr << "Failed to create topic partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_destroy(rk); return 1; } // 添加topic到分區(qū)列表 if (!rd_kafka_topic_partition_list_add(topics, topic_name, RD_KAFKA_PARTITION_UA)) { std::cerr << "Failed to add topic to partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_topic_partition_list_destroy(topics); rd_kafka_destroy(rk); return 1; } // 訂閱topic rd_kafka_resp_err_t err = rd_kafka_subscribe(rk, topics); if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { std::cerr << "Failed to subscribe to topic: " << rd_kafka_err2str(err) << std::endl; rd_kafka_topic_partition_list_destroy(topics); rd_kafka_destroy(rk); return 1; } // 銷毀分區(qū)列表(訂閱后不再需要) rd_kafka_topic_partition_list_destroy(topics); // 輪詢消息 while (true) { rd_kafka_message_t *rkmessage; rkmessage = rd_kafka_consumer_poll(rk, 1000); // 等待1秒以獲取消息 if (rkmessage == NULL) { // 沒有消息或者超時(shí) continue; } if (rkmessage->err) { // 處理錯(cuò)誤 if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { // 消息流的末尾 std::cout << "End of partition event" << std::endl; } else { // 打印錯(cuò)誤并退出 std::cerr << "Kafka consumer error: " << rd_kafka_message_errstr(rkmessage) << std::endl; break; } } else { // 處理消息 std::cout << "Received message at offset " << rkmessage->offset << " from partition " << rkmessage->partition << " with key \"" << rkmessage->key << "\" and payload size "<< rkmessage->len << " value :" <<(char *)rkmessage->payload << std::endl; // 如果需要,可以在這里處理消息內(nèi)容 // 例如,使用rkmessage->payload()獲取消息內(nèi)容 // 釋放消息 rd_kafka_message_destroy(rkmessage); } } // 清理 rd_kafka_destroy(rk); return 0; }
3.3 編譯運(yùn)行
3.3.1 編譯生產(chǎn)者消費(fèi)者
g++ -o send_kafka SendKakfaMessage.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread
g++ -o receive_kafka ReceiveKafkaMessage.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread
3.3.2 運(yùn)行驗(yàn)證
執(zhí)行時(shí),若出現(xiàn)錯(cuò)誤: error while loading shared libraries: librdkafka++.so.1: cannot open shared object file: No such file or directory
則需要執(zhí)行下面環(huán)境變量配置:
export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
生產(chǎn)者:發(fā)送消息
消費(fèi)者:接收消息
3.4 SASL認(rèn)證kakfa
下面是,支持sasl認(rèn)證的kakka生產(chǎn)者完整代碼
#include <rdkafka.h> #include <iostream> #include <cstring> #include <cerrno> int main(int argc, char *argv[]) { const char *brokers = "xx.xx.xx.xx:8092"; // Kafka broker地址 const char *username = "xxx"; const char *password = "xxx"; const char *topic_name = "kafka_msg_test_sasl"; const char *payload = "Hello, Kafka from librdkafka! sasl"; size_t len = strlen(payload); // 初始化配置 rd_kafka_conf_t *conf = rd_kafka_conf_new(); if (!conf) { std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; return 1; } char errstr[512]; // 聲明一個(gè)足夠大的字符數(shù)組來(lái)存儲(chǔ)錯(cuò)誤信息 // 設(shè)置SASL相關(guān)的配置 if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set security.protocol: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set sasl.mechanisms: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "sasl.username", username, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set sasl.username: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set sasl.password: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_conf_destroy(conf); return 1; } // 檢查配置是否設(shè)置成功 if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) { std::cerr << "Failed to set configuration: " << errstr << std::endl; return 1; } // 創(chuàng)建producer實(shí)例 rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)); if (!rk) { std::cerr << "Failed to create new producer: " << errstr << std::endl; return 1; } // 創(chuàng)建topic句柄(可選,但推薦) rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL); if (!rkt) { std::cerr << "Failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl; rd_kafka_destroy(rk); // rd_kafka_conf_destroy(conf); return 1; } // 發(fā)送消息 int32_t partition = RD_KAFKA_PARTITION_UA; // 自動(dòng)選擇分區(qū) int err = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, const_cast<char *>(payload), len, NULL, 0, NULL); if (err != RD_KAFKA_RESP_ERR_NO_ERROR) { std::cerr << "Failed to produce to topic " << topic_name << ": " << err << std::endl; } else { std::cout << "Produced " << len << " bytes to topic " << topic_name << std::endl; } // 等待所有消息發(fā)送完成(可選,但推薦) // 在實(shí)際生產(chǎn)代碼中,您可能需要更復(fù)雜的邏輯來(lái)處理消息的發(fā)送和確認(rèn) int msgs_sent = 0; while (rd_kafka_outq_len(rk) > 0) { rd_kafka_poll(rk, 100); // 輪詢Kafka隊(duì)列,直到所有消息都發(fā)送出去 msgs_sent += rd_kafka_outq_len(rk); } // 銷毀topic句柄 rd_kafka_topic_destroy(rkt); // 清理資源 rd_kafka_destroy(rk); return 0; }
在kafka map 管理界面中查看發(fā)送效果如下:
3.5 結(jié)束語(yǔ)
到此這篇關(guān)于詳解C/C++如何發(fā)送與接收Kafka消息的文章就介紹到這了,更多相關(guān)C/C++發(fā)送與接收Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
基于C語(yǔ)言實(shí)現(xiàn)計(jì)算生辰八字五行的示例詳解
生辰八字,簡(jiǎn)稱八字,是指一個(gè)人出生時(shí)的干支歷日期;年月日時(shí)共四柱干支,每柱兩字,合共八個(gè)字。這篇文章主要介紹了C語(yǔ)言實(shí)現(xiàn)計(jì)算生辰八字五行的示例代碼,需要的可以參考一下2023-03-03C語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單推箱子游戲
這篇文章主要為大家詳細(xì)介紹了C語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單推箱子游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-02-02淺談Qt實(shí)現(xiàn)HTTP的Get/Post請(qǐng)求
本文主要介紹了淺談Qt實(shí)現(xiàn)HTTP的Get/Post請(qǐng)求,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-05-05Qt5.9實(shí)現(xiàn)簡(jiǎn)單的多線程實(shí)例(類QThread)
Qt開啟多線程,主要用到類QThread。用一個(gè)類繼承QThread,然后重新改寫虛函數(shù)run()。具有一定的參考價(jià)值,感興趣的可以了解一下2021-09-09