欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java分布式流處理組件Producer入門詳解

 更新時間:2023年03月07日 10:55:35   作者:謝先生說技術  
這篇文章主要為大家介紹了java分布式流處理組件Producer入門詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

前言

前面兩章我們花費了很長的時間將Kafka的整體架構,包括其中涉及到的角色、每個角色所對對應的用途進行了整體的一個串聯(lián)。然后我們也通過Kafka所提供的腳本進行了相對應的操作,并且對核心參數進行了分析。

相信大家對于Kafka的處理和消費流程已經有了一個比較籠統(tǒng)的概念。光是如此還是不夠的,那么接下來我們就開始對其中的每一個角色做一個詳細的分析。

先從生產者開始,我們需要對其中有如下了解:

  • 了解外部數據是如何通過生產者,經過層層編碼,然后進入到了集群內部進行存儲。
  • 同步和異步數據是如何操作,Broker如何處理應答。
  • 消息發(fā)送失敗后的重試機制
  • ...

等等的一切,慢慢往下看吧~~~

基于Java的API

首先, 在了解生產者發(fā)送消息的原理之前,我們應該先學會如何去發(fā)送消息。

Kafka為我們提供了很多項目可以操作的API客戶端,包括:

  • C/C++
  • GO
  • Python
  • ...

更多需要對接Kafka的項目可以點擊這里進行查看

我本人屬于Java開發(fā),所以我這里就通過Java項目來做一個QuickStart項目

通過官網查看API菜單,官方文檔上也是Java的版本。我們根據提示一步步操作即可~

先新建maven項目,并且引入對應的****kafka-clients依賴

建議:Kafka-clients依賴版本,最好和安裝的kafka版本一致

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.1</version>
</dependency>

同步發(fā)送

Kafka生產者主要靠KafkaProducer來進行操作。點擊到對應的文檔頁面,我們可以看到關于KafkaProducer<K,V> 的詳細信息。

一個好的組件是非常貼心的, 甚至我們都不用去網上搜任何相關的資料,只需要通過查看對應的注釋就可以知道這個東西該怎么用。

Properties config = new Properties();
// --bootstrap-server
config.setProperty(
  ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
  "master:9092,node01:9092,node02:9092"
);
// key 序列化器
config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// value 序列化器
config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
try(Producer<String, String> producer = new KafkaProducer<>(config)) {
    ProducerRecord<String, String> record = new ProducerRecord<>(
            "newTopic001",
            "key01",
            "data from " + KafkaQuickProducer.class.getName()
    ); 
    RecordMetadata recordMetadata = producer.send(record).get();
    System.out.println(
            MessageFormat.format("{0}\t{1}\t{2}\t{3}", 
                    recordMetadata.topic(), 
                    recordMetadata.partition(),
                    recordMetadata.offset(), 
                    recordMetadata.timestamp()
            )
    );
} catch (Exception e) {
    e.printStackTrace();
}

以上代碼就是同步發(fā)送的過程,這已經是在開發(fā)過程中需要配置的最小單元,而其他關于生產者的配置,我們可以通過ProducerConfig來進行查看

** 與命令行上的參數,基本上是一模一樣的**

而關于序列化器的問題,我們在下面原理的部分說明

異步發(fā)送

我們在調用同步send的時候,發(fā)現(xiàn)有兩個參數的方法, 而這個方法實現(xiàn)的就是****異步發(fā)送

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

異步發(fā)送會將發(fā)送結果以事件驅動的形式傳遞,那么這里,我們就需要注意一點:

  • 程序調用完成之后,不能讓他立即執(zhí)行,否則我們無法查看到具體的發(fā)送結果

接下來我們看具體的程序實現(xiàn)。理論上:我們只需要改最后發(fā)送的部分

Properties config = new Properties();
// --bootstrap-server
config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,node01:9092,node02:9092");
// key 序列化器
config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// value 序列化器
config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
try(Producer<String, String> producer = new KafkaProducer<>(config)) {
    ProducerRecord<String, String> record = new ProducerRecord<>(
            "newTopic001",
            "key01",
            "data from " + KafkaQuickProducer.class.getName()
    );
    async(producer, record);
} catch (Exception e) {
    e.printStackTrace();
}
// 異步發(fā)送
private static void async(Producer<String, String> producer, ProducerRecord<String, String> record) {
    producer.send(record, (recordMetadata, exception) -> {
        if (null != exception) {
            exception.printStackTrace();
            return;
        }
        System.out.println(
                MessageFormat.format("{0}\t{1}\t{2}\t{3}",
                        recordMetadata.topic(),
                        recordMetadata.partition(),
                        recordMetadata.offset(),
                        recordMetadata.timestamp()
                )
        );
    });
    try {
        // 將程序進行阻塞,防止由于消息發(fā)送成功之后進程停止而無法接收到事件反饋
        System.in.read();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

這屬于整個生產者發(fā)送消息方式的最小單元,本文屬于Producer入門階段。

在ProducerConfig中還包含了非常多的配置項,更多的配置信息我們會在優(yōu)化章節(jié)中說明。

原理

在第一部分,我們已經了解到,關于生產者最基本的使用方式,到這里,其實我想跟大家聊一聊:

  • 生產者在發(fā)送消息的時候中間到底經歷了什么?

大家應該已經看到上面的那張原理圖,我們可以從中找出答案!

主線程

**這里我們分為兩個線程塊來說明, 第一部分是Main主線程, 也就是生產者在調用****send()**方法時所在的線程

在這里,我們可以看到:

  • 外部數據首先被封裝為ProducerRecord**,然后調用**send()**方法。
  • 在send()過程中,經過攔截器、序列化器、分區(qū)器等處理之后進入到RecordAccumulator中。

接下來我們仔細聊一聊攔截器、序列化器、分區(qū)器的作用

攔截器

攔截器很類似于我們在SpringMVC中Interceptor的功能,而且在Producer中我們是可以自定義攔截器的。

我們可以在發(fā)送之前對數據進行攔截處理,比如說:統(tǒng)計生產者發(fā)送數據的總量等等。

當然目前來講,我們如果不開發(fā)Kafka監(jiān)控平臺的話,這里攔截器的用處并不大。我們忽略不計即可

后續(xù)如果有機會的話,我們可以專門寫篇文章,用來介紹如何開發(fā)一個攔截器

序列化器

而序列化器,主要對兩個部分的數據進行處理:

  • Key
  • Value
byte[] serializedKey 
  = serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
byte[] serializedValue
  = valueSerializer.serialize(record.topic(), record.headers(), record.value());

從本質上來講,外部數據屬于屬于對象,而對象不能直接通過網絡進行傳輸。 所以我們就需要一個序列化器,將它轉換成字節(jié)數組,進而進行傳輸

Kafka本身為我們提供了很多可用的序列化器,不過我們能用到最多的還是StringSerializer。

在生產端將消息進行序列話,那么在消費端必然會進行反序列化操作

分區(qū)器

我們知道Kafka是以Topic為消息發(fā)送的主體,不過由于Topic是一個虛擬的概念, 所以我們沒有辦法在實際中查看到關于Topic的相關信息。 但是前面我們也說過, 當前Topic下的消息數據都是通過Partition進行存儲的。

發(fā)送出去的消息需要存儲在哪個分區(qū)中就是通過分區(qū)器來進行指定的,在我們沒有指定分區(qū)策略的情況下,生產者會通過默認的分區(qū)策略指定當前消息應該存儲在哪個分區(qū)下

分區(qū)的內容還是比較多的,我們會在下一節(jié)做詳細的說明

RecordAccumulator

此時,在主線程的區(qū)域中,當消息進入到默認大小為32m的記錄緩沖區(qū)時, 本區(qū)的工作就到此結束。

緩沖區(qū)中有多個雙端隊列,分別對應Topic不同的分區(qū)。每一個分區(qū)就會創(chuàng)建一個雙端隊列。

此時的消息將會被按照批次的方式存放在隊列中, 默認一批為16k大小。當緩沖區(qū)達到指定條件之后,****sender線程將會被喚醒,Sender程序將會沖隊列中不斷拉出消息進行下一步的發(fā)送

Sender線程

影響Sender線程喚醒的條件

想要喚醒Sender線程有兩個因素,但不是說這兩個條件都必須滿足,他們是或的關系。

batch.size是一個條件,這也是后期針對生產者優(yōu)化的主要參數之一。

當發(fā)送消息之后,生產者會將消息進行整合。將其按照一批一批的方式發(fā)送給Broker,從而減少網絡間的傳輸請求次數。默認情況下為16k。

而如果一批數據的大小累計達到了設置的batch.size之后,sender才會做發(fā)送數據的操作

這是第一個限制

下面再來介紹一個非常強勢的參數:liner.ms。生產者優(yōu)化的主要參數之二。

這么說吧,如果你設置的liner.ms=0,表示不延遲直接發(fā)送。那么batch.size就不會生效了

而liner.ms=0屬于默認配置

如果數據一直沒有達到設置的batch.size大小,數據也不能不發(fā)對吧。所以Kafka也就為我們提供了這樣的參數:

  • 當sender等待liner.ms設置的時間之后【單位ms】,不管數據如何都會將消息進行發(fā)送
  • 如未設置當前參數,表示沒有延遲,直接發(fā)送

下面舉個小例子

config.setProperty(ProducerConfig.LINGER_MS_CONFIG, "5000");

開始發(fā)送

RecordAccumulator內存儲的數據拉取出來之后,開始將其創(chuàng)建為一個個的Request請求。這里需要注意的是:

  • NetworkClient并非一股腦的將全部可發(fā)送數據進行傳輸請求

正相反,為了能夠保證不同分區(qū)所對應DQueue的數據進入到對應的Broker所在的分區(qū)內,Kafka將按照<BrokerId, Request>的形式對請求進行傳輸。如果傳輸到達Broker之后沒有acks應答,那么當前節(jié)點下最多能夠保存5個未響應的請求。

ACKS

這里簡單聊一下它的應答方式。在ProducerConfig.ACKS_DOC下我們也可以看到相關的說明:

  • acks=0: 生產者不會等待Broker的應答,直接表示消息已經發(fā)送成功。而消息有沒有真正達到Broker,不關心。

當然了,這種方式在性能上來講是最好的,適合一些數據不重要的場景

  • acks=1: 生產者將消息發(fā)送到Broker之后,由Leader在本地將消息進行存儲之后,返回發(fā)送成功的應答。

如果Follower還沒有同步到消息,Leader就已經掛了。那么此時就會出現(xiàn)消息丟失的情況

  • acks=all:生產者將消息發(fā)送到Broker之后,由Leader在本地將消息進行存儲,并且Follower同步完消息之后才會返回發(fā)送成功的應答。

這種方式是最能保證數據安全的情況,但是性能也是最低的~

最后:

  • 當Broker返回成功應答之后,RecordAccumulator中的數據將會被清理
  • 如果失敗,可以嘗試重試等操作

總結

而到了這里,本次關于Producer理論篇就結束了,針對API部分大家需要多練,可以先看看關于ProducerConfig內的配置參數說明,可以嘗試先練習練習。

很貼心的框架

后面我們還會介紹一些核心參數

Kafka的分區(qū)處理也是一個比較核心的內容,接下來我們會著重介紹

以上就是java分布式流處理組件Producer入門詳解的詳細內容,更多關于java分布式Producer的資料請關注腳本之家其它相關文章!

相關文章

  • IDEA中的打包Build Artifacts圖文詳解

    IDEA中的打包Build Artifacts圖文詳解

    當項目開發(fā)完畢,需要對外發(fā)布時,我們就會用到IDEABuild Artifacts功能,那么如果在idea中打包呢,這篇文章主要介紹了IDEA中的打包Build Artifacts詳解,需要的朋友可以參考下
    2024-03-03
  • 深入聊聊Java內存泄露問題

    深入聊聊Java內存泄露問題

    所謂內存泄露就是指一個不再被程序使用的對象或變量一直被占據在內存中,下面這篇文章主要給大家介紹了關于Java內存泄露問題的相關資料,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考下
    2022-04-04
  • Java字節(jié)緩存流的構造方法之文件IO流

    Java字節(jié)緩存流的構造方法之文件IO流

    這篇文章主要介紹了Java字節(jié)緩存流的構造方法之文件IO流,同時也介紹了字符流中的一些相關的內容,并且通過大量的案例供大家理解。最后通過一些經典的案例幫助大家對前面所學的知識做了一個綜合的應用,需要的朋友可以參考一下
    2022-04-04
  • 詳解Spring Boot下Druid連接池的使用配置分析

    詳解Spring Boot下Druid連接池的使用配置分析

    本篇文章主要介紹了詳解Spring Boot下Druid連接池的使用配置分析,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-06-06
  • 聊聊java中引用數據類型有哪些

    聊聊java中引用數據類型有哪些

    這篇文章主要介紹了java中引用數據類型有哪些,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-10-10
  • Java詳細講解文件的讀寫操作方法

    Java詳細講解文件的讀寫操作方法

    文件讀寫主要依靠io流完成,流(Stream)是指一連串的數據(字符或字節(jié)),是以先進先出的方式發(fā)送信息的通道,數據源發(fā)送的數據經過這個通道到達目的地,按流向區(qū)分為輸入流和輸出流
    2022-04-04
  • Spring-AOP @AspectJ進階之如何綁定代理對象

    Spring-AOP @AspectJ進階之如何綁定代理對象

    這篇文章主要介紹了Spring-AOP @AspectJ進階之如何綁定代理對象的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • jenkins+maven+svn自動部署和發(fā)布的詳細圖文教程

    jenkins+maven+svn自動部署和發(fā)布的詳細圖文教程

    Jenkins是一個開源的、可擴展的持續(xù)集成、交付、部署的基于web界面的平臺。這篇文章主要介紹了jenkins+maven+svn自動部署和發(fā)布的詳細圖文教程,需要的朋友可以參考下
    2020-09-09
  • java實現(xiàn)文件重命名

    java實現(xiàn)文件重命名

    這篇文章主要為大家詳細介紹了java實現(xiàn)文件重命名,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2020-03-03
  • @JsonSerialize不起作用的解決方案

    @JsonSerialize不起作用的解決方案

    這篇文章主要介紹了@JsonSerialize不起作用的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-10-10

最新評論