欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解C/C++如何發(fā)送與接收Kafka消息

 更新時(shí)間:2024年07月17日 09:53:20   作者:周先生FullStack  
系統(tǒng)之間通信方式很多如:系統(tǒng)之間調(diào)用(http/rpc等),異步間接調(diào)用如發(fā)送消息、公共存儲(chǔ)等,算法工程為C/C++工程,本文將介紹如何在C/C++中如何發(fā)送與接收Kakfa消息(包含:Kafka的SASL認(rèn)證方式),并提供了詳細(xì)的源碼和講解,需要的朋友可以參考下

一、背景

在實(shí)際工程中,難免會(huì)遇到不通系統(tǒng)之間通信,如何進(jìn)行系統(tǒng)之間通信呢?(作為一個(gè)“全棧工程師”,必須要解決它!)。

系統(tǒng)之間通信方式很多如:系統(tǒng)之間調(diào)用(http/rpc等),異步間接調(diào)用如發(fā)送消息、公共存儲(chǔ)等。目前,本人從事的項(xiàng)目中遇到web業(yè)務(wù)工程(Java)依賴與算法工程(C++) 處理的視頻/圖片分類與標(biāo)記結(jié)果。兩個(gè)系統(tǒng)之前數(shù)據(jù)通信采用了kafka消息方式。

算法工程為C/C++工程,本文將介紹如何在C/C++中如何發(fā)送與接收Kakfa消息(包含:Kafka的SASL認(rèn)證方式),并提供了詳細(xì)的源碼和講解。(至于Java中如何發(fā)送與接收Kakfa消息如有需要,可留言或私聊?。?/p>

二、環(huán)境依賴安裝

# 下載librdkafka
git clone https://github.com/edenhill/librdkafka.git
 
# 編譯
cd librdkafka
./configure --prefix=/usr/local
 
# 安裝
sudo make install
 
# 驗(yàn)證:查看/usr/local/lib目錄下是否有l(wèi)ibrdkafka文件
ls /usr/local/lib | grep kafka

三、編寫kakfa生產(chǎn)者消費(fèi)者

3.1 生產(chǎn)者

#include <rdkafka.h>     // 包含C API頭文件
#include <iostream>
#include <cstring>
#include <cerrno>
  
int main() {
    const char *brokers = "xx.xx.xx.xx:7091"; // Kafka broker地址
    const char *topic_name = "kafka_msg_topic_test";
    const char *payload = "Hello, Kafka from librdkafka!";
    size_t len = strlen(payload);
  
    // 創(chuàng)建配置對(duì)象
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (!conf) {
        std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        return 1;
    }
  
    // 設(shè)置broker地址
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) {
        std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
  
    // 創(chuàng)建生產(chǎn)者實(shí)例
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
    if (!rk) {
        std::cerr << "Failed to create producer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
  
    // 創(chuàng)建topic句柄(可選,但推薦)
    rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL);
    if (!rkt) {
        std::cerr << "Failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_destroy(rk);
//        rd_kafka_conf_destroy(conf);
        return 1;
    }
  
    // 發(fā)送消息
    int32_t partition = RD_KAFKA_PARTITION_UA; // 自動(dòng)選擇分區(qū)
    int err = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, const_cast<char *>(payload), len, NULL, 0, NULL);
    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        std::cerr << "Failed to produce to topic " << topic_name << ": " << err << std::endl;
    } else {
        std::cout << "Produced " << len << " bytes to topic " << topic_name << std::endl;
    }
  
    // 等待所有消息發(fā)送完成(可選,但推薦)
    // 在實(shí)際生產(chǎn)代碼中,您可能需要更復(fù)雜的邏輯來(lái)處理消息的發(fā)送和確認(rèn)
    int msgs_sent = 0;
    while (rd_kafka_outq_len(rk) > 0) {
        rd_kafka_poll(rk, 100); // 輪詢Kafka隊(duì)列,直到所有消息都發(fā)送出去
        msgs_sent += rd_kafka_outq_len(rk);
    }
  
    // 銷毀topic句柄
    rd_kafka_topic_destroy(rkt);
  
    // 銷毀生產(chǎn)者實(shí)例
    rd_kafka_destroy(rk);
  
    // 銷毀配置對(duì)象
//    rd_kafka_conf_destroy(conf);
  
    return 0;
}

3.2 消費(fèi)者

#include <rdkafka.h>
#include <iostream>
#include <cerrno>
#include <cstring>
#include <cstdlib>
  
void error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque) {
    // 錯(cuò)誤處理回調(diào)
    std::cerr << "Kafka error: " << err << ": " << reason << std::endl;
}
  
int main() {
    std::cerr << "start " << std::endl;
    const char *brokers = "xx.xx.xx.xx:7091"; // Kafka broker地址
    const char *group_id = "kafka_msg_topic_test"; // 消費(fèi)者組ID
    const char *topic_name = "kafka_msg_topic_test"; // Kafka topic名稱
    
    // 創(chuàng)建配置對(duì)象
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (!conf) {
        std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        return 1;
    }
    
    // 設(shè)置broker地址
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, NULL, 0) != RD_KAFKA_CONF_OK) {
        std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    
    // 設(shè)置消費(fèi)者組ID
    if (rd_kafka_conf_set(conf, "group.id", group_id, NULL, 0) != RD_KAFKA_CONF_OK) {
        std::cerr << "Failed to set group.id: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    
    // 設(shè)置錯(cuò)誤處理回調(diào)(可選)
    rd_kafka_conf_set_error_cb(conf, error_cb);
    
    // 創(chuàng)建消費(fèi)者實(shí)例
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, NULL, 0);
    if (!rk) {
        std::cerr << "Failed to create consumer: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        return 1;
    }
    
    
    // 創(chuàng)建一個(gè)topic分區(qū)列表
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    if (!topics) {
        std::cerr << "Failed to create topic partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_destroy(rk);
        return 1;
    }
    
    // 添加topic到分區(qū)列表
    if (!rd_kafka_topic_partition_list_add(topics, topic_name, RD_KAFKA_PARTITION_UA)) {
        std::cerr << "Failed to add topic to partition list: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_topic_partition_list_destroy(topics);
        rd_kafka_destroy(rk);
        return 1;
    }
    // 訂閱topic
    rd_kafka_resp_err_t err = rd_kafka_subscribe(rk, topics);
    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        std::cerr << "Failed to subscribe to topic: " << rd_kafka_err2str(err) << std::endl;
        rd_kafka_topic_partition_list_destroy(topics);
        rd_kafka_destroy(rk);
        return 1;
    }
    
    // 銷毀分區(qū)列表(訂閱后不再需要)
    rd_kafka_topic_partition_list_destroy(topics);
    
    
    // 輪詢消息
    while (true) {
        rd_kafka_message_t *rkmessage;
        rkmessage = rd_kafka_consumer_poll(rk, 1000); // 等待1秒以獲取消息
        
        if (rkmessage == NULL) {
            // 沒有消息或者超時(shí)
            continue;
        }
        
        if (rkmessage->err) {
            // 處理錯(cuò)誤
            if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                // 消息流的末尾
                std::cout << "End of partition event" << std::endl;
            } else {
                // 打印錯(cuò)誤并退出
                std::cerr << "Kafka consumer error: " << rd_kafka_message_errstr(rkmessage) << std::endl;
                break;
            }
        } else {
            // 處理消息
            std::cout << "Received message at offset " << rkmessage->offset
            << " from partition " << rkmessage->partition
            << " with key \"" << rkmessage->key << "\" and payload size "<< rkmessage->len
            << " value :" <<(char *)rkmessage->payload
            << std::endl;
            
            // 如果需要,可以在這里處理消息內(nèi)容
            // 例如,使用rkmessage->payload()獲取消息內(nèi)容
            
            // 釋放消息
            rd_kafka_message_destroy(rkmessage);
        }
    }
    
    // 清理
    rd_kafka_destroy(rk);
    
    return 0;
}

3.3 編譯運(yùn)行

3.3.1 編譯生產(chǎn)者消費(fèi)者

g++ -o send_kafka SendKakfaMessage.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread
g++ -o receive_kafka ReceiveKafkaMessage.cpp -I/usr/local/include/librdkafka -lrdkafka++ -lrdkafka -lpthread

3.3.2 運(yùn)行驗(yàn)證

執(zhí)行時(shí),若出現(xiàn)錯(cuò)誤: error while loading shared libraries: librdkafka++.so.1: cannot open shared object file: No such file or directory

則需要執(zhí)行下面環(huán)境變量配置:

export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH

生產(chǎn)者:發(fā)送消息

消費(fèi)者:接收消息

3.4 SASL認(rèn)證kakfa

下面是,支持sasl認(rèn)證的kakka生產(chǎn)者完整代碼

#include <rdkafka.h>
#include <iostream>
#include <cstring>
#include <cerrno>
 
int main(int argc, char *argv[]) {
    const char *brokers = "xx.xx.xx.xx:8092"; // Kafka broker地址
    const char *username = "xxx";
    const char *password = "xxx";
    const char *topic_name = "kafka_msg_test_sasl";
    const char *payload = "Hello, Kafka from librdkafka! sasl";
    size_t len = strlen(payload);
     // 初始化配置
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    if (!conf)
    {
        std::cerr << "Failed to create configuration object: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        return 1;
    }
    char errstr[512]; // 聲明一個(gè)足夠大的字符數(shù)組來(lái)存儲(chǔ)錯(cuò)誤信息
 
    // 設(shè)置SASL相關(guān)的配置
    if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        std::cerr << "Failed to set security.protocol: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        std::cerr << "Failed to set sasl.mechanisms: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    if (rd_kafka_conf_set(conf, "sasl.username", username, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        std::cerr << "Failed to set sasl.username: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    if (rd_kafka_conf_set(conf, "sasl.password", password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        std::cerr << "Failed to set sasl.password: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
 
    if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
    {
        std::cerr << "Failed to set bootstrap.servers: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_conf_destroy(conf);
        return 1;
    }
    // 檢查配置是否設(shè)置成功
    if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
        std::cerr << "Failed to set configuration: " << errstr << std::endl;
        return 1;
    }
 
    // 創(chuàng)建producer實(shí)例
    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!rk) {
        std::cerr << "Failed to create new producer: " << errstr << std::endl;
        return 1;
    }
 
    // 創(chuàng)建topic句柄(可選,但推薦)
    rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, topic_name, NULL);
    if (!rkt)
    {
        std::cerr << "Failed to create topic handle: " << rd_kafka_err2str(rd_kafka_last_error()) << std::endl;
        rd_kafka_destroy(rk);
        //        rd_kafka_conf_destroy(conf);
        return 1;
    }
 
    // 發(fā)送消息
    int32_t partition = RD_KAFKA_PARTITION_UA; // 自動(dòng)選擇分區(qū)
    int err = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, const_cast<char *>(payload), len, NULL, 0, NULL);
    if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
    {
        std::cerr << "Failed to produce to topic " << topic_name << ": " << err << std::endl;
    }
    else
    {
        std::cout << "Produced " << len << " bytes to topic " << topic_name << std::endl;
    }
 
    // 等待所有消息發(fā)送完成(可選,但推薦)
    // 在實(shí)際生產(chǎn)代碼中,您可能需要更復(fù)雜的邏輯來(lái)處理消息的發(fā)送和確認(rèn)
    int msgs_sent = 0;
    while (rd_kafka_outq_len(rk) > 0)
    {
        rd_kafka_poll(rk, 100); // 輪詢Kafka隊(duì)列,直到所有消息都發(fā)送出去
        msgs_sent += rd_kafka_outq_len(rk);
    }
 
    // 銷毀topic句柄
    rd_kafka_topic_destroy(rkt);
 
    // 清理資源
    rd_kafka_destroy(rk);
    return 0;
}

在kafka map 管理界面中查看發(fā)送效果如下:

3.5 結(jié)束語(yǔ)

到此這篇關(guān)于詳解C/C++如何發(fā)送與接收Kafka消息的文章就介紹到這了,更多相關(guān)C/C++發(fā)送與接收Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • C語(yǔ)言入門篇--定義宏#define的概述

    C語(yǔ)言入門篇--定義宏#define的概述

    本篇文章是C語(yǔ)言系列基礎(chǔ)篇,適合c語(yǔ)言剛?cè)腴T的朋友,本文對(duì)關(guān)于c語(yǔ)言的定義宏#define作了簡(jiǎn)要的概述,希望可以幫助大家快速入門c語(yǔ)言的世界,更好的理解c語(yǔ)言
    2021-08-08
  • 基于C語(yǔ)言實(shí)現(xiàn)計(jì)算生辰八字五行的示例詳解

    基于C語(yǔ)言實(shí)現(xiàn)計(jì)算生辰八字五行的示例詳解

    生辰八字,簡(jiǎn)稱八字,是指一個(gè)人出生時(shí)的干支歷日期;年月日時(shí)共四柱干支,每柱兩字,合共八個(gè)字。這篇文章主要介紹了C語(yǔ)言實(shí)現(xiàn)計(jì)算生辰八字五行的示例代碼,需要的可以參考一下
    2023-03-03
  • C語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單推箱子游戲

    C語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單推箱子游戲

    這篇文章主要為大家詳細(xì)介紹了C語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單推箱子游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2020-02-02
  • 淺談Qt實(shí)現(xiàn)HTTP的Get/Post請(qǐng)求

    淺談Qt實(shí)現(xiàn)HTTP的Get/Post請(qǐng)求

    本文主要介紹了淺談Qt實(shí)現(xiàn)HTTP的Get/Post請(qǐng)求,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-05-05
  • C語(yǔ)言直接插入排序算法

    C語(yǔ)言直接插入排序算法

    大家好,本篇文章主要講的是C語(yǔ)言直接插入排序算法,感興趣的同學(xué)趕快來(lái)看一看吧,對(duì)你有幫助的話記得收藏一下,方便下次瀏覽
    2022-01-01
  • C++使用函數(shù)的一些高級(jí)操作指南

    C++使用函數(shù)的一些高級(jí)操作指南

    C++中函數(shù)調(diào)用的方法與C語(yǔ)言并無(wú)區(qū)別,依舊是在調(diào)用方函數(shù)中執(zhí)行函數(shù)調(diào)用語(yǔ)句來(lái)實(shí)現(xiàn)函數(shù)調(diào)用,下面這篇文章主要給大家介紹了關(guān)于C++使用函數(shù)的一些高級(jí)操作,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下
    2022-12-12
  • Qt5.9實(shí)現(xiàn)簡(jiǎn)單的多線程實(shí)例(類QThread)

    Qt5.9實(shí)現(xiàn)簡(jiǎn)單的多線程實(shí)例(類QThread)

    Qt開啟多線程,主要用到類QThread。用一個(gè)類繼承QThread,然后重新改寫虛函數(shù)run()。具有一定的參考價(jià)值,感興趣的可以了解一下
    2021-09-09
  • 詳解C++?STL模擬實(shí)現(xiàn)list

    詳解C++?STL模擬實(shí)現(xiàn)list

    這篇文章主要為大家詳細(xì)介紹了C++如何模擬實(shí)現(xiàn)STL容器list,文中的示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)C++有一定幫助,需要的可以參考一下
    2023-01-01
  • C中實(shí)現(xiàn)矩陣乘法的一種高效的方法

    C中實(shí)現(xiàn)矩陣乘法的一種高效的方法

    本篇文章介紹了,在C中實(shí)現(xiàn)矩陣乘法的一種高效的方法。需要的朋友參考下
    2013-05-05
  • C++中的Reactor原理與實(shí)現(xiàn)

    C++中的Reactor原理與實(shí)現(xiàn)

    reactor設(shè)計(jì)模式是event-driven?architecture的一種實(shí)現(xiàn)方式,處理多個(gè)客戶端并發(fā)的向服務(wù)端請(qǐng)求服務(wù)的場(chǎng)景,每種服務(wù)在服務(wù)端可能由多個(gè)方法組成,這篇文章主要介紹了Reactor原理與實(shí)現(xiàn),需要的朋友可以參考下
    2022-07-07

最新評(píng)論