Golang操作Kafka的實(shí)現(xiàn)示例
更新時(shí)間:2023年02月19日 09:00:04 作者:YUHAOHAO
本文主要介紹了Golang操作Kafka的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
一.使用庫說明
Golang中連接kafka可以使用第三方庫:github.com/Shopify/sarama
二.Kafka Producer發(fā)送消息
package main? import ( ?? ?"fmt" ?? ?"github.com/Shopify/sarama" ) func main() { ?? ?config := sarama.NewConfig() ?? ?config.Producer.RequiredAcks = sarama.WaitForAll // 發(fā)送完數(shù)據(jù)需要leader和follower都確認(rèn) ?? ?config.Producer.Partitioner = sarama.NewRandomPartitioner ?//寫到隨機(jī)分區(qū)中,我們默認(rèn)設(shè)置32個(gè)分區(qū) ?? ?config.Producer.Return.Successes = true // 成功交付的消息將在success channel返回 ?? ?// 構(gòu)造一個(gè)消息 ?? ?msg := &sarama.ProducerMessage{} ?? ?msg.Topic = "task" ?? ?msg.Value = sarama.StringEncoder("producer kafka messages...") ?? ?// 連接kafka ?? ?client, err := sarama.NewSyncProducer([]string{"192.20.216.8:9092"}, config) ?? ?if err != nil { ?? ??? ?fmt.Println("Producer closed, err:", err) ?? ??? ?return ?? ?} ?? ?defer client.Close() ?? ?// 發(fā)送消息 ?? ?pid, offset, err := client.SendMessage(msg) ?? ?if err != nil { ?? ??? ?fmt.Println("send msg failed, err:", err) ?? ??? ?return ?? ?} ?? ?fmt.Printf("pid:%v offset:%v\n", pid, offset) }
三.Kafka Consumer消費(fèi)消息
package main import ( ?? ?"fmt" ?? ?"github.com/Shopify/sarama" ?? ?"sync" ) func main() { ?? ?var wg sync.WaitGroup ?? ?consumer, err := sarama.NewConsumer([]string{"192.20.216.8:9092"}, nil) ?? ?if err != nil { ?? ??? ?fmt.Println("Failed to start consumer: %s", err) ?? ??? ?return ?? ?} ?? ?partitionList, err := consumer.Partitions("task-status-data") // 通過topic獲取到所有的分區(qū) ?? ?if err != nil { ?? ??? ?fmt.Println("Failed to get the list of partition: ", err) ?? ??? ?return ?? ?} ?? ?fmt.Println(partitionList) ?? ?for partition := range partitionList{ // 遍歷所有的分區(qū) ?? ??? ?pc, err := consumer.ConsumePartition("task", int32(partition), sarama.OffsetNewest) // 針對每個(gè)分區(qū)創(chuàng)建一個(gè)分區(qū)消費(fèi)者 ?? ??? ?if err != nil { ?? ??? ??? ?fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err) ?? ??? ?} ?? ??? ?wg.Add(1) ?? ??? ?go func(sarama.PartitionConsumer) { // 為每個(gè)分區(qū)開一個(gè)go協(xié)程取值 ?? ??? ??? ?for msg := range pc.Messages() { // 阻塞直到有值發(fā)送過來,然后再繼續(xù)等待 ?? ??? ??? ??? ?fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) ?? ??? ??? ?} ?? ??? ??? ?defer pc.AsyncClose() ?? ??? ??? ?wg.Done() ?? ??? ?}(pc) ?? ?} ?? ?wg.Wait() ?? ?consumer.Close() }
到此這篇關(guān)于Golang操作Kafka的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)Golang操作Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang常用環(huán)境變量說明與設(shè)置詳解
這篇文章主要介紹了Golang常用環(huán)境變量說明與設(shè)置,需要的朋友可以參考下2020-02-02在ubuntu下安裝go開發(fā)環(huán)境的全過程
Go語言是谷歌公司開發(fā)的編程語言,雖然安裝和配置go很簡單,但是很多初學(xué)者在第一次安裝go環(huán)境時(shí)會遇到各種坑,下面這篇文章主要給大家介紹了關(guān)于在ubuntu下安裝go開發(fā)環(huán)境的相關(guān)資料,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下2022-08-08Go 數(shù)據(jù)結(jié)構(gòu)之堆排序示例詳解
這篇文章主要為大家介紹了Go 數(shù)據(jù)結(jié)構(gòu)之堆排序示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08如何基于Golang實(shí)現(xiàn)Kubernetes邊車模式
本文介紹了如何基于Go實(shí)現(xiàn)Kubernetes Sidecar模式,并通過實(shí)際示例演示創(chuàng)建Golang實(shí)現(xiàn)的微服務(wù)服務(wù)、Docker 容器化以及在 Kubernetes 上的部署和管理,感興趣的朋友一起看看吧2024-08-08