springboot+kafka中@KafkaListener動(dòng)態(tài)指定多個(gè)topic問題
說明
本項(xiàng)目為springboot+kafak的整合項(xiàng)目,故其用了springboot中對kafak的消費(fèi)注解@KafkaListener
首先,application.properties中配置用逗號(hào)隔開的多個(gè)topic。
方法:利用Spring的SpEl表達(dá)式,將topics 配置為:@KafkaListener(topics = “#{’${topics}’.split(’,’)}”)
運(yùn)行程序,console打印的效果如下:
因?yàn)橹婚_了一條消費(fèi)者線程,所以所有的topic和分區(qū)都分配給這條線程。
如果你想開多條消費(fèi)者線程去消費(fèi)這些topic,添加@KafkaListener注解的參數(shù)concurrency的值為自己想要的消費(fèi)者個(gè)數(shù)即可(注意,消費(fèi)者數(shù)要小于等于你開的所有topic的分區(qū)數(shù)總和)
運(yùn)行程序,console打印的效果如下:
總結(jié)一下大家問的最多的一個(gè)問題
如何在程序運(yùn)行的過程中,改變topic,消費(fèi)者能夠消費(fèi)修改后的topic?
ans: 經(jīng)過嘗試,使用@KafkaListener注解實(shí)現(xiàn)不了此需求,在程序啟動(dòng)的時(shí)候,程序就會(huì)根據(jù)@KafkaListener的注解信息初始化好消費(fèi)者去消費(fèi)指定好的topic。如果在程序運(yùn)行的過程中,修改topic,不會(huì)讓此消費(fèi)者修改消費(fèi)者的配置再重新訂閱topic的。
不過我們可以有個(gè)折中的辦法,就是利用@KafkaListener的topicPattern參數(shù)來進(jìn)行topic匹配。
具體如何操作的可以看下這篇文章:
http://www.dbjr.com.cn/article/271098.htm
終極方法
思路
不使用@KafkaListener,使用kafka原生客戶端依賴,手動(dòng)初始化消費(fèi)者,開啟消費(fèi)者線程。
在消費(fèi)者線程中,每次循環(huán)都從配置、數(shù)據(jù)庫或者其他配置源獲取最新的topic信息,與之前的topic比較,如果發(fā)生變化,重新訂閱topic或者初始化消費(fèi)者。
實(shí)現(xiàn)
加入kafka客戶端依賴(本次測試服務(wù)端kafka版本:2.12-2.4.0)
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.0</version> </dependency>
代碼
@Service @Slf4j public class KafkaConsumers implements InitializingBean { /** * 消費(fèi)者 */ private static KafkaConsumer<String, String> consumer; /** * topic */ private List<String> topicList; public static String getNewTopic() { try { return org.apache.commons.io.FileUtils.readLines(new File("D:/topic.txt"), "utf-8").get(0); } catch (IOException e) { e.printStackTrace(); } return null; } /** * 初始化消費(fèi)者(配置寫死是為了快速測試,請大家使用配置文件) * * @param topicList * @return */ public KafkaConsumer<String, String> getInitConsumer(List<String> topicList) { //配置信息 Properties props = new Properties(); //kafka服務(wù)器地址 props.put("bootstrap.servers", "192.168.9.185:9092"); //必須指定消費(fèi)者組 props.put("group.id", "haha"); //設(shè)置數(shù)據(jù)key和value的序列化處理類 props.put("key.deserializer", StringDeserializer.class); props.put("value.deserializer", StringDeserializer.class); //創(chuàng)建消息者實(shí)例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //訂閱topic的消息 consumer.subscribe(topicList); return consumer; } /** * 開啟消費(fèi)者線程 * 異常請自己根據(jù)需求自己處理 */ @Override public void afterPropertiesSet() { // 初始化topic topicList = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); if (org.apache.commons.collections.CollectionUtils.isNotEmpty(topicList)) { consumer = getInitConsumer(topicList); // 開啟一個(gè)消費(fèi)者線程 new Thread(() -> { while (true) { // 模擬從配置源中獲取最新的topic(字符串,逗號(hào)隔開) final List<String> newTopic = Splitter.on(",").splitToList(Objects.requireNonNull(getNewTopic())); // 如果topic發(fā)生變化 if (!topicList.equals(newTopic)) { log.info("topic 發(fā)生變化:newTopic:{},oldTopic:{}-------------------------", newTopic, topicList); // method one:重新訂閱topic: topicList = newTopic; consumer.subscribe(newTopic); // method two:關(guān)閉原來的消費(fèi)者,重新初始化一個(gè)消費(fèi)者 //consumer.close(); //topicList = newTopic; //consumer = getInitConsumer(newTopic); continue; } ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { System.out.println("key:" + record.key() + "" + ",value:" + record.value()); } } }).start(); } } }
說一下第72行代碼:
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
上面這行代碼表示:在100ms內(nèi)等待Kafka的broker返回?cái)?shù)據(jù).超市參數(shù)指定poll在多久之后可以返回,不管有沒有可用的數(shù)據(jù)都要返回。
在修改topic后,必須等到此次poll拉取的消息處理完,while(true)循環(huán)的時(shí)候檢測topic發(fā)生變化,才能重新訂閱topic.
poll()方法一次拉取得消息數(shù)默認(rèn)為:500,如下圖,kafka客戶端源碼中設(shè)置的。
如果想自定義此配置,可在初始化消費(fèi)者時(shí)加入
運(yùn)行結(jié)果(測試的topic中都無數(shù)據(jù))
注意:KafkaConsumer是線程不安全的,不要用一個(gè)KafkaConsumer實(shí)例開啟多個(gè)消費(fèi)者,要開啟多個(gè)消費(fèi)者,需要new 多個(gè)KafkaConsumer實(shí)例。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringMVC多個(gè)文件上傳及上傳后立即顯示圖片功能
這篇文章主要介紹了SpringMVC多個(gè)文件上傳及上傳后立即顯示圖片功能,非常不錯(cuò),具有參考借鑒價(jià)值功能,需要的朋友可以參考下2017-10-10Java Socket實(shí)現(xiàn)聊天室附1500行源代碼
Socket是應(yīng)用層與TCP/IP協(xié)議族通信的中間軟件抽象層,它是一組接口。本篇文章手把手帶你通過Java Socket來實(shí)現(xiàn)自己的聊天室,大家可以在過程中查缺補(bǔ)漏,溫故而知新2021-10-10SpringBoot整合MyBatis-Plus3.1教程詳解
這篇文章主要介紹了SpringBoot整合MyBatis-Plus3.1詳細(xì)教程,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2019-08-08MybatisPlus查詢條件為空字符串或null問題及解決
這篇文章主要介紹了MybatisPlus查詢條件為空字符串或null問題及解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06