SpringBoot使用Kafka來優(yōu)化接口請求的并發(fā)方式
在Spring Boot中使用 Kafka 來優(yōu)化接口請求的并發(fā),主要是通過將耗時的任務(wù)異步化到Kafka消息隊列中來實現(xiàn)。這樣,接口可以立即響應(yīng)客戶端,而不需要等待耗時任務(wù)完成。
在Spring Boot應(yīng)用程序中調(diào)用Kafka通常涉及使用Spring Kafka庫,它提供了與Apache Kafka的高級集成,使得從Spring Boot應(yīng)用程序中發(fā)送和接收消息變得更加簡單和直觀。
安裝Apache Kafka
編寫docker-compose.yml
version: '3'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock使用docker compose啟動容器
docker-compose up -d
添加依賴
首先,需要在pom.xml中添加Spring Kafka的依賴。
<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>配置Kafka
在application.properties文件中配置Kafka的屬性。
# application.properties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=myGroup spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
Kafka生產(chǎn)者服務(wù)
創(chuàng)建一個服務(wù)類來發(fā)送消息到Kafka。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String key, String value) {
// 異步發(fā)送消息
kafkaTemplate.send(topic, key, value).addCallback(success -> {
System.out.println("Message sent successfully: " + value);
}, failure -> {
System.err.println("Failed to send message: " + value);
});
}
}kafkaTemplate.send 方法是 Spring Kafka 提供的一個非常靈活的方法,它允許以不同的方式發(fā)送消息到 Kafka 集群。
當(dāng)調(diào)用 kafkaTemplate.send 方法時,可以指定要發(fā)送到的 topic、key 和 value,但 key 是可選的。
- 未指定 key:當(dāng)不指定 key 時,Kafka 會根據(jù)配置的分區(qū)器(默認(rèn)是 DefaultPartitioner)來決定消息應(yīng)該被發(fā)送到哪個分區(qū)。在沒有 key 的情況下,分區(qū)器可能會采用輪詢(round-robin)或其他算法來隨機選擇一個分區(qū)進行消息發(fā)送。這種方式下,消息的分布可能會比較均勻,但無法控制具有相同邏輯標(biāo)識的消息被發(fā)送到同一個分區(qū)。
- 指定 key:當(dāng)指定 key 時,Kafka 會根據(jù) key 的哈希值來計算分區(qū)號,確保具有相同 key 的消息被發(fā)送到同一個分區(qū)。這種方式有助于保持消息的順序性,因為 Kafka 保證同一個分區(qū)內(nèi)的消息是有序的。
Kafka消費者服務(wù)
創(chuàng)建一個監(jiān)聽器來接收Kafka中的消息。
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumerService {
@KafkaListener(topics = "your-topic-name", groupId = "myGroup")
public void listen(String message) {
// 處理消息(可能是耗時的操作)
System.out.println("Received message in group 'myGroup': " + message);
// 處理耗時操作
...
}
}在使用Kafka消費者時,Kafka本身已經(jīng)設(shè)計為支持并發(fā)消費,即可以通過配置多個消費者實例(partitions的數(shù)量通常決定了并行度的一個上限,因為Kafka會盡量將不同的partitions分配給不同的消費者以提高并行度)來實現(xiàn)并行處理。
但是,如果想要在消費者內(nèi)部進一步提高處理消息的并發(fā)度,可以結(jié)合使用Kafka消費者和Java的線程池來實現(xiàn)。
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "your-topic-name", groupId = "myGroup")
public void listen(String message) {
// 將消息發(fā)送到線程池處理
executorService.submit(() -> processMessage(message));
}
private void processMessage(String message) {
// 處理消息的邏輯
System.out.println("Processing message: " + message);
// 模擬耗時操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 線程池配置
private ExecutorService executorService = Executors.newFixedThreadPool(30);
// 確保優(yōu)雅關(guān)閉線程池
@PreDestroy
public void shutdown() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException ex) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}控制器
在控制器中調(diào)用Kafka生產(chǎn)者服務(wù)來發(fā)送消息,并立即響應(yīng)客戶端。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MyController {
@Autowired
private KafkaProducerService kafkaProducerService;
@PostMapping("/send")
public String sendMessage(@RequestParam String message) {
// 發(fā)送消息到Kafka,并立即返回響應(yīng)
kafkaProducerService.sendMessage("your-topic-name", "key1", message);
return "Message sent to Kafka";
}
}總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
利用Springboot實現(xiàn)Jwt認(rèn)證的示例代碼
這篇文章主要介紹了利用Springboot實現(xiàn)Jwt認(rèn)證的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12
java如何將int數(shù)組轉(zhuǎn)化為Integer數(shù)組
這篇文章主要介紹了java如何將int數(shù)組轉(zhuǎn)化為Integer數(shù)組,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-11-11
淺談Java中Collections.sort對List排序的兩種方法
本文介紹了Java中Collections.sort對List排序的兩種方法以及Comparable 與Comparator區(qū)別,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-12-12
詳解Spring MVC/Boot 統(tǒng)一異常處理最佳實踐
在 Web 開發(fā)中, 我們經(jīng)常會需要處理各種異常,這篇文章主要介紹了詳解Spring MVC/Boot 統(tǒng)一異常處理最佳實踐,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-01-01
SpringBoot項目發(fā)送釘釘消息功能實現(xiàn)
在工作中的一些告警需要發(fā)送釘釘通知,有的是發(fā)給個人,有的要發(fā)到群里,這時項目就需要接入釘釘,實現(xiàn)發(fā)消息的功能,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-02-02

