Go語言實(shí)現(xiàn)Kafka消息隊(duì)列的示例代碼
一、Go語言快速實(shí)現(xiàn)Kafka消息隊(duì)列
安裝依賴庫
go get github.com/IBM/sarama
生產(chǎn)者代碼示例
package main import ( "log" "github.com/IBM/sarama" ) func main() { // 1. 創(chuàng)建生產(chǎn)者配置 config := sarama.NewConfig() config.Producer.Return.Successes = true // 2. 連接Kafka集群 producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { log.Fatal("連接失敗:", err) } defer producer.Close() // 3. 發(fā)送消息 msg := &sarama.ProducerMessage{ Topic: "test-topic", Value: sarama.StringEncoder("Hello Kafka!"), } partition, offset, err := producer.SendMessage(msg) if err != nil { log.Fatal("發(fā)送失敗:", err) } log.Printf("發(fā)送成功! 分區(qū):%d 偏移量:%d", partition, offset) }
消費(fèi)者代碼示例
package main import ( "log" "github.com/IBM/sarama" ) func main() { // 1. 創(chuàng)建消費(fèi)者 consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil) if err != nil { log.Fatal("連接失敗:", err) } defer consumer.Close() // 2. 訂閱指定分區(qū)(這里消費(fèi)分區(qū)0) partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, sarama.OffsetNewest) if err != nil { log.Fatal("訂閱失敗:", err) } defer partitionConsumer.Close() // 3. 持續(xù)消費(fèi)消息 for msg := range partitionConsumer.Messages() { log.Printf("收到消息: 分區(qū):%d 偏移量:%d 內(nèi)容:%s", msg.Partition, msg.Offset, string(msg.Value)) } }
二、Kafka與其他消息隊(duì)列對比
對比維度表
特性 | Kafka | RabbitMQ | Redis Streams | RocketMQ |
---|---|---|---|---|
設(shè)計(jì)定位 | 高吞吐日志處理 | 企業(yè)級(jí)消息路由 | 輕量級(jí)實(shí)時(shí)消息 | 金融級(jí)可靠消息 |
吞吐量 | 百萬級(jí)/秒 | 萬級(jí)/秒 | 十萬級(jí)/秒 | 十萬級(jí)/秒 |
消息持久化 | 磁盤持久化(默認(rèn)7天) | 內(nèi)存/磁盤可選 | 內(nèi)存為主 | 磁盤持久化 |
消息模式 | 發(fā)布-訂閱 | 點(diǎn)對點(diǎn)/發(fā)布訂閱 | 發(fā)布訂閱 | 發(fā)布訂閱 |
協(xié)議支持 | 自定義協(xié)議 | AMQP協(xié)議 | RESP協(xié)議 | 自定義協(xié)議 |
順序保證 | 分區(qū)內(nèi)有序 | 隊(duì)列有序 | 全局無序 | 隊(duì)列有序 |
延遲消息 | 不支持 | 支持 | 支持 | 支持 |
典型場景 | 日志收集、流處理 | 訂單系統(tǒng)、任務(wù)隊(duì)列 | 實(shí)時(shí)通知、排行榜更新 | 交易系統(tǒng)、金融場景 |
Go生態(tài)支持 | Sarama庫完善 | amqp庫穩(wěn)定 | go-redis庫支持 | 官方SDK支持 |
三、核心差異說明
吞吐量對比
- Kafka:采用順序磁盤寫入 + 零拷貝技術(shù),適合大數(shù)據(jù)量場景(如:單集群可達(dá)百萬TPS)
- RabbitMQ:基于內(nèi)存的Erlang實(shí)現(xiàn),適合中小規(guī)模業(yè)務(wù)(如:電商訂單處理)
消息存儲(chǔ)機(jī)制
// Kafka的持久化示例(自動(dòng)創(chuàng)建日志文件) kafka-topics.sh --create \ --topic user-log \ --partitions 3 \ --replication-factor 2 \ --config retention.ms=604800000 // 保留7天
- Kafka:消息持久化到磁盤,支持TB級(jí)數(shù)據(jù)存儲(chǔ)
- Redis:數(shù)據(jù)主要在內(nèi)存,重啟可能丟失(需配置持久化)
消息消費(fèi)模式
- Kafka:消費(fèi)者主動(dòng)拉?。≒ull)
- RabbitMQ:服務(wù)端推送(Push)
使用復(fù)雜度
// 對比初始化復(fù)雜度(以生產(chǎn)者為例) // Kafka需要配置更多參數(shù) config := sarama.NewConfig() config.Producer.Return.Successes = true config.Producer.Partitioner = sarama.NewHashPartitioner // RabbitMQ連接更簡單 conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
四、選型建議
選擇Kafka當(dāng)
- 需要處理日志流、點(diǎn)擊流等大數(shù)據(jù)量場景
- 要求消息持久化存儲(chǔ)和回溯能力
- 需要橫向擴(kuò)展的分布式系統(tǒng)
選擇其他隊(duì)列當(dāng)
- RabbitMQ:需要復(fù)雜路由(如:延時(shí)隊(duì)列、死信隊(duì)列)
- Redis Streams:需要極低延遲的實(shí)時(shí)通知
- RocketMQ:金融場景要求強(qiáng)一致性
五、常見問題
如何保證Kafka消息順序?
- 單分區(qū)內(nèi)保證順序
- 發(fā)送時(shí)指定分區(qū)鍵(如用戶ID)
msg := &sarama.ProducerMessage{ Topic: "order-events", Key: sarama.StringEncoder(userID), // 相同Key進(jìn)入同一分區(qū) Value: sarama.StringEncoder("下單成功"), }
如何避免重復(fù)消費(fèi)?
- 啟用冪等生產(chǎn)者
config.Producer.Idempotent = true config.Producer.RequiredAcks = sarama.WaitForAll
到此這篇關(guān)于Go語言實(shí)現(xiàn)Kafka消息隊(duì)列的示例代碼的文章就介紹到這了,更多相關(guān)Go Kafka消息隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言reflect.TypeOf()和reflect.Type通過反射獲取類型信息
這篇文章主要介紹了Go語言reflect.TypeOf()和reflect.Type通過反射獲取類型信息,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04go原子級(jí)內(nèi)存操作實(shí)現(xiàn)
原子級(jí)內(nèi)存操作是在多線程并發(fā)執(zhí)行時(shí),能夠確保某個(gè)內(nèi)存操作是不可中斷的操作,本文主要介紹了go原子級(jí)內(nèi)存操作實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2024-02-02