Go操作Kafka的實現(xiàn)示例(kafka-go)
Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),本文介紹了如何使用kafka-go這個庫實現(xiàn)Go語言與kafka的交互。
Go社區(qū)中目前有三個比較常用的kafka客戶端庫 , 它們各有特點。
首先是IBM/sarama(這個庫已經(jīng)由Shopify轉(zhuǎn)給了IBM),之前我寫過一篇使用sarama操作Kafka的教程,相較于sarama, kafka-go 更簡單、更易用。
segmentio/kafka-go 是純Go實現(xiàn),提供了與kafka交互的低級別和高級別兩套API,同時也支持Context。
此外社區(qū)中另一個比較常用的confluentinc/confluent-kafka-go,它是一個基于cgo的librdkafka包裝,在項目中使用它會引入對C庫的依賴。
準(zhǔn)備Kafka環(huán)境
這里推薦使用Docker Compose快速搭建一套本地開發(fā)環(huán)境。
以下docker-compose.yml文件用來搭建一套單節(jié)點zookeeper和單節(jié)點kafka環(huán)境,并且在8080端口提供kafka-ui管理界面。
version: '2.1' services: zoo1: image: confluentinc/cp-zookeeper:7.3.2 hostname: zoo1 container_name: zoo1 ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_SERVERS: zoo1:2888:3888 kafka1: image: confluentinc/cp-kafka:7.3.2 hostname: kafka1 container_name: kafka1 ports: - "9092:9092" - "29092:29092" - "9999:9999" environment: KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_JMX_PORT: 9999 KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1} KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" depends_on: - zoo1 kafka-ui: container_name: kafka-ui image: provectuslabs/kafka-ui:latest ports: - 8080:8080 depends_on: - kafka1 environment: DYNAMIC_CONFIG_ENABLED: "TRUE"
將上述docker-compose.yml文件在本地保存,在同一目錄下執(zhí)行以下命令啟動容器。
docker-compose up -d
容器啟動后,使用瀏覽器打開127.0.0.1:8080 即可看到如下kafka-ui界面。
點擊頁面右側(cè)的“Configure new cluster”按鈕,配置kafka服務(wù)連接信息。
填寫完信息后,點擊頁面下方的“Submit”按鈕提交即可。
安裝kafka-go
執(zhí)行以下命令下載 kafka-go依賴。
go get github.com/segmentio/kafka-go
注意:kafka-go 需要 Go 1.15或更高版本。
kafka-go使用指南
kafka-go 提供了兩套與Kafka交互的API。
- 低級別( low-level):基于與 Kafka 服務(wù)器的原始網(wǎng)絡(luò)連接實現(xiàn)。
- 高級別(high-level):對于常用讀寫操作封裝了一套更易用的API。
通常建議直接使用高級別的交互API。
Connection
Conn 類型是 kafka-go 包的核心。它代表與 Kafka broker之間的連接?;谒鼘崿F(xiàn)了一套與Kafka交互的低級別 API。
發(fā)送消息
下面是連接至Kafka之后,使用Conn發(fā)送消息的代碼示例。
// writeByConn 基于Conn發(fā)送消息 func writeByConn() { topic := "my-topic" partition := 0 // 連接至Kafka集群的Leader節(jié)點 conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition) if err != nil { log.Fatal("failed to dial leader:", err) } // 設(shè)置發(fā)送消息的超時時間 conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) // 發(fā)送消息 _, err = conn.WriteMessages( kafka.Message{Value: []byte("one!")}, kafka.Message{Value: []byte("two!")}, kafka.Message{Value: []byte("three!")}, ) if err != nil { log.Fatal("failed to write messages:", err) } // 關(guān)閉連接 if err := conn.Close(); err != nil { log.Fatal("failed to close writer:", err) } }
消費(fèi)消息
// readByConn 連接至kafka后接收消息 func readByConn() { // 指定要連接的topic和partition topic := "my-topic" partition := 0 // 連接至Kafka的leader節(jié)點 conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition) if err != nil { log.Fatal("failed to dial leader:", err) } // 設(shè)置讀取超時時間 conn.SetReadDeadline(time.Now().Add(10 * time.Second)) // 讀取一批消息,得到的batch是一系列消息的迭代器 batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max // 遍歷讀取消息 b := make([]byte, 10e3) // 10KB max per message for { n, err := batch.Read(b) if err != nil { break } fmt.Println(string(b[:n])) } // 關(guān)閉batch if err := batch.Close(); err != nil { log.Fatal("failed to close batch:", err) } // 關(guān)閉連接 if err := conn.Close(); err != nil { log.Fatal("failed to close connection:", err) } }
使用batch.Read更高效一些,但是需要根據(jù)消息長度選擇合適的buffer(上述代碼中的b),如果傳入的buffer太?。ㄏ⒀b不下)就會返回io.ErrShortBuffer錯誤。
如果不考慮內(nèi)存分配的效率問題,也可以按以下代碼使用batch.ReadMessage讀取消息。
for { msg, err := batch.ReadMessage() if err != nil { break } fmt.Println(string(msg.Value)) }
創(chuàng)建topic
當(dāng)Kafka關(guān)閉自動創(chuàng)建topic的設(shè)置時,可按如下方式創(chuàng)建topic。
// createTopicByConn 創(chuàng)建topic func createTopicByConn() { // 指定要創(chuàng)建的topic名稱 topic := "my-topic" // 連接至任意kafka節(jié)點 conn, err := kafka.Dial("tcp", "localhost:9092") if err != nil { panic(err.Error()) } defer conn.Close() // 獲取當(dāng)前控制節(jié)點信息 controller, err := conn.Controller() if err != nil { panic(err.Error()) } var controllerConn *kafka.Conn // 連接至leader節(jié)點 controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) if err != nil { panic(err.Error()) } defer controllerConn.Close() topicConfigs := []kafka.TopicConfig{ { Topic: topic, NumPartitions: 1, ReplicationFactor: 1, }, } // 創(chuàng)建topic err = controllerConn.CreateTopics(topicConfigs...) if err != nil { panic(err.Error()) } }
通過非leader節(jié)點連接leader節(jié)點
下面的示例代碼演示了如何通過已有的非leader節(jié)點的Conn,連接至 leader節(jié)點。
conn, err := kafka.Dial("tcp", "localhost:9092") if err != nil { panic(err.Error()) } defer conn.Close() // 獲取當(dāng)前控制節(jié)點信息 controller, err := conn.Controller() if err != nil { panic(err.Error()) } var connLeader *kafka.Conn connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) if err != nil { panic(err.Error()) } defer connLeader.Close()
獲取topic列表
conn, err := kafka.Dial("tcp", "localhost:9092") if err != nil { panic(err.Error()) } defer conn.Close() partitions, err := conn.ReadPartitions() if err != nil { panic(err.Error()) } m := map[string]struct{}{} // 遍歷所有分區(qū)取topic for _, p := range partitions { m[p.Topic] = struct{}{} } for k := range m { fmt.Println(k) }
Reader
Reader是由 kafka-go 包提供的另一個概念,對于從單個主題-分區(qū)(topic-partition)消費(fèi)消息這種典型場景,使用它能夠簡化代碼。Reader 還實現(xiàn)了自動重連和偏移量管理,并支持使用 Context 支持異步取消和超時的 API。
注意: 當(dāng)進(jìn)程退出時,必須在 Reader 上調(diào)用 Close() 。Kafka服務(wù)器需要一個優(yōu)雅的斷開連接來阻止它繼續(xù)嘗試向已連接的客戶端發(fā)送消息。如果進(jìn)程使用 SIGINT (shell 中的 Ctrl-C)或 SIGTERM (如 docker stop 或 kubernetes start)終止,那么下面給出的示例不會調(diào)用 Close()。當(dāng)同一topic上有新Reader連接時,可能導(dǎo)致延遲(例如,新進(jìn)程啟動或新容器運(yùn)行)。在這種場景下應(yīng)使用signal.Notify處理程序在進(jìn)程關(guān)閉時關(guān)閉Reader。
消費(fèi)消息
下面的代碼演示了如何使用Reader連接至Kafka消費(fèi)消息。
// readByReader 通過Reader接收消息 func readByReader() { // 創(chuàng)建Reader r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, Topic: "topic-A", Partition: 0, MaxBytes: 10e6, // 10MB }) r.SetOffset(42) // 設(shè)置Offset // 接收消息 for { m, err := r.ReadMessage(context.Background()) if err != nil { break } fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) } // 程序退出前關(guān)閉Reader if err := r.Close(); err != nil { log.Fatal("failed to close reader:", err) } }
消費(fèi)者組
kafka-go支持消費(fèi)者組,包括broker管理的offset。要啟用消費(fèi)者組,只需在 ReaderConfig 中指定 GroupID。
使用消費(fèi)者組時,ReadMessage 會自動提交偏移量。
// 創(chuàng)建一個reader,指定GroupID,從 topic-A 消費(fèi)消息 r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", // 指定消費(fèi)者組id Topic: "topic-A", MaxBytes: 10e6, // 10MB }) // 接收消息 for { m, err := r.ReadMessage(context.Background()) if err != nil { break } fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) } // 程序退出前關(guān)閉Reader if err := r.Close(); err != nil { log.Fatal("failed to close reader:", err) }
在使用消費(fèi)者組時會有以下限制:
(*Reader).SetOffset 當(dāng)設(shè)置了GroupID時會返回錯誤
(*Reader).Offset 當(dāng)設(shè)置了GroupID時會永遠(yuǎn)返回 -1
(*Reader).Lag 當(dāng)設(shè)置了GroupID時會永遠(yuǎn)返回 -1
(*Reader).ReadLag 當(dāng)設(shè)置了GroupID時會返回錯誤
(*Reader).Stats 當(dāng)設(shè)置了GroupID時會返回一個-1的分區(qū)
顯式提交
kafka-go 也支持顯式提交。當(dāng)需要顯式提交時不要調(diào)用 ReadMessage,而是調(diào)用 FetchMessage獲取消息,然后調(diào)用 CommitMessages 顯式提交。
ctx := context.Background() for { // 獲取消息 m, err := r.FetchMessage(ctx) if err != nil { break } // 處理消息 fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) // 顯式提交 if err := r.CommitMessages(ctx, m); err != nil { log.Fatal("failed to commit messages:", err) } }
在消費(fèi)者組中提交消息時,具有給定主題/分區(qū)的最大偏移量的消息確定該分區(qū)的提交偏移量的值。例如,如果通過調(diào)用 FetchMessage 獲取了單個分區(qū)的偏移量為 1、2 和 3 的消息,則使用偏移量為3的消息調(diào)用 CommitMessages 也將導(dǎo)致該分區(qū)的偏移量為 1 和 2 的消息被提交。
管理提交間隔
默認(rèn)情況下,調(diào)用CommitMessages將同步向Kafka提交偏移量。為了提高性能,可以在ReaderConfig中設(shè)置CommitInterval來定期向Kafka提交偏移。
// 創(chuàng)建一個reader從 topic-A 消費(fèi)消息 r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", MaxBytes: 10e6, // 10MB CommitInterval: time.Second, // 每秒刷新一次提交給 Kafka })
Writer
向Kafka發(fā)送消息,除了使用基于Conn的低級API,kafka-go包還提供了更高級別的 Writer 類型。大多數(shù)情況下使用Writer即可滿足條件,它支持以下特性。
- 對錯誤進(jìn)行自動重試和重新連接。
- 在可用分區(qū)之間可配置的消息分布。
- 向Kafka同步或異步寫入消息。
- 使用Context的異步取消。
- 關(guān)閉時清除掛起的消息以支持正常關(guān)閉。
- 在發(fā)布消息之前自動創(chuàng)建不存在的topic。
發(fā)送消息
// 創(chuàng)建一個writer 向topic-A發(fā)送消息 w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: &kafka.LeastBytes{}, // 指定分區(qū)的balancer模式為最小字節(jié)分布 RequiredAcks: kafka.RequireAll, // ack模式 Async: true, // 異步 } err := w.WriteMessages(context.Background(), kafka.Message{ Key: []byte("Key-A"), Value: []byte("Hello World!"), }, kafka.Message{ Key: []byte("Key-B"), Value: []byte("One!"), }, kafka.Message{ Key: []byte("Key-C"), Value: []byte("Two!"), }, ) if err != nil { log.Fatal("failed to write messages:", err) } if err := w.Close(); err != nil { log.Fatal("failed to close writer:", err) }
創(chuàng)建不存在的topic
如果給Writer配置了AllowAutoTopicCreation:true,那么當(dāng)發(fā)送消息至某個不存在的topic時,則會自動創(chuàng)建topic。
// 創(chuàng)建不存在的topic // 如果給Writer配置了AllowAutoTopicCreation:true,那么當(dāng)發(fā)送消息至某個不存在的topic時,則會自動創(chuàng)建topic。 func writeByWriter2() { writer := kafka.Writer{ Addr: kafka.TCP("192.168.2.204:9092"), Topic: "kafka-test-topic", AllowAutoTopicCreation: true, //自動創(chuàng)建topic } messages := []kafka.Message{ { Key: []byte("Key-A"), Value: []byte("Hello World!"), }, { Key: []byte("Key-B"), Value: []byte("One!"), }, { Key: []byte("Key-C"), Value: []byte("Tow!"), }, } const retries = 3 //重試3次 for i := 0; i < retries; i++ { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err := writer.WriteMessages(ctx, messages...) if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { time.Sleep(time.Millisecond * 250) continue } if err != nil { log.Fatal("unexpected error %v", err) } break } //關(guān)閉Writer if err := writer.Close(); err != nil { log.Fatal("failed to close writer:", err) } }
寫入多個topic
通常,WriterConfig.Topic用于初始化單個topic的Writer。通過去掉WriterConfig中的Topic配置,分別設(shè)置每條消息的message.topic,可以實現(xiàn)將消息發(fā)送至多個topic。
w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), // 注意: 當(dāng)此處不設(shè)置Topic時,后續(xù)的每條消息都需要指定Topic Balancer: &kafka.LeastBytes{}, } err := w.WriteMessages(context.Background(), // 注意: 每條消息都需要指定一個 Topic, 否則就會報錯 kafka.Message{ Topic: "topic-A", Key: []byte("Key-A"), Value: []byte("Hello World!"), }, kafka.Message{ Topic: "topic-B", Key: []byte("Key-B"), Value: []byte("One!"), }, kafka.Message{ Topic: "topic-C", Key: []byte("Key-C"), Value: []byte("Two!"), }, ) if err != nil { log.Fatal("failed to write messages:", err) } if err := w.Close(); err != nil { log.Fatal("failed to close writer:", err) }
注意:Writer中的Topic和Message中的Topic是互斥的,同一時刻有且只能設(shè)置一處。
其他配置
TLS
對于基本的 Conn 類型或在 Reader/Writer 配置中,可以在Dialer中設(shè)置TLS選項。如果 TLS 字段為空,則它將不啟用TLS 連接。
注意:不在Conn/Reder/Writer上配置TLS,連接到啟用TLS的Kafka集群,可能會出現(xiàn)io.ErrUnexpectedEOF錯誤。
Connection
dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, TLS: &tls.Config{...tls config...}, // 指定TLS配置 } conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
Reader
dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, TLS: &tls.Config{...tls config...}, // 指定TLS配置 } r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", Dialer: dialer, })
Writer
創(chuàng)建Writer時可以按如下方式指定TLS配置。
w := kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: &kafka.Hash{}, Transport: &kafka.Transport{ TLS: &tls.Config{}, // 指定TLS配置 }, }
到此這篇關(guān)于Go操作Kafka的實現(xiàn)示例(kafka-go)的文章就介紹到這了,更多相關(guān)Go操作Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang 并發(fā)編程之生產(chǎn)者消費(fèi)者詳解
這篇文章主要介紹了golang 并發(fā)編程之生產(chǎn)者消費(fèi)者詳解,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-05-05Go語言MySQLCURD數(shù)據(jù)庫操作示例詳解
這篇文章主要為大家介紹了Go語言MySQLCURD數(shù)據(jù)庫操作示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12Golang使用channel實現(xiàn)一個優(yōu)雅退出功能
最近補(bǔ)?Golang?channel?方面八股的時候發(fā)現(xiàn)用?channel?實現(xiàn)一個優(yōu)雅退出功能好像不是很難,之前寫的?HTTP?框架剛好也不支持優(yōu)雅退出功能,于是就參考了?Hertz?優(yōu)雅退出方面的代碼,為我的?PIANO?補(bǔ)足了這個?feature2023-03-03Go語言關(guān)于幾種深度拷貝(deepcopy)方法的性能對比
這篇文章主要介紹了Go語言關(guān)于幾種深度拷貝(deepcopy)方法的性能對比,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01深入探討Go語言中的map是否是并發(fā)安全以及解決方法
這篇文章主要來和大家探討?Go?語言中的?map?是否是并發(fā)安全的,并提供三種方案來解決并發(fā)問題,文中的示例代碼講解詳細(xì),需要的可以參考一下2023-05-05詳解Go語言如何實現(xiàn)類似Python中的with上下文管理器
熟悉?Python?的同學(xué)應(yīng)該知道?Python?中的上下文管理器非常好用,那么在?Go?中是否也能實現(xiàn)上下文管理器呢,下面小編就來和大家仔細(xì)講講吧2023-07-07