欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Go語言實(shí)現(xiàn)Kafka消息隊(duì)列的示例代碼

 更新時(shí)間:2025年07月15日 09:21:07   作者:Go Dgg  
本文主要介紹了Go語言實(shí)現(xiàn)Kafka消息隊(duì)列的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

一、Go語言快速實(shí)現(xiàn)Kafka消息隊(duì)列

安裝依賴庫

go get github.com/IBM/sarama

生產(chǎn)者代碼示例

package main

import (
	"log"
	"github.com/IBM/sarama"
)

func main() {
	// 1. 創(chuàng)建生產(chǎn)者配置
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true

	// 2. 連接Kafka集群
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal("連接失敗:", err)
	}
	defer producer.Close()

	// 3. 發(fā)送消息
	msg := &sarama.ProducerMessage{
		Topic: "test-topic",
		Value: sarama.StringEncoder("Hello Kafka!"),
	}
	
	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Fatal("發(fā)送失敗:", err)
	}
	
	log.Printf("發(fā)送成功! 分區(qū):%d 偏移量:%d", partition, offset)
}

消費(fèi)者代碼示例

package main

import (
	"log"
	"github.com/IBM/sarama"
)

func main() {
	// 1. 創(chuàng)建消費(fèi)者
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatal("連接失敗:", err)
	}
	defer consumer.Close()

	// 2. 訂閱指定分區(qū)(這里消費(fèi)分區(qū)0)
	partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal("訂閱失敗:", err)
	}
	defer partitionConsumer.Close()

	// 3. 持續(xù)消費(fèi)消息
	for msg := range partitionConsumer.Messages() {
		log.Printf("收到消息: 分區(qū):%d 偏移量:%d 內(nèi)容:%s", 
			msg.Partition, msg.Offset, string(msg.Value))
	}
}

二、Kafka與其他消息隊(duì)列對比

對比維度表

特性KafkaRabbitMQRedis StreamsRocketMQ
設(shè)計(jì)定位高吞吐日志處理企業(yè)級(jí)消息路由輕量級(jí)實(shí)時(shí)消息金融級(jí)可靠消息
吞吐量百萬級(jí)/秒萬級(jí)/秒十萬級(jí)/秒十萬級(jí)/秒
消息持久化磁盤持久化(默認(rèn)7天)內(nèi)存/磁盤可選內(nèi)存為主磁盤持久化
消息模式發(fā)布-訂閱點(diǎn)對點(diǎn)/發(fā)布訂閱發(fā)布訂閱發(fā)布訂閱
協(xié)議支持自定義協(xié)議AMQP協(xié)議RESP協(xié)議自定義協(xié)議
順序保證分區(qū)內(nèi)有序隊(duì)列有序全局無序隊(duì)列有序
延遲消息不支持支持支持支持
典型場景日志收集、流處理訂單系統(tǒng)、任務(wù)隊(duì)列實(shí)時(shí)通知、排行榜更新交易系統(tǒng)、金融場景
Go生態(tài)支持Sarama庫完善amqp庫穩(wěn)定go-redis庫支持官方SDK支持

三、核心差異說明

吞吐量對比

  • Kafka:采用順序磁盤寫入 + 零拷貝技術(shù),適合大數(shù)據(jù)量場景(如:單集群可達(dá)百萬TPS)
  • RabbitMQ:基于內(nèi)存的Erlang實(shí)現(xiàn),適合中小規(guī)模業(yè)務(wù)(如:電商訂單處理)

消息存儲(chǔ)機(jī)制

// Kafka的持久化示例(自動(dòng)創(chuàng)建日志文件)
kafka-topics.sh --create \
--topic user-log \
--partitions 3 \
--replication-factor 2 \
--config retention.ms=604800000  // 保留7天
  • Kafka:消息持久化到磁盤,支持TB級(jí)數(shù)據(jù)存儲(chǔ)
  • Redis:數(shù)據(jù)主要在內(nèi)存,重啟可能丟失(需配置持久化)

消息消費(fèi)模式

  • Kafka:消費(fèi)者主動(dòng)拉?。≒ull)
  • RabbitMQ:服務(wù)端推送(Push)

使用復(fù)雜度

// 對比初始化復(fù)雜度(以生產(chǎn)者為例)

// Kafka需要配置更多參數(shù)
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Partitioner = sarama.NewHashPartitioner

// RabbitMQ連接更簡單
conn, _ := amqp.Dial("amqp://guest:guest@localhost:5672/")

四、選型建議

選擇Kafka當(dāng)

  • 需要處理日志流、點(diǎn)擊流等大數(shù)據(jù)量場景
  • 要求消息持久化存儲(chǔ)和回溯能力
  • 需要橫向擴(kuò)展的分布式系統(tǒng)

選擇其他隊(duì)列當(dāng)

  • RabbitMQ:需要復(fù)雜路由(如:延時(shí)隊(duì)列、死信隊(duì)列)
  • Redis Streams:需要極低延遲的實(shí)時(shí)通知
  • RocketMQ:金融場景要求強(qiáng)一致性

五、常見問題

如何保證Kafka消息順序?

  • 單分區(qū)內(nèi)保證順序
  • 發(fā)送時(shí)指定分區(qū)鍵(如用戶ID)
msg := &sarama.ProducerMessage{
    Topic: "order-events",
    Key:   sarama.StringEncoder(userID), // 相同Key進(jìn)入同一分區(qū)
    Value: sarama.StringEncoder("下單成功"),
}

如何避免重復(fù)消費(fèi)?

  • 啟用冪等生產(chǎn)者
config.Producer.Idempotent = true
config.Producer.RequiredAcks = sarama.WaitForAll

到此這篇關(guān)于Go語言實(shí)現(xiàn)Kafka消息隊(duì)列的示例代碼的文章就介紹到這了,更多相關(guān)Go Kafka消息隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家! 

相關(guān)文章

  • Go語言自定義linter靜態(tài)檢查工具

    Go語言自定義linter靜態(tài)檢查工具

    這篇文章主要介紹了Go語言自定義linter靜態(tài)檢查工具,Go語言是一門編譯型語言,編譯器將高級(jí)語言翻譯成機(jī)器語言,會(huì)先對源代碼做詞法分析,詞法分析是將字符序列轉(zhuǎn)換為Token序列的過程,文章詳細(xì)介紹需要的小伙伴可以參考一下
    2022-05-05
  • Go語言內(nèi)置包的使用

    Go語言內(nèi)置包的使用

    本文主要介紹了Go語言內(nèi)置包的使用,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-07-07
  • Go工具鏈之代碼測試神器go?test詳解

    Go工具鏈之代碼測試神器go?test詳解

    這篇文章主要給大家介紹Go?工具鏈go?test,go?test?是?Go?工具鏈中的一個(gè)命令,用于編譯和運(yùn)行按照要求編寫的?Golang?測試代碼,并生成測試報(bào)告,感興趣的同學(xué)跟著小編一起來看看本文吧
    2023-07-07
  • 使用go操作redis的有序集合(zset)

    使用go操作redis的有序集合(zset)

    這篇文章主要介紹了使用go操作redis的有序集合(zset),具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • Go語言中Select語句用法實(shí)例

    Go語言中Select語句用法實(shí)例

    這篇文章主要介紹了Go語言中Select語句用法,實(shí)例分析了select語句的原理與使用技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下
    2015-02-02
  • Go語言reflect.TypeOf()和reflect.Type通過反射獲取類型信息

    Go語言reflect.TypeOf()和reflect.Type通過反射獲取類型信息

    這篇文章主要介紹了Go語言reflect.TypeOf()和reflect.Type通過反射獲取類型信息,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04
  • go原子級(jí)內(nèi)存操作實(shí)現(xiàn)

    go原子級(jí)內(nèi)存操作實(shí)現(xiàn)

    原子級(jí)內(nèi)存操作是在多線程并發(fā)執(zhí)行時(shí),能夠確保某個(gè)內(nèi)存操作是不可中斷的操作,本文主要介紹了go原子級(jí)內(nèi)存操作實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-02-02
  • Golang拾遺之自定義類型和方法集詳解

    Golang拾遺之自定義類型和方法集詳解

    golang拾遺主要是用來記錄一些遺忘了的、平時(shí)從沒注意過的golang相關(guān)知識(shí)。這篇文章主要整理了一下Golang如何自定義類型和方法集,需要的可以參考一下
    2023-02-02
  • Golang中四種gRPC模式舉例詳解

    Golang中四種gRPC模式舉例詳解

    gRPC是一種進(jìn)程間通信技術(shù),在微服務(wù)和云原生領(lǐng)域都有著廣泛的應(yīng)用,下面這篇文章主要給大家介紹了關(guān)于Golang中四種gRPC模式的相關(guān)資料,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2024-03-03
  • Golang讀寫二進(jìn)制文件方法總結(jié)

    Golang讀寫二進(jìn)制文件方法總結(jié)

    使用?Golang?的?encoding/gob?包讀寫二進(jìn)制文件非常方便,而且代碼量也非常少,本文就來通過兩個(gè)示例帶大家了解一下encoding/gob的具體用法吧
    2023-05-05

最新評(píng)論