Java Kafka分區(qū)發(fā)送及消費(fèi)實(shí)戰(zhàn)
前言
Kafka是現(xiàn)在非常熱門的分布式消息隊(duì)列,常用于微服務(wù)間異步通信,業(yè)務(wù)解耦等場(chǎng)景。kafka的性能非常強(qiáng)大,但是單個(gè)微服務(wù)吞吐性能是有上限的,我們就會(huì)用到分布式微服務(wù),多消費(fèi)者多生產(chǎn)者進(jìn)行數(shù)據(jù)處理,保證性能同時(shí)也能根據(jù)業(yè)務(wù)量進(jìn)行橫向拓展,對(duì)于同一個(gè)微服務(wù)的多個(gè)實(shí)例,輸入輸出的topic是同一個(gè),這時(shí)候我們就可以利用Kafka分區(qū)消費(fèi)來解決這個(gè)問題。
業(yè)務(wù)場(chǎng)景
我們開發(fā)的是一個(gè)物聯(lián)網(wǎng)系統(tǒng),大量設(shè)備接入到平臺(tái)實(shí)時(shí)發(fā)送數(shù)據(jù),有秒級(jí)數(shù)據(jù)和分鐘級(jí)別數(shù)據(jù)等等,處理流程包含接入、處理、存儲(chǔ),這三個(gè)模塊間就是使用kafka進(jìn)行數(shù)據(jù)流轉(zhuǎn),數(shù)據(jù)處理模塊中包含多個(gè)微服務(wù),單條數(shù)據(jù)會(huì)經(jīng)歷多次處理,部分業(yè)務(wù)耗時(shí)較長,導(dǎo)致在高頻率接收到數(shù)據(jù)時(shí)候單體服務(wù)無法達(dá)到吞吐平衡,于是對(duì)于這些服務(wù)進(jìn)行了分布式部署,多個(gè)實(shí)例進(jìn)行消費(fèi)處理。
業(yè)務(wù)實(shí)現(xiàn)
不指定分區(qū)
我們?cè)诮okafka發(fā)送消息時(shí)候,如果不指定分區(qū),是不需要手動(dòng)創(chuàng)建topic的,發(fā)送時(shí)沒有topic,kafka會(huì)自動(dòng)創(chuàng)建一個(gè)分區(qū)為1的topic,如下:
@Service public class ProductService { @Autowired private KafkaTemplate kafkaTemplate; public void send(String msg, String topic) { kafkaTemplate.send(topic, msg); } }
指定分區(qū)
topic分區(qū)初始化及配置
指定分區(qū)發(fā)送時(shí)候,如果未配置topic分區(qū)數(shù),指定>0的分區(qū),會(huì)提示分區(qū)不存在,這時(shí)候我們就需要提前創(chuàng)建好topic及分區(qū)
手動(dòng)創(chuàng)建,服務(wù)啟動(dòng)前,使用kafka tool手動(dòng)創(chuàng)建topic 不推薦 x
自動(dòng)創(chuàng)建,服務(wù)啟動(dòng)時(shí),使用KafkaClient創(chuàng)建 推薦 √
/** * 初始化多分區(qū)的topic 基于springboot2 */ @Component public void TopicInitRunner implements ApplicationRunner { @Autowired private AdminClient adminClient; @Override public void run(ApplicationArguments args) throws Exception { // 通過配置文件讀取自定義配置的topic名及分區(qū)數(shù) 省略... // Key topic V 分區(qū)數(shù) Map<String, Integer> topicPartitionMap = new HashMap<>(); for (Map.Entry<String, Integer> e : topicPartitionMap.entrySet()) { createTopic(e.getKey(), e.getValue()); } } public void createTopic(String topic, int partition) { NewTopic newTopic = new NewTopic(topic, partition); adminClient.createTopics(Lists.newArrayList(newTopic)); } } /** * 配置類參考 基于springboot2 * 如果只進(jìn)行普通的單消息發(fā)送 無需添加此配置到項(xiàng)目中 */ @Configuration public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String servers; @Bean public AdminClient adminClient() { return AdminClient.create(kafkaAdmin().getConfig()); } @Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> props = Maps.newHashMap(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers); return new KafkaAdmin(props); } }
生產(chǎn)者分區(qū)發(fā)送方案
上面講到如何初始化分區(qū)topic,這時(shí)候我們的kafka環(huán)境已經(jīng)準(zhǔn)備好了,我們先使用TopicInitRunner為我們創(chuàng)建一個(gè)名稱為 partition-topic 分區(qū)數(shù)為三,現(xiàn)在講一講如何均勻的講消息發(fā)送的每個(gè)分區(qū)上,如何保證多消費(fèi)者實(shí)例是負(fù)載均衡的,具體方案如下:
- 1.因?yàn)槊織l消息都是設(shè)備上傳的,都會(huì)有設(shè)備id,先給每個(gè)設(shè)備生成一個(gè)自增號(hào),這樣1000個(gè)設(shè)備,每個(gè)設(shè)備就會(huì)有0到999的自增號(hào),放到緩存中,每次根據(jù)消息中的設(shè)備id獲取到該設(shè)備的自增號(hào)
- 2.使用自增號(hào)對(duì)分區(qū)數(shù)進(jìn)行取模操作,代碼實(shí)現(xiàn)如下:
public class ProductService { /** * data為需要發(fā)送的數(shù)據(jù) */ public void partitionSend(String topic, int partition, JSONObject data) { // 獲取設(shè)備id String deviceId = data.getString("deviceId"); // 獲取自增數(shù) 如果是新設(shè)備會(huì)創(chuàng)建一個(gè)并放入緩存中 int inc = getDeviceInc(deviceId); // 如果分區(qū)數(shù)為3 設(shè)備自增id為1 取模結(jié)果為1 就是發(fā)送到1分區(qū) 這樣1000個(gè)設(shè)備就可以保證每個(gè)分區(qū)發(fā)送數(shù)據(jù)量是1000 / 3 int targetPartition = Math.floorMod(inc, partition); // 分區(qū)發(fā)送時(shí)候 需要指定一個(gè)唯一k 可以使用uuid或者百度提供的雪花算法獲取id 字符串即可 kafkaTemplate.send(topic, partition, getUuid(), data.toJSONString()); } }
消費(fèi)者
我們講到消費(fèi)者使用分布式部署,一個(gè)微服務(wù)有多個(gè)實(shí)例,我們只需要按照服務(wù)監(jiān)聽的topic分區(qū)數(shù)創(chuàng)建對(duì)應(yīng)數(shù)目的服務(wù)實(shí)例即可,這樣kafka就會(huì)自動(dòng)分配對(duì)應(yīng)分區(qū)的數(shù)據(jù)到每個(gè)實(shí)例。
我們采取批量消費(fèi),進(jìn)一步提高服務(wù)吞吐性能,消費(fèi)及配置代碼如下,配置文件參考springbootkafka配置即可,主要設(shè)計(jì)kafka服務(wù)配置,消費(fèi)及生產(chǎn)配置,比較核心的是
@Component public class DataListener { @Autowired private MongoTemplate mongoTemplate; /** * 站點(diǎn)報(bào)文監(jiān)聽消費(fèi) * * @param records */ @KafkaListener(topics = "partition-topic", containerFactory = "batchConsumerFactory") public void iotSiteHistoryMessageConsumer(List<ConsumerRecord<String, String>> records) { } /** * 消費(fèi)者配置 */ @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = Maps.newHashMap(); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } /** * 批量消費(fèi)配置 */ @Bean public KafkaListenerContainerFactory batchConsumerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); factory.setBatchListener(true); return factory; } }
到此這篇關(guān)于Java Kafka分區(qū)發(fā)送及消費(fèi)實(shí)戰(zhàn)的文章就介紹到這了,更多相關(guān)Kafka分區(qū)發(fā)送及消費(fèi)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java項(xiàng)目如何防止SQL注入(多種方案匯總)
SQL注入即是指web應(yīng)用程序?qū)τ脩糨斎霐?shù)據(jù)的合法性沒有判斷或過濾不嚴(yán),攻擊者可以在web應(yīng)用程序中事先定義好的查詢語句的結(jié)尾上添加額外的SQL語句,這篇文章主要介紹了?Java項(xiàng)目防止SQL注入的四種方案,需要的朋友可以參考下2023-12-12Java中對(duì)list元素進(jìn)行排序的方法詳解
這篇文章主要介紹了Java中對(duì)list元素進(jìn)行排序的方法詳解,是Java入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下2015-09-09Java?Unsafe創(chuàng)建對(duì)象的方法實(shí)現(xiàn)
Java中使用Unsafe實(shí)例化對(duì)象是一項(xiàng)十分有趣而且強(qiáng)大的功能,本文主要介紹了Java?Unsafe創(chuàng)建對(duì)象的方法實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07java網(wǎng)絡(luò)編程中向指定URL發(fā)送GET POST請(qǐng)求示例
這篇文章主要介紹了java向指定URL發(fā)送GET POST請(qǐng)求示例,學(xué)習(xí)JAVA網(wǎng)絡(luò)編程一定會(huì)用到的,大家參考使用吧2013-11-11Java Spring5學(xué)習(xí)之JdbcTemplate詳解
這篇文章主要介紹了Java Spring5學(xué)習(xí)之JdbcTemplate詳解,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-05-05JAVA實(shí)現(xiàn)SOCKET多客戶端通信的案例
這篇文章主要介紹了JAVA實(shí)現(xiàn)SOCKET多客戶端通信的案例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-12-12