您的位置:

Netty学习

一、Netty有必要学吗

Netty是一个基于NIO的客户端/服务端框架,用于开发网络应用程序。相对于传统的Java I/O,Netty提供了更好的性能、更好的可维护性和更好的可扩展性。学习Netty可以让我们更加深入地理解网络编程,还能够在工作中运用到这个高性能的框架。

与其他的网络编程框架相比,Netty在处理高并发、高吞吐量以及低延迟等方面有很大的优势。如果我们需要开发高性能的网络应用,并且在实际应用中跑得更快、更稳定,那么学习Netty就是非常有必要的。

最后,在如今互联网高速发展的背景下,网络安全一直是人们关注的焦点。Netty提供了基于SSL的安全传输,可以保证数据在传输过程中的安全性。所以,即使我们不经常开发网络应用程序,学习Netty也有一定的必要性。

二、Netty客户端维护多个连接

在实际应用中,我们可能需要维护多个网络连接,例如连接多个服务器进行数据交互。当使用传统Java I/O时,每个网络连接都需要一个线程来处理,如果连接数非常多,那么就会出现线程数过多的问题,导致系统崩溃。但是在Netty中,我们可以使用连接池技术来解决这个问题,从而提高应用程序的性能和可靠性。

(一)连接池实现方式举例


public class NettyConnectionPool {
  private static final String HOST = "127.0.0.1";
  private static final int PORT = 8080;
  
  private static EventLoopGroup group = new NioEventLoopGroup();
  private static Bootstrap bootstrap = new Bootstrap();
  private static final int MAX_POOL_SIZE = 10;
  private static final Queue<Channel> pool = new ConcurrentLinkedQueue<Channel>();
  
  // 初始化连接池
  static {
       bootstrap.group(group);
    bootstrap.channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.TCP_NODELAY, true);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
      @Override
      public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new NettyClientHandler());
      }
    });
    while (pool.size() < MAX_POOL_SIZE) {
       pool.add(bootstrap.connect(HOST, PORT).channel());
     }
  }
  
  // 获取连接
  public static Channel getConnection() {
    if (pool.isEmpty()) {
       return null;
    } else {
       return pool.poll();
    }
  }
  
  // 释放连接
  public static void releaseConnection(Channel channel) {
    if (channel != null && pool.size() < MAX_POOL_SIZE) {
       pool.add(channel);
    } else {
       channel.close();
    }
  }
}

(二)连接池使用方式

使用连接池技术,我们可以更好地管理网络连接,提高应用程序的性能和可靠性。


public class NettyClient {
  private static final int MAX_RETRY = 3;
  
  // 启动连接
  public static void start() {
     Bootstrap bootstrap = new Bootstrap();
     EventLoopGroup group = new NioEventLoopGroup();
     bootstrap.group(group);
     bootstrap.channel(NioSocketChannel.class);
     bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
     bootstrap.option(ChannelOption.TCP_NODELAY, true);
     bootstrap.handler(new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline pipeline = ch.pipeline();
             pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
             pipeline.addLast(new LengthFieldPrepender(4));
             pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
             pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
             pipeline.addLast(new NettyClientHandler());
        }
     });
     ChannelFuture future = connect(bootstrap, "127.0.0.1", 8080, MAX_RETRY);
     future.addListener(new ChannelFutureListener() {
         @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
          Channel channel = future.channel();
          NettyConnectionPool.releaseConnection(channel);
        } else {
          System.out.println("连接失败!");
        }
   }
 });
 }
  
  // 连接或重连服务器
  private static ChannelFuture connect(Bootstrap bootstrap, String host, int port, int retry) {
     return bootstrap.connect(host, port).addListener(new ChannelFutureListener() {
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
             if (future.isSuccess()) {
                 System.out.println("连接成功!");
             } else if (retry == 0) {
                 System.out.println("重试次数已用完,放弃连接!");
             } else {
                 int order = (MAX_RETRY - retry) + 1;
                 int delay = 1 << order;
                 System.out.println(new Date() + ": 连接失败,第" + order + "次重连……");
                 bootstrap.config().group().schedule(() ->
                         connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
             }
         }
     });
 }
}

三、Netty的对象池技术

Netty的对象池技术是指在应用程序中,为了防止频繁地创建对象而引入的一种技术方案。由于频繁创建对象会导致申请和销毁内存空间的开销,从而影响应用程序的性能。因此,使用对象池技术可以大大地提高应用程序的性能。

(一)对象池实现方式举例


public class NettyObjectPool {
  private static final int DEFAULT_POOL_SIZE = 10;
  private final int maxPoolSize;
  private final PoolObjectFactory factory;
  private final Deque pool;
  
  // 实现 PoolObjectFactory 接口
  public interface PoolObjectFactory<T> {
    T createObject();
     void destroyObject(T obj);
  }
  
  // 初始化对象池
  public NettyObjectPool(PoolObjectFactory factory) {
     this(DEFAULT_POOL_SIZE, factory);
  }
  
  public NettyObjectPool(int maxPoolSize, PoolObjectFactory factory) {
     this.maxPoolSize = maxPoolSize;
     this.factory = factory;
     this.pool = new ConcurrentLinkedDeque();
    for (int i = 0; i < maxPoolSize; i++) {
      pool.add(factory.createObject());
    }
  }
  
  // 获取对象
  public Object borrowObject() {
    if (pool.isEmpty()) {
       return null;
    } else {
       return pool.pop();
    }
  }
  
  // 归还对象
  public void returnObject(Object obj) {
    if (pool.size() >= maxPoolSize) {
       factory.destroyObject(obj);
    } else {
       pool.add(obj);
    }
  }
}

(二)对象池使用方式

使用对象池技术,我们可以更好地管理内存,减少内存分配和销毁的开销,提高应用程序的性能。


public class NettyClient {
  private static final NettyObjectPool<NettyClientHandler> handlerPool =
    new NettyObjectPool(MAX_POOL_SIZE, new NettyObjectPool.PoolObjectFactory<NettyClientHandler>() {
      @Override
      public NettyClientHandler createObject() {
        return new NettyClientHandler();
      }
      @Override
      public void destroyObject(NettyClientHandler handler) {
        handler = null;
      }
    });
  
  // 启动连接
  public static void start() {
     Bootstrap bootstrap = new Bootstrap();
     EventLoopGroup group = new NioEventLoopGroup();
     bootstrap.group(group);
     bootstrap.channel(NioSocketChannel.class);
     bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
     bootstrap.option(ChannelOption.TCP_NODELAY, true);
     bootstrap.handler(new ChannelInitializer<SocketChannel>() {
          @Override
          protected void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline pipeline = ch.pipeline();
             pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
             pipeline.addLast(new LengthFieldPrepender(4));
             pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
             pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
      NettyClientHandler handler = handlerPool.borrowObject();
      pipeline.addLast(handler);
      channel.attr(AttributeKey.valueOf("handler")).set(handler); // 将handler保存到channel的属性中
         }
     });
     ChannelFuture future = connect(bootstrap, "127.0.0.1", 8080, MAX_RETRY);
     future.addListener(new ChannelFutureListener() {
         @Override
         public void operationComplete(ChannelFuture future) throws Exception {
             if (future.isSuccess()) {
                 System.out.println("连接成功!");
                 Channel channel = future.channel();
                 NettyClientHandler handler = (NettyClientHandler) channel.pipeline().last();
                 handlerPool.returnObject(handler); // 将handler对象归还给对象池
             } else {
                 System.out.println("连接失败!");
             }
         }
     });
 }
}