Go操作Kafka的實(shí)現(xiàn)示例(kafka-go)
Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),本文介紹了如何使用kafka-go這個(gè)庫實(shí)現(xiàn)Go語言與kafka的交互。
Go社區(qū)中目前有三個(gè)比較常用的kafka客戶端庫 , 它們各有特點(diǎn)。
首先是IBM/sarama(這個(gè)庫已經(jīng)由Shopify轉(zhuǎn)給了IBM),之前我寫過一篇使用sarama操作Kafka的教程,相較于sarama, kafka-go 更簡單、更易用。
segmentio/kafka-go 是純Go實(shí)現(xiàn),提供了與kafka交互的低級(jí)別和高級(jí)別兩套API,同時(shí)也支持Context。
此外社區(qū)中另一個(gè)比較常用的confluentinc/confluent-kafka-go,它是一個(gè)基于cgo的librdkafka包裝,在項(xiàng)目中使用它會(huì)引入對(duì)C庫的依賴。
準(zhǔn)備Kafka環(huán)境
這里推薦使用Docker Compose快速搭建一套本地開發(fā)環(huán)境。
以下docker-compose.yml文件用來搭建一套單節(jié)點(diǎn)zookeeper和單節(jié)點(diǎn)kafka環(huán)境,并且在8080端口提供kafka-ui管理界面。
version: '2.1'
services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
hostname: zoo1
container_name: zoo1
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
kafka1:
image: confluentinc/cp-kafka:7.3.2
hostname: kafka1
container_name: kafka1
ports:
- "9092:9092"
- "29092:29092"
- "9999:9999"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_JMX_PORT: 9999
KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1}
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
depends_on:
- zoo1
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- kafka1
environment:
DYNAMIC_CONFIG_ENABLED: "TRUE"
將上述docker-compose.yml文件在本地保存,在同一目錄下執(zhí)行以下命令啟動(dòng)容器。
docker-compose up -d
容器啟動(dòng)后,使用瀏覽器打開127.0.0.1:8080 即可看到如下kafka-ui界面。

點(diǎn)擊頁面右側(cè)的“Configure new cluster”按鈕,配置kafka服務(wù)連接信息。

填寫完信息后,點(diǎn)擊頁面下方的“Submit”按鈕提交即可。

安裝kafka-go
執(zhí)行以下命令下載 kafka-go依賴。
go get github.com/segmentio/kafka-go
注意:kafka-go 需要 Go 1.15或更高版本。
kafka-go使用指南
kafka-go 提供了兩套與Kafka交互的API。
- 低級(jí)別( low-level):基于與 Kafka 服務(wù)器的原始網(wǎng)絡(luò)連接實(shí)現(xiàn)。
- 高級(jí)別(high-level):對(duì)于常用讀寫操作封裝了一套更易用的API。
通常建議直接使用高級(jí)別的交互API。
Connection
Conn 類型是 kafka-go 包的核心。它代表與 Kafka broker之間的連接?;谒鼘?shí)現(xiàn)了一套與Kafka交互的低級(jí)別 API。
發(fā)送消息
下面是連接至Kafka之后,使用Conn發(fā)送消息的代碼示例。
// writeByConn 基于Conn發(fā)送消息
func writeByConn() {
topic := "my-topic"
partition := 0
// 連接至Kafka集群的Leader節(jié)點(diǎn)
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
// 設(shè)置發(fā)送消息的超時(shí)時(shí)間
conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
// 發(fā)送消息
_, err = conn.WriteMessages(
kafka.Message{Value: []byte("one!")},
kafka.Message{Value: []byte("two!")},
kafka.Message{Value: []byte("three!")},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
// 關(guān)閉連接
if err := conn.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
}
消費(fèi)消息
// readByConn 連接至kafka后接收消息
func readByConn() {
// 指定要連接的topic和partition
topic := "my-topic"
partition := 0
// 連接至Kafka的leader節(jié)點(diǎn)
conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
if err != nil {
log.Fatal("failed to dial leader:", err)
}
// 設(shè)置讀取超時(shí)時(shí)間
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
// 讀取一批消息,得到的batch是一系列消息的迭代器
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
// 遍歷讀取消息
b := make([]byte, 10e3) // 10KB max per message
for {
n, err := batch.Read(b)
if err != nil {
break
}
fmt.Println(string(b[:n]))
}
// 關(guān)閉batch
if err := batch.Close(); err != nil {
log.Fatal("failed to close batch:", err)
}
// 關(guān)閉連接
if err := conn.Close(); err != nil {
log.Fatal("failed to close connection:", err)
}
}
使用batch.Read更高效一些,但是需要根據(jù)消息長度選擇合適的buffer(上述代碼中的b),如果傳入的buffer太?。ㄏ⒀b不下)就會(huì)返回io.ErrShortBuffer錯(cuò)誤。
如果不考慮內(nèi)存分配的效率問題,也可以按以下代碼使用batch.ReadMessage讀取消息。
for {
msg, err := batch.ReadMessage()
if err != nil {
break
}
fmt.Println(string(msg.Value))
}
創(chuàng)建topic
當(dāng)Kafka關(guān)閉自動(dòng)創(chuàng)建topic的設(shè)置時(shí),可按如下方式創(chuàng)建topic。
// createTopicByConn 創(chuàng)建topic
func createTopicByConn() {
// 指定要?jiǎng)?chuàng)建的topic名稱
topic := "my-topic"
// 連接至任意kafka節(jié)點(diǎn)
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
// 獲取當(dāng)前控制節(jié)點(diǎn)信息
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var controllerConn *kafka.Conn
// 連接至leader節(jié)點(diǎn)
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{
{
Topic: topic,
NumPartitions: 1,
ReplicationFactor: 1,
},
}
// 創(chuàng)建topic
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
panic(err.Error())
}
}
通過非leader節(jié)點(diǎn)連接leader節(jié)點(diǎn)
下面的示例代碼演示了如何通過已有的非leader節(jié)點(diǎn)的Conn,連接至 leader節(jié)點(diǎn)。
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
// 獲取當(dāng)前控制節(jié)點(diǎn)信息
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
var connLeader *kafka.Conn
connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer connLeader.Close()
獲取topic列表
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
panic(err.Error())
}
defer conn.Close()
partitions, err := conn.ReadPartitions()
if err != nil {
panic(err.Error())
}
m := map[string]struct{}{}
// 遍歷所有分區(qū)取topic
for _, p := range partitions {
m[p.Topic] = struct{}{}
}
for k := range m {
fmt.Println(k)
}
Reader
Reader是由 kafka-go 包提供的另一個(gè)概念,對(duì)于從單個(gè)主題-分區(qū)(topic-partition)消費(fèi)消息這種典型場景,使用它能夠簡化代碼。Reader 還實(shí)現(xiàn)了自動(dòng)重連和偏移量管理,并支持使用 Context 支持異步取消和超時(shí)的 API。
注意: 當(dāng)進(jìn)程退出時(shí),必須在 Reader 上調(diào)用 Close() 。Kafka服務(wù)器需要一個(gè)優(yōu)雅的斷開連接來阻止它繼續(xù)嘗試向已連接的客戶端發(fā)送消息。如果進(jìn)程使用 SIGINT (shell 中的 Ctrl-C)或 SIGTERM (如 docker stop 或 kubernetes start)終止,那么下面給出的示例不會(huì)調(diào)用 Close()。當(dāng)同一topic上有新Reader連接時(shí),可能導(dǎo)致延遲(例如,新進(jìn)程啟動(dòng)或新容器運(yùn)行)。在這種場景下應(yīng)使用signal.Notify處理程序在進(jìn)程關(guān)閉時(shí)關(guān)閉Reader。
消費(fèi)消息
下面的代碼演示了如何使用Reader連接至Kafka消費(fèi)消息。
// readByReader 通過Reader接收消息
func readByReader() {
// 創(chuàng)建Reader
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
Topic: "topic-A",
Partition: 0,
MaxBytes: 10e6, // 10MB
})
r.SetOffset(42) // 設(shè)置Offset
// 接收消息
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}
// 程序退出前關(guān)閉Reader
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
}
消費(fèi)者組
kafka-go支持消費(fèi)者組,包括broker管理的offset。要啟用消費(fèi)者組,只需在 ReaderConfig 中指定 GroupID。
使用消費(fèi)者組時(shí),ReadMessage 會(huì)自動(dòng)提交偏移量。
// 創(chuàng)建一個(gè)reader,指定GroupID,從 topic-A 消費(fèi)消息
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id", // 指定消費(fèi)者組id
Topic: "topic-A",
MaxBytes: 10e6, // 10MB
})
// 接收消息
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}
// 程序退出前關(guān)閉Reader
if err := r.Close(); err != nil {
log.Fatal("failed to close reader:", err)
}
在使用消費(fèi)者組時(shí)會(huì)有以下限制:
(*Reader).SetOffset 當(dāng)設(shè)置了GroupID時(shí)會(huì)返回錯(cuò)誤
(*Reader).Offset 當(dāng)設(shè)置了GroupID時(shí)會(huì)永遠(yuǎn)返回 -1
(*Reader).Lag 當(dāng)設(shè)置了GroupID時(shí)會(huì)永遠(yuǎn)返回 -1
(*Reader).ReadLag 當(dāng)設(shè)置了GroupID時(shí)會(huì)返回錯(cuò)誤
(*Reader).Stats 當(dāng)設(shè)置了GroupID時(shí)會(huì)返回一個(gè)-1的分區(qū)
顯式提交
kafka-go 也支持顯式提交。當(dāng)需要顯式提交時(shí)不要調(diào)用 ReadMessage,而是調(diào)用 FetchMessage獲取消息,然后調(diào)用 CommitMessages 顯式提交。
ctx := context.Background()
for {
// 獲取消息
m, err := r.FetchMessage(ctx)
if err != nil {
break
}
// 處理消息
fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
// 顯式提交
if err := r.CommitMessages(ctx, m); err != nil {
log.Fatal("failed to commit messages:", err)
}
}
在消費(fèi)者組中提交消息時(shí),具有給定主題/分區(qū)的最大偏移量的消息確定該分區(qū)的提交偏移量的值。例如,如果通過調(diào)用 FetchMessage 獲取了單個(gè)分區(qū)的偏移量為 1、2 和 3 的消息,則使用偏移量為3的消息調(diào)用 CommitMessages 也將導(dǎo)致該分區(qū)的偏移量為 1 和 2 的消息被提交。
管理提交間隔
默認(rèn)情況下,調(diào)用CommitMessages將同步向Kafka提交偏移量。為了提高性能,可以在ReaderConfig中設(shè)置CommitInterval來定期向Kafka提交偏移。
// 創(chuàng)建一個(gè)reader從 topic-A 消費(fèi)消息
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
MaxBytes: 10e6, // 10MB
CommitInterval: time.Second, // 每秒刷新一次提交給 Kafka
})
Writer
向Kafka發(fā)送消息,除了使用基于Conn的低級(jí)API,kafka-go包還提供了更高級(jí)別的 Writer 類型。大多數(shù)情況下使用Writer即可滿足條件,它支持以下特性。
- 對(duì)錯(cuò)誤進(jìn)行自動(dòng)重試和重新連接。
- 在可用分區(qū)之間可配置的消息分布。
- 向Kafka同步或異步寫入消息。
- 使用Context的異步取消。
- 關(guān)閉時(shí)清除掛起的消息以支持正常關(guān)閉。
- 在發(fā)布消息之前自動(dòng)創(chuàng)建不存在的topic。
發(fā)送消息
// 創(chuàng)建一個(gè)writer 向topic-A發(fā)送消息
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.LeastBytes{}, // 指定分區(qū)的balancer模式為最小字節(jié)分布
RequiredAcks: kafka.RequireAll, // ack模式
Async: true, // 異步
}
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
kafka.Message{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
創(chuàng)建不存在的topic
如果給Writer配置了AllowAutoTopicCreation:true,那么當(dāng)發(fā)送消息至某個(gè)不存在的topic時(shí),則會(huì)自動(dòng)創(chuàng)建topic。
// 創(chuàng)建不存在的topic
// 如果給Writer配置了AllowAutoTopicCreation:true,那么當(dāng)發(fā)送消息至某個(gè)不存在的topic時(shí),則會(huì)自動(dòng)創(chuàng)建topic。
func writeByWriter2() {
writer := kafka.Writer{
Addr: kafka.TCP("192.168.2.204:9092"),
Topic: "kafka-test-topic",
AllowAutoTopicCreation: true, //自動(dòng)創(chuàng)建topic
}
messages := []kafka.Message{
{
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
{
Key: []byte("Key-B"),
Value: []byte("One!"),
},
{
Key: []byte("Key-C"),
Value: []byte("Tow!"),
},
}
const retries = 3
//重試3次
for i := 0; i < retries; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := writer.WriteMessages(ctx, messages...)
if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
time.Sleep(time.Millisecond * 250)
continue
}
if err != nil {
log.Fatal("unexpected error %v", err)
}
break
}
//關(guān)閉Writer
if err := writer.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
}
寫入多個(gè)topic
通常,WriterConfig.Topic用于初始化單個(gè)topic的Writer。通過去掉WriterConfig中的Topic配置,分別設(shè)置每條消息的message.topic,可以實(shí)現(xiàn)將消息發(fā)送至多個(gè)topic。
w := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
// 注意: 當(dāng)此處不設(shè)置Topic時(shí),后續(xù)的每條消息都需要指定Topic
Balancer: &kafka.LeastBytes{},
}
err := w.WriteMessages(context.Background(),
// 注意: 每條消息都需要指定一個(gè) Topic, 否則就會(huì)報(bào)錯(cuò)
kafka.Message{
Topic: "topic-A",
Key: []byte("Key-A"),
Value: []byte("Hello World!"),
},
kafka.Message{
Topic: "topic-B",
Key: []byte("Key-B"),
Value: []byte("One!"),
},
kafka.Message{
Topic: "topic-C",
Key: []byte("Key-C"),
Value: []byte("Two!"),
},
)
if err != nil {
log.Fatal("failed to write messages:", err)
}
if err := w.Close(); err != nil {
log.Fatal("failed to close writer:", err)
}
注意:Writer中的Topic和Message中的Topic是互斥的,同一時(shí)刻有且只能設(shè)置一處。
其他配置
TLS
對(duì)于基本的 Conn 類型或在 Reader/Writer 配置中,可以在Dialer中設(shè)置TLS選項(xiàng)。如果 TLS 字段為空,則它將不啟用TLS 連接。
注意:不在Conn/Reder/Writer上配置TLS,連接到啟用TLS的Kafka集群,可能會(huì)出現(xiàn)io.ErrUnexpectedEOF錯(cuò)誤。
Connection
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{...tls config...}, // 指定TLS配置
}
conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
Reader
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{...tls config...}, // 指定TLS配置
}
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "consumer-group-id",
Topic: "topic-A",
Dialer: dialer,
})
Writer
創(chuàng)建Writer時(shí)可以按如下方式指定TLS配置。
w := kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: "topic-A",
Balancer: &kafka.Hash{},
Transport: &kafka.Transport{
TLS: &tls.Config{}, // 指定TLS配置
},
}到此這篇關(guān)于Go操作Kafka的實(shí)現(xiàn)示例(kafka-go)的文章就介紹到這了,更多相關(guān)Go操作Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang學(xué)習(xí)之反射機(jī)制的用法詳解
反射的本質(zhì)就是在程序運(yùn)行的時(shí)候,獲取對(duì)象的類型信息和內(nèi)存結(jié)語構(gòu),反射是把雙刃劍,功能強(qiáng)大但可讀性差。本文將詳細(xì)講講Golang中的反射機(jī)制,感興趣的可以了解一下2022-06-06
Golang創(chuàng)建構(gòu)造函數(shù)的方法超詳細(xì)講解
構(gòu)造器一般面向?qū)ο笳Z言的典型特性,用于初始化變量。Go語言沒有任何具體構(gòu)造器,但我們能使用該特性去初始化變量。本文介紹不同類型構(gòu)造器的差異及其應(yīng)用場景2023-01-01
golang jwt+token驗(yàn)證的實(shí)現(xiàn)
這篇文章主要介紹了golang jwt+token驗(yàn)證的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10
深入解析Go語言中HTTP請(qǐng)求處理的底層實(shí)現(xiàn)
本文將詳細(xì)介紹?Go?語言中?HTTP?請(qǐng)求處理的底層機(jī)制,包括工作流程、創(chuàng)建?Listen?Socket?監(jiān)聽端口、接收客戶端請(qǐng)求并建立連接以及處理客戶端請(qǐng)求并返回響應(yīng)等,需要的朋友可以參考下2023-05-05
詳解Go語言Slice作為函數(shù)參數(shù)的使用
Slice切片在Go語言中實(shí)質(zhì)是一種結(jié)構(gòu)體類型,本文詳細(xì)的介紹了Go語言Slice作為函數(shù)參數(shù)的使用,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-07-07

