一、NettyHandler传递
Netty框架提供可重用的事件驱动的网络应用程序编程框架,设计灵活且具有高效的性能。NettyHandler是这个框架中的一个重要组成部分,它负责处理所有入站和出站的数据流。NettyHandler是包含在Netty的ChannelPipeline中的一个拦截器,每个拦截器都可以执行自定义的逻辑处理,然后传递给下一个拦截器。
NettyHandler可以通过channelActive(),channelRead()和channelWrite()这些方法来处理Channel的网络事件。其中,channelActive()方法在连接建立时被调用,channelRead()方法在收到消息时被调用,channelWrite()方法在发送消息时被调用。对于入站数据,它将以ByteBuf形式传递到NettyHandler中,而出站数据则以ByteBuf的形式从NettyHandler中传递。
下面是一个简单的NettyHandler示例:
public class MyNettyHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //连接到服务端成功时调用 super.channelActive(ctx); System.out.println("Channel is active"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //接收到服务端消息时调用 ByteBuf in = (ByteBuf)msg; try { String message = in.toString(CharsetUtil.UTF_8); System.out.println("Received the server message:" + message); } finally { in.release(); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //服务端消息读取完成时调用 super.channelReadComplete(ctx); System.out.println("Channel read complete"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //异常处理方法 super.exceptionCaught(ctx, cause); cause.printStackTrace(); ctx.close(); } }
二、NettyHandler和ChildHandler
ChildHandler是一种特殊类型的拦截器,它用于处理NettyHandler创建的子通道。NettyHandler与ChildHandler之间有很多相似之处,但它们在用途上不同。ChildHandler被用于处理连接建立之后,NettyHandler创建的子通道,通常对于子通道的配置和状态管理,我们使用ChildHandler实现。
下面是一个NettyHandler和ChildHandler如何使用的示例程序:
public class MyServerInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { //添加多个Handler到ChannelPipeline中 ch.pipeline().addLast(new MyNettyHandler()); ch.pipeline().addLast(new MyChildHandler()); } } public class MyChildHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //子通道连接到服务端成功时调用 super.channelActive(ctx); System.out.println("sub channel is active"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //接收到子通道消息时调用 ByteBuf in = (ByteBuf)msg; try { String message = in.toString(CharsetUtil.UTF_8); System.out.println("Received the sub channel message:" + message); } finally { in.release(); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //子通道消息读取完成时调用 super.channelReadComplete(ctx); System.out.println("Sub channel read complete"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //异常处理方法 super.exceptionCaught(ctx, cause); cause.printStackTrace(); ctx.close(); } }
三、常见NettyHandler
NettyHandler是Netty框架中的基本模块,而在实际应用中,我们还会根据实际需要编写自定义的NettyHandler。下面是一些常见的NettyHandler示例:
1. HeartbeatHandler
HeartbeatHandler用于实现心跳机制,可以在一段时间内检查一些连接的状态并发送心跳信息来保证连接的持续性。
public class HeartbeatHandler extends ChannelDuplexHandler { //心跳包内容,用于检查连接状态 private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8)); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case READER_IDLE: //长时间没有读取到数据,触发心跳事件 ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()); break; default: break; } } } }
2. AuthHandler
AuthHandler用于身份验证,例如在连接服务端之前需要验证客户端的身份,可以通过AuthHandler实现。
public class AuthHandler extends ChannelInboundHandlerAdapter { private static final String USERNAME = "admin"; private static final String PASSWORD = "secret"; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //身份验证代码 String[] data = ((String) msg).split(","); String username = data[0]; String password = data[1]; if (USERNAME.equals(username) && PASSWORD.equals(password)) { ctx.pipeline().remove(this); System.out.println("Authentication successful"); } else { System.out.println("Authentication failed"); ctx.close(); } } }
3. CompressionHandler
CompressionHandler用于数据压缩和解压缩。例如,当网络带宽有限时,可以使用CompressionHandler自动压缩传输数据,以节省带宽。当然,数据交换的过程中,CompressionHandler也会自动解压缩数据。
public class CompressionHandler extends ChannelDuplexHandler { private static final int DEFAULT_THRESHOLD = 1024; private int compressionThreshold = DEFAULT_THRESHOLD; public CompressionHandler() { } public CompressionHandler(int compressionThreshold) { this.compressionThreshold = compressionThreshold; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (buf.readableBytes() >= compressionThreshold) { ByteBuf compressionBuf = compress(buf); buf.release(); super.channelRead(ctx, compressionBuf); } else { super.channelRead(ctx, buf); } } else { super.channelRead(ctx, msg); } } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf)msg; ByteBuf compressionBuf = compress(buf); buf.release(); super.write(ctx, compressionBuf, promise); } else { super.write(ctx, msg, promise); } } private ByteBuf compress(ByteBuf buf) { //压缩方法 } }
4. LengthFieldBasedFrameDecoder
LengthFieldBasedFrameDecoder是一个可以处理TCP粘包问题的解码器,用于分割接收到的二进制数据。它可以从二进制数据流中读取指定的长度字段,并根据该长度字段的值分割收到的数据。
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder { private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024; private final ByteOrder byteOrder; private final int maxFrameLength; private final int lengthFieldOffset; private final int lengthFieldLength; private final int lengthAdjustment; private final int initialBytesToStrip; private final boolean failFast; public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) { this(maxFrameLength, lengthFieldOffset, lengthFieldLength, 0, 0); } public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { this(ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, true); } public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { this.byteOrder = byteOrder; this.maxFrameLength = maxFrameLength; this.lengthFieldOffset = lengthFieldOffset; this.lengthFieldLength = lengthFieldLength; this.lengthAdjustment = lengthAdjustment; this.initialBytesToStrip = initialBytesToStrip; this.failFast = failFast; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; try { List