關(guān)于Kafka消費(fèi)者訂閱方式
Kafka消費(fèi)者訂閱方式
Kafka為消費(fèi)者提供了三種類(lèi)型的訂閱消費(fèi)方式:訂閱主題集合、正則表達(dá)式訂閱主題、訂閱指定主題的分區(qū)集合。三種方式只能使用其中一種。
1.指定主題消費(fèi)
一個(gè)消費(fèi)者可以使用KafkaConsumer提供的subscribe()方法訂閱一個(gè)或多個(gè)主題,訂閱主題集合和正則表達(dá)式訂閱主題都使用此方法實(shí)現(xiàn)的。下面兩種方式都可以訂閱topic_1120主題。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 訂閱主題 consumer.subscribe(Collections.singletonList("topic_1120"));
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //正則表達(dá)式.*代表后續(xù)0個(gè)或者多個(gè)任意字符。 consumer.subscribe(Pattern.compile("topic.*"));
訂閱主題在源碼中由4個(gè)方法重載實(shí)現(xiàn),其中兩個(gè)帶listener的方法是可以自定義Rebalance重平衡的監(jiān)聽(tīng)類(lèi)。
@Override public void subscribe(Collection<String> topics) { ? ? subscribe(topics, new NoOpConsumerRebalanceListener()); } @Override public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { ? ? //省略源碼 } @Override public void subscribe(Pattern pattern) { ? ? subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { ? ? //省略源碼 }
2.指定分區(qū)消費(fèi)
消費(fèi)者指定分區(qū)消費(fèi)是通過(guò)KafkaConsumer提供的assign()方法實(shí)現(xiàn)的,assign()方法入?yún)镃ollection, 其中TopicPartition有2個(gè)屬性, topic和partition, 分區(qū)從0開(kāi)始編號(hào)。使用assign()方法訂閱指定主題test_1120分區(qū)0的消息。
/訂閱指定分區(qū) consumer.assign(Collections.singleton(new TopicPartition("topic_1120", 0)));
3.取消訂閱
取消訂閱調(diào)用unsubscribe()方法。
consumer.unsubscribe();
小結(jié):subscribe()具有自動(dòng)重平衡的功能,來(lái)實(shí)現(xiàn)消費(fèi)負(fù)載均衡和故障自動(dòng)轉(zhuǎn)移,而assign()不具備這種功能。
Kafka概述
定義
Kafka是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列,主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域。
消息隊(duì)列
1.傳統(tǒng)消息隊(duì)列的應(yīng)用場(chǎng)景
使用消息隊(duì)列的好處
1)解耦
允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過(guò)程,只要確保它們遵守同樣的接口約束。
2)可恢復(fù)性
系統(tǒng)的一部分組件失效時(shí),不會(huì)影響到整個(gè)系統(tǒng)。消息隊(duì)列降低了進(jìn)程間的耦合度,所 以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。
3)緩沖
有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過(guò)系統(tǒng)的速度,解決生產(chǎn)消息和消費(fèi)消息的處理速度不一致的情況。
4)靈活性 & 峰值處理能力
在訪問(wèn)量劇增的情況下,應(yīng)用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見(jiàn)。 如果為以能處理這類(lèi)峰值訪問(wèn)為標(biāo)準(zhǔn)來(lái)投入資源隨時(shí)待命無(wú)疑是巨大的浪費(fèi)。使用消息隊(duì)列 能夠使關(guān)鍵組件頂住突發(fā)的訪問(wèn)壓力,而不會(huì)因?yàn)橥话l(fā)的超負(fù)荷的請(qǐng)求而完全崩潰。
5)異步通信
很多時(shí)候,用戶不想也不需要立即處理消息。消息隊(duì)列提供了異步處理機(jī)制,允許用戶 把一個(gè)消息放入隊(duì)列,但并不立即處理它。想向隊(duì)列中放入多少消息就放多少,然后在需要 的時(shí)候再去處理它們。
2.消息隊(duì)列的兩種模式
(1)點(diǎn)對(duì)點(diǎn)模式(一對(duì)一,消費(fèi)者主動(dòng)拉取數(shù)據(jù),消息收到后消息清除)
消息生產(chǎn)者生產(chǎn)消息發(fā)送到Queue中,然后消息消費(fèi)者從Queue中取出并且消費(fèi)消息。 消息被消費(fèi)以后,queue 中不再有存儲(chǔ),所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。 Queue 支持存在多個(gè)消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi)。
(2)發(fā)布/訂閱模式(一對(duì)多,消費(fèi)者消費(fèi)數(shù)據(jù)之后不會(huì)清除消息)
消息生產(chǎn)者(發(fā)布)將消息發(fā)布到 topic 中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消 息。和點(diǎn)對(duì)點(diǎn)方式不同,發(fā)布到 topic 的消息會(huì)被所有訂閱者消費(fèi)。
Kafka 基礎(chǔ)架構(gòu)
Producer
:消息生產(chǎn)者,就是向 kafka broker 發(fā)消息的客戶端;Consumer
:消息消費(fèi)者,向 kafka broker 取消息的客戶端;Consumer Group (CG)
:消費(fèi)者組,由多個(gè) consumer 組成。消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù) 責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)消費(fèi)者消費(fèi);消費(fèi)者組之間互不影響。所 有的消費(fèi)者都屬于某個(gè)消費(fèi)者組,即消費(fèi)者組是邏輯上的一個(gè)訂閱者。Broker
:一臺(tái) kafka 服務(wù)器就是一個(gè) broker。一個(gè)集群由多個(gè) broker 組成。一個(gè) broker 可以容納多個(gè) topic。Topic
:可以理解為一個(gè)隊(duì)列,生產(chǎn)者和消費(fèi)者面向的都是一個(gè) topic;Partition
:為了實(shí)現(xiàn)擴(kuò)展性,一個(gè)非常大的 topic 可以分布到多個(gè) broker(即服務(wù)器)上, 一個(gè) topic 可以分為多個(gè) partition,每個(gè) partition 是一個(gè)有序的隊(duì)列;Replica
:副本,為保證集群中的某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),該節(jié)點(diǎn)上的 partition 數(shù)據(jù)不丟失, 且 kafka 仍然能夠繼續(xù)工作,kafka 提供了副本機(jī)制,一個(gè) topic 的每個(gè)分區(qū)都有若干個(gè)副本, 一個(gè) leader 和若干個(gè) follower。leader
:每個(gè)分區(qū)多個(gè)副本的“主”,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì) 象都是 leader。follower
:每個(gè)分區(qū)多個(gè)副本中的“從”,實(shí)時(shí)從 leader 中同步數(shù)據(jù),保持和 leader 數(shù)據(jù) 的同步。leader 發(fā)生故障時(shí),某個(gè) follower 會(huì)成為新的 follower。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java實(shí)現(xiàn)獲取客戶端真實(shí)IP方法小結(jié)
本文給大家匯總介紹了2種使用java實(shí)現(xiàn)獲取客戶端真實(shí)IP的方法,主要用于獲取使用了代理訪問(wèn)的來(lái)訪者的IP,有需要的小伙伴可以參考下。2016-03-03Java中調(diào)用SQL Server存儲(chǔ)過(guò)程詳解
這篇文章主要介紹了Java中調(diào)用SQL Server存儲(chǔ)過(guò)程詳解,本文講解了使用不帶參數(shù)的存儲(chǔ)過(guò)程、使用帶有輸入?yún)?shù)的存儲(chǔ)過(guò)程、使用帶有輸出參數(shù)的存儲(chǔ)過(guò)程、使用帶有返回狀態(tài)的存儲(chǔ)過(guò)程、使用帶有更新計(jì)數(shù)的存儲(chǔ)過(guò)程等操作實(shí)例,需要的朋友可以參考下2015-01-01Spring中的@EnableScheduling定時(shí)任務(wù)注解
這篇文章主要介紹了Spring中的@EnableScheduling注解,@EnableScheduling是 Spring Framework 提供的一個(gè)注解,用于啟用 Spring 的定時(shí)任務(wù)功能,通過(guò)使用這個(gè)注解,可以在 Spring 應(yīng)用程序中創(chuàng)建定時(shí)任務(wù),需要的朋友可以參考下2024-01-01SpringBoot配置主從數(shù)據(jù)庫(kù)實(shí)現(xiàn)讀寫(xiě)分離
現(xiàn)在的 Web 應(yīng)用大都是讀多寫(xiě)少,本文主要介紹了SpringBoot配置主從數(shù)據(jù)庫(kù)實(shí)現(xiàn)讀寫(xiě)分離,具有一定的參考價(jià)值,感興趣的可以了解一下2023-11-11Spark學(xué)習(xí)筆記之Spark SQL的具體使用
這篇文章主要介紹了Spark學(xué)習(xí)筆記之Spark SQL的具體使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06