一、Netty 异步和事件驱动

1.1 java 网络编程

最早期的 Java API(java.net)只支持由本地系统套接字(Socket)库提供的所谓的阻塞函数。这段代码片段将只能同时处理一个连接,要管理多个并发客户端,需要为每个新的客户端 Socket 创建一个新的 Thread。

1.2 Java NIO

使用上了选择器,进行选择空闲的 socket 连接。

image-20210610185755023.png

1.3 Netty 核心组件

  • Channel

    目前,可以把 Channel 看作是传入(入站)或者传出(出站)数据的载体。因此,它可以 被打开或者被关闭,连接或者断开连接。在内部,将会为每个 Channel 分配一个 EventLoop,用以处理所有事件

  • 回调

    一个回调其实就是一个方法,一个指向已经被提供给另外一个方法的方法的引用。指定执行该有的顺序。

  • Future

    Future 提供了另一种在操作完成时通知应用程序的方式。

  • 事件和 ChannelHandler

    具体的处理。

二、开发你的第一款Netty应用程序

2.1 编写Echo服务器处理逻辑

EchoServerHandler 简单打印一下接收到的消息, channelRead 表示接收消息,可以看到 msg 转换成了ByteBuf,然后打印,也就是把Client传过来的消息打印了一下,你会发现每次打印完后,channelReadComplete也会调用,如果你试着传一个超长的字符串过来,超过 1024 个字母长度,你会发现 channelRead 会调用多次,而c hannelReadComplete 只调用一次。

所以这就比较清晰了吧,因为 ByteBuf 是有长度限制的,所以超长了,就会多次读取,也就是调用多次channelRead,而 channelReadComplete 则是每条消息只会调用一次,无论你多长,分多少次读取,只在该条消息最后一次读取完成的时候调用。

/**
 * 标示一个Channel-Handler 可以被多个 Channel 安全地共享
 */
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println(
                "Server received: " + in.toString(CharsetUtil.UTF_8));
        ctx.write(in);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        //  通知ChannelInboundHandler最后一次对channelRead()的调用是当前批量读取中的最后一条消息;
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
           .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
            Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

2.2 引导服务器

public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoServer(port).start();
    }

    public void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        // 创建Event-LoopGroup
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 创建ServerBootstrap
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
             .channel(NioServerSocketChannel.class)
             .localAddress(new InetSocketAddress(port))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch)
                         throws Exception {
                     ch.pipeline().addLast(serverHandler);
                 }
             });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

2.3 编写Echo客户端处理逻辑

@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {


    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
        // 记录已接收,消息的转储
        System.out.println(
                "Client received: " + byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 被通知 Channel
        // 是活跃的时候,发
        // 送一条消息
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",
                CharsetUtil.UTF_8));
        // super.channelActive(ctx);
    }

}

2.4 引导服务器

public class EchoClient {
    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        String host = "127.0.0.1";
        int port = 8080;
        new EchoClient(host, port).start();
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .remoteAddress(new InetSocketAddress(host, port))
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch)
                         throws Exception {
                     ch.pipeline().addLast(
                             new EchoClientHandler());
                 }
             });
            ChannelFuture f = b.connect().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

三、Netty的组件和设计

3.1 Channel 接口

基本的 I/O 操作(bind()、connect()、read()和 write())依赖于底层网络传输所提 供的原语。在基于 Java 的网络编程中,其基本的构造是 class Socket。Netty 的 Channel 接 口所提供的 API,大大地降低了直接使用 Socket 类的复杂性。此外,Channel 也是拥有许多 预定义的、专门化实现的广泛类层次结构的根,下面是一个简短的部分清单:

  • EmbeddedChannel
  • LocalServerChannel
  • NioDatagramChannel
  • NioSctpChannel
  • NioSocketChannel

3.2 EventLoop 接口

图 3-1 在高层次上说明了 Channel、EventLoop、Thread 以及 EventLoopGroup 之间的关系

image-20210610185745766.png

  • 一个 EventLoopGroup 包含一个或者多个 EventLoop
  • 一个 EventLoop 在它的生命周期内只和一个 Thread 绑定
  • 所有由 EventLoop 处理的 I/O 事件都将在它专有的 Thread 上被处理
  • 一个 Channel 在它的生命周期内只注册于一个 EventLoop
  • 一个 EventLoop 可能会被分配给一个或多个 Channel

3.3 ChannelFuture 接口

Netty 中所有的 I/O 操作都是异步的。因为一个操作可能不会 立即返回,所以我们需要一种用于在之后的某个时间点确定其结果的方法。为此,Netty 提供了 ChannelFuture 接口,其 addListener()方法注册了一个 ChannelFutureListener,以 便在某个操作完成时(无论是否成功)得到通知。

3.4 ChannelHandler 和 ChannelPipeline

Netty 的主要组件是 ChannelHandler,它充当了所有 处理入站和出站数据的应用程序逻辑的容器。

ChannelPipeline 为 ChannelHandler 链提供了容器。

一个 ChannelPipeline 装着多个 ChannelHandler

image-20210610185734660.png

经常使用到的适配器类

  • ChannelHandlerAdapter

  • ChannelInboundHandlerAdapter

  • ChannelOutboundHandlerAdapter

  • ChannelDuplexHandler

四、传输

  • OIO——阻塞传输 OioServerSocketChannel

  • NIO——异步传输 NioServerSocketChannel

  • Local——JVM 内部的异步通信

    用于在同一个 JVM 中运行的客户端和服务器程序之间的异步通信

  • Embedded——测试你的 ChannelHandle

零拷贝 零拷贝(zero-copy)是一种目前只有在使用 NIO 和 Epoll 传输时才可使用的特性。它使你可以快速 高效地将数据从文件系统移动到网络接口,而不需要将其从内核空间复制到用户空间,其在像 FTP 或者 HTTP 这样的协议中可以显著地提升性能。但是,并不是所有的操作系统都支持这一特性。

五、ByteBuf

5.1 数据结构

ByteBuf 维护了两个不同的索引:一个用于读取,一个用于写入。当你从 ByteBuf 读取时, 它的 readerIndex 将会被递增已经被读取的字节数。同样地,当你写入 ByteBuf 时,它的 writerIndex 也会被递增。名称以 read 或者 write 开头的 ByteBuf 方法,将会推进其对应的索引,而名称以 set 或 者 get 开头的操作则不会。

image-20210610185725190.png

5.2 ByteBuf的使用模式

  • 堆缓冲区(数组)

    数据存储在JVM的堆中可以快速创建和快速释放,并且提供了数组的直接快速访问的方法

    ByteBuf heapBuf = ...;
    // 检查 ByteBuf 是否有一个支撑数组
    if (heapBuf.hasArray()) { 
    byte[] array = heapBuf.array();
    int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
    int length = heapBuf.readableBytes();
    handleArray(array, offset, length); //使用数组、偏移量和长度作为参数调用你的方法
    }
    
  • 直接缓冲区

    数据直接在内存中,不存在从JVM拷贝数据到直接缓冲区的过程,性能好

    ByteBuf directBuf = ...;
    if (!directBuf.hasArray()) {
    int length = directBuf.readableBytes();
    byte[] array = new byte[length];
    directBuf.getBytes(directBuf.readerIndex(), array);
    handleArray(array, 0, length); //使用数组、偏移量和长度作为参数调用你的方法
    }
    
  • 复合缓冲区

    Netty 通过一个 ByteBuf 子类——CompositeByteBuf ——实现了这个模式,它提供了一 个将多个缓冲区表示为单个合并缓冲区的虚拟表示。

image-20210610185717582.png

常用方法:

        ByteBuf byteBuf = Unpooled.copiedBuffer("hello netty! 123", Charset.defaultCharset());

        // 遍历方式1
        for (int i = 0; i < byteBuf.capacity(); i++) {
            System.out.println((char) byteBuf.getByte(i));
        }
        // 遍历方式2
        System.out.println("----------------------");
        while (byteBuf.isReadable()) {
            System.out.println((char) byteBuf.readByte());
        }

        // 切片,对元数据进行操作会影响本体,切割后元数据也就剩下那么多了
        ByteBuf slice = byteBuf.slice(0, 11);
        System.out.println(slice.toString(Charset.defaultCharset()));
        byteBuf.setByte(0, (byte) 'H');
        System.out.println(slice.toString(Charset.defaultCharset()));

        // 复制的场景,不影响本体
        ByteBuf copy = byteBuf.copy(0, 11);
        System.out.println(copy.toString(Charset.defaultCharset()));
        byteBuf.setByte(0, (byte) 'H');
        System.out.println(copy.toString(Charset.defaultCharset()));

5.3 Unpooled 缓冲区

可能某些情况下,你未能获取一个到 ByteBufAllocator 的引用。对于这种情况,Netty 提 供了一个简单的称为 Unpooled 的工具类,它提供了静态的辅助方法来创建未池化的 ByteBuf 实例。

名 称 描 述
buffer()
buffer(int initialCapacity)
buffer(int initialCapacity, int maxCapacity)
返回一个未池化的基于堆内存存储的
ByteBuf
directBuffer()
directBuffer(int initialCapacity)
directBuffer(int initialCapacity, int maxCapacity)
返回一个未池化的基于直接内存存储
的 ByteBuf
wrappedBuffer() 返回一个包装了给定数据的 ByteBuf
copiedBuffer() 返回一个复制了给定数据的 ByteBuf

六、ChannelHandler和ChannelPipeline

6.1 ChannelPipeline 接口

如果你认为ChannelPipeline是一个拦截流经Channel的入站和出站事件的Channel- Handler 实例链

image-20210610185708024.png

通过调用 ChannePipeline 上的相关方法, ChannelHandler 可以添加、删除或者替换其他 的 ChannelHandler,从而实时地修改 ChannelPipeline 的布局。(它也可以将它自己从 ChannelPipeline 中移除。)这是 ChannelHandler 最重要的能力之一,所以我们将仔细地来看 看它是如何做到的。

ChannelPipeline pipeline = ..;
FirstHandler firstHandler = new FirstHandler();
pipeline.addLast("handler1", firstHandler);
pipeline.addFirst("handler2", new SecondHandler());
pipeline.addLast("handler3", new ThirdHandler());
...
pipeline.remove("handler3");
pipeline.remove(firstHandler);
pipeline.replace("handler2", "handler4", new ForthHandler());

Handle的方法实现顺序

  1. channelRegistered
  2. channelActive
  3. channelRead
  4. channelReadComplete
  5. channelInactive
  6. channelUnregistered

6.2 ChannelPipeline触发事件

ChannelPipeline的API公开了用于调用入站和出站操作的附加方法:

ChannelPipeline的入站操作:

方法名称 描述
fireChannelRegistered 调用ChannelPipeline中下一个ChannelInboundHandler的channelRegistered(ChannelHandlerContext)方法
fireChannelUnregistered 调用ChannelPipeline中下一个ChannelInboundHandler的fireChannelUnregistered(ChannelHandlerContext)方法
fireChannelActive 调用ChannelPipeline中下一个ChannelInboundHandler的fireChannelActive(ChannelHandlerContext)方法
fireChannelInactive 调用ChannelPipeline中下一个ChannelInboundHandler的fireChannelInactive(ChannelHandlerContext)方
fireExceptionCaught 调用ChannelPipeline中下一个ChannelInboundHandler的fireExceptionCaught(ChannelHandlerContext)方法
fireUserEventTriggered 调用ChannelPipeline中下一个ChannelInboundHandler的fireUserEventTriggered(ChannelHandlerContext)方法
fireChannelRead 调用ChannelPipeline中下一个ChannelInboundHandler的fireChannelRead(ChannelHandlerContext)方法
fireChannelReadComplete 调用ChannelPipeline中下一个ChannelInboundHandler的fireChannelReadComplete(ChannelHandlerContext)方法
fireChannelWritabilityChanged 调用ChannelPipeline中下一个ChannelInboundHandler的fireChannelWritabilityChanged(ChannelHandlerContext)方法

6.3 使用 ChannelHandlerContext

@Sharable
public class SharableHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("Channel read message: " + msg);
ctx.fireChannelRead(msg);
}
}
上次更新时间: 2024/5/7 05:59:02