欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java Kafka分區(qū)發(fā)送及消費實戰(zhàn)

 更新時間:2022年07月14日 10:08:19   作者:熱黃油啤酒  
本文主要介紹了Kafka分區(qū)發(fā)送及消費實戰(zhàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧

前言

Kafka是現在非常熱門的分布式消息隊列,常用于微服務間異步通信,業(yè)務解耦等場景。kafka的性能非常強大,但是單個微服務吞吐性能是有上限的,我們就會用到分布式微服務,多消費者多生產者進行數據處理,保證性能同時也能根據業(yè)務量進行橫向拓展,對于同一個微服務的多個實例,輸入輸出的topic是同一個,這時候我們就可以利用Kafka分區(qū)消費來解決這個問題。

業(yè)務場景

我們開發(fā)的是一個物聯網系統,大量設備接入到平臺實時發(fā)送數據,有秒級數據和分鐘級別數據等等,處理流程包含接入、處理、存儲,這三個模塊間就是使用kafka進行數據流轉,數據處理模塊中包含多個微服務,單條數據會經歷多次處理,部分業(yè)務耗時較長,導致在高頻率接收到數據時候單體服務無法達到吞吐平衡,于是對于這些服務進行了分布式部署,多個實例進行消費處理。

業(yè)務實現

不指定分區(qū)

我們在給kafka發(fā)送消息時候,如果不指定分區(qū),是不需要手動創(chuàng)建topic的,發(fā)送時沒有topic,kafka會自動創(chuàng)建一個分區(qū)為1的topic,如下:

@Service
public class ProductService {

    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    public void send(String msg, String topic) {
        kafkaTemplate.send(topic, msg);
    }
}

指定分區(qū)

topic分區(qū)初始化及配置

指定分區(qū)發(fā)送時候,如果未配置topic分區(qū)數,指定>0的分區(qū),會提示分區(qū)不存在,這時候我們就需要提前創(chuàng)建好topic及分區(qū)

手動創(chuàng)建,服務啟動前,使用kafka tool手動創(chuàng)建topic 不推薦 x

自動創(chuàng)建,服務啟動時,使用KafkaClient創(chuàng)建 推薦 √

/**
 * 初始化多分區(qū)的topic 基于springboot2
 */
@Component
public void TopicInitRunner implements ApplicationRunner {

    @Autowired
    private AdminClient adminClient;
    
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 通過配置文件讀取自定義配置的topic名及分區(qū)數 省略...
        // Key topic V 分區(qū)數
        Map<String, Integer> topicPartitionMap = new HashMap<>();
        for (Map.Entry<String, Integer> e : topicPartitionMap.entrySet()) {
            createTopic(e.getKey(), e.getValue());
        }
        
    }

    public void createTopic(String topic, int partition) {
        NewTopic newTopic = new NewTopic(topic, partition);
        adminClient.createTopics(Lists.newArrayList(newTopic));
    }
}

/**
 * 配置類參考 基于springboot2
 * 如果只進行普通的單消息發(fā)送 無需添加此配置到項目中
 */
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Bean
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin().getConfig());
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        return new KafkaAdmin(props);
    }
}

生產者分區(qū)發(fā)送方案

上面講到如何初始化分區(qū)topic,這時候我們的kafka環(huán)境已經準備好了,我們先使用TopicInitRunner為我們創(chuàng)建一個名稱為 partition-topic 分區(qū)數為三,現在講一講如何均勻的講消息發(fā)送的每個分區(qū)上,如何保證多消費者實例是負載均衡的,具體方案如下:

  • 1.因為每條消息都是設備上傳的,都會有設備id,先給每個設備生成一個自增號,這樣1000個設備,每個設備就會有0到999的自增號,放到緩存中,每次根據消息中的設備id獲取到該設備的自增號
  • 2.使用自增號對分區(qū)數進行取模操作,代碼實現如下:
public class ProductService {
    /**
     * data為需要發(fā)送的數據
     */
    public void partitionSend(String topic, int partition, JSONObject data) {
         // 獲取設備id
        String deviceId = data.getString("deviceId");
        // 獲取自增數 如果是新設備會創(chuàng)建一個并放入緩存中
        int inc = getDeviceInc(deviceId);
        // 如果分區(qū)數為3 設備自增id為1 取模結果為1 就是發(fā)送到1分區(qū) 這樣1000個設備就可以保證每個分區(qū)發(fā)送數據量是1000 / 3
        int targetPartition = Math.floorMod(inc, partition);
        // 分區(qū)發(fā)送時候 需要指定一個唯一k 可以使用uuid或者百度提供的雪花算法獲取id 字符串即可
        kafkaTemplate.send(topic, partition, getUuid(), data.toJSONString());
    }
}

消費者

我們講到消費者使用分布式部署,一個微服務有多個實例,我們只需要按照服務監(jiān)聽的topic分區(qū)數創(chuàng)建對應數目的服務實例即可,這樣kafka就會自動分配對應分區(qū)的數據到每個實例。

我們采取批量消費,進一步提高服務吞吐性能,消費及配置代碼如下,配置文件參考springbootkafka配置即可,主要設計kafka服務配置,消費及生產配置,比較核心的是

@Component
public class DataListener {

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * 站點報文監(jiān)聽消費
     *
     * @param records
     */
    @KafkaListener(topics = "partition-topic", containerFactory = "batchConsumerFactory")
    public void iotSiteHistoryMessageConsumer(List<ConsumerRecord<String, String>> records) {        
    }
    
    /**
     * 消費者配置
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
    /**
     * 批量消費配置
     */
    @Bean
    public KafkaListenerContainerFactory batchConsumerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setBatchListener(true);
        return factory;
    }
}

到此這篇關于Java Kafka分區(qū)發(fā)送及消費實戰(zhàn)的文章就介紹到這了,更多相關Kafka分區(qū)發(fā)送及消費內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Java項目如何防止SQL注入(多種方案匯總)

    Java項目如何防止SQL注入(多種方案匯總)

    SQL注入即是指web應用程序對用戶輸入數據的合法性沒有判斷或過濾不嚴,攻擊者可以在web應用程序中事先定義好的查詢語句的結尾上添加額外的SQL語句,這篇文章主要介紹了?Java項目防止SQL注入的四種方案,需要的朋友可以參考下
    2023-12-12
  • 深入分析Java內存區(qū)域的使用詳解

    深入分析Java內存區(qū)域的使用詳解

    本篇文章對Java內存區(qū)域的使用進行了詳細的介紹。需要的朋友參考下
    2013-05-05
  • Java中對list元素進行排序的方法詳解

    Java中對list元素進行排序的方法詳解

    這篇文章主要介紹了Java中對list元素進行排序的方法詳解,是Java入門學習中的基礎知識,需要的朋友可以參考下
    2015-09-09
  • Java?Unsafe創(chuàng)建對象的方法實現

    Java?Unsafe創(chuàng)建對象的方法實現

    Java中使用Unsafe實例化對象是一項十分有趣而且強大的功能,本文主要介紹了Java?Unsafe創(chuàng)建對象的方法實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-07-07
  • java網絡編程中向指定URL發(fā)送GET POST請求示例

    java網絡編程中向指定URL發(fā)送GET POST請求示例

    這篇文章主要介紹了java向指定URL發(fā)送GET POST請求示例,學習JAVA網絡編程一定會用到的,大家參考使用吧
    2013-11-11
  • Java Spring5學習之JdbcTemplate詳解

    Java Spring5學習之JdbcTemplate詳解

    這篇文章主要介紹了Java Spring5學習之JdbcTemplate詳解,文中有非常詳細的代碼示例,對正在學習java的小伙伴們有非常好的幫助,需要的朋友可以參考下
    2021-05-05
  • 一文帶你搞懂Java中方法重寫與方法重載的區(qū)別

    一文帶你搞懂Java中方法重寫與方法重載的區(qū)別

    這篇文章主要介紹了Java中方法重寫與方法重載有哪些區(qū)別,文中有詳細的代碼示例,對大家的學習或工作有一定的幫助,需要的朋友可以參考下
    2023-05-05
  • JAVA實現SOCKET多客戶端通信的案例

    JAVA實現SOCKET多客戶端通信的案例

    這篇文章主要介紹了JAVA實現SOCKET多客戶端通信的案例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • JAVA如何調用Shell腳本

    JAVA如何調用Shell腳本

    本篇文章主要介紹了JAVA如何調用Shell腳本,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-08-08
  • Java中方法的使用、重載與遞歸的詳細介紹

    Java中方法的使用、重載與遞歸的詳細介紹

    前面我們提到了方法需要參數類型,但是如果我們需要用一個函數同時兼容多種參數的情況應該怎么辦呢? 這里就可以使用到方法重載,對Java中方法的使用、重載與遞歸相關知識感興趣的朋友一起看看吧
    2021-11-11

最新評論