您的位置:

深入了解NettyHandler

一、NettyHandler传递

Netty框架提供可重用的事件驱动的网络应用程序编程框架,设计灵活且具有高效的性能。NettyHandler是这个框架中的一个重要组成部分,它负责处理所有入站和出站的数据流。NettyHandler是包含在Netty的ChannelPipeline中的一个拦截器,每个拦截器都可以执行自定义的逻辑处理,然后传递给下一个拦截器。

NettyHandler可以通过channelActive(),channelRead()和channelWrite()这些方法来处理Channel的网络事件。其中,channelActive()方法在连接建立时被调用,channelRead()方法在收到消息时被调用,channelWrite()方法在发送消息时被调用。对于入站数据,它将以ByteBuf形式传递到NettyHandler中,而出站数据则以ByteBuf的形式从NettyHandler中传递。

下面是一个简单的NettyHandler示例:

public class MyNettyHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //连接到服务端成功时调用
        super.channelActive(ctx);
        System.out.println("Channel is active");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收到服务端消息时调用
        ByteBuf in = (ByteBuf)msg;
        try {
            String message = in.toString(CharsetUtil.UTF_8);
            System.out.println("Received the server message:" + message);
        } finally {
            in.release();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //服务端消息读取完成时调用
        super.channelReadComplete(ctx);
        System.out.println("Channel read complete");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //异常处理方法
        super.exceptionCaught(ctx, cause);
        cause.printStackTrace();
        ctx.close();
    }
}

二、NettyHandler和ChildHandler

ChildHandler是一种特殊类型的拦截器,它用于处理NettyHandler创建的子通道。NettyHandler与ChildHandler之间有很多相似之处,但它们在用途上不同。ChildHandler被用于处理连接建立之后,NettyHandler创建的子通道,通常对于子通道的配置和状态管理,我们使用ChildHandler实现。

下面是一个NettyHandler和ChildHandler如何使用的示例程序:

public class MyServerInitializer extends ChannelInitializer{

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        //添加多个Handler到ChannelPipeline中
        ch.pipeline().addLast(new MyNettyHandler());
        ch.pipeline().addLast(new MyChildHandler());
    }
}

public class MyChildHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //子通道连接到服务端成功时调用
        super.channelActive(ctx);
        System.out.println("sub channel is active");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收到子通道消息时调用
        ByteBuf in = (ByteBuf)msg;
        try {
            String message = in.toString(CharsetUtil.UTF_8);
            System.out.println("Received the sub channel message:" + message);
        } finally {
            in.release();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //子通道消息读取完成时调用
        super.channelReadComplete(ctx);
        System.out.println("Sub channel read complete");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //异常处理方法
        super.exceptionCaught(ctx, cause);
        cause.printStackTrace();
        ctx.close();
    }
}

  

三、常见NettyHandler

NettyHandler是Netty框架中的基本模块,而在实际应用中,我们还会根据实际需要编写自定义的NettyHandler。下面是一些常见的NettyHandler示例:

1. HeartbeatHandler

HeartbeatHandler用于实现心跳机制,可以在一段时间内检查一些连接的状态并发送心跳信息来保证连接的持续性。

public class HeartbeatHandler extends ChannelDuplexHandler {

    //心跳包内容,用于检查连接状态
    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case READER_IDLE:
                    //长时间没有读取到数据,触发心跳事件
                    ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
                    break;
                default:
                    break;
            }
        }
    }
}

2. AuthHandler

AuthHandler用于身份验证,例如在连接服务端之前需要验证客户端的身份,可以通过AuthHandler实现。

public class AuthHandler extends ChannelInboundHandlerAdapter {

    private static final String USERNAME = "admin";
    private static final String PASSWORD = "secret";

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //身份验证代码
        String[] data = ((String) msg).split(",");
        String username = data[0];
        String password = data[1];
        if (USERNAME.equals(username) && PASSWORD.equals(password)) {
            ctx.pipeline().remove(this);
            System.out.println("Authentication successful");
        } else {
            System.out.println("Authentication failed");
            ctx.close();
        }
    }
}

3. CompressionHandler

CompressionHandler用于数据压缩和解压缩。例如,当网络带宽有限时,可以使用CompressionHandler自动压缩传输数据,以节省带宽。当然,数据交换的过程中,CompressionHandler也会自动解压缩数据。

public class CompressionHandler extends ChannelDuplexHandler {

    private static final int DEFAULT_THRESHOLD = 1024;
    private int compressionThreshold = DEFAULT_THRESHOLD;

    public CompressionHandler() {
    }

    public CompressionHandler(int compressionThreshold) {
        this.compressionThreshold = compressionThreshold;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (buf.readableBytes() >= compressionThreshold) {
                ByteBuf compressionBuf = compress(buf);
                buf.release();
                super.channelRead(ctx, compressionBuf);
            } else {
                super.channelRead(ctx, buf);
            }
        } else {
            super.channelRead(ctx, msg);
        }
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf)msg;
            ByteBuf compressionBuf = compress(buf);
            buf.release();
            super.write(ctx, compressionBuf, promise);
        } else {
            super.write(ctx, msg, promise);
        }
    }

    private ByteBuf compress(ByteBuf buf) {
        //压缩方法
    }
}

4. LengthFieldBasedFrameDecoder

LengthFieldBasedFrameDecoder是一个可以处理TCP粘包问题的解码器,用于分割接收到的二进制数据。它可以从二进制数据流中读取指定的长度字段,并根据该长度字段的值分割收到的数据。

public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
    private static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024;

    private final ByteOrder byteOrder;
    private final int maxFrameLength;
    private final int lengthFieldOffset;
    private final int lengthFieldLength;
    private final int lengthAdjustment;
    private final int initialBytesToStrip;
    private final boolean failFast;

    public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        this(maxFrameLength, lengthFieldOffset, lengthFieldLength, 0, 0);
    }

    public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        this(ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, true);
    }

    public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        this.byteOrder = byteOrder;
        this.maxFrameLength = maxFrameLength;
        this.lengthFieldOffset = lengthFieldOffset;
        this.lengthFieldLength = lengthFieldLength;
        this.lengthAdjustment = lengthAdjustment;
        this.initialBytesToStrip = initialBytesToStrip;
        this.failFast = failFast;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            try {
                List out = new ArrayList();
                while (buf.readableBytes() >= lengthFieldOffset + lengthFieldLength) {
                    //读取length字段值
                    int frameLength = getFrameLength(buf, buf.readerIndex() + lengthFieldOffset, lengthFieldLength, byteOrder);
                    if (frameLength < 0) {
                        buf.skipBytes(lengthFieldOffset + lengthFieldLength);
                        continue;
                    }
                    //数据不完整
                    if (frameLength > buf.readableBytes()) {
                        break;
                    }
                    //丢弃length字段
                    if (initialBytesToStrip > frameLength) {
                        buf.skipBytes(lengthFieldOffset + lengthFieldLength + frameLength);
                        continue;
                    }
                    //根据读取到的length值解码数据
                    buf.skipBytes(lengthFieldOffset + lengthFieldLength + initialBytesToStrip);
                    int readerIndex = buf.readerIndex();
                    ByteBuf frame = buf.slice(readerIndex, frameLength - initialBytesToStrip);
                    try {
                        out.add(frame);
                    } finally {
                        buf.readerIndex(readerIndex + frameLength - initialBytesToStrip);
                    }
                }
                if (!out.isEmpty()) {
                    ctx.fireChannelRead(out);
                }
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                buf.release();
            }
        } else {
            super.channelRead(ctx, msg);
        }
    }

    private static int getFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
        //读取length字段值方法
    }
}


四、结语

NettyHandler是Netty框架中的一种非常重要的组件,其在网络编程中的作用不可忽视。本文分别介绍了NettyHandler的传递机制和与ChildHandler之间的联系,并给出了一些常见的NettyHandler的示例。在实际开发中,我们需要根据实际需要编写自定义的NettyHandler,使得我们的网络应用程序更加灵活和高效。

文章目录
顶部