Spring?Boot整合Kafka+SSE實現(xiàn)實時數(shù)據(jù)展示
為什么使用Kafka?
不使用Rabbitmq或者Rocketmq是因為Kafka是Hadoop集群下的組成部分,對于大數(shù)據(jù)的相關(guān)開發(fā)適應(yīng)性好,且當前業(yè)務(wù)場景下不需要使用死信隊列,不過要注意Kafka對于更新時間慢的數(shù)據(jù)拉取也較慢,因此對與實時性要求高可以選擇其他MQ。
使用消息隊列是因為該中間件具有實時性,且可以作為廣播進行消息分發(fā)。
為什么使用SSE?
使用Websocket傳輸信息的時候,會轉(zhuǎn)成二進制數(shù)據(jù),產(chǎn)生一定的時間損耗,SSE直接傳輸文本,不存在這個問題
由于Websocket是雙向的,讀取日志的時候,如果有人連接ws日志,會發(fā)送大量異常信息,會給使用段和日志段造成問題;SSE是單向的,不需要考慮這個問題,提高了安全性
另外就是SSE支持斷線重連;Websocket協(xié)議本身并沒有提供心跳機制,所以長時間沒有數(shù)據(jù)發(fā)送時,會將這個連接斷掉,因此需要手寫心跳機制進行實現(xiàn)。
此外,由于是長連接的一個實現(xiàn)方式,所以SSE也可以替代Websocket實現(xiàn)掃碼登陸(比如通過SSE的超時組件在實現(xiàn)二維碼的超時功能,具體實現(xiàn)我可以整理一下)
另外,如果是普通項目,不需要過高的實時性,則不需要使用Websocket,使用SSE即可
代碼實現(xiàn)
pom.xml引入SSE和Kafka
<!-- SSE,一般springboot開發(fā)web應(yīng)用的都有 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- kafka,最主要的是第一個,剩下兩個是測試用的 --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.4.0</version> </dependency>
application.properties增加Kafka配置信息
# KafkaProperties spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=community-consumer-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
配置Kafka信息
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
配置controller,通過web方式開啟效果
@RestController @RequestMapping(path = "sse") public class KafkaSSEController { private static final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>(); @Resource private KafkaTemplate<String, String> kafkaTemplate; /** * @param message * @apiNote 發(fā)送信息到Kafka主題中 */ @PostMapping("/send") public void sendMessage(@RequestBody String message) { kafkaTemplate.send("my-topic", message); } /** * 監(jiān)聽Kafka數(shù)據(jù) * * @param message */ @KafkaListener(topics = "my-topic", groupId = "my-group-id") public void consume(String message) { System.out.println("Received message: " + message); } /** * 連接sse服務(wù) * * @param id * @return * @throws IOException */ @GetMapping(path = "subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE}) public SseEmitter push(@RequestParam("id") String id) throws IOException { // 超時時間設(shè)置為5分鐘,用于演示客戶端自動重連 SseEmitter sseEmitter = new SseEmitter(5_60_000L); // 設(shè)置前端的重試時間為1s // send(): 發(fā)送數(shù)據(jù),如果傳入的是一個非SseEventBuilder對象,那么傳遞參數(shù)會被封裝到 data 中 sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("連接成功")); sseCache.put(id, sseEmitter); System.out.println("add " + id); sseEmitter.send("你好", MediaType.APPLICATION_JSON); SseEmitter.SseEventBuilder data = SseEmitter.event().name("finish").id("6666").data("哈哈"); sseEmitter.send(data); // onTimeout(): 超時回調(diào)觸發(fā) sseEmitter.onTimeout(() -> { System.out.println(id + "超時"); sseCache.remove(id); }); // onCompletion(): 結(jié)束之后的回調(diào)觸發(fā) sseEmitter.onCompletion(() -> System.out.println("完成?。?!")); return sseEmitter; } /** * http://127.0.0.1:8080/sse/push?id=7777&content=%E4%BD%A0%E5%93%88aaaaaa * @param id * @param content * @return * @throws IOException */ @ResponseBody @GetMapping(path = "push") public String push(String id, String content) throws IOException { SseEmitter sseEmitter = sseCache.get(id); if (sseEmitter != null) { sseEmitter.send(content); } return "over"; } @ResponseBody @GetMapping(path = "over") public String over(String id) { SseEmitter sseEmitter = sseCache.get(id); if (sseEmitter != null) { // complete(): 表示執(zhí)行完畢,會斷開連接 sseEmitter.complete(); sseCache.remove(id); } return "over"; } }
前端方式
<html> <head> <script> console.log('start') const clientId = "your_client_id_x"; // 設(shè)置客戶端ID const eventSource = new EventSource(`http://localhost:9999/v1/sse/subscribe/${clientId}`); // 訂閱服務(wù)器端的SSE eventSource.onmessage = event => { console.log(event.data) const message = JSON.parse(event.data); console.log(`Received message from server: ${message}`); }; // 發(fā)送消息給服務(wù)器端 可通過 postman 調(diào)用,所以下面 sendMessage() 調(diào)用被注釋掉了 function sendMessage() { const message = "hello sse"; fetch(`http://localhost:9999/v1/sse/publish/${clientId}`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(message) }); console.log('dddd'+JSON.stringify(message)) } // sendMessage() </script> </head> </html>
到此這篇關(guān)于Spring Boot整合Kafka+SSE實現(xiàn)實時數(shù)據(jù)展示的文章就介紹到這了,更多相關(guān)SpringBoot實時數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MyBatis-Plus自定義SQL和復雜查詢的實現(xiàn)
MyBatis-Plus增強了MyBatis的功能,提供注解和XML兩種自定義SQL方式,支持復雜查詢?nèi)缍啾黻P(guān)聯(lián)、動態(tài)分頁等,通過注解如@Select、@Insert、@Update、@Delete實現(xiàn)CRUD操作,本文就來介紹一下,感興趣的可以了解一下2024-10-10SpringBoot動態(tài)修改yml配置文件的方法詳解
這篇文章主要為大家詳細介紹了SpringBoot動態(tài)修改yml配置文件的方法,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-03-03Java數(shù)據(jù)結(jié)構(gòu)之KMP算法的實現(xiàn)
這篇文章主要為大家詳細介紹了Java數(shù)據(jù)結(jié)構(gòu)中KMP算法的原理與實現(xiàn),文中的示例代碼講解詳細,對我們學習Java有一定的幫助,需要的可以參考一下2022-11-11