.NET Core下使用Kafka的方法步驟
安裝
CentOS安裝 kafka
Kafka : http://kafka.apache.org/downloads
ZooLeeper : https://zookeeper.apache.org/releases.html
下載并解壓
# 下載,并解壓 $ wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz $ tar -zxvf kafka_2.12-2.1.1.tgz $ mv kafka_2.12-2.1.1.tgz /data/kafka # 下載 zookeeper,解壓 $ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz $ tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz $ mv apache-zookeeper-3.5.8-bin /data/zookeeper
啟動(dòng) ZooKeeper
# 復(fù)制配置模版 $ cd /data/kafka/conf $ cp zoo_sample.cfg zoo.cfg # 看看配置需不需要改 $ vim zoo.cfg # 命令 $ ./bin/zkServer.sh start # 啟動(dòng) $ ./bin/zkServer.sh status # 狀態(tài) $ ./bin/zkServer.sh stop # 停止 $ ./bin/zkServer.sh restart # 重啟 # 使用客戶端測(cè)試 $ ./bin/zkCli.sh -server localhost:2181 $ quit
啟動(dòng) Kafka
# 備份配置 $ cd /data/kafka $ cp config/server.properties config/server.properties_copy # 修改配置 $ vim /data/kafka/config/server.properties # 集群配置下,每個(gè) broker 的 id 是必須不同的 # broker.id=0 # 監(jiān)聽地址設(shè)置(內(nèi)網(wǎng)) # listeners=PLAINTEXT://ip:9092 # 對(duì)外提供服務(wù)的IP、端口 # advertised.listeners=PLAINTEXT://106.75.84.97:9092 # 修改每個(gè)topic的默認(rèn)分區(qū)參數(shù)num.partitions,默認(rèn)是1,具體合適的取值需要根據(jù)服務(wù)器配置進(jìn)程確定,UCloud.ukafka = 3 # num.partitions=3 # zookeeper 配置 # zookeeper.connect=localhost:2181 # 通過配置啟動(dòng) kafka $ ./bin/kafka-server-start.sh config/server.properties& # 狀態(tài)查看 $ ps -ef|grep kafka $ jps
docker下安裝Kafka
docker pull wurstmeister/zookeeper docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
docker pull wurstmeister/kafka docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.111 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
介紹
- Broker:消息中間件處理節(jié)點(diǎn),一個(gè)Kafka節(jié)點(diǎn)就是一個(gè)broker,多個(gè)broker可以組成一個(gè)Kafka集群。
- Topic:一類消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能夠同時(shí)負(fù)責(zé)多個(gè)topic的分發(fā)。
- Partition:topic物理上的分組,一個(gè)topic可以分為多個(gè)partition,每個(gè)partition是一個(gè)有序的隊(duì)列。
- Segment:partition物理上由多個(gè)segment組成,下面2.2和2.3有詳細(xì)說明。
- offset:每個(gè)partition都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到partition中。partition中的每個(gè)消息都有一個(gè)連續(xù)的序列號(hào)叫做offset,用于partition唯一標(biāo)識(shí)一條消息。
kafka partition 和 consumer 數(shù)目關(guān)系
- 如果consumer比partition多是浪費(fèi),因?yàn)閗afka的設(shè)計(jì)是在一個(gè)partition上是不允許并發(fā)的,所以consumer數(shù)不要大于partition數(shù) 。
- 如果consumer比partition少,一個(gè)consumer會(huì)對(duì)應(yīng)于多個(gè)partitions,這里主要合理分配consumer數(shù)和partition數(shù),否則會(huì)導(dǎo)致partition里面的數(shù)據(jù)被取的不均勻 。最好partiton數(shù)目是consumer數(shù)目的整數(shù)倍,所以partition數(shù)目很重要,比如取24,就很容易設(shè)定consumer數(shù)目 。
- 如果consumer從多個(gè)partition讀到數(shù)據(jù),不保證數(shù)據(jù)間的順序性,kafka只保證在一個(gè)partition上數(shù)據(jù)是有序的,但多個(gè)partition,根據(jù)你讀的順序會(huì)有不同
- 增減consumer,broker,partition會(huì)導(dǎo)致rebalance,所以rebalance后consumer對(duì)應(yīng)的partition會(huì)發(fā)生變化 快速開始
在 .NET Core 項(xiàng)目中安裝組件
Install-Package Confluent.Kafka
開源地址: https://github.com/confluentinc/confluent-kafka-dotnet
添加 IKafkaService
服務(wù)接口
public interface IKafkaService { /// <summary> /// 發(fā)送消息至指定主題 /// </summary> /// <typeparam name="TMessage"></typeparam> /// <param name="topicName"></param> /// <param name="message"></param> /// <returns></returns> Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class; /// <summary> /// 從指定主題訂閱消息 /// </summary> /// <typeparam name="TMessage"></typeparam> /// <param name="topics"></param> /// <param name="messageFunc"></param> /// <param name="cancellationToken"></param> /// <returns></returns> Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class; }
實(shí)現(xiàn) IKafkaService
public class KafkaService : IKafkaService { public async Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class { var config = new ProducerConfig { BootstrapServers = "127.0.0.1:9092" }; using var producer = new ProducerBuilder<string, string>(config).Build(); await producer.ProduceAsync(topicName, new Message<string, string> { Key = Guid.NewGuid().ToString(), Value = message.SerializeToJson() }); } public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class { var config = new ConsumerConfig { BootstrapServers = "127.0.0.1:9092", GroupId = "crow-consumer", EnableAutoCommit = false, StatisticsIntervalMs = 5000, SessionTimeoutMs = 6000, AutoOffsetReset = AutoOffsetReset.Earliest, EnablePartitionEof = true }; //const int commitPeriod = 5; using var consumer = new ConsumerBuilder<Ignore, string>(config) .SetErrorHandler((_, e) => { Console.WriteLine($"Error: {e.Reason}"); }) .SetStatisticsHandler((_, json) => { Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息監(jiān)聽中.."); }) .SetPartitionsAssignedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($" - 分配的 kafka 分區(qū): {partitionsStr}"); }) .SetPartitionsRevokedHandler((c, partitions) => { string partitionsStr = string.Join(", ", partitions); Console.WriteLine($" - 回收了 kafka 的分區(qū): {partitionsStr}"); }) .Build(); consumer.Subscribe(topics); try { while (true) { try { var consumeResult = consumer.Consume(cancellationToken); Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'."); if (consumeResult.IsPartitionEOF) { Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已經(jīng)到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}."); continue; } TMessage messageResult = null; try { messageResult = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message.Value); } catch (Exception ex) { var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失敗,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}"; Console.WriteLine(errorMessage); messageResult = null; } if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/) { messageFunc(messageResult); try { consumer.Commit(consumeResult); } catch (KafkaException e) { Console.WriteLine(e.Message); } } } catch (ConsumeException e) { Console.WriteLine($"Consume error: {e.Error.Reason}"); } } } catch (OperationCanceledException) { Console.WriteLine("Closing consumer."); consumer.Close(); } await Task.CompletedTask; } }
注入 IKafkaService
,在需要使用的地方直接調(diào)用即可。
public class MessageService : IMessageService, ITransientDependency { private readonly IKafkaService _kafkaService; public MessageService(IKafkaService kafkaService) { _kafkaService = kafkaService; } public async Task RequestTraceAdded(XxxEventData eventData) { await _kafkaService.PublishAsync(eventData.TopicName, eventData); } }
以上相當(dāng)于一個(gè)生產(chǎn)者,當(dāng)我們消息隊(duì)列發(fā)出后,還需一個(gè)消費(fèi)者進(jìn)行消費(fèi),所以可以使用一個(gè)控制臺(tái)項(xiàng)目接收消息來處理業(yè)務(wù)。
var cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; await kafkaService.SubscribeAsync<XxxEventData>(topics, async (eventData) => { // Your logic Console.WriteLine($" - {eventData.EventTime:yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已處理"); }, cts.Token);
在 IKafkaService
中已經(jīng)寫了訂閱消息的接口,這里也是注入后直接使用即可。
生產(chǎn)者消費(fèi)者示例
生產(chǎn)者
static async Task Main(string[] args) { if (args.Length != 2) { Console.WriteLine("Usage: .. brokerList topicName"); // 127.0.0.1:9092 helloTopic return; } var brokerList = args.First(); var topicName = args.Last(); var config = new ProducerConfig { BootstrapServers = brokerList }; using var producer = new ProducerBuilder<string, string>(config).Build(); Console.WriteLine("\n-----------------------------------------------------------------------"); Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}."); Console.WriteLine("-----------------------------------------------------------------------"); Console.WriteLine("To create a kafka message with UTF-8 encoded key and value:"); Console.WriteLine("> key value<Enter>"); Console.WriteLine("To create a kafka message with a null key and UTF-8 encoded value:"); Console.WriteLine("> value<enter>"); Console.WriteLine("Ctrl-C to quit.\n"); var cancelled = false; Console.CancelKeyPress += (_, e) => { e.Cancel = true; cancelled = true; }; while (!cancelled) { Console.Write("> "); var text = string.Empty; try { text = Console.ReadLine(); } catch (IOException) { break; } if (string.IsNullOrWhiteSpace(text)) { break; } var key = string.Empty; var val = text; var index = text.IndexOf(" "); if (index != -1) { key = text.Substring(0, index); val = text.Substring(index + 1); } try { var deliveryResult = await producer.ProduceAsync(topicName, new Message<string, string> { Key = key, Value = val }); Console.WriteLine($"delivered to: {deliveryResult.TopicPartitionOffset}"); } catch (ProduceException<string, string> e) { Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]"); } } }
消費(fèi)者
static void Main(string[] args) { if (args.Length != 2) { Console.WriteLine("Usage: .. brokerList topicName"); // 127.0.0.1:9092 helloTopic return; } var brokerList = args.First(); var topicName = args.Last(); Console.WriteLine($"Started consumer, Ctrl-C to stop consuming"); var cts = new CancellationTokenSource(); Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); }; var config = new ConsumerConfig { BootstrapServers = brokerList, GroupId = "consumer", EnableAutoCommit = false, StatisticsIntervalMs = 5000, SessionTimeoutMs = 6000, AutoOffsetReset = AutoOffsetReset.Earliest, EnablePartitionEof = true }; const int commitPeriod = 5; using var consumer = new ConsumerBuilder<Ignore, string>(config) .SetErrorHandler((_, e) => { Console.WriteLine($"Error: {e.Reason}"); }) .SetStatisticsHandler((_, json) => { Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > monitoring.."); //Console.WriteLine($"Statistics: {json}"); }) .SetPartitionsAssignedHandler((c, partitions) => { Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]"); }) .SetPartitionsRevokedHandler((c, partitions) => { Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]"); }) .Build(); consumer.Subscribe(topicName); try { while (true) { try { var consumeResult = consumer.Consume(cts.Token); if (consumeResult.IsPartitionEOF) { Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}."); continue; } Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}"); if (consumeResult.Offset % commitPeriod == 0) { try { consumer.Commit(consumeResult); } catch (KafkaException e) { Console.WriteLine($"Commit error: {e.Error.Reason}"); } } } catch (ConsumeException e) { Console.WriteLine($"Consume error: {e.Error.Reason}"); } } } catch (OperationCanceledException) { Console.WriteLine("Closing consumer."); consumer.Close(); } }
到此這篇關(guān)于.NET Core下使用Kafka的方法步驟的文章就介紹到這了,更多相關(guān).NET Core使用Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
vs2012創(chuàng)建的ado.net模型無法實(shí)例化的解決方案
本文給大家分享的是升級(jí)vs2012后,發(fā)現(xiàn)創(chuàng)建數(shù)據(jù)模型無法實(shí)例化使用,嘗試了很多種方法,最后在度娘的幫助下,才解決了這個(gè)問題,這里記錄下來,分享給大家。2015-03-03jquery中如何獲得服務(wù)器控件實(shí)現(xiàn)思路
jquery中如何獲得服務(wù)器控件,很多新手朋友對(duì)此比較陌生,接下來為您介紹解決方法,感興趣的朋友可以了解下哦2013-01-01Entity Framework Core實(shí)現(xiàn)軟刪除與查詢過濾器
這篇文章介紹了Entity Framework Core實(shí)現(xiàn)軟刪除與查詢過濾器的方法,文中通過示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-02-02ASP.NET Core中的Action的返回值類型實(shí)現(xiàn)
這篇文章主要介紹了ASP.NET Core中的Action的返回值類型實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04提交頁(yè)面的定位--scrollIntoView的用法
提交頁(yè)面的定位--scrollIntoView的用法...2007-03-03.NET?6開發(fā)TodoList應(yīng)用之實(shí)現(xiàn)DELETE請(qǐng)求與HTTP請(qǐng)求冪等性
這篇文章主要介紹了在.NET6開發(fā)中如何實(shí)現(xiàn)DELETE請(qǐng)求以及HTTP請(qǐng)求冪等性的,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2021-12-12ASP.NET 連接ACCESS數(shù)據(jù)庫(kù)的簡(jiǎn)單方法
一段非常簡(jiǎn)單的連接ACCESS數(shù)據(jù)庫(kù)的實(shí)例代碼,有需要的朋友可以參考一下2013-07-07GridView分頁(yè)的實(shí)現(xiàn)(通用分頁(yè)模板)
要在GridView中加入AllowPaging=true,一頁(yè)數(shù)據(jù)多少行PageSize=10分頁(yè)時(shí)觸發(fā)的事件OnPageIndexChanging等等,感興趣的朋友可以了解下本文,希望對(duì)你有所幫助2013-04-04