spring中websocket定時(shí)任務(wù)實(shí)現(xiàn)實(shí)時(shí)推送
有時(shí)候業(yè)務(wù)要求websocket連接后,服務(wù)端實(shí)時(shí)每隔一段時(shí)間就將數(shù)據(jù)推送給客戶端進(jìn)行響應(yīng),這時(shí)就需要websocket+定時(shí)任務(wù)一起來實(shí)現(xiàn)實(shí)時(shí)推送數(shù)據(jù)給客戶端了。
使用的定時(shí)任務(wù)方式為spring的TaskScheduler對(duì)象實(shí)現(xiàn)任務(wù)調(diào)度。
TaskScheduler定時(shí)任務(wù)實(shí)現(xiàn)
TaskScheduler接口提供了多種調(diào)度方法來實(shí)現(xiàn)運(yùn)行任務(wù)的執(zhí)行。
public interface TaskScheduler {
//通過觸發(fā)器來決定task是否執(zhí)行
ScheduledFuture schedule(Runnable task, Trigger trigger);
//在starttime的時(shí)候執(zhí)行一次
ScheduledFuture schedule(Runnable task, Date startTime);
ScheduledFuture schedule(Runnable task, Instant startTime);
//從starttime開始每個(gè)period時(shí)間段執(zhí)行一次task
ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period);
ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period);
//每隔period執(zhí)行一次
ScheduledFuture scheduleAtFixedRate(Runnable task, long period);
ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period);
//從startTime開始每隔delay長(zhǎng)時(shí)間執(zhí)行一次
ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
//每隔delay時(shí)間執(zhí)行一次
ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay);
}
簡(jiǎn)單測(cè)試一下
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
/**
* The type Task scheduler test.
*
* @author yjj
* @version 1.0
* @since 2022 -12-28 15:45:17
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class TaskSchedulerTest {
private final TaskScheduler taskScheduler;
@Bean
public void test() {
//每隔3秒執(zhí)行一次
Trigger trigger = new CronTrigger("0/3 * * * * *");
//每隔1秒執(zhí)行一次
//Trigger trigger1 = new PeriodicTrigger(1, TimeUnit.SECONDS);
taskScheduler.schedule(new MyThread(), trigger);
}
private class MyThread implements Runnable {
@Override
public void run() {
log.info("定時(shí)執(zhí)行線程名稱=【{}】,執(zhí)行時(shí)間=【{}】", Thread.currentThread().getName(), DateUtil.date());
}
}
}
效果就是每個(gè)3秒執(zhí)行一次

websocket+定時(shí)任務(wù)實(shí)時(shí)推送
實(shí)現(xiàn)的業(yè)務(wù)需求如下:客戶端連上來以后就每隔3秒向客戶端實(shí)時(shí)推送消息。有關(guān)websocket的實(shí)現(xiàn)見文章websocket簡(jiǎn)單實(shí)現(xiàn)
TestWebsocket.java
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
/**
* 測(cè)試websocket
*
* @author yjj
* @version 1.0
* @since 2022 -12-28 14:55:29
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class TestWebsocket implements WebSocketHandler {
protected static final CopyOnWriteArrayList<WebSocketSession> WEB_SOCKET_SESSIONS = new CopyOnWriteArrayList<>();
/**
* 定時(shí)任務(wù)集合
*/
Map<String, ScheduledFuture<?>> stringScheduledFutureMap = new ConcurrentHashMap<>();
/**
* taskScheduler
*/
private final TaskScheduler taskScheduler;
/**
* 建立連接后操作
*
* @param session 連接session信息
* @throws Exception exception
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sendMessage("連接成功~~~~~~,sessionId=" + session.getId());
WEB_SOCKET_SESSIONS.add(session);
//設(shè)置定時(shí)任務(wù),每隔3s執(zhí)行一次
Trigger trigger = new CronTrigger("0/3 * * * * *");
//開啟一個(gè)定時(shí)任務(wù)
ScheduledFuture<?> schedule = taskScheduler.schedule(new CustomizeTask(session.getId()), trigger);
//根據(jù)session連接id定時(shí)任務(wù)線程存到map中
stringScheduledFutureMap.put(session.getId(), schedule);
}
private class CustomizeTask implements Runnable {
private final String sessionId;
CustomizeTask(String sessionId) {
this.sessionId = sessionId;
}
@Override
public void run() {
try {
String message = CharSequenceUtil.format("定時(shí)執(zhí)行線程名稱=【{}】,執(zhí)行時(shí)間=【{}】", Thread.currentThread().getName(), DateUtil.date());
sendMessage(JSONUtil.toJsonStr(message), sessionId);
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 接收到消息后的處理
*
* @param session 連接session信息
* @param message 信息
* @throws Exception exception
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
sendMessage("接收到的消息為=【" + message + "】,sessionId=【" + session.getId() + "】,回復(fù)消息=【你好呀!】");
}
/**
* ws連接出錯(cuò)時(shí)調(diào)用
*
* @param session session連接信息
* @param exception exception
* @throws Exception exception
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
if (session.isOpen()) {
sendMessage("ws連接出錯(cuò),即將關(guān)閉此session,sessionId=【" + session.getId() + "】");
session.close();
}
WEB_SOCKET_SESSIONS.remove(session);
}
/**
* 連接關(guān)閉后調(diào)用
*
* @param session session連接信息
* @param closeStatus 關(guān)閉狀態(tài)
* @throws Exception exception
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
if (session.isOpen()) {
sendMessage("ws連接即將關(guān)閉此session,sessionId=【" + session.getId() + "】");
session.close();
}
WEB_SOCKET_SESSIONS.remove(session);
String sessionId = session.getId();
ScheduledFuture<?> scheduledFuture = MapUtil.get(stringScheduledFutureMap, sessionId, ScheduledFuture.class);
if (scheduledFuture != null) {
//暫停對(duì)應(yīng)session的開啟的定時(shí)任務(wù)
scheduledFuture.cancel(true);
//集合移除
stringScheduledFutureMap.remove(sessionId);
}
}
/**
* 是否支持分片消息
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 群發(fā)發(fā)送消息
*
* @param message 消息
* @throws IOException ioException
*/
public void sendMessage(String message) throws IOException {
if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
webSocketSession.sendMessage(new TextMessage(message));
}
}
}
/**
* 發(fā)給指定連接消息
*
* @param message 消息
* @throws IOException ioException
*/
public void sendMessage(String message, String sessionId) throws IOException {
if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
if (sessionId.equals(webSocketSession.getId())) {
webSocketSession.sendMessage(new TextMessage(message));
}
}
}
}
}
websocket綁定URL
import com.yjj.test.websocket.TestWebsocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import javax.annotation.Resource;
/**
* websocket配置
*
* @author yjj
* @version 1.0
* @since 2022 -12-28 15:10:11
*/
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Resource
private TestWebsocket testWebsocket;
/**
* Register {@link WebSocketHandler WebSocketHandlers} including SockJS fallback options if desired.
*
* @param registry
*/
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(testWebsocket, "/test").setAllowedOrigins("*");
}
}
websocket與定時(shí)任務(wù)同時(shí)存在時(shí),需要加入配置定義線程池進(jìn)行線程的管理
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* 當(dāng)定時(shí)任務(wù)和websocket同時(shí)存在時(shí)報(bào)錯(cuò)解決
*
* @author yjj
* @version 1.0
* @since 2022 -04-28 17:35:54
*/
@Configuration
public class ScheduledConfig {
/**
* Schedule本身是單線程執(zhí)行的
*
* @return the task scheduler
*/
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler scheduling = new ThreadPoolTaskScheduler();
scheduling.setPoolSize(20);
return scheduling;
}
}
效果如下
連接上以后服務(wù)每隔3秒會(huì)向客戶端實(shí)時(shí)推送消息

到此這篇關(guān)于spring中websocket定時(shí)任務(wù)實(shí)現(xiàn)實(shí)時(shí)推送的文章就介紹到這了,更多相關(guān)spring websocket實(shí)時(shí)推送內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Spring?Boot+Vue實(shí)現(xiàn)Socket通知推送的完整步驟
- Springboot集成SSE實(shí)現(xiàn)單工通信消息推送流程詳解
- SpringBoot整合WebSocket實(shí)現(xiàn)后端向前端主動(dòng)推送消息方式
- Spring?Boot?使用?SSE?方式向前端推送數(shù)據(jù)詳解
- SpringBoot+WebSocket實(shí)現(xiàn)消息推送功能
- Springboot整合企業(yè)微信機(jī)器人助手推送消息的實(shí)現(xiàn)
- SpringBoot整合WxJava開啟消息推送的實(shí)現(xiàn)
- SpringBoot2.0集成WebSocket實(shí)現(xiàn)后臺(tái)向前端推送信息
- SpringBoot+WebSocket+Netty實(shí)現(xiàn)消息推送的示例代碼
- Spring SseEmitter推送消息及常用方法
相關(guān)文章
SpringBoot接口參數(shù)的默認(rèn)值與必要性最佳實(shí)踐記錄
這篇文章主要介紹了SpringBoot接口參數(shù)的默認(rèn)值與必要性,通過合理設(shè)置接口參數(shù)的默認(rèn)值和必要性,我們可以創(chuàng)建出既健壯又靈活的?RESTful?API,需要的朋友可以參考下2024-08-08
詳解SpringBoot如何優(yōu)雅的進(jìn)行全局異常處理
在SpringBoot的開發(fā)中,為了提高程序運(yùn)行的魯棒性,我們經(jīng)常需要對(duì)各種程序異常進(jìn)行處理,但是如果在每個(gè)出異常的地方進(jìn)行單獨(dú)處理的話,這會(huì)引入大量業(yè)務(wù)不相關(guān)的異常處理代碼,這篇文章帶大家了解一下如何優(yōu)雅的進(jìn)行全局異常處理2023-07-07
Spring?cloud負(fù)載均衡@LoadBalanced?&?LoadBalancerClient
由于Spring?cloud2020之后移除了Ribbon,直接使用Spring?Cloud?LoadBalancer作為客戶端負(fù)載均衡組件,我們討論Spring負(fù)載均衡以Spring?Cloud2020之后版本為主,學(xué)習(xí)Spring?Cloud?LoadBalance2023-11-11
使用MUI框架構(gòu)建App請(qǐng)求http接口實(shí)例代碼
下面小編就為大家分享一篇使用MUI框架構(gòu)建App請(qǐng)求http接口實(shí)例代碼,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-01-01
使用Java編寫一個(gè)圖片word互轉(zhuǎn)工具
這篇文章主要介紹了使用Java編寫一個(gè)PDF?Word文件轉(zhuǎn)換工具的相關(guān)資料,需要的朋友可以參考下2023-01-01
BeanUtils.copyProperties復(fù)制屬性失敗的原因及解決方案
這篇文章主要介紹了BeanUtils.copyProperties復(fù)制屬性失敗的原因及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08

