Golang操作Kafka的實(shí)現(xiàn)示例
一.使用庫(kù)說(shuō)明
Golang中連接kafka可以使用第三方庫(kù):github.com/Shopify/sarama
二.Kafka Producer發(fā)送消息
package main? import ( ?? ?"fmt" ?? ?"github.com/Shopify/sarama" ) func main() { ?? ?config := sarama.NewConfig() ?? ?config.Producer.RequiredAcks = sarama.WaitForAll // 發(fā)送完數(shù)據(jù)需要leader和follower都確認(rèn) ?? ?config.Producer.Partitioner = sarama.NewRandomPartitioner ?//寫(xiě)到隨機(jī)分區(qū)中,我們默認(rèn)設(shè)置32個(gè)分區(qū) ?? ?config.Producer.Return.Successes = true // 成功交付的消息將在success channel返回 ?? ?// 構(gòu)造一個(gè)消息 ?? ?msg := &sarama.ProducerMessage{} ?? ?msg.Topic = "task" ?? ?msg.Value = sarama.StringEncoder("producer kafka messages...") ?? ?// 連接kafka ?? ?client, err := sarama.NewSyncProducer([]string{"192.20.216.8:9092"}, config) ?? ?if err != nil { ?? ??? ?fmt.Println("Producer closed, err:", err) ?? ??? ?return ?? ?} ?? ?defer client.Close() ?? ?// 發(fā)送消息 ?? ?pid, offset, err := client.SendMessage(msg) ?? ?if err != nil { ?? ??? ?fmt.Println("send msg failed, err:", err) ?? ??? ?return ?? ?} ?? ?fmt.Printf("pid:%v offset:%v\n", pid, offset) }
三.Kafka Consumer消費(fèi)消息
package main import ( ?? ?"fmt" ?? ?"github.com/Shopify/sarama" ?? ?"sync" ) func main() { ?? ?var wg sync.WaitGroup ?? ?consumer, err := sarama.NewConsumer([]string{"192.20.216.8:9092"}, nil) ?? ?if err != nil { ?? ??? ?fmt.Println("Failed to start consumer: %s", err) ?? ??? ?return ?? ?} ?? ?partitionList, err := consumer.Partitions("task-status-data") // 通過(guò)topic獲取到所有的分區(qū) ?? ?if err != nil { ?? ??? ?fmt.Println("Failed to get the list of partition: ", err) ?? ??? ?return ?? ?} ?? ?fmt.Println(partitionList) ?? ?for partition := range partitionList{ // 遍歷所有的分區(qū) ?? ??? ?pc, err := consumer.ConsumePartition("task", int32(partition), sarama.OffsetNewest) // 針對(duì)每個(gè)分區(qū)創(chuàng)建一個(gè)分區(qū)消費(fèi)者 ?? ??? ?if err != nil { ?? ??? ??? ?fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err) ?? ??? ?} ?? ??? ?wg.Add(1) ?? ??? ?go func(sarama.PartitionConsumer) { // 為每個(gè)分區(qū)開(kāi)一個(gè)go協(xié)程取值 ?? ??? ??? ?for msg := range pc.Messages() { // 阻塞直到有值發(fā)送過(guò)來(lái),然后再繼續(xù)等待 ?? ??? ??? ??? ?fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) ?? ??? ??? ?} ?? ??? ??? ?defer pc.AsyncClose() ?? ??? ??? ?wg.Done() ?? ??? ?}(pc) ?? ?} ?? ?wg.Wait() ?? ?consumer.Close() }
到此這篇關(guān)于Golang操作Kafka的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)Golang操作Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang常用環(huán)境變量說(shuō)明與設(shè)置詳解
這篇文章主要介紹了Golang常用環(huán)境變量說(shuō)明與設(shè)置,需要的朋友可以參考下2020-02-02在ubuntu下安裝go開(kāi)發(fā)環(huán)境的全過(guò)程
Go語(yǔ)言是谷歌公司開(kāi)發(fā)的編程語(yǔ)言,雖然安裝和配置go很簡(jiǎn)單,但是很多初學(xué)者在第一次安裝go環(huán)境時(shí)會(huì)遇到各種坑,下面這篇文章主要給大家介紹了關(guān)于在ubuntu下安裝go開(kāi)發(fā)環(huán)境的相關(guān)資料,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2022-08-08GO語(yǔ)言異常處理機(jī)制panic和recover分析
這篇文章主要介紹了GO語(yǔ)言異常處理機(jī)制panic和recover,分析了捕獲運(yùn)行時(shí)發(fā)生錯(cuò)誤的方法,是非常實(shí)用的技巧,需要的朋友可以參考下2014-12-12Go 數(shù)據(jù)結(jié)構(gòu)之堆排序示例詳解
這篇文章主要為大家介紹了Go 數(shù)據(jù)結(jié)構(gòu)之堆排序示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08GO語(yǔ)言基礎(chǔ)入門(mén)第一個(gè)go程序解讀
這篇文章主要為大家介紹了GO語(yǔ)言基礎(chǔ)入門(mén)的第一個(gè)go程序解讀,下面來(lái)帶大家進(jìn)入Go語(yǔ)言世界helloworld的大門(mén)吧,有需要的朋友可以借鑒參考下,希望能夠有所幫助2021-11-11Go語(yǔ)言題解LeetCode下一個(gè)更大元素示例詳解
這篇文章主要為大家介紹了Go語(yǔ)言題解LeetCode下一個(gè)更大元素示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12如何基于Golang實(shí)現(xiàn)Kubernetes邊車(chē)模式
本文介紹了如何基于Go實(shí)現(xiàn)Kubernetes Sidecar模式,并通過(guò)實(shí)際示例演示創(chuàng)建Golang實(shí)現(xiàn)的微服務(wù)服務(wù)、Docker 容器化以及在 Kubernetes 上的部署和管理,感興趣的朋友一起看看吧2024-08-08