使用java項目搭建一個netty服務(wù)
映入依賴,只要保證有這個依賴,就不需要單獨引入依賴,支持多個端口直連,支持多個實現(xiàn)層解析數(shù)據(jù),
<groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>3.3.4</version>
yml配置
# TCP設(shè)備對接
iot:
device:
port1: 1883
port2: 1885
package com.cqcloud.platform.handler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.cqcloud.platform.service.IotNbIotMqttService;
import com.cqcloud.platform.service.IotPushService;
import com.cqcloud.platform.service.impl.IotNbIotServiceImpl;
import com.cqcloud.platform.service.impl.IotPushServiceImpl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import jakarta.annotation.PostConstruct;
/**
* @author weimeilayer@gmail.com ?
* @date ???? 2022年3月8日???? ????
*/
@Component
public class NettyTcpServer {
/**
* 用于自設(shè)備1協(xié)議端口
*/
private static int PORT1;
/**
* 來自設(shè)備2協(xié)議端口
*/
private static int PORT2;
@Value("${iot.device.port1}")
public int port1Value;
@Value("${iot.device.port2}")
public int port2Value;
@PostConstruct
public void init() {
PORT1 = port1Value;
PORT2 = port2Value;
}
public void start() throws Exception {
final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
// 創(chuàng)建 MqttService 和 MqttPushService 實例
IotNbIotMqttService iotNbIotMqttService = new IotNbIotServiceImpl();
IotPushService iotPushService = new IotPushServiceImpl();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 直接使用 ByteBuf,無需編碼器和解碼器
// 根據(jù)端口注入不同的服務(wù)
if (ch.localAddress().getPort() == PORT1) {
pipeline.addLast(new TcpIotNbServerHandler(iotNbIotMqttService)); // 業(yè)務(wù)邏輯處理器
} else if (ch.localAddress().getPort() == PORT2) {
pipeline.addLast(new TcpIotServerHandler(iotPushService)); // 新處理器
}
}
});
// 綁定第一個端口并啟動
ChannelFuture future1 = bootstrap.bind(PORT1).sync();
// 綁定第二個端口并啟動
ChannelFuture future2 = bootstrap.bind(PORT2).sync();
// 等待服務(wù)器關(guān)閉
future1.channel().closeFuture().sync();
future2.channel().closeFuture().sync();
} finally {
// 優(yōu)雅地關(guān)閉線程池
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}啟動類需要
public static void main(String[] args) throws IOException {
ConfigurableEnvironment env = new SpringApplication(DynamicYearningApplication.class).run(args).getEnvironment();
String envPort = env.getProperty("server.port");
String port = Objects.isNull(envPort) ? "8000" : envPort;
String envContext = env.getProperty("server.servlet.context-path");
String contextPath = Objects.isNull(envContext) ? "" : envContext;
String path = port + contextPath + "/doc.html";
String externalAPI = InetAddress.getLocalHost().getHostAddress();
Console.log("Access URLs:\n\t-------------------------------------------------------------------------\n\tLocal-swagger: \t\thttp://127.0.0.1:{}\n\tExternal-swagger: \thttp://{}:{}\n\t-------------------------------------------------------------------------",path, externalAPI, path);
// 加上以下代碼
NettyTcpServer server = new NettyTcpServer();
try {
server.start();
} catch (Exception e) {
e.printStackTrace();
}
}
創(chuàng)建TcpIotServerHandler
package com.cqcloud.platform.handler;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.cqcloud.platform.entity.IotCommandRecords;
import com.cqcloud.platform.service.IotPushService;
import com.cqcloud.platform.utils.DeviceActionParser;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* 設(shè)備協(xié)議
* @author weimeilayer@gmail.com ?
* @date ???? 2022年3月8日 ???? ????
*/
@Slf4j
public class TcpIotServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
// 接口注入
private final IotPushService iotPushService;
public TcpIotServerHandler(IotPushService iotPushService) {
this.iotPushService = iotPushService;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
byte[] byteArray;
if (in.readableBytes() <= 0) {
in.release();
return;
}
byteArray = new byte[in.readableBytes()];
in.readBytes(byteArray);
if (byteArray.length <= 0) {
in.release();
return;
}
// 將消息傳遞給 iotPushService
iotPushService.pushMessageArrived(byteArray);
}
// 發(fā)送響應(yīng)的統(tǒng)一輔助方法
private void sendResponse(ChannelHandlerContext ctx, String hexResponse) {
byte[] responseBytes = hexStringToByteArray(hexResponse);
ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
ctx.writeAndFlush(responseBuffer);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 打印異常堆棧跟蹤,便于調(diào)試和錯誤排查
cause.printStackTrace();
// 關(guān)閉當(dāng)前的通道,釋放相關(guān)資源
ctx.close();
}
}創(chuàng)建 TcpIotNbServerHandler
package com.cqcloud.platform.handler;
import com.cqcloud.platform.service.IotNbIotMqttService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* NB-IOT CAT1數(shù)據(jù)格協(xié)議
*
* @author weimeilayer@gmail.com
* @date ????2022年3月8日????????
*/
public class TcpIotNbServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final IotNbIotMqttService iotNbIotMqttService;
// 構(gòu)造函數(shù)注入 MqttService
public TcpIotNbServerHandler(IotNbIotMqttService iotNbIotMqttService) {
this.iotNbIotMqttService = iotNbIotMqttService;
}
@Override
public void channelRead0(ChannelHandlerContext ctx,ByteBuf in) {
byte[] byteArray;
if (in.readableBytes() <= 0) {
in.release();
return;
}
byteArray = new byte[in.readableBytes()];
in.readBytes(byteArray);
if (byteArray.length <= 0) {
in.release();
return;
}
// 將 byte[] 數(shù)據(jù)傳遞給 iotNbIotMqttService
iotNbIotMqttService.messageArrived(byteArray);
//發(fā)送固定事件默認(rèn)回復(fù)
sendResponse(ctx);
}
// 發(fā)送響應(yīng)的統(tǒng)一輔助方法
private void sendResponse(ChannelHandlerContext ctx) {
// 回復(fù)客戶端--向設(shè)備回復(fù)AAAA8001(設(shè)備將保持20秒不休眠),平臺盡量在10秒
byte[] responseBytes = new byte[] { (byte) 0xAA, (byte) 0xAA, (byte) 0x80, (byte) 0x01 };
ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);
ctx.writeAndFlush(responseBuffer);
}
//將響應(yīng)消息轉(zhuǎn)換為字節(jié)數(shù)組
public static byte[] hexStringToByteArray(String s) {
int len = s.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
+ Character.digit(s.charAt(i + 1), 16));
}
return data;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
創(chuàng)建接口類IotPushService
package com.cqcloud.platform.service;
/**
* @author weimeilayer@gmail.com
* @date ????2022年3月8日????????
*/
public interface IotPushService {
public void pushMessageArrived(byte[] message);
}
創(chuàng)建IotNbIotMqttService 類
package com.cqcloud.platform.service;
/**
* @author weimeilayer@gmail.com
* @date ????2022年3月8日????????
*/
public interface IotNbIotMqttService {
public void messageArrived(byte[] message);
}
創(chuàng)建實現(xiàn)類IotNbIotServiceImpl
package com.cqcloud.platform.service.impl;
import org.springframework.stereotype.Service;
import com.cqcloud.platform.service.IotNbIotMqttService;
import com.cqcloud.platform.utils.DataParser;
import lombok.AllArgsConstructor;
/**
* @author weimeilayer@gmail.com
* @date ????2022年3月8日????????
*/
@Service
@AllArgsConstructor
public class IotNbIotServiceImpl implements IotNbIotMqttService {
@Override
public void messageArrived(byte[] message) {
// 將 byte 數(shù)組轉(zhuǎn)換為十六進(jìn)制字符串
String convertData = printByteArray(message);
// 打印字節(jié)數(shù)組內(nèi)容
System.out.println("來自于xxx數(shù)據(jù)格式協(xié)議的1883端口的數(shù)據(jù)字節(jié)數(shù)組內(nèi)容:"+ convertData);
//調(diào)用解析方法
dispatchMessage(convertData);
}
// 將 byte[] 轉(zhuǎn)換為十六進(jìn)制字符串的輔助方法
public static String bytesToHex(byte[] bytes) {
StringBuilder hex = new StringBuilder();
for (byte b : bytes) {
// 將每個字節(jié)轉(zhuǎn)換為兩位的十六進(jìn)制表示
hex.append(String.format("%02X", b));
}
return hex.toString();
}
public static String printByteArray(byte[] byteArray) {
StringBuilder hexString = new StringBuilder();
for (byte b : byteArray) {
// 將字節(jié)轉(zhuǎn)換為無符號的十六進(jìn)制字符串,去掉空格
hexString.append(String.format("%02X", b & 0xFF));
}
System.out.println("Byte Array (Hex): " + hexString.toString());
return hexString.toString();
}
public void dispatchMessage(String byteArray) {
String prefix = byteArray.substring(0, 2);
// 根據(jù) messageID 進(jìn)行判斷
System.out.println("來自于數(shù)據(jù)格式協(xié)議來自于1883端口的數(shù)據(jù)處理消息:" +byteArray);
}
}
創(chuàng)建 IotPushServiceImpl
package com.cqcloud.platform.service.impl;
import org.springframework.stereotype.Service;
import com.cqcloud.platform.service.IotPushService;
import com.cqcloud.platform.utils.DeviceActionParser;
import lombok.AllArgsConstructor;
/**
* 發(fā)送指令實現(xiàn)類
* @author weimeilayer@gmail.com
* @date ????2022年3月8日????????
*/
@Service
@AllArgsConstructor
public class IotPushServiceImpl implements IotPushService {
@Override
public void pushMessageArrived(byte[] message) {
// 解析字節(jié)數(shù)組
System.out.println("來自物聯(lián)網(wǎng)平臺的設(shè)備協(xié)議于1885端口的數(shù)據(jù)設(shè)備返回的的內(nèi)容處理");
//打印數(shù)據(jù)
printByteArray(message);
//調(diào)用解析方法
dispatchMessage(message);
}
//設(shè)備回復(fù)的接受內(nèi)容
public static void dispatchMessage(byte[] byteArray) {
}
public static void printByteArray(byte[] byteArray) {
StringBuilder hexString = new StringBuilder();
for (byte b : byteArray) {
// 將字節(jié)轉(zhuǎn)換為無符號的十六進(jìn)制字符串,去掉空格
hexString.append(String.format("%02X", b & 0xFF));
}
System.out.println("Byte Array (Hex): " + hexString.toString());
}
// 將十六進(jìn)制字符串轉(zhuǎn)換為字節(jié)數(shù)組的實用方法
public static byte[] stringToBytes(String s) {
int len = s.length();
byte[] data = new byte[len / 2];
for (int i = 0; i < len; i += 2) {
data[i / 2] = (byte) ((Character.digit(s.charAt(i), 16) << 4)
+ Character.digit(s.charAt(i+1), 16));
}
return data;
}
// 提取設(shè)備類型的十六進(jìn)制字符串
private static String extractDeviceTypeHex(byte[] byteArray) {
// 轉(zhuǎn)換為十六進(jìn)制字符串
String hexString = bytesToHex(byteArray);
// 提取設(shè)備類型
return hexString.substring(10, 12); // 設(shè)備類型的位數(shù)
}
// 輔助方法:將字節(jié)數(shù)組轉(zhuǎn)換為十六進(jìn)制字符串
private static String bytesToHex(byte[] bytes) {
StringBuilder hexString = new StringBuilder();
for (byte b : bytes) {
String hex = Integer.toHexString(0xFF & b);
if (hex.length() == 1) {
hexString.append('0'); // 確保每個字節(jié)都為兩位
}
hexString.append(hex);
}
return hexString.toString().toUpperCase(); // 返回大寫格式
}
// 將十六進(jìn)制字符串轉(zhuǎn)換為 byte
private static byte hexStringToByte(String hex) {
return (byte) Integer.parseInt(hex, 16);
}
}
然后使用網(wǎng)絡(luò)根據(jù)助手請求。

以上就是使用java項目搭建一個netty服務(wù)的詳細(xì)內(nèi)容,更多關(guān)于java搭建netty服務(wù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring中Bean有關(guān)NullPointerException異常的原因分析
在Spring中使用@Autowired注解注入的bean不能在靜態(tài)上下文中訪問,否則會導(dǎo)致NullPointerException,解決方法包括避免在靜態(tài)方法中使用注入的bean,或者使用Spring的ApplicationContext來獲取bean,但后者不推薦2024-12-12
淺談SpringBoot在使用測試的時候是否需要@RunWith
本文主要介紹了淺談SpringBoot在使用測試的時候是否需要@RunWith,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01
Java判斷一個時間是否在當(dāng)前時間區(qū)間代碼示例
這篇文章主要給大家介紹了關(guān)于使用Java判斷一個時間是否在當(dāng)前時間區(qū)間的相關(guān)資料,在日常開發(fā)中我們經(jīng)常會涉及到時間的大小比較或者是判斷某個時間是否在某個時間段內(nèi),需要的朋友可以參考下2023-07-07
springboot實現(xiàn)注冊加密與登錄解密功能(demo)
這篇文章主要介紹了springboot實現(xiàn)注冊的加密與登錄的解密功能,本文通過demo實例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價值,需要的朋友可以參考下2020-02-02
Springboot 整合 Java DL4J 實現(xiàn)時尚穿搭推薦系統(tǒng)(實例代碼)
本文介紹了如何使用SpringBoot和JavaDeeplearning4j框架搭建一個時尚穿搭推薦系統(tǒng),文章詳細(xì)闡述了系統(tǒng)的技術(shù)架構(gòu)、數(shù)據(jù)集格式、Maven依賴配置、模型訓(xùn)練和預(yù)測代碼實現(xiàn),以及單元測試和預(yù)期輸出結(jié)果2024-10-10
spring-boot2.7.8添加swagger的案例詳解
這篇文章主要介紹了spring-boot2.7.8添加swagger的案例詳解,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-01-01

