欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

.Net Core 集成 Kafka的步驟

 更新時間:2021年04月28日 09:17:04   作者:Agile.Zhou  
這篇文章主要介紹了.Net Core 集成 Kafka的實現(xiàn)步驟,幫助大家更好的理解和學習使用.net技術(shù),感興趣的朋友可以了解下

最近維護的一個系統(tǒng)并發(fā)有點高,所以想引入一個消息隊列來進行削峰。考察了一些產(chǎn)品,最終決定使用kafka來當做消息隊列。以下是關(guān)于kafka的一些知識的整理筆記。

kafka

kafka 是分布式流式平臺。它由linkedin開發(fā),后貢獻給了Apache開源組織并成為頂級開源項目。它可以應(yīng)用在高并發(fā)場景下的日志系統(tǒng),也可以當作消息隊列來使用,也可以當作消息服務(wù)對系統(tǒng)進行解耦。

流處理平臺有以下三種特性:

  • 可以讓你發(fā)布和訂閱流式的記錄。這一方面與消息隊列或者企業(yè)消息系統(tǒng)類似。
  • 可以儲存流式的記錄,并且有較好的容錯性。
  • 可以在流式記錄產(chǎn)生時就進行處理。

一般它可以應(yīng)用于兩個場景:

  • 構(gòu)造實時流數(shù)據(jù)管道,它可以在系統(tǒng)或應(yīng)用之間可靠地獲取數(shù)據(jù)。 (相當于message queue)
  • 構(gòu)建實時流式應(yīng)用程序,對這些流數(shù)據(jù)進行轉(zhuǎn)換或者影響。 (就是流處理,通過kafka stream topic和topic之間內(nèi)部進行變化)

broker

kafka中的每個節(jié)點即每個服務(wù)器就是一個broker 。

topic

kafka中的topic是一個分類的概念,表示一類消息。生產(chǎn)者在生產(chǎn)消息的時候需要指定topic,消費者在消費消息的時候也需要指定topic。

partition

partition是分區(qū)的概念。kafka的一個topic可以有多個partition。每個partition會分散到不同的broker上,起到負載均衡的作用。生產(chǎn)者的消息會通過算法均勻的分散在各個partition上。

consumer group

kafka的消費者有個組的概念。一個partition可以被多consumer group訂閱。每個消息會廣播到每一個group中。但是每個消息只會被group中的一個consumer消費。相當于每個group,一個partition只能有一個consumer訂閱,所以group中的consumer數(shù)量不可以超過topic中partition的數(shù)量。并且消息的消費的順序在每個partition中是保證有序的,但是在多個partition之間是不保證的,因為consumer的消費速度是有快慢的。
所以如果要用kafka實現(xiàn)嚴格的消息隊列點對點模式那么我們可以設(shè)置一個partition并且設(shè)置一個consumer。如果對消息消費的順序不是那么敏感,那么可以設(shè)置多個partition來并行消費消息,提高吞吐量。

安裝kafka

為了能體驗下kafka,我們還是要實際安裝一下kafka,畢竟空想是沒有用的。現(xiàn)在有了docker,安裝起來也是相當?shù)魏唵?。我們只需要定義好docker-compose的yml就行了。

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.117
      KAFKA_CREATE_TOPICS: "test:3:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

1.我們在yml里定義2個service:

2.zookeeper,kafka的分布式依賴zookeeper,所以我需要先定義它。
kafka ,kafka的定義有幾個地方要注意的。

  • depends_on:zookeeper 指定kafka依賴zookeeper這個service,當啟動kafka的時候自動會啟動zookeeper。
  • KAFKA_ADVERTISED_HOST_NAME 這里要指定宿主機的ip
  • KAFKA_CREATE_TOPICS 這個變量只是的默認創(chuàng)建的topic。"test:3:1"代表創(chuàng)建一個名為test的topic并且創(chuàng)建3個分區(qū)1個復(fù)制。

定義好這些之后我們只需要使用docker-compose命令運行它:

sudo docker-compose up -d

.net 操作 kafka

安裝好kafka的docker環(huán)境之后,下面演示下如何使用.net操作kafka,進行消息的生產(chǎn)與消費。

生產(chǎn)者

        static async Task Main(string[] args)
        {
            Console.WriteLine("Hello World Producer!");

            var config = new ProducerConfig
            {
                BootstrapServers = "192.168.0.117:9092",
                ClientId = Dns.GetHostName(),
            };


            using (var producer = new ProducerBuilder<Null, string>(config).Build())
            {
                string topic = "test";
                for (int i = 0; i < 100; i++)
                {
                    var msg = "message " + i;
                    Console.WriteLine($"Send message:   value {msg}");
                    var result = await producer.ProduceAsync(topic, new Message<Null, string> { Value = msg });
                    Console.WriteLine($"Result: key {result.Key} value {result.Value} partition:{result.TopicPartition}");
                    Thread.Sleep(500);
                }
            }

            Console.ReadLine();

        }

新建一個控制臺項目,從nuget安裝kafka的官方client。

Install-Package Confluent.Kafka

代碼非常簡單,使用ProducerBuilder構(gòu)造一個producer,然后調(diào)用ProduceAsync方法發(fā)送消息。
其中需要注意的是如果你的場景并發(fā)非常之高,官方文檔推薦的方法是Produce而不是ProduceAsync。這是一個比較迷的地方。按常理使用ProduceAsync應(yīng)該比使用同步方法Produce能獲得更高的并發(fā)才對。但是文檔確確實實說高并發(fā)場景請使用Produce。可能是為了避免ProduceAsync結(jié)果返回的時候異步線程上下文切換造成的性能開銷。
原文:

There are a couple of additional benefits of using the Produce method. First, notification of message delivery (or failure) is strictly in the order of broker acknowledgement. With ProduceAsync, this is not the case because Tasks may complete on any thread pool thread. Second, Produce is more performant because there is unavoidable overhead in the higher level Task based API.

消費者

        static void Main(string[] args)
        {
            Console.WriteLine("Hello World kafka consumer !");

            var config = new ConsumerConfig
            {
                BootstrapServers = "192.168.0.117:9092",
                GroupId = "foo",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            var cancel = false;

            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                var topic = "test";
                consumer.Subscribe(topic);

                while (!cancel)
                {
                    var consumeResult = consumer.Consume(CancellationToken.None);

                    Console.WriteLine($"Consumer message: { consumeResult.Message.Value} topic: {consumeResult.Topic} Partition: {consumeResult.Partition}");
                }

                consumer.Close();
            }
        }

消費者的演示代碼同樣很簡單。我們需要指定groupId,然后訂閱topic。使用ConsumerBuilder構(gòu)造一個consumer,然后調(diào)用Consume方法進行消費就可以。
注意:
這里默認是自動commit消費。你也可以根據(jù)情況手動提交commit。

運行一下

我們運行一個生產(chǎn)者進程,按照500ms的速度生產(chǎn)消息。運行三個consumer進行消費,可以看到消息被均勻的推送到三個consumer上去。

總結(jié)

以上簡單的介紹了kafka的背景、安裝方法、使用場景。還簡單演示了如何使用.net來操作kafka。它可以當作流式計算平臺來使用,也可以當作傳統(tǒng)的消息隊列使用。它當前非常流行,網(wǎng)上的資料也多如牛毛。官方也提供了簡單易用的.net sdk ,為.net 平臺集成kafka提供了便利。

以上就是.Net Core 集成 Kafka的步驟的詳細內(nèi)容,更多關(guān)于.Net Core 集成 Kafka的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • .NET生成水印更好的方法實例代碼

    .NET生成水印更好的方法實例代碼

    這篇文章主要給大家介紹了關(guān)于.NET中生成水印更好的方法,文中通過示例代碼介紹的非常詳細,對大家學習或者使用.NET具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧
    2019-07-07
  • Entity Framework Core延遲加載(懶加載)用法

    Entity Framework Core延遲加載(懶加載)用法

    這篇文章介紹了Entity Framework Core延遲加載(懶加載)的使用方式,文中通過示例代碼介紹的非常詳細。對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-02-02
  • 效控制C#中l(wèi)abel輸出文字的長度,自動換行

    效控制C#中l(wèi)abel輸出文字的長度,自動換行

    效控制C#中l(wèi)abel輸出文字的長度,自動換行...
    2007-04-04
  • .Net?Core使用Coravel實現(xiàn)任務(wù)調(diào)度的完整步驟

    .Net?Core使用Coravel實現(xiàn)任務(wù)調(diào)度的完整步驟

    最近在使用調(diào)度程序創(chuàng)建簡單的服務(wù),該服務(wù)將執(zhí)行一些重復(fù)的IO操作,使用的是Coravel調(diào)度庫,下面這篇文章主要給大家介紹了關(guān)于.Net?Core使用Coravel實現(xiàn)任務(wù)調(diào)度的完整步驟,需要的朋友可以參考下
    2022-08-08
  • Visual Studio 2017安裝使用教程

    Visual Studio 2017安裝使用教程

    這篇文章主要為大家詳細介紹了Visual Studio 2017安裝使用教程,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-09-09
  • .net+FusionChart實現(xiàn)動態(tài)顯示的柱狀圖和餅狀圖

    .net+FusionChart實現(xiàn)動態(tài)顯示的柱狀圖和餅狀圖

    這篇文章介紹了.net+FusionChart實現(xiàn)動態(tài)顯示柱狀圖和餅狀圖的方法,文中通過示例代碼介紹的非常詳細。對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-07-07
  • asp.net?web?api2設(shè)置默認啟動登錄頁面的方法

    asp.net?web?api2設(shè)置默認啟動登錄頁面的方法

    這篇文章主要介紹了asp.net?web?api2設(shè)置默認啟動登錄頁面的方法,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-09-09
  • ASP.NET頁面請求超時時間設(shè)置多種方法

    ASP.NET頁面請求超時時間設(shè)置多種方法

    這篇文章主要為大家詳細介紹了ASP.NET頁面請求超時時間設(shè)置Server.ScriptTimeOut executionTimeout多種方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-09-09
  • asp.net get set用法

    asp.net get set用法

    屬性的定義和使用 屬性由兩個部分組成:屬性頭和存儲器。存儲器分為get訪問器和set訪問器。聲明屬性的一般形式為: 修飾符 類型 屬性名
    2008-05-05
  • 為T-SQL添加intellisense功能

    為T-SQL添加intellisense功能

    為T-SQL添加intellisense功能...
    2007-02-02

最新評論