欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Kafka是什么及如何使用SpringBoot對接Kafka(最新推薦)

 更新時間:2023年11月07日 10:28:53   作者:戰(zhàn)斧  
這篇文章主要介紹了Kafka是什么,以及如何使用SpringBoot對接Kafka,今天我們通過一個Demo講解了在SpringBoot中如何對接Kafka,也介紹了下關鍵類?KafkaTemplate,需要的朋友可以參考下

繼上一次教大家手把手安裝kafka后,今天我們直接來到入門實操教程,也就是使用SpringBoot該怎么對接和使用kafka。當然,在一開始我們也會比較細致的介紹一下kafka本身。那么話不多說,馬上開始今天的學習吧

一、Kafka與流處理

我們先來看看比較正式的介紹:Kafka是一種流處理平臺,由LinkedIn公司創(chuàng)建,現(xiàn)在是Apache下的開源項目。Kafka通過發(fā)布/訂閱機制實現(xiàn)消息的異步傳輸和處理。它具有高吞吐量、低延遲、可伸縮性和可靠性等優(yōu)點,使其成為了流處理和實時數(shù)據(jù)管道的首選解決方案

介紹其實是比較清晰的,如果你是第一次接觸“流處理”概念,我們也可以做一點解釋,流處理指的是對連續(xù)、實時產(chǎn)生的數(shù)據(jù)流進行實時處理、計算和分析的過程。

假設你正在玩一款在線游戲,其他玩家的動作和游戲事件會實時地傳到服務器上。這些事件就形成了一條數(shù)據(jù)流。在流處理中,我們會對這條數(shù)據(jù)流進行實時處理,例如計算每個玩家的分數(shù)、監(jiān)控游戲區(qū)域內(nèi)的異常情況、統(tǒng)計玩家在線時長等等。這樣,游戲管理員就可以實時地監(jiān)控和管理游戲,而不需要等到游戲結(jié)束才進行操作。
類似的,流處理還可以應用在其他實時性要求比較高的場景中,例如金融交易、物聯(lián)網(wǎng)、實時監(jiān)測等。通過對數(shù)據(jù)流進行實時處理,我們可以更加精準地掌握數(shù)據(jù)變化的情況,并及時做出反應和調(diào)整,

二、Spring Boot與Kafka的整合Demo

1. 新建springboot工程

如果你沒有現(xiàn)成的Spring boot項目,那么我們可以使用IDEA自帶的Spring Initializr 來創(chuàng)建一個spring-boot的項目

此時我們可以直接選擇使用Apache Kafka,另外項目還可以加個Spring Web準備讓前臺調(diào)用

2. 添加Kafka依賴

如果你不是像上述一樣新建的項目,那你也可以選擇在已有的Spring Boot應用程序中使用Kafka,那么你需要在pom.xml文件中添加以下依賴:

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.8.11</version>
</dependency>

3. 配置Kafka

在application.properties文件中添加以下配置:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group

這里我們指定了Kafka服務器的地址和端口,并配置了消費者組的ID,關于消費者組的概念,其實就是某一些消費者具備相同的功能,因此會把他們設為同一個消費者組,這樣他們就不會重復消費同一條消息了。更具體地原理,我們會在之后地篇章中介紹。

4. 創(chuàng)建Kafka生產(chǎn)者

在Kafka中,生產(chǎn)者是發(fā)送消息的應用程序或服務。在Spring Boot中,我們可以使用KafkaTemplate類來創(chuàng)建Kafka生產(chǎn)者

package com.zhanfu.kafkademo.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaService {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    public void sendMessage(String message) {
        kafkaTemplate.send("test_topic", message);
    }
}

這里我們使用@Autowired注解來自動注入KafkaTemplate,并使用send方法將消息發(fā)送到名為“test_topic”的Kafka主題中。

5. 創(chuàng)建Kafka消費者

在Kafka中,消費者是接收并處理訂閱主題消息的應用程序或服務。在Spring Boot中,我們可以使用@KafkaListener注解來創(chuàng)建Kafka消費者。

package com.zhanfu.kafkademo.listener;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaLis {
    @KafkaListener(topics = "test_topic", groupId = "test_group")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

6. 應用程序入口

現(xiàn)在我們已經(jīng)完成了Spring Boot和Kafka的整合。我們可以啟動Spring Boot應用程序,然后發(fā)送消息并消費它,以測試我們的應用程序是否正確地與Kafka集成。

package com.zhanfu.kafkademo.controller;
import com.zhanfu.kafkademo.service.KafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageController {
    @Autowired
    private KafkaService kafkaService;
    @GetMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        kafkaService.sendMessage(message);
        return "Message sent successfully";
    }
}

在這個例子中,我們使用@Autowired注解來自動注入KafkaProducer,并通過發(fā)送消息的方法來調(diào)用sendMessage方法。最終項目整體框架如圖:

三、啟動與驗證

首先自然是啟動 Kafka ,怎么啟動可參考 《上手第一關,手把手教你安裝kafka與可視化工具kafka-eagle》,然后是啟動我們的Spring Boot項目

然后在瀏覽器中輸入

http://127.0.0.1:8080/send/hello

最后檢查我們的項目日志:

可以看到,整個發(fā)送和接收的流程都走通了

四、KafkaTemplate 介紹

不難看出,在Springboot中,使用kafka的關鍵在于 KafkaTemplate, 它是 Spring 提供的 Kafka 生產(chǎn)者模版,用于向 Kafka 集群發(fā)送消息。并且把 Kafka 的生產(chǎn)者客戶端封裝成了一個 Spring Bean,提供更加方便易用的 API。

它有三個主要屬性:

  • producerFactory:生產(chǎn)者工廠類,用于創(chuàng)建 KafkaProducer 實例。
  • defaultTopic:默認主題名稱,如果在發(fā)送消息時沒有指定主題名稱,則使用該默認主題。
  • messageConverter:消息轉(zhuǎn)換器,用于將消息對象轉(zhuǎn)換為 Kafka ProducerRecord

它的主要方法:

  • send(ProducerRecord<K,V> record):向指定的 Kafka 主題發(fā)送一條消息。ProducerRecord 包含了主題名稱、分區(qū)編號、Key 和 Value 等信息。
  • send(String topic, V data):向指定的 Kafka 主題發(fā)送一條消息。
  • send(String topic, K key, V data):向指定的 Kafka 主題發(fā)送一條消息,并指定消息的 Key。
  • execute(ProducerCallback<K,V> callback):使用回調(diào)方式發(fā)送消息,可以自定義消息的創(chuàng)建過程和錯誤處理過程。
  • inTransaction():啟用事務,多個 send 方法調(diào)用將被包裝在一個事務中,保證 Kafka 事務的原子性。

除了上述方法外,KafkaTemplate 還提供了其他方法,如 sendDefault()、sendOffsetsToTransaction() 等,可以根據(jù)實際需要進行選擇和使用。

需要注意的是,在使用 KafkaTemplate 發(fā)送消息時應該注意消息的序列化方式、主題和分區(qū)的選擇以及錯誤處理等問題,以保證消息的可靠性和正確性。

當然,很多同學可能還注意到一個細節(jié),我們在上面的Demo中,我們直接將其 @Autowired進我們的代碼中,這是怎么做到的呢?換句話說,這個 KafkaTemplate 為什么自己就會被spring 容器管理的呢?其實這得益于SpringBoot中對Kafka有了很多自動配置的內(nèi)容。如下:

如上圖,相信對Spring Boot熟悉的同學看到 ConditionalOnClass 、 ConditionalOnMissingBean 應該就明白了。其實Spring Boot 早就貼心的為我們預留了這些自動配置,只要我們引入了 spring-kafka 包,使得項目中出現(xiàn)了 KafkaTemplate 類,那么它就能被自動配置并存入Spring 容器內(nèi)

總結(jié)

今天我們通過一個Demo講解了在SpringBoot中如何對接Kafka,也介紹了下關鍵類 KafkaTemplate ,得益于Spring Boot 的自動配置,開發(fā)者要做的配置內(nèi)容其實并不多,使用也主要是依賴其提供的API,相對簡單,相信大家很容易也都學會了,那么在后面的過程中,我們將繼續(xù)學習其使用,并且會著重講解 Kafka 的原理與結(jié)構(gòu)

到此這篇關于Kafka是什么,以及如何使用SpringBoot對接Kafka的文章就介紹到這了,更多相關SpringBoot對接Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Java方法遞歸調(diào)用實例解析

    Java方法遞歸調(diào)用實例解析

    這篇文章主要介紹了Java方法遞歸調(diào)用實例解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-02-02
  • Java深入淺出講解String類常見方法

    Java深入淺出講解String類常見方法

    在C語言中,如果要表示字符串而且對字符串進行操作的話,依靠的是數(shù)組和指針,而Java中提供了String類用來專門表示字符串,String類中常見的方法,以及一些細節(jié)是本篇重點
    2022-04-04
  • Spring MVC 4.1.3 + MyBatis零基礎搭建Web開發(fā)框架(注解模式)

    Spring MVC 4.1.3 + MyBatis零基礎搭建Web開發(fā)框架(注解模式)

    本篇文章主要介紹了Spring MVC 4.1.3 + MyBatis零基礎搭建Web開發(fā)框架(注解模式),具有一定的參考價值,感興趣的小伙伴們可以參考一下。
    2017-03-03
  • 一篇文章帶你了解JAVA結(jié)構(gòu)化編程詳情

    一篇文章帶你了解JAVA結(jié)構(gòu)化編程詳情

    下面小編就為大家?guī)硪黄v解JAVA結(jié)構(gòu)化編程的文章。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2021-09-09
  • C++/java 繼承類的多態(tài)詳解及實例代碼

    C++/java 繼承類的多態(tài)詳解及實例代碼

    這篇文章主要介紹了C++/java 繼承類的多態(tài)詳解及實例代碼的相關資料,需要的朋友可以參考下
    2017-02-02
  • 一文詳解kafka序列化器和攔截器

    一文詳解kafka序列化器和攔截器

    這篇文章主要為大家介紹了kafka序列化器和攔截器使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-03-03
  • spring本地事務失效的原因分析

    spring本地事務失效的原因分析

    這篇文章給大家介紹了spring本地事務失效的情況原因分析,給大家羅列了五種原因分析,通過代碼示例介紹的非常詳細,具有一定的參考價值,需要的朋友可以參考下
    2023-10-10
  • SpringBoot的@GetMapping路徑匹配規(guī)則、國際化詳細教程

    SpringBoot的@GetMapping路徑匹配規(guī)則、國際化詳細教程

    這篇文章主要介紹了SpringBoot的@GetMapping路徑匹配規(guī)則、國際化,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2023-11-11
  • java如何生成登錄隨機驗證碼

    java如何生成登錄隨機驗證碼

    這篇文章主要為大家詳細介紹了java如何生成登錄隨機驗證碼,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-12-12
  • 詳解如何在Spring Boot中實現(xiàn)容錯機制

    詳解如何在Spring Boot中實現(xiàn)容錯機制

    容錯機制是構(gòu)建健壯和可靠的應用程序的重要組成部分,它可以幫助應用程序在面對異?;蚬收蠒r保持穩(wěn)定運行,Spring Boot提供了多種機制來實現(xiàn)容錯,包括異常處理、斷路器、重試和降級等,本文將介紹如何在Spring Boot中實現(xiàn)這些容錯機制,需要的朋友可以參考下
    2023-10-10

最新評論