詳解C/C++如何發(fā)送與接收Kafka消息
一、背景
在實際工程中,難免會遇到不通系統(tǒng)之間通信,如何進行系統(tǒng)之間通信呢?(作為一個“全棧工程師”,必須要解決它!)。
系統(tǒng)之間通信方式很多如:系統(tǒng)之間調(diào)用(http/rpc等),異步間接調(diào)用如發(fā)送消息、公共存儲等。目前,本人從事的項目中遇到web業(yè)務工程(Java)依賴與算法工程(C++) 處理的視頻/圖片分類與標記結(jié)果。兩個系統(tǒng)之前數(shù)據(jù)通信采用了kafka消息方式。
算法工程為C/C++工程,本文將介紹如何在C/C++中如何發(fā)送與接收Kakfa消息(包含:Kafka的SASL認證方式),并提供了詳細的源碼和講解。(至于Java中如何發(fā)送與接收Kakfa消息如有需要,可留言或私聊?。?/p>
二、環(huán)境依賴安裝
# 下載librdkafka git clone https://github.com/edenhill/librdkafka.git # 編譯 cd librdkafka ./configure --prefix=/usr/local # 安裝 sudo make install # 驗證:查看/usr/local/lib目錄下是否有l(wèi)ibrdkafka文件 ls /usr/local/lib | grep kafka
三、編寫kakfa生產(chǎn)者消費者
3.1 生產(chǎn)者
#include <rdkafka.h> // 包含C API頭文件
#include <iostream>
#include <cstring>
#include <cerrno>
int main() {
const char *brokers = "xx.xx.xx.xx:7091"; // Kafka broker地址
const char *topic_name = "kafka_msg_topic_test";
const char *payload = "Hello, Kafka from librdkafka!";
size_t len = strlen(payload);
// 創(chuàng)建配置對象
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (!conf) {
std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
return 1;
}
// 設置broker地址
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) {
std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
// 創(chuàng)建生產(chǎn)者實例
rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
if (!rk) {
std::cerr << "Failed to create producer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
// 創(chuàng)建topic句柄(可選,但推薦)
rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL);
if (!rkt) {
std::cerr << "Failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_destroy(rk);
// rd_kafka_conf_destroy(conf);
return 1;
}
// 發(fā)送消息
int32_t partition = RD_KAFKA_PARTITION_UA; // 自動選擇分區(qū)
int err = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, const_cast<char *>(payload), len, NULL, 0, NULL);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
std::cerr << "Failed to produce to topic " << topic_name << ": " << err << std::endl;
} else {
std::cout << "Produced " << len << " bytes to topic " << topic_name << std::endl;
}
// 等待所有消息發(fā)送完成(可選,但推薦)
// 在實際生產(chǎn)代碼中,您可能需要更復雜的邏輯來處理消息的發(fā)送和確認
int msgs_sent = 0;
while (rd_kafka_outq_len(rk) > 0) {
rd_kafka_poll(rk, 100); // 輪詢Kafka隊列,直到所有消息都發(fā)送出去
msgs_sent += rd_kafka_outq_len(rk);
}
// 銷毀topic句柄
rd_kafka_topic_destroy(rkt);
// 銷毀生產(chǎn)者實例
rd_kafka_destroy(rk);
// 銷毀配置對象
// rd_kafka_conf_destroy(conf);
return 0;
}3.2 消費者
#include <rdkafka.h>
#include <iostream>
#include <cerrno>
#include <cstring>
#include <cstdlib>
void error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
// 錯誤處理回調(diào)
std::cerr << "Kafka error: " << err << ": " << reason << std::endl;
}
int main() {
std::cerr << "start " << std::endl;
const char *brokers = "xx.xx.xx.xx:7091"; // Kafka broker地址
const char *group_id = "kafka_msg_topic_test"; // 消費者組ID
const char *topic_name = "kafka_msg_topic_test"; // Kafka topic名稱
// 創(chuàng)建配置對象
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (!conf) {
std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
return 1;
}
// 設置broker地址
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) {
std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
// 設置消費者組ID
if (rd_kafka_conf_set(conf, "group.id", group_id, NULL, 0) != RD_KAFKA_CONF_OK) {
std::cerr << "Failed to set group.id: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
// 設置錯誤處理回調(diào)(可選)
rd_kafka_conf_set_error_cb(conf, error_cb);
// 創(chuàng)建消費者實例
rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
if (!rk) {
std::cerr << "Failed to create consumer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
return 1;
}
// 創(chuàng)建一個topic分區(qū)列表
rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
if (!topics) {
std::cerr << "Failed to create topic partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_destroy(rk);
return 1;
}
// 添加topic到分區(qū)列表
if (!rd_kafka_topic_partition_list_add(topics, topic_name, RD_KAFKA_PARTITION_UA)) {
std::cerr << "Failed to add topic to partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_topic_partition_list_destroy(topics);
rd_kafka_destroy(rk);
return 1;
}
// 訂閱topic
rd_kafka_resp_err_t err = rd_kafka_subscribe(rk, topics);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
std::cerr << "Failed to subscribe to topic: " << rd_kafka_err2str(err) << std::endl;
rd_kafka_topic_partition_list_destroy(topics);
rd_kafka_destroy(rk);
return 1;
}
// 銷毀分區(qū)列表(訂閱后不再需要)
rd_kafka_topic_partition_list_destroy(topics);
// 輪詢消息
while (true) {
rd_kafka_message_t *rkmessage;
rkmessage = rd_kafka_consumer_poll(rk, 1000); // 等待1秒以獲取消息
if (rkmessage == NULL) {
// 沒有消息或者超時
continue;
}
if (rkmessage->err) {
// 處理錯誤
if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
// 消息流的末尾
std::cout << "End of partition event" << std::endl;
} else {
// 打印錯誤并退出
std::cerr << "Kafka consumer error: " << rd_kafka_message_errstr(rkmessage) << std::endl;
break;
}
} else {
// 處理消息
std::cout << "Received message at offset " << rkmessage->offset
<< " from partition " << rkmessage->partition
<< " with key \"" << rkmessage->key << "\" and payload size "<< rkmessage->len
<< " value :" <<(char *)rkmessage->payload
<< std::endl;
// 如果需要,可以在這里處理消息內(nèi)容
// 例如,使用rkmessage->payload()獲取消息內(nèi)容
// 釋放消息
rd_kafka_message_destroy(rkmessage);
}
}
// 清理
rd_kafka_destroy(rk);
return 0;
}3.3 編譯運行
3.3.1 編譯生產(chǎn)者消費者
g++ -o send_kafka SendKakfaMessage.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread
g++ -o receive_kafka ReceiveKafkaMessage.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread
3.3.2 運行驗證
執(zhí)行時,若出現(xiàn)錯誤: error while loading shared libraries: librdkafka++.so.1: cannot open shared object file: No such file or directory
則需要執(zhí)行下面環(huán)境變量配置:
export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
生產(chǎn)者:發(fā)送消息

消費者:接收消息

3.4 SASL認證kakfa
下面是,支持sasl認證的kakka生產(chǎn)者完整代碼
#include <rdkafka.h>
#include <iostream>
#include <cstring>
#include <cerrno>
int main(int argc, char *argv[]) {
const char *brokers = "xx.xx.xx.xx:8092"; // Kafka broker地址
const char *username = "xxx";
const char *password = "xxx";
const char *topic_name = "kafka_msg_test_sasl";
const char *payload = "Hello, Kafka from librdkafka! sasl";
size_t len = strlen(payload);
// 初始化配置
rd_kafka_conf_t *conf = rd_kafka_conf_new();
if (!conf)
{
std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
return 1;
}
char errstr[512]; // 聲明一個足夠大的字符數(shù)組來存儲錯誤信息
// 設置SASL相關(guān)的配置
if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
std::cerr << "Failed to set security.protocol: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
std::cerr << "Failed to set sasl.mechanisms: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
if (rd_kafka_conf_set(conf, "sasl.username", username, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
std::cerr << "Failed to set sasl.username: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
if (rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
std::cerr << "Failed to set sasl.password: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_conf_destroy(conf);
return 1;
}
// 檢查配置是否設置成功
if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
std::cerr << "Failed to set configuration: " << errstr << std::endl;
return 1;
}
// 創(chuàng)建producer實例
rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk) {
std::cerr << "Failed to create new producer: " << errstr << std::endl;
return 1;
}
// 創(chuàng)建topic句柄(可選,但推薦)
rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL);
if (!rkt)
{
std::cerr << "Failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
rd_kafka_destroy(rk);
// rd_kafka_conf_destroy(conf);
return 1;
}
// 發(fā)送消息
int32_t partition = RD_KAFKA_PARTITION_UA; // 自動選擇分區(qū)
int err = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, const_cast<char *>(payload), len, NULL, 0, NULL);
if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
{
std::cerr << "Failed to produce to topic " << topic_name << ": " << err << std::endl;
}
else
{
std::cout << "Produced " << len << " bytes to topic " << topic_name << std::endl;
}
// 等待所有消息發(fā)送完成(可選,但推薦)
// 在實際生產(chǎn)代碼中,您可能需要更復雜的邏輯來處理消息的發(fā)送和確認
int msgs_sent = 0;
while (rd_kafka_outq_len(rk) > 0)
{
rd_kafka_poll(rk, 100); // 輪詢Kafka隊列,直到所有消息都發(fā)送出去
msgs_sent += rd_kafka_outq_len(rk);
}
// 銷毀topic句柄
rd_kafka_topic_destroy(rkt);
// 清理資源
rd_kafka_destroy(rk);
return 0;
}在kafka map 管理界面中查看發(fā)送效果如下:

3.5 結(jié)束語
到此這篇關(guān)于詳解C/C++如何發(fā)送與接收Kafka消息的文章就介紹到這了,更多相關(guān)C/C++發(fā)送與接收Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Qt5.9實現(xiàn)簡單的多線程實例(類QThread)
Qt開啟多線程,主要用到類QThread。用一個類繼承QThread,然后重新改寫虛函數(shù)run()。具有一定的參考價值,感興趣的可以了解一下2021-09-09

