Netty搭建WebSocket服務器實戰(zhàn)教程
更新時間:2024年03月14日 09:57:51 作者:別怕我只是一只羊~
這篇文章主要介紹了Netty搭建WebSocket服務器實戰(zhàn),本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧
項目結構:
引入jar包:
<!-- netty--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.77.Final</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
application.yml配置
netty: server: host: 127.0.0.1 port: 27001 use-epoll: false
配置類
@Configuration @ConfigurationProperties(prefix = ServerProperties.PREFIX) @Data public class ServerProperties { public static final String PREFIX = "netty.server"; /** * 服務器ip */ private String ip; /** * 服務器端口 */ private Integer port; /** * 傳輸模式linux上開啟會有更高的性能 */ private boolean useEpoll; }
業(yè)務類:
@Data @Accessors(chain = true) public class Policy implements Serializable { private static final long serialVersionUID = 6816331623389002880L; private Integer fileLevel; private Integer validTime; }
/** * 服務端 到 客戶端 */ @Data @Accessors(chain = true) public class RequestDTO implements Serializable { private static final long serialVersionUID = 4284674560985442616L; /** * 標識 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private String request; @JsonInclude(JsonInclude.Include.NON_EMPTY) private String response; /** * 認證碼 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private String key; /** * 返回結果 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private String result; /** * 狀態(tài)碼 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private Integer status; /** * 人員列表 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private List<Person> persons; /** * 發(fā)送該命令的賬戶名 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private String handler; /** * 發(fā)送該命令的賬戶ID */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private Integer handlerId; /** * 艙門下標 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private Integer index; /** * 單位 編號 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private Integer departmentId; /** * 單位名稱 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private String departmentName; /** * 修改柜門參數 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private List<CabinetInfoVO> data; /** * 人員對應的開門命令 */ @JsonInclude(JsonInclude.Include.NON_EMPTY) private List<Command> command; /** * 根據設備編號查詢設備信息 */ // 設備編號 @JsonInclude(JsonInclude.Include.NON_EMPTY) private Integer deviceId; // 設備編號 @JsonInclude(JsonInclude.Include.NON_EMPTY) private String deviceNumber; @JsonInclude(JsonInclude.Include.NON_EMPTY) private Integer personId; // 人員編號 @JsonInclude(JsonInclude.Include.NON_EMPTY) private String personNumber; // 人員姓名 @JsonInclude(JsonInclude.Include.NON_EMPTY) private String personName; // 所屬單位 ID // 所屬單位名稱 // 文件列表 private String[] fileNames; /** * 刪除過期文件 */ private Long fileId; private String fileName; private String localPath; }
/** * 客戶端 到 服務端 */ @Data @Accessors(chain = true) public class ResponseDTO implements Serializable { private static final long serialVersionUID = -7674457360121804081L; /** * 標識 */ private String request; private String response; /** * 認證結果值 */ private Integer value; /** * 響應狀態(tài)碼 */ private Integer status; private Integer index; /** * 客戶端ID */ private Long id; /** * 客戶端IP地址 */ private String ip; /** * 客戶端版本號 */ private String version; /** * 裝備柜行數 */ private Integer rows; /** * 裝備柜列數 */ private Integer cols; /** * 各柜門狀態(tài) */ private String lockStatus; /** * 柜內物品名稱 */ private String boxName; /** * 日志內容 */ private String content; /** * 操作類型 * 1.人員主動解鎖 * 2.管理員利用賬號權限強制解鎖 */ private Integer handlerType; /** * type 為1時表示人員編號,為2時表示管理員賬號用戶名 */ private String handler; /** * 命令所屬人員 */ private Integer personId; /** * 命令主鍵編號 */ private Integer commandId; /** * 設備編號 */ private String deviceNumber; /** * 文件列表 */ private String[] fileNames; /** * insertFile */ private Integer deviceId; private String fileName; private String size; private Integer type; private String createTime; private String copyTime; /** * 設備編號(缺?。? */ // 人員編號 private String personNumber; private String personName; private Integer departmentId; private String departmentName; private Integer duration; private Integer width; private Integer height; private Boolean autoImportant; private String localPath; private String devicePath; private Integer collectionId; /** * 刪除文件 */ private Long fileId; private List<Policy> policy; public String getRequest() { if (ObjectUtils.isEmpty(request)) { return null; } return request; } public String getResponse() { if (ObjectUtils.isEmpty(response)) { return null; } return response; } public Integer getValue() { if (ObjectUtils.isEmpty(value)) { return null; } return value; } public Integer getStatus() { if (ObjectUtils.isEmpty(status)) { return null; } return status; } public Long getId() { if (ObjectUtils.isEmpty(id)) { return null; } return id; } public String getIp() { if (ObjectUtils.isEmpty(ip)) { return null; } return ip; } public String getVersion() { if (ObjectUtils.isEmpty(version)) { return null; } return version; } public Integer getRows() { if (ObjectUtils.isEmpty(rows)) { return null; } return rows; } public Integer getCols() { if (ObjectUtils.isEmpty(cols)) { return null; } return cols; } public String getLockStatus() { if (ObjectUtils.isEmpty(lockStatus)) { return null; } return lockStatus; } public String getContent() { if (ObjectUtils.isEmpty(content)) { return null; } return content; } public Integer getHandlerType() { if (ObjectUtils.isEmpty(handlerType)) { return null; } return handlerType; } public String getHandler() { if (ObjectUtils.isEmpty(handler)) { return null; } return handler; } public String getBoxName() { if (ObjectUtils.isEmpty(boxName)) { return null; } return boxName; } public Integer getPersonId() { if (ObjectUtils.isEmpty(personId)) { return null; } return personId; } public Integer getCommandId() { if (ObjectUtils.isEmpty(commandId)) { return null; } return commandId; } public String getDeviceNumber() { if (ObjectUtils.isEmpty(deviceNumber)) { return null; } return deviceNumber; } public String[] getFileNames() { if (ObjectUtils.isEmpty(fileNames) || fileNames.length == 0) { return null; } return fileNames; } public String getFileName() { if (ObjectUtils.isEmpty(fileName)) { return null; } return fileName; } public String getSize() { if (ObjectUtils.isEmpty(size)) { return null; } return size; } public Integer getType() { if (ObjectUtils.isEmpty(type)) { return null; } return type; } public String getCreateTime() { if (ObjectUtils.isEmpty(createTime)) { return null; } return createTime; } public String getCopyTime() { if (ObjectUtils.isEmpty(copyTime)) { return null; } return copyTime; } public String getPersonNumber() { if (ObjectUtils.isEmpty(personNumber)) { return null; } return personNumber; } public String getPersonName() { if (ObjectUtils.isEmpty(personName)) { return null; } return personName; } public Integer getDepartmentId() { if (ObjectUtils.isEmpty(departmentId)) { return null; } return departmentId; } public String getDepartmentName() { if (ObjectUtils.isEmpty(departmentName)) { return null; } return departmentName; } public Integer getDuration() { if (ObjectUtils.isEmpty(duration)) { return null; } return duration; } public Integer getWidth() { if (ObjectUtils.isEmpty(width)) { return null; } return width; } public Integer getHeight() { if (ObjectUtils.isEmpty(height)) { return null; } return height; } public Boolean isAutoImportant() { if (ObjectUtils.isEmpty(autoImportant)) { return null; } return autoImportant; } public String getLocalPath() { if (ObjectUtils.isEmpty(localPath)) { return null; } return localPath; } public String getDevicePath() { if (ObjectUtils.isEmpty(devicePath)) { return null; } return devicePath; } public Integer getCollectionId() { if (ObjectUtils.isEmpty(collectionId)) { return null; } return collectionId; } public Boolean getAutoImportant() { return autoImportant; } }
客戶端信息類:
@Data @Accessors(chain = true) public class SocketSession implements Serializable { private static final long serialVersionUID = 7585070255615177561L; private Channel channel; private long cabinetId; //智能柜唯一ID private int rows; private int cols; private Map<Integer, Boolean> lockStatusMap;//智能柜門鎖狀態(tài) private Map<Integer, String> boxNameMap; //智能柜柜內物品名稱 private long authenticationKey; //socket連接認證私有密鑰 private long lastHeartbeatTime; //最后一次心跳時間 }
管理WebSocket握手會話
/** * 管理webSocket 握手會話 */ public class WebSocketSession { private final static HashMap<ChannelId, WebSocketServerHandshaker> CHANNEL_SHAKER = new HashMap<>(); /** * 添加 */ public static void setChannelShaker(ChannelId channelId, WebSocketServerHandshaker handShaker) { CHANNEL_SHAKER.put(channelId, handShaker); } /** * 獲取 */ public static WebSocketServerHandshaker getChannelShaker(ChannelId channelId) { return CHANNEL_SHAKER.get(channelId); } /** * 釋放 */ public static void clear(ChannelId channelId) { CHANNEL_SHAKER.remove(channelId); } }
客戶端連接時認證方法
/** * 計算當前請求時間 * * @param key 時間戳 * @param authentication 根據key計算的結果 * @return */ public static boolean verify(long key, int authentication) { byte[] time_bytes = ByteBuffer.allocate(Long.BYTES / Byte.BYTES).putLong(key).array(); int count = 0; boolean even = time_bytes[time_bytes.length - 1] % 2 == 0; for (int i = 0; i < time_bytes.length; i++) { int val = time_bytes[i] & 0xFF; if (even) { count += (val << 1); } else { count += (val << 2); } } return count == authentication; }
客戶端發(fā)送請求的所有操作工具類:
@Component @Slf4j public class OperateUtil { private final CollectionService collectionService; private final CollectionMapper collectionMapper; private final PersonMapper personMapper; private final LocklogService locklogService; private final CommandMapper commandMapper; private final DepartmentMapper departmentMapper; private final DeviceMapper deviceMapper; private final FileMapper fileMapper; private final AllProperties allProperties; private final ServerMapper serverMapper; public OperateUtil( CollectionService collectionService, PersonMapper personMapper, LocklogService locklogService, CollectionMapper collectionMapper, CommandMapper commandMapper, DepartmentMapper departmentMapper, DeviceMapper deviceMapper, FileMapper fileMapper, AllProperties allProperties, ServerMapper serverMapper ) { this.collectionService = collectionService; this.personMapper = personMapper; this.locklogService = locklogService; this.collectionMapper = collectionMapper; this.commandMapper = commandMapper; this.departmentMapper = departmentMapper; this.deviceMapper = deviceMapper; this.fileMapper = fileMapper; this.allProperties = allProperties; this.serverMapper = serverMapper; } public String ManyOperate(ResponseDTO responseDTO, ChannelHandlerContext ctx) { log.info("請求對象:{}", responseDTO); String msgHandler = "請求標識為空"; if (!ObjectUtils.isEmpty(responseDTO) && !ObjectUtils.isEmpty(responseDTO.getRequest())) { // 客戶端請求,服務端響應 return msgHandler(responseDTO, responseDTO.getRequest(), ctx); } if (!ObjectUtils.isEmpty(responseDTO) && !ObjectUtils.isEmpty(responseDTO.getResponse())) { // 服務端請求,客戶端響應 return msgHandler(responseDTO, responseDTO.getResponse(), ctx); } return msgHandler; } /** * 操作分發(fā) * @param responseDTO * @param msg * @return */ private String msgHandler(ResponseDTO responseDTO, String msg, ChannelHandlerContext ctx) { switch (msg) { case "authenticate": return checkAuthenticate(responseDTO, ctx); case "login": return getCollectionInfo(responseDTO, ctx); case "heartbeat": return getHeartbeat(ctx); case "lockStatus": getLockStatus(responseDTO, ctx); return ""; case "personInfo": return getPersonInfo(responseDTO); case "unlockLog": return addLockLog(responseDTO); case "authenticationResult": return null; case "getOpenDoorCommand": return getDoorTask(responseDTO); case "completeOpenDoorCommand": return reciveDoor(responseDTO); case "deviceInfo": return getDvinfoByDvno(responseDTO); case "fileExist": return getFileExist(responseDTO); case "insertFile": return saveFile(responseDTO); case "getOverdueFile": return getOverdueFile(responseDTO); case "deletedOverdueFile": return deleteOverdueFile(responseDTO); default: return null; } } /** * 刪除過期文件 * @param responseDTO * @return */ private String deleteOverdueFile(ResponseDTO responseDTO) { RequestDTO requestDTO = new RequestDTO(); if (ObjectUtils.isEmpty(responseDTO)) { log.info("刪除文件參數為空"); requestDTO.setResponse("deletedOverdueFile") .setStatus(400); return JSON.toJSONString(requestDTO); } Boolean deleteFile = fileMapper.deleteFile(responseDTO.getFileId()); if (deleteFile) { requestDTO.setResponse("deletedOverdueFile") .setStatus(200); return JSON.toJSONString(requestDTO); } requestDTO.setResponse("deletedOverdueFile") .setStatus(201); return JSON.toJSONString(requestDTO); } /** * 查詢過期文件 * @param responseDTO * @return */ private String getOverdueFile(ResponseDTO responseDTO) { RequestDTO requestDTO = new RequestDTO(); if (ObjectUtils.isEmpty(responseDTO) || ObjectUtils.isEmpty(responseDTO.getCollectionId())) { log.info("查詢過期參數為空"); return null; } // 當前時間的實例 Calendar now = Calendar.getInstance(); now.setTime(new Date()); for (Policy policy : responseDTO.getPolicy()) { // 過期時間 now.set(Calendar.DATE, now.get(Calendar.DATE) - policy.getValidTime()); // 查詢過期文件 File overdueFile = fileMapper.getOverdueFile(policy.getFileLevel(), now.getTime()); if (!ObjectUtils.isEmpty(overdueFile)) { // 查詢到文件,填充返回,中斷循環(huán) requestDTO.setFileId(overdueFile.getFileId()) .setFileName(overdueFile.getFileName()) .setLocalPath(overdueFile.getFileLocalPath()); requestDTO.setResponse("getOverdueFile") .setStatus(200); return JSON.toJSONString(requestDTO); } } requestDTO.setResponse("getOverdueFile") .setStatus(201); return JSON.toJSONString(requestDTO); } /** * 存儲文件 * @param responseDTO * @return */ private String saveFile(ResponseDTO responseDTO) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); RequestDTO requestDTO = new RequestDTO(); if (ObjectUtils.isEmpty(responseDTO)) { log.info("上報文件參數為空"); return null; } Server serverBySerialCode = serverMapper.getServerBySerialCode(allProperties.getSerialCode()); File file = new File(); Optional<ResponseDTO> optionalResponseDTO = Optional.ofNullable(responseDTO); file.setFileName(optionalResponseDTO.map(ResponseDTO::getFileName).orElseThrow()) .setFileSize(Long.parseLong(responseDTO.getSize())) .setFileType(responseDTO.getType()) .setFileDeviceId(responseDTO.getDeviceId()) .setFileDeviceNumber(responseDTO.getDeviceNumber()) .setFilePersonNumber(responseDTO.getPersonNumber()) .setFilePersonId(responseDTO.getPersonId()) .setFilePersonName(responseDTO.getPersonName()) .setFileDepartmentId(responseDTO.getDepartmentId()) .setFileDepartmentName(responseDTO.getDepartmentName()) .setFileDuration(responseDTO.getDuration()) .setFileWidth(responseDTO.getWidth()) .setFileHeight(responseDTO.getHeight()) .setFileAutoImportant(!responseDTO.getAutoImportant() ? 0 : 1) .setFileLocalPath(responseDTO.getLocalPath()) .setFileDevicePath(responseDTO.getDevicePath()) .setFileCollectionId(responseDTO.getCollectionId()) .setFileNeedUpload(1) .setFileManualImportant(0) .setFileServerId(Optional.ofNullable(serverBySerialCode).map(Server::getServerId).orElse(0)) .setFileDeleteCollection(0) .setFileDeleteServer(0) .setFileLock(0); try { file.setFileCreateTime(sdf.parse(responseDTO.getCreateTime())) .setFileCopyTime(sdf.parse(responseDTO.getCopyTime())); } catch (ParseException e) { throw new RuntimeException(e); } boolean insertFile = fileMapper.insertFile(file); if (insertFile) { requestDTO.setResponse("insertFile") .setStatus(200) .setIndex(responseDTO.getIndex()); return JSON.toJSONString(requestDTO); } requestDTO.setResponse("insertFile") .setStatus(-1) .setIndex(responseDTO.getIndex()); return JSON.toJSONString(requestDTO); } /** * 查詢文件是否存在 * @param responseDTO * @return */ private String getFileExist(ResponseDTO responseDTO) { RequestDTO requestDTO = new RequestDTO(); if (ObjectUtils.isEmpty(responseDTO) || ObjectUtils.isEmpty(responseDTO.getFileNames())) { log.info("文件名稱列表 參數為空"); return null; } // 返回給客戶端的文件名集合 List<String> result = new ArrayList<>(); // 逐一檢查文件名 for (String fileName : responseDTO.getFileNames()) { File fileByName = fileMapper.getFileByName(fileName); if (ObjectUtils.isEmpty(fileByName)) { // 未查詢到數據,將文件名返回給客戶端 result.add(fileName); } } String[] file = result.toArray(new String[result.size()]); requestDTO.setResponse("fileExist") .setFileNames(file) .setIndex(responseDTO.getIndex()); return JSONObject.toJSONString(requestDTO); } private String getDvinfoByDvno(ResponseDTO responseDTO) { RequestDTO requestDTO = new RequestDTO(); if (ObjectUtils.isEmpty(responseDTO) || ObjectUtils.isEmpty(responseDTO.getDeviceNumber())) { log.info("設備編號 參數為空"); return null; } // 查詢數據庫 Device deviceByDeviceNum = deviceMapper.getDeviceByDeviceNum(responseDTO.getDeviceNumber()); if (ObjectUtils.isEmpty(deviceByDeviceNum)) { requestDTO.setResponse("deviceInfo") .setStatus(201); return JSONObject.toJSONString(requestDTO); } Optional<Device> optionalDevice = Optional.ofNullable(deviceByDeviceNum); // 拼接參數 requestDTO.setResponse("deviceInfo") .setStatus(200) .setIndex(responseDTO.getIndex()) .setDeviceId(optionalDevice.map(Device::getDeviceId).orElse(0)) .setDeviceNumber(optionalDevice.map(Device::getDeviceNumber).orElse("")) .setPersonId(optionalDevice.map(Device::getPerson).map(Person::getPersonId).orElse(0)) .setPersonNumber( optionalDevice.map(Device::getPerson).map(Person::getPersonNumber).orElse("")) .setPersonName(optionalDevice.map(Device::getPerson).map(Person::getPersonName).orElse("")) .setDepartmentId(optionalDevice.map(Device::getDepartment).map(Department::getDepartmentId) .orElse(0)) .setDepartmentName( optionalDevice.map(Device::getDepartment).map(Department::getDepartmentName) .orElse("")); return JSONObject.toJSONString(requestDTO); } /** * 請求認證方法 * @param responseDTO * @return */ private String checkAuthenticate(ResponseDTO responseDTO, ChannelHandlerContext ctx) { RequestDTO requestDTO = new RequestDTO(); assert responseDTO != null; if (ObjectUtils.isEmpty(responseDTO.getValue())) { log.error("value值為空"); return null; } for (SocketSession socketSession : WebsocketMessageHandler.AllSocket) { if (ctx.channel() == socketSession.getChannel()) { long key = socketSession.getAuthenticationKey(); boolean verify = MyEncrypt.verify(key, responseDTO.getValue()); if (verify) { log.info("認證成功,返回給客戶端"); requestDTO.setRequest("authenticationResult") .setResult("OK"); return JSONObject.toJSONString(requestDTO); } } } log.info("認證失敗,返回給客戶端"); requestDTO.setRequest("authenticationResult") .setResult("認證失敗"); return JSONObject.toJSONString(requestDTO); } /** * 上報采集站信息 * @param responseDTO * @return */ private String getCollectionInfo(ResponseDTO responseDTO, ChannelHandlerContext ctx) { log.info("收到登錄請求,裝備柜信息:{}", responseDTO); RequestDTO requestDTO = new RequestDTO(); // 工作站 CollectionDTO collectionDTO = new CollectionDTO(); if (ObjectUtils.isEmpty(responseDTO) || ObjectUtils.isEmpty( responseDTO.getId()) || ObjectUtils.isEmpty(responseDTO.getLockStatus())) { log.info("由于請求內容是空的 或者 裝備柜ID 或者 鎖狀態(tài) 是空的---處理請求結束"); return null; } // 將各柜門狀態(tài)緩存下來 getStatus(responseDTO, ctx); // 先根據 ID 查詢采集站是否 存在 // 如果 采集站 存在 ,則 更新采集站信息,否則 添加采集站 // 工作站ID collectionDTO.setCollectionId(responseDTO.getId().toString()); // 查詢到采集站信息 List<Collection> collectionList = collectionMapper.getCollection( new Collection().setCollectionId(responseDTO.getId())); // 先將收到的信息set 進去 collectionDTO.setCollectionIp(responseDTO.getIp()) .setCollectionVersion(responseDTO.getVersion()); if (collectionList.size() > 0) { // 存在采集站,則更新 log.info("list:{}", collectionList); CommonResult commonResult = collectionService.updateCollection(collectionDTO); if (commonResult.getCode() == 200) { // 更新成功 if (!ObjectUtils.isEmpty(collectionList.get(0).getDepartment())) { // 如果采集站對應單位信息不為空 requestDTO.setDepartmentId(collectionList.get(0).getDepartment().getDepartmentId()) .setDepartmentName(collectionList.get(0).getDepartment().getDepartmentName()); } requestDTO.setResponse("login") .setStatus(200); return JSONObject.toJSONString(requestDTO); } } // 不存在采集站,則添加 CommonResult commonResult = null; try { collectionDTO.setCollectionDepartmentId("0"); commonResult = collectionService.addCollection(collectionDTO); } catch (ParseException e) { throw new RuntimeException(e); } if (commonResult.getCode() == 200) { requestDTO.setResponse("login") .setStatus(200); return JSONObject.toJSONString(requestDTO); } requestDTO.setResponse("login") .setStatus(-1); return JSONObject.toJSONString(requestDTO); } /** * 心跳請求 * @return */ private String getHeartbeat(ChannelHandlerContext ctx) { log.info("收到心跳請求:{}", ctx.channel().id()); HeartCache.getInstance().putValue(ctx.channel(), System.currentTimeMillis(), 2 * 60); RequestDTO requestDTO = new RequestDTO(); requestDTO.setResponse("heartbeat") .setStatus(200); return JSONObject.toJSONString(requestDTO); } /** * 獲取各柜門狀態(tài) * @param responseDTO */ private void getLockStatus(ResponseDTO responseDTO, ChannelHandlerContext ctx) { // 獲取各柜門狀態(tài) getStatus(responseDTO, ctx); } private Boolean strToBool(String status) { if ("1".equals(status)) { return true; } return false; } /** * 獲取各柜門狀態(tài) * @param responseDTO * @param ctx */ private void getStatus(ResponseDTO responseDTO, ChannelHandlerContext ctx) { log.info("將 裝備柜信息 緩存下來"); if (ObjectUtils.isEmpty(responseDTO.getLockStatus()) || ObjectUtils.isEmpty(responseDTO.getBoxName())) { log.info("由于 鎖狀態(tài) 或者 柜內物品名稱 是空的---則只將 工作站ID 記錄到 內存中"); for (SocketSession socketSession : WebsocketMessageHandler.AllSocket) { if (socketSession.getChannel() == ctx.channel()) { socketSession.setCabinetId(responseDTO.getId()); } break; } return; } // 狀態(tài)數組 String[] status = responseDTO.getLockStatus().split(",", -1); log.info("鎖狀態(tài)數組長度:{}", status.length); // 柜內物品名稱 String[] name = responseDTO.getBoxName().split(",", -1); log.info("柜內物品名稱數組長度:{}", name.length); // 先找到 此通道 對應的socket for (SocketSession socketSession : WebsocketMessageHandler.AllSocket) { if (socketSession.getChannel() == ctx.channel()) { // 鎖狀態(tài)集合 Map<Integer, Boolean> statusMap = new HashMap<>(); // 物品名稱集合 Map<Integer, String> boxMap = new HashMap<>(); for (int i = 0; i < status.length; i++) { statusMap.put(i + 1, strToBool(status[i])); boxMap.put(i + 1, name[i]); } if (!ObjectUtils.isEmpty(responseDTO.getId())) { socketSession.setCabinetId(responseDTO.getId()); } socketSession.setRows(responseDTO.getRows()) .setCols(responseDTO.getCols()) .setLockStatusMap(statusMap) .setBoxNameMap(boxMap); break; } } } /** * 獲取人員信息 * @param responseDTO * @return */ private String getPersonInfo(ResponseDTO responseDTO) { RequestDTO requestDTO = new RequestDTO(); List<Person> allPerson = personMapper.getAllPerson(); requestDTO.setResponse("personInfo") .setStatus(200) .setPersons(allPerson); log.info("人員對象:{}", requestDTO); log.info("人員對象轉JSON:{}", JSONObject.toJSONString(requestDTO)); return JSONObject.toJSONString(requestDTO); } /** * 開鎖日志 * @param responseDTO * @return */ private String addLockLog(ResponseDTO responseDTO) { RequestDTO requestDTO = new RequestDTO(); if (!ObjectUtils.isEmpty(responseDTO) && !ObjectUtils.isEmpty(responseDTO.getHandlerType()) && !ObjectUtils.isEmpty(responseDTO.getHandler()) && !ObjectUtils.isEmpty(responseDTO.getContent())) { LocklogDTO locklogDTO = new LocklogDTO(); locklogDTO.setLocklogHandlerType(responseDTO.getHandlerType().toString()) .setLocklogHandler(responseDTO.getHandler()) .setLocklogContent(responseDTO.getContent()); CommonResult commonResult = locklogService.addLockLog(locklogDTO); if (commonResult.getCode() == 200) { requestDTO.setResponse("unlockLog") .setStatus(200); return JSONObject.toJSONString(requestDTO); } } log.info("開鎖日志為空 或者 操作類型為空 或者 操作人員為空 或者 操作內容為空 -- 處理結束"); requestDTO.setResponse("unlockLog") .setStatus(-1); return JSONObject.toJSONString(requestDTO); } /** * 人員開門查詢 * @param responseDTO * @return */ private String getDoorTask(ResponseDTO responseDTO) { if (ObjectUtils.isEmpty(responseDTO.getPersonId())) { log.info("人員編號為空---開門任務處理結束"); return null; } RequestDTO requestDTO = new RequestDTO(); Command command = new Command(); command.setCommandPersonId(responseDTO.getPersonId()); List<Command> commandList = commandMapper.getCommandByPersonId(responseDTO.getPersonId(), new Date()); if (commandList.size() > 0) { requestDTO.setResponse("getOpenDoorCommand") .setStatus(200) .setCommand(commandList); return JSONObject.toJSONString(requestDTO); } requestDTO.setResponse("getOpenDoorCommand") .setStatus(200); return JSONObject.toJSONString(requestDTO); } private String reciveDoor(ResponseDTO responseDTO) { if (ObjectUtils.isEmpty(responseDTO.getCommandId())) { log.info("命令主鍵編號為空---結束處理"); return null; } RequestDTO requestDTO = new RequestDTO(); Command command = new Command(); command.setCommandId(responseDTO.getCommandId()) .setCommandComplete(true) .setCommandCompleteTime(new Date()); Integer updateCommand = commandMapper.updateCommand(command); if (updateCommand > 0) { requestDTO.setRequest("completeOpenDoorCommand") .setStatus(200); return JSONObject.toJSONString(requestDTO); } requestDTO.setRequest("completeOpenDoorCommand") .setStatus(-1); return JSONObject.toJSONString(requestDTO); } }
心跳
public class CacheEntity implements Serializable { private static final long serialVersionUID = 3055325810872798183L; private Object value; /** * 保存的時間戳 */ private long gmtModify; /** * 過期時間 */ private int expire; public Object getValue() { if (ObjectUtils.isEmpty(value)) { return null; } return value; } public void setValue(Object value) { this.value = value; } public long getGmtModify() { return gmtModify; } public void setGmtModify(long gmtModify) { this.gmtModify = gmtModify; } public int getExpire() { return expire; } public void setExpire(int expire) { this.expire = expire; } public CacheEntity(Object value, long gmtModify, int expire) { super(); this.value = value; this.gmtModify = gmtModify; this.expire = expire; } }
@Slf4j public class HeartCache { private static final int DEFAULT_CAPACITY = 512; /** * 最大容量 */ private static final int MAX_CAPACITY = 100000; /** * 刷新緩存的頻率 */ private static final int MONITOR_DURATION = 2; // 啟動監(jiān)控線程 static { new Thread(new TimeoutTimerThread()).start(); } // 內部類方式實現單例 private static class HeartCacheInstance { private static final HeartCache INSTANCE = new HeartCache(); } public static HeartCache getInstance() { return HeartCache.HeartCacheInstance.INSTANCE; } private HeartCache() { } /** * 使用默認容量創(chuàng)建一個Map */ private static Map<Channel, CacheEntity> heartExpire = new ConcurrentHashMap<>(DEFAULT_CAPACITY); /** * 將key-value保存到本地緩存并制定該緩存的過期時間 * @param key * @param value * @param expireTime 過期時間,如果是-1 則表示永不過期 * @return * @param <T> */ public <T> boolean putValue(Channel key, T value, int expireTime) { return putCloneValue(key, value, expireTime); } /** * 將值通過序列化clone 處理后保存到緩存中,可以解決值引用的問題 * @param key * @param value * @param expireTime * @return * @param <T> */ private <T> boolean putCloneValue(Channel key, T value, int expireTime) { try { if (heartExpire.size() >= MAX_CAPACITY) { return false; } // 序列化賦值 CacheEntity entityClone = clone(new CacheEntity(value, System.nanoTime(), expireTime)); heartExpire.put(key, entityClone); return true; } catch (Exception e) { log.error("添加緩存失?。簕}", e.getMessage()); } return false; } /** * 序列化 克隆處理 * @param object * @return * @param <E> */ private <E extends Serializable> E clone(E object) { E cloneObject = null; try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(object); oos.close(); ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); ObjectInputStream ois = new ObjectInputStream(bais); cloneObject = (E) ois.readObject(); ois.close(); } catch (Exception e) { log.error("緩存序列化失敗:{}", e.getMessage()); } return cloneObject; } /** * 從本地緩存中獲取key對應的值,如果該值不存則則返回null * @param key * @return */ public Object getValue(Channel key) { if (CollectionUtils.isEmpty(heartExpire)) { return null; } CacheEntity cacheEntity = heartExpire.get(key); if (ObjectUtils.isEmpty(cacheEntity)) { return null; } return cacheEntity.getValue(); } public void remove(Channel key) { if (CollectionUtils.isEmpty(heartExpire)) { return; } CacheEntity cacheEntity = heartExpire.get(key); if (ObjectUtils.isEmpty(cacheEntity)) { return; } heartExpire.remove(key); } public Integer count() { return heartExpire.size(); } /** * 清空所有 */ public void clear() { heartExpire.clear(); } /** * 過期處理線程 */ static class TimeoutTimerThread implements Runnable { @Override public void run() { while (true) { try { TimeUnit.SECONDS.sleep(MONITOR_DURATION); checkTime(); } catch (Exception e) { log.error("過期緩存清理失?。簕}", e.getMessage()); } } } /** * 過期緩存的具體處理方法 * * @throws Exception */ private void checkTime() throws Exception { // 開始處理過期 for (Channel key : heartExpire.keySet()) { CacheEntity tce = heartExpire.get(key); long timoutTime = TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - tce.getGmtModify()); // 過期時間 : timoutTime if (tce.getExpire() > timoutTime) { continue; } log.info(" 清除過期緩存 :{}", key); //清除過期緩存和刪除對應的緩存隊列 heartExpire.remove(key); log.info("斷開客戶端連接:{}", key.id()); key.disconnect(); } } } }
消息處理類:
/** * 消息處理,單例啟動 * * @author qiding */ @Slf4j @Component @ChannelHandler.Sharable @RequiredArgsConstructor public class MessageHandler extends SimpleChannelInboundHandler<Object> { private final WebsocketMessageHandler websocketHandler; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { log.info("FullHttpRequest"); FullHttpRequest request = (FullHttpRequest) msg; //處理握手數據 // 首次握手進行校驗 isFullHttpRequest(ctx, request); // 獲取請求uri String uri = request.uri(); // 參數分別是 (ws地址,子協議,是否擴展,最大frame長度) WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory( getWebSocketLocation(request), null, true, 5 * 1024 * 1024); WebSocketServerHandshaker handShaker = factory.newHandshaker(request); if (handShaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handShaker.handshake(ctx.channel(), request); } WebSocketSession.setChannelShaker(ctx.channel().id(), handShaker); //握手成功 連接建立完成 websocketHandler.online(ctx); } else if (msg instanceof PingWebSocketFrame) { log.info("PingWebSocketFrame"); // 處理握手PING/PONG PingWebSocketFrame pingWebSocketFrame = (PingWebSocketFrame) msg; ctx.writeAndFlush(new PongWebSocketFrame(pingWebSocketFrame.content().retain())); } else if (msg instanceof TextWebSocketFrame) { log.info("TextWebSocketFrame"); //處理websocket數據(字符串) websocketHandler.receivedMessage(ctx, (TextWebSocketFrame) msg); } } //客戶端掉線 @Override public void channelInactive(ChannelHandlerContext ctx) { log.info("斷開連接"); /* // 釋放緩存 ChannelStore.closeAndClean(ctx); // 斷開連接,刪除 map GlobalUtil.SESSIONMAP.remove(ctx.channel().id()); WebSocketSession.clear(ctx.channel().id()); */ websocketHandler.offline(ctx); } //新的客戶端 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("成功建立連接,channelId:{}", ctx.channel().id()); super.channelActive(ctx); } //以下是工具類方法 不做具體數據處理 /** * 判斷是否是正確的websocket 握手協議 */ private static void isFullHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) { if (!request.decoderResult().isSuccess()) { log.error("非webSocket請求"); sendResponse(ctx, request, new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.BAD_REQUEST, ctx.alloc().buffer())); ctx.close(); return; } if (!HttpMethod.GET.equals(request.method())) { log.error("非GET請求"); sendResponse(ctx, request, new DefaultFullHttpResponse(request.protocolVersion(), HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer())); ctx.close(); } } /** * SSL支持采用wss: */ private static String getWebSocketLocation(FullHttpRequest request) { return "ws://" + request.headers().get(HttpHeaderNames.HOST) + "/ws"; } private static void sendResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse resp) { HttpResponseStatus status = resp.status(); if (status != HttpResponseStatus.OK) { ByteBufUtil.writeUtf8(resp.content(), status.toString()); HttpUtil.setContentLength(req, resp.content().readableBytes()); } boolean keepAlive = HttpUtil.isKeepAlive(req) && status == HttpResponseStatus.OK; HttpUtil.setKeepAlive(req, keepAlive); ChannelFuture future = ctx.write(resp); if (!keepAlive) { future.addListener(ChannelFutureListener.CLOSE); } } }
/** * Websocket 消息處理器 * * @author qiding */ @Slf4j @Component public class WebsocketMessageHandler { private final OperateUtil operateUtil; public WebsocketMessageHandler(OperateUtil operateUtil) { this.operateUtil = operateUtil; } /** * 存儲所有在線的連接 */ public static final List<SocketSession> AllSocket = new ArrayList<>(); public void online(ChannelHandlerContext ctx) { //1. 立馬發(fā)送驗證請求 告知終端需要驗證 long key = System.currentTimeMillis(); String jsonRequest = "{\"request\":\"authenticate\",\"key\":\"" + key + "\"}"; //new一個緩存對象 保存該連接的信息 SocketSession socketSession = new SocketSession(); socketSession.setAuthenticationKey(key) .setLastHeartbeatTime(System.currentTimeMillis()) .setChannel(ctx.channel()); AllSocket.add(socketSession); // 有socket連接,將連接存入 redis HeartCache.getInstance().putValue(ctx.channel(), System.currentTimeMillis(), 2 * 60); log.info("心跳總共連接數:{}", HeartCache.getInstance().count()); //send TextWebSocketFrame frame = new TextWebSocketFrame(jsonRequest); ctx.writeAndFlush(frame); } public void offline(ChannelHandlerContext ctx) { // 從心跳集合中移除 該連接 HeartCache.getInstance().remove(ctx.channel()); log.info("剩余心跳連接數:{}", HeartCache.getInstance().count()); //1. 從在線列表中移除該連接 for (int i = 0; i < AllSocket.size(); i++) { SocketSession ca = AllSocket.get(i); if (ca.getChannel() == ctx.channel()) { AllSocket.remove(i); break; } } } public void receivedMessage(ChannelHandlerContext ctx, TextWebSocketFrame msg) { //1. 將websocket的字符串消息轉換成Json對象 JSONObject jsonObject = JSONObject.parseObject(msg.text()); if (jsonObject.isEmpty()) { return; } if (msg.text().equals("1") || msg.text().equals("2")) { ctx.writeAndFlush(new TextWebSocketFrame(String.valueOf(HeartCache.getInstance().count()))); } log.info("接收到的消息:{}", msg.text()); ResponseDTO responseDTO = JSON.parseObject(msg.text(), ResponseDTO.class); String operateMsg = operateUtil.ManyOperate(responseDTO, ctx); if (!ObjectUtils.isEmpty(operateMsg)) { TextWebSocketFrame frame = new TextWebSocketFrame(operateMsg); ctx.writeAndFlush(frame); } } /** * 向某一個socket連接推送消息 */ public static void sendMsgToClient(long cabinetId, String msg) { //在AllSocket中找到cabinetId對應的連接,將該json消息推送出去 for (int i = 0; i < AllSocket.size(); i++) { SocketSession ca = AllSocket.get(i); if (ca.getCabinetId() == cabinetId) { ca.getChannel().writeAndFlush(new TextWebSocketFrame(msg)); break; } } } }
Netty頻道初始化:
/** * Netty 通道初始化 * * @author qiding */ @Component @RequiredArgsConstructor public class ChannelInit extends ChannelInitializer<SocketChannel> { private final MessageHandler messageHandler; @Override protected void initChannel(SocketChannel channel) { channel.pipeline() // 心跳時間 // 對http協議的支持. .addLast(new HttpServerCodec()) // 對大數據流的支持 .addLast(new ChunkedWriteHandler()) // 聚合 Http 將多個requestLine、requestHeader、messageBody信息轉化成單一的request或者response對象 .addLast(new HttpObjectAggregator(8192)) // 聚合 websocket 的數據幀,因為客戶端可能分段向服務器端發(fā)送數據 .addLast(new WebSocketFrameAggregator(1024 * 62)) // 添加消息處理器 .addLast("messageHandler", messageHandler); } }
TCPServer
public interface ITcpServer { /** * 主啟動程序,初始化參數 * * @throws Exception 初始化異常 */ void start() throws Exception; /** * 優(yōu)雅的結束服務器 * * @throws InterruptedException 提前中斷異常 */ @PreDestroy void destroy() throws InterruptedException; }
@Component @Slf4j @RequiredArgsConstructor public class TcpServer implements ITcpServer { private final ChannelInit channelInit; private final ServerProperties serverProperties; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; @Override public void start() throws Exception { log.info("初始化 TCP server ..."); bossGroup = serverProperties.isUseEpoll() ? new EpollEventLoopGroup() : new NioEventLoopGroup(); workerGroup = serverProperties.isUseEpoll() ? new EpollEventLoopGroup() : new NioEventLoopGroup(); this.tcpServer(); } /** * 初始化 */ private void tcpServer() { try { new ServerBootstrap() .group(bossGroup, workerGroup) .channel( serverProperties.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .localAddress(new InetSocketAddress(serverProperties.getPort())) // 配置 編碼器、解碼器、業(yè)務處理 .childHandler(channelInit) // tcp緩沖區(qū) .option(ChannelOption.SO_BACKLOG, 128) // 將網絡數據積累到一定的數量后,服務器端才發(fā)送出去,會造成一定的延遲。希望服務是低延遲的,建議將TCP_NODELAY設置為true .childOption(ChannelOption.TCP_NODELAY, false) // 保持長連接 .childOption(ChannelOption.SO_KEEPALIVE, true) // 綁定端口,開始接收進來的連接 .bind().sync(); log.info("websocket server啟動成功!開始監(jiān)聽端口:{}", serverProperties.getPort()); } catch (Exception e) { e.printStackTrace(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } /** * 銷毀 * @throws InterruptedException */ @PreDestroy @Override public void destroy() throws InterruptedException { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
啟動項中調用啟動:
@Override public void run(String... args) throws Exception { tcpServer.start(); }
到此這篇關于Netty搭建WebSocket服務器的文章就介紹到這了,更多相關Netty WebSocket服務器內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
JDK9為何要將String的底層實現由char[]改成了byte[]
String 類的源碼已經由?char[]?優(yōu)化為了?byte[]?來存儲字符串內容,為什么要這樣做呢?本文就詳細的介紹一下,感興趣的可以了解一下2022-03-03SpringBoot2學習之springboot與spring區(qū)別分析
這篇文章主要為大家介紹了SpringBoot2學習之springboot與spring區(qū)別分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05