当前位置: 代码迷 >> 综合 >> Netty17——Pipeline、Handler和HandlerContext的创建
  详细解决方案

Netty17——Pipeline、Handler和HandlerContext的创建

热度:85   发布时间:2023-12-22 03:23:17.0

?Netty中的ChannelPipeline、ChannelHandler和ChannelHandlerContext是非常核心的组件。

一、三者间的关系

?每当ServerSocket创建一个新的连接,就会创建一个Socket,对应的就是一个目标客户端;每一个新创建的Socket都会分配一个全新的ChannelPipeline;每个ChannelPipeline内部都含有多个ChannelHandlerContext,这些ChannelHandlerContext组成了一个双向链表,用于包装我们调用addLast()方法时添加的ChannelHandler。
在这里插入图片描述
?ChannelSocket和ChannelPipeline是一对一的关联关系,而ChannlePipeline内部的多个ChannelHandlerContext形成了一个链表,ChannelHandlerContext只是对ChannelHandler的封装。
?当一个请求进来的时候,会进入ChannelSocket对应的ChannelPipeline,并经过ChannelPipeline中所有的ChannelHandler,这里使用了过滤器的设计模式。

二、ChannelPipeline的作用和设计

在这里插入图片描述
?可以看到接口ChannelPipeline继承了接口ChannelInboundInvoker、ChannelOutboundInvoker和Iterable,表示它可以调用数据出站和入站的方法,同时也能遍历内部的链表。
?ChannelPipeline接口中的核心方法如下:
在这里插入图片描述
?从上图中可以看出来,ChannelPipeline的几个代表性的方法基本上都是针对Handler链表的插入、追加、删除和替换操作,类似一个LinkedList。同时也能返回Channel(即socket)。
?Netty的源码中,在ChannelPipeline接口的介绍中有如下一个图示:
在这里插入图片描述
?对上图的解释:
??①这是一个Handler的List,Handler用于处理或拦截入站或出站事件,Pipeline实现了过滤器的高级形式,以便用户控制事件如何处理以及Handler在Pipeline中如何交互。
??②上图描述了一个典型的Handler在Pipeline中处理I/O事件的方式,IO事件由InboundHandler或者OutboundHandler处理,并通过调用ChannelHandlerContext.fireChannelRead方法转发给其最近的处理程序。
??③入站事件由入站处理程序以自下而上的方向处理,如图所示。入站处理程序通常处理由图底部的I/O线程生成入站数据。入站数据通常从如SocketChannel.read(ByteBuffer)获取。
??④通常一个ChannelPipeline有多个Handler,例如,一个典型的服务器在每个通道的管道中都会有以下处理程序:
???协议解码器:将二进制数据转换为Java对象
???协议编码器:将Java对象转换为二进制数据
???业务逻辑处理程序:执行实际业务逻辑(如访问数据库)
??⑤你的业务程序不能将线程阻塞,会影响IO的速度,进而影响整个Netty程序的性能。如果你的业务程序很快,就可以放在IO线程中,反之,你需要异步执行。或者在添加Handler的时候添加一个线程池,例如:

// 这个任务的执行将不会阻塞IO线程,因为执行它的线程来自group线程池
pipeline.addLast(group,"handler",new MyBusinessLogicHandler());
三、ChannelHandler的作用和设计
public interface ChannelHandler {
    // 当把ChannelHandler添加到Pipeline时调用void handlerAdded(ChannelHandlerContext ctx) throws Exception;// 当把ChannelHandler从Pipeline移除时调用void handlerRemoved(ChannelHandlerContext ctx) throws Exception;// 当处理过程中在Pipeline中发生异常时调用@Deprecatedvoid exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;@Inherited@Documented@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@interface Sharable {
    // no value}
}

?ChannelHandler的作用就是处理IO事件或拦截IO事件,并将其转发给下一个处理程序ChannelHandler。Handler处理事件时分入站和出站的,两个方向的操作都是不同的。因此,Netty定义了两个子接口继承ChannelHandler——ChannelInboundHandler和ChannelOutboundHandler。
?1、ChannelInboundHandler——入站事件接口
在这里插入图片描述
?核心方法如下:
??channelActive:当Channel处于活动状态时被调用
??channelRegistered:Channel注册时被调用
??channelRead:从Channel中读取数据时被调用
?Netty会回调这些方法,因此如果我们关注这其中的某些事件的时候就需要重写对应的这些方法。
?2、ChannelOutboundHandler——出站事件接口
在这里插入图片描述
?核心方法如下:
??bind:将Channel绑定到本地地址时调用
??close:关闭Channel时调用
?出站操作都是一些连接和写出数据类似的方法。
?3、ChannelDuplexHandler——处理入站和出站事件
在这里插入图片描述
?ChannelDuplexHandler间接实现了入站接口并直接实现了出站接口,是一个通用的能够同时处理入站事件和出站事件的类。

四、ChannelHandlerContext的作用和设计

在这里插入图片描述
?ChannelHandlerContext继承了出站方法调用接口和入站方法调用接口。
?ChannelInboundInvoker接口的方法如下:
在这里插入图片描述
?ChannelOutboundInvoker接口的方法如下:
在这里插入图片描述
?这两个Invoker接口就是针对入站或出站方法来的,就是在入站或出站Handler的外层再包装一层,达到在Handler的方法前后拦截并做一些特定操作的目的。
?ChannelHandlerContext接口的方法如下:
在这里插入图片描述
?ChannelHandlerContext不仅仅是继承了ChannelInboundInvoker和ChannelOutboundInvoker,同时也定义了一些自己的方法,这些方法能够获取Context上下文环境中对应的channel、executor、handler、pipeline、内存分配器、关联的handler是否被删除等信息。Context就是包装了Handler相关的一切,以方便Context可以在pipeline方便的操作Handler。

五、ChannelPipeline|ChannelHandler|ChannelHandlerContext的创建过程

?分为3个步骤来看创建过程:
?1、任何一个ChannelSocket创建的同时都会创建一个Pipeline。
?2、当用户或系统内部调用ChannelPipeline的addXxx方法添加Handler时,都会创建一个包装这个Handler的Context。
在这里插入图片描述
?3、这些Context在ChannelPipeline中组成了双向链表。

5.1 Socket创建时创建ChannelPipeline

?SocketChannel的抽象父类AbstractChannel的构造方法如下:

protected AbstractChannel(Channel parent) {
    this.parent = parent;id = newId();unsafe = newUnsafe();pipeline = newChannelPipeline();
}

?调用newChannelPipeline()时会调用如下的DefaultChannelPipeline的构造方法:

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
//构造了首尾节点,并形成双向链表
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise =  new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;
}

?上述方法中做了以下几件事:
?①将channel赋值给channel属性,用于pipeline操作channel
?②创建一个future和promise,用于异步回调
?③创建一个inbound的tailContext,创建一个既是inbound类型又是outbound类型的headContext,tailContext和headContext非常重要,所有pipeline中的事件都会经过它们
?④将两个Context相互连接,形成双向链表

5.2 在addXxx添加处理器的时候创建ContextYyy

?以addLast()为例,DefaultChannelPipeline的addLast()方法创建Context的代码如下:

@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
    throw new NullPointerException("handlers");}for (ChannelHandler h: handlers) {
    if (h == null) {
    break;}addLast(executor, null, h);}return this;
}

?继续调用addLast():

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;synchronized (this) {
    checkMultiplicity(handler);newCtx = newContext(group, filterName(name, handler), handler);addLast0(newCtx);// If the registered is false it means that the channel was not registered on an eventloop yet.// In this case we add the context to the pipeline and add a task that will call// ChannelHandler.handlerAdded(...) once the channel is registered.if (!registered) {
    newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {
    newCtx.setAddPending();executor.execute(new Runnable() {
    @Overridepublic void run() {
    callHandlerAdded0(newCtx);}});return this;}}callHandlerAdded0(newCtx);return this;
}

?在该方法中做了以下几件事:
?①pipeline添加handler,参数是线程池,name是null,handler是我们或者系统传入的handler,Netty为了防止多个线程导致安全问题,同步了这段代码
?②检查handler实例是否是共享的,如果不是,并且已经被别的pipeline使用了,则抛出异常

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
    ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;if (!h.isSharable() && h.added) {
    throw new ChannelPipelineException(h.getClass().getName() +" is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}
}

?③调用newContext(group,filterName(name,handler),handler)方法,创建一个Context,这里就可以看出来,每次添加一个handler,都会创建一个关联的Context
?④调用addLast0(),将Context追加到链表中tailContext的前面:

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;newCtx.prev = prev;newCtx.next = tail;prev.next = newCtx;tail.prev = newCtx;
}

?⑤如果这个通道还没有注册到selector上,就将这个Context添加到这个pipeline的待办任务中,当注册好了以后,就会调用callHandlerAdded0(newCtx)——这个方法默认什么都不做,用户可以重写这个方法

六、Pipeline Handler HandlerContext创建过程梳理

?上面分析了Pipeline、Handler和HandlerContext的创建过程:每当创建ChannelSocket的时候都会创建一个绑定的pipeline,它们是一对一的,创建pipeline的时候也会创建tail节点和head节点,形成最初的双向链表。tail是入站inbound类型的handler,head既是inbound也是outbound类型的handler。在调用pipeline的addLast等方法的时候会根据给定的handler创建一个个的Context,并每次都将创建的Context插入到双向链表的tailContext的前面。其步骤大致如下:
?①每当创建 ChannelSocket 的时候都会创建一个绑定的 pipeline,它们是一对一的关系,创建 pipeline 的时候会创建 tail 节点和 head 节点,形成最初的链表
?②在调用 pipeline 的 addLast等方法的时候,会根据给定的 handler 创建一个 Context, 然后,将这个 Context 插入到链表的尾端(tail 前面)
?③Context 包装 handler,多个 Context 在 pipeline 中形成双向链表
?④入站方向叫 inbound,由 head 节点开始,出站方法叫 outbound ,由 tail 节点开始

  相关解决方案