Java應(yīng)用層協(xié)議WebSocket實(shí)現(xiàn)消息推送
前言
大部分的web開(kāi)發(fā)者,開(kāi)發(fā)的業(yè)務(wù)都是基于Http協(xié)議的:前端請(qǐng)求后端接口,攜帶參數(shù),后端執(zhí)行業(yè)務(wù)代碼,再返回結(jié)果給前端。作者參與開(kāi)發(fā)的項(xiàng)目,有一個(gè)報(bào)警推送的功能,服務(wù)端實(shí)時(shí)推送報(bào)警信息給瀏覽器端;還有像抖音里面,如果有人關(guān)注、回復(fù)你的評(píng)論時(shí),抖音就會(huì)推送相關(guān)消息給你了,你就會(huì)收到一條消息。
有些同學(xué)會(huì)說(shuō)了,基于Http協(xié)議也能實(shí)現(xiàn)?。呵岸硕〞r(shí)訪問(wèn)后端(每隔3s或者幾秒),后端返回消息數(shù)據(jù),前端拿到后彈出消息。這種方式太low了,而且每個(gè)瀏覽器都這樣,使用系統(tǒng)的人一多,服務(wù)器的壓力就太大了些。那到底用什么技術(shù)手段實(shí)現(xiàn)呢?我們的主角就登場(chǎng)了。
WebSocket是在單個(gè)TCP連接上進(jìn)行全雙工通信的應(yīng)用層協(xié)議(Http協(xié)議也是應(yīng)用層),瀏覽器端和服務(wù)端都可主動(dòng)發(fā)送數(shù)據(jù)給另一端。這樣是不是比Http協(xié)議更適合消息推送這種場(chǎng)景。
瀏覽器端
作者建了一個(gè)SpringBoot項(xiàng)目,Html放在src\main\resources\static下:
<!DOCTYPE html>
<html lang="zh" xmlns:th="http://www.thymeleaf.org">
<head>
<!-- 解決中文亂碼-->
<meta charset="UTF-8"/>
<title></title>
<script type="text/javascript" src="./js/jquery.min.js"></script>
</head>
<body>
<input id="input1" type="text" /><br/>
<input type="button" value="瀏覽器發(fā)送服務(wù)端" onclick="btnClick()" />
<input type="button" value="服務(wù)端發(fā)送瀏覽器" onclick="btnClick1()" />
<input type="button" value="重新打開(kāi)連接" onclick="btnClick2()" />
<br/>
<textarea id="textArea" style="height: 50px"></textarea>
<script>
var ws;
webSocketInit();
function webSocketInit() {
ws =new WebSocket('ws://localhost:8080/bootdemo/webSocket/10086');
// 獲取連接狀態(tài)
console.log('ws連接狀態(tài)[初始]:' + ws.readyState);
//監(jiān)聽(tīng)是否連接成功
ws.onopen = function () {
console.log('ws連接狀態(tài)[成功]:' + ws.readyState);
};
// 接聽(tīng)服務(wù)器發(fā)回的信息并處理展示
ws.onmessage = function (obj) {
console.log('接收到來(lái)自服務(wù)器的消息:');
var txt = $("#textArea").val();
$("#textArea").val(txt + "\n" + obj.data);
$("#textArea").scrollTop($("#textArea")[0].scrollHeight);
//完成通信后關(guān)閉WebSocket連接
// ws.close();
};
// 監(jiān)聽(tīng)連接關(guān)閉事件
ws.onclose = function () {
// 監(jiān)聽(tīng)整個(gè)過(guò)程中websocket的狀態(tài)
console.log('ws連接狀態(tài)[關(guān)閉]:' + ws.readyState);
};
// 監(jiān)聽(tīng)并處理error事件
ws.onerror = function (error) {
console.log(error);
};
}
function btnClick() {
console.log("瀏覽器端發(fā)送消息:");
//連接成功則發(fā)送一個(gè)數(shù)據(jù)
ws.send($("#input1").val());
}
function btnClick1() {
$.ajax({
url: 'http://localhost:8080/bootdemo/pushWebSocket/publish?' +
'userId=10086&message=' + $("#input1").val(),
type: 'GET',
success: function (data) {
// console.log(data);
}
});
}
function btnClick2() {
webSocketInit();
}
</script>
</body>
</html>服務(wù)器端
先引入依賴(lài):
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
bean上添加@ServerEndpoint,作為WebSocket的服務(wù)端。
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
@Component
@Slf4j
@ServerEndpoint("/webSocket/{userId}")
public class WebSocketServer {
//與某個(gè)客戶(hù)端的連接會(huì)話,需要通過(guò)它來(lái)給客戶(hù)端發(fā)送數(shù)據(jù)
private Session session;
private static final CopyOnWriteArraySet<WebSocketServer> webSockets =
new CopyOnWriteArraySet<>();
// 用來(lái)存在線連接數(shù)
private static final Map<String, Session> sessionPool =
new HashMap<String, Session>();
/**
* 連接成功調(diào)用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam(value = "userId")
String userId) {
try {
this.session = session;
webSockets.add(this);
sessionPool.put(userId, session);
}
catch (Exception e) {
}
}
/**
* 收到客戶(hù)端消息后調(diào)用的方法
*/
@OnMessage
public void onMessage(String message) {
log.info("websocket消息: 收到客戶(hù)端消息:" + message);
}
public void sendOneMessage(String userId, String message) {
Session session = sessionPool.get(userId);
if (session != null && session.isOpen()) {
try {
log.info("服務(wù)端推送消息:" + message);
session.getAsyncRemote().sendText(message);
}
catch (Exception e) {
e.printStackTrace();
}
}
}
}
進(jìn)行注冊(cè):
@Configuration
public class WebSocketConfigOne {
/**
* 這個(gè)bean會(huì)自動(dòng)注冊(cè)使用了@ServerEndpoint注解聲明的對(duì)象
* 沒(méi)有的話會(huì)報(bào)404
*
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
推送消息的控制器:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.HashMap;
import java.util.Map;
@Controller
@RequestMapping("/pushWebSocket")
public class WebSocketController {
@Autowired
private WebSocketServer webSocketServer;
@GetMapping("/publish")
@ResponseBody
public Map publish(String userId, String message) {
webSocketServer.sendOneMessage(userId, message);
HashMap<String, Object> map = new HashMap<>();
map.put("code", 200);
return map;
}
}
還有我的配置文件application.properties:
# web port
server.port=8080
server.servlet.context-path=/bootdemo
運(yùn)行啟動(dòng)類(lèi)后,訪問(wèn)html(localhost:8080/bootdemo/index.html)如下:

有的同學(xué)一思索,點(diǎn)擊圖中的第2個(gè)按鈕"服務(wù)端發(fā)送瀏覽器",你這好像也是前端先請(qǐng)求,再推送的消息;我們的WebSocketController#publish方法,在真實(shí)的場(chǎng)景下,可以在后端的定時(shí)任務(wù)中、消息中間件的消費(fèi)者端調(diào)用,不用前端先發(fā)送請(qǐng)求。
當(dāng)然SpringBoot有專(zhuān)門(mén)構(gòu)建WebSocket服務(wù)端的方式。
核心配置類(lèi):
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.HandshakeInterceptor;
import javax.servlet.http.HttpServletRequest;
import java.util.Map;
@Configuration
@EnableWebSocket
@Slf4j
public class WebSocketConfig1 implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry
registry) {
registry.addHandler(new MyWebSocketHandler(), "/webSocket/{userId}")//設(shè)置連接路徑和處理
.setAllowedOrigins("*")
.addInterceptors(new MyWebSocketInterceptor());//設(shè)置攔截器
}
class MyWebSocketInterceptor implements HandshakeInterceptor {
//前置攔截一般用來(lái)注冊(cè)用戶(hù)信息,綁定 WebSocketSession
@Override
public boolean beforeHandshake(ServerHttpRequest request,
ServerHttpResponse response, WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
log.info("前置攔截~~");
if (!(request instanceof ServletServerHttpRequest)) {
return true;
}
HttpServletRequest servletRequest =
((ServletServerHttpRequest)request).getServletRequest();
Map map = (Map)servletRequest.getAttribute(HandlerMapping.
URI_TEMPLATE_VARIABLES_ATTRIBUTE);
String userId = (String)map.get("userId");
attributes.put("userId", userId);
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request,
ServerHttpResponse response, WebSocketHandler wsHandler,
Exception exception) {
log.info("后置攔截~~");
}
}
}核心處理器:
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class MyWebSocketHandler implements WebSocketHandler {
private static final Map<String, WebSocketSession> SESSIONS =
new ConcurrentHashMap<>();
/**
* 建立新的socket連接后回調(diào)的方法
*/
@Override
public void afterConnectionEstablished(WebSocketSession session)
throws Exception {
String userId = (String) session.getAttributes().get("userId");
SESSIONS.put(userId, session);
}
/**
* 接收到瀏覽器端的消息后回調(diào)的方法
*/
@Override
public void handleMessage(WebSocketSession session,
WebSocketMessage<?> message) throws Exception {
String msg = message.getPayload().toString();
log.info("收到客戶(hù)端消息:" + msg);
}
/**
* 連接出錯(cuò)時(shí)回調(diào)的方法
*/
@Override
public void handleTransportError(WebSocketSession session,
Throwable exception) throws Exception {
log.info("連接出錯(cuò)");
if (session.isOpen()) {
session.close();
}
}
/**
* 連接關(guān)閉時(shí)回調(diào)的方法
*/
@Override
public void afterConnectionClosed(WebSocketSession session,
CloseStatus closeStatus) throws Exception {
log.info("連接關(guān)閉:status:" + closeStatus);
}
/**
* 是否處理部分消息,返回false就行
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
/**
* 推送消息給瀏覽器端
*/
public void sendMessage(String userId, String message) {
WebSocketSession webSocketSession = SESSIONS.get(userId);
if (webSocketSession == null || !webSocketSession.isOpen()) {
return;
}
try {
webSocketSession.sendMessage(new TextMessage(message));
}
catch (Exception ex) {
log.error("推送消息異常:" + ex);
}
}
}控制器也改造下:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.HashMap;
import java.util.Map;
@Controller
@RequestMapping("/pushWebSocket")
public class WebSocketController {
@Autowired
private MyWebSocketHandler handler;
@GetMapping("/publish")
@ResponseBody
public Map publish(String userId, String message) {
handler.sendMessage(userId, message);
HashMap<String, Object> map = new HashMap<>();
map.put("code", 200);
return map;
}
}
前端部分不用做修改,和之前一樣的代碼。
到此這篇關(guān)于Java應(yīng)用層協(xié)議WebSocket實(shí)現(xiàn)消息推送的文章就介紹到這了,更多相關(guān)Java WebSocket內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java實(shí)現(xiàn)使用Websocket發(fā)送消息詳細(xì)代碼舉例
- 如何在Java中使用WebSocket協(xié)議
- springboot整合websocket后啟動(dòng)報(bào)錯(cuò)(javax.websocket.server.ServerContainer not available)
- Java實(shí)現(xiàn)WebSocket四個(gè)步驟
- java中Websocket的使用方法例子
- java基于websocket實(shí)現(xiàn)im聊天功能
- Java?spring?MVC環(huán)境中實(shí)現(xiàn)WebSocket的示例代碼
- Java中實(shí)現(xiàn)WebSocket方法詳解
- 教你如何使用Java實(shí)現(xiàn)WebSocket
- 一步步教你如何使用Java實(shí)現(xiàn)WebSocket
- java?WebSocket?服務(wù)端實(shí)現(xiàn)代碼
- Java中使用WebSocket的幾種方式
相關(guān)文章
如何使用IDEA查看java文件編譯后的字節(jié)碼內(nèi)容
這篇文章主要介紹了如何使用IDEA查看java文件編譯后的字節(jié)碼內(nèi)容,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03
Java并發(fā)編程ReentrantReadWriteLock加讀鎖流程
這篇文章主要介紹了Java并發(fā)編程ReentrantReadWriteLock加讀鎖流程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05
Java如何利用Socket進(jìn)行數(shù)據(jù)讀寫(xiě)
這篇文章主要介紹了Java如何利用Socket進(jìn)行數(shù)據(jù)讀寫(xiě),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10
Spring Boot實(shí)現(xiàn)郵件服務(wù)(附:常見(jiàn)郵箱的配置)
這篇文章主要給大家介紹了關(guān)于Spring Boot實(shí)現(xiàn)郵件服務(wù)的相關(guān)資料,文中還附上了常見(jiàn)郵箱的配置,通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-12-12
Spring中@RabbitHandler和@RabbitListener的區(qū)別詳析
@RabbitHandler是用于處理消息的方法注解,它與@RabbitListener注解一起使用,這篇文章主要給大家介紹了關(guān)于Spring中@RabbitHandler和@RabbitListener區(qū)別的相關(guān)資料,需要的朋友可以參考下2024-02-02
Springboot使用RestTemplate調(diào)用第三方接口的操作代碼
這篇文章主要介紹了Springboot使用RestTemplate調(diào)用第三方接口,我只演示了最常使用的請(qǐng)求方式get、post的簡(jiǎn)單使用方法,當(dāng)然RestTemplate的功能還有很多,感興趣的朋友可以參考RestTemplate源碼2022-12-12

