Spring?Boot整合Kafka+SSE實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)展示
為什么使用Kafka?
不使用Rabbitmq或者Rocketmq是因?yàn)镵afka是Hadoop集群下的組成部分,對(duì)于大數(shù)據(jù)的相關(guān)開發(fā)適應(yīng)性好,且當(dāng)前業(yè)務(wù)場(chǎng)景下不需要使用死信隊(duì)列,不過要注意Kafka對(duì)于更新時(shí)間慢的數(shù)據(jù)拉取也較慢,因此對(duì)與實(shí)時(shí)性要求高可以選擇其他MQ。
使用消息隊(duì)列是因?yàn)樵撝虚g件具有實(shí)時(shí)性,且可以作為廣播進(jìn)行消息分發(fā)。
為什么使用SSE?
使用Websocket傳輸信息的時(shí)候,會(huì)轉(zhuǎn)成二進(jìn)制數(shù)據(jù),產(chǎn)生一定的時(shí)間損耗,SSE直接傳輸文本,不存在這個(gè)問題
由于Websocket是雙向的,讀取日志的時(shí)候,如果有人連接ws日志,會(huì)發(fā)送大量異常信息,會(huì)給使用段和日志段造成問題;SSE是單向的,不需要考慮這個(gè)問題,提高了安全性
另外就是SSE支持?jǐn)嗑€重連;Websocket協(xié)議本身并沒有提供心跳機(jī)制,所以長時(shí)間沒有數(shù)據(jù)發(fā)送時(shí),會(huì)將這個(gè)連接斷掉,因此需要手寫心跳機(jī)制進(jìn)行實(shí)現(xiàn)。
此外,由于是長連接的一個(gè)實(shí)現(xiàn)方式,所以SSE也可以替代Websocket實(shí)現(xiàn)掃碼登陸(比如通過SSE的超時(shí)組件在實(shí)現(xiàn)二維碼的超時(shí)功能,具體實(shí)現(xiàn)我可以整理一下)
另外,如果是普通項(xiàng)目,不需要過高的實(shí)時(shí)性,則不需要使用Websocket,使用SSE即可
代碼實(shí)現(xiàn)
pom.xml引入SSE和Kafka
<!-- SSE,一般springboot開發(fā)web應(yīng)用的都有 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafka,最主要的是第一個(gè),剩下兩個(gè)是測(cè)試用的 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>application.properties增加Kafka配置信息
# KafkaProperties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=community-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
配置Kafka信息
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}配置controller,通過web方式開啟效果
@RestController
@RequestMapping(path = "sse")
public class KafkaSSEController {
private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
/**
* @param message
* @apiNote 發(fā)送信息到Kafka主題中
*/
@PostMapping("/send")
public void sendMessage(@RequestBody String message) {
kafkaTemplate.send("my-topic", message);
}
/**
* 監(jiān)聽Kafka數(shù)據(jù)
*
* @param message
*/
@KafkaListener(topics = "my-topic", groupId = "my-group-id")
public void consume(String message) {
System.out.println("Received message: " + message);
}
/**
* 連接sse服務(wù)
*
* @param id
* @return
* @throws IOException
*/
@GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter push(@RequestParam("id") String id) throws IOException {
// 超時(shí)時(shí)間設(shè)置為5分鐘,用于演示客戶端自動(dòng)重連
SseEmitter sseEmitter = new SseEmitter(5_60_000L);
// 設(shè)置前端的重試時(shí)間為1s
// send(): 發(fā)送數(shù)據(jù),如果傳入的是一個(gè)非SseEventBuilder對(duì)象,那么傳遞參數(shù)會(huì)被封裝到 data 中
sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("連接成功"));
sseCache.put(id, sseEmitter);
System.out.println("add " + id);
sseEmitter.send("你好", MediaType.APPLICATION_JSON);
SseEmitter.SseEventBuilder data = SseEmitter.event().name("finish").id("6666").data("哈哈");
sseEmitter.send(data);
// onTimeout(): 超時(shí)回調(diào)觸發(fā)
sseEmitter.onTimeout(() -> {
System.out.println(id + "超時(shí)");
sseCache.remove(id);
});
// onCompletion(): 結(jié)束之后的回調(diào)觸發(fā)
sseEmitter.onCompletion(() -> System.out.println("完成?。?!"));
return sseEmitter;
}
/**
* http://127.0.0.1:8080/sse/push?id=7777&content=%E4%BD%A0%E5%93%88aaaaaa
* @param id
* @param content
* @return
* @throws IOException
*/
@ResponseBody
@GetMapping(path = "push")
public String push(String id, String content) throws IOException {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
sseEmitter.send(content);
}
return "over";
}
@ResponseBody
@GetMapping(path = "over")
public String over(String id) {
SseEmitter sseEmitter = sseCache.get(id);
if (sseEmitter != null) {
// complete(): 表示執(zhí)行完畢,會(huì)斷開連接
sseEmitter.complete();
sseCache.remove(id);
}
return "over";
}
}前端方式
<html>
<head>
<script>
console.log('start')
const clientId = "your_client_id_x"; // 設(shè)置客戶端ID
const eventSource = new EventSource(`http://localhost:9999/v1/sse/subscribe/${clientId}`); // 訂閱服務(wù)器端的SSE
eventSource.onmessage = event => {
console.log(event.data)
const message = JSON.parse(event.data);
console.log(`Received message from server: ${message}`);
};
// 發(fā)送消息給服務(wù)器端 可通過 postman 調(diào)用,所以下面 sendMessage() 調(diào)用被注釋掉了
function sendMessage() {
const message = "hello sse";
fetch(`http://localhost:9999/v1/sse/publish/${clientId}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(message)
});
console.log('dddd'+JSON.stringify(message))
}
// sendMessage()
</script>
</head>
</html>到此這篇關(guān)于Spring Boot整合Kafka+SSE實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)展示的文章就介紹到這了,更多相關(guān)SpringBoot實(shí)時(shí)數(shù)據(jù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MyBatis-Plus自定義SQL和復(fù)雜查詢的實(shí)現(xiàn)
MyBatis-Plus增強(qiáng)了MyBatis的功能,提供注解和XML兩種自定義SQL方式,支持復(fù)雜查詢?nèi)缍啾黻P(guān)聯(lián)、動(dòng)態(tài)分頁等,通過注解如@Select、@Insert、@Update、@Delete實(shí)現(xiàn)CRUD操作,本文就來介紹一下,感興趣的可以了解一下2024-10-10
SpringBoot動(dòng)態(tài)修改yml配置文件的方法詳解
這篇文章主要為大家詳細(xì)介紹了SpringBoot動(dòng)態(tài)修改yml配置文件的方法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-03-03
Java數(shù)據(jù)結(jié)構(gòu)之KMP算法的實(shí)現(xiàn)
這篇文章主要為大家詳細(xì)介紹了Java數(shù)據(jù)結(jié)構(gòu)中KMP算法的原理與實(shí)現(xiàn),文中的示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)Java有一定的幫助,需要的可以參考一下2022-11-11
SpringBoot自動(dòng)配置特點(diǎn)與原理詳細(xì)分析
這篇文章主要介紹了SpringBoot自動(dòng)配置原理分析,SpringBoot是我們經(jīng)常使用的框架,那么你能不能針對(duì)SpringBoot實(shí)現(xiàn)自動(dòng)配置做一個(gè)詳細(xì)的介紹。如果可以的話,能不能畫一下實(shí)現(xiàn)自動(dòng)配置的流程圖。牽扯到哪些關(guān)鍵類,以及哪些關(guān)鍵點(diǎn)2022-08-08

