netty對proxy protocol代理協(xié)議的支持詳解
簡介
我們知道proxy protocol是haproxy提出的一個代理協(xié)議,通過這個協(xié)議,所有實現(xiàn)這個協(xié)議的proxy或者LBS,都可以附帶真實客戶端的IP地址和端口號,這使得proxy protocol在實際應(yīng)用中非常有用。
這么優(yōu)秀的協(xié)議,沒有理由netty不支持。本文將會談一下netty中對proxy protoco代理協(xié)議的支持。
netty對proxy protocol協(xié)議的支持
proxy protocol協(xié)議其實很簡單,就是在請求前面帶了proxy header信息。
在netty中這個header信息叫做HAProxyMessage:
public final class HAProxyMessage extends AbstractReferenceCounted {
HAProxyMessage是一個ReferenceCounted,這一點和ByteBuf很類似,說明HAProxyMessage保留著和ByteBuf很類似的特性。
根據(jù)proxy protocol協(xié)議,該協(xié)議可以分為兩個版本,分別是v1和v2,其中v1版本是文本協(xié)議,而v2版本支持二進制的格式。
顯然從代碼編寫和調(diào)試的角度來看v1更加友好,但是從程序的角度來看,v2可能性能更高。
HAProxyMessage中有個專門的HAProxyProtocolVersion類,來表示proxy protocol的版本信息:
public enum HAProxyProtocolVersion { V1(VERSION_ONE_BYTE), V2(VERSION_TWO_BYTE);
HAProxyProtocolVersion是一個枚舉類,在它里面定義了和proxy協(xié)議相對應(yīng)的兩個版本號。
在版本號之后是command,在netty中用HAProxyCommand來表示:
public enum HAProxyCommand { LOCAL(HAProxyConstants.COMMAND_LOCAL_BYTE), PROXY(HAProxyConstants.COMMAND_PROXY_BYTE);
HAProxyCommand也是一個枚舉類,里面定義了兩個command的值,分別是local和proxy。
其中l(wèi)ocal表示該請求是代理服務(wù)器主動發(fā)起的,而不是客戶端發(fā)起的,比如監(jiān)控檢測等請求。
proxy表示該請求是一個代理請求。
接下來是AddressFamily和TransportProtocol,這兩個字段用同一個byte來表示,所以這兩個類都是HAProxyProxiedProtocol的內(nèi)部類。
先看下AddressFamily的定義:
public enum AddressFamily { AF_UNSPEC(AF_UNSPEC_BYTE), AF_IPv4(AF_IPV4_BYTE), AF_IPv6(AF_IPV6_BYTE), AF_UNIX(AF_UNIX_BYTE);
AddressFamily中定義了4個address family類型,分別是unspec,ipv4,ipv6和unix。分別對應(yīng)未知family,ipv4,ipv6和unix domain socket。
再看下TransportProtocol的定義:
public enum TransportProtocol { UNSPEC(TRANSPORT_UNSPEC_BYTE), STREAM(TRANSPORT_STREAM_BYTE), DGRAM(TRANSPORT_DGRAM_BYTE);
TransportProtocol有3個值,分別是unspec,stream和dgram。分別對應(yīng)未知協(xié)議,http/https協(xié)議,udp/tcp協(xié)議。
因為AddressFamily和TransportProtocol實際上是同一個byte,所以經(jīng)過組合之后可以得到下面的幾個枚舉值:
UNKNOWN(TPAF_UNKNOWN_BYTE, AddressFamily.AF_UNSPEC, TransportProtocol.UNSPEC), TCP4(TPAF_TCP4_BYTE, AddressFamily.AF_IPv4, TransportProtocol.STREAM), TCP6(TPAF_TCP6_BYTE, AddressFamily.AF_IPv6, TransportProtocol.STREAM), UDP4(TPAF_UDP4_BYTE, AddressFamily.AF_IPv4, TransportProtocol.DGRAM), UDP6(TPAF_UDP6_BYTE, AddressFamily.AF_IPv6, TransportProtocol.DGRAM), UNIX_STREAM(TPAF_UNIX_STREAM_BYTE, AddressFamily.AF_UNIX, TransportProtocol.STREAM), UNIX_DGRAM(TPAF_UNIX_DGRAM_BYTE, AddressFamily.AF_UNIX, TransportProtocol.DGRAM);
以上的枚舉值也是HAProxyProxiedProtocol中定義的值。
接下就是源ip地址,目標(biāo)地ip地址,源端口和目標(biāo)端口這幾個值,定義為屬性表示如下:
private final String sourceAddress; private final String destinationAddress; private final int sourcePort; private final int destinationPort;
最后,proxy protocol中還可以包含額外的字段tlv,tlv在netty中也是一種byteBuf,使用HAProxyTLV表示:
public class HAProxyTLV extends DefaultByteBufHolder
因為tlv是key value結(jié)構(gòu),所以看下HAProxyTLV的構(gòu)造函數(shù):
public HAProxyTLV(Type type, ByteBuf content) { this(type, Type.byteValueForType(type), content); }
HAProxyTLV接受一個type和byteBuf的value。
Type是一個枚舉類,在netty中可以支持下面的值:
public enum Type { PP2_TYPE_ALPN, PP2_TYPE_AUTHORITY, PP2_TYPE_SSL, PP2_TYPE_SSL_VERSION, PP2_TYPE_SSL_CN, PP2_TYPE_NETNS, OTHER;
在HAProxyMessage中,tlv是一個list來保存的:
private final List<HAProxyTLV> tlvs;
到此,所有HAProxyMessage所需要的參數(shù)都齊了,我們看下HAProxyMessage的構(gòu)造函數(shù):
public HAProxyMessage( HAProxyProtocolVersion protocolVersion, HAProxyCommand command, HAProxyProxiedProtocol proxiedProtocol, String sourceAddress, String destinationAddress, int sourcePort, int destinationPort, List<? extends HAProxyTLV> tlvs)
HAProxyMessage會將所有的參數(shù)都存儲到本地的變量中,供后續(xù)使用。
因為proxy protocol有兩個版本,v1和v2,所以HAProxyMessage中提供了兩個將header編碼為AProxyMessage對象的方法,分別是:
static HAProxyMessage decodeHeader(ByteBuf header)
和:
static HAProxyMessage decodeHeader(String header)
有了proxy protocol的java表示之后,我們再來看一下HAProxyMessage的編碼解碼器。
HAProxyMessage的編碼解碼器
netty對HAProxyMessage對象的支持表現(xiàn)在兩個地方,netty提供了兩個類分別對HAProxyMessage進行編碼和解碼,這兩個類是HAProxyMessageEncoder和HAProxyMessageDecoder。
先看一下HAProxyMessageEncoder:
public final class HAProxyMessageEncoder extends MessageToByteEncoder<HAProxyMessage>
HAProxyMessageEncoder繼承自MessageToByteEncoder,傳入的泛型是HAProxyMessage,表示是將HAProxyMessage編碼成為ByteBuf。
它的encode方法很簡單,根據(jù)HAProxyMessage傳入的message版本信息,分別進行編碼:
protected void encode(ChannelHandlerContext ctx, HAProxyMessage msg, ByteBuf out) throws Exception { switch (msg.protocolVersion()) { case V1: encodeV1(msg, out); break; case V2: encodeV2(msg, out); break; default: throw new HAProxyProtocolException("Unsupported version: " + msg.protocolVersion()); } }
HAProxyMessageDecoder是跟HAProxyMessageEncoder相反的動作,是將接收到的ByteBuf解析成為HAProxyMessage:
public class HAProxyMessageDecoder extends ByteToMessageDecoder
因為HAProxyMessage有兩個版本,那么怎么判斷接收到的ByeBuf是哪個版本呢?
其實很簡單,因為v1版本和v2版本的開始字符是不一樣的,v1版本的開頭是一個text:"PROXY", v2版本的開頭是一個固定的二進制串,如下所示:
static final byte[] BINARY_PREFIX = { (byte) 0x0D, (byte) 0x0A, (byte) 0x0D, (byte) 0x0A, (byte) 0x00, (byte) 0x0D, (byte) 0x0A, (byte) 0x51, (byte) 0x55, (byte) 0x49, (byte) 0x54, (byte) 0x0A }; static final byte[] TEXT_PREFIX = { (byte) 'P', (byte) 'R', (byte) 'O', (byte) 'X', (byte) 'Y', };
看下它的decode方法實現(xiàn):
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (version == -1) { if ((version = findVersion(in)) == -1) { return; } } ByteBuf decoded; if (version == 1) { decoded = decodeLine(ctx, in); } else { decoded = decodeStruct(ctx, in); } if (decoded != null) { finished = true; try { if (version == 1) { out.add(HAProxyMessage.decodeHeader(decoded.toString(CharsetUtil.US_ASCII))); } else { out.add(HAProxyMessage.decodeHeader(decoded)); } } catch (HAProxyProtocolException e) { fail(ctx, null, e); } } }
上面代碼的邏輯是先從ByteBuf中根據(jù)版本號decode出header信息放到ByteBuf中。
然后再根據(jù)版本號的不同,分別調(diào)用HAProxyMessage的兩個不同版本的decodeHeader方法進行解碼。最終得到HAProxyMessage。
netty中proxy protocol的代碼示例
有了netty對proxy protocol的支持,那么在netty中搭建支持proxy protocol的服務(wù)器和客戶端就很容易了。
先看一下如何搭建支持proxy protocol的服務(wù)器:
private static void startServer(int port) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ServerInitializer()); b.bind(port).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
代碼和常規(guī)的netty server一樣,這里使用了NioEventLoopGroup和NioServerSocketChannel,搭建了一個支持TCP協(xié)議的netty服務(wù)器。
ServerInitializer中包含了netty自帶的HAProxy編碼器和自定義的消息處理器:
class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new LoggingHandler(LogLevel.DEBUG), new HAProxyMessageDecoder(), new SimpleChannelInboundHandler() { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) { if (msg instanceof HAProxyMessage) { log.info("proxy message is : {}", msg); } else if (msg instanceof ByteBuf) { log.info("bytebuf message is : {}", ByteBufUtil.prettyHexDump((ByteBuf) msg)); } } }); } }
這里使用netty自帶的HAProxyMessageDecoder,用來將ByteBuf消息解碼為HAProxyMessage,然后在自定義的SimpleChannelInboundHandler中對HAProxyMessage進行處理。
這里的服務(wù)器可以處理兩種消息,一種是HAProxyMessage,一種是原始的ByteBuf。處理的結(jié)果就是將消息打印出來。
然后看下客戶端的定義:
EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ClientHander()); Channel ch = b.connect(host, port).sync().channel();
客戶端使用的是EventLoopGroup和NioSocketChannel,是基于TCP協(xié)議的請求。
這里添加了自定義的handler:ClientHander,ClientHander繼承自ChannelOutboundHandlerAdapter用來對client發(fā)出的消息進行處理。
這里看一下它的handlerAdded方法:
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { ctx.pipeline().addBefore(ctx.name(), null, HAProxyMessageEncoder.INSTANCE); super.handlerAdded(ctx); }
可以看到handlerAdded方法向channelPipeline中添加了HAProxyMessageEncoder,用于編碼HAProxyMessage。
因為對于一個connection來說,HAProxyMessage只需要用到一次,后續(xù)的正常消息就不需要這個編碼器了,所以我們需要在write方法中監(jiān)聽HAProxyMessage的狀態(tài),如果寫入成功之后,就從pipeline中移出HAProxyMessageEncoder和ClientHander。
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ChannelFuture future1 = ctx.write(msg, promise); if (msg instanceof HAProxyMessage) { future1.addListener((ChannelFutureListener) future2 -> { if (future2.isSuccess()) { ctx.pipeline().remove(HAProxyMessageEncoder.INSTANCE); ctx.pipeline().remove(ClientHander.this); } else { ctx.close(); } }); } }
最后我們構(gòu)建了一個虛擬的HAProxyMessage,然后通過netty客戶端進行發(fā)送:
HAProxyMessage message = new HAProxyMessage( HAProxyProtocolVersion.V2, HAProxyCommand.PROXY, HAProxyProxiedProtocol.TCP4, "127.0.0.1", "127.0.0.2", 8000, 9000); ch.writeAndFlush(message).sync(); ch.writeAndFlush(Unpooled.copiedBuffer("this is a proxy protocol message!", CharsetUtil.UTF_8)).sync(); ch.close().sync();
總結(jié)
上面的代碼只是一個簡單的模擬proxy protocol在netty中的使用情況,并不代表上面的代碼就可以在實際的項目中應(yīng)用了。如果你想使用的話,可以在下面的代碼上面繼續(xù)豐富和完善。
本文的代碼,大家可以參考:
以上就是netty對proxy protocol代理協(xié)議的支持詳解的詳細(xì)內(nèi)容,更多關(guān)于netty支持proxy protocol代理協(xié)議的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
基于java開發(fā)之系統(tǒng)托盤的應(yīng)用
本篇文章介紹了,基于java開發(fā)之系統(tǒng)托盤的應(yīng)用。需要的朋友參考下2013-05-05maven+阿里云創(chuàng)建國內(nèi)鏡像的中央倉庫(親測可用)
本篇文章主要介紹了maven+阿里云創(chuàng)建國內(nèi)鏡像的中央倉庫(親測可用),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-12-12在Java的Hibernate框架中使用SQL語句的簡單介紹
這篇文章主要介紹了在Java的Hibernate框架中使用SQL語句的方法,Hibernate是Java的SSH三大web開發(fā)框架之一,需要的朋友可以參考下2016-01-01Java多線程執(zhí)行處理業(yè)務(wù)時間太久解決方法代碼示例
這篇文章主要介紹了Java多線程執(zhí)行處理業(yè)務(wù)時間太久解決方法代碼示例的相關(guān)資料,具有一定借鑒價值,需要的朋友可以參考下。2017-12-12