public class MyNettyHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //连接到服务端成功时调用 super.channelActive(ctx); System.out.println("Channel is active"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //接收到服务端消息时调用 ByteBuf in = (ByteBuf)msg; try { String message = in.toString(CharsetUtil.UTF_8); System.out.println("Received the server message:" + message); } finally { in.release(); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //服务端消息读取完成时调用 super.channelReadComplete(ctx); System.out.println("Channel read complete"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //异常处理方法 super.exceptionCaught(ctx, cause); cause.printStackTrace(); ctx.close(); } }
public class MyServerInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { //添加多个Handler到ChannelPipeline中 ch.pipeline().addLast(new MyNettyHandler()); ch.pipeline().addLast(new MyChildHandler()); } } public class MyChildHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //子通道连接到服务端成功时调用 super.channelActive(ctx); System.out.println("sub channel is active"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //接收到子通道消息时调用 ByteBuf in = (ByteBuf)msg; try { String message = in.toString(CharsetUtil.UTF_8); System.out.println("Received the sub channel message:" + message); } finally { in.release(); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //子通道消息读取完成时调用 super.channelReadComplete(ctx); System.out.println("Sub channel read complete"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //异常处理方法 super.exceptionCaught(ctx, cause); cause.printStackTrace(); ctx.close(); } }
1. HeartbeatHandler
public class HeartbeatHandler extends ChannelDuplexHandler { //心跳包内容,用于检查连接状态 private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8)); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case READER_IDLE: //长时间没有读取到数据,触发心跳事件 ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()); break; default: break; } } } }
2. AuthHandler
public class AuthHandler extends ChannelInboundHandlerAdapter { private static final String USERNAME = "admin"; private static final String PASSWORD = "secret"; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //身份验证代码 String[] data = ((String) msg).split(","); String username = data[0]; String password = data[1]; if (USERNAME.equals(username) && PASSWORD.equals(password)) { ctx.pipeline().remove(this); System.out.println("Authentication successful"); } else { System.out.println("Authentication failed"); ctx.close(); } } }
3. CompressionHandler
public class CompressionHandler extends ChannelDuplexHandler { private static final int DEFAULT_THRESHOLD = 1024; private int compressionThreshold = DEFAULT_THRESHOLD; public CompressionHandler() { } public CompressionHandler(int compressionThreshold) { this.compressionThreshold = compressionThreshold; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; if (buf.readableBytes() >= compressionThreshold) { ByteBuf compressionBuf = compress(buf); buf.release(); super.channelRead(ctx, compressionBuf); } else { super.channelRead(ctx, buf); } } else { super.channelRead(ctx, msg); } } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf)msg; ByteBuf compressionBuf = compress(buf); buf.release(); super.write(ctx, compressionBuf, promise); } else { super.write(ctx, msg, promise); } } private ByteBuf compress(ByteBuf buf) { //压缩方法 } }
4. LengthFieldBasedFrameDecoder
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder { private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024; private final ByteOrder byteOrder; private final int maxFrameLength; private final int lengthFieldOffset; private final int lengthFieldLength; private final int lengthAdjustment; private final int initialBytesToStrip; private final boolean failFast; public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) { this(maxFrameLength, lengthFieldOffset, lengthFieldLength, 0, 0); } public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { this(ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, true); } public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { this.byteOrder = byteOrder; this.maxFrameLength = maxFrameLength; this.lengthFieldOffset = lengthFieldOffset; this.lengthFieldLength = lengthFieldLength; this.lengthAdjustment = lengthAdjustment; this.initialBytesToStrip = initialBytesToStrip; this.failFast = failFast; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; try { List