Spring websocket并發(fā)發(fā)送消息異常的解決
今天剛剛經(jīng)歷了一個坑,非常新鮮,我立刻決定記錄下來。首先,讓我們先看一下我們項目中使用的 Spring WebSocket 示例代碼。
示例代碼
在我們的項目中,我們使用了 Spring WebSocket 來實現(xiàn)服務(wù)器與客戶端之間的實時通信。下面是一個簡化的示例代碼:
WebSocketConfig配置代碼
@Configuration @EnableWebSocket // 啟動Websocket public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(myHandler(), "/myHandler/**") // 添加攔截器,可以獲取連接的param和 header 用作認證鑒權(quán) .addInterceptors(new LakerSessionHandshakeInterceptor()) // 設(shè)置運行跨域 .setAllowedOrigins("*"); } @Bean public ServletServerContainerFactoryBean createWebSocketContainer() { ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean(); // 設(shè)置默認會話空閑超時 以毫秒為單位 非正值意味著無限超時,默認值 0 ,默認沒10s檢查一次空閑就關(guān)閉 container.setMaxSessionIdleTimeout(10 * 1000L); // 設(shè)置異步發(fā)送消息的默認超時時間 以毫秒為單位 非正值意味著無限超時 ,默認值-1,還沒看到作用 container.setAsyncSendTimeout(10 * 1000L); // 設(shè)置文本消息的默認最大緩沖區(qū)大小 以字符為單位,默認值 8 * 1024 container.setMaxTextMessageBufferSize(8 * 1024); // 設(shè)置二進制消息的默認最大緩沖區(qū)大小 以字節(jié)為單位,默認值 8 * 1024 container.setMaxBinaryMessageBufferSize(8 * 1024); return container; } @Bean public WebSocketHandler myHandler() { return new MyHandler(); } }
握手攔截器代碼
public class LakerSessionHandshakeInterceptor extends HttpSessionHandshakeInterceptor { // 攔截器返回false,則不會進行websocket協(xié)議的轉(zhuǎn)換 // 可以獲取請求參數(shù)做認證鑒權(quán) @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest(); // 將所有查詢參數(shù)復(fù)制到 WebSocketSession 屬性映射 Enumeration<String> parameterNames = servletRequest.getParameterNames(); while (parameterNames.hasMoreElements()) { String parameterName = parameterNames.nextElement(); // 放入的值可以從WebSocketHandler里面的WebSocketSession拿出來 attributes.put(parameterName, servletRequest.getParameter(parameterName)); } if (servletRequest.getHeader(HttpHeaders.AUTHORIZATION) != null) { setErrorResponse(response, HttpStatus.UNAUTHORIZED); return false; } return true; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { } private void setErrorResponse(ServerHttpResponse response, HttpStatus status, String errorMessage) { response.setStatusCode(status); if (errorMessage != null) { try { objectMapper.writeValue(response.getBody(), new ErrorDetail(status.value(), errorMessage)); } catch (IOException ioe) { } } } }
業(yè)務(wù)處理器代碼
public class MyHandler extends AbstractWebSocketHandler { private final Map<String, WebSocketSession> webSocketSessionMap = new ConcurrentHashMap<>(); //成功連接時 @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { super.afterConnectionEstablished(session); // 設(shè)置最大報文大小,如果報文過大則會報錯的,可以將限制調(diào)大。 // 會覆蓋config中的配置。 session.setBinaryMessageSizeLimit(8 * 1024); session.setTextMessageSizeLimit(8 * 1024); webSocketSessionMap.put(session.getId(), session); } //處理textmessage @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { super.handleTextMessage(session, message); // 有消息就廣播下 for (Map.Entry<String, WebSocketSession> entry : webSocketSessionMap.entrySet()) { String s = entry.getKey(); WebSocketSession webSocketSession = entry.getValue(); if (webSocketSession.isOpen()) { webSocketSession.sendMessage(new TextMessage(s + ":" + message.getPayload())); log.info("send to {} msg:{}", s, message.getPayload()); } } } //報錯時 @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { super.handleTransportError(session, exception); log.warn("handleTransportError:: sessionId: {} {}", session.getId(), exception.getMessage(), exception); if (session.isOpen()) { try { session.close(); } catch (Exception ex) { // ignore } } } //連接關(guān)閉時 @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { super.afterConnectionClosed(session, status); WebSocketSession session = webSocketSessionMap.remove(session.getId()); if (session.isOpen()) { try { session.close(); } catch (Exception ex) { // ignore } } } //處理binarymessage @Override protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { super.handleBinaryMessage(session, message); } //處理pong @Override protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { super.handlePongMessage(session, message); } //是否支持報文拆包 // 決定是否接受半包,因為websocket協(xié)議比較底層,好像Tcp協(xié)議一樣,如果發(fā)送大消息可能會拆成多個小報文。如果不希望處理不完整的報文,希望底層幫忙聚合成完整消息將此方法返回false,這樣底層會等待完整報文到達聚合后才回調(diào)。 @Override public boolean supportsPartialMessages() { return super.supportsPartialMessages(); } }
問題復(fù)現(xiàn)
在我們的測試環(huán)境中,我們發(fā)現(xiàn)當多個線程同時嘗試通過 WebSocket 會話發(fā)送消息時,會拋出異常。
現(xiàn)在我們用JMeter模擬100個用戶發(fā)消息。
當執(zhí)行一會兒后會發(fā)現(xiàn)服務(wù)端出現(xiàn)如下異常日志:
java.lang.IllegalStateException: 遠程 endpoint 處于 [TEXT_PARTIAL_WRITING] 狀態(tài),是被調(diào)用方法的無效狀態(tài)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1274)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.textPartialStart(WsRemoteEndpointImplBase.java:1231)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendPartialString(WsRemoteEndpointImplBase.java:226)
at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendText(WsRemoteEndpointBasic.java:49)
at org.springframework.web.socket.adapter.standard.StandardWebSocketSession.sendTextMessage(StandardWebSocketSession.java:215)
at org.springframework.web.socket.adapter.AbstractWebSocketSession.sendMessage(AbstractWebSocketSession.java:108)
原因分析
經(jīng)過分析,發(fā)現(xiàn)異常的根本原因是在并發(fā)發(fā)送消息時,WebSocket 會話的狀態(tài)發(fā)生了異常。
具體來說,當一個線程正在發(fā)送文本消息時,另一個線程也嘗試發(fā)送消息,就會導(dǎo)致狀態(tài)不一致,從而觸發(fā)異常。
即 WebSocketSession.sendMessage
其不是線程安全的,內(nèi)部有個狀態(tài)機來管理防止并發(fā)導(dǎo)致問題以fail-fast方式快速告訴使用者。
那么問題代碼就在這里了
WebSocketSession
是不支持并發(fā)發(fā)送消息的,我們使用者要保證其線程安全,這是我一開始沒預(yù)期到的。
解決方案
方案一 加鎖同步發(fā)送
為了解決并發(fā)發(fā)送消息導(dǎo)致的異常,我們可以在發(fā)送消息的代碼塊上加鎖,確保同一時刻只有一個線程能夠執(zhí)行發(fā)送操作。
下面是使用加鎖機制的示例代碼:
改起來非常簡單,只需要對webSocketSession實例加個鎖即可。
缺點
當并發(fā)度較高時,越后面排隊等待鎖的人被block的越久。
大致模型圖
方案二 使用ConcurrentWebSocketSessionDecorator
另一種解決并發(fā)發(fā)送消息的方法是使用 ConcurrentWebSocketSessionDecorator
。這是 Spring WebSocket 提供的一個裝飾器類,用于增強底層的 WebSocketSession
的線程安全性。它通過并發(fā)安全的方式包裝原始的 WebSocketSession
對象,確保在多線程環(huán)境下安全地訪問和修改會話屬性,以及進行消息發(fā)送操作。
ConcurrentWebSocketSessionDecorator
的工作原理是利用線程安全的數(shù)據(jù)結(jié)構(gòu)和同步機制,確保對會話執(zhí)行的操作的原子性和一致性。當需要發(fā)送消息時,裝飾器會獲取鎖或使用并發(fā)數(shù)據(jù)結(jié)構(gòu)來協(xié)調(diào)多個線程之間的訪問。這樣可以防止對會話狀態(tài)的并發(fā)修改,避免潛在的競態(tài)條件。
下面是使用 ConcurrentWebSocketSessionDecorator
的示例代碼:
@Override public void afterConnectionEstablished(WebSocketSession session) { log.info("{} Connection established!", session.getId()); // webSocketSessionMap.put(session.getId(), session); // 把線程安全的session代理裝飾類放到容器里 webSocketSessionMap.put(session.getId(), new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64000)); }
ConcurrentWebSocketSessionDecorator原理
ConcurrentWebSocketSessionDecorator整體代碼大概200行還是比較容易看懂的。
// 包裝一個WebSocketSession以保證一次只有一個線程可以發(fā)送消息。 // 如果發(fā)送速度慢,后續(xù)嘗試從其他線程發(fā)送更多消息將無法獲取刷新鎖,消息將被緩沖。屆時,將檢查指定的緩沖區(qū)大小限制和發(fā)送時間限制,如果超過限制,會話將關(guān)閉。 public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorator { private static final Log logger = LogFactory.getLog(ConcurrentWebSocketSessionDecorator.class); private final int sendTimeLimit; private final int bufferSizeLimit; private final OverflowStrategy overflowStrategy; @Nullable private Consumer<WebSocketMessage<?>> preSendCallback; private final Queue<WebSocketMessage<?>> buffer = new LinkedBlockingQueue<>(); private final AtomicInteger bufferSize = new AtomicInteger(); private volatile long sendStartTime; private volatile boolean limitExceeded; private volatile boolean closeInProgress; private final Lock flushLock = new ReentrantLock(); private final Lock closeLock = new ReentrantLock(); public ConcurrentWebSocketSessionDecorator(WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit) { this(delegate, sendTimeLimit, bufferSizeLimit, OverflowStrategy.TERMINATE); } // delegate 需要代理的session // sendTimeLimit 表示發(fā)送**單個消息**的最大時間 // bufferSizeLimit 表示發(fā)送消息的隊列最大字節(jié)數(shù),不是消息的數(shù)量而是消息的總大小 // overflowStrategy 表示超過限制時的策略有2個 // - 斷開連接(默認選項) // - 丟棄最老的數(shù)據(jù),直到大小滿足 public ConcurrentWebSocketSessionDecorator( WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit, OverflowStrategy overflowStrategy) { super(delegate); this.sendTimeLimit = sendTimeLimit; this.bufferSizeLimit = bufferSizeLimit; this.overflowStrategy = overflowStrategy; } // 返回自當前發(fā)送開始以來的時間(毫秒),如果當前沒有發(fā)送正在進行則返回 0。 // 即花費的耗時 public long getTimeSinceSendStarted() { long start = this.sendStartTime; return (start > 0 ? (System.currentTimeMillis() - start) : 0); } // 設(shè)置在將消息添加到發(fā)送緩沖區(qū)后調(diào)用的回調(diào) public void setMessageCallback(Consumer<WebSocketMessage<?>> callback) { this.preSendCallback = callback; } @Override public void sendMessage(WebSocketMessage<?> message) throws IOException { //檢查超限了就不發(fā)了 if (shouldNotSend()) { return; } // 消息放到buffer隊列 this.buffer.add(message); // 增加bufferSize用于后面判斷是不是超限了 this.bufferSize.addAndGet(message.getPayloadLength()); // 發(fā)送緩沖區(qū)后調(diào)用的回調(diào) if (this.preSendCallback != null) { this.preSendCallback.accept(message); } do { // 嘗試獲取鎖,發(fā)送消息,只有一個線程負責發(fā)送所有消息 if (!tryFlushMessageBuffer()) { // 沒獲取鎖的線程,對當前buffer和時間檢查, // 檢查不過就拋異常 然后框架自己會抓取異常關(guān)閉當前的連接 checkSessionLimits(); break; } } while (!this.buffer.isEmpty() && !shouldNotSend()); } // 超限了 不能發(fā)送了 private boolean shouldNotSend() { return (this.limitExceeded || this.closeInProgress); } // 嘗試獲取鎖 并發(fā)送所有緩存的消息 private boolean tryFlushMessageBuffer() throws IOException { if (this.flushLock.tryLock()) { try { // 循環(huán)發(fā)送消息 while (true) { // 一次拉一個消息 WebSocketMessage<?> message = this.buffer.poll(); // 沒消息了 或者 超限了 if (message == null || shouldNotSend()) { // 退出 完活 break; } // 釋放bufferSize this.bufferSize.addAndGet(-message.getPayloadLength()); // 用于判斷單個消息是否發(fā)送超時的 this.sendStartTime = System.currentTimeMillis(); // 發(fā)送消息 getDelegate().sendMessage(message); // 重置開始時間 this.sendStartTime = 0; } } finally { this.sendStartTime = 0; this.flushLock.unlock(); } return true; } return false; } //檢查是否超時,是否超過buffer限制 private void checkSessionLimits() { // 應(yīng)該發(fā)送 且 獲取到關(guān)閉鎖 if (!shouldNotSend() && this.closeLock.tryLock()) { try { //檢測是否發(fā)送超時 if (getTimeSinceSendStarted() > getSendTimeLimit()) { String format = "Send time %d (ms) for session '%s' exceeded the allowed limit %d"; String reason = String.format(format, getTimeSinceSendStarted(), getId(), getSendTimeLimit()); limitExceeded(reason); } //檢測buffer大小,根據(jù)策略要么拋異常關(guān)閉連接,要么丟棄數(shù)據(jù) else if (getBufferSize() > getBufferSizeLimit()) { switch (this.overflowStrategy) { // 關(guān)閉連接,拋出異??蚣芫蜁P(guān)閉連接 case TERMINATE: String format = "Buffer size %d bytes for session '%s' exceeds the allowed limit %d"; String reason = String.format(format, getBufferSize(), getId(), getBufferSizeLimit()); limitExceeded(reason); break; // 丟棄老數(shù)據(jù) case DROP: int i = 0; while (getBufferSize() > getBufferSizeLimit()) { WebSocketMessage<?> message = this.buffer.poll(); if (message == null) { break; } this.bufferSize.addAndGet(-message.getPayloadLength()); i++; } if (logger.isDebugEnabled()) { logger.debug("Dropped " + i + " messages, buffer size: " + getBufferSize()); } break; default: // Should never happen.. throw new IllegalStateException("Unexpected OverflowStrategy: " + this.overflowStrategy); } } } finally { this.closeLock.unlock(); } } } private void limitExceeded(String reason) { this.limitExceeded = true; throw new SessionLimitExceededException(reason, CloseStatus.SESSION_NOT_RELIABLE); } @Override public void close(CloseStatus status) throws IOException { this.closeLock.lock(); try { if (this.closeInProgress) { return; } if (!CloseStatus.SESSION_NOT_RELIABLE.equals(status)) { try { checkSessionLimits(); } catch (SessionLimitExceededException ex) { // Ignore } if (this.limitExceeded) { if (logger.isDebugEnabled()) { logger.debug("Changing close status " + status + " to SESSION_NOT_RELIABLE."); } status = CloseStatus.SESSION_NOT_RELIABLE; } } this.closeInProgress = true; super.close(status); } finally { this.closeLock.unlock(); } } public enum OverflowStrategy { TERMINATE, DROP } }
大致模型圖
方案三 自研事件驅(qū)動隊列(借鑒 Tomcat)
除了使用加鎖和 ConcurrentWebSocketSessionDecorator
,我們還可以借鑒 Tomcat 的事件驅(qū)動隊列機制來解決并發(fā)發(fā)送消息的問題。具體的實現(xiàn)需要一些復(fù)雜的邏輯和代碼,涉及到消息隊列、線程池和事件處理機制,因此在這里我不展開討論。如果你對這個方案感興趣,可以參考 Tomcat 的源代碼,了解更多關(guān)于事件驅(qū)動隊列的實現(xiàn)細節(jié)。
這個參考之前的tomcat設(shè)計即可。
總結(jié)
在本篇博文中,我們討論了在使用 Spring WebSocket 進行并發(fā)發(fā)送消息時可能遇到的異常情況。我們深入分析了異常的原因,并提供了三種解決方案:加鎖同步發(fā)送、使用 ConcurrentWebSocketSessionDecorator
和自研事件驅(qū)動隊列(借鑒 Tomcat)。每種方案都有其適用的場景和注意事項,你可以根據(jù)自己的需求選擇合適的方法來解決并發(fā)發(fā)送消息的異常問題。
希望本文對你有所幫助,讓你在使用 Spring WebSocket 時能夠避免類似的坑。如果你對本文有任何疑問或意見,歡迎在評論區(qū)留言,我們將盡力為你解答。謝謝閱讀!
參考鏈接:
到此這篇關(guān)于Spring websocket并發(fā)發(fā)送消息異常的解決的文章就介紹到這了,更多相關(guān)Spring websocket并發(fā)發(fā)送消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Docker 解決openjdk容器里無法使用JDK的jmap等命令問題
這篇文章主要介紹了Docker 解決openjdk容器里無法使用JDK的jmap等命令問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12詳解Spring Boot 使用Java代碼創(chuàng)建Bean并注冊到Spring中
本篇介紹了Spring Boot 使用Java代碼創(chuàng)建Bean并注冊到Spring中,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-02-02java 實現(xiàn)字節(jié)流和字節(jié)緩沖流讀寫文件時間對比
這篇文章主要介紹了java 實現(xiàn)字節(jié)流和字節(jié)緩沖流讀寫文件時間對比,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01基于Java SSM框架開發(fā)圖書借閱系統(tǒng)源代碼
本文給大家介紹了基于Java SSM框架開發(fā)圖書借閱系統(tǒng),開發(fā)環(huán)境基于idea2020+mysql數(shù)據(jù)庫,前端框架使用bootstrap4框架,完美了實現(xiàn)圖書借閱系統(tǒng),喜歡的朋友快來體驗吧2021-05-05SpringBoot中Date格式化處理的三種實現(xiàn)
Spring Boot作為一個簡化Spring應(yīng)用開發(fā)的框架,提供了多種處理日期格式化的方法,本文主要介紹了SpringBoot中Date格式化處理實現(xiàn),具有一定的參考價值,感興趣的可以了解一下2024-03-03