欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kafka?在?java?中的基本使用步驟

 更新時間:2025年09月05日 10:55:38   作者:張小虎在學(xué)習(xí)  
本文給大家介紹Kafka在java中的基本使用,本文分步驟結(jié)合實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧

1. 使用 kafka 原生客戶端

現(xiàn)在基本都直接使用 springboot 版本,但了解原生客戶端,能更好的理解 springboot 版的 kafka 客戶端原理。

步驟1:pom 引入核心依賴:

引入依賴時,盡量選擇和 kafka 版本對應(yīng)的依賴版本。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>4.0.0</version>
</dependency>

步驟2:提供者客戶端代碼:

提供者客戶端要做三件事:

  1. 設(shè)置提供者客戶端屬性(可選屬性都被定義在 ProducerConfig 類中)
  2. 設(shè)置要發(fā)送的消息
  3. 發(fā)送(有三種發(fā)送方式,下面代碼中都有)
public class MyProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 第一步:設(shè)置提供者屬性
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.28:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // 第二步:設(shè)置要發(fā)送的消息
            ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "testKey", "testValue");
            // 第三部:發(fā)送消息
            // send(producer, record);
            // sendSync(producer, record);
            sendASync(producer, record);
        }
    }
    /**
     * 發(fā)送方式1:單向推送,不關(guān)心服務(wù)器的應(yīng)答
     */
    private static void send(Producer<String, String> producer, ProducerRecord<String, String> record) {
        producer.send(record);
    }
    /**
     * 發(fā)送方式2:同步推送,得到服務(wù)器的應(yīng)答前會阻塞當前線程
     */
    private static void sendSync(Producer<String, String> producer, ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {
        RecordMetadata metadata = producer.send(record).get();
        System.out.println(metadata.topic());
        System.out.println(metadata.partition());
        System.out.println(metadata.offset());
    }
    /**
     * 發(fā)送方式3:異步推送,不需等待服務(wù)器應(yīng)答,當服務(wù)器有應(yīng)答后會觸發(fā)函數(shù)回調(diào)
     */
    private static void sendASync(Producer<String, String> producer, ProducerRecord<String, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                throw new RuntimeException("向 kafka 推送失敗", exception);
            }
            System.out.println(metadata.topic());
            System.out.println(metadata.partition());
            System.out.println(metadata.offset());
        });
    }
}

步驟3:消費者客戶端代碼:

消費者客戶端要做三件事:

  1. 設(shè)置消費者客戶端屬性(可選屬性都被定義在 ConsumerConfig 類中)
  2. 設(shè)置消費者訂閱的主題
  3. 拉取消息
  4. 提交 offset(有兩種提交方式,下面代碼中都有)
public class MyConsumer {
    public static void main(String[] args) {
        // 第一步:設(shè)置消費者屬性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.28:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 第二步:設(shè)置要訂閱的主題
            consumer.subscribe(Collections.singletonList("testTopic"));
            while (true) {
                // 第三步:拉取消息,100 代表最大等待時間,如果時間到了還沒有拉取到消息就不阻塞了繼續(xù)往后執(zhí)行
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
                // 第四步:提交 offset
                // consumer.commitSync(); // 同步提交,表示必須等到 offset 提交完畢,再去消費下?批數(shù)據(jù)
                consumer.commitSync(); // 異步提交,表示發(fā)送完提交 offset 請求后,就開始消費下?批數(shù)據(jù)了。不?等到Broker的確認。
            }
        }
    }
}

2. Kafka 集成 springboot

springboot 版本是最常用的,比原生客戶端使用方便。但是道理是一樣的,底層也是原生客戶端。

pom 引入核心依賴:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.0</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

yaml 配置文件:

這一步無非就是把原生客戶端中的屬性配置,寫在 yaml 中

spring:
  kafka:
    bootstrap-servers: 192.168.2.28:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: testGroup
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

提供者客戶端代碼:

只需要兩步:

  1. 注入 KafkaTemplate
  2. 發(fā)送
@RestController
public class ProducerController {
    /**
     * kafka
     */
    private KafkaTemplate<String, Object> kafkaTemplate;
    @Autowired
    public void setKafkaTemplate(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    @GetMapping("/test")
    public void send() {
        // 發(fā)送 kafka 消息
        kafkaTemplate.send("testTopic", "testKey", "testValue");
    }
}

消費者客戶端代碼:

只需要監(jiān)聽主題就可以

@RestController
public class ConsumerController {
    // 監(jiān)聽 kafka 消息
    @KafkaListener(topics = {"testTopic"})
    public void test(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}

到此這篇關(guān)于Kafka 在 java 中的基本使用的文章就介紹到這了,更多相關(guān)Kafka java使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 淺談java中的路徑表示

    淺談java中的路徑表示

    下面小編就為大家?guī)硪黄獪\談java中的路徑表示。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-04-04
  • SpringBoot中基于AOP和Semaphore實現(xiàn)API限流

    SpringBoot中基于AOP和Semaphore實現(xiàn)API限流

    調(diào)用速率限制是 Web API 中的常見要求,旨在防止濫用并確保公平使用資源,借助Spring Boot 中的 AOP,我們可以通過攔截方法調(diào)用并限制在特定時間范圍內(nèi)允許的請求數(shù)量來實現(xiàn)速率限制,需要的朋友可以參考下
    2024-10-10
  • ElasticSearch突然采集不到日志問題解決分析

    ElasticSearch突然采集不到日志問題解決分析

    這篇文章主要為大家介紹了ElasticSearch突然采集不到日志問題解決分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-04-04
  • Java如何獲取發(fā)送請求的電腦的IP地址

    Java如何獲取發(fā)送請求的電腦的IP地址

    文章介紹了如何通過HttpServletRequest獲取客戶端IP地址,特別是當客戶端通過代理訪問時,如何使用x-forwarded-for頭來獲取真實的IP地址
    2024-11-11
  • java數(shù)據(jù)結(jié)構(gòu)和算法之馬踏棋盤算法

    java數(shù)據(jù)結(jié)構(gòu)和算法之馬踏棋盤算法

    這篇文章主要為大家詳細介紹了java數(shù)據(jù)結(jié)構(gòu)和算法之馬踏棋盤算法,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-02-02
  • spring?java?動態(tài)獲取consul?K/V的方法

    spring?java?動態(tài)獲取consul?K/V的方法

    這篇文章主要介紹了spring?java?動態(tài)獲取consul?K/V的相關(guān)資料,主要包括springConsul配置kv路徑以及自動注入consulKV到服務(wù)中,本文給大家介紹的非常詳細,需要的朋友可以參考下
    2023-10-10
  • Java二維數(shù)組與稀疏數(shù)組相互轉(zhuǎn)換實現(xiàn)詳解

    Java二維數(shù)組與稀疏數(shù)組相互轉(zhuǎn)換實現(xiàn)詳解

    在某些應(yīng)用場景中需要大量的二維數(shù)組來進行數(shù)據(jù)存儲,但是二維數(shù)組中卻有著大量的無用的位置占據(jù)著內(nèi)存空間,稀疏數(shù)組就是為了優(yōu)化二維數(shù)組,節(jié)省內(nèi)存空間
    2022-09-09
  • Spring Boot中slf4j日志依賴關(guān)系示例詳解

    Spring Boot中slf4j日志依賴關(guān)系示例詳解

    在項目開發(fā)中,記錄日志是必做的一件事情。而當我們使用Springboot框架時,記錄日志就變得極其簡單了。下面這篇文章主要給大家介紹了關(guān)于Spring Boot中slf4j日志依賴關(guān)系的相關(guān)資料,需要的朋友可以參考下
    2018-11-11
  • Mybatis中resultMap標簽和sql標簽的設(shè)置方式

    Mybatis中resultMap標簽和sql標簽的設(shè)置方式

    這篇文章主要介紹了Mybatis中resultMap標簽和sql標簽的設(shè)置方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • springboot 使用zookeeper實現(xiàn)分布式隊列的基本步驟

    springboot 使用zookeeper實現(xiàn)分布式隊列的基本步驟

    這篇文章主要介紹了springboot 使用zookeeper實現(xiàn)分布式隊列,通過ZooKeeper的協(xié)調(diào)和同步機制,多個應(yīng)用程序可以共享一個隊列,并按照先進先出的順序處理隊列中的消息,需要的朋友可以參考下
    2023-08-08

最新評論