欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot使用Kafka來優(yōu)化接口請(qǐng)求的并發(fā)方式

 更新時(shí)間:2024年07月30日 14:58:57   作者:培根芝士  
這篇文章主要介紹了SpringBoot使用Kafka來優(yōu)化接口請(qǐng)求的并發(fā)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

在Spring Boot中使用 Kafka 來優(yōu)化接口請(qǐng)求的并發(fā),主要是通過將耗時(shí)的任務(wù)異步化到Kafka消息隊(duì)列中來實(shí)現(xiàn)。這樣,接口可以立即響應(yīng)客戶端,而不需要等待耗時(shí)任務(wù)完成。

在Spring Boot應(yīng)用程序中調(diào)用Kafka通常涉及使用Spring Kafka庫,它提供了與Apache Kafka的高級(jí)集成,使得從Spring Boot應(yīng)用程序中發(fā)送和接收消息變得更加簡單和直觀。

安裝Apache Kafka

編寫docker-compose.yml

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

使用docker compose啟動(dòng)容器

docker-compose up -d

添加依賴

首先,需要在pom.xml中添加Spring Kafka的依賴。

<!-- Spring Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置Kafka

在application.properties文件中配置Kafka的屬性。

# application.properties  
spring.kafka.bootstrap-servers=localhost:9092  
spring.kafka.consumer.group-id=myGroup  
spring.kafka.consumer.auto-offset-reset=earliest  
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer  
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer  
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer  
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

Kafka生產(chǎn)者服務(wù)

創(chuàng)建一個(gè)服務(wù)類來發(fā)送消息到Kafka。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String key, String value) {
        // 異步發(fā)送消息
        kafkaTemplate.send(topic, key, value).addCallback(success -> {
            System.out.println("Message sent successfully: " + value);
        }, failure -> {
            System.err.println("Failed to send message: " + value);
        });
    }
}

kafkaTemplate.send 方法是 Spring Kafka 提供的一個(gè)非常靈活的方法,它允許以不同的方式發(fā)送消息到 Kafka 集群。

當(dāng)調(diào)用 kafkaTemplate.send 方法時(shí),可以指定要發(fā)送到的 topic、key 和 value,但 key 是可選的。

  • 未指定 key:當(dāng)不指定 key 時(shí),Kafka 會(huì)根據(jù)配置的分區(qū)器(默認(rèn)是 DefaultPartitioner)來決定消息應(yīng)該被發(fā)送到哪個(gè)分區(qū)。在沒有 key 的情況下,分區(qū)器可能會(huì)采用輪詢(round-robin)或其他算法來隨機(jī)選擇一個(gè)分區(qū)進(jìn)行消息發(fā)送。這種方式下,消息的分布可能會(huì)比較均勻,但無法控制具有相同邏輯標(biāo)識(shí)的消息被發(fā)送到同一個(gè)分區(qū)。
  • 指定 key:當(dāng)指定 key 時(shí),Kafka 會(huì)根據(jù) key 的哈希值來計(jì)算分區(qū)號(hào),確保具有相同 key 的消息被發(fā)送到同一個(gè)分區(qū)。這種方式有助于保持消息的順序性,因?yàn)?Kafka 保證同一個(gè)分區(qū)內(nèi)的消息是有序的。

Kafka消費(fèi)者服務(wù)

創(chuàng)建一個(gè)監(jiān)聽器來接收Kafka中的消息。

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerService {

    @KafkaListener(topics = "your-topic-name", groupId = "myGroup")
    public void listen(String message) {
        // 處理消息(可能是耗時(shí)的操作)
        System.out.println("Received message in group 'myGroup': " + message);
        // 處理耗時(shí)操作
        ...
    }
}

在使用Kafka消費(fèi)者時(shí),Kafka本身已經(jīng)設(shè)計(jì)為支持并發(fā)消費(fèi),即可以通過配置多個(gè)消費(fèi)者實(shí)例(partitions的數(shù)量通常決定了并行度的一個(gè)上限,因?yàn)镵afka會(huì)盡量將不同的partitions分配給不同的消費(fèi)者以提高并行度)來實(shí)現(xiàn)并行處理。

但是,如果想要在消費(fèi)者內(nèi)部進(jìn)一步提高處理消息的并發(fā)度,可以結(jié)合使用Kafka消費(fèi)者和Java的線程池來實(shí)現(xiàn)。

@Service  
public class KafkaConsumerService {  
  
    @KafkaListener(topics = "your-topic-name", groupId = "myGroup")  
    public void listen(String message) {  
        // 將消息發(fā)送到線程池處理  
        executorService.submit(() -> processMessage(message));  
    }  
  
    private void processMessage(String message) {  
        // 處理消息的邏輯  
        System.out.println("Processing message: " + message);  
        // 模擬耗時(shí)操作  
        try {  
            Thread.sleep(1000);  
        } catch (InterruptedException e) {  
            Thread.currentThread().interrupt();  
        }  
    }  
  
    // 線程池配置  
    private ExecutorService executorService = Executors.newFixedThreadPool(30);  
  
    // 確保優(yōu)雅關(guān)閉線程池  
    @PreDestroy  
    public void shutdown() {  
        executorService.shutdown();  
        try {  
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {  
                executorService.shutdownNow();  
            }  
        } catch (InterruptedException ex) {  
            executorService.shutdownNow();  
            Thread.currentThread().interrupt();  
        }  
    }  
}

控制器

在控制器中調(diào)用Kafka生產(chǎn)者服務(wù)來發(fā)送消息,并立即響應(yīng)客戶端。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
  
@RestController
public class MyController {

    @Autowired  
    private KafkaProducerService kafkaProducerService;

    @PostMapping("/send")
    public String sendMessage(@RequestParam String message) {
        // 發(fā)送消息到Kafka,并立即返回響應(yīng)
        kafkaProducerService.sendMessage("your-topic-name", "key1", message);
        return "Message sent to Kafka";
    }
}

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • 利用Springboot實(shí)現(xiàn)Jwt認(rèn)證的示例代碼

    利用Springboot實(shí)現(xiàn)Jwt認(rèn)證的示例代碼

    這篇文章主要介紹了利用Springboot實(shí)現(xiàn)Jwt認(rèn)證的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12
  • java如何將int數(shù)組轉(zhuǎn)化為Integer數(shù)組

    java如何將int數(shù)組轉(zhuǎn)化為Integer數(shù)組

    這篇文章主要介紹了java如何將int數(shù)組轉(zhuǎn)化為Integer數(shù)組,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-11-11
  • IDEA2020.1常用配置說明

    IDEA2020.1常用配置說明

    這篇文章主要介紹了IDEA2020.1常用配置說明,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-06-06
  • 淺談Java中Collections.sort對(duì)List排序的兩種方法

    淺談Java中Collections.sort對(duì)List排序的兩種方法

    本文介紹了Java中Collections.sort對(duì)List排序的兩種方法以及Comparable 與Comparator區(qū)別,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-12-12
  • Mybatis自定義攔截器和插件開發(fā)詳解

    Mybatis自定義攔截器和插件開發(fā)詳解

    這篇文章主要給大家介紹了關(guān)于Mybatis自定義攔截器和插件開發(fā)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-04-04
  • JAVA提高第九篇 集合體系

    JAVA提高第九篇 集合體系

    這篇文章主要為大家詳細(xì)介紹了JAVA提高第九篇集合體系的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-10-10
  • 詳解Spring MVC/Boot 統(tǒng)一異常處理最佳實(shí)踐

    詳解Spring MVC/Boot 統(tǒng)一異常處理最佳實(shí)踐

    在 Web 開發(fā)中, 我們經(jīng)常會(huì)需要處理各種異常,這篇文章主要介紹了詳解Spring MVC/Boot 統(tǒng)一異常處理最佳實(shí)踐,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2019-01-01
  • 一文秒懂java到底是值傳遞還是引用傳遞

    一文秒懂java到底是值傳遞還是引用傳遞

    這篇文章主要介紹了java到底是值傳遞還是引用傳遞的相關(guān)知識(shí),本文通過幾個(gè)例子給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-06-06
  • SpringBoot項(xiàng)目發(fā)送釘釘消息功能實(shí)現(xiàn)

    SpringBoot項(xiàng)目發(fā)送釘釘消息功能實(shí)現(xiàn)

    在工作中的一些告警需要發(fā)送釘釘通知,有的是發(fā)給個(gè)人,有的要發(fā)到群里,這時(shí)項(xiàng)目就需要接入釘釘,實(shí)現(xiàn)發(fā)消息的功能,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧
    2024-02-02
  • Java線程池流程編排運(yùn)用實(shí)戰(zhàn)源碼

    Java線程池流程編排運(yùn)用實(shí)戰(zhàn)源碼

    這篇文章主要介紹了Java線程池流程編排運(yùn)用實(shí)戰(zhàn)源碼,就在流程引擎的基礎(chǔ)上運(yùn)用?ThreadPoolExecutor,使用線程池實(shí)現(xiàn)?SpringBean?的異步執(zhí)行
    2022-03-03

最新評(píng)論