Go庫(kù)實(shí)現(xiàn)Kafka消息的發(fā)送與接收(docker和k3s安裝kafka)
kafka是什么
Kafka傳統(tǒng)定義 :Kafka是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列(Message Queue),主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。 發(fā)布/訂閱 :消息的發(fā)布者不會(huì)將消息直接發(fā)送給特定的訂閱者,而是將發(fā)布的消息 分為不同的類別,訂閱者只接收感興趣的消息。
Kafka最新定義:Kafka是一個(gè)開源的分布式事件流平臺(tái)(Event Streaming Platform),被數(shù)千家公司用于高性能數(shù)據(jù)管道、流分析、數(shù)據(jù)集成和關(guān)鍵任務(wù)應(yīng)用。
消息隊(duì)列的應(yīng)用場(chǎng)景無外乎是:削峰填谷、應(yīng)用解耦、異步處理等等,具體使用案例我們?cè)谥爸vrabbitmq基礎(chǔ)篇已經(jīng)詳述過,這里不在做講述,這里說一下消息隊(duì)列的兩種模型:
- 點(diǎn)對(duì)點(diǎn)模型 :也叫消息隊(duì)列模型。如果拿上面那個(gè)“民間版”的定義來說,那么系統(tǒng) A 發(fā)送的消息只能被系統(tǒng) B 接收,其他任何系統(tǒng)都不能讀取 A 發(fā)送的消息。日常生活的例子比如電話客服就屬于這種模型:同一個(gè)客戶呼入電話只能被一位客服人員處理,第二個(gè)客服人員不能為該客戶服務(wù)。
- 發(fā)布 / 訂閱模型 :與上面不同的是,它有一個(gè)主題(Topic)的概念,你可以理解成邏輯語義相近的消息容器。該模型也有發(fā)送方和接收方,只不過提法不同。發(fā)送方也稱為發(fā)布者(Publisher),接收方稱為訂閱者(Subscriber)。和點(diǎn)對(duì)點(diǎn)模型不同的是,這個(gè)模型可能存在多個(gè)發(fā)布者向相同的主題發(fā)送消息,而訂閱者也可能存在多個(gè),它們都能接收到相同主題的消息。生活中的報(bào)紙訂閱就是一種典型的發(fā)布 / 訂閱模型。
kafka基礎(chǔ)架構(gòu)和核心概念

在 Kafka 中,發(fā)布訂閱的對(duì)象是 主題(Topic ),你可以為每個(gè)業(yè)務(wù)、每個(gè)應(yīng)用甚至是每類數(shù)據(jù)都創(chuàng)建專屬的主題。
生產(chǎn)者(Producer) :消息生產(chǎn)者,就是向 kafka broker 發(fā)消息的客戶端,生產(chǎn)者程序通常持續(xù)不斷地向一個(gè)或多個(gè)主題發(fā)送消息。
消費(fèi)者(Consumer) :消息消費(fèi)者,向 kafka broker 取消息的客戶端,消費(fèi)者就是訂閱這些主題消息的客戶端應(yīng)用程序。
和生產(chǎn)者類似,消費(fèi)者也能夠同時(shí)訂閱多個(gè)主題的消息。我們把生產(chǎn)者和消費(fèi)者統(tǒng)稱為客戶端(Clients)。你可以同時(shí)運(yùn)行多個(gè)生產(chǎn)者和消費(fèi)者實(shí)例,這些實(shí)例會(huì)不斷地向 Kafka 集群中的多個(gè)主題生產(chǎn)和消費(fèi)消息。
消費(fèi)者組Consumer Group** (CG)**:由多個(gè) consumer 組成。消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi),消費(fèi)者組之間互不影響。所有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者。
Broker :一臺(tái) kafka 服務(wù)器就是一個(gè) broker。一個(gè)集群由多個(gè) broker 組成。一個(gè) broker 可以容納多個(gè) topic。
主題(topic) :可以理解為一個(gè)隊(duì)列,生產(chǎn)者和消費(fèi)者面向的都是一個(gè) topic;
分區(qū)(Partition) :為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的 topic 可以分布到多個(gè) broker(即服務(wù)器)上, 一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列;
副本(Replica) :副本,為保證集群中的某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),該節(jié)點(diǎn)上的 partition 數(shù)據(jù)不丟失,且 kafka 仍然能夠繼續(xù)工作,kafka 提供了副本機(jī)制,一個(gè) topic 的每個(gè)分區(qū)都有若干個(gè)副本, 一個(gè) leader 和若干個(gè) follower 。
leader :每個(gè)分區(qū)多個(gè)副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì) 象都是 leader。
follower :每個(gè)分區(qū)多個(gè)副本中的“從”,實(shí)時(shí)從 leader 中同步數(shù)據(jù),保持和 leader 數(shù)據(jù) 的同步。leader 發(fā)生故障時(shí),某個(gè) follower 會(huì)成為新的 follower
副本的工作機(jī)制也很簡(jiǎn)單:生產(chǎn)者總是向領(lǐng)導(dǎo)者副本寫消息;而消費(fèi)者總是從領(lǐng)導(dǎo)者副本讀消息。至于追隨者副本,它只做一件事:向領(lǐng)導(dǎo)者副本發(fā)送請(qǐng)求,請(qǐng)求領(lǐng)導(dǎo)者把最新生產(chǎn)的消息發(fā)給它,這樣它能保持與領(lǐng)導(dǎo)者的同步。
Kafka 使用消息日志(Log)來保存數(shù)據(jù),一個(gè)日志就是磁盤上一個(gè)只能追加寫(Append-only)消息的物理文件。因?yàn)橹荒茏芳訉懭耄?nbsp;避免了緩慢的隨機(jī) I/O 操作,改為性能較好的順序 I/O 寫操作,這也是實(shí)現(xiàn) Kafka 高吞吐量特性的一個(gè)重要手段 。不過如果你不停地向一個(gè)日志寫入消息,最終也會(huì)耗盡所有的磁盤空間,因此 Kafka 必然要定期地刪除消息以回收磁盤。怎么刪除呢?簡(jiǎn)單來說就是通過日志段(Log Segment)機(jī)制。在 Kafka 底層,一個(gè)日志又近一步細(xì)分成多個(gè)日志段,消息被追加寫到當(dāng)前最新的日志段中,當(dāng)寫滿了一個(gè)日志段后,Kafka 會(huì)自動(dòng)切分出一個(gè)新的日志段,并將老的日志段封存起來。Kafka 在后臺(tái)還有定時(shí)任務(wù)會(huì)定期地檢查老的日志段是否能夠被刪除,從而實(shí)現(xiàn)回收磁盤空間的目的
Kafka優(yōu)缺點(diǎn)
優(yōu)點(diǎn)
高吞吐量:Kafka的順序日志機(jī)制和高可用性設(shè)計(jì)使其在高并發(fā)場(chǎng)景下表現(xiàn)出色。
擴(kuò)展性強(qiáng):通過分區(qū)和復(fù)制機(jī)制,Kafka能夠輕松擴(kuò)展到多個(gè)節(jié)點(diǎn)。 ** easy to use**:Kafka提供了豐富的 API 和工具支持,簡(jiǎn)化了集成和管理。
缺點(diǎn)
學(xué)習(xí)曲線:Kafka的發(fā)布-訂閱模型和分布式架構(gòu)對(duì)初次接觸者來說可能較為復(fù)雜。
配置敏感:Kafka的性能和穩(wěn)定性高度依賴于正確的配置和維護(hù)。
合規(guī)性與安全性在金融、醫(yī)療等高敏感領(lǐng)域,Kafka需要滿足嚴(yán)格的合規(guī)要求??梢酝ㄟ^配置安全機(jī)制(如認(rèn)證、授權(quán))來確保數(shù)據(jù)的完整性和安全性。
Kafka注意事項(xiàng)
高并發(fā)與分區(qū)的管理在高并發(fā)場(chǎng)景下,合理的分區(qū)劃分和負(fù)載均衡是關(guān)鍵。如果分區(qū)數(shù)量過多或負(fù)載不平衡,可能導(dǎo)致節(jié)點(diǎn)資源浪費(fèi)或消息延遲。
配置參數(shù)的優(yōu)化Kafka的性能參數(shù)(如生產(chǎn)速率、消費(fèi)速率、分區(qū)數(shù)等)需要根據(jù)實(shí)際應(yīng)用場(chǎng)景進(jìn)行調(diào)整。過高的生產(chǎn)速率可能導(dǎo)致消息堆積,而過低的消費(fèi)速率則會(huì)增加客戶端的負(fù)載。
網(wǎng)絡(luò)穩(wěn)定性Kafka對(duì)網(wǎng)絡(luò)性能有較高的要求。在實(shí)際部署中,需要確保集群內(nèi)各節(jié)點(diǎn)之間的網(wǎng)絡(luò)帶寬足夠高,避免因網(wǎng)絡(luò)延遲或分區(qū)不一致導(dǎo)致的消息丟失或延遲處理。
集群的高可用性Kafka的高可用性依賴于集群的配置和管理。在部署時(shí),需要確保節(jié)點(diǎn)的硬件配置一致,定期監(jiān)控集群狀態(tài),并及時(shí)處理節(jié)點(diǎn)故障。
監(jiān)控與運(yùn)維Kafka的監(jiān)控是保障系統(tǒng)穩(wěn)定運(yùn)行的關(guān)鍵??梢酝ㄟ^工具(如Prometheus、Grafana)實(shí)時(shí)監(jiān)控集群的性能、消息隊(duì)列的健康狀況以及消費(fèi)者組的負(fù)載情況。
docker安裝命令,其中172.16.11.111是宿主機(jī)ip,14818是宿主機(jī)端口,對(duì)應(yīng)容器端口9092:
docker run -d \ --name kafka \ -p 14818:9092 \ -p 9093:9093 \ -v /tmp/kraft-combined-logs:/tmp/kraft-combined-logs \ -e TZ=Asia/Shanghai \ -e KAFKA_NODE_ID=1 \ -e KAFKA_PROCESS_ROLES=broker,controller \ -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.11.111:14818 \ -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \ -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \ -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \ -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \ -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \ -e KAFKA_NUM_PARTITIONS=3 \ -e KAFKA_LOG_DIRS=/tmp/kraft-combined-logs \ -e CLUSTER_ID=5L6g3nShT-eMCtK--X86sw \ apache/kafka-native:4.1.0
k3s的yaml,其中172.16.11.111是宿主機(jī)ip,14818是宿主機(jī)端口,對(duì)應(yīng)容器端口9092:
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: kafka
name: kafka
namespace: moonfdd
spec:
replicas: 1
selector:
matchLabels:
app: kafka
template:
metadata:
labels:
app: kafka
spec:
initContainers:
- name: kafka-fix-data-volume-permissions
image: alpine
imagePullPolicy: IfNotPresent
command:
- sh
- -c
- "chown -R 1000:1000 /tmp/kraft-combined-logs"
volumeMounts:
- mountPath: /tmp/kraft-combined-logs
name: volv
containers:
- env:
- name: TZ
value: Asia/Shanghai
- name: KAFKA_NODE_ID
value: "1"
- name: KAFKA_PROCESS_ROLES
value: broker,controller
- name: KAFKA_LISTENERS
value: PLAINTEXT://:9092,CONTROLLER://:9093
- name: KAFKA_ADVERTISED_LISTENERS
value: PLAINTEXT://172.16.11.111:14818
- name: KAFKA_CONTROLLER_LISTENER_NAMES
value: CONTROLLER
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- name: KAFKA_CONTROLLER_QUORUM_VOTERS
value: 1@localhost:9093
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "1"
- name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
value: "1"
- name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
value: "1"
- name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
value: "0"
- name: KAFKA_NUM_PARTITIONS
value: "3"
- name: KAFKA_LOG_DIRS
value: /tmp/kraft-combined-logs
- name: CLUSTER_ID
value: "5L6g3nShT-eMCtK--X86sw" # 固定集群ID,僅首次啟動(dòng)格式化使用
image: 'apache/kafka-native:4.1.0'
imagePullPolicy: IfNotPresent
name: kafka
volumeMounts:
- mountPath: /tmp/kraft-combined-logs
name: volv
volumes:
- hostPath:
path: /root/k8s/moonfdd/kafka/tmp/kraft-combined-logs
type: DirectoryOrCreate
name: volv
---
apiVersion: v1
kind: Service
metadata:
labels:
app: kafka
name: kafka
namespace: moonfdd
spec:
ports:
- port: 9092
protocol: TCP
targetPort: 9092
name: 9092-9092
- port: 9093
protocol: TCP
targetPort: 9093
name: 9093-9093
selector:
app: kafka
type: NodePort
go發(fā)送kafka消息:github.com/segmentio/kafka-go
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
// 創(chuàng)建一個(gè)Kafka writer(Producer)
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"172.16.11.111:14818"}, // Kafka broker 地址
Topic: "test-topic", // 發(fā)送的 topic
Balancer: &kafka.LeastBytes{}, // 負(fù)載均衡策略
})
// 寫入消息
err := w.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("Key-A"),
Value: []byte("Hello Kafka from Go!"),
},
)
if err != nil {
log.Fatalf("could not write message: %v", err)
}
log.Println("Message sent successfully!")
// 關(guān)閉 writer
w.Close()
}
go接收kafka消息:github.com/segmentio/kafka-go
package main
import (
"context"
"log"
"github.com/segmentio/kafka-go"
)
func main() {
// 創(chuàng)建 Kafka reader(Consumer)
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"172.16.11.111:14818"}, // Kafka broker 地址
Topic: "test-topic", // 訂閱的 topic
GroupID: "my-consumer-group", // 消費(fèi)者組,確保相同組會(huì)讀取上一 offset
MinBytes: 10e3, // 最小fetch字節(jié)數(shù)
MaxBytes: 10e6, // 最大fetch字節(jié)數(shù)
})
for {
// 讀取消息(會(huì)自動(dòng)從上次的 offset 開始)
m, err := r.ReadMessage(context.Background())
if err != nil {
log.Fatalf("could not read message: %v", err)
}
log.Printf("offset:%d | key:%s | value:%s\n", m.Offset, string(m.Key), string(m.Value))
}
// r.Close() // 如果你打算退出循環(huán)時(shí)關(guān)閉
}
go發(fā)送kafka消息:github.com/IBM/sarama
package main
import (
"fmt"
"log"
"time"
"github.com/IBM/sarama"
)
func main() {
// 配置生產(chǎn)者
config := sarama.NewConfig()
config.Producer.Return.Successes = true // 確保消息發(fā)送成功
config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本確認(rèn)
config.Producer.Retry.Max = 3 // 重試次數(shù)
// 重要:配置客戶端使用正確的主機(jī)
config.Net.SASL.Enable = false
config.Net.TLS.Enable = false
config.Version = sarama.MaxVersion
// 創(chuàng)建同步生產(chǎn)者
producer, err := sarama.NewSyncProducer([]string{"172.16.11.111:14818"}, config)
if err != nil {
log.Fatalf("創(chuàng)建生產(chǎn)者失敗: %v", err)
}
defer producer.Close()
// 構(gòu)造消息
message := &sarama.ProducerMessage{
Topic: "test-topic",
Key: sarama.StringEncoder("message-key"),
Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka! %v", time.Now())),
}
// 發(fā)送消息
partition, offset, err := producer.SendMessage(message)
if err != nil {
log.Fatalf("發(fā)送消息失敗: %v", err)
}
fmt.Printf("消息發(fā)送成功! 分區(qū): %d, 偏移量: %d\n", partition, offset)
}
go接收kafka消息:github.com/IBM/sarama
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"github.com/IBM/sarama"
)
type Consumer struct{}
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
// 會(huì)話初始化,可以在這里做一些準(zhǔn)備工作
return nil
}
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
// 會(huì)話結(jié)束時(shí)的清理操作
return nil
}
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// claim.Messages() 會(huì)不斷返回新消息
for msg := range claim.Messages() {
fmt.Printf("Topic:%s Partition:%d Offset:%d Value:%s\n",
msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
// 標(biāo)記該消息已被處理,Kafka會(huì)自動(dòng)保存offset
session.MarkMessage(msg, "")
}
return nil
}
func main() {
// Kafka集群地址
brokers := []string{"172.16.11.111:14818"}
groupID := "my-group" // 消費(fèi)者組ID,保持不變才能從上次offset消費(fèi)
topics := []string{"test-topic"}
// 配置
config := sarama.NewConfig()
config.Version = sarama.MaxVersion // Kafka版本
config.Consumer.Return.Errors = true
// 非首次啟動(dòng)時(shí)自動(dòng)從上次位置開始
config.Consumer.Offsets.Initial = sarama.OffsetNewest
// OffsetNewest: 如果沒有歷史offset,從最新開始;
// OffsetOldest: 如果沒有歷史offset,從最舊開始。
// 創(chuàng)建消費(fèi)者組
consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
if err != nil {
log.Fatalf("Error creating consumer group: %v", err)
}
defer consumerGroup.Close()
consumer := &Consumer{}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
for err := range consumerGroup.Errors() {
log.Printf("Error: %v", err)
}
}()
log.Println("Kafka consumer started...")
// 優(yōu)雅退出
go func() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, os.Interrupt)
<-sigchan
cancel()
}()
// 循環(huán)消費(fèi)
for {
if err := consumerGroup.Consume(ctx, topics, consumer); err != nil {
log.Printf("Error from consumer: %v", err)
}
// 檢查退出
if ctx.Err() != nil {
return
}
}
}
到此這篇關(guān)于Go庫(kù)實(shí)現(xiàn)Kafka消息的發(fā)送與接收(docker和k3s安裝kafka)的文章就介紹到這了,更多相關(guān)docker和k3s實(shí)現(xiàn)go語言發(fā)送和接收kafka消息內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
GO常見的錯(cuò)誤99%程序員會(huì)遇到(解決方法)
這篇文章主要介紹了GO常見的錯(cuò)誤99%程序員會(huì)遇到,本文給出了解決方法,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-01-01
go語言實(shí)現(xiàn)Elasticsearches批量修改查詢及發(fā)送MQ操作示例
這篇文章主要為大家介紹了go語言實(shí)現(xiàn)Elasticsearches批量修改查詢及發(fā)送MQ操作示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-04-04
Go語言實(shí)現(xiàn)字符串搜索算法Boyer-Moore
Boyer-Moore?算法是一種非常高效的字符串搜索算法,被廣泛的應(yīng)用于多種字符串搜索場(chǎng)景,下面我們就來學(xué)習(xí)一下如何利用Go語言實(shí)現(xiàn)這一字符串搜索算法吧2023-11-11
一文詳解go中如何實(shí)現(xiàn)定時(shí)任務(wù)
定時(shí)任務(wù)是指按照預(yù)定的時(shí)間間隔或特定時(shí)間點(diǎn)自動(dòng)執(zhí)行的計(jì)劃任務(wù)或操作,這篇文章主要為大家詳細(xì)介紹了go中是如何實(shí)現(xiàn)定時(shí)任務(wù)的,感興趣的可以了解下2023-11-11
GO語言學(xué)習(xí)之語句塊的實(shí)現(xiàn)
本文主要介紹了GO語言學(xué)習(xí)之語句塊的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-06-06
Golang?Template實(shí)現(xiàn)自定義函數(shù)的操作指南
這篇文章主要為大家詳細(xì)介紹了Golang如何利用Template實(shí)現(xiàn)自定義函數(shù)的操作,文中的示例代碼簡(jiǎn)潔易懂,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2025-02-02
Golang標(biāo)準(zhǔn)庫(kù)syscall詳解(什么是系統(tǒng)調(diào)用)
最近在研究go語言,發(fā)現(xiàn)go語言系統(tǒng)調(diào)用源碼只有調(diào)用函數(shù)的定義,今天通過本文給大家分享Golang標(biāo)準(zhǔn)庫(kù)syscall詳解及什么是系統(tǒng)調(diào)用,感興趣的朋友一起看看吧2021-05-05
Golang自動(dòng)追蹤GitHub上熱門AI項(xiàng)目
這篇文章主要為大家介紹了Golang自動(dòng)追蹤GitHub上熱門AI項(xiàng)目,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12

