欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kafka在客戶端實現(xiàn)消息的發(fā)送與讀取

 更新時間:2023年12月11日 10:10:42   作者:warybee  
這篇文章主要介紹了Kafka在客戶端實現(xiàn)消息的發(fā)送與讀取,KafkaProducer是用于發(fā)送消息的類,ProducerRecord類用于封裝Kafka的消息,KafkaProducer的實例化需要指定的參數(shù),Producer的參數(shù)定義在 org.apache.kafka.clients.producer.ProducerConfig類中,需要的朋友可以參考下

1.創(chuàng)建Maven工程

引入kafka相關依賴,POM文件如下:

 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.7</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

2.生產(chǎn)者API(Producer API)

2.1 生成者處理流程

在這里插入圖片描述

  1. Producer創(chuàng)建時,會創(chuàng)建一個Sender線程并設置為守護線程。
  2. 生產(chǎn)消息時,內部其實是異步流程;生產(chǎn)的消息先經(jīng)過攔截器->序列化器->**分區(qū)器,**然后將消 息緩存在緩沖區(qū)(該緩沖區(qū)也是在Producer創(chuàng)建時創(chuàng)建)。
  3. 批次發(fā)送的條件為:緩沖區(qū)數(shù)據(jù)大小達到batch.size或者linger.ms達到上限,哪個先達到就算 哪個。
  4. 批次發(fā)送后,發(fā)往指定分區(qū),然后落盤到broker;如果生產(chǎn)者配置了retrires參數(shù)大于0并且失 敗原因允許重試,那么客戶端內部會對該消息進行重試。
  5. 落盤到broker成功,返回生產(chǎn)元數(shù)據(jù)給生產(chǎn)者。
  6. 元數(shù)據(jù)返回有兩種方式:一種是通過阻塞直接返回,另一種是通過回調返回。

2.2 常用參數(shù)介紹

生產(chǎn)者主要的對象有: KafkaProducer , ProducerRecord 。

  • KafkaProducer 是用于發(fā)送消息的類;
  • ProducerRecord 類用于封裝Kafka的消息。

KafkaProducer 的實例化需要指定的參數(shù),Producer的參數(shù)定義在 org.apache.kafka.clients.producer.ProducerConfig類中。

常用參數(shù)說明如下:

  • bootstrap.servers: 配置生產(chǎn)者如何與broker建立連接。該參數(shù)設置的是初始化參數(shù)。如果生 產(chǎn)者需要連接的是Kafka集群,則這里配置集群中幾個broker的地址,而不 是全部,當生產(chǎn)者連接上此處指定的broker之后,在通過該連接發(fā)現(xiàn)集群 中的其他節(jié)點。
  • key.serializer: 要發(fā)送信息的key數(shù)據(jù)的序列化類。kafka-clients提供了常用類型的序列化類,序列化類都實現(xiàn)了org.apache.kafka.common.serialization.Serializer 接口。

在這里插入圖片描述

  • **value.serializer:**要發(fā)送消息的alue數(shù)據(jù)的序列化類。kafka-clients提供了常用類型的序列化類,序列化類都實現(xiàn)了org.apache.kafka.common.serialization.Serializer 接口。

acks: 默認值:all。

acks=0: 生產(chǎn)者不等待broker對消息的確認,只要將消息放到緩沖區(qū),就認為消息 已經(jīng)發(fā)送完成。 該情形不能保證broker是否真的收到了消息,retries配置也不會生效。發(fā) 送的消息的返回的消息偏移量永遠是-1

acks=1 表示消息只需要寫到主分區(qū)即可,然后就響應客戶端,而不等待副本分區(qū)的 確認。 在該情形下,如果主分區(qū)收到消息確認之后就宕機了,而副本分區(qū)還沒來得 及同步該消息,則該消息丟失。

acks=all 首領分區(qū)會等待所有的ISR副本分區(qū)確認記錄。 該處理保證了只要有一個ISR副本分區(qū)存活,消息就不會丟失。 這是Kafka最強的可靠性保證,等效于 acks=-1

  • retries: 設置該屬性為一個大于1的值,將在消息發(fā)送失敗的時候重新發(fā)送消 息。該重試與客戶端收到異常重新發(fā)送并無二至。允許重試但是不設 置參數(shù)max.in.flight.requests.per.connection為1,存在消息亂序 的可能,因為如果兩個批次發(fā)送到同一個分區(qū),第一個失敗了重試, 第二個成功了,則第一個消息批在第二個消息批后。int類型的值,默 認:0,可選值:[0,…,2147483647
  • compression.type: 生產(chǎn)者生成數(shù)據(jù)的壓縮格式。默認是none(沒有壓縮)。允許的 值:none,gzip,snappy和lz4。壓縮是對整個消息批次來講 的。消息批的效率也影響壓縮的比例。消息批越大,壓縮效率越好。默認是none。

2.3 生產(chǎn)者代碼

package com.warybee;
import org.apache.kafka.clients.producer.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/**
 * @author joy
 */
public class KafkaProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Map<String, Object> configs = new HashMap<>();
        // 設置連接Kafka的初始連接用到的服務器地址
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        // 設置key的序列化類
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        // 設置value的序列化類
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configs.put(ProducerConfig.ACKS_CONFIG,"all");
        KafkaProducer<Integer,String> kafkaProducer=new KafkaProducer<Integer, String>(configs);
        //發(fā)送100條消息
        for (int i = 0; i < 100; i++) {
            ProducerRecord<Integer,String> producerRecord=new ProducerRecord<>
                    (       "test_topic_1",
                            0,
                            i,
                            "test topic msg "+i);
            //消息的異步確認
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if (exception==null){
                        System.out.println("消息的主題:"+recordMetadata.topic());
                        System.out.println("消息的分區(qū):"+recordMetadata.partition());
                        System.out.println("消息的偏移量:"+recordMetadata.offset());
                    }else {
                        System.out.println("發(fā)送消息異常");
                    }
                }
            });
        }
        // 關閉生產(chǎn)者
        kafkaProducer.close();
    }
}

3.消費者API(Consumer API)

3.1 常用參數(shù)介紹

KafkaConsumer 的實例化需要指定的參數(shù),Consumer的參數(shù)定義在 org.apache.kafka.clients.consumer.ConsumerConfig類中。

常用參數(shù)說明如下:

  • bootstrap.servers: 配置生產(chǎn)者如何與broker建立連接。該參數(shù)設置的是初始化參數(shù)。如果生 產(chǎn)者需要連接的是Kafka集群,則這里配置集群中幾個broker的地址,而不 是全部,當生產(chǎn)者連接上此處指定的broker之后,在通過該連接發(fā)現(xiàn)集群 中的其他節(jié)點。
  • key.deserializer: key數(shù)據(jù)的反序列化類。kafka-clients提供了常用類型的反序列化類,反序列化類都實現(xiàn)了org.apache.kafka.common.serialization.Deserializer 接口。
  • value.deserializer: Value數(shù)據(jù)的反序列化類。kafka-clients提供了常用類型的反序列化類,反序列化類都實現(xiàn)了org.apache.kafka.common.serialization.Deserializer 接口。
  • group.id: 消費組ID,用于指定當前消費者屬于哪個消費組。
  • auto.offset.reset: 當kafka中沒有偏移量或者當前偏移量在服務器中不存在時,kafka該如何處理?參數(shù)值如下:
  • earliest: automatically reset the offset to the earliest offset(自動重置偏移量到最早的偏移量)
  • latest: automatically reset the offset to the latest offset(自動重置偏移量為最新的)
  • none: throw exception to the consumer if no previous offset is found for the consumer’s group(如果消費組上一個偏移量不存在,向consumer 拋出異常)
  • anything: throw exception to the consumer.(向consumer 拋出異常)
  • client.id : 消費消息的時候向服務器發(fā)送的id字符串。在ip/port基礎上 提供應用的邏輯名稱,記錄在服務端的請求日志中,用于追蹤請求的源。
  • enable.auto.commit : 如果設置為true,消費者會自動周期性地向服務器提交偏移量。

3.2 消費者與消費組概念介紹

每一個Consumer屬于一個特定的Consumer Group,消費者可以通過指定group.id,來確定其所在消費組。

group_id一般設置為應用的邏輯名稱。比如多個訂單處理程序組成一個消費組,可以設置group_id 為"order_process"。

消費組均衡地給消費者分配分區(qū),每個分區(qū)只由消費組中一個消費者消費

一個擁有四個分區(qū)的主題,包含一個消費者的消費組。此時,消費組中的消費者消費主題中的所有分區(qū)。

在這里插入圖片描述

如果在消費組中添加一個消費者2,則每個消費者分別從兩個分區(qū)接收消息。

在這里插入圖片描述

如果消費組有四個消費者,則每個消費者可以分配到一個分區(qū)

在這里插入圖片描述

如果向消費組中添加更多的消費者,超過主題分區(qū)數(shù)量,則有一部分消費者就會閑置,不會接收任 何消息

在這里插入圖片描述

向消費組添加消費者是橫向擴展消費能力的主要方式。

3.3 消費者代碼

package com.warybee;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
/**
 * @author joy
 */
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // 設置連接Kafka的初始連接用到的服務器地址
        // 如果是集群,則可以通過此初始連接發(fā)現(xiàn)集群中的其他broker
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        //KEY反序列化類
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        //value反序列化類
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //創(chuàng)建消費者對象
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);
        List<String> topics = new ArrayList<>();
        topics.add("test_topic_1");
        //消費者訂閱主題
        consumer.subscribe(topics);
        while (true){
            //批量拉取主題消息,每3秒拉取一次
            ConsumerRecords<Integer, String> records = consumer.poll(3000);
            //變量消息
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("主題:"+record.topic() + "\t"
                       +"分區(qū):" + record.partition() + "\t"
                        +"偏移量:" +  + record.offset() + "\t"
                        +"Key:"+ record.key() + "\t"
                        +"Value:"+ record.value());
            }
        }
    }
}

依次運行生產(chǎn)者和消費者??刂婆_可以看到消費者接收到的消息

4. 客戶端鏈接異常信息處理

如果運行代碼過程中,java客戶端連接出現(xiàn)Connection refused: no further information錯誤:

java.net.ConnectException: Connection refused: no further information
   ............................省略其他錯誤信息...................

修改 ${KAFKA_HOME}/config/server.properties

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners = PLAINTEXT://localhost:9092

將localhost修改為kafka所在服務器的IP地址即可。如果java程序和kafka在同一個服務器上,則不需要修改。

到此這篇關于Kafka在客戶端實現(xiàn)消息的發(fā)送與讀取的文章就介紹到這了,更多相關Kafka消息發(fā)送與讀取內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • java線程池ThreadPoolExecutor類使用小結

    java線程池ThreadPoolExecutor類使用小結

    這篇文章主要介紹了java線程池ThreadPoolExecutor類使用,本文主要對ThreadPoolExecutor的使用方法進行一個詳細的概述,示例代碼介紹了ThreadPoolExecutor的構造函數(shù)的相關知識,感興趣的朋友一起看看吧
    2022-03-03
  • java中transient關鍵字分析

    java中transient關鍵字分析

    這篇文章主要介紹了java中transient關鍵字分析,transient與類對象的序列化息息相關,序列化保存的是 類對象 狀態(tài),被transient關鍵字修飾的成員變量,在類的實例化對象的序列化處理過程中會被忽略,變量不會貫穿對象的序列化和反序列化,需要的朋友可以參考下
    2023-09-09
  • 深入理解注解與自定義注解的一些概念

    深入理解注解與自定義注解的一些概念

    今天給大家?guī)淼奈恼率亲⒔獾南嚓P知識,本文圍繞著注解與自定義注解的一些概念展開,文中詳細介紹了這些知識,需要的朋友可以參考下
    2021-06-06
  • 使用SpringBoot簡單實現(xiàn)無感知的刷新 Token功能

    使用SpringBoot簡單實現(xiàn)無感知的刷新 Token功能

    實現(xiàn)無感知的刷新 Token 是一種提升用戶體驗的常用技術,可以在用戶使用應用時自動更新 Token,無需用戶手動干預,這種技術在需要長時間保持用戶登錄狀態(tài)的應用中非常有用,以下是使用Spring Boot實現(xiàn)無感知刷新Token的一個場景案例和相應的示例代碼
    2024-09-09
  • Java實現(xiàn)一個簡易版的多級菜單功能

    Java實現(xiàn)一個簡易版的多級菜單功能

    這篇文章主要給大家介紹了關于Java如何實現(xiàn)一個簡易版的多級菜單功能的相關資料,文中通過實例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2022-01-01
  • Lambda表達式的使用及注意事項

    Lambda表達式的使用及注意事項

    這篇文章主要介紹了Lambda表達式的使用及注意事項,主要圍繞?Lambda表達式的省略模式?Lambda表達式和匿名內部類的區(qū)別的相關內容展開詳情,感興趣的小伙伴可以參考一下
    2022-06-06
  • JAVA實現(xiàn)的簡單萬年歷代碼

    JAVA實現(xiàn)的簡單萬年歷代碼

    這篇文章主要介紹了JAVA實現(xiàn)的簡單萬年歷代碼,涉及Java日期操作的相關技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-10-10
  • IDEA設置Tab選項卡快速的操作

    IDEA設置Tab選項卡快速的操作

    這篇文章主要介紹了IDEA設置Tab選項卡快速的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-02-02
  • springboot通過spel結合aop實現(xiàn)動態(tài)傳參的案例

    springboot通過spel結合aop實現(xiàn)動態(tài)傳參的案例

    SpEl 是Spring框架中的一個利器,Spring通過SpEl能在運行時構建復雜表達式、存取對象屬性、對象方法調用等,今天通過本文給大家介紹springboot?spel結合aop實現(xiàn)動態(tài)傳參,需要的朋友可以參考下
    2022-07-07
  • Java讀寫Windows共享文件夾的方法實例

    Java讀寫Windows共享文件夾的方法實例

    本篇文章主要介紹了Java讀寫Windows共享文件夾的方法實例,具有一定的參考價值,有興趣的同學可以了解一下。
    2016-11-11

最新評論