欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot項目消費Kafka數(shù)據(jù)的方法

 更新時間:2025年01月14日 14:22:31   作者:布朗克168  
本文詳細介紹了如何在Spring Boot項目中配置和實現(xiàn)Kafka消費者和生產(chǎn)者,結(jié)合實例代碼給大家介紹的非常詳細,感興趣的朋友一起看看吧

一、引入依賴

你需要在 pom.xml 中添加 spring-kafka 相關(guān)依賴:

<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Spring Boot Starter for Logging (optional but useful for debugging) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <!-- Spring Boot Starter for Testing -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

二、添加Kafka配置

在 application.yml 或 application.properties 文件中配置 Kafka 連接屬性:

application.yml 示例:

spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka服務器地址
    consumer:
      group-id: my-consumer-group   # 消費者組ID
      auto-offset-reset: earliest   # 消費者從頭開始讀取(如果沒有已提交的偏移量)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 設置key的反序列化器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 設置value的反序列化器為字符串
    listener:
      missing-topics-fatal: false    # 如果主題不存在,不拋出致命錯誤

application.properties 示例:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 設置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer  # 設置value的反序列化器為字符串

注意:spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 設置key的反序列化器
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer # 設置value的反序列化器為字符串
以上配置說明Kafka生產(chǎn)的數(shù)據(jù)是json字符串,那么消費接收的數(shù)據(jù)默認也是json字符串,如果接收消息想用對象接受,需要自定義序列化器,比如以下配置

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 對 Key 使用 StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.ErrorHandlingSerializer  # 對 Value 使用 ErrorHandlingSerializer
      properties:
        spring.json.value.default.type: com.example.Order  # 默認的 JSON 反序列化目標類型為 Order

三、創(chuàng)建 Kafka 消費者

創(chuàng)建一個 Kafka 消費者類來處理消息。你可以使用 @KafkaListener 注解來監(jiān)聽 Kafka 中的消息

(一)Kafka生產(chǎn)的消息是JSON 字符串

1、方式一

如果消息是 JSON 字符串,你可以使用 StringDeserializer 獲取消息后,再使用 ObjectMapper 將其轉(zhuǎn)換為
Java 對象(如 Order)。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
@Service
@EnableKafka  // 啟用 Kafka 消費者
public class KafkaConsumer {
    private final ObjectMapper objectMapper = new ObjectMapper();
    // 監(jiān)聽 Kafka 中的 order-topic 主題
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(String message) {
        try {
            // 將 JSON 字符串反序列化為 Order 對象
            Order order = objectMapper.readValue(message, Order.class);
            System.out.println("Received order: " + order);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

說明:

@KafkaListener(topics = “my-topic”, groupId = “my-consumer-group”):
topics 表示監(jiān)聽的 Kafka 主題,groupId 表示消費者所屬的消費者組。
listen(String message): 該方法會被調(diào)用來處理收到的每條消息。在此示例中,我們打印出消息內(nèi)容。

2、方式二:需要直接訪問消息元數(shù)據(jù)

可以通過 ConsumerRecord 來接收 Kafka 消息。這種方式適用于需要直接訪問消息元數(shù)據(jù)(如
topic、partition、offset)的場景,也適合手動管理消息消費和偏移量提交的情況。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
    // 監(jiān)聽 Kafka 中的 order-topic 主題
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(ConsumerRecord<String, String> record) {
        // 獲取消息的詳細信息
        String key = record.key();           // 獲取消息的 key
        String value = record.value();       // 獲取消息的 value
        String topic = record.topic();       // 獲取消息的 topic
        int partition = record.partition(); // 獲取消息的分區(qū)
        long offset = record.offset();      // 獲取消息的偏移量
        long timestamp = record.timestamp(); // 獲取消息的時間戳
        // 處理消息(這里我們只是打印消息)
        System.out.println("Consumed record: ");
        System.out.println("Key: " + key);
        System.out.println("Value: " + value);
        System.out.println("Topic: " + topic);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
        System.out.println("Timestamp: " + timestamp);
    }
}

(二)Kafka生產(chǎn)的消息是對象Order

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
    // 監(jiān)聽 Kafka 中的 order-topic 主題
    @KafkaListener(topics = "order-topic", groupId = "order-consumer-group")
    public void consumeOrder(ConsumerRecord<String, Order> record) {
        // 獲取消息的詳細信息
        String key = record.key();           // 獲取消息的 key
        Order value = record.value();       // 獲取消息的 value
        String topic = record.topic();       // 獲取消息的 topic
        int partition = record.partition(); // 獲取消息的分區(qū)
        long offset = record.offset();      // 獲取消息的偏移量
        long timestamp = record.timestamp(); // 獲取消息的時間戳
        // 處理消息(這里我們只是打印消息)
        System.out.println("Consumed record: ");
        System.out.println("Key: " + key);
        System.out.println("Value: " + value);
        System.out.println("Topic: " + topic);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
        System.out.println("Timestamp: " + timestamp);
    }
}

四、創(chuàng)建 啟動類

確保你的 Spring Boot 啟動類正確配置了 Spring Boot 應用程序啟動。

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}

五、配置 Kafka 生產(chǎn)者(可選)

(一)消息類型為json串

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.databind.ObjectMapper;
@Service
@EnableKafka
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;  // 發(fā)送的是 String 類型消息
    private ObjectMapper objectMapper = new ObjectMapper();  // Jackson ObjectMapper 用于序列化
    // 發(fā)送訂單到 Kafka
    public void sendOrder(String topic, Order order) {
        try {
            // 將 Order 對象轉(zhuǎn)換為 JSON 字符串
            String orderJson = objectMapper.writeValueAsString(order);
            // 發(fā)送 JSON 字符串到 Kafka
            kafkaTemplate.send(topic, orderJson);  // 發(fā)送字符串消息
            System.out.println("Order JSON sent to Kafka: " + orderJson);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(二)消息類型為對象Order

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.stereotype.Service;
@Service
@EnableKafka
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;
    // 發(fā)送訂單到 Kafka
    public void sendOrder(String topic, Order order) {
        kafkaTemplate.send(topic, order);  // 發(fā)送訂單對象,Spring Kafka 會自動將 Order 轉(zhuǎn)換為 JSON
    }
}

六、啟動 Kafka 服務

啟動 Kafka 服務

bin/kafka-server-start.sh config/server.properties

七、測試 Kafka 消費者

你可以通過向 Kafka 發(fā)送消息來測試消費者是否工作正常。假設你已經(jīng)在 Kafka 中創(chuàng)建了一個名為 my-topic 的主題,可以使用 KafkaProducer 來發(fā)送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
    @Autowired
    private KafkaProducer kafkaProducer;
    @GetMapping("/sendOrder")
    public String sendOrder() {
        Order order = new Order();
        order.setOrderId(1L);
        order.setUserId(123L);
        order.setProduct("Laptop");
        order.setQuantity(2);
        order.setStatus("Created");
        kafkaProducer.sendOrder("order-topic", order);
        return "Order sent!";
    }
}

當你訪問 /sendOrder端點時,KafkaProducer 會將消息發(fā)送到 Kafka,KafkaConsumer 會接收到這條消息并打印出來。

九、測試和調(diào)試

你可以通過查看 Kafka 消費者日志,確保消息已經(jīng)被成功消費。你還可以使用 KafkaTemplate 發(fā)送消息,并確保 Kafka 生產(chǎn)者和消費者之間的連接正常。

十、 結(jié)語

至此,你已經(jīng)在 Spring Boot 中成功配置并實現(xiàn)了 Kafka 消費者和生產(chǎn)者。你可以根據(jù)需要擴展功能,例如處理更復雜的消息類型、批量消費等。

到此這篇關(guān)于Springboot項目如何消費Kafka數(shù)據(jù)的文章就介紹到這了,更多相關(guān)Springboot消費Kafka數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 如何計算Java對象占用了多少空間?

    如何計算Java對象占用了多少空間?

    在Java中沒有sizeof運算符,所以沒辦法知道一個對象到底占用了多大的空間,但是在分配對象的時候會有一些基本的規(guī)則,我們根據(jù)這些規(guī)則大致能判斷出來對象大小,需要的朋友可以參考下
    2016-01-01
  • Java編程細節(jié)重構(gòu)之為什么if-else不是好代碼詳析

    Java編程細節(jié)重構(gòu)之為什么if-else不是好代碼詳析

    這篇文章主要給大家介紹了關(guān)于Java編程細節(jié)重構(gòu)之為什么if-else不是好代碼的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學些學習吧
    2018-09-09
  • SpringBoot中加密模塊的使用

    SpringBoot中加密模塊的使用

    本文主要介紹了SpringBoot中加密模塊的使用,包括對稱加密、非對稱加密和哈希加密等,同時還會提供相應的代碼示例,感興趣的朋友可以參考一下
    2023-05-05
  • Spring?Boot?內(nèi)置工具類ReflectionUtils的實現(xiàn)

    Spring?Boot?內(nèi)置工具類ReflectionUtils的實現(xiàn)

    ReflectionUtils是一個反射工具類,它封裝了Java反射的操作,使得我們能夠更輕松地操作和訪問類的方法、字段,本文主要介紹了Spring?Boot?內(nèi)置工具類ReflectionUtils的實現(xiàn),感興趣的可以了解一下
    2023-11-11
  • Java使用Cookie實現(xiàn)認證跳轉(zhuǎn)功能

    Java使用Cookie實現(xiàn)認證跳轉(zhuǎn)功能

    在?Web?開發(fā)中,用戶身份認證是一個基礎而關(guān)鍵的功能點,本文將通過一個簡單的前后端示例系統(tǒng),介紹如何基于?Cookie?實現(xiàn)?Token?保存與自動跳轉(zhuǎn)認證的功能,并結(jié)合?Cookie?與?Header?的區(qū)別、使用場景、安全性等維度做全面分析,需要的朋友可以參考下
    2025-06-06
  • Java實現(xiàn)操作excel表格

    Java實現(xiàn)操作excel表格

    在日常工作中,對Excel工作表格的操作處理可是多的數(shù)不清楚,下面是java語言對其的操作,有需要的小伙伴可以參考下
    2015-10-10
  • mybatis的坑-integer類型為0的數(shù)據(jù)if?test失效問題

    mybatis的坑-integer類型為0的數(shù)據(jù)if?test失效問題

    這篇文章主要介紹了mybatis的坑-integer類型為0的數(shù)據(jù)if?test失效問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • 使用SpringBoot-JPA進行自定義保存及批量保存功能

    使用SpringBoot-JPA進行自定義保存及批量保存功能

    這篇文章主要介紹了使用SpringBoot-JPA進行自定義的保存及批量保存功能,本文通過實例代碼給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下
    2019-06-06
  • SpringBoot應用快速部署到K8S的詳細教程

    SpringBoot應用快速部署到K8S的詳細教程

    這篇文章主要介紹了SpringBoot應用快速部署到K8S的詳細教程,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-12-12
  • Java中的ThreadLocal線程變量詳解

    Java中的ThreadLocal線程變量詳解

    這篇文章主要介紹了Java中的ThreadLocal線程變量詳解,ThreadLocal叫做線程變量,意思是在ThreadLocal中填充的變量屬于當前線程,該變量對其他線程而言是隔離的,它是用來提供線程內(nèi)部的局部變量,需要的朋友可以參考下
    2024-01-01

最新評論