深入理解Netty核心類及其作用
MessageToByteEncoder
MessageToByteEncoder是一個抽象編碼器,子類可重寫encode方法把對象編碼為ByteBuf輸出。
MessageToByteEncoder繼承自ChannelOutboundHandlerAdapter,encode在出站是被調(diào)用。
public class MyMessageEncoder extends MessageToByteEncoder<MessagePO> {
@Override
protected void encode(ChannelHandlerContext ctx, MessagePO msg, ByteBuf out) throws Exception {
System.out.println("MyMessageEncoder.encode,被調(diào)用");
String json = JSONObject.toJSONString(msg);
out.writeInt(json.getBytes(StandardCharsets.UTF_8).length);
out.writeBytes(json.getBytes(StandardCharsets.UTF_8));
}
}
ByteToMessageDecoder
ByteToMessageDecoder是一種ChannelInboundHandler,可以稱為解碼器,負(fù)責(zé)將byte字節(jié)流(ByteBuf)轉(zhuǎn)換成一種Message,Message是應(yīng)用可以自己定義的一種Java對象。
ByteToMessageDecoder:用于將字節(jié)轉(zhuǎn)為消息,需要檢測緩沖區(qū)是否有足夠的字節(jié)。
public class MyMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyMessageDecoder.decode,被調(diào)用");
while (in.readableBytes() >= 4){
int num = in.readInt();
System.out.println("解碼出一個整數(shù):"+num);
out.add(num);
}
}
}
ReplayingDecoder
ReplayingDecoder:繼承自ByteToMessageDecoder,不需要檢測緩沖區(qū)是否有足夠的字節(jié),但是ReplayingDecoder的速度略慢于ByteToMessageDecoder,而且并不是所有的ByteBuf都支持。
項目復(fù)雜度高用ReplayingDecoder,否則使用ByteToMessageDecoder。
public class MyMessageDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("MyMessageDecoder.decode,被調(diào)用");
int length = in.readInt();
byte[] content = new byte[length];
in.readBytes(content);
String json = new String(content,StandardCharsets.UTF_8);
MessagePO po = JSONObject.parseObject(json,MessagePO.class);
out.add(po);
}
}
MessageToMessageEncoder
用于從一種消息編碼為另外一種消息,例如從POJO到POJO,是一種ChannelOutboundHandler
MessageToMessageDecoder
從一種消息解碼為另一種消息,例如POJO到POJO,是一種ChannelInboundHandler
MessageToMessageCodec
整合了MessageToMessageEncoder 和 MessageToMessageDecoder
public class RequestMessageCodec extends MessageToMessageCodec<String, RequestData> {
@Override
protected void encode(ChannelHandlerContext ctx, RequestData msg, List<Object> out) throws Exception {
System.out.println("RequestMessageCodec.encode 被調(diào)用 " + msg);
String json = JSONObject.toJSONString(msg);
out.add(json);
}
@Override
protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
System.out.println("RequestMessageCodec.decode 被調(diào)用 " + msg);
RequestData po = JSONObject.parseObject(msg, RequestData.class);
out.add(po);
}
}
ChannelInitializer
ChannelInitializer是一種特殊的ChannelInboundHandler,可以通過一種簡單的方式(調(diào)用initChannel方法)來初始化Channel。
通常在Bootstrap.handler(ChannelHandler) , ServerBootstrap.handler(ChannelHandler) 和 ServerBootstrap.childHandler(ChannelHandler)中給Channel設(shè)置ChannelPipeline。
注意:當(dāng)initChannel被執(zhí)行完后,會將當(dāng)前的handler從Pipeline中移除。
Bootstrap bootstrap = new Bootstrap().group(group)//設(shè)置線程組
.channel(NioSocketChannel.class)//設(shè)置客戶端通道的實現(xiàn)類
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());//加入自己的處理器
}
});
ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作為服務(wù)器的通道實現(xiàn)
.option(ChannelOption.SO_BACKLOG, 128)//設(shè)置線程隊列等待連接的個數(shù)
.childOption(ChannelOption.SO_KEEPALIVE, true)//設(shè)置保持活動連接狀態(tài)
// .handler(null)//該Handler對應(yīng)bossGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//給workerGroup的EventLoop對應(yīng)的管道設(shè)置處理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyServerHandler());
}
});
SimpleChannelInboundHandler
SimpleChannelInboundHandler繼承自ChannelInboundHandlerAdapter,可以通過泛型來規(guī)定消息類型。
處理入站的數(shù)據(jù)我們只需要實現(xiàn)channelRead0方法。
SimpleChannelInboundHandler在接收到數(shù)據(jù)后會自動release掉數(shù)據(jù)占用的Bytebuffer資源,ChannelInboundHandlerAdapter不會自動釋放。
public class MyClientHandler extends SimpleChannelInboundHandler<MessagePO> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessagePO msg) throws Exception {
System.out.println("收到服務(wù)端消息:" + msg);
}
}
DefaultEventLoopGroup
在向pipline中添加ChannelHandler時,可以提供一個新的線程組,Handler業(yè)務(wù)會在該線程中執(zhí)行。
當(dāng)加ChannelHandler需要執(zhí)行多線程并發(fā)業(yè)務(wù)時,DefaultEventLoopGroup可以派上大用處。
如果沒有設(shè)置DefaultEventLoopGroup,默認(rèn)使用的是EventLoopGroup workerGroup = new NioEventLoopGroup();
DefaultEventLoopGroup businessGroup = new DefaultEventLoopGroup(100); ... addLast(businessGroup, new MyNettyServerHandler())
/**
* 讀取客戶端發(fā)送過來的消息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("收到客戶信息:" + byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("客戶端地址:" + ctx.channel().remoteAddress());
System.out.println("處理線程:" + Thread.currentThread().getName());
ctx.executor().parent().execute(()->{
try {
System.out.println("parent().execute Thread = " + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2L);
System.out.println("parent任務(wù)執(zhí)行完成1");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
ctx.executor().parent().execute(()->{
try {
System.out.println("parent().execute Thread = " + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2L);
System.out.println("parent任務(wù)執(zhí)行完成2");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
ctx.executor().parent().execute(()->{
try {
System.out.println("parent().execute Thread = " + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2L);
System.out.println("parent任務(wù)執(zhí)行完成3");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}以上代碼執(zhí)行日志如下:
收到客戶信息:Hello 服務(wù)端
客戶端地址:/127.0.0.1:60345
處理線程:defaultEventLoopGroup-4-1
parent().execute Thread = defaultEventLoopGroup-4-2
parent().execute Thread = defaultEventLoopGroup-4-3
程序繼續(xù)~~ defaultEventLoopGroup-4-1
parent().execute Thread = defaultEventLoopGroup-4-4
parent任務(wù)執(zhí)行完成1
parent任務(wù)執(zhí)行完成3
parent任務(wù)執(zhí)行完成2
EventLoop定時任務(wù)
可以在Handler中通過方法ctx.channel().eventLoop().schedule()添加定時任務(wù)
ctx.channel().eventLoop().schedule(()->{
try {
System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2L);
System.out.println("定時任務(wù)執(zhí)行完成");
} catch (InterruptedException e) {
e.printStackTrace();
}
},10L,TimeUnit.SECONDS);
到此這篇關(guān)于深入理解Netty核心類及其作用的文章就介紹到這了,更多相關(guān)Netty核心類內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解javaweb中jstl如何循環(huán)List中的Map數(shù)據(jù)
這篇文章主要介紹了詳解javaweb中jstl如何循環(huán)List中的Map數(shù)據(jù)的相關(guān)資料,希望通過本文能幫助到大家,讓大家理解掌握這部分內(nèi)容,需要的朋友可以參考下2017-10-10
BMIDE環(huán)境導(dǎo)入項目報編碼錯誤解決方案
這篇文章主要介紹了BMIDE環(huán)境導(dǎo)入項目報編碼錯誤解決方案,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-10-10
Java實現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的實例代碼
本文主要介紹了Java實現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的實例代碼,代碼簡單易懂,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下2020-02-02
Java中StringBuilder與StringBuffer的區(qū)別
在Java編程中,字符串的拼接是一項常見的操作。為了有效地處理字符串的拼接需求,Java提供了兩個主要的類:StringBuilder和StringBuffer,本文主要介紹了Java中StringBuilder與StringBuffer的區(qū)別,感興趣的可以了解一下2023-08-08
基于Spring Boot保護(hù)Web應(yīng)用程序
這篇文章主要介紹了基于Spring Boot保護(hù)Web應(yīng)用程序,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-03-03
通過AOP攔截Spring?Boot日志并將其存入數(shù)據(jù)庫功能實現(xiàn)
本文介紹了如何使用Spring Boot和AOP技術(shù)實現(xiàn)攔截系統(tǒng)日志并保存到數(shù)據(jù)庫中的功能,包括配置數(shù)據(jù)庫連接、定義日志實體類、定義日志攔截器、使用AOP攔截日志并保存到數(shù)據(jù)庫中等步驟,感興趣的朋友一起看看吧2023-08-08
Spring項目中使用Junit單元測試并配置數(shù)據(jù)源的操作
這篇文章主要介紹了Spring項目中使用Junit單元測試并配置數(shù)據(jù)源的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09

