springboot整合SSE的項目實踐
SSE簡介
SSE(Server Sent Event),是一種可以主動從服務端推送消息的技術(shù)。SSE的本質(zhì)其實就是一個HTTP的長連接,只不過它給客戶端發(fā)送的不是一次性的數(shù)據(jù)包,而是一個stream流,格式為text/event-stream。所以客戶端不會關(guān)閉連接,會一直等著服務器發(fā)過來的新的數(shù)據(jù)流。
SSE服務端代碼
springboot中封裝了sse代碼,不需要額外的依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>public class SseEmitterServer {
private static final Logger LOGGER = LoggerFactory.getLogger(SseEmitterServer.class);
/**
* 當前連接數(shù)
*/
private static AtomicInteger count = new AtomicInteger(0);
private static Map<Integer, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
public static SseEmitter connect(Integer userId){
// 設(shè)置超時日期,0表示不過期
SseEmitter sseEmitter = new SseEmitter(0L);
// 注冊回調(diào)
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId,sseEmitter);
count.getAndIncrement();
LOGGER.info("創(chuàng)建新SSE連接,連接用戶編號:{}",userId);
LOGGER.info("現(xiàn)有連接用戶:"+sseEmitterMap.keySet());
return sseEmitter;
}
/**
* 給指定用戶發(fā)信息
*/
public static void sendMessage(Integer userId,String message){
if (!sseEmitterMap.containsKey(userId)) {
connect(userId);
}
try {
sseEmitterMap.get(userId).send(message);
LOGGER.info("給" + userId + "號發(fā)送消息:" + message);
} catch (IOException e) {
LOGGER.error("userId:{},發(fā)送信息出錯:{}", userId, e.getMessage());
e.printStackTrace();
}
}
/**
* 群發(fā)消息
*/
public static void batchSendMessage(String message){
if (sseEmitterMap != null&&!sseEmitterMap.isEmpty()) {
sseEmitterMap.forEach((k,v)->{
try {
v.send(message, MediaType.APPLICATION_JSON);
} catch (IOException e) {
LOGGER.error("userId:{},發(fā)送信息出錯:{}",k,e.getMessage());
e.printStackTrace();
}
});
}
}
public static void batchSendMessage(Set<Integer> userIds,String message){
userIds.forEach(userId->sendMessage(userId,message));
}
/**
* 移出用戶
*/
public static void removeUser(Integer userId){
sseEmitterMap.remove(userId);
count.getAndDecrement();
LOGGER.info("remove user id:{}",userId);
LOGGER.info("remain user id:"+sseEmitterMap.keySet());
}
public static List<Integer> getIds(){
return new ArrayList<>(sseEmitterMap.keySet());
}
public static int getUserCount(){
return count.intValue();
}
private static Runnable completionCallBack(Integer userId){
return ()->{
LOGGER.info("結(jié)束連接,{}",userId);
removeUser(userId);
};
}
private static Runnable timeoutCallBack(Integer userId){
return ()->{
LOGGER.info("連接超時,{}",userId);
removeUser(userId);
};
}
private static Consumer<Throwable> errorCallBack(Integer userId){
return throwable -> {
LOGGER.error("連接異常,{}",userId);
removeUser(userId);
};
}
}@RestController
@CrossOrigin(maxAge = 3600)
public class SseController {
@RequestMapping(value = "/sse/connect/{id}",method = RequestMethod.GET)
public SseEmitter connect(@PathVariable Integer id){
SseEmitter sseEmitter = SseEmitterServer.connect(id);
return sseEmitter;
}
/**
* 向指定用戶發(fā)送消息
*/
@RequestMapping(value = "/sse/send/{id}", method = RequestMethod.GET)
public EiInfo sendMsg(@PathVariable Integer id,@RequestParam("message") String message) {
EiInfo eiInfo = new EiInfo();
SseEmitterServer.sendMessage(id,message);
eiInfo.sysSetMsg("向"+id+"號用戶發(fā)送信息,"+message+",消息發(fā)送成功");
return eiInfo;
}
/**
* 向所有用戶發(fā)送消息
*/
@RequestMapping(value = "/sse/send/all", method = RequestMethod.GET)
public EiInfo sendMsg2AllUser(@RequestParam("message") String message) {
EiInfo eiInfo = new EiInfo();
SseEmitterServer.batchSendMessage(message);
eiInfo.sysSetMsg("向所有用戶發(fā)送信息,"+message+",消息發(fā)送成功");
return eiInfo;
}
/**
* 關(guān)閉用戶連接
*/
@RequestMapping(value = "/sse/close/{id}", method = RequestMethod.GET)
public EiInfo closeSse(@PathVariable Integer id) {
EiInfo eiInfo = new EiInfo();
SseEmitterServer.removeUser(id);
eiInfo.sysSetMsg("關(guān)閉"+id+"號連接。當前連接用戶有:"+SseEmitterServer.getIds());
return eiInfo;
}
}SSE測試
連接服務端
創(chuàng)建三個用戶,分別連接服務端



給指定用戶發(fā)送消息
使用接口測試工具,只給1號用戶發(fā)送消息

瀏覽器中,1號用戶接收到消息,2號3號未接收到消息


群發(fā)消息




關(guān)閉連接

當SSE客戶端連接后,如果長時間不斷開,它會保持連接狀態(tài)。SSE(Server-Sent Events)是一種基于HTTP的推送技術(shù),允許服務器實時向客戶端發(fā)送數(shù)據(jù)。當客戶端連接到服務器的SSE端點時,它會創(chuàng)建一個EventSource對象,該對象將與服務器進行長期連接,并接收服務器發(fā)送的事件。只有在客戶端手動關(guān)閉連接或連接發(fā)生錯誤時,才會斷開SSE連接。
請注意,由于SSE是基于HTTP的技術(shù),因此它可能受到瀏覽器或服務器的超時設(shè)置的影響。如果在長時間沒有收到服務器的數(shù)據(jù)時,連接可能會斷開。如果您希望在長時間不斷開連接的情況下保持SSE連接,請確保服務器發(fā)送數(shù)據(jù)以保持連接活躍,或者調(diào)整相關(guān)超時設(shè)置。
在使用SSE的客戶端連接中,如果長時間不斷開連接,可能會出現(xiàn)以下情況:
- 連接超時。如果服務器端沒有發(fā)送新的事件數(shù)據(jù),而客戶端也沒有重新建立連接,可能會超過服務器的連接超時時間。這可能導致服務器關(guān)閉連接,客戶端需要重新建立連接才能接收新的事件數(shù)據(jù)。
- 網(wǎng)絡異常。長時間不斷開連接可能會導致網(wǎng)絡異常,例如連接中斷、丟包等問題。在這種情況下,客戶端可能需要重新建立連接,以恢復和服務器的通信。為了避免長時間不斷開連接的問題,建議在合適的時機關(guān)閉連接。
到此這篇關(guān)于springboot整合SSE的項目實踐的文章就介紹到這了,更多相關(guān)springboot整合SSE內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
解決BufferedReader.readLine()遇見的坑
這篇文章主要介紹了解決BufferedReader.readLine()遇見的坑,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
Spring activiti如何實現(xiàn)指定任務處理者
這篇文章主要介紹了Spring activiti如何實現(xiàn)指定任務處理者,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-11-11
Spring boot如何通過@Scheduled實現(xiàn)定時任務及多線程配置
這篇文章主要介紹了Spring boot如何通過@Scheduled實現(xiàn)定時任務及多線程配置,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-12-12
SpringBoot+MyBatis實現(xiàn)MD5加密數(shù)據(jù)庫用戶密碼的方法
MD5技術(shù)主要用于對用戶密碼加密,增加賬戶的安全性,他具有不可逆的特性,不會被輕易解密,這篇文章給大家介紹SpringBoot+MyBatis實現(xiàn)MD5加密數(shù)據(jù)庫用戶密碼的方法,感興趣的朋友跟隨小編一起看看吧2024-03-03

