Golang操作Kafka的實(shí)現(xiàn)示例
一.使用庫(kù)說(shuō)明
Golang中連接kafka可以使用第三方庫(kù):github.com/Shopify/sarama
二.Kafka Producer發(fā)送消息
package main?
import (
?? ?"fmt"
?? ?"github.com/Shopify/sarama"
)
func main() {
?? ?config := sarama.NewConfig()
?? ?config.Producer.RequiredAcks = sarama.WaitForAll // 發(fā)送完數(shù)據(jù)需要leader和follower都確認(rèn)
?? ?config.Producer.Partitioner = sarama.NewRandomPartitioner ?//寫(xiě)到隨機(jī)分區(qū)中,我們默認(rèn)設(shè)置32個(gè)分區(qū)
?? ?config.Producer.Return.Successes = true // 成功交付的消息將在success channel返回
?? ?// 構(gòu)造一個(gè)消息
?? ?msg := &sarama.ProducerMessage{}
?? ?msg.Topic = "task"
?? ?msg.Value = sarama.StringEncoder("producer kafka messages...")
?? ?// 連接kafka
?? ?client, err := sarama.NewSyncProducer([]string{"192.20.216.8:9092"}, config)
?? ?if err != nil {
?? ??? ?fmt.Println("Producer closed, err:", err)
?? ??? ?return
?? ?}
?? ?defer client.Close()
?? ?// 發(fā)送消息
?? ?pid, offset, err := client.SendMessage(msg)
?? ?if err != nil {
?? ??? ?fmt.Println("send msg failed, err:", err)
?? ??? ?return
?? ?}
?? ?fmt.Printf("pid:%v offset:%v\n", pid, offset)
}三.Kafka Consumer消費(fèi)消息
package main
import (
?? ?"fmt"
?? ?"github.com/Shopify/sarama"
?? ?"sync"
)
func main() {
?? ?var wg sync.WaitGroup
?? ?consumer, err := sarama.NewConsumer([]string{"192.20.216.8:9092"}, nil)
?? ?if err != nil {
?? ??? ?fmt.Println("Failed to start consumer: %s", err)
?? ??? ?return
?? ?}
?? ?partitionList, err := consumer.Partitions("task-status-data") // 通過(guò)topic獲取到所有的分區(qū)
?? ?if err != nil {
?? ??? ?fmt.Println("Failed to get the list of partition: ", err)
?? ??? ?return
?? ?}
?? ?fmt.Println(partitionList)
?? ?for partition := range partitionList{ // 遍歷所有的分區(qū)
?? ??? ?pc, err := consumer.ConsumePartition("task", int32(partition), sarama.OffsetNewest) // 針對(duì)每個(gè)分區(qū)創(chuàng)建一個(gè)分區(qū)消費(fèi)者
?? ??? ?if err != nil {
?? ??? ??? ?fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)
?? ??? ?}
?? ??? ?wg.Add(1)
?? ??? ?go func(sarama.PartitionConsumer) { // 為每個(gè)分區(qū)開(kāi)一個(gè)go協(xié)程取值
?? ??? ??? ?for msg := range pc.Messages() { // 阻塞直到有值發(fā)送過(guò)來(lái),然后再繼續(xù)等待
?? ??? ??? ??? ?fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
?? ??? ??? ?}
?? ??? ??? ?defer pc.AsyncClose()
?? ??? ??? ?wg.Done()
?? ??? ?}(pc)
?? ?}
?? ?wg.Wait()
?? ?consumer.Close()
}到此這篇關(guān)于Golang操作Kafka的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)Golang操作Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang常用環(huán)境變量說(shuō)明與設(shè)置詳解
這篇文章主要介紹了Golang常用環(huán)境變量說(shuō)明與設(shè)置,需要的朋友可以參考下2020-02-02
在ubuntu下安裝go開(kāi)發(fā)環(huán)境的全過(guò)程
Go語(yǔ)言是谷歌公司開(kāi)發(fā)的編程語(yǔ)言,雖然安裝和配置go很簡(jiǎn)單,但是很多初學(xué)者在第一次安裝go環(huán)境時(shí)會(huì)遇到各種坑,下面這篇文章主要給大家介紹了關(guān)于在ubuntu下安裝go開(kāi)發(fā)環(huán)境的相關(guān)資料,文中通過(guò)圖文介紹的非常詳細(xì),需要的朋友可以參考下2022-08-08
GO語(yǔ)言異常處理機(jī)制panic和recover分析
這篇文章主要介紹了GO語(yǔ)言異常處理機(jī)制panic和recover,分析了捕獲運(yùn)行時(shí)發(fā)生錯(cuò)誤的方法,是非常實(shí)用的技巧,需要的朋友可以參考下2014-12-12
Go 數(shù)據(jù)結(jié)構(gòu)之堆排序示例詳解
這篇文章主要為大家介紹了Go 數(shù)據(jù)結(jié)構(gòu)之堆排序示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08
GO語(yǔ)言基礎(chǔ)入門(mén)第一個(gè)go程序解讀
這篇文章主要為大家介紹了GO語(yǔ)言基礎(chǔ)入門(mén)的第一個(gè)go程序解讀,下面來(lái)帶大家進(jìn)入Go語(yǔ)言世界helloworld的大門(mén)吧,有需要的朋友可以借鑒參考下,希望能夠有所幫助2021-11-11
Go語(yǔ)言題解LeetCode下一個(gè)更大元素示例詳解
這篇文章主要為大家介紹了Go語(yǔ)言題解LeetCode下一個(gè)更大元素示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12
如何基于Golang實(shí)現(xiàn)Kubernetes邊車(chē)模式
本文介紹了如何基于Go實(shí)現(xiàn)Kubernetes Sidecar模式,并通過(guò)實(shí)際示例演示創(chuàng)建Golang實(shí)現(xiàn)的微服務(wù)服務(wù)、Docker 容器化以及在 Kubernetes 上的部署和管理,感興趣的朋友一起看看吧2024-08-08

