欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot項(xiàng)目消費(fèi)Kafka數(shù)據(jù)的方法

 更新時(shí)間:2025年01月14日 14:22:31   作者:布朗克168  
本文詳細(xì)介紹了如何在Spring Boot項(xiàng)目中配置和實(shí)現(xiàn)Kafka消費(fèi)者和生產(chǎn)者,結(jié)合實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友一起看看吧

一、引入依賴(lài)

你需要在 pom.xml 中添加 spring-kafka 相關(guān)依賴(lài):

<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Spring Boot Starter for Logging (optional but useful for debugging) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <!-- Spring Boot Starter for Testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

二、添加Kafka配置

在 application.yml 或 application.properties 文件中配置 Kafka 連接屬性:

application.yml 示例:

spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka服務(wù)器地址
    consumer:
      group-id: my-consumer-group   # 消費(fèi)者組ID
      auto-offset-reset: earliest   # 消費(fèi)者從頭開(kāi)始讀?。ㄈ绻麤](méi)有已提交的偏移量)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 設(shè)置key的反序列化器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 設(shè)置value的反序列化器為字符串
    listener:
      missing-topics-fatal: false    # 如果主題不存在,不拋出致命錯(cuò)誤

application.properties 示例:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 設(shè)置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 設(shè)置value的反序列化器為字符串

注意:spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 設(shè)置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 設(shè)置value的反序列化器為字符串
以上配置說(shuō)明Kafka生產(chǎn)的數(shù)據(jù)是json字符串,那么消費(fèi)接收的數(shù)據(jù)默認(rèn)也是json字符串,如果接收消息想用對(duì)象接受,需要自定義序列化器,比如以下配置

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 對(duì) Key 使用 StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.ErrorHandlingSerializer  # 對(duì) Value 使用 ErrorHandlingSerializer
      properties:
        spring.json.value.default.type: com.example.Order  # 默認(rèn)的 JSON 反序列化目標(biāo)類(lèi)型為 Order

三、創(chuàng)建 Kafka 消費(fèi)者

創(chuàng)建一個(gè) Kafka 消費(fèi)者類(lèi)來(lái)處理消息。你可以使用 @KafkaListener 注解來(lái)監(jiān)聽(tīng) Kafka 中的消息

(一)Kafka生產(chǎn)的消息是JSON 字符串

1、方式一

如果消息是 JSON 字符串,你可以使用 StringDeserializer 獲取消息后,再使用 ObjectMapper 將其轉(zhuǎn)換為
Java 對(duì)象(如 Order)。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
@Service
@EnableKafka  // 啟用 Kafka 消費(fèi)者
public class KafkaConsumer {
    private final ObjectMapper objectMapper = new ObjectMapper();
    // 監(jiān)聽(tīng) Kafka 中的 order-topic 主題
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(String message) {
        try {
            // 將 JSON 字符串反序列化為 Order 對(duì)象
            Order order = objectMapper.readValue(message, Order.class);
            System.out.println("Received order: " + order);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

說(shuō)明:

@KafkaListener(topics = “my-topic”, groupId = “my-consumer-group”):
topics 表示監(jiān)聽(tīng)的 Kafka 主題,groupId 表示消費(fèi)者所屬的消費(fèi)者組。
listen(String message): 該方法會(huì)被調(diào)用來(lái)處理收到的每條消息。在此示例中,我們打印出消息內(nèi)容。

2、方式二:需要直接訪問(wèn)消息元數(shù)據(jù)

可以通過(guò) ConsumerRecord 來(lái)接收 Kafka 消息。這種方式適用于需要直接訪問(wèn)消息元數(shù)據(jù)(如
topic、partition、offset)的場(chǎng)景,也適合手動(dòng)管理消息消費(fèi)和偏移量提交的情況。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
    // 監(jiān)聽(tīng) Kafka 中的 order-topic 主題
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(ConsumerRecord<String, String> record) {
        // 獲取消息的詳細(xì)信息
        String key = record.key();           // 獲取消息的 key
        String value = record.value();       // 獲取消息的 value
        String topic = record.topic();       // 獲取消息的 topic
        int partition = record.partition(); // 獲取消息的分區(qū)
        long offset = record.offset();      // 獲取消息的偏移量
        long timestamp = record.timestamp(); // 獲取消息的時(shí)間戳
        // 處理消息(這里我們只是打印消息)
        System.out.println("Consumed record: ");
        System.out.println("Key: " + key);
        System.out.println("Value: " + value);
        System.out.println("Topic: " + topic);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
        System.out.println("Timestamp: " + timestamp);
    }
}

(二)Kafka生產(chǎn)的消息是對(duì)象Order

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
    // 監(jiān)聽(tīng) Kafka 中的 order-topic 主題
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(ConsumerRecord<String, Order> record) {
        // 獲取消息的詳細(xì)信息
        String key = record.key();           // 獲取消息的 key
        Order value = record.value();       // 獲取消息的 value
        String topic = record.topic();       // 獲取消息的 topic
        int partition = record.partition(); // 獲取消息的分區(qū)
        long offset = record.offset();      // 獲取消息的偏移量
        long timestamp = record.timestamp(); // 獲取消息的時(shí)間戳
        // 處理消息(這里我們只是打印消息)
        System.out.println("Consumed record: ");
        System.out.println("Key: " + key);
        System.out.println("Value: " + value);
        System.out.println("Topic: " + topic);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
        System.out.println("Timestamp: " + timestamp);
    }
}

四、創(chuàng)建 啟動(dòng)類(lèi)

確保你的 Spring Boot 啟動(dòng)類(lèi)正確配置了 Spring Boot 應(yīng)用程序啟動(dòng)。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}

五、配置 Kafka 生產(chǎn)者(可選)

(一)消息類(lèi)型為json串

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
@Service
@EnableKafka
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;  // 發(fā)送的是 String 類(lèi)型消息
    private ObjectMapper objectMapper = new ObjectMapper();  // Jackson ObjectMapper 用于序列化
    // 發(fā)送訂單到 Kafka
    public void sendOrder(String topic, Order order) {
        try {
            // 將 Order 對(duì)象轉(zhuǎn)換為 JSON 字符串
            String orderJson = objectMapper.writeValueAsString(order);
            // 發(fā)送 JSON 字符串到 Kafka
            kafkaTemplate.send(topic, orderJson);  // 發(fā)送字符串消息
            System.out.println("Order JSON sent to Kafka: " + orderJson);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(二)消息類(lèi)型為對(duì)象Order

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
@Service
@EnableKafka
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;
    // 發(fā)送訂單到 Kafka
    public void sendOrder(String topic, Order order) {
        kafkaTemplate.send(topic, order);  // 發(fā)送訂單對(duì)象,Spring Kafka 會(huì)自動(dòng)將 Order 轉(zhuǎn)換為 JSON
    }
}

六、啟動(dòng) Kafka 服務(wù)

啟動(dòng) Kafka 服務(wù)

bin/kafka-server-start.sh config/server.properties

七、測(cè)試 Kafka 消費(fèi)者

你可以通過(guò)向 Kafka 發(fā)送消息來(lái)測(cè)試消費(fèi)者是否工作正常。假設(shè)你已經(jīng)在 Kafka 中創(chuàng)建了一個(gè)名為 my-topic 的主題,可以使用 KafkaProducer 來(lái)發(fā)送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
    @Autowired
    private KafkaProducer kafkaProducer;
    @GetMapping("/sendOrder")
    public String sendOrder() {
        Order order = new Order();
        order.setOrderId(1L);
        order.setUserId(123L);
        order.setProduct("Laptop");
        order.setQuantity(2);
        order.setStatus("Created");
        kafkaProducer.sendOrder("order-topic", order);
        return "Order sent!";
    }
}

當(dāng)你訪問(wèn) /sendOrder端點(diǎn)時(shí),KafkaProducer 會(huì)將消息發(fā)送到 Kafka,KafkaConsumer 會(huì)接收到這條消息并打印出來(lái)。

九、測(cè)試和調(diào)試

你可以通過(guò)查看 Kafka 消費(fèi)者日志,確保消息已經(jīng)被成功消費(fèi)。你還可以使用 KafkaTemplate 發(fā)送消息,并確保 Kafka 生產(chǎn)者和消費(fèi)者之間的連接正常。

十、 結(jié)語(yǔ)

至此,你已經(jīng)在 Spring Boot 中成功配置并實(shí)現(xiàn)了 Kafka 消費(fèi)者和生產(chǎn)者。你可以根據(jù)需要擴(kuò)展功能,例如處理更復(fù)雜的消息類(lèi)型、批量消費(fèi)等。

到此這篇關(guān)于Springboot項(xiàng)目如何消費(fèi)Kafka數(shù)據(jù)的文章就介紹到這了,更多相關(guān)Springboot消費(fèi)Kafka數(shù)據(jù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java中Static關(guān)鍵字的五種用法詳解

    Java中Static關(guān)鍵字的五種用法詳解

    這篇文章主要介紹了Java中static的五種用法:修飾成員變量,修飾成員方法,修飾內(nèi)部類(lèi),靜態(tài)代碼塊,靜態(tài)導(dǎo)包,想詳細(xì)了解的小伙伴可以參考閱讀本文
    2023-03-03
  • mybatis-flex與springBoot整合的實(shí)現(xiàn)示例

    mybatis-flex與springBoot整合的實(shí)現(xiàn)示例

    Mybatis-flex提供了簡(jiǎn)單易用的API,開(kāi)發(fā)者只需要簡(jiǎn)單的配置即可使用,本文主要介紹了mybatis-flex與springBoot整合,具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-01-01
  • 談?wù)凷pring Boot 數(shù)據(jù)源加載及其多數(shù)據(jù)源簡(jiǎn)單實(shí)現(xiàn)(小結(jié))

    談?wù)凷pring Boot 數(shù)據(jù)源加載及其多數(shù)據(jù)源簡(jiǎn)單實(shí)現(xiàn)(小結(jié))

    這篇文章主要介紹了談?wù)凷pring Boot 數(shù)據(jù)源加載及其多數(shù)據(jù)源簡(jiǎn)單實(shí)現(xiàn),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2019-04-04
  • Mybatis步驟分解實(shí)現(xiàn)一個(gè)增刪改查程序

    Mybatis步驟分解實(shí)現(xiàn)一個(gè)增刪改查程序

    MybatisPlus是國(guó)產(chǎn)的第三方插件, 它封裝了許多常用的CURDapi,免去了我們寫(xiě)mapper.xml的重復(fù)勞動(dòng)。本文將整合MybatisPlus實(shí)現(xiàn)增刪改查功能,感興趣的可以了解一下
    2022-05-05
  • 一文詳解Java中枚舉類(lèi)的使用

    一文詳解Java中枚舉類(lèi)的使用

    這篇文章主要介紹了深入淺出講解Java中的枚舉類(lèi),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,感興趣的朋友可以了解下
    2022-11-11
  • Java虛擬機(jī)執(zhí)行引擎知識(shí)總結(jié)

    Java虛擬機(jī)執(zhí)行引擎知識(shí)總結(jié)

    這篇文章主要介紹了有關(guān)Java虛擬機(jī)執(zhí)行引擎的知識(shí),文中實(shí)例簡(jiǎn)單易懂,方便大家更好的學(xué)習(xí),有興趣的朋友可以了解下
    2020-06-06
  • 簡(jiǎn)單了解Spring中BeanFactory與FactoryBean的區(qū)別

    簡(jiǎn)單了解Spring中BeanFactory與FactoryBean的區(qū)別

    這篇文章主要介紹了簡(jiǎn)單了解Spring中BeanFactory與FactoryBean的區(qū)別,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-12-12
  • javaSystem.out.println()輸出byte[]、char[]異常的問(wèn)題詳析

    javaSystem.out.println()輸出byte[]、char[]異常的問(wèn)題詳析

    這篇文章主要給大家介紹了關(guān)于javaSystem.out.println()輸出byte[]、char[]異常問(wèn)題的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起看看啊
    2019-01-01
  • 解決打開(kāi)的idea項(xiàng)目maven不生效問(wèn)題

    解決打開(kāi)的idea項(xiàng)目maven不生效問(wèn)題

    這篇文章主要給大家介紹了關(guān)于如何解決打開(kāi)的idea項(xiàng)目maven不生效問(wèn)題,最近在配置maven時(shí),發(fā)現(xiàn)無(wú)論配置幾遍,IDEA中的maven配置總會(huì)還原成默認(rèn)的,所以這里給大家分享下解決辦法,需要的朋友可以參考下
    2023-07-07
  • Java后端接入微信小程序登錄功能(登錄流程)

    Java后端接入微信小程序登錄功能(登錄流程)

    這篇文章主要介紹了Java后端接入微信小程序登錄功能,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-06-06

最新評(píng)論