欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring websocket并發(fā)發(fā)送消息異常的解決

 更新時間:2023年09月04日 10:47:20   作者:lakernote  
本文主要介紹了 Spring websocket并發(fā)發(fā)送消息異常的解決,當多個線程同時嘗試通過 WebSocket 會話發(fā)送消息時,會拋出異常,下面就來解決一下,感興趣的可以了解一下

今天剛剛經(jīng)歷了一個坑,非常新鮮,我立刻決定記錄下來。首先,讓我們先看一下我們項目中使用的 Spring WebSocket 示例代碼。

示例代碼

在我們的項目中,我們使用了 Spring WebSocket 來實現(xiàn)服務(wù)器與客戶端之間的實時通信。下面是一個簡化的示例代碼:

WebSocketConfig配置代碼

@Configuration
@EnableWebSocket // 啟動Websocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(myHandler(), "/myHandler/**")
            // 添加攔截器,可以獲取連接的param和 header 用作認證鑒權(quán)
            .addInterceptors(new LakerSessionHandshakeInterceptor())
            // 設(shè)置運行跨域
            .setAllowedOrigins("*");
    }
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        // 設(shè)置默認會話空閑超時 以毫秒為單位 非正值意味著無限超時,默認值 0 ,默認沒10s檢查一次空閑就關(guān)閉
        container.setMaxSessionIdleTimeout(10 * 1000L);
        // 設(shè)置異步發(fā)送消息的默認超時時間 以毫秒為單位 非正值意味著無限超時 ,默認值-1,還沒看到作用
        container.setAsyncSendTimeout(10 * 1000L);
        // 設(shè)置文本消息的默認最大緩沖區(qū)大小 以字符為單位,默認值 8 * 1024
        container.setMaxTextMessageBufferSize(8 * 1024);
        // 設(shè)置二進制消息的默認最大緩沖區(qū)大小 以字節(jié)為單位,默認值 8 * 1024
        container.setMaxBinaryMessageBufferSize(8 * 1024);
        return container;
    }
    @Bean
    public WebSocketHandler myHandler() {
        return new MyHandler();
    }
}

握手攔截器代碼

public class LakerSessionHandshakeInterceptor extends HttpSessionHandshakeInterceptor {
    // 攔截器返回false,則不會進行websocket協(xié)議的轉(zhuǎn)換
    // 可以獲取請求參數(shù)做認證鑒權(quán)
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
        // 將所有查詢參數(shù)復(fù)制到 WebSocketSession 屬性映射
        Enumeration<String> parameterNames = servletRequest.getParameterNames();
        while (parameterNames.hasMoreElements()) {
            String parameterName = parameterNames.nextElement();
            // 放入的值可以從WebSocketHandler里面的WebSocketSession拿出來
            attributes.put(parameterName, servletRequest.getParameter(parameterName));
        }
        if (servletRequest.getHeader(HttpHeaders.AUTHORIZATION) != null) {
            setErrorResponse(response, HttpStatus.UNAUTHORIZED);
            return false;
        }
        return true;
    }
    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
    }
    private void setErrorResponse(ServerHttpResponse response, HttpStatus status, String errorMessage) {
        response.setStatusCode(status);
        if (errorMessage != null) {
            try {
                objectMapper.writeValue(response.getBody(), new ErrorDetail(status.value(), errorMessage));
            } catch (IOException ioe) {
            }
        }
    }
}

業(yè)務(wù)處理器代碼

public class MyHandler extends AbstractWebSocketHandler {
     private final Map<String, WebSocketSession> webSocketSessionMap = new ConcurrentHashMap<>();
    //成功連接時
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        super.afterConnectionEstablished(session);
        // 設(shè)置最大報文大小,如果報文過大則會報錯的,可以將限制調(diào)大。
        // 會覆蓋config中的配置。
        session.setBinaryMessageSizeLimit(8 * 1024);
        session.setTextMessageSizeLimit(8 * 1024);
        webSocketSessionMap.put(session.getId(), session);
    }
    //處理textmessage
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        super.handleTextMessage(session, message);
        // 有消息就廣播下
        for (Map.Entry<String, WebSocketSession> entry : webSocketSessionMap.entrySet()) {
            String s = entry.getKey();
            WebSocketSession webSocketSession = entry.getValue();
            if (webSocketSession.isOpen()) {
            	webSocketSession.sendMessage(new TextMessage(s + ":" + message.getPayload()));
            	log.info("send to {} msg:{}", s, message.getPayload());
            }
        }
    }
    //報錯時
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        super.handleTransportError(session, exception);
          log.warn("handleTransportError:: sessionId: {} {}", session.getId(), exception.getMessage(), exception);
        if (session.isOpen()) {
            try {
                session.close();
            } catch (Exception ex) {
                // ignore
            }
        }
    }
   //連接關(guān)閉時
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        super.afterConnectionClosed(session, status);
        WebSocketSession session =  webSocketSessionMap.remove(session.getId());
        if (session.isOpen()) {
            try {
                session.close();
            } catch (Exception ex) {
                // ignore
            }
        }
    }
    //處理binarymessage
    @Override
    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
        super.handleBinaryMessage(session, message);
    }
    //處理pong
    @Override
    protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
        super.handlePongMessage(session, message);
    }
    //是否支持報文拆包
    // 決定是否接受半包,因為websocket協(xié)議比較底層,好像Tcp協(xié)議一樣,如果發(fā)送大消息可能會拆成多個小報文。如果不希望處理不完整的報文,希望底層幫忙聚合成完整消息將此方法返回false,這樣底層會等待完整報文到達聚合后才回調(diào)。
    @Override
    public boolean supportsPartialMessages() {
        return super.supportsPartialMessages();
    }
}

問題復(fù)現(xiàn)

在我們的測試環(huán)境中,我們發(fā)現(xiàn)當多個線程同時嘗試通過 WebSocket 會話發(fā)送消息時,會拋出異常。

現(xiàn)在我們用JMeter模擬100個用戶發(fā)消息。

當執(zhí)行一會兒后會發(fā)現(xiàn)服務(wù)端出現(xiàn)如下異常日志:

java.lang.IllegalStateException: 遠程 endpoint 處于 [TEXT_PARTIAL_WRITING] 狀態(tài),是被調(diào)用方法的無效狀態(tài)
    at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.checkState(WsRemoteEndpointImplBase.java:1274)
    at org.apache.tomcat.websocket.WsRemoteEndpointImplBase$StateMachine.textPartialStart(WsRemoteEndpointImplBase.java:1231)
    at org.apache.tomcat.websocket.WsRemoteEndpointImplBase.sendPartialString(WsRemoteEndpointImplBase.java:226)
    at org.apache.tomcat.websocket.WsRemoteEndpointBasic.sendText(WsRemoteEndpointBasic.java:49)
    at org.springframework.web.socket.adapter.standard.StandardWebSocketSession.sendTextMessage(StandardWebSocketSession.java:215)
    at org.springframework.web.socket.adapter.AbstractWebSocketSession.sendMessage(AbstractWebSocketSession.java:108)

原因分析

經(jīng)過分析,發(fā)現(xiàn)異常的根本原因是在并發(fā)發(fā)送消息時,WebSocket 會話的狀態(tài)發(fā)生了異常。

具體來說,當一個線程正在發(fā)送文本消息時,另一個線程也嘗試發(fā)送消息,就會導(dǎo)致狀態(tài)不一致,從而觸發(fā)異常。

WebSocketSession.sendMessage 不是線程安全的,內(nèi)部有個狀態(tài)機來管理防止并發(fā)導(dǎo)致問題以fail-fast方式快速告訴使用者。

那么問題代碼就在這里了

WebSocketSession 是不支持并發(fā)發(fā)送消息的,我們使用者要保證其線程安全,這是我一開始沒預(yù)期到的。

解決方案

方案一 加鎖同步發(fā)送

為了解決并發(fā)發(fā)送消息導(dǎo)致的異常,我們可以在發(fā)送消息的代碼塊上加鎖,確保同一時刻只有一個線程能夠執(zhí)行發(fā)送操作。

下面是使用加鎖機制的示例代碼:

改起來非常簡單,只需要對webSocketSession實例加個鎖即可。

缺點

當并發(fā)度較高時,越后面排隊等待鎖的人被block的越久。

大致模型圖

方案二 使用ConcurrentWebSocketSessionDecorator

另一種解決并發(fā)發(fā)送消息的方法是使用 ConcurrentWebSocketSessionDecorator 。這是 Spring WebSocket 提供的一個裝飾器類,用于增強底層的 WebSocketSession 的線程安全性。它通過并發(fā)安全的方式包裝原始的 WebSocketSession 對象,確保在多線程環(huán)境下安全地訪問和修改會話屬性,以及進行消息發(fā)送操作。

ConcurrentWebSocketSessionDecorator 的工作原理是利用線程安全的數(shù)據(jù)結(jié)構(gòu)和同步機制,確保對會話執(zhí)行的操作的原子性和一致性。當需要發(fā)送消息時,裝飾器會獲取鎖或使用并發(fā)數(shù)據(jù)結(jié)構(gòu)來協(xié)調(diào)多個線程之間的訪問。這樣可以防止對會話狀態(tài)的并發(fā)修改,避免潛在的競態(tài)條件。

下面是使用 ConcurrentWebSocketSessionDecorator 的示例代碼:

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        log.info("{} Connection established!", session.getId());
        // webSocketSessionMap.put(session.getId(), session);
        // 把線程安全的session代理裝飾類放到容器里
        webSocketSessionMap.put(session.getId(), new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64000));
    }

ConcurrentWebSocketSessionDecorator原理

ConcurrentWebSocketSessionDecorator整體代碼大概200行還是比較容易看懂的。

// 包裝一個WebSocketSession以保證一次只有一個線程可以發(fā)送消息。
// 如果發(fā)送速度慢,后續(xù)嘗試從其他線程發(fā)送更多消息將無法獲取刷新鎖,消息將被緩沖。屆時,將檢查指定的緩沖區(qū)大小限制和發(fā)送時間限制,如果超過限制,會話將關(guān)閉。
public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorator {
	private static final Log logger = LogFactory.getLog(ConcurrentWebSocketSessionDecorator.class);
	private final int sendTimeLimit;
	private final int bufferSizeLimit;
	private final OverflowStrategy overflowStrategy;
	@Nullable
	private Consumer<WebSocketMessage<?>> preSendCallback;
	private final Queue<WebSocketMessage<?>> buffer = new LinkedBlockingQueue<>();
	private final AtomicInteger bufferSize = new AtomicInteger();
	private volatile long sendStartTime;
	private volatile boolean limitExceeded;
	private volatile boolean closeInProgress;
	private final Lock flushLock = new ReentrantLock();
	private final Lock closeLock = new ReentrantLock();
	public ConcurrentWebSocketSessionDecorator(WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit) {
		this(delegate, sendTimeLimit, bufferSizeLimit, OverflowStrategy.TERMINATE);
	}
    // delegate 需要代理的session
    // sendTimeLimit 表示發(fā)送**單個消息**的最大時間
    // bufferSizeLimit 表示發(fā)送消息的隊列最大字節(jié)數(shù),不是消息的數(shù)量而是消息的總大小
    // overflowStrategy 表示超過限制時的策略有2個
                 // - 斷開連接(默認選項)
                 // - 丟棄最老的數(shù)據(jù),直到大小滿足
	public ConcurrentWebSocketSessionDecorator(
			WebSocketSession delegate, int sendTimeLimit, int bufferSizeLimit, OverflowStrategy overflowStrategy) {
		super(delegate);
		this.sendTimeLimit = sendTimeLimit;
		this.bufferSizeLimit = bufferSizeLimit;
		this.overflowStrategy = overflowStrategy;
	}
    // 返回自當前發(fā)送開始以來的時間(毫秒),如果當前沒有發(fā)送正在進行則返回 0。
    // 即花費的耗時
	public long getTimeSinceSendStarted() {
		long start = this.sendStartTime;
		return (start > 0 ? (System.currentTimeMillis() - start) : 0);
	}
	// 設(shè)置在將消息添加到發(fā)送緩沖區(qū)后調(diào)用的回調(diào)
	public void setMessageCallback(Consumer<WebSocketMessage<?>> callback) {
		this.preSendCallback = callback;
	}
	@Override
	public void sendMessage(WebSocketMessage<?> message) throws IOException {
        //檢查超限了就不發(fā)了
		if (shouldNotSend()) {
			return;
		}
		// 消息放到buffer隊列
		this.buffer.add(message);
        // 增加bufferSize用于后面判斷是不是超限了
		this.bufferSize.addAndGet(message.getPayloadLength());
		// 發(fā)送緩沖區(qū)后調(diào)用的回調(diào)
		if (this.preSendCallback != null) {
			this.preSendCallback.accept(message);
		}
		do {
            // 嘗試獲取鎖,發(fā)送消息,只有一個線程負責發(fā)送所有消息
			if (!tryFlushMessageBuffer()) {
				// 沒獲取鎖的線程,對當前buffer和時間檢查,
                // 檢查不過就拋異常 然后框架自己會抓取異常關(guān)閉當前的連接
				checkSessionLimits();
				break;
			}
		}
		while (!this.buffer.isEmpty() && !shouldNotSend());
	}
	// 超限了 不能發(fā)送了
	private boolean shouldNotSend() {
		return (this.limitExceeded || this.closeInProgress);
	}
	// 嘗試獲取鎖 并發(fā)送所有緩存的消息
	private boolean tryFlushMessageBuffer() throws IOException {
		if (this.flushLock.tryLock()) {
			try {
                 // 循環(huán)發(fā)送消息
				while (true) {
                    // 一次拉一個消息
					WebSocketMessage<?> message = this.buffer.poll();
                    // 沒消息了 或者 超限了
					if (message == null || shouldNotSend()) {
                        // 退出 完活
						break;
					}
                    // 釋放bufferSize
					this.bufferSize.addAndGet(-message.getPayloadLength());
                    // 用于判斷單個消息是否發(fā)送超時的
					this.sendStartTime = System.currentTimeMillis();
                    // 發(fā)送消息
					getDelegate().sendMessage(message);
                    // 重置開始時間
					this.sendStartTime = 0;
				}
			}
			finally {
				this.sendStartTime = 0;
				this.flushLock.unlock();
			}
			return true;
		}
		return false;
	}
   //檢查是否超時,是否超過buffer限制
	private void checkSessionLimits() {
        // 應(yīng)該發(fā)送 且 獲取到關(guān)閉鎖
		if (!shouldNotSend() && this.closeLock.tryLock()) {
			try {
                 //檢測是否發(fā)送超時
				if (getTimeSinceSendStarted() > getSendTimeLimit()) {
					String format = "Send time %d (ms) for session '%s' exceeded the allowed limit %d";
					String reason = String.format(format, getTimeSinceSendStarted(), getId(), getSendTimeLimit());
					limitExceeded(reason);
				}
                 //檢測buffer大小,根據(jù)策略要么拋異常關(guān)閉連接,要么丟棄數(shù)據(jù)
				else if (getBufferSize() > getBufferSizeLimit()) {
					switch (this.overflowStrategy) {
                        // 關(guān)閉連接,拋出異??蚣芫蜁P(guān)閉連接    
						case TERMINATE:
							String format = "Buffer size %d bytes for session '%s' exceeds the allowed limit %d";
							String reason = String.format(format, getBufferSize(), getId(), getBufferSizeLimit());
							limitExceeded(reason);
							break;
                        // 丟棄老數(shù)據(jù)    
						case DROP:
							int i = 0;
							while (getBufferSize() > getBufferSizeLimit()) {
								WebSocketMessage<?> message = this.buffer.poll();
								if (message == null) {
									break;
								}
								this.bufferSize.addAndGet(-message.getPayloadLength());
								i++;
							}
							if (logger.isDebugEnabled()) {
								logger.debug("Dropped " + i + " messages, buffer size: " + getBufferSize());
							}
							break;
						default:
							// Should never happen..
							throw new IllegalStateException("Unexpected OverflowStrategy: " + this.overflowStrategy);
					}
				}
			}
			finally {
				this.closeLock.unlock();
			}
		}
	}
	private void limitExceeded(String reason) {
		this.limitExceeded = true;
		throw new SessionLimitExceededException(reason, CloseStatus.SESSION_NOT_RELIABLE);
	}
	@Override
	public void close(CloseStatus status) throws IOException {
		this.closeLock.lock();
		try {
			if (this.closeInProgress) {
				return;
			}
			if (!CloseStatus.SESSION_NOT_RELIABLE.equals(status)) {
				try {
					checkSessionLimits();
				}
				catch (SessionLimitExceededException ex) {
					// Ignore
				}
				if (this.limitExceeded) {
					if (logger.isDebugEnabled()) {
						logger.debug("Changing close status " + status + " to SESSION_NOT_RELIABLE.");
					}
					status = CloseStatus.SESSION_NOT_RELIABLE;
				}
			}
			this.closeInProgress = true;
			super.close(status);
		}
		finally {
			this.closeLock.unlock();
		}
	}
	public enum OverflowStrategy {
		TERMINATE,
		DROP
	}
}

大致模型圖

方案三 自研事件驅(qū)動隊列(借鑒 Tomcat)

除了使用加鎖和 ConcurrentWebSocketSessionDecorator,我們還可以借鑒 Tomcat 的事件驅(qū)動隊列機制來解決并發(fā)發(fā)送消息的問題。具體的實現(xiàn)需要一些復(fù)雜的邏輯和代碼,涉及到消息隊列、線程池和事件處理機制,因此在這里我不展開討論。如果你對這個方案感興趣,可以參考 Tomcat 的源代碼,了解更多關(guān)于事件驅(qū)動隊列的實現(xiàn)細節(jié)。

這個參考之前的tomcat設(shè)計即可。

img

總結(jié)

在本篇博文中,我們討論了在使用 Spring WebSocket 進行并發(fā)發(fā)送消息時可能遇到的異常情況。我們深入分析了異常的原因,并提供了三種解決方案:加鎖同步發(fā)送、使用 ConcurrentWebSocketSessionDecorator 和自研事件驅(qū)動隊列(借鑒 Tomcat)。每種方案都有其適用的場景和注意事項,你可以根據(jù)自己的需求選擇合適的方法來解決并發(fā)發(fā)送消息的異常問題。

希望本文對你有所幫助,讓你在使用 Spring WebSocket 時能夠避免類似的坑。如果你對本文有任何疑問或意見,歡迎在評論區(qū)留言,我們將盡力為你解答。謝謝閱讀!

參考鏈接:

到此這篇關(guān)于Spring websocket并發(fā)發(fā)送消息異常的解決的文章就介紹到這了,更多相關(guān)Spring websocket并發(fā)發(fā)送消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家! 

相關(guān)文章

  • Docker 解決openjdk容器里無法使用JDK的jmap等命令問題

    Docker 解決openjdk容器里無法使用JDK的jmap等命令問題

    這篇文章主要介紹了Docker 解決openjdk容器里無法使用JDK的jmap等命令問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • springboot枚舉類型傳遞的步驟

    springboot枚舉類型傳遞的步驟

    這篇文章主要介紹了springboot枚舉類型傳遞的步驟,幫助大家更好的理解和學習使用springboot,感興趣的朋友可以了解下
    2021-04-04
  • 詳解Spring Boot 使用Java代碼創(chuàng)建Bean并注冊到Spring中

    詳解Spring Boot 使用Java代碼創(chuàng)建Bean并注冊到Spring中

    本篇介紹了Spring Boot 使用Java代碼創(chuàng)建Bean并注冊到Spring中,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-02-02
  • java 實現(xiàn)字節(jié)流和字節(jié)緩沖流讀寫文件時間對比

    java 實現(xiàn)字節(jié)流和字節(jié)緩沖流讀寫文件時間對比

    這篇文章主要介紹了java 實現(xiàn)字節(jié)流和字節(jié)緩沖流讀寫文件時間對比,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-01-01
  • mybatis遞歸 一對多的實現(xiàn)方法示例

    mybatis遞歸 一對多的實現(xiàn)方法示例

    這篇文章主要給大家介紹了關(guān)于mybatis遞歸 一對多實現(xiàn)的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2018-08-08
  • 基于Java SSM框架開發(fā)圖書借閱系統(tǒng)源代碼

    基于Java SSM框架開發(fā)圖書借閱系統(tǒng)源代碼

    本文給大家介紹了基于Java SSM框架開發(fā)圖書借閱系統(tǒng),開發(fā)環(huán)境基于idea2020+mysql數(shù)據(jù)庫,前端框架使用bootstrap4框架,完美了實現(xiàn)圖書借閱系統(tǒng),喜歡的朋友快來體驗吧
    2021-05-05
  • 你知道Spring如何解決所有循環(huán)依賴的嗎

    你知道Spring如何解決所有循環(huán)依賴的嗎

    這篇文章主要來和大家聊聊Spring?能解決所有循環(huán)依賴嗎,文中的示例代碼講解詳細,對我們學習Spring有一定的幫助,需要的小伙伴可以參考一下
    2023-07-07
  • SpringAOP四種通知類型+環(huán)繞通知說明

    SpringAOP四種通知類型+環(huán)繞通知說明

    這篇文章主要介紹了SpringAOP四種通知類型+環(huán)繞通知說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-09-09
  • SpringBoot中Date格式化處理的三種實現(xiàn)

    SpringBoot中Date格式化處理的三種實現(xiàn)

    Spring Boot作為一個簡化Spring應(yīng)用開發(fā)的框架,提供了多種處理日期格式化的方法,本文主要介紹了SpringBoot中Date格式化處理實現(xiàn),具有一定的參考價值,感興趣的可以了解一下
    2024-03-03
  • Java中的HashMap集合深度解析

    Java中的HashMap集合深度解析

    這篇文章主要介紹了Java中的HashMap集合深度解析, HashMap可以說是Java中最常用的集合類框架之一,是Java語言中非常典型的數(shù)據(jù)結(jié)構(gòu),我們總會在不經(jīng)意間用到它,很大程度上方便了我們?nèi)粘i_發(fā),需要的朋友可以參考下
    2023-09-09

最新評論