Golang操作Kafka的實(shí)現(xiàn)示例
更新時(shí)間:2023年02月19日 09:00:04 作者:YUHAOHAO
本文主要介紹了Golang操作Kafka的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
一.使用庫說明
Golang中連接kafka可以使用第三方庫:github.com/Shopify/sarama
二.Kafka Producer發(fā)送消息
package main?
import (
?? ?"fmt"
?? ?"github.com/Shopify/sarama"
)
func main() {
?? ?config := sarama.NewConfig()
?? ?config.Producer.RequiredAcks = sarama.WaitForAll // 發(fā)送完數(shù)據(jù)需要leader和follower都確認(rèn)
?? ?config.Producer.Partitioner = sarama.NewRandomPartitioner ?//寫到隨機(jī)分區(qū)中,我們默認(rèn)設(shè)置32個(gè)分區(qū)
?? ?config.Producer.Return.Successes = true // 成功交付的消息將在success channel返回
?? ?// 構(gòu)造一個(gè)消息
?? ?msg := &sarama.ProducerMessage{}
?? ?msg.Topic = "task"
?? ?msg.Value = sarama.StringEncoder("producer kafka messages...")
?? ?// 連接kafka
?? ?client, err := sarama.NewSyncProducer([]string{"192.20.216.8:9092"}, config)
?? ?if err != nil {
?? ??? ?fmt.Println("Producer closed, err:", err)
?? ??? ?return
?? ?}
?? ?defer client.Close()
?? ?// 發(fā)送消息
?? ?pid, offset, err := client.SendMessage(msg)
?? ?if err != nil {
?? ??? ?fmt.Println("send msg failed, err:", err)
?? ??? ?return
?? ?}
?? ?fmt.Printf("pid:%v offset:%v\n", pid, offset)
}三.Kafka Consumer消費(fèi)消息
package main
import (
?? ?"fmt"
?? ?"github.com/Shopify/sarama"
?? ?"sync"
)
func main() {
?? ?var wg sync.WaitGroup
?? ?consumer, err := sarama.NewConsumer([]string{"192.20.216.8:9092"}, nil)
?? ?if err != nil {
?? ??? ?fmt.Println("Failed to start consumer: %s", err)
?? ??? ?return
?? ?}
?? ?partitionList, err := consumer.Partitions("task-status-data") // 通過topic獲取到所有的分區(qū)
?? ?if err != nil {
?? ??? ?fmt.Println("Failed to get the list of partition: ", err)
?? ??? ?return
?? ?}
?? ?fmt.Println(partitionList)
?? ?for partition := range partitionList{ // 遍歷所有的分區(qū)
?? ??? ?pc, err := consumer.ConsumePartition("task", int32(partition), sarama.OffsetNewest) // 針對(duì)每個(gè)分區(qū)創(chuàng)建一個(gè)分區(qū)消費(fèi)者
?? ??? ?if err != nil {
?? ??? ??? ?fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)
?? ??? ?}
?? ??? ?wg.Add(1)
?? ??? ?go func(sarama.PartitionConsumer) { // 為每個(gè)分區(qū)開一個(gè)go協(xié)程取值
?? ??? ??? ?for msg := range pc.Messages() { // 阻塞直到有值發(fā)送過來,然后再繼續(xù)等待
?? ??? ??? ??? ?fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
?? ??? ??? ?}
?? ??? ??? ?defer pc.AsyncClose()
?? ??? ??? ?wg.Done()
?? ??? ?}(pc)
?? ?}
?? ?wg.Wait()
?? ?consumer.Close()
}到此這篇關(guān)于Golang操作Kafka的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)Golang操作Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang常用環(huán)境變量說明與設(shè)置詳解
這篇文章主要介紹了Golang常用環(huán)境變量說明與設(shè)置,需要的朋友可以參考下2020-02-02
在ubuntu下安裝go開發(fā)環(huán)境的全過程
Go語言是谷歌公司開發(fā)的編程語言,雖然安裝和配置go很簡單,但是很多初學(xué)者在第一次安裝go環(huán)境時(shí)會(huì)遇到各種坑,下面這篇文章主要給大家介紹了關(guān)于在ubuntu下安裝go開發(fā)環(huán)境的相關(guān)資料,文中通過圖文介紹的非常詳細(xì),需要的朋友可以參考下2022-08-08
Go 數(shù)據(jù)結(jié)構(gòu)之堆排序示例詳解
這篇文章主要為大家介紹了Go 數(shù)據(jù)結(jié)構(gòu)之堆排序示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08
如何基于Golang實(shí)現(xiàn)Kubernetes邊車模式
本文介紹了如何基于Go實(shí)現(xiàn)Kubernetes Sidecar模式,并通過實(shí)際示例演示創(chuàng)建Golang實(shí)現(xiàn)的微服務(wù)服務(wù)、Docker 容器化以及在 Kubernetes 上的部署和管理,感興趣的朋友一起看看吧2024-08-08

