.Net Core 集成 Kafka的步驟
最近維護(hù)的一個(gè)系統(tǒng)并發(fā)有點(diǎn)高,所以想引入一個(gè)消息隊(duì)列來(lái)進(jìn)行削峰??疾炝艘恍┊a(chǎn)品,最終決定使用kafka來(lái)當(dāng)做消息隊(duì)列。以下是關(guān)于kafka的一些知識(shí)的整理筆記。
kafka
kafka 是分布式流式平臺(tái)。它由linkedin開發(fā),后貢獻(xiàn)給了Apache開源組織并成為頂級(jí)開源項(xiàng)目。它可以應(yīng)用在高并發(fā)場(chǎng)景下的日志系統(tǒng),也可以當(dāng)作消息隊(duì)列來(lái)使用,也可以當(dāng)作消息服務(wù)對(duì)系統(tǒng)進(jìn)行解耦。
流處理平臺(tái)有以下三種特性:
- 可以讓你發(fā)布和訂閱流式的記錄。這一方面與消息隊(duì)列或者企業(yè)消息系統(tǒng)類似。
- 可以儲(chǔ)存流式的記錄,并且有較好的容錯(cuò)性。
- 可以在流式記錄產(chǎn)生時(shí)就進(jìn)行處理。
一般它可以應(yīng)用于兩個(gè)場(chǎng)景:
- 構(gòu)造實(shí)時(shí)流數(shù)據(jù)管道,它可以在系統(tǒng)或應(yīng)用之間可靠地獲取數(shù)據(jù)。 (相當(dāng)于message queue)
- 構(gòu)建實(shí)時(shí)流式應(yīng)用程序,對(duì)這些流數(shù)據(jù)進(jìn)行轉(zhuǎn)換或者影響。 (就是流處理,通過(guò)kafka stream topic和topic之間內(nèi)部進(jìn)行變化)
broker
kafka中的每個(gè)節(jié)點(diǎn)即每個(gè)服務(wù)器就是一個(gè)broker 。
topic
kafka中的topic是一個(gè)分類的概念,表示一類消息。生產(chǎn)者在生產(chǎn)消息的時(shí)候需要指定topic,消費(fèi)者在消費(fèi)消息的時(shí)候也需要指定topic。
partition
partition是分區(qū)的概念。kafka的一個(gè)topic可以有多個(gè)partition。每個(gè)partition會(huì)分散到不同的broker上,起到負(fù)載均衡的作用。生產(chǎn)者的消息會(huì)通過(guò)算法均勻的分散在各個(gè)partition上。
consumer group
kafka的消費(fèi)者有個(gè)組的概念。一個(gè)partition可以被多consumer group訂閱。每個(gè)消息會(huì)廣播到每一個(gè)group中。但是每個(gè)消息只會(huì)被group中的一個(gè)consumer消費(fèi)。相當(dāng)于每個(gè)group,一個(gè)partition只能有一個(gè)consumer訂閱,所以group中的consumer數(shù)量不可以超過(guò)topic中partition的數(shù)量。并且消息的消費(fèi)的順序在每個(gè)partition中是保證有序的,但是在多個(gè)partition之間是不保證的,因?yàn)閏onsumer的消費(fèi)速度是有快慢的。
所以如果要用kafka實(shí)現(xiàn)嚴(yán)格的消息隊(duì)列點(diǎn)對(duì)點(diǎn)模式那么我們可以設(shè)置一個(gè)partition并且設(shè)置一個(gè)consumer。如果對(duì)消息消費(fèi)的順序不是那么敏感,那么可以設(shè)置多個(gè)partition來(lái)并行消費(fèi)消息,提高吞吐量。
安裝kafka
為了能體驗(yàn)下kafka,我們還是要實(shí)際安裝一下kafka,畢竟空想是沒(méi)有用的?,F(xiàn)在有了docker,安裝起來(lái)也是相當(dāng)?shù)魏?jiǎn)單。我們只需要定義好docker-compose的yml就行了。
version: '3' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 192.168.0.117 KAFKA_CREATE_TOPICS: "test:3:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
1.我們?cè)趛ml里定義2個(gè)service:
2.zookeeper,kafka的分布式依賴zookeeper,所以我需要先定義它。
kafka ,kafka的定義有幾個(gè)地方要注意的。
- depends_on:zookeeper 指定kafka依賴zookeeper這個(gè)service,當(dāng)啟動(dòng)kafka的時(shí)候自動(dòng)會(huì)啟動(dòng)zookeeper。
- KAFKA_ADVERTISED_HOST_NAME 這里要指定宿主機(jī)的ip
- KAFKA_CREATE_TOPICS 這個(gè)變量只是的默認(rèn)創(chuàng)建的topic。"test:3:1"代表創(chuàng)建一個(gè)名為test的topic并且創(chuàng)建3個(gè)分區(qū)1個(gè)復(fù)制。
定義好這些之后我們只需要使用docker-compose命令運(yùn)行它:
sudo docker-compose up -d
.net 操作 kafka
安裝好kafka的docker環(huán)境之后,下面演示下如何使用.net操作kafka,進(jìn)行消息的生產(chǎn)與消費(fèi)。
生產(chǎn)者
static async Task Main(string[] args) { Console.WriteLine("Hello World Producer!"); var config = new ProducerConfig { BootstrapServers = "192.168.0.117:9092", ClientId = Dns.GetHostName(), }; using (var producer = new ProducerBuilder<Null, string>(config).Build()) { string topic = "test"; for (int i = 0; i < 100; i++) { var msg = "message " + i; Console.WriteLine($"Send message: value {msg}"); var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = msg }); Console.WriteLine($"Result: key {result.Key} value {result.Value} partition:{result.TopicPartition}"); Thread.Sleep(500); } } Console.ReadLine(); }
新建一個(gè)控制臺(tái)項(xiàng)目,從nuget安裝kafka的官方client。
Install-Package Confluent.Kafka
代碼非常簡(jiǎn)單,使用ProducerBuilder構(gòu)造一個(gè)producer,然后調(diào)用ProduceAsync方法發(fā)送消息。
其中需要注意的是如果你的場(chǎng)景并發(fā)非常之高,官方文檔推薦的方法是Produce而不是ProduceAsync。這是一個(gè)比較迷的地方。按常理使用ProduceAsync應(yīng)該比使用同步方法Produce能獲得更高的并發(fā)才對(duì)。但是文檔確確實(shí)實(shí)說(shuō)高并發(fā)場(chǎng)景請(qǐng)使用Produce??赡苁菫榱吮苊釶roduceAsync結(jié)果返回的時(shí)候異步線程上下文切換造成的性能開銷。
原文:
There are a couple of additional benefits of using the Produce method. First, notification of message delivery (or failure) is strictly in the order of broker acknowledgement. With ProduceAsync, this is not the case because Tasks may complete on any thread pool thread. Second, Produce is more performant because there is unavoidable overhead in the higher level Task based API.
消費(fèi)者
static void Main(string[] args) { Console.WriteLine("Hello World kafka consumer !"); var config = new ConsumerConfig { BootstrapServers = "192.168.0.117:9092", GroupId = "foo", AutoOffsetReset = AutoOffsetReset.Earliest }; var cancel = false; using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()) { var topic = "test"; consumer.Subscribe(topic); while (!cancel) { var consumeResult = consumer.Consume(CancellationToken.None); Console.WriteLine($"Consumer message: { consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}"); } consumer.Close(); } }
消費(fèi)者的演示代碼同樣很簡(jiǎn)單。我們需要指定groupId,然后訂閱topic。使用ConsumerBuilder構(gòu)造一個(gè)consumer,然后調(diào)用Consume方法進(jìn)行消費(fèi)就可以。
注意:
這里默認(rèn)是自動(dòng)commit消費(fèi)。你也可以根據(jù)情況手動(dòng)提交commit。
運(yùn)行一下
我們運(yùn)行一個(gè)生產(chǎn)者進(jìn)程,按照500ms的速度生產(chǎn)消息。運(yùn)行三個(gè)consumer進(jìn)行消費(fèi),可以看到消息被均勻的推送到三個(gè)consumer上去。
總結(jié)
以上簡(jiǎn)單的介紹了kafka的背景、安裝方法、使用場(chǎng)景。還簡(jiǎn)單演示了如何使用.net來(lái)操作kafka。它可以當(dāng)作流式計(jì)算平臺(tái)來(lái)使用,也可以當(dāng)作傳統(tǒng)的消息隊(duì)列使用。它當(dāng)前非常流行,網(wǎng)上的資料也多如牛毛。官方也提供了簡(jiǎn)單易用的.net sdk ,為.net 平臺(tái)集成kafka提供了便利。
以上就是.Net Core 集成 Kafka的步驟的詳細(xì)內(nèi)容,更多關(guān)于.Net Core 集成 Kafka的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Entity Framework Core延遲加載(懶加載)用法
這篇文章介紹了Entity Framework Core延遲加載(懶加載)的使用方式,文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-02-02效控制C#中l(wèi)abel輸出文字的長(zhǎng)度,自動(dòng)換行
效控制C#中l(wèi)abel輸出文字的長(zhǎng)度,自動(dòng)換行...2007-04-04.Net?Core使用Coravel實(shí)現(xiàn)任務(wù)調(diào)度的完整步驟
最近在使用調(diào)度程序創(chuàng)建簡(jiǎn)單的服務(wù),該服務(wù)將執(zhí)行一些重復(fù)的IO操作,使用的是Coravel調(diào)度庫(kù),下面這篇文章主要給大家介紹了關(guān)于.Net?Core使用Coravel實(shí)現(xiàn)任務(wù)調(diào)度的完整步驟,需要的朋友可以參考下2022-08-08.net+FusionChart實(shí)現(xiàn)動(dòng)態(tài)顯示的柱狀圖和餅狀圖
這篇文章介紹了.net+FusionChart實(shí)現(xiàn)動(dòng)態(tài)顯示柱狀圖和餅狀圖的方法,文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-07-07asp.net?web?api2設(shè)置默認(rèn)啟動(dòng)登錄頁(yè)面的方法
這篇文章主要介紹了asp.net?web?api2設(shè)置默認(rèn)啟動(dòng)登錄頁(yè)面的方法,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-09-09ASP.NET頁(yè)面請(qǐng)求超時(shí)時(shí)間設(shè)置多種方法
這篇文章主要為大家詳細(xì)介紹了ASP.NET頁(yè)面請(qǐng)求超時(shí)時(shí)間設(shè)置Server.ScriptTimeOut executionTimeout多種方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-09-09