ChannelHandler和ChannelPipeline

ChannelPipeline可以将ChannelHandler 连接在一起处理业务逻辑。下面会讲解他们之间的关系,以及一个重要的组件ChannelHandlerContext。理解他们之间的交互,对于Netty构建模块化的、可重用的实现至关重要。

Channel 接口

Channel 的生命周期

Channel 接口定义了和ChannelInboundHandler 密切相关的状态模型。下面是Channel 的4个状态:

如上图所示,当Channel 发生状态变更时,会生成对应的事件。这些事件将会转发给 ChannelPipeline 中的 ChannelHandler,然后Handler处理对应的事件。

ChannelHandler 的生命周期

下表列出了ChannelHandler 的生命周期操作,在ChannelHandler 被添加/移除时会调用这些操作。这些方法每个都会有 ChannelHandlerContext 的入参。

ChannelInboundHandler 接口

下面列出了ChannelInboundHandler 的生命周期方法。这些方法会在数据接收/发送或Channel状态发生改变时被调用。这些方法和Channel 的生命周期密切相关。

channelRegistered:当Channel 已经注册到EventLoop并且能够处理IO时被调用

channelUnregistered:当Channel从EventLoop注销且无法处理IO时被调用

channelActive:当Channel 被激活后调用,此时Channel 已经连接/绑定完成

channelInactive:当Channel离开活动状态,并且不再连接它的远程节点时被调用

channelReadComplete:当Channel 的一个读操作完成时被调用

channelRead:当Channel 进行完成1次读取后调用

channelWritabilityChanged:当Channel 的可写状态发生变化时被调用,用户需要确保写操作不会完成的太快(避免OOM),或者可以在Channel 再次变为可写时恢复写入。可以通过Channel 的isWritable() 方法检测 Channel 的可写性。可写性相关的阈值可以通过Channel.config().setWriteLowWaterMark()和Channel.config().setWriteHighWaterMark()来设置。

userEventTriggered():当ChannelInboundHandler.fireUserEventTriggered()方法被调用时触发调用,因为此时有一个Object传给了ChannelPipeline

当重写ChannelInboundHandler 的 channelRead() 方法时,它将显示的释放与池化ByteBuf 相关的实例内存。释放内存空间,Netty 提供了一个实用的方法:ReferenceCountUtil.release()。

ChannelOutboundHandler 接口

出站操作和数据由ChannelOutboundHandler处理,其方法会被Channel、ChannelPipeline和ChannelHandlerContext 调用。ChannelOutboundHandler可以按需推迟事件,这样可以用来处理一些复杂请求。例如 write 暂停了,可以将 flush操作延迟进行。ChannelOutboundHandler的相关方法如下:

bind():当将Channel绑定到本地时被调用

connect():当将Channel连接到远程节点时被调用

disconnect():当请求将Channel从远程节点断开时被调用

close():当请求被关闭时被调用

deregister():当将Channel从EventLoop上注销时被调用

read():当请求从Channel读取更多数据时被调用

flush():当通过Channel将数据 flush 到远程节点时被调用

write():当通过Channel将数据写到远程节点时被调用

ChannelHandler 适配器

通常我们业务代码中实现入站和出站Handler时,可以继承 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter。这两个适配器分别提供了入站和出站Handler的基本实现。通过抽象类 ChannelHandlerAdapter,获得了ChannelHandler的方法。类层次结构如下:

ChannelHandlerAdapter 提供了isSharable(),如果Handler标注为 @Sharable,则方法返回为 true,它表示这个Handler实例可以被添加到多个ChannelPipeline中。在HandlerAdapter 中调用关联的ChannelHandlerContext 上的等效方法,可以将事件转发到Pipeline 中的下一个ChannelHandler。

业务开发中,也可以通过集成Adapter的子类(SimpleChannelInboundHandler),来实现入站的Handler。

资源管理

当通过调用 ChannelInboundHandler.channelRead()或ChannelOutboundHandler.write()处理数据时,都需要确保没有资源泄漏。Netty使用引用计数来处理池化的ByteBuf,所以在使用完ByteBuf后,调整其引用计数是很重要的。

为了帮助诊断潜在的资源泄漏问题,Netty 提供了ResourceLeakDetector,它将对应用程序缓冲区约1%的采样检测内存泄漏,相关开销较小。目前Netty定义了4种检测级别:

DISABLED:禁止泄漏检测。只有在程序完全测试之后才设置

SIMPLE:使用1%的采样频率检测泄漏。这个是默认设置,适合绝大部分的情况

ADVANCED:除了SIMPLE 的功能外,还会报告对应消息被访问的位置

PARANOID:功能类似于ADVANCED,但是采样率为100%,对性能影响很大,适合调试阶段

总之,如果一个消息被消费或者被丢弃了,并没有传递给下一个Handler,那么它就有责任调用 ReferenceCountUtil.release() 释放资源。如果消息到达了网络传输层,那么它被写入时或Channel关闭时,都会被自动释放。

ChannelPipeline 接口

每个新创建的Channel 都会被分配一个新的 ChannelPipeline,这个关联是永久的。Channel 既不能附加另外一个ChannelPipeline,也不能离开当前的。事件触发后,将会被对应的入站和出站Handler处理。然后通过调用 ChannelHandlerContext,事件将会被继续传递。

ChannelHandlerContext 可以让当前Handler 和 ChannelPipeline 和其他Handler 交互。ChannelHandler可以通知触发ChannelPipeline 中的下一个Handler,甚至可以动态修改其所属的ChannelPipeline。

修改ChannelPipeline

ChannelHandler 可以通过添加、删除或替换其他的ChannelHandler 来实时修改ChannelPipeline的布局。下面看一下具体的方法:

addFirst()、addBefore()、addAfter()、addLast():将一个ChannelHandler 添加到ChannelPipeline 中

remove():将Handler从Pipeline中移除

replace():将ChannelPipeline 中的一个Handler 替换为另外一个

ChannelPipeline pipeline = ...;
FirstHandler firstHandler = new FirstHandler();
pipeline.addLast("handler1", firstHandler); // 添加到尾部
pipeline.addFirst("handler2", new SecondHandler()); // 添加到头部
pipeline.addFirst("handler3", new ThirdHandler());
...
pipeline.remove("handler3");
pipeline.remove(firstHandler); // 移除Handler
pipeline.replace("handler2", "handler4", new ForthHandler()); // 替换Handler

下面看一下 ChannelPipeline 中用于访问 ChannelHandler 的方法

get():通过类型或者名称返回ChannelHandler

context():返回和ChannelHandler绑定的ChannelContext

names():返回ChannelPipeline中所有ChannelHandler的名称

触发事件

ChannelPipeline 的 API 开放了用于调用入站和出站的方法,入站相关的方法,如下所示:

fireChannelRegistered():调用 pipeline 中下一个 ChannelInboundHandler 的 channelRegistered()

fireChannelUnregistered():调用 pipeline 中下一个 ChannelInboundHandler 的 channelUnregistered()

fireChannelActive():调用 pipeline 中下一个 ChannelInboundHandler 的 channelActived()

fireChannelInactive():调用 pipeline 中下一个 ChannelInboundHandler 的 channelInactived()

fireExceptionCaught():调用 pipeline 中下一个 ChannelInboundHandler 的 exceptionCaught()

fireUserEventTriggered():调用 pipeline 中下一个 ChannelInboundHandler 的 userEventTriggered()

fireChannelRead():调用 pipeline 中下一个 ChannelInboundHandler 的 channelRead()

fireChannelReadComplete():调用 pipeline 中下一个 ChannelInboundHandler 的 channelReadComplete()

fireChannelWritablityChanged():调用 pipeline 中下一个 ChannelInboundHandler 的 channelWritablityChanged()

出站相关的方法如下:

bind():将 channel 绑定到一个本地端口,调用 pipeline 中的下一个ChannelOutboundHandler 的 bind()

connect():将 channel 连接到一个远程端口,调用 pipeline 中的下一个ChannelOutboundHandler 的 connect()

disconnect():将 channel 断开连接,调用 pipeline 中的下一个ChannelOutboundHandler 的 disconnect()

close():将 channel 关闭,调用 pipeline 中的下一个ChannelOutboundHandler 的 close()

deregister():将 channel 从 EventLoop 中注销,调用 pipeline 中的下一个ChannelOutboundHandler 的 deregister()

flush():flush 数据发送到远端,调用 pipeline 中的下一个ChannelOutboundHandler 的 flush()

write():将数据写入到 channel,调用 pipeline 中的下一个ChannelOutboundHandler 的 write()

writeAndFlush():将数据写入到 channel, 并flush到远端,调用 pipeline 中的下一个ChannelOutboundHandler 的 writeAndFlush()

read():一次从 channel 中读取数据,调用 pipeline 中的下一个ChannelOutboundHandler 的 read()

总之,pipeline 保存了与 channel 关联的 ChannelHandler;pipeline 可以根据需要动态添加或删除 handler;pipeline 提供了操作入站和出站的API。

ChannelHandlerContext 接口

context 代表了 handler 和 pipeline 之间的关联,当有 handler 添加到 pipeline 中时,都会创建 context。context 的主要功能是管理其关联的 handler 和同一个 pipeline 中的其他 handler。

context 提供的方法类似于 channel 和 pipeline 中的方法,不过不同的是:context 方法是带有上下文关联的,是从当前 handler 往后传播;而 channel 或 pipeline 对应的方法是上下文无关的,会传播整个 pipeline。如果对于后者使用不当,可能会造成事件传播的死循环。ChannelHandlerContext 的API 汇总如下:

alloc():返回和这个context关联的 channel 所配置的 ByteBufAllocator

bind():绑定到给定的 SocketAddress,并返回 ChannelFuture

channel():返回 context 关联的 channel

close():关闭 channel,并返回ChannelFuture

connect():连接到戈丁的 SocketAddress,并返回 ChannelFuture

deregister():从 EventLoop 上注销,并返回 ChannelFuture

disconnect():从远程节点断开,并返回 ChannelFuture

executor():返回调度事件的 EventLoop

fireChannelActive():触发下一个 channel 的 channelActive()

fireChannelInactive():触发下一个 channel 的 channelInactive()

fireChannelRead():触发下一个 channel 的 channelRead()

fireChannelReadComplete():触发下一个 channel 的 channelReadComplete()

fireChannelRegistered():触发下一个 channel 的channelRegistered()

fireChannelUnregistered():触发下一个 channel 的 channelUnregistered()

fireChannelWritabilityChanged():触发下一个 channel 的 channelWritabilityChanged()

fireExceptionCaught():触发下一个 channel 的 exceptionCaught()

fireUserEventTriggered():触发下一个 handler 的 userEventTriggered()

handler():返回 context 关联的 handler()

isRemoved():如果关联的 handler 已经从 pipeline 中移除,则返回 true

name():返回实例的唯一名称

pipeline():返回 context 关联的 pipeline

read():将数据从 channel 读取到第一个缓冲区,如果成功则触发 READ 事件,当最后一个消息读取完成后,触发 channelReadComplete()

write():写入消息并向后传播

writeAndFlush():写入消息并向后传播,并 flush 到远端

总之,context 和 handler 之间的关联是不变的,因此缓存其引用是安全的;context 提供的方法是和上下文关联的,会产生更短的事件流,应尽可能使用该特性提高性能。

使用 ChannelHandlerContext

通过 context 获取到 channel,然后调用 write(),事件将流经整个 pipeline。

ChannelHandlerContext context = ...;
Channel channel = context.channel(); // 获取 context 关联的 channel
channel.write(Unpooled.copiedBuffer("moguhu.com", CharsetUtil.UTF_8)); // 通过 channel 写入缓冲区

通过 context 获取到关联的 pipeline,然后调用 pipeline 的 write(),事件也将流经整个 pipeline。

ChannelHandlerContext context = ...;
ChannelPipeline pipeline = context.pipeline(); // 获取 context 关联的 pipeline
pipeline.write(Unpooled.copiedBuffer("moguhu.com", CharsetUtil.UTF_8)); // 通过 pipeline 写入缓冲区

通过 context 的 write()方法,事件将从当前 handler 向后传播,会跳过前面的 handler。

ChannelHandlerContext context = ...;
context.write(Unpooled.copiedBuffer("moguhu.com", CharsetUtil.UTF_8)); // 通过 context 写入缓冲区

ChannelHandler 和 ChannelHandlerContext 的高级用法

context 可以获取到 pipeline,通过 pipeline 可以动态的替换 handler 达到切换协议的目的。除此之外,我们还可以缓存 context,以供后续使用。

public class WriteHandler extends ChannelHandlerAdapter {
    private ChannelHandlerContext context;

    @Override
    public void handlerAdded(ChannelHandlerContext context) {
        this.context = context; // 缓存 context
    }

    public void send(String msg) {
        ctx.writeAndFlush(msg); // 使用缓存的 context 发送消息
    }

}

对于一个 handler 如果要提供给多个 pipeline 使用,也就是说该 handler 是可共享的,需要用 @Sharable 注解标注 handler 类。否则提供给多个 pipeline 使用时,会触发异常。共享 handler 时我们也需要注意,handler 提供的方法需要是线程安全的。

@Sharable // 共享注解
public class SharableHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext context, Object msg) {
        System.out.println("Channel read message : " + msg);
        context.fireChannelRead(msg); // 转发给下一个 handler
    }

}

异常处理

入站异常

入账异常需要重写 ChannelInboundHandler 的 exceptionCaught() 方法,如下所示:

public class InboundExceptionHandler extends ChannelInboundHandler {

    @Override
    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) {
        cause.printStackTrace();
        context.close();
    }

}

ChannelHandler.exceptionCaught() 的默认实现是将当前异常传给下一个 handler;如果异常到达了pipeline 的尾端,它将会被标记为未处理;如果需要自定义处理异常,需要重写 exceptionCaught()。

出站异常

每个出站的操作都会返回一个 ChannelFuture,注册到 ChannelFuture 的 ChannelFutureListener 在操作完成时被通知是否成功。基本上所有的 ChannelOutboundHandler 的方法都会传入一个 ChannelPromise 的实例。作为 ChannelFuture 的子类,ChannelPromise 可以被分配用于异步通知的监听器。除此之外,ChannelPromise 还具有立即通知的可写方法 setSuccess() setFailure()。
下面看下,出站消息打印错误堆栈的2种实现。

ChannelFuture future = channel.write(message);
future.addListener(new ChannelFutureListener(){
    @Override
    public void operationComplete(ChannelFuture f) {
        if (!f.isSuccess()) {
            f.cause().printStackTrace();
            f.channel().close();
        }
    }
});
public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext context, Object msg, ChannelPromise promise) {
        promise.addListener(new ChannelFutureListener(){
            @Override
            public void operationComplete(ChannelFuture future) {
                if (!future.isSuccess()) {
                    future.cause().printStackTrace();
                    future.channel().close();
                }
            }
        });
    }
}


参考:《Netty实战》、《极客时间:Netty源码剖析与实战》