欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang微服務(wù)框架Kratos實現(xiàn)Kafka消息隊列的方法

 更新時間:2023年09月22日 09:48:07   作者:喵個咪  
消息隊列是大型分布式系統(tǒng)不可缺少的中間件,也是高并發(fā)系統(tǒng)的基石中間件,所以掌握好消息隊列MQ就變得極其重要,在本文當中,您將了解到:什么是消息隊列?什么是Kafka?怎樣在微服務(wù)框架Kratos當中應(yīng)用Kafka進行業(yè)務(wù)開發(fā),需要的朋友可以參考下

Golang微服務(wù)框架Kratos應(yīng)用Kafka消息隊列

消息隊列是一種異步的服務(wù)間通信方式,適用于無服務(wù)器和微服務(wù)架構(gòu)。消息在被處理和刪除之前一直存儲在隊列上。每條消息僅可被一位用戶處理一次。消息隊列可被用于分離重量級處理、緩沖或批處理工作以及緩解高峰期工作負載。

消息隊列是大型分布式系統(tǒng)不可缺少的中間件,也是高并發(fā)系統(tǒng)的基石中間件,所以掌握好消息隊列MQ就變得極其重要。

在本文當中,您將了解到:什么是消息隊列?什么是Kafka?怎樣在微服務(wù)框架Kratos當中應(yīng)用Kafka進行業(yè)務(wù)開發(fā)。

什么是消息隊列

消息隊列(Message Queue,簡稱MQ)指保存消息的一個容器,其實本質(zhì)就是一個保存數(shù)據(jù)的隊列。

消息中間件是指利用高效可靠的消息傳遞機制進行與平臺無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進行分布式系統(tǒng)的構(gòu)建。

消息中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用解耦,異步消息,流量削峰等問題,實現(xiàn)高性能,高可用,可伸縮和最終一致性的系統(tǒng)架構(gòu)。目前使用較多的消息隊列有:ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、Kafka、NAQ、NATS、Pulsar等。

消息隊列應(yīng)用場景

消息中間件在互聯(lián)網(wǎng)公司使用得越來越多,主要用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)不俗。以下介紹消息隊列在實際應(yīng)用中常用的使用場景:異步處理,應(yīng)用解耦,流量削峰和消息通訊。

異步處理

通常的微服務(wù)實現(xiàn)的接口,都是通過RPC進行微服務(wù)、服務(wù)客戶端之間的相互調(diào)用,這是同步阻塞執(zhí)行。有一些業(yè)務(wù),業(yè)務(wù)流程比較耗時且可以不需要立即返回結(jié)果,還有一些業(yè)務(wù)可以互不干擾的并行執(zhí)行,那么我們就可以將之轉(zhuǎn)為異步,并發(fā)執(zhí)行。從而減少同步接口的請求響應(yīng)時間,從而提高系統(tǒng)的吞吐量。

以下單為例,用戶下單后需要實施:生成訂單、贈送活動積分、贈送紅包、發(fā)送下單成功通知等,一系列業(yè)務(wù)處理。假設(shè)三個業(yè)務(wù)節(jié)點每個使用100毫秒鐘,不考慮網(wǎng)絡(luò)等其他開銷,則串行方式的時間是400毫秒,并行的時間只需要200毫秒。這樣就大大提高了系統(tǒng)的吞吐量。

應(yīng)用解耦

應(yīng)用解耦,顧名思義就是解除應(yīng)用系統(tǒng)之間的耦合依賴。通過消息隊列,使得每個應(yīng)用系統(tǒng)不必受其他系統(tǒng)影響,可以更獨立自主。

以電商系統(tǒng)為例,用戶下單后,訂單系統(tǒng)需要通知積分系統(tǒng)。一般的做法是:訂單系統(tǒng)直接調(diào)用積分系統(tǒng)的接口。這就使得應(yīng)用系統(tǒng)間的耦合特別緊密。如果積分系統(tǒng)無法訪問,則積分處理失敗,從而導(dǎo)致訂單失敗。

加入消息隊列之后,用戶下單后,訂單系統(tǒng)完成下單業(yè)務(wù)后,將消息寫入消息隊列,返回用戶訂單下單成功。積分系統(tǒng)通過訂閱下單消息的方式獲取下單通知消息,從而進行積分操作。實現(xiàn)訂單系統(tǒng)與庫存系統(tǒng)的應(yīng)用解耦。如果,在下單時積分系統(tǒng)系統(tǒng)異常,也不影響用戶正常下單,因為下單后,訂單系統(tǒng)寫入消息隊列就不再關(guān)心其他的后續(xù)操作。

流量削峰

流量削峰也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛。

以秒殺活動為例,一般會因為流量過大,導(dǎo)致流量暴增,應(yīng)用掛掉。為解決這個問題,一般需要在應(yīng)用前端加入消息隊列,秒殺業(yè)務(wù)處理系統(tǒng)根據(jù)消息隊列中的請求信息,再做后續(xù)處理。

如上圖所示,服務(wù)器接收到用戶的請求后,首先寫入消息隊列,秒殺業(yè)務(wù)處理系統(tǒng)根據(jù)消息隊列中的請求信息,做后續(xù)業(yè)務(wù)處理。假如消息隊列長度超過最大數(shù)量,則直接拋棄用戶請求或跳轉(zhuǎn)到錯誤頁面。

消息通訊

消息通訊是指應(yīng)用間的數(shù)據(jù)通信。消息隊列一般都內(nèi)置了高效的通信機制,因此也可以用在單純的消息通訊上。比如:實現(xiàn)點對點消息隊列,或者聊天室等點對點通訊。

以上實際是消息隊列的兩種消息模式,點對點或發(fā)布訂閱模式。

什么是 Apache Kafka?

Apache Kafka 是一個分布式數(shù)據(jù)流處理平臺,可以實時發(fā)布、訂閱、存儲和處理數(shù)據(jù)流。它旨在處理多種來源的數(shù)據(jù)流,并將它們交付給多個消費者。簡而言之,它可以移動大量數(shù)據(jù),不僅是從 A 點移到 B 點,而是能從 A 到 Z 的多個點移到任何您想要的位置,并且可以同時進行。

Apache Kafka 可以取代傳統(tǒng)的企業(yè)級消息傳遞系統(tǒng)。它最初是 Linkedin 為處理每天 1.4 萬億條消息而開發(fā)的一個內(nèi)部系統(tǒng),現(xiàn)已成為應(yīng)用于各式各樣企業(yè)需求的開源數(shù)據(jù)流處理解決方案。

Kafka 的工作原理

Kafka 結(jié)合了兩種消息收發(fā)模型、列隊和發(fā)布-訂閱,以向客戶提供其各自的主要優(yōu)勢。通過列隊可以跨多個使用器實例分發(fā)數(shù)據(jù)處理,因此具有很高的可擴展性。但是,傳統(tǒng)隊列不支持多訂閱者。發(fā)布-訂閱方法支持多訂閱者,但是由于每條消息傳送給每個訂閱者,因此無法用于跨多個工作進程發(fā)布工作。Kafka uses 使用分區(qū)日志模型將這兩種解決方案融合在一起。日志是一種有序的記錄,這些日志分成區(qū)段或分區(qū),分別對應(yīng)不同的訂閱者。這意味著,同一個主題可有多個訂閱者,分別有各自的分區(qū)以獲得更高的可擴展性。最后,Kafka 的模型帶來可重放性,允許多個相互獨立的應(yīng)用程序從數(shù)據(jù)流執(zhí)行讀取以便按自己的速率獨立地工作。

列隊

發(fā)布-訂閱

Kafka的基本概念

kafka運行在集群上,集群包含一個或多個服務(wù)器。kafka把消息存在topic中,每一條消息包含鍵值(key),值(value)和時間戳(timestamp)。

kafka有以下一些基本概念:

  • Producer - 消息生產(chǎn)者,就是向kafka broker發(fā)消息的客戶端。

  • Consumer - 消息消費者,是消息的使用方,負責(zé)消費Kafka服務(wù)器上的消息。

  • Topic - 主題,由用戶定義并配置在Kafka服務(wù)器,用于建立Producer和Consumer之間的訂閱關(guān)系。生產(chǎn)者發(fā)送消息到指定的Topic下,消息者從這個Topic下消費消息。

  • Partition - 消息分區(qū),一個topic可以分為多個 partition,每個partition是一個有序的隊列。partition中的每條消息都會被分配一個有序的id(offset)。

  • Broker - 一臺kafka服務(wù)器就是一個broker。一個集群由多個broker組成。一個broker可以容納多個topic。

  • Consumer Group - 消費者分組,用于歸組同類消費者。每個consumer屬于一個特定的consumer group,多個消費者可以共同消息一個Topic下的消息,每個消費者消費其中的部分消息,這些消費者就組成了一個分組,擁有同一個分組名稱,通常也被稱為消費者集群。

  • Offset - 消息在partition中的偏移量。每一條消息在partition都有唯一的偏移量,消息者可以指定偏移量來指定要消費的消息。

Docker部署開發(fā)環(huán)境

docker pull bitnami/zookeeper:latest
docker pull bitnami/kafka:latest
docker run -itd \
    --name zookeeper-test \
    -p 2181:2181 \
    -e ALLOW_ANONYMOUS_LOGIN=yes \
    bitnami/zookeeper:latest
docker run -itd \
    --name kafka-standalone \
    --link zookeeper-test \
    -p 9092:9092 \
    -v /home/data/kafka:/bitnami/kafka \
    -e KAFKA_BROKER_ID=1 \
    -e KAFKA_LISTENERS=PLAINTEXT://:9092 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper-test:2181 \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    --user root \
    bitnami/kafka:latest

管理工具

Kratos下如何應(yīng)用Kafka?

我對Kafka做了一個封裝,要在Kratos下面使用Kafka,首先需要在項目中引用我封裝的兩個庫:

第一個庫可以視之為Kafka客戶端的一個封裝:

go get -u github.com/tx7do/kratos-transport/broker/kafka

這一個庫是講Kafka的客戶端封裝成一個Kratos的transport.Server,該庫依賴上面的庫:

go get -u github.com/tx7do/kratos-transport/transport/kafka

想要在Kratos里面應(yīng)用Kafka,有兩條途徑可以達成:

  • Data層引用Kafka的Broker,僅用于發(fā)布(Publish)消息之用,換言之,就是只發(fā)送不接收的單向通訊;
  • Server層引用Kafka的Server,可以發(fā)布(Publish)消息,也可以訂閱(Subscribe)消息,換言之,就是既發(fā)送又接收的全雙工通訊。

接下來我就詳細的講解應(yīng)用方法:

在Data層引用Kafka的Broker

首先創(chuàng)建Kafka的Broker:

import (
	"github.com/tx7do/kratos-transport/broker"
	"github.com/tx7do/kratos-transport/broker/kafka"
)
func NewKafkaBroker(cfg *conf.Bootstrap) broker.Broker {
	b := kafka.NewBroker(
		broker.WithAddress(cfg.Data.Kafka.Addrs...),
		broker.WithCodec(cfg.Data.Kafka.Codec),
	)
	if b == nil {
		return nil
	}
	_ = b.Init()
	if err := b.Connect(); err != nil {
		return nil
	}
	return b
}

然后,注入到WireProviderSet

package data
import "github.com/google/wire"
// ProviderSet is data providers.
var ProviderSet = wire.NewSet(
    ...
	NewKafkaBroker,
)

最后,我們就可以在Service里面調(diào)用了:

package service
type ReportService struct {
	v1.ReportServiceHTTPServer
	kafkaBroker broker.Broker
	log         *log.Helper
}
func NewReportService(logger log.Logger, kafkaBroker broker.Broker) *ReportService {
	l := log.NewHelper(log.With(logger, "module", "report/service/agent-service"))
	return &ReportService{
		log:         l,
		kafkaBroker: kafkaBroker,
	}
}
func (s *ReportService) PostReport(_ context.Context, req *v1.PostReportRequest) (*v1.PostReportResponse, error) {
	_ = s.kafkaBroker.Publish(topic.EventReportData, reportV1.RealTimeWarehousingData{
		EventName:  &req.EventName,
		ReportData: &req.Content,
		CreateTime: util.UnixMilliToStringPtr(trans.Int64(time.Now().UnixMilli())),
	})
	return &v1.PostReportResponse{
		Code: 0,
		Msg:  "success",
	}, nil
}

需要注意的是,添加了以上代碼之后,需要使用命令生成Wire的膠水代碼:

go run -mod=mod github.com/google/wire/cmd/wire ./cmd/server

在Server層引用Kafka的Server

首先要創(chuàng)建Server

package server
import (
    ...
	"github.com/tx7do/kratos-transport/transport/kafka"
)
// NewKafkaServer create a kafka server.
func NewKafkaServer(cfg *conf.Bootstrap, _ log.Logger, svc *service.SaverService) *kafka.Server {
	ctx := context.Background()
	srv := kafka.NewServer(
		kafka.WithAddress(cfg.Server.Kafka.Addrs),
		kafka.WithGlobalTracerProvider(),
		kafka.WithGlobalPropagator(),
		kafka.WithCodec("json"),
	)
	registerKafkaSubscribers(ctx, srv, svc)
	return srv
}
func registerKafkaSubscribers(ctx context.Context, srv *kafka.Server, svc *service.SaverService) {
	_ = kafka.RegisterSubscriber(srv, ctx,
		topic.UserReportData, topic.LoggerSaverQueue, false,
		svc.SaveUserReport,
	)
	_ = kafka.RegisterSubscriber(srv, ctx,
		topic.EventReportData, topic.LoggerSaverQueue, false,
		svc.SaveEventReport,
	)
}

接著,調(diào)用kratos.Server把Kafka的服務(wù)器注冊到Kratos里去:

func newApp(ll log.Logger, rr registry.Registrar, ks *kafka.Server) *kratos.App {
	return kratos.New(
		kratos.ID(Service.GetInstanceId()),
		kratos.Name(Service.Name),
		kratos.Version(Service.Version),
		kratos.Metadata(Service.Metadata),
		kratos.Logger(ll),
		kratos.Server(
			ks,
		),
		kratos.Registrar(rr),
	)
}

最后,我們就可以在Service里愉快的玩耍了,在這里,我只演示收到Kafka消息之后立即寫入數(shù)據(jù)庫的操作:

package service
type SaverService struct {
	log          *log.Helper
	statusRepo   *data.AcceptStatusRepo
	realtimeRepo *data.RealtimeWarehousingRepo
}
func NewSaverService(
	logger log.Logger,
	statusRepo *data.AcceptStatusRepo,
	realtimeRepo *data.RealtimeWarehousingRepo,
) *SaverService {
	l := log.NewHelper(log.With(logger, "module", "saver/service/logger-service"))
	return &SaverService{
		log:          l,
		statusRepo:   statusRepo,
		realtimeRepo: realtimeRepo,
	}
}
func (s *SaverService) SaveUserReport(_ context.Context, _ string, _ broker.Headers, msg *v1.AcceptStatusReportData) error {
	return s.statusRepo.Create(msg)
}
func (s *SaverService) SaveEventReport(_ context.Context, _ string, _ broker.Headers, msg *v1.RealTimeWarehousingData) error {
	return s.realtimeRepo.Create(msg)
}

實例代碼

以上代碼以及接口定義,可以在我的另外一個開源項目里面找到:

https://github.com/tx7do/kratos-uba

https://gitee.com/tx7do/kratos-uba

以上就是Golang微服務(wù)框架Kratos實現(xiàn)Kafka消息隊列的方法的詳細內(nèi)容,更多關(guān)于Golang Kafka消息隊列的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 使用Golang的channel交叉打印兩個數(shù)組的操作

    使用Golang的channel交叉打印兩個數(shù)組的操作

    這篇文章主要介紹了使用Golang的channel交叉打印兩個數(shù)組的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-04-04
  • 使用Go HTTP客戶端打造高性能服務(wù)

    使用Go HTTP客戶端打造高性能服務(wù)

    大多數(shù)語言都有提供各自的 HTTP 客戶端,本文將動手實踐如何使用Go語言發(fā)起HTTP請求,并討論其中有可能遇到的問題。具有一定的參考價值,感興趣的可以了解一下
    2021-12-12
  • Go語言中的goroutine和channel如何協(xié)同工作

    Go語言中的goroutine和channel如何協(xié)同工作

    在Go語言中,goroutine和channel是并發(fā)編程的兩個核心概念,它們協(xié)同工作以實現(xiàn)高效、安全的并發(fā)執(zhí)行,本文將詳細探討goroutine和channel如何協(xié)同工作,以及它們在并發(fā)編程中的作用和優(yōu)勢,需要的朋友可以參考下
    2024-04-04
  • Golang中下劃線(_)的不錯用法分享

    Golang中下劃線(_)的不錯用法分享

    golang中的下劃線表示忽略變量的意思,也沒有產(chǎn)生新的變量,但是后面的表達式依然會被執(zhí)行,本文為大家整理了golang中下劃線的一些不錯的用法,需要的可以參考下
    2023-05-05
  • Golang排序和查找使用方法介紹

    Golang排序和查找使用方法介紹

    排序操作和查找一樣是很多程序經(jīng)常使用的操作。盡管一個最短的快排程序只要15行就可以搞定,但是一個健壯的實現(xiàn)需要更多的代碼,并且我們不希望每次我們需要的時候都重寫或者拷貝這些代碼
    2022-12-12
  • 一文搞懂Go語言中defer關(guān)鍵字的使用

    一文搞懂Go語言中defer關(guān)鍵字的使用

    defer是golang中用的比較多的一個關(guān)鍵字,也是go面試題里經(jīng)常出現(xiàn)的問題。今天就來整理一下關(guān)于defer的學(xué)習(xí)使用,希望對需要的朋友有所幫助
    2022-09-09
  • Golang中crypto/cipher加密標準庫全面指南

    Golang中crypto/cipher加密標準庫全面指南

    本文主要介紹了Golang中crypto/cipher加密標準庫,包括對稱加密、非對稱加密以及使用流加密和塊加密算法,文中通過示例代碼介紹的非常詳細,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-02-02
  • go實現(xiàn)fping功能

    go實現(xiàn)fping功能

    這篇文章主要介紹了go實現(xiàn)fping功能,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-11-11
  • Go語言開發(fā)前后端不分離項目詳解

    Go語言開發(fā)前后端不分離項目詳解

    這篇文章主要為大家介紹了Go語言開發(fā)前后端不分離項目詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-11-11
  • Go語言中調(diào)用外部命令的方法總結(jié)

    Go語言中調(diào)用外部命令的方法總結(jié)

    在工作中,我們時不時地會需要在Go中調(diào)用外部命令。本文為大家總結(jié)了Go語言中調(diào)用外部命令的幾種姿勢,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2022-11-11

最新評論