欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java Kafka分區(qū)發(fā)送及消費(fèi)實(shí)戰(zhàn)

 更新時(shí)間:2022年07月14日 10:08:19   作者:熱黃油啤酒  
本文主要介紹了Kafka分區(qū)發(fā)送及消費(fèi)實(shí)戰(zhàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

前言

Kafka是現(xiàn)在非常熱門的分布式消息隊(duì)列,常用于微服務(wù)間異步通信,業(yè)務(wù)解耦等場(chǎng)景。kafka的性能非常強(qiáng)大,但是單個(gè)微服務(wù)吞吐性能是有上限的,我們就會(huì)用到分布式微服務(wù),多消費(fèi)者多生產(chǎn)者進(jìn)行數(shù)據(jù)處理,保證性能同時(shí)也能根據(jù)業(yè)務(wù)量進(jìn)行橫向拓展,對(duì)于同一個(gè)微服務(wù)的多個(gè)實(shí)例,輸入輸出的topic是同一個(gè),這時(shí)候我們就可以利用Kafka分區(qū)消費(fèi)來解決這個(gè)問題。

業(yè)務(wù)場(chǎng)景

我們開發(fā)的是一個(gè)物聯(lián)網(wǎng)系統(tǒng),大量設(shè)備接入到平臺(tái)實(shí)時(shí)發(fā)送數(shù)據(jù),有秒級(jí)數(shù)據(jù)和分鐘級(jí)別數(shù)據(jù)等等,處理流程包含接入、處理、存儲(chǔ),這三個(gè)模塊間就是使用kafka進(jìn)行數(shù)據(jù)流轉(zhuǎn),數(shù)據(jù)處理模塊中包含多個(gè)微服務(wù),單條數(shù)據(jù)會(huì)經(jīng)歷多次處理,部分業(yè)務(wù)耗時(shí)較長,導(dǎo)致在高頻率接收到數(shù)據(jù)時(shí)候單體服務(wù)無法達(dá)到吞吐平衡,于是對(duì)于這些服務(wù)進(jìn)行了分布式部署,多個(gè)實(shí)例進(jìn)行消費(fèi)處理。

業(yè)務(wù)實(shí)現(xiàn)

不指定分區(qū)

我們?cè)诮okafka發(fā)送消息時(shí)候,如果不指定分區(qū),是不需要手動(dòng)創(chuàng)建topic的,發(fā)送時(shí)沒有topic,kafka會(huì)自動(dòng)創(chuàng)建一個(gè)分區(qū)為1的topic,如下:

@Service
public class ProductService {

    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    public void send(String msg, String topic) {
        kafkaTemplate.send(topic, msg);
    }
}

指定分區(qū)

topic分區(qū)初始化及配置

指定分區(qū)發(fā)送時(shí)候,如果未配置topic分區(qū)數(shù),指定>0的分區(qū),會(huì)提示分區(qū)不存在,這時(shí)候我們就需要提前創(chuàng)建好topic及分區(qū)

手動(dòng)創(chuàng)建,服務(wù)啟動(dòng)前,使用kafka tool手動(dòng)創(chuàng)建topic 不推薦 x

自動(dòng)創(chuàng)建,服務(wù)啟動(dòng)時(shí),使用KafkaClient創(chuàng)建 推薦 √

/**
 * 初始化多分區(qū)的topic 基于springboot2
 */
@Component
public void TopicInitRunner implements ApplicationRunner {

    @Autowired
    private AdminClient adminClient;
    
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 通過配置文件讀取自定義配置的topic名及分區(qū)數(shù) 省略...
        // Key topic V 分區(qū)數(shù)
        Map<String, Integer> topicPartitionMap = new HashMap<>();
        for (Map.Entry<String, Integer> e : topicPartitionMap.entrySet()) {
            createTopic(e.getKey(), e.getValue());
        }
        
    }

    public void createTopic(String topic, int partition) {
        NewTopic newTopic = new NewTopic(topic, partition);
        adminClient.createTopics(Lists.newArrayList(newTopic));
    }
}

/**
 * 配置類參考 基于springboot2
 * 如果只進(jìn)行普通的單消息發(fā)送 無需添加此配置到項(xiàng)目中
 */
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Bean
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin().getConfig());
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        return new KafkaAdmin(props);
    }
}

生產(chǎn)者分區(qū)發(fā)送方案

上面講到如何初始化分區(qū)topic,這時(shí)候我們的kafka環(huán)境已經(jīng)準(zhǔn)備好了,我們先使用TopicInitRunner為我們創(chuàng)建一個(gè)名稱為 partition-topic 分區(qū)數(shù)為三,現(xiàn)在講一講如何均勻的講消息發(fā)送的每個(gè)分區(qū)上,如何保證多消費(fèi)者實(shí)例是負(fù)載均衡的,具體方案如下:

  • 1.因?yàn)槊織l消息都是設(shè)備上傳的,都會(huì)有設(shè)備id,先給每個(gè)設(shè)備生成一個(gè)自增號(hào),這樣1000個(gè)設(shè)備,每個(gè)設(shè)備就會(huì)有0到999的自增號(hào),放到緩存中,每次根據(jù)消息中的設(shè)備id獲取到該設(shè)備的自增號(hào)
  • 2.使用自增號(hào)對(duì)分區(qū)數(shù)進(jìn)行取模操作,代碼實(shí)現(xiàn)如下:
public class ProductService {
    /**
     * data為需要發(fā)送的數(shù)據(jù)
     */
    public void partitionSend(String topic, int partition, JSONObject data) {
         // 獲取設(shè)備id
        String deviceId = data.getString("deviceId");
        // 獲取自增數(shù) 如果是新設(shè)備會(huì)創(chuàng)建一個(gè)并放入緩存中
        int inc = getDeviceInc(deviceId);
        // 如果分區(qū)數(shù)為3 設(shè)備自增id為1 取模結(jié)果為1 就是發(fā)送到1分區(qū) 這樣1000個(gè)設(shè)備就可以保證每個(gè)分區(qū)發(fā)送數(shù)據(jù)量是1000 / 3
        int targetPartition = Math.floorMod(inc, partition);
        // 分區(qū)發(fā)送時(shí)候 需要指定一個(gè)唯一k 可以使用uuid或者百度提供的雪花算法獲取id 字符串即可
        kafkaTemplate.send(topic, partition, getUuid(), data.toJSONString());
    }
}

消費(fèi)者

我們講到消費(fèi)者使用分布式部署,一個(gè)微服務(wù)有多個(gè)實(shí)例,我們只需要按照服務(wù)監(jiān)聽的topic分區(qū)數(shù)創(chuàng)建對(duì)應(yīng)數(shù)目的服務(wù)實(shí)例即可,這樣kafka就會(huì)自動(dòng)分配對(duì)應(yīng)分區(qū)的數(shù)據(jù)到每個(gè)實(shí)例。

我們采取批量消費(fèi),進(jìn)一步提高服務(wù)吞吐性能,消費(fèi)及配置代碼如下,配置文件參考springbootkafka配置即可,主要設(shè)計(jì)kafka服務(wù)配置,消費(fèi)及生產(chǎn)配置,比較核心的是

@Component
public class DataListener {

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * 站點(diǎn)報(bào)文監(jiān)聽消費(fèi)
     *
     * @param records
     */
    @KafkaListener(topics = "partition-topic", containerFactory = "batchConsumerFactory")
    public void iotSiteHistoryMessageConsumer(List<ConsumerRecord<String, String>> records) {        
    }
    
    /**
     * 消費(fèi)者配置
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
    /**
     * 批量消費(fèi)配置
     */
    @Bean
    public KafkaListenerContainerFactory batchConsumerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setBatchListener(true);
        return factory;
    }
}

到此這篇關(guān)于Java Kafka分區(qū)發(fā)送及消費(fèi)實(shí)戰(zhàn)的文章就介紹到這了,更多相關(guān)Kafka分區(qū)發(fā)送及消費(fèi)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java項(xiàng)目如何防止SQL注入(多種方案匯總)

    Java項(xiàng)目如何防止SQL注入(多種方案匯總)

    SQL注入即是指web應(yīng)用程序?qū)τ脩糨斎霐?shù)據(jù)的合法性沒有判斷或過濾不嚴(yán),攻擊者可以在web應(yīng)用程序中事先定義好的查詢語句的結(jié)尾上添加額外的SQL語句,這篇文章主要介紹了?Java項(xiàng)目防止SQL注入的四種方案,需要的朋友可以參考下
    2023-12-12
  • 深入分析Java內(nèi)存區(qū)域的使用詳解

    深入分析Java內(nèi)存區(qū)域的使用詳解

    本篇文章對(duì)Java內(nèi)存區(qū)域的使用進(jìn)行了詳細(xì)的介紹。需要的朋友參考下
    2013-05-05
  • Java中對(duì)list元素進(jìn)行排序的方法詳解

    Java中對(duì)list元素進(jìn)行排序的方法詳解

    這篇文章主要介紹了Java中對(duì)list元素進(jìn)行排序的方法詳解,是Java入門學(xué)習(xí)中的基礎(chǔ)知識(shí),需要的朋友可以參考下
    2015-09-09
  • Java?Unsafe創(chuàng)建對(duì)象的方法實(shí)現(xiàn)

    Java?Unsafe創(chuàng)建對(duì)象的方法實(shí)現(xiàn)

    Java中使用Unsafe實(shí)例化對(duì)象是一項(xiàng)十分有趣而且強(qiáng)大的功能,本文主要介紹了Java?Unsafe創(chuàng)建對(duì)象的方法實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-07-07
  • java網(wǎng)絡(luò)編程中向指定URL發(fā)送GET POST請(qǐng)求示例

    java網(wǎng)絡(luò)編程中向指定URL發(fā)送GET POST請(qǐng)求示例

    這篇文章主要介紹了java向指定URL發(fā)送GET POST請(qǐng)求示例,學(xué)習(xí)JAVA網(wǎng)絡(luò)編程一定會(huì)用到的,大家參考使用吧
    2013-11-11
  • Java Spring5學(xué)習(xí)之JdbcTemplate詳解

    Java Spring5學(xué)習(xí)之JdbcTemplate詳解

    這篇文章主要介紹了Java Spring5學(xué)習(xí)之JdbcTemplate詳解,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有非常好的幫助,需要的朋友可以參考下
    2021-05-05
  • 一文帶你搞懂Java中方法重寫與方法重載的區(qū)別

    一文帶你搞懂Java中方法重寫與方法重載的區(qū)別

    這篇文章主要介紹了Java中方法重寫與方法重載有哪些區(qū)別,文中有詳細(xì)的代碼示例,對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下
    2023-05-05
  • JAVA實(shí)現(xiàn)SOCKET多客戶端通信的案例

    JAVA實(shí)現(xiàn)SOCKET多客戶端通信的案例

    這篇文章主要介紹了JAVA實(shí)現(xiàn)SOCKET多客戶端通信的案例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • JAVA如何調(diào)用Shell腳本

    JAVA如何調(diào)用Shell腳本

    本篇文章主要介紹了JAVA如何調(diào)用Shell腳本,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-08-08
  • Java中方法的使用、重載與遞歸的詳細(xì)介紹

    Java中方法的使用、重載與遞歸的詳細(xì)介紹

    前面我們提到了方法需要參數(shù)類型,但是如果我們需要用一個(gè)函數(shù)同時(shí)兼容多種參數(shù)的情況應(yīng)該怎么辦呢? 這里就可以使用到方法重載,對(duì)Java中方法的使用、重載與遞歸相關(guān)知識(shí)感興趣的朋友一起看看吧
    2021-11-11

最新評(píng)論