欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

@KafkaListener 如何使用

 更新時間:2023年02月20日 09:23:32   作者:DemonHunter211  
這篇文章主要介紹了@KafkaListener 如何使用,本文通過圖文實例代碼相結合給大家詳細講解,文末給大家介紹了kafka的消費者分區(qū)分配策略,需要的朋友可以參考下

@KafkaListener 如何使用

spring-kafka使用基于@KafkaListener注解,@KafkaListener使用方式如下

@KafkaListener(topics = "topic1")
public void? ?kafkaListen(List<ConsumerRecord<xxx, xxx>> records) {
? ? ...
}

在注解內指定topic名稱,當對應的topic內有新的消息時,testListen方法會被調用,參數就是topic內新的消息。這個過程是異步進行的。

@KafkaListener工作流程主要有以下幾步:

解析;解析@KafkaListener注解。
注冊;解析后的數據注冊到spring-kafka。
監(jiān)聽;開始監(jiān)聽topic變更。
調用;調用注解標識的方法,將監(jiān)聽到的數據作為參數傳入。
下面我們一步一步分析

解析

@KafkaListener注解由KafkaListenerAnnotationBeanPostProcessor類解析,后者實現了BeanPostProcessor接口,這個接口如下

public interface BeanPostProcessor {

    Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException;

    Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException;
}

接口內部有2個方法,分別在bean初始化前后被調用。

KafkaListenerAnnotationBeanPostProcessor內會在postProcessAfterInitialization方法內解析@KafkaListener注解。

注冊
解析步驟里,我們可以獲取到所有含有@KafkaListener注解的類,之后這些類的相關信息會被注冊到 KafkaListenerEndpointRegistry內,包括注解所在的方法,當前的bean等。KafkaListenerEndpointRegistry這個類內部會維護多個Listener Container,每一個@KafkaListener都會對應一個Listener Container。并且每個Container對應一個線程。

監(jiān)聽
注冊完成之后,每個Listener Container會開始工作,會新啟一個新的線程,初始化KafkaConsumer,監(jiān)聽topic變更等。

調用
監(jiān)聽到數據之后,container會組織消息的格式,隨后調用解析得到的@KafkaListener注解標識的方法,將組織后的消息作為參數傳入方法,執(zhí)行用戶邏輯。

@KafkaListener和@KafkaListners

@KafkaListeners是@KafkaListener的Container Annotation,這也是jdk8的新特性之一,注解可以重復標注。

@KafkaListeners({@KafkaListener(topics="topic1"), @KafkaListener(topics="topic2")})
public void listen(ConsumerRecord<Integer, String> msg) {}
 
等同于
 
@KafkaListener(topics="topic1")
@KafkaListener(topics="topic2")
public void listen(ConsumerRecord<Integer, String> msg) {}

擴展:kafka的消費者分區(qū)分配策略

kafka有三種分區(qū)分配策略

1. RoundRobin

2. Range

3. Sticky

1. RoundRobin

(1)把所有topic的分區(qū)partition放入一個隊列中,按照name的hashcode進行排序;

(2)把consumer放在一個循環(huán)隊列,按照name的hashcode進行排序;

(3)循環(huán)遍歷consumer,從partition隊列pop出一個partition,分配給當前consumer;以此類推,取下一個consumer,繼續(xù)從partition隊列pop出來分配給當前consumer;直到partition隊列中的元素被分配完;

2. Range

(1)假設topicA有4個分區(qū),topicB有5個分區(qū),topicC有6個分區(qū);一共有3個consumer;

(2)遍歷3個topic的分區(qū)集合,先取topicA的分區(qū)集合,然后準備依次給3個consumer分配分區(qū);對于第1個consumer,所分配的分區(qū)數量根據以下公式:假設消費者數量為N,當前主題剩下的分區(qū)數量為M,則當前消費者應該分配的分區(qū)數量 = M%N==0? M/N +1 : M/N ;按照公式,3個消費者應該分配的分區(qū)數量依次為:2/1/1,即topicA-partition-0/1分配給consumer-0,topicA-partition-2分配給consumer-1,topicA-partition-3分配給consumer-2;

(3)按照上述規(guī)則按序把topicB和topicC的分區(qū)分配給3個consumer;依次為:2/2/1,2/2/2;

3. Sticky

kafka在0.11版本引入了Sticky分區(qū)分配策略,它的兩個主要目的是:

1. 分區(qū)的分配要盡可能的均勻,分配給消費者者的主題分區(qū)數最多相差一個;

2. 分區(qū)的分配盡可能的與上次分配的保持相同;

當兩者發(fā)生沖突時,第一個目標優(yōu)先于第二個目標;

粘性分區(qū)是由Kafka從0.11x版本開始引入的分配策略,首先會盡量均衡的分配分區(qū)到消費者上面,在出現同一消費組內消費者出現問題的時候,會盡量保持原來的分配的分區(qū)不變;

Sticky分區(qū)初始分配分區(qū)的方法與Range相似,但是不同;拿7個分區(qū)3個消費者為例,消費者消費的分區(qū)依舊是3/2/2,但是不同與Range的是Range分區(qū)是排好序的,但是Sticky分區(qū)是隨機的;

到此這篇關于@KafkaListener 如何使用方式的文章就介紹到這了,更多相關@KafkaListener 使用內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Java之Set?交集,差集,并集的用法

    Java之Set?交集,差集,并集的用法

    這篇文章主要介紹了Java之Set?交集,差集,并集的用法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-05-05
  • 淺談用java實現事件驅動機制

    淺談用java實現事件驅動機制

    這篇文章主要介紹了淺談用java實現事件驅動機制,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-09-09
  • Java Linkedlist原理及實例詳解

    Java Linkedlist原理及實例詳解

    這篇文章主要介紹了Java Linkedlist原理及實例詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-01-01
  • 使用maven-assembly-plugin如何打包多模塊項目

    使用maven-assembly-plugin如何打包多模塊項目

    這篇文章主要介紹了使用maven-assembly-plugin如何打包多模塊項目,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • postman中POST請求時參數包含參數list設置方式

    postman中POST請求時參數包含參數list設置方式

    這篇文章主要介紹了postman中POST請求時參數包含參數list設置方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-05-05
  • Java的main方法使用及說明

    Java的main方法使用及說明

    這篇文章主要介紹了Java的main方法使用及說明,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • JAVA異常處理捕獲與拋出原理解析

    JAVA異常處理捕獲與拋出原理解析

    這篇文章主要介紹了JAVA異常處理捕獲與拋出原理解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-09-09
  • RabbitMQ 實現延遲隊列的兩種方式詳解

    RabbitMQ 實現延遲隊列的兩種方式詳解

    很多場景下我們都需要延遲隊列。這篇文章主要以RabbitMQ為例來和大家聊一聊延遲隊列的玩法。文中的代碼具有一定的學習價值,感興趣的同學可以了解一下
    2021-12-12
  • Java,C#使用二進制序列化、反序列化操作數據

    Java,C#使用二進制序列化、反序列化操作數據

    這篇文章主要介紹了Java,C#使用二進制序列化、反序列化操作數據的相關資料,需要的朋友可以參考下
    2014-10-10
  • Java中構造函數,set/get方法和toString方法使用及注意說明

    Java中構造函數,set/get方法和toString方法使用及注意說明

    這篇文章主要介紹了Java中構造函數,set/get方法和toString方法的使用及注意說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-01-01

最新評論