spring中websocket定時(shí)任務(wù)實(shí)現(xiàn)實(shí)時(shí)推送
有時(shí)候業(yè)務(wù)要求websocket連接后,服務(wù)端實(shí)時(shí)每隔一段時(shí)間就將數(shù)據(jù)推送給客戶端進(jìn)行響應(yīng),這時(shí)就需要websocket+定時(shí)任務(wù)一起來實(shí)現(xiàn)實(shí)時(shí)推送數(shù)據(jù)給客戶端了。
使用的定時(shí)任務(wù)方式為spring的TaskScheduler對(duì)象實(shí)現(xiàn)任務(wù)調(diào)度。
TaskScheduler定時(shí)任務(wù)實(shí)現(xiàn)
TaskScheduler接口提供了多種調(diào)度方法來實(shí)現(xiàn)運(yùn)行任務(wù)的執(zhí)行。
public interface TaskScheduler { //通過觸發(fā)器來決定task是否執(zhí)行 ScheduledFuture schedule(Runnable task, Trigger trigger); //在starttime的時(shí)候執(zhí)行一次 ScheduledFuture schedule(Runnable task, Date startTime); ScheduledFuture schedule(Runnable task, Instant startTime); //從starttime開始每個(gè)period時(shí)間段執(zhí)行一次task ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period); ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period); //每隔period執(zhí)行一次 ScheduledFuture scheduleAtFixedRate(Runnable task, long period); ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period); //從startTime開始每隔delay長(zhǎng)時(shí)間執(zhí)行一次 ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay); //每隔delay時(shí)間執(zhí)行一次 ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay); }
簡(jiǎn)單測(cè)試一下
import cn.hutool.core.date.DateUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.support.CronTrigger; import org.springframework.stereotype.Component; /** * The type Task scheduler test. * * @author yjj * @version 1.0 * @since 2022 -12-28 15:45:17 */ @Slf4j @Component @RequiredArgsConstructor public class TaskSchedulerTest { private final TaskScheduler taskScheduler; @Bean public void test() { //每隔3秒執(zhí)行一次 Trigger trigger = new CronTrigger("0/3 * * * * *"); //每隔1秒執(zhí)行一次 //Trigger trigger1 = new PeriodicTrigger(1, TimeUnit.SECONDS); taskScheduler.schedule(new MyThread(), trigger); } private class MyThread implements Runnable { @Override public void run() { log.info("定時(shí)執(zhí)行線程名稱=【{}】,執(zhí)行時(shí)間=【{}】", Thread.currentThread().getName(), DateUtil.date()); } } }
效果就是每個(gè)3秒執(zhí)行一次
websocket+定時(shí)任務(wù)實(shí)時(shí)推送
實(shí)現(xiàn)的業(yè)務(wù)需求如下:客戶端連上來以后就每隔3秒向客戶端實(shí)時(shí)推送消息。有關(guān)websocket的實(shí)現(xiàn)見文章websocket簡(jiǎn)單實(shí)現(xiàn)
TestWebsocket.java
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DateUtil; import cn.hutool.core.map.MapUtil; import cn.hutool.core.text.CharSequenceUtil; import cn.hutool.json.JSONUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.support.CronTrigger; import org.springframework.stereotype.Component; import org.springframework.web.socket.*; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; /** * 測(cè)試websocket * * @author yjj * @version 1.0 * @since 2022 -12-28 14:55:29 */ @Slf4j @Component @RequiredArgsConstructor public class TestWebsocket implements WebSocketHandler { protected static final CopyOnWriteArrayList<WebSocketSession> WEB_SOCKET_SESSIONS = new CopyOnWriteArrayList<>(); /** * 定時(shí)任務(wù)集合 */ Map<String, ScheduledFuture<?>> stringScheduledFutureMap = new ConcurrentHashMap<>(); /** * taskScheduler */ private final TaskScheduler taskScheduler; /** * 建立連接后操作 * * @param session 連接session信息 * @throws Exception exception */ @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { sendMessage("連接成功~~~~~~,sessionId=" + session.getId()); WEB_SOCKET_SESSIONS.add(session); //設(shè)置定時(shí)任務(wù),每隔3s執(zhí)行一次 Trigger trigger = new CronTrigger("0/3 * * * * *"); //開啟一個(gè)定時(shí)任務(wù) ScheduledFuture<?> schedule = taskScheduler.schedule(new CustomizeTask(session.getId()), trigger); //根據(jù)session連接id定時(shí)任務(wù)線程存到map中 stringScheduledFutureMap.put(session.getId(), schedule); } private class CustomizeTask implements Runnable { private final String sessionId; CustomizeTask(String sessionId) { this.sessionId = sessionId; } @Override public void run() { try { String message = CharSequenceUtil.format("定時(shí)執(zhí)行線程名稱=【{}】,執(zhí)行時(shí)間=【{}】", Thread.currentThread().getName(), DateUtil.date()); sendMessage(JSONUtil.toJsonStr(message), sessionId); } catch (IOException e) { e.printStackTrace(); } } } /** * 接收到消息后的處理 * * @param session 連接session信息 * @param message 信息 * @throws Exception exception */ @Override public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { sendMessage("接收到的消息為=【" + message + "】,sessionId=【" + session.getId() + "】,回復(fù)消息=【你好呀!】"); } /** * ws連接出錯(cuò)時(shí)調(diào)用 * * @param session session連接信息 * @param exception exception * @throws Exception exception */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { if (session.isOpen()) { sendMessage("ws連接出錯(cuò),即將關(guān)閉此session,sessionId=【" + session.getId() + "】"); session.close(); } WEB_SOCKET_SESSIONS.remove(session); } /** * 連接關(guān)閉后調(diào)用 * * @param session session連接信息 * @param closeStatus 關(guān)閉狀態(tài) * @throws Exception exception */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { if (session.isOpen()) { sendMessage("ws連接即將關(guān)閉此session,sessionId=【" + session.getId() + "】"); session.close(); } WEB_SOCKET_SESSIONS.remove(session); String sessionId = session.getId(); ScheduledFuture<?> scheduledFuture = MapUtil.get(stringScheduledFutureMap, sessionId, ScheduledFuture.class); if (scheduledFuture != null) { //暫停對(duì)應(yīng)session的開啟的定時(shí)任務(wù) scheduledFuture.cancel(true); //集合移除 stringScheduledFutureMap.remove(sessionId); } } /** * 是否支持分片消息 */ @Override public boolean supportsPartialMessages() { return false; } /** * 群發(fā)發(fā)送消息 * * @param message 消息 * @throws IOException ioException */ public void sendMessage(String message) throws IOException { if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) { for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) { webSocketSession.sendMessage(new TextMessage(message)); } } } /** * 發(fā)給指定連接消息 * * @param message 消息 * @throws IOException ioException */ public void sendMessage(String message, String sessionId) throws IOException { if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) { for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) { if (sessionId.equals(webSocketSession.getId())) { webSocketSession.sendMessage(new TextMessage(message)); } } } } }
websocket綁定URL
import com.yjj.test.websocket.TestWebsocket; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import javax.annotation.Resource; /** * websocket配置 * * @author yjj * @version 1.0 * @since 2022 -12-28 15:10:11 */ @EnableWebSocket @Configuration public class WebSocketConfig implements WebSocketConfigurer { @Resource private TestWebsocket testWebsocket; /** * Register {@link WebSocketHandler WebSocketHandlers} including SockJS fallback options if desired. * * @param registry */ @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(testWebsocket, "/test").setAllowedOrigins("*"); } }
websocket與定時(shí)任務(wù)同時(shí)存在時(shí),需要加入配置定義線程池進(jìn)行線程的管理
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; /** * 當(dāng)定時(shí)任務(wù)和websocket同時(shí)存在時(shí)報(bào)錯(cuò)解決 * * @author yjj * @version 1.0 * @since 2022 -04-28 17:35:54 */ @Configuration public class ScheduledConfig { /** * Schedule本身是單線程執(zhí)行的 * * @return the task scheduler */ @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduling = new ThreadPoolTaskScheduler(); scheduling.setPoolSize(20); return scheduling; } }
效果如下
連接上以后服務(wù)每隔3秒會(huì)向客戶端實(shí)時(shí)推送消息
到此這篇關(guān)于spring中websocket定時(shí)任務(wù)實(shí)現(xiàn)實(shí)時(shí)推送的文章就介紹到這了,更多相關(guān)spring websocket實(shí)時(shí)推送內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Spring?Boot+Vue實(shí)現(xiàn)Socket通知推送的完整步驟
- Springboot集成SSE實(shí)現(xiàn)單工通信消息推送流程詳解
- SpringBoot整合WebSocket實(shí)現(xiàn)后端向前端主動(dòng)推送消息方式
- Spring?Boot?使用?SSE?方式向前端推送數(shù)據(jù)詳解
- SpringBoot+WebSocket實(shí)現(xiàn)消息推送功能
- Springboot整合企業(yè)微信機(jī)器人助手推送消息的實(shí)現(xiàn)
- SpringBoot整合WxJava開啟消息推送的實(shí)現(xiàn)
- SpringBoot2.0集成WebSocket實(shí)現(xiàn)后臺(tái)向前端推送信息
- SpringBoot+WebSocket+Netty實(shí)現(xiàn)消息推送的示例代碼
- Spring SseEmitter推送消息及常用方法
相關(guān)文章
SpringBoot接口參數(shù)的默認(rèn)值與必要性最佳實(shí)踐記錄
這篇文章主要介紹了SpringBoot接口參數(shù)的默認(rèn)值與必要性,通過合理設(shè)置接口參數(shù)的默認(rèn)值和必要性,我們可以創(chuàng)建出既健壯又靈活的?RESTful?API,需要的朋友可以參考下2024-08-08詳解SpringBoot如何優(yōu)雅的進(jìn)行全局異常處理
在SpringBoot的開發(fā)中,為了提高程序運(yùn)行的魯棒性,我們經(jīng)常需要對(duì)各種程序異常進(jìn)行處理,但是如果在每個(gè)出異常的地方進(jìn)行單獨(dú)處理的話,這會(huì)引入大量業(yè)務(wù)不相關(guān)的異常處理代碼,這篇文章帶大家了解一下如何優(yōu)雅的進(jìn)行全局異常處理2023-07-07Spring?cloud負(fù)載均衡@LoadBalanced?&?LoadBalancerClient
由于Spring?cloud2020之后移除了Ribbon,直接使用Spring?Cloud?LoadBalancer作為客戶端負(fù)載均衡組件,我們討論Spring負(fù)載均衡以Spring?Cloud2020之后版本為主,學(xué)習(xí)Spring?Cloud?LoadBalance2023-11-11使用MUI框架構(gòu)建App請(qǐng)求http接口實(shí)例代碼
下面小編就為大家分享一篇使用MUI框架構(gòu)建App請(qǐng)求http接口實(shí)例代碼,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-01-01使用Java編寫一個(gè)圖片word互轉(zhuǎn)工具
這篇文章主要介紹了使用Java編寫一個(gè)PDF?Word文件轉(zhuǎn)換工具的相關(guān)資料,需要的朋友可以參考下2023-01-01BeanUtils.copyProperties復(fù)制屬性失敗的原因及解決方案
這篇文章主要介紹了BeanUtils.copyProperties復(fù)制屬性失敗的原因及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08