Spring websocket并發(fā)發(fā)送消息異常的解決
今天剛剛經(jīng)歷了一個(gè)坑,非常新鮮,我立刻決定記錄下來。首先,讓我們先看一下我們項(xiàng)目中使用的 Spring WebSocket 示例代碼。
示例代碼
在我們的項(xiàng)目中,我們使用了 Spring WebSocket 來實(shí)現(xiàn)服務(wù)器與客戶端之間的實(shí)時(shí)通信。下面是一個(gè)簡化的示例代碼:
WebSocketConfig配置代碼
@Configuration
@EnableWebSocket // 啟動Websocket
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/myHandler/**")
// 添加攔截器,可以獲取連接的param和 header 用作認(rèn)證鑒權(quán)
.addInterceptors(new LakerSessionHandshakeInterceptor())
// 設(shè)置運(yùn)行跨域
.setAllowedOrigins("*");
}
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
// 設(shè)置默認(rèn)會話空閑超時(shí) 以毫秒為單位 非正值意味著無限超時(shí),默認(rèn)值 0 ,默認(rèn)沒10s檢查一次空閑就關(guān)閉
container.setMaxSessionIdleTimeout(10 * 1000L);
// 設(shè)置異步發(fā)送消息的默認(rèn)超時(shí)時(shí)間 以毫秒為單位 非正值意味著無限超時(shí) ,默認(rèn)值-1,還沒看到作用
container.setAsyncSendTimeout(10 * 1000L);
// 設(shè)置文本消息的默認(rèn)最大緩沖區(qū)大小 以字符為單位,默認(rèn)值 8 * 1024
container.setMaxTextMessageBufferSize(8 * 1024);
// 設(shè)置二進(jìn)制消息的默認(rèn)最大緩沖區(qū)大小 以字節(jié)為單位,默認(rèn)值 8 * 1024
container.setMaxBinaryMessageBufferSize(8 * 1024);
return container;
}
@Bean
public WebSocketHandler myHandler() {
return new MyHandler();
}
}握手?jǐn)r截器代碼
public class LakerSessionHandshakeInterceptor extends HttpSessionHandshakeInterceptor {
// 攔截器返回false,則不會進(jìn)行websocket協(xié)議的轉(zhuǎn)換
// 可以獲取請求參數(shù)做認(rèn)證鑒權(quán)
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
// 將所有查詢參數(shù)復(fù)制到 WebSocketSession 屬性映射
Enumeration<String> parameterNames = servletRequest.getParameterNames();
while (parameterNames.hasMoreElements()) {
String parameterName = parameterNames.nextElement();
// 放入的值可以從WebSocketHandler里面的WebSocketSession拿出來
attributes.put(parameterName, servletRequest.getParameter(parameterName));
}
if (servletRequest.getHeader(HttpHeaders.AUTHORIZATION) != null) {
setErrorResponse(response, HttpStatus.UNAUTHORIZED);
return false;
}
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
private void setErrorResponse(ServerHttpResponse response, HttpStatus status, String errorMessage) {
response.setStatusCode(status);
if (errorMessage != null) {
try {
objectMapper.writeValue(response.getBody(), new ErrorDetail(status.value(), errorMessage));
} catch (IOException ioe) {
}
}
}
}業(yè)務(wù)處理器代碼
public class MyHandler extends AbstractWebSocketHandler {
private final Map<String, WebSocketSession> webSocketSessionMap = new ConcurrentHashMap<>();
//成功連接時(shí)
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
super.afterConnectionEstablished(session);
// 設(shè)置最大報(bào)文大小,如果報(bào)文過大則會報(bào)錯的,可以將限制調(diào)大。
// 會覆蓋config中的配置。
session.setBinaryMessageSizeLimit(8 * 1024);
session.setTextMessageSizeLimit(8 * 1024);
webSocketSessionMap.put(session.getId(), session);
}
//處理textmessage
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
super.handleTextMessage(session, message);
// 有消息就廣播下
for (Map.Entry<String, WebSocketSession> entry : webSocketSessionMap.entrySet()) {
String s = entry.getKey();
WebSocketSession webSocketSession = entry.getValue();
if (webSocketSession.isOpen()) {
webSocketSession.sendMessage(new TextMessage(s + ":" + message.getPayload()));
log.info("send to {} msg:{}", s, message.getPayload());
}
}
}
//報(bào)錯時(shí)
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
super.handleTransportError(session, exception);
log.warn("handleTransportError:: sessionId: {} {}", session.getId(), exception.getMessage(), exception);
if (session.isOpen()) {
try {
session.close();
} catch (Exception ex) {
// ignore
}
}
}
//連接關(guān)閉時(shí)
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
super.afterConnectionClosed(session, status);
WebSocketSession session = webSocketSessionMap.remove(session.getId());
if (session.isOpen()) {
try {
session.close();
} catch (Exception ex) {
// ignore
}
}
}
//處理binarymessage
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
super.handleBinaryMessage(session, message);
}
//處理pong
@Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
super.handlePongMessage(session, message);
}
//是否支持報(bào)文拆包
// 決定是否接受半包,因?yàn)閣ebsocket協(xié)議比較底層,好像Tcp協(xié)議一樣,如果發(fā)送大消息可能會拆成多個(gè)小報(bào)文。如果不希望處理不完整的報(bào)文,希望底層幫忙聚合成完整消息將此方法返回false,這樣底層會等待完整報(bào)文到達(dá)聚合后才回調(diào)。
@Override
public boolean supportsPartialMessages() {
return super.supportsPartialMessages();
}
}問題復(fù)現(xiàn)
在我們的測試環(huán)境中,我們發(fā)現(xiàn)當(dāng)多個(gè)線程同時(shí)嘗試通過 WebSocket 會話發(fā)送消息時(shí),會拋出異常。
現(xiàn)在我們用JMeter模擬100個(gè)用戶發(fā)消息。

當(dāng)執(zhí)行一會兒后會發(fā)現(xiàn)服務(wù)端出現(xiàn)如下異常日志:
java.lang.IllegalStateException: 遠(yuǎn)程 endpoint 處于 [TEXT_PARTIAL_WRITING] 狀態(tài),是被調(diào)用方法的無效狀態(tài)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1274)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.textPartialStart(WsRemoteEndpointImplBase.java:1231)
at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendPartialString(WsRemoteEndpointImplBase.java:226)
at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendText(WsRemoteEndpointBasic.java:49)
at org.springframework.web.socket.adapter.standard.StandardWebSocketSession.sendTextMessage(StandardWebSocketSession.java:215)
at org.springframework.web.socket.adapter.AbstractWebSocketSession.sendMessage(AbstractWebSocketSession.java:108)
原因分析
經(jīng)過分析,發(fā)現(xiàn)異常的根本原因是在并發(fā)發(fā)送消息時(shí),WebSocket 會話的狀態(tài)發(fā)生了異常。
具體來說,當(dāng)一個(gè)線程正在發(fā)送文本消息時(shí),另一個(gè)線程也嘗試發(fā)送消息,就會導(dǎo)致狀態(tài)不一致,從而觸發(fā)異常。

即 WebSocketSession.sendMessage 其不是線程安全的,內(nèi)部有個(gè)狀態(tài)機(jī)來管理防止并發(fā)導(dǎo)致問題以fail-fast方式快速告訴使用者。
那么問題代碼就在這里了

WebSocketSession是不支持并發(fā)發(fā)送消息的,我們使用者要保證其線程安全,這是我一開始沒預(yù)期到的。
解決方案
方案一 加鎖同步發(fā)送
為了解決并發(fā)發(fā)送消息導(dǎo)致的異常,我們可以在發(fā)送消息的代碼塊上加鎖,確保同一時(shí)刻只有一個(gè)線程能夠執(zhí)行發(fā)送操作。
下面是使用加鎖機(jī)制的示例代碼:

改起來非常簡單,只需要對webSocketSession實(shí)例加個(gè)鎖即可。
缺點(diǎn)
當(dāng)并發(fā)度較高時(shí),越后面排隊(duì)等待鎖的人被block的越久。
大致模型圖

方案二 使用ConcurrentWebSocketSessionDecorator
另一種解決并發(fā)發(fā)送消息的方法是使用 ConcurrentWebSocketSessionDecorator 。這是 Spring WebSocket 提供的一個(gè)裝飾器類,用于增強(qiáng)底層的 WebSocketSession 的線程安全性。它通過并發(fā)安全的方式包裝原始的 WebSocketSession 對象,確保在多線程環(huán)境下安全地訪問和修改會話屬性,以及進(jìn)行消息發(fā)送操作。
ConcurrentWebSocketSessionDecorator 的工作原理是利用線程安全的數(shù)據(jù)結(jié)構(gòu)和同步機(jī)制,確保對會話執(zhí)行的操作的原子性和一致性。當(dāng)需要發(fā)送消息時(shí),裝飾器會獲取鎖或使用并發(fā)數(shù)據(jù)結(jié)構(gòu)來協(xié)調(diào)多個(gè)線程之間的訪問。這樣可以防止對會話狀態(tài)的并發(fā)修改,避免潛在的競態(tài)條件。
下面是使用 ConcurrentWebSocketSessionDecorator 的示例代碼:
@Override
public void afterConnectionEstablished(WebSocketSession session) {
log.info("{} Connection established!", session.getId());
// webSocketSessionMap.put(session.getId(), session);
// 把線程安全的session代理裝飾類放到容器里
webSocketSessionMap.put(session.getId(), new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64000));
}ConcurrentWebSocketSessionDecorator原理
ConcurrentWebSocketSessionDecorator整體代碼大概200行還是比較容易看懂的。
// 包裝一個(gè)WebSocketSession以保證一次只有一個(gè)線程可以發(fā)送消息。
// 如果發(fā)送速度慢,后續(xù)嘗試從其他線程發(fā)送更多消息將無法獲取刷新鎖,消息將被緩沖。屆時(shí),將檢查指定的緩沖區(qū)大小限制和發(fā)送時(shí)間限制,如果超過限制,會話將關(guān)閉。
public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorator {
private static final Log logger = LogFactory.getLog(ConcurrentWebSocketSessionDecorator.class);
private final int sendTimeLimit;
private final int bufferSizeLimit;
private final OverflowStrategy overflowStrategy;
@Nullable
private Consumer<WebSocketMessage<?>> preSendCallback;
private final Queue<WebSocketMessage<?>> buffer = new LinkedBlockingQueue<>();
private final AtomicInteger bufferSize = new AtomicInteger();
private volatile long sendStartTime;
private volatile boolean limitExceeded;
private volatile boolean closeInProgress;
private final Lock flushLock = new ReentrantLock();
private final Lock closeLock = new ReentrantLock();
public ConcurrentWebSocketSessionDecorator(WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit) {
this(delegate, sendTimeLimit, bufferSizeLimit, OverflowStrategy.TERMINATE);
}
// delegate 需要代理的session
// sendTimeLimit 表示發(fā)送**單個(gè)消息**的最大時(shí)間
// bufferSizeLimit 表示發(fā)送消息的隊(duì)列最大字節(jié)數(shù),不是消息的數(shù)量而是消息的總大小
// overflowStrategy 表示超過限制時(shí)的策略有2個(gè)
// - 斷開連接(默認(rèn)選項(xiàng))
// - 丟棄最老的數(shù)據(jù),直到大小滿足
public ConcurrentWebSocketSessionDecorator(
WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit, OverflowStrategy overflowStrategy) {
super(delegate);
this.sendTimeLimit = sendTimeLimit;
this.bufferSizeLimit = bufferSizeLimit;
this.overflowStrategy = overflowStrategy;
}
// 返回自當(dāng)前發(fā)送開始以來的時(shí)間(毫秒),如果當(dāng)前沒有發(fā)送正在進(jìn)行則返回 0。
// 即花費(fèi)的耗時(shí)
public long getTimeSinceSendStarted() {
long start = this.sendStartTime;
return (start > 0 ? (System.currentTimeMillis() - start) : 0);
}
// 設(shè)置在將消息添加到發(fā)送緩沖區(qū)后調(diào)用的回調(diào)
public void setMessageCallback(Consumer<WebSocketMessage<?>> callback) {
this.preSendCallback = callback;
}
@Override
public void sendMessage(WebSocketMessage<?> message) throws IOException {
//檢查超限了就不發(fā)了
if (shouldNotSend()) {
return;
}
// 消息放到buffer隊(duì)列
this.buffer.add(message);
// 增加bufferSize用于后面判斷是不是超限了
this.bufferSize.addAndGet(message.getPayloadLength());
// 發(fā)送緩沖區(qū)后調(diào)用的回調(diào)
if (this.preSendCallback != null) {
this.preSendCallback.accept(message);
}
do {
// 嘗試獲取鎖,發(fā)送消息,只有一個(gè)線程負(fù)責(zé)發(fā)送所有消息
if (!tryFlushMessageBuffer()) {
// 沒獲取鎖的線程,對當(dāng)前buffer和時(shí)間檢查,
// 檢查不過就拋異常 然后框架自己會抓取異常關(guān)閉當(dāng)前的連接
checkSessionLimits();
break;
}
}
while (!this.buffer.isEmpty() && !shouldNotSend());
}
// 超限了 不能發(fā)送了
private boolean shouldNotSend() {
return (this.limitExceeded || this.closeInProgress);
}
// 嘗試獲取鎖 并發(fā)送所有緩存的消息
private boolean tryFlushMessageBuffer() throws IOException {
if (this.flushLock.tryLock()) {
try {
// 循環(huán)發(fā)送消息
while (true) {
// 一次拉一個(gè)消息
WebSocketMessage<?> message = this.buffer.poll();
// 沒消息了 或者 超限了
if (message == null || shouldNotSend()) {
// 退出 完活
break;
}
// 釋放bufferSize
this.bufferSize.addAndGet(-message.getPayloadLength());
// 用于判斷單個(gè)消息是否發(fā)送超時(shí)的
this.sendStartTime = System.currentTimeMillis();
// 發(fā)送消息
getDelegate().sendMessage(message);
// 重置開始時(shí)間
this.sendStartTime = 0;
}
}
finally {
this.sendStartTime = 0;
this.flushLock.unlock();
}
return true;
}
return false;
}
//檢查是否超時(shí),是否超過buffer限制
private void checkSessionLimits() {
// 應(yīng)該發(fā)送 且 獲取到關(guān)閉鎖
if (!shouldNotSend() && this.closeLock.tryLock()) {
try {
//檢測是否發(fā)送超時(shí)
if (getTimeSinceSendStarted() > getSendTimeLimit()) {
String format = "Send time %d (ms) for session '%s' exceeded the allowed limit %d";
String reason = String.format(format, getTimeSinceSendStarted(), getId(), getSendTimeLimit());
limitExceeded(reason);
}
//檢測buffer大小,根據(jù)策略要么拋異常關(guān)閉連接,要么丟棄數(shù)據(jù)
else if (getBufferSize() > getBufferSizeLimit()) {
switch (this.overflowStrategy) {
// 關(guān)閉連接,拋出異??蚣芫蜁P(guān)閉連接
case TERMINATE:
String format = "Buffer size %d bytes for session '%s' exceeds the allowed limit %d";
String reason = String.format(format, getBufferSize(), getId(), getBufferSizeLimit());
limitExceeded(reason);
break;
// 丟棄老數(shù)據(jù)
case DROP:
int i = 0;
while (getBufferSize() > getBufferSizeLimit()) {
WebSocketMessage<?> message = this.buffer.poll();
if (message == null) {
break;
}
this.bufferSize.addAndGet(-message.getPayloadLength());
i++;
}
if (logger.isDebugEnabled()) {
logger.debug("Dropped " + i + " messages, buffer size: " + getBufferSize());
}
break;
default:
// Should never happen..
throw new IllegalStateException("Unexpected OverflowStrategy: " + this.overflowStrategy);
}
}
}
finally {
this.closeLock.unlock();
}
}
}
private void limitExceeded(String reason) {
this.limitExceeded = true;
throw new SessionLimitExceededException(reason, CloseStatus.SESSION_NOT_RELIABLE);
}
@Override
public void close(CloseStatus status) throws IOException {
this.closeLock.lock();
try {
if (this.closeInProgress) {
return;
}
if (!CloseStatus.SESSION_NOT_RELIABLE.equals(status)) {
try {
checkSessionLimits();
}
catch (SessionLimitExceededException ex) {
// Ignore
}
if (this.limitExceeded) {
if (logger.isDebugEnabled()) {
logger.debug("Changing close status " + status + " to SESSION_NOT_RELIABLE.");
}
status = CloseStatus.SESSION_NOT_RELIABLE;
}
}
this.closeInProgress = true;
super.close(status);
}
finally {
this.closeLock.unlock();
}
}
public enum OverflowStrategy {
TERMINATE,
DROP
}
}大致模型圖

方案三 自研事件驅(qū)動隊(duì)列(借鑒 Tomcat)
除了使用加鎖和 ConcurrentWebSocketSessionDecorator,我們還可以借鑒 Tomcat 的事件驅(qū)動隊(duì)列機(jī)制來解決并發(fā)發(fā)送消息的問題。具體的實(shí)現(xiàn)需要一些復(fù)雜的邏輯和代碼,涉及到消息隊(duì)列、線程池和事件處理機(jī)制,因此在這里我不展開討論。如果你對這個(gè)方案感興趣,可以參考 Tomcat 的源代碼,了解更多關(guān)于事件驅(qū)動隊(duì)列的實(shí)現(xiàn)細(xì)節(jié)。
這個(gè)參考之前的tomcat設(shè)計(jì)即可。

總結(jié)
在本篇博文中,我們討論了在使用 Spring WebSocket 進(jìn)行并發(fā)發(fā)送消息時(shí)可能遇到的異常情況。我們深入分析了異常的原因,并提供了三種解決方案:加鎖同步發(fā)送、使用 ConcurrentWebSocketSessionDecorator 和自研事件驅(qū)動隊(duì)列(借鑒 Tomcat)。每種方案都有其適用的場景和注意事項(xiàng),你可以根據(jù)自己的需求選擇合適的方法來解決并發(fā)發(fā)送消息的異常問題。
希望本文對你有所幫助,讓你在使用 Spring WebSocket 時(shí)能夠避免類似的坑。如果你對本文有任何疑問或意見,歡迎在評論區(qū)留言,我們將盡力為你解答。謝謝閱讀!
參考鏈接:
到此這篇關(guān)于Spring websocket并發(fā)發(fā)送消息異常的解決的文章就介紹到這了,更多相關(guān)Spring websocket并發(fā)發(fā)送消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Docker 解決openjdk容器里無法使用JDK的jmap等命令問題
這篇文章主要介紹了Docker 解決openjdk容器里無法使用JDK的jmap等命令問題,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12
詳解Spring Boot 使用Java代碼創(chuàng)建Bean并注冊到Spring中
本篇介紹了Spring Boot 使用Java代碼創(chuàng)建Bean并注冊到Spring中,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-02-02
java 實(shí)現(xiàn)字節(jié)流和字節(jié)緩沖流讀寫文件時(shí)間對比
這篇文章主要介紹了java 實(shí)現(xiàn)字節(jié)流和字節(jié)緩沖流讀寫文件時(shí)間對比,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01
mybatis遞歸 一對多的實(shí)現(xiàn)方法示例
這篇文章主要給大家介紹了關(guān)于mybatis遞歸 一對多實(shí)現(xiàn)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-08-08
基于Java SSM框架開發(fā)圖書借閱系統(tǒng)源代碼
本文給大家介紹了基于Java SSM框架開發(fā)圖書借閱系統(tǒng),開發(fā)環(huán)境基于idea2020+mysql數(shù)據(jù)庫,前端框架使用bootstrap4框架,完美了實(shí)現(xiàn)圖書借閱系統(tǒng),喜歡的朋友快來體驗(yàn)吧2021-05-05
SpringBoot中Date格式化處理的三種實(shí)現(xiàn)
Spring Boot作為一個(gè)簡化Spring應(yīng)用開發(fā)的框架,提供了多種處理日期格式化的方法,本文主要介紹了SpringBoot中Date格式化處理實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2024-03-03

