ChannelPipeline可以将ChannelHandler 连接在一起处理业务逻辑。下面会讲解他们之间的关系,以及一个重要的组件ChannelHandlerContext。理解他们之间的交互,对于Netty构建模块化的、可重用的实现至关重要。
Channel 接口
Channel 的生命周期
Channel 接口定义了和ChannelInboundHandler 密切相关的状态模型。下面是Channel 的4个状态:
如上图所示,当Channel 发生状态变更时,会生成对应的事件。这些事件将会转发给 ChannelPipeline 中的 ChannelHandler,然后Handler处理对应的事件。
ChannelHandler 的生命周期
下表列出了ChannelHandler 的生命周期操作,在ChannelHandler 被添加/移除时会调用这些操作。这些方法每个都会有 ChannelHandlerContext 的入参。
ChannelInboundHandler 接口
下面列出了ChannelInboundHandler 的生命周期方法。这些方法会在数据接收/发送或Channel状态发生改变时被调用。这些方法和Channel 的生命周期密切相关。
channelRegistered:当Channel 已经注册到EventLoop并且能够处理IO时被调用
channelUnregistered:当Channel从EventLoop注销且无法处理IO时被调用
channelActive:当Channel 被激活后调用,此时Channel 已经连接/绑定完成
channelInactive:当Channel离开活动状态,并且不再连接它的远程节点时被调用
channelReadComplete:当Channel 的一个读操作完成时被调用
channelRead:当Channel 进行完成1次读取后调用
channelWritabilityChanged:当Channel 的可写状态发生变化时被调用,用户需要确保写操作不会完成的太快(避免OOM),或者可以在Channel 再次变为可写时恢复写入。可以通过Channel 的isWritable() 方法检测 Channel 的可写性。可写性相关的阈值可以通过Channel.config().setWriteLowWaterMark()和Channel.config().setWriteHighWaterMark()来设置。
userEventTriggered():当ChannelInboundHandler.fireUserEventTriggered()方法被调用时触发调用,因为此时有一个Object传给了ChannelPipeline
当重写ChannelInboundHandler 的 channelRead() 方法时,它将显示的释放与池化ByteBuf 相关的实例内存。释放内存空间,Netty 提供了一个实用的方法:ReferenceCountUtil.release()。
ChannelOutboundHandler 接口
出站操作和数据由ChannelOutboundHandler处理,其方法会被Channel、ChannelPipeline和ChannelHandlerContext 调用。ChannelOutboundHandler可以按需推迟事件,这样可以用来处理一些复杂请求。例如 write 暂停了,可以将 flush操作延迟进行。ChannelOutboundHandler的相关方法如下:
bind():当将Channel绑定到本地时被调用
connect():当将Channel连接到远程节点时被调用
disconnect():当请求将Channel从远程节点断开时被调用
close():当请求被关闭时被调用
deregister():当将Channel从EventLoop上注销时被调用
read():当请求从Channel读取更多数据时被调用
flush():当通过Channel将数据 flush 到远程节点时被调用
write():当通过Channel将数据写到远程节点时被调用
ChannelHandler 适配器
通常我们业务代码中实现入站和出站Handler时,可以继承 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter。这两个适配器分别提供了入站和出站Handler的基本实现。通过抽象类 ChannelHandlerAdapter,获得了ChannelHandler的方法。类层次结构如下:
ChannelHandlerAdapter 提供了isSharable(),如果Handler标注为 @Sharable,则方法返回为 true,它表示这个Handler实例可以被添加到多个ChannelPipeline中。在HandlerAdapter 中调用关联的ChannelHandlerContext 上的等效方法,可以将事件转发到Pipeline 中的下一个ChannelHandler。
业务开发中,也可以通过集成Adapter的子类(SimpleChannelInboundHandler),来实现入站的Handler。
资源管理
当通过调用 ChannelInboundHandler.channelRead()或ChannelOutboundHandler.write()处理数据时,都需要确保没有资源泄漏。Netty使用引用计数来处理池化的ByteBuf,所以在使用完ByteBuf后,调整其引用计数是很重要的。
为了帮助诊断潜在的资源泄漏问题,Netty 提供了ResourceLeakDetector,它将对应用程序缓冲区约1%的采样检测内存泄漏,相关开销较小。目前Netty定义了4种检测级别:
DISABLED:禁止泄漏检测。只有在程序完全测试之后才设置
SIMPLE:使用1%的采样频率检测泄漏。这个是默认设置,适合绝大部分的情况
ADVANCED:除了SIMPLE 的功能外,还会报告对应消息被访问的位置
PARANOID:功能类似于ADVANCED,但是采样率为100%,对性能影响很大,适合调试阶段
总之,如果一个消息被消费或者被丢弃了,并没有传递给下一个Handler,那么它就有责任调用 ReferenceCountUtil.release() 释放资源。如果消息到达了网络传输层,那么它被写入时或Channel关闭时,都会被自动释放。
ChannelPipeline 接口
每个新创建的Channel 都会被分配一个新的 ChannelPipeline,这个关联是永久的。Channel 既不能附加另外一个ChannelPipeline,也不能离开当前的。事件触发后,将会被对应的入站和出站Handler处理。然后通过调用 ChannelHandlerContext,事件将会被继续传递。
ChannelHandlerContext 可以让当前Handler 和 ChannelPipeline 和其他Handler 交互。ChannelHandler可以通知触发ChannelPipeline 中的下一个Handler,甚至可以动态修改其所属的ChannelPipeline。
修改ChannelPipeline
ChannelHandler 可以通过添加、删除或替换其他的ChannelHandler 来实时修改ChannelPipeline的布局。下面看一下具体的方法:
addFirst()、addBefore()、addAfter()、addLast():将一个ChannelHandler 添加到ChannelPipeline 中
remove():将Handler从Pipeline中移除
replace():将ChannelPipeline 中的一个Handler 替换为另外一个
下面看一下 ChannelPipeline 中用于访问 ChannelHandler 的方法
get():通过类型或者名称返回ChannelHandler
context():返回和ChannelHandler绑定的ChannelContext
names():返回ChannelPipeline中所有ChannelHandler的名称
触发事件
ChannelPipeline 的 API 开放了用于调用入站和出站的方法,入站相关的方法,如下所示:
fireChannelRegistered():调用 pipeline 中下一个 ChannelInboundHandler 的 channelRegistered()
fireChannelUnregistered():调用 pipeline 中下一个 ChannelInboundHandler 的 channelUnregistered()
fireChannelActive():调用 pipeline 中下一个 ChannelInboundHandler 的 channelActived()
fireChannelInactive():调用 pipeline 中下一个 ChannelInboundHandler 的 channelInactived()
fireExceptionCaught():调用 pipeline 中下一个 ChannelInboundHandler 的 exceptionCaught()
fireUserEventTriggered():调用 pipeline 中下一个 ChannelInboundHandler 的 userEventTriggered()
fireChannelRead():调用 pipeline 中下一个 ChannelInboundHandler 的 channelRead()
fireChannelReadComplete():调用 pipeline 中下一个 ChannelInboundHandler 的 channelReadComplete()
fireChannelWritablityChanged():调用 pipeline 中下一个 ChannelInboundHandler 的 channelWritablityChanged()
出站相关的方法如下:
bind():将 channel 绑定到一个本地端口,调用 pipeline 中的下一个ChannelOutboundHandler 的 bind()
connect():将 channel 连接到一个远程端口,调用 pipeline 中的下一个ChannelOutboundHandler 的 connect()
disconnect():将 channel 断开连接,调用 pipeline 中的下一个ChannelOutboundHandler 的 disconnect()
close():将 channel 关闭,调用 pipeline 中的下一个ChannelOutboundHandler 的 close()
deregister():将 channel 从 EventLoop 中注销,调用 pipeline 中的下一个ChannelOutboundHandler 的 deregister()
flush():flush 数据发送到远端,调用 pipeline 中的下一个ChannelOutboundHandler 的 flush()
write():将数据写入到 channel,调用 pipeline 中的下一个ChannelOutboundHandler 的 write()
writeAndFlush():将数据写入到 channel, 并flush到远端,调用 pipeline 中的下一个ChannelOutboundHandler 的 writeAndFlush()
read():一次从 channel 中读取数据,调用 pipeline 中的下一个ChannelOutboundHandler 的 read()
总之,pipeline 保存了与 channel 关联的 ChannelHandler;pipeline 可以根据需要动态添加或删除 handler;pipeline 提供了操作入站和出站的API。
ChannelHandlerContext 接口
context 代表了 handler 和 pipeline 之间的关联,当有 handler 添加到 pipeline 中时,都会创建 context。context 的主要功能是管理其关联的 handler 和同一个 pipeline 中的其他 handler。
context 提供的方法类似于 channel 和 pipeline 中的方法,不过不同的是:context 方法是带有上下文关联的,是从当前 handler 往后传播;而 channel 或 pipeline 对应的方法是上下文无关的,会传播整个 pipeline。如果对于后者使用不当,可能会造成事件传播的死循环。ChannelHandlerContext 的API 汇总如下:
alloc():返回和这个context关联的 channel 所配置的 ByteBufAllocator
bind():绑定到给定的 SocketAddress,并返回 ChannelFuture
channel():返回 context 关联的 channel
close():关闭 channel,并返回ChannelFuture
connect():连接到戈丁的 SocketAddress,并返回 ChannelFuture
deregister():从 EventLoop 上注销,并返回 ChannelFuture
disconnect():从远程节点断开,并返回 ChannelFuture
executor():返回调度事件的 EventLoop
fireChannelActive():触发下一个 channel 的 channelActive()
fireChannelInactive():触发下一个 channel 的 channelInactive()
fireChannelRead():触发下一个 channel 的 channelRead()
fireChannelReadComplete():触发下一个 channel 的 channelReadComplete()
fireChannelRegistered():触发下一个 channel 的channelRegistered()
fireChannelUnregistered():触发下一个 channel 的 channelUnregistered()
fireChannelWritabilityChanged():触发下一个 channel 的 channelWritabilityChanged()
fireExceptionCaught():触发下一个 channel 的 exceptionCaught()
fireUserEventTriggered():触发下一个 handler 的 userEventTriggered()
handler():返回 context 关联的 handler()
isRemoved():如果关联的 handler 已经从 pipeline 中移除,则返回 true
name():返回实例的唯一名称
pipeline():返回 context 关联的 pipeline
read():将数据从 channel 读取到第一个缓冲区,如果成功则触发 READ 事件,当最后一个消息读取完成后,触发 channelReadComplete()
write():写入消息并向后传播
writeAndFlush():写入消息并向后传播,并 flush 到远端
总之,context 和 handler 之间的关联是不变的,因此缓存其引用是安全的;context 提供的方法是和上下文关联的,会产生更短的事件流,应尽可能使用该特性提高性能。
使用 ChannelHandlerContext
通过 context 获取到 channel,然后调用 write(),事件将流经整个 pipeline。
通过 context 获取到关联的 pipeline,然后调用 pipeline 的 write(),事件也将流经整个 pipeline。
通过 context 的 write()方法,事件将从当前 handler 向后传播,会跳过前面的 handler。
ChannelHandler 和 ChannelHandlerContext 的高级用法
context 可以获取到 pipeline,通过 pipeline 可以动态的替换 handler 达到切换协议的目的。除此之外,我们还可以缓存 context,以供后续使用。
对于一个 handler 如果要提供给多个 pipeline 使用,也就是说该 handler 是可共享的,需要用 @Sharable 注解标注 handler 类。否则提供给多个 pipeline 使用时,会触发异常。共享 handler 时我们也需要注意,handler 提供的方法需要是线程安全的。
异常处理
入站异常
入账异常需要重写 ChannelInboundHandler 的 exceptionCaught() 方法,如下所示:
ChannelHandler.exceptionCaught() 的默认实现是将当前异常传给下一个 handler;如果异常到达了pipeline 的尾端,它将会被标记为未处理;如果需要自定义处理异常,需要重写 exceptionCaught()。
出站异常
每个出站的操作都会返回一个 ChannelFuture,注册到 ChannelFuture 的 ChannelFutureListener 在操作完成时被通知是否成功。基本上所有的 ChannelOutboundHandler 的方法都会传入一个 ChannelPromise 的实例。作为 ChannelFuture 的子类,ChannelPromise 可以被分配用于异步通知的监听器。除此之外,ChannelPromise 还具有立即通知的可写方法 setSuccess() setFailure()。
下面看下,出站消息打印错误堆栈的2种实现。
参考:《Netty实战》、《极客时间:Netty源码剖析与实战》