Go語(yǔ)言實(shí)現(xiàn)Kafka消息隊(duì)列的示例代碼
一、Go語(yǔ)言快速實(shí)現(xiàn)Kafka消息隊(duì)列
安裝依賴庫(kù)
go get github.com/IBM/sarama
生產(chǎn)者代碼示例
package main
import (
"log"
"github.com/IBM/sarama"
)
func main() {
// 1. 創(chuàng)建生產(chǎn)者配置
config := sarama.NewConfig()
config.Producer.Return.Successes = true
// 2. 連接Kafka集群
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatal("連接失敗:", err)
}
defer producer.Close()
// 3. 發(fā)送消息
msg := &sarama.ProducerMessage{
Topic: "test-topic",
Value: sarama.StringEncoder("Hello Kafka!"),
}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Fatal("發(fā)送失敗:", err)
}
log.Printf("發(fā)送成功! 分區(qū):%d 偏移量:%d", partition, offset)
}
消費(fèi)者代碼示例
package main
import (
"log"
"github.com/IBM/sarama"
)
func main() {
// 1. 創(chuàng)建消費(fèi)者
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatal("連接失敗:", err)
}
defer consumer.Close()
// 2. 訂閱指定分區(qū)(這里消費(fèi)分區(qū)0)
partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, sarama.OffsetNewest)
if err != nil {
log.Fatal("訂閱失敗:", err)
}
defer partitionConsumer.Close()
// 3. 持續(xù)消費(fèi)消息
for msg := range partitionConsumer.Messages() {
log.Printf("收到消息: 分區(qū):%d 偏移量:%d 內(nèi)容:%s",
msg.Partition, msg.Offset, string(msg.Value))
}
}
二、Kafka與其他消息隊(duì)列對(duì)比
對(duì)比維度表
| 特性 | Kafka | RabbitMQ | Redis Streams | RocketMQ |
|---|---|---|---|---|
| 設(shè)計(jì)定位 | 高吞吐日志處理 | 企業(yè)級(jí)消息路由 | 輕量級(jí)實(shí)時(shí)消息 | 金融級(jí)可靠消息 |
| 吞吐量 | 百萬(wàn)級(jí)/秒 | 萬(wàn)級(jí)/秒 | 十萬(wàn)級(jí)/秒 | 十萬(wàn)級(jí)/秒 |
| 消息持久化 | 磁盤(pán)持久化(默認(rèn)7天) | 內(nèi)存/磁盤(pán)可選 | 內(nèi)存為主 | 磁盤(pán)持久化 |
| 消息模式 | 發(fā)布-訂閱 | 點(diǎn)對(duì)點(diǎn)/發(fā)布訂閱 | 發(fā)布訂閱 | 發(fā)布訂閱 |
| 協(xié)議支持 | 自定義協(xié)議 | AMQP協(xié)議 | RESP協(xié)議 | 自定義協(xié)議 |
| 順序保證 | 分區(qū)內(nèi)有序 | 隊(duì)列有序 | 全局無(wú)序 | 隊(duì)列有序 |
| 延遲消息 | 不支持 | 支持 | 支持 | 支持 |
| 典型場(chǎng)景 | 日志收集、流處理 | 訂單系統(tǒng)、任務(wù)隊(duì)列 | 實(shí)時(shí)通知、排行榜更新 | 交易系統(tǒng)、金融場(chǎng)景 |
| Go生態(tài)支持 | Sarama庫(kù)完善 | amqp庫(kù)穩(wěn)定 | go-redis庫(kù)支持 | 官方SDK支持 |
三、核心差異說(shuō)明
吞吐量對(duì)比
- Kafka:采用順序磁盤(pán)寫(xiě)入 + 零拷貝技術(shù),適合大數(shù)據(jù)量場(chǎng)景(如:?jiǎn)渭嚎蛇_(dá)百萬(wàn)TPS)
- RabbitMQ:基于內(nèi)存的Erlang實(shí)現(xiàn),適合中小規(guī)模業(yè)務(wù)(如:電商訂單處理)
消息存儲(chǔ)機(jī)制
// Kafka的持久化示例(自動(dòng)創(chuàng)建日志文件) kafka-topics.sh --create \ --topic user-log \ --partitions 3 \ --replication-factor 2 \ --config retention.ms=604800000 // 保留7天
- Kafka:消息持久化到磁盤(pán),支持TB級(jí)數(shù)據(jù)存儲(chǔ)
- Redis:數(shù)據(jù)主要在內(nèi)存,重啟可能丟失(需配置持久化)
消息消費(fèi)模式
- Kafka:消費(fèi)者主動(dòng)拉?。≒ull)
- RabbitMQ:服務(wù)端推送(Push)
使用復(fù)雜度
// 對(duì)比初始化復(fù)雜度(以生產(chǎn)者為例)
// Kafka需要配置更多參數(shù)
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Partitioner = sarama.NewHashPartitioner
// RabbitMQ連接更簡(jiǎn)單
conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")
四、選型建議
選擇Kafka當(dāng)
- 需要處理日志流、點(diǎn)擊流等大數(shù)據(jù)量場(chǎng)景
- 要求消息持久化存儲(chǔ)和回溯能力
- 需要橫向擴(kuò)展的分布式系統(tǒng)
選擇其他隊(duì)列當(dāng)
- RabbitMQ:需要復(fù)雜路由(如:延時(shí)隊(duì)列、死信隊(duì)列)
- Redis Streams:需要極低延遲的實(shí)時(shí)通知
- RocketMQ:金融場(chǎng)景要求強(qiáng)一致性
五、常見(jiàn)問(wèn)題
如何保證Kafka消息順序?
- 單分區(qū)內(nèi)保證順序
- 發(fā)送時(shí)指定分區(qū)鍵(如用戶ID)
msg := &sarama.ProducerMessage{
Topic: "order-events",
Key: sarama.StringEncoder(userID), // 相同Key進(jìn)入同一分區(qū)
Value: sarama.StringEncoder("下單成功"),
}
如何避免重復(fù)消費(fèi)?
- 啟用冪等生產(chǎn)者
config.Producer.Idempotent = true config.Producer.RequiredAcks = sarama.WaitForAll
到此這篇關(guān)于Go語(yǔ)言實(shí)現(xiàn)Kafka消息隊(duì)列的示例代碼的文章就介紹到這了,更多相關(guān)Go Kafka消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語(yǔ)言中Select語(yǔ)句用法實(shí)例
這篇文章主要介紹了Go語(yǔ)言中Select語(yǔ)句用法,實(shí)例分析了select語(yǔ)句的原理與使用技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-02-02
Go語(yǔ)言reflect.TypeOf()和reflect.Type通過(guò)反射獲取類(lèi)型信息
這篇文章主要介紹了Go語(yǔ)言reflect.TypeOf()和reflect.Type通過(guò)反射獲取類(lèi)型信息,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
go原子級(jí)內(nèi)存操作實(shí)現(xiàn)
原子級(jí)內(nèi)存操作是在多線程并發(fā)執(zhí)行時(shí),能夠確保某個(gè)內(nèi)存操作是不可中斷的操作,本文主要介紹了go原子級(jí)內(nèi)存操作實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2024-02-02
Golang讀寫(xiě)二進(jìn)制文件方法總結(jié)
使用?Golang?的?encoding/gob?包讀寫(xiě)二進(jìn)制文件非常方便,而且代碼量也非常少,本文就來(lái)通過(guò)兩個(gè)示例帶大家了解一下encoding/gob的具體用法吧2023-05-05

