Go庫實(shí)現(xiàn)Kafka消息的發(fā)送與接收(docker和k3s安裝kafka)
kafka是什么
Kafka傳統(tǒng)定義 :Kafka是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列(Message Queue),主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。 發(fā)布/訂閱 :消息的發(fā)布者不會(huì)將消息直接發(fā)送給特定的訂閱者,而是將發(fā)布的消息 分為不同的類別,訂閱者只接收感興趣的消息。
Kafka最新定義:Kafka是一個(gè)開源的分布式事件流平臺(tái)(Event Streaming Platform),被數(shù)千家公司用于高性能數(shù)據(jù)管道、流分析、數(shù)據(jù)集成和關(guān)鍵任務(wù)應(yīng)用。
消息隊(duì)列的應(yīng)用場景無外乎是:削峰填谷、應(yīng)用解耦、異步處理等等,具體使用案例我們在之前講rabbitmq基礎(chǔ)篇已經(jīng)詳述過,這里不在做講述,這里說一下消息隊(duì)列的兩種模型:
- 點(diǎn)對點(diǎn)模型 :也叫消息隊(duì)列模型。如果拿上面那個(gè)“民間版”的定義來說,那么系統(tǒng) A 發(fā)送的消息只能被系統(tǒng) B 接收,其他任何系統(tǒng)都不能讀取 A 發(fā)送的消息。日常生活的例子比如電話客服就屬于這種模型:同一個(gè)客戶呼入電話只能被一位客服人員處理,第二個(gè)客服人員不能為該客戶服務(wù)。
- 發(fā)布 / 訂閱模型 :與上面不同的是,它有一個(gè)主題(Topic)的概念,你可以理解成邏輯語義相近的消息容器。該模型也有發(fā)送方和接收方,只不過提法不同。發(fā)送方也稱為發(fā)布者(Publisher),接收方稱為訂閱者(Subscriber)。和點(diǎn)對點(diǎn)模型不同的是,這個(gè)模型可能存在多個(gè)發(fā)布者向相同的主題發(fā)送消息,而訂閱者也可能存在多個(gè),它們都能接收到相同主題的消息。生活中的報(bào)紙訂閱就是一種典型的發(fā)布 / 訂閱模型。
kafka基礎(chǔ)架構(gòu)和核心概念
在 Kafka 中,發(fā)布訂閱的對象是 主題(Topic ),你可以為每個(gè)業(yè)務(wù)、每個(gè)應(yīng)用甚至是每類數(shù)據(jù)都創(chuàng)建專屬的主題。
生產(chǎn)者(Producer) :消息生產(chǎn)者,就是向 kafka broker 發(fā)消息的客戶端,生產(chǎn)者程序通常持續(xù)不斷地向一個(gè)或多個(gè)主題發(fā)送消息。
消費(fèi)者(Consumer) :消息消費(fèi)者,向 kafka broker 取消息的客戶端,消費(fèi)者就是訂閱這些主題消息的客戶端應(yīng)用程序。
和生產(chǎn)者類似,消費(fèi)者也能夠同時(shí)訂閱多個(gè)主題的消息。我們把生產(chǎn)者和消費(fèi)者統(tǒng)稱為客戶端(Clients)。你可以同時(shí)運(yùn)行多個(gè)生產(chǎn)者和消費(fèi)者實(shí)例,這些實(shí)例會(huì)不斷地向 Kafka 集群中的多個(gè)主題生產(chǎn)和消費(fèi)消息。
消費(fèi)者組Consumer Group** (CG)**:由多個(gè) consumer 組成。消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi),消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者。
Broker :一臺(tái) kafka 服務(wù)器就是一個(gè) broker。一個(gè)集群由多個(gè) broker 組成。一個(gè) broker 可以容納多個(gè) topic。
主題(topic) :可以理解為一個(gè)隊(duì)列,生產(chǎn)者和消費(fèi)者面向的都是一個(gè) topic;
分區(qū)(Partition) :為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的 topic 可以分布到多個(gè) broker(即服務(wù)器)上, 一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列;
副本(Replica) :副本,為保證集群中的某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),該節(jié)點(diǎn)上的 partition 數(shù)據(jù)不丟失,且 kafka 仍然能夠繼續(xù)工作,kafka 提供了副本機(jī)制,一個(gè) topic 的每個(gè)分區(qū)都有若干個(gè)副本, 一個(gè) leader 和若干個(gè) follower 。
leader :每個(gè)分區(qū)多個(gè)副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對 象都是 leader。
follower :每個(gè)分區(qū)多個(gè)副本中的“從”,實(shí)時(shí)從 leader 中同步數(shù)據(jù),保持和 leader 數(shù)據(jù) 的同步。leader 發(fā)生故障時(shí),某個(gè) follower 會(huì)成為新的 follower
副本的工作機(jī)制也很簡單:生產(chǎn)者總是向領(lǐng)導(dǎo)者副本寫消息;而消費(fèi)者總是從領(lǐng)導(dǎo)者副本讀消息。至于追隨者副本,它只做一件事:向領(lǐng)導(dǎo)者副本發(fā)送請求,請求領(lǐng)導(dǎo)者把最新生產(chǎn)的消息發(fā)給它,這樣它能保持與領(lǐng)導(dǎo)者的同步。
Kafka 使用消息日志(Log)來保存數(shù)據(jù),一個(gè)日志就是磁盤上一個(gè)只能追加寫(Append-only)消息的物理文件。因?yàn)橹荒茏芳訉懭耄?nbsp;避免了緩慢的隨機(jī) I/O 操作,改為性能較好的順序 I/O 寫操作,這也是實(shí)現(xiàn) Kafka 高吞吐量特性的一個(gè)重要手段 。不過如果你不停地向一個(gè)日志寫入消息,最終也會(huì)耗盡所有的磁盤空間,因此 Kafka 必然要定期地刪除消息以回收磁盤。怎么刪除呢?簡單來說就是通過日志段(Log Segment)機(jī)制。在 Kafka 底層,一個(gè)日志又近一步細(xì)分成多個(gè)日志段,消息被追加寫到當(dāng)前最新的日志段中,當(dāng)寫滿了一個(gè)日志段后,Kafka 會(huì)自動(dòng)切分出一個(gè)新的日志段,并將老的日志段封存起來。Kafka 在后臺(tái)還有定時(shí)任務(wù)會(huì)定期地檢查老的日志段是否能夠被刪除,從而實(shí)現(xiàn)回收磁盤空間的目的
Kafka優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
高吞吐量:Kafka的順序日志機(jī)制和高可用性設(shè)計(jì)使其在高并發(fā)場景下表現(xiàn)出色。
擴(kuò)展性強(qiáng):通過分區(qū)和復(fù)制機(jī)制,Kafka能夠輕松擴(kuò)展到多個(gè)節(jié)點(diǎn)。 ** easy to use**:Kafka提供了豐富的 API 和工具支持,簡化了集成和管理。
缺點(diǎn)
學(xué)習(xí)曲線:Kafka的發(fā)布-訂閱模型和分布式架構(gòu)對初次接觸者來說可能較為復(fù)雜。
配置敏感:Kafka的性能和穩(wěn)定性高度依賴于正確的配置和維護(hù)。
合規(guī)性與安全性在金融、醫(yī)療等高敏感領(lǐng)域,Kafka需要滿足嚴(yán)格的合規(guī)要求。可以通過配置安全機(jī)制(如認(rèn)證、授權(quán))來確保數(shù)據(jù)的完整性和安全性。
Kafka注意事項(xiàng)
高并發(fā)與分區(qū)的管理在高并發(fā)場景下,合理的分區(qū)劃分和負(fù)載均衡是關(guān)鍵。如果分區(qū)數(shù)量過多或負(fù)載不平衡,可能導(dǎo)致節(jié)點(diǎn)資源浪費(fèi)或消息延遲。
配置參數(shù)的優(yōu)化Kafka的性能參數(shù)(如生產(chǎn)速率、消費(fèi)速率、分區(qū)數(shù)等)需要根據(jù)實(shí)際應(yīng)用場景進(jìn)行調(diào)整。過高的生產(chǎn)速率可能導(dǎo)致消息堆積,而過低的消費(fèi)速率則會(huì)增加客戶端的負(fù)載。
網(wǎng)絡(luò)穩(wěn)定性Kafka對網(wǎng)絡(luò)性能有較高的要求。在實(shí)際部署中,需要確保集群內(nèi)各節(jié)點(diǎn)之間的網(wǎng)絡(luò)帶寬足夠高,避免因網(wǎng)絡(luò)延遲或分區(qū)不一致導(dǎo)致的消息丟失或延遲處理。
集群的高可用性Kafka的高可用性依賴于集群的配置和管理。在部署時(shí),需要確保節(jié)點(diǎn)的硬件配置一致,定期監(jiān)控集群狀態(tài),并及時(shí)處理節(jié)點(diǎn)故障。
監(jiān)控與運(yùn)維Kafka的監(jiān)控是保障系統(tǒng)穩(wěn)定運(yùn)行的關(guān)鍵。可以通過工具(如Prometheus、Grafana)實(shí)時(shí)監(jiān)控集群的性能、消息隊(duì)列的健康狀況以及消費(fèi)者組的負(fù)載情況。
docker安裝命令,其中172.16.11.111是宿主機(jī)ip,14818是宿主機(jī)端口,對應(yīng)容器端口9092:
docker run -d \ --name kafka \ -p 14818:9092 \ -p 9093:9093 \ -v /tmp/kraft-combined-logs:/tmp/kraft-combined-logs \ -e TZ=Asia/Shanghai \ -e KAFKA_NODE_ID=1 \ -e KAFKA_PROCESS_ROLES=broker,controller \ -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.11.111:14818 \ -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \ -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \ -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \ -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \ -e KAFKA_NUM_PARTITIONS=3 \ -e KAFKA_LOG_DIRS=/tmp/kraft-combined-logs \ -e CLUSTER_ID=5L6g3nShT-eMCtK--X86sw \ apache/kafka-native:4.1.0
k3s的yaml,其中172.16.11.111是宿主機(jī)ip,14818是宿主機(jī)端口,對應(yīng)容器端口9092:
apiVersion: apps/v1 kind: Deployment metadata: labels: app: kafka name: kafka namespace: moonfdd spec: replicas: 1 selector: matchLabels: app: kafka template: metadata: labels: app: kafka spec: initContainers: - name: kafka-fix-data-volume-permissions image: alpine imagePullPolicy: IfNotPresent command: - sh - -c - "chown -R 1000:1000 /tmp/kraft-combined-logs" volumeMounts: - mountPath: /tmp/kraft-combined-logs name: volv containers: - env: - name: TZ value: Asia/Shanghai - name: KAFKA_NODE_ID value: "1" - name: KAFKA_PROCESS_ROLES value: broker,controller - name: KAFKA_LISTENERS value: PLAINTEXT://:9092,CONTROLLER://:9093 - name: KAFKA_ADVERTISED_LISTENERS value: PLAINTEXT://172.16.11.111:14818 - name: KAFKA_CONTROLLER_LISTENER_NAMES value: CONTROLLER - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP value: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - name: KAFKA_CONTROLLER_QUORUM_VOTERS value: 1@localhost:9093 - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR value: "1" - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR value: "1" - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR value: "1" - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS value: "0" - name: KAFKA_NUM_PARTITIONS value: "3" - name: KAFKA_LOG_DIRS value: /tmp/kraft-combined-logs - name: CLUSTER_ID value: "5L6g3nShT-eMCtK--X86sw" # 固定集群ID,僅首次啟動(dòng)格式化使用 image: 'apache/kafka-native:4.1.0' imagePullPolicy: IfNotPresent name: kafka volumeMounts: - mountPath: /tmp/kraft-combined-logs name: volv volumes: - hostPath: path: /root/k8s/moonfdd/kafka/tmp/kraft-combined-logs type: DirectoryOrCreate name: volv --- apiVersion: v1 kind: Service metadata: labels: app: kafka name: kafka namespace: moonfdd spec: ports: - port: 9092 protocol: TCP targetPort: 9092 name: 9092-9092 - port: 9093 protocol: TCP targetPort: 9093 name: 9093-9093 selector: app: kafka type: NodePort
go發(fā)送kafka消息:github.com/segmentio/kafka-go
package main import ( "context" "log" "github.com/segmentio/kafka-go" ) func main() { // 創(chuàng)建一個(gè)Kafka writer(Producer) w := kafka.NewWriter(kafka.WriterConfig{ Brokers: []string{"172.16.11.111:14818"}, // Kafka broker 地址 Topic: "test-topic", // 發(fā)送的 topic Balancer: &kafka.LeastBytes{}, // 負(fù)載均衡策略 }) // 寫入消息 err := w.WriteMessages(context.Background(), kafka.Message{ Key: []byte("Key-A"), Value: []byte("Hello Kafka from Go!"), }, ) if err != nil { log.Fatalf("could not write message: %v", err) } log.Println("Message sent successfully!") // 關(guān)閉 writer w.Close() }
go接收kafka消息:github.com/segmentio/kafka-go
package main import ( "context" "log" "github.com/segmentio/kafka-go" ) func main() { // 創(chuàng)建 Kafka reader(Consumer) r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"172.16.11.111:14818"}, // Kafka broker 地址 Topic: "test-topic", // 訂閱的 topic GroupID: "my-consumer-group", // 消費(fèi)者組,確保相同組會(huì)讀取上一 offset MinBytes: 10e3, // 最小fetch字節(jié)數(shù) MaxBytes: 10e6, // 最大fetch字節(jié)數(shù) }) for { // 讀取消息(會(huì)自動(dòng)從上次的 offset 開始) m, err := r.ReadMessage(context.Background()) if err != nil { log.Fatalf("could not read message: %v", err) } log.Printf("offset:%d | key:%s | value:%s\n", m.Offset, string(m.Key), string(m.Value)) } // r.Close() // 如果你打算退出循環(huán)時(shí)關(guān)閉 }
go發(fā)送kafka消息:github.com/IBM/sarama
package main import ( "fmt" "log" "time" "github.com/IBM/sarama" ) func main() { // 配置生產(chǎn)者 config := sarama.NewConfig() config.Producer.Return.Successes = true // 確保消息發(fā)送成功 config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本確認(rèn) config.Producer.Retry.Max = 3 // 重試次數(shù) // 重要:配置客戶端使用正確的主機(jī) config.Net.SASL.Enable = false config.Net.TLS.Enable = false config.Version = sarama.MaxVersion // 創(chuàng)建同步生產(chǎn)者 producer, err := sarama.NewSyncProducer([]string{"172.16.11.111:14818"}, config) if err != nil { log.Fatalf("創(chuàng)建生產(chǎn)者失敗: %v", err) } defer producer.Close() // 構(gòu)造消息 message := &sarama.ProducerMessage{ Topic: "test-topic", Key: sarama.StringEncoder("message-key"), Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka! %v", time.Now())), } // 發(fā)送消息 partition, offset, err := producer.SendMessage(message) if err != nil { log.Fatalf("發(fā)送消息失敗: %v", err) } fmt.Printf("消息發(fā)送成功! 分區(qū): %d, 偏移量: %d\n", partition, offset) }
go接收kafka消息:github.com/IBM/sarama
package main import ( "context" "fmt" "log" "os" "os/signal" "github.com/IBM/sarama" ) type Consumer struct{} func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error { // 會(huì)話初始化,可以在這里做一些準(zhǔn)備工作 return nil } func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error { // 會(huì)話結(jié)束時(shí)的清理操作 return nil } func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // claim.Messages() 會(huì)不斷返回新消息 for msg := range claim.Messages() { fmt.Printf("Topic:%s Partition:%d Offset:%d Value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Value)) // 標(biāo)記該消息已被處理,Kafka會(huì)自動(dòng)保存offset session.MarkMessage(msg, "") } return nil } func main() { // Kafka集群地址 brokers := []string{"172.16.11.111:14818"} groupID := "my-group" // 消費(fèi)者組ID,保持不變才能從上次offset消費(fèi) topics := []string{"test-topic"} // 配置 config := sarama.NewConfig() config.Version = sarama.MaxVersion // Kafka版本 config.Consumer.Return.Errors = true // 非首次啟動(dòng)時(shí)自動(dòng)從上次位置開始 config.Consumer.Offsets.Initial = sarama.OffsetNewest // OffsetNewest: 如果沒有歷史offset,從最新開始; // OffsetOldest: 如果沒有歷史offset,從最舊開始。 // 創(chuàng)建消費(fèi)者組 consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config) if err != nil { log.Fatalf("Error creating consumer group: %v", err) } defer consumerGroup.Close() consumer := &Consumer{} ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { for err := range consumerGroup.Errors() { log.Printf("Error: %v", err) } }() log.Println("Kafka consumer started...") // 優(yōu)雅退出 go func() { sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, os.Interrupt) <-sigchan cancel() }() // 循環(huán)消費(fèi) for { if err := consumerGroup.Consume(ctx, topics, consumer); err != nil { log.Printf("Error from consumer: %v", err) } // 檢查退出 if ctx.Err() != nil { return } } }
到此這篇關(guān)于Go庫實(shí)現(xiàn)Kafka消息的發(fā)送與接收(docker和k3s安裝kafka)的文章就介紹到這了,更多相關(guān)docker和k3s實(shí)現(xiàn)go語言發(fā)送和接收kafka消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
GO常見的錯(cuò)誤99%程序員會(huì)遇到(解決方法)
這篇文章主要介紹了GO常見的錯(cuò)誤99%程序員會(huì)遇到,本文給出了解決方法,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-01-01go語言實(shí)現(xiàn)Elasticsearches批量修改查詢及發(fā)送MQ操作示例
這篇文章主要為大家介紹了go語言實(shí)現(xiàn)Elasticsearches批量修改查詢及發(fā)送MQ操作示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-04-04Go語言實(shí)現(xiàn)字符串搜索算法Boyer-Moore
Boyer-Moore?算法是一種非常高效的字符串搜索算法,被廣泛的應(yīng)用于多種字符串搜索場景,下面我們就來學(xué)習(xí)一下如何利用Go語言實(shí)現(xiàn)這一字符串搜索算法吧2023-11-11一文詳解go中如何實(shí)現(xiàn)定時(shí)任務(wù)
定時(shí)任務(wù)是指按照預(yù)定的時(shí)間間隔或特定時(shí)間點(diǎn)自動(dòng)執(zhí)行的計(jì)劃任務(wù)或操作,這篇文章主要為大家詳細(xì)介紹了go中是如何實(shí)現(xiàn)定時(shí)任務(wù)的,感興趣的可以了解下2023-11-11GO語言學(xué)習(xí)之語句塊的實(shí)現(xiàn)
本文主要介紹了GO語言學(xué)習(xí)之語句塊的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-06-06Golang?Template實(shí)現(xiàn)自定義函數(shù)的操作指南
這篇文章主要為大家詳細(xì)介紹了Golang如何利用Template實(shí)現(xiàn)自定義函數(shù)的操作,文中的示例代碼簡潔易懂,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-02-02Golang標(biāo)準(zhǔn)庫syscall詳解(什么是系統(tǒng)調(diào)用)
最近在研究go語言,發(fā)現(xiàn)go語言系統(tǒng)調(diào)用源碼只有調(diào)用函數(shù)的定義,今天通過本文給大家分享Golang標(biāo)準(zhǔn)庫syscall詳解及什么是系統(tǒng)調(diào)用,感興趣的朋友一起看看吧2021-05-05Golang自動(dòng)追蹤GitHub上熱門AI項(xiàng)目
這篇文章主要為大家介紹了Golang自動(dòng)追蹤GitHub上熱門AI項(xiàng)目,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12