当前位置: 代码迷 >> 综合 >> Netty(七)源码解析 之 Reactor 模型、Netty的服务端启动源码分析
  详细解决方案

Netty(七)源码解析 之 Reactor 模型、Netty的服务端启动源码分析

热度:61   发布时间:2024-02-11 13:58:09.0

Netty 源码解析

1. Reactor 模型

在解析 Netty 源码之前,我们首先要搞清楚 Reactor 模型。因为现在的网络通信框架,大多数都是基于 Reactor 模型进行设计和开发的,Netty 也不例外。

1.1 Reactor 单线程模型

在这里插入图片描述

Selector.select()方法查询Channel是否就绪,遍历完所有Channel作为一次轮询,一次轮询后没有任何就绪,如果设置了阻塞时间,判断是否超时,超时直接返回0,否则一直阻塞继续轮询,只要有任意一个Channel就绪了立即返回

Reactor 单线程模型,指的是当前的 Sever 会为每一个通信对端形成一个 Channel,而所有这些 Channel 都会与一个线程相绑定,该线程用于完成它们间的所有通信处理。该线程需要完成的操作有:

  • 若当前为 Server,则该线程需要接收并处理 Client 的连接请求
  • 若当前为 Client,则该线程需要向 Server 发起连接
  • 读取通信对端的消息
  • 向通信对端发送消息

1.2 Reactor 线程池模型

在这里插入图片描述

Reactor 单线程模型中使用一个线程处理所有通信对端的所有请求,在高并发场景中会严重影响系统性能。所以,就将单线程模型中的这一个线程替换为了一个线程池。大大提高了系统性能。

1.3 Reactor 多线程池模型

在这里插入图片描述

若请求连接的并发量是数以百万计的,且 IO 操作还比较耗时,此时的 Server 即使采用的是 Reactor 线程池模型,系统性能也会急剧下降。此时,可以将连接操作与 IO 操作分开处理,形成 Reactor 的多线程模型。

当客户端通过处理连接请求的 Channel 连接上 Server 后,系统会为该客户端再生成一个子 Channel 专门用于处理该客户端的 IO 请求。这两类不同的 Channel 连接着两类不同的线程池。而线程池中的线程数量,可以根据需求分别设置。提高了系统性能。

1.4 Netty-Server 的 Reactor 模型

在这里插入图片描述

Netty-Server 采用了多线程池模型。不过线程池是由 EventLoopGroup 充当。EventLoopGroup中的每一个 EventLoop 都绑定着一个线程,用于处理该 Channel 与当前 Server 间的操作。一个 Channel 只能与一个 EventLoop 绑定,但一个 EventLoop 可以绑定多个 Channel。即 Channel与 EventLoop 间的关系是 n:1。

经过生产下实践证明的,一般对于百万级的 QPS,parentGroup 设置为 2 ,childGroup设置为 4 ,就完全没有问题。

1.5 Netty-Client 的 Reactor 模型

在这里插入图片描述

Netty-Client 采用的是线程池模型。因为其只需要与 Server 连接一次即可,无需区分连接请求与 IO 请求。

强调一下:以上模型可以看出,一个EventLoop专门绑定一个Selector

Netty 框架中导入外部工程

  • 1.先解压Netty源码,并用idea导入成maven工程
    在这里插入图片描述
  • 2.将之前写的Demo工程,拷贝到解压的源码目录里
    在这里插入图片描述
    在这里插入图片描述
  • 3.idea中导入这个复制的Demo工程,也是作为maven导入
    在这里插入图片描述
  • 4.删掉Demo的pom文件(之前有编译的话target目录也删了),复制netty源码中的all的pom
    在这里插入图片描述
    在这里插入图片描述
  • 5.第一次执行Demo,会发现报错,少了一些类
    在这里插入图片描述
    找到Netty源码的common工程中有个包templates.io.netty.util.collection,里面有很多模板文件:
    在这里插入图片描述
    再打开codegen.groovy脚本:
    可以发现,我们可以通过脚本将模板文件生成类,规则就是将K替换成各种数据类型例如Byte、Character、Short…等
    在这里插入图片描述
    在这里插入图片描述
  • 6.现在我们需要运行该脚本,需要先加个插件
    <plugin><artifactId>maven-checkstyle-plugin</artifactId><configuration><skip>true</skip></configuration>
    </plugin>
    
    在这里插入图片描述
  • 7.在Netty源码common工程所在目录,执行命令进行预编译
    mvn compile -Dcheckstyle.skip=true
    在这里插入图片描述
    编译成功后,可以看到,会自动生成这些class文件
    在这里插入图片描述
    在这里插入图片描述
  • 8.然后在运行就不会报错了,接下来就可以在源码中运行,并随时写注释了
    在这里插入图片描述

2. Netty 服务端启动

我们先来分析第一个流程:Netty 服务端启动,一共包括四个子流程

  • 创建服务端 Channel
  • 初始化服务端 Channel
  • 将Channel注册给 Selector
  • 端口绑定

入口

Channel的创建是在bind里面触发的
在这里插入图片描述

public class SomeServer {public static void main(String[] args) throws InterruptedException {EventLoopGroup parentGroup = new NioEventLoopGroup();EventLoopGroup childGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(parentGroup, childGroup)// 指定要创建Channel类型.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 获取channel中的PipelineChannelPipeline pipeline = ch.pipeline();// StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为Stringpipeline.addLast(new StringDecoder());// StringEncoder:字符串编码器,将String编码为将要发送到Channel中的ByteBufpipeline.addLast(new StringEncoder());pipeline.addLast(new SomeServerHandler());}});ChannelFuture future = bootstrap.bind(8888).sync();System.out.println("服务器已启动");future.channel().closeFuture().sync();} finally {parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}}
}

先跟bind方法:

//io.netty.bootstrap.AbstractBootstrap#bind(int)
public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));
}//io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
public ChannelFuture bind(SocketAddress localAddress) {validate();//做一些验证if (localAddress == null) {throw new NullPointerException("localAddress");}return doBind(localAddress);
}
  • validate()做一些验证:
    public B validate() {if (group == null) {  // EventLoopGroup不能为空throw new IllegalStateException("group not set");}if (channelFactory == null) { //ChannelFactory不能为空throw new IllegalStateException("channel or channelFactory not set");}return self();
    }
    

继续看doBind方法:

该方法核心逻辑就两件事:

一是创建初始化channel,并注册到Selector(其中注册到Selector这个操作是异步的
二就是将这个初始化和注册成功的channel绑定到指定端口

因为第一个逻辑是异步执行的,所以这里有两个分支:

  • 如果异步操作已经完成,创建一个可修改的异步结果对象,直接进行绑定端口号操作,返回
  • 如果异步操作还未完成,创建一个正在注册的可修改异步结果对象直接返回,然后通过监听器,一但异步操作完成,在进行绑定端口

其中注册到Selector这个异步操作的结果是成功还是失败的逻辑判断是在doBind0里处理的,即在绑定端口的时候先确定channel是否已经注册成功,这个在本章最后端口绑定的时候会讲

private ChannelFuture doBind(final SocketAddress localAddress) {// 创建、初始化channel,并将其注册到Selectorfinal ChannelFuture regFuture = initAndRegister();// 从异步结果中获取channelfinal Channel channel = regFuture.channel();// 获取异步操作执行过程中发生的异常直接返回// 注意:只要有异常,说明此异步操作已经完成结束了if (regFuture.cause() != null) {return regFuture;}// 通常走到这里的时候,说明channel已经创建成功并且初始化成功了(看initAndRegister逻辑可以推断)// 但是注册到Selector的操作是异步的,有可能还在处理也有可能已经完成了// 判断当前异步操作是否完成(完成不代表成功,也有可能是异常)if (regFuture.isDone()) {   // 若异步操作完成// At this point we know that the registration was complete and successful.// 创建一个可修改的异步结果对象channelFutureChannelPromise promise = channel.newPromise();// 绑定端口号,regFuture的成功与否是在这个方法里面判断的doBind0(regFuture, channel, localAddress, promise);return promise;} else {  // 若异步操作未完成// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);// 为异步操作添加监听器regFuture.addListener(new ChannelFutureListener() {// 当异步操作完成(成功,异常),就会触发该方法的执行@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// 获取异步操作执行过程中发生的异常Throwable cause = future.cause();if (cause != null) {  // 异步执行过程发生异常// Registration on the EventLoop failed so fail // the ChannelPromise directly to not cause an// IllegalStateException once we try to access the // EventLoop of the Channel.// 修改异步结果为:失败promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586// 注册,也是修改promise.registered();// 绑定端口号doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}

一些需要注意的点:

  • 如果异步操作完成,创建一个可修改的异步结果对象ChannelPromise
    /** * Special {@link ChannelFuture} which is writable. * 可写的特殊的channelFuture */
    public interface ChannelPromise extends ChannelFuture, Promise<Void> {...}
    
    看到它继承ChannelFuture和Promise
    在这里插入图片描述
    ChannelFuture代表的就是对通道异步操作的结果对象,可以通过它获取异步操作的结果,ChannelFuture继承自io.netty.util.concurrent.Future
    其中ChannelFuture接口本身方法大多数都是添加删除监听,获取channel、同步等操作,基本都是在读,不存在写的操作
    在这里插入图片描述
    而Promise接口具有setFailure、setSuccess、trySuccess、tryFailure等方法,允许修改异步结果
    /** * Special {@link Future} which is writable. * 特殊的{@link Future}是可写的。 */
    public interface Promise<V> extends Future<V> {...}
    
    在这里插入图片描述
    即Promise是一个特殊的{@link Future}是可写的。在异步处理过程中可以设置这个异步结果成功还是失败等信息

  • 如果异步操作未完成,还是创建一个可修改的异步结果对象,只不过是一个特殊的实现类PendingRegistrationPromise(等待注册的Promise,继承体系中也是继承ChannelPromise)
    PendingRegistrationPromise -> DefaultChannelPromise -> ChannelPromise

现在我们详细跟initAndRegister方法:

该方法内部包含了我们需要关注的四个核心流程中的其中三个:

  • 创建一个Channel
  • 初始化Channel
  • 将当前channel注册给selector
final ChannelFuture initAndRegister() {Channel channel = null;try {// 创建一个channelchannel = channelFactory.newChannel();// 初始化channelinit(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(),GlobalEventExecutor.INSTANCE).setFailure(t);}// 将当前channel注册给selectorChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loop's task queue for later execution.// i.e. It's safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread.return regFuture;
}

2.1 创建服务端 Channel

先看创建一个channel的分支,看到channel = channelFactory.newChannel();
那么channelFactory是什么?什么时候创建的?
我们回到一开始的SomeServer启动类,看到channel(Class<? extends C> channelClass)方法,该方法是用来指定创建的channel的类型,实际上就是这个方法创建channelFactory的:
在这里插入图片描述

//io.netty.bootstrap.AbstractBootstrap#channel
public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");}// 创建一个channelFactory然后返回自己return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}//io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.channel.ChannelFactory<? extends C>)
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {return channelFactory((ChannelFactory<C>) channelFactory);
}//io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.bootstrap.ChannelFactory<? extends C>)
public B channelFactory(ChannelFactory<? extends C> channelFactory) {if (channelFactory == null) {throw new NullPointerException("channelFactory");}if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already");}//赋值到成员变量this.channelFactory = channelFactory;return self();
}

上面代码可以看出channelFactory就是io.netty.channel.ReflectiveChannelFactory
好我们现在看一下channelFactory.newChannel()方法的具体实现:
在这里插入图片描述

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {private final Constructor<? extends T> constructor;public ReflectiveChannelFactory(Class<? extends T> clazz) {ObjectUtil.checkNotNull(clazz, "clazz");try {// 我们通过io.netty.bootstrap.AbstractBootstrap#channel传进来的是// io.netty.channel.socket.nio.NioServerSocketChannel// 初始化NioServerSocketChannel的构造器this.constructor = clazz.getConstructor();} catch (NoSuchMethodException e) {throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +" does not have a public non-arg constructor", e);}}@Overridepublic T newChannel() {try {// 使用反射机制,调用其无参构造器,创建channelreturn constructor.newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);}}...
}

可以看出来,是通过Class的无参构造创建Channel的,而这个Class就是io.netty.channel.socket.nio.NioServerSocketChannel,我们接着看NioServerSocketChannel的无参构造在初始化的时候都做了什么?找到NioServerSocketChannel的无参构造:

public class NioServerSocketChannel extends AbstractNioMessageChannelimplements io.netty.channel.socket.ServerSocketChannel {...public NioServerSocketChannel() {// 我们Netty的channel实际上是对原生的NIO的channel的封装this(newSocket(DEFAULT_SELECTOR_PROVIDER));}...// 获取到一个全局性的provider,用来创建Channel或者Selector// nio讲解的时候介绍过private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();private static ServerSocketChannel newSocket(SelectorProvider provider) {try {/*** Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.** See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.*/// 通过全局性的provider,创建一个原生的NIO的channelreturn provider.openServerSocketChannel();} catch (IOException e) {throw new ChannelException("Failed to open a server socket.", e);}}public NioServerSocketChannel(ServerSocketChannel channel) {// 封装super(null, channel, SelectionKey.OP_ACCEPT);// 创建channel的配置对象config = new NioServerSocketChannelConfig(this, javaChannel().socket());}
}

可以看到通过全局性的provider,创建一个原生的NIO的ServerSocketChannel,再看super的构造都做了什么

//io.netty.channel.nio.AbstractNioMessageChannel#AbstractNioMessageChannel
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp);
}//io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);//SelectableChannel ch,此时ch就是Nio原生的ServerSocketChannel对象this.ch = ch;//存储此channel需要关注的兴趣集//此时就是SelectionKey.OP_ACCEPT,接收事件this.readInterestOp = readInterestOp;try {// 指定channel为非阻塞ch.configureBlocking(false);} catch (IOException e) {try {ch.close();} catch (IOException e2) {if (logger.isWarnEnabled()) {logger.warn("Failed to close a partially initialized socket.", e2);}}throw new ChannelException("Failed to enter non-blocking mode.", e);}
}//继续看父类构造
//io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
protected AbstractChannel(Channel parent) {// 此时parent是nullthis.parent = parent;// 为Netty的channel生成idid = newId();// 创建一个底层操作对象unsafe = newUnsafe();// 创建当前channel所绑定的channelPipelinepipeline = newChannelPipeline();
}protected ChannelId newId() {return DefaultChannelId.newInstance();
}protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}
  • 看下id是由什么组成的

    //io.netty.channel.DefaultChannelId#newInstance
    public static DefaultChannelId newInstance() {return new DefaultChannelId();
    }//io.netty.channel.DefaultChannelId#DefaultChannelId
    private DefaultChannelId() {//看到整个ID是由machine(mark地址)、process(进程)、sequence(序列)、timestamp(时间戳)、random(随机数)组成data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];int i = 0;// machineIdSystem.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);i += MACHINE_ID.length;// processIdi = writeInt(i, PROCESS_ID);// sequencei = writeInt(i, nextSequence.getAndIncrement());// timestamp (kind of)i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());// randomint random = PlatformDependent.threadLocalRandom().nextInt();i = writeInt(i, random);assert i == data.length;hashCode = Arrays.hashCode(data);
    }
    
  • 看下unsafe 是什么对象
    不安全操作应永不从用户代码被调用。这些方法仅用于实现实际传输,必须从I/O线程调用,以下方法除外
    在这里插入图片描述
    unsafe 提供一堆方法用来实际传输,例如 写数据write、注册register、刷新flush、connect连接、close关闭、bind绑定…
    在这里插入图片描述
    Unsafe这些方法就是最底层了,封装的都是和Nio的交互!!!!

  • 再看下创建NioServerSocketChannelConfig配置对象

    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    

    其中javaChannel()返回的就是之前创建的ServerSocketChannel对象

    NioServerSocketChannelConfig的继承体系:
    NioServerSocketChannelConfig -> DefaultServerSocketChannelConfig -> ServerSocketChannelConfig -> ChannelConfig
    在这里插入图片描述
    实际上是一个channel的配置属性set集合,后面会通过它对channel进行配置

  • ChannelPipeline后面会专门讲,这里只要记住是在NioServerSocketChannel构造里初始化的

总结

这里创建了一个 channel,主要完成了以下几个流程:

  • ServerBootstrap在指定Channel通道类型的时候,底层创建了一个ReflectiveChannelFactory,该工厂可以通过使用指定的Class的构造器来创建Channel,而我们指定的就是NioServerSocketChannel
  • ServerBootstrap在调用bind方法的时候,底层会通过channelFactory.newChannel()创建Channel,即ReflectiveChannelFactory
  • 在利用构造器创建Channel实例的时候,会触发对应类的类初始化和构造器方法,我们主要关注的是NioServerSocketChannel构造器初始化都做了哪些事:
    • 通过全局性的SelectorProvider,创建一个原生的NIO的channel
    • 生成了 channel 的 id,ChannelId(包括介绍了ChannelId由哪些组成)
    • 创建了真正的数据传输对象 Unsafe
    • 创建并绑定了 ChannelPipeline
    • 设置当前Channel需要关注的事件(OP_ACCEPT事件)
    • 指定channel为非阻塞
    • 创建 channel 的的配置类 NioServerSocketChannelConfig对象

2.2 初始化服务端 Channel

继续分析源码之前,说明一个事情:
一般用Netty的时候,ServerBootstrap是可以使用option()/childOption()方法进行一些TCP相关配置
在这里插入图片描述
除了option可以通过attr()/childAttr()添加属性,这些属性是绑定在channel上的,可以在处理器中使用
在这里插入图片描述
例如:
在这里插入图片描述

继续分析,看初始化channel的方法io.netty.bootstrap.AbstractBootstrap#init

入口一开始分析过(忘记的看上面):
io.netty.bootstrap.AbstractBootstrap#bind(int)
io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
io.netty.bootstrap.AbstractBootstrap#doBind
io.netty.bootstrap.AbstractBootstrap#initAndRegister
io.netty.bootstrap.AbstractBootstrap#init

该方法是一个抽象方法,具体实现在其子类,我们是Server端所以看
io.netty.bootstrap.ServerBootstrap#init,主要分三大步:

  • 处理bootstrap中的option设置属性
  • 处理bootstrap中的attr设置属性
  • 向pipeline中添加ChannelInitializer处理器
    (目的是为了添加ServerBootstrapAcceptor处理器,该处理器用于处理client的连接,通过bootstrap设置的childOption和childAttr属性都会传给这个处理器,当接受连接的时候为客户端的channel设置这些属性
//io.netty.bootstrap.ServerBootstrap#init
void init(Channel channel) throws Exception {// 处理bootstrap中的option设置属性final Map<ChannelOption<?>, Object> options = options0();synchronized (options) {setChannelOptions(channel, options, logger);}// 处理bootstrap中的attr设置属性final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {// 将bootstrap中设置的所有attr属性配置给channelfor (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}// 向pipeline中添加处理器ChannelPipeline p = channel.pipeline();// 获取bootstrap中设置的所有child开头的属性final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}// ChannelInitializer是一个处理器,其存在的意义是,为pipeline添加其它处理器p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();// 获取bootstrap中配置的handler()ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}// ch.eventLoop()是获取到当前channel所绑定的evenLoop// 然后再使用该eventLoop所绑定的线程来执行指定的任务ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// 向pipeline中添加ServerBootstrapAcceptor处理器// 该处理器用于处理client的连接pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}

A、设置 options

先获取options,看options0()方法:

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {...private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();//io.netty.bootstrap.AbstractBootstrap#options0final Map<ChannelOption<?>, Object> options0() {return options;}...
}

什么时候赋值的呢?之前说过在使用Netty入口类的时候,可以用option()/childOption()进行配置

//io.netty.bootstrap.AbstractBootstrap#option
public <T> B option(ChannelOption<T> option, T value) {if (option == null) {throw new NullPointerException("option");}if (value == null) {synchronized (options) {options.remove(option);}} else {synchronized (options) {  // 初始化optionsoptions.put(option, value);}}return self();
}//io.netty.bootstrap.ServerBootstrap#childOption
//只有Server端才需要child,设置的属性是给接受到的客户端连接的Channel用的
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {if (childOption == null) {throw new NullPointerException("childOption");}if (value == null) {synchronized (childOptions) {childOptions.remove(childOption);}} else {synchronized (childOptions) {childOptions.put(childOption, value);}}return this;
}

在看下setChannelOptions方法,为Channel设置Option,Option配置最终放到哪了?注意此时处理的是为parent的option,即为接受客户端连接的Channel设置option

//io.netty.bootstrap.AbstractBootstrap#setChannelOptions
static void setChannelOptions(Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {// 遍历通过bootstrap设置的所有optionfor (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {setChannelOption(channel, e.getKey(), e.getValue(), logger);}
}//io.netty.bootstrap.AbstractBootstrap#setChannelOption
private static void setChannelOption(Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {try {// 将设置的option配置给channel的configif (!channel.config().setOption((ChannelOption<Object>) option, value)) {logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);}} catch (Throwable t) {logger.warn("Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);}
}

看下channel.config()返回的是啥,因为我们的channel是NioServerSocketChannel,所以看下io.netty.channel.socket.nio.NioServerSocketChannel#config方法

public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {//...private final ServerSocketChannelConfig config;public NioServerSocketChannel(ServerSocketChannel channel) {// 封装super(null, channel, SelectionKey.OP_ACCEPT);// 初始化channel的配置对象config = new NioServerSocketChannelConfig(this, javaChannel().socket());}@Overridepublic ServerSocketChannelConfig config() {return config;}//...
}

可以看出来返回的就是NioServerSocketChannelConfig,调用的就是io.netty.channel.socket.nio.NioServerSocketChannel.NioServerSocketChannelConfig#setOption

//io.netty.channel.socket.nio.NioServerSocketChannel.NioServerSocketChannelConfig#setOption
public <T> boolean setOption(ChannelOption<T> option, T value) {//我们是NioChannel,肯定走第一个分支if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {return NioChannelOption.setOption(jdkChannel(), (NioChannelOption<T>) option, value);}return super.setOption(option, value);
}//io.netty.channel.socket.nio.NioChannelOption#setOption
static <T> boolean setOption(Channel jdkChannel, NioChannelOption<T> option, T value) {java.nio.channels.NetworkChannel channel = (java.nio.channels.NetworkChannel) jdkChannel;if (!channel.supportedOptions().contains(option.option)) {return false;}if (channel instanceof ServerSocketChannel && option.option == java.net.StandardSocketOptions.IP_TOS) {// Skip IP_TOS as a workaround for a JDK bug:// See http://mail.openjdk.java.net/pipermail/nio-dev/2018-August/005365.htmlreturn false;}try {//看到底层还是用Nio的setOption方法channel.setOption(option.option, value);return true;} catch (IOException e) {throw new ChannelException(e);}
}

B、 设置 attr

和Options同理,ServerBootstrap类是可以用attr()/childAttr()方法添加属性,这些属性最终会绑定到channel上,可以通过channel获取到,一般用在处理器,看下取值和赋值的方法:

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {...private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();//io.netty.bootstrap.AbstractBootstrap#attrs0 取值final Map<AttributeKey<?>, Object> attrs0() {return attrs;}...//io.netty.bootstrap.AbstractBootstrap#attr 设值public <T> B attr(AttributeKey<T> key, T value) {if (key == null) {throw new NullPointerException("key");}if (value == null) {synchronized (attrs) {attrs.remove(key);}} else {synchronized (attrs) {attrs.put(key, value);}}return self();}
}

在看下将这些值如何绑定到channel的,调用channel.attr(key).set(e.getValue())方法,注意此时是在处理parent的attr

//io.netty.bootstrap.ServerBootstrap#init
void init(Channel channel) throws Exception {// 处理bootstrap中的option设置属性final Map<ChannelOption<?>, Object> options = options0();synchronized (options) {setChannelOptions(channel, options, logger);}// 处理bootstrap中的attr设置属性final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {// 将bootstrap中设置的所有attr属性配置给channelfor (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}...
}

我们知道channel就是NioServerSocketChannel,NioServerSocketChannel的attr方法,实际上是继承自DefaultAttributeMap,所以NioServerSocketChannel本身就是一个属性Map:
在这里插入图片描述
就不看了,知道就好了
在这里插入图片描述

C、 添加处理器

注意上面说的设置option和attr,这些参数都是为ServerSocketChannel设置使用的,ServerSocketChannel是用来接受客户端连接请求的,而childOption和childAttr设置的参数都是为了在Server端接受到客户端连接请求后,为其客户端的channel设置使用的,所以下面可以看到会先将child的option和attr先交给专门处理client连接的处理器

@Override
void init(Channel channel) throws Exception {...// 向pipeline中添加处理器ChannelPipeline p = channel.pipeline();// 获取bootstrap中设置的所有child开头的属性final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}...
}

上面的childGroup和childHandler,childOptions,childAttrs都是我们之前通过ServerBootstrap设置的:
在这里插入图片描述

@Override
void init(Channel channel) throws Exception {...// 向pipeline中添加处理器ChannelPipeline p = channel.pipeline();// 获取bootstrap中设置的所有child开头的属性final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}// ChannelInitializer是一个处理器,其存在的意义是,为pipeline添加其它处理器// 并且initChannel执行完后pipline上会移除这个处理器封装的节点(后面专门说)// 注意,此时initChannel方法不会立即执行,只有当当前channel注册成功之后才会执行p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();// 获取bootstrap中配置的handler()ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}// ch.eventLoop()是获取到当前channel所绑定的evenLoop// 然后再使用该eventLoop所绑定的线程来执行指定的任务ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// 向pipeline中添加ServerBootstrapAcceptor处理器// 该处理器用于处理client的连接pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}

给监听客户端连接请求的channel添加一个ChannelInitializer处理器(后面会详细介绍),为pipeline添加其它处理器,一但添加完,ChannelInitializer就没用了,就会被回收(回收的是处理器封装的节点,处理器本身不会回收,后面会讲,不是本篇重点)。

为什么不直接调用p.addLast?而是添加一个ChannelInitializer再添加ServerBootstrapAcceptor?

第一,如果是希望把添加ServerBootstrapAcceptor处理器这个逻辑异步处理的话,是一定要放在ChannelInitializer.initChannel这个方法里,因为此时channel还没有注册,也就意味着还没有和eventLoop绑定,希望通过eventLoop执行异步任务,而此时ch.eventLoop()肯定是null会报错(注册逻辑后面讲)

第二,至于为什么添加ServerBootstrapAcceptor处理器的逻辑一定要异步处理,我也不是很清楚,但是Netty几乎所有的操作都是通过异步任务处理的,如果有人明白这个异步的必要性,求赐教!!!

添加的ChannelInitializer会做哪些事?

  • 获取ServerBootstrap中配置的handler(),如果配置了会加进到ChannelPipeline (服务端一般不配handle,配置childHandler)
    在这里插入图片描述

  • 获取当前channel绑定的evenLoop的线程执行指定的任务
    向pipeline中添加ServerBootstrapAcceptor处理器,该处理器用于处理client的连接

接下来我们看下ServerBootstrapAcceptor处理器(是个内部类)逻辑:

继承了ChannelInboundHandlerAdapter ,当client发送来连接请求时,会触发channelRead()方法的执行

  • 此时通道收到的msg就是当前Server的子channel
  • 初始化这个子channel(为客户端的子channel设置handler,option,attr)
  • 将当前子channel注册到selector(此时用的是childGroup进行注册的)
//io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {private final EventLoopGroup childGroup;private final ChannelHandler childHandler;private final Entry<ChannelOption<?>, Object>[] childOptions;private final Entry<AttributeKey<?>, Object>[] childAttrs;private final Runnable enableAutoReadTask;ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {//将传进来的都保存到成员变量里this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;// Task which is scheduled to re-enable auto-read.// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may// not be able to load the class because of the file limit it already reached.//// See https://github.com/netty/netty/issues/1328enableAutoReadTask = new Runnable() {@Overridepublic void run() {channel.config().setAutoRead(true);}};}// 当client发送来连接请求时,会触发channelRead()方法的执行@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {// 注意,这里client发送来的就是连接当前Server的子channelfinal Channel child = (Channel) msg;// 初始化这个子channel// 对用于处理client 读写请求的子channel设置handler,option,attrchild.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {// 将当前子channel注册到selector// 注意这里用的就是childGroup了childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {//如果注册失败,强制关闭forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}private static void forceClose(Channel child, Throwable t) {child.unsafe().closeForcibly();logger.warn("Failed to register an accepted channel: {}", child, t);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {final ChannelConfig config = ctx.channel().config();if (config.isAutoRead()) {// stop accept new connections for 1 second to allow the channel to recover// See https://github.com/netty/netty/issues/1328config.setAutoRead(false);ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);}// still let the exceptionCaught event flow through the pipeline to give the user// a chance to do something with itctx.fireExceptionCaught(cause);}
}

总结

初始化服务端Channel总共有三步:

  • A、设置 options
    • Option参数怎么来的?在启动程序中通过ServerBootstrap.option可以设置值
    • Option参数最终设置到哪了?通过io.netty.channel.socket.nio.NioServerSocketChannel.NioServerSocketChannelConfig#setOption进行设置的,底层最终调用nio的ServerSocketChannel.setOption方法
    • Option参数有什么用?设置的都是TCP相关的参数
  • B、 设置 attr
    • Attr参数怎么来的?在启动程序中通过ServerBootstrap.attr可以赋值
    • Attr参数存哪里?NioServerSocketChannel继承自DefaultAttributeMap,参数绑定在自己身上
    • Attr参数有什么用?pipeline链中的处理器可以获取到这些参数
  • C、 添加处理器
    • 这个时候会给监听客户端连接的Channel添加一个ChannelInitializer(大概原因是想调用Channel对应的eventLoop的线程执行异步任务,通过异步任务添加ServerBootstrap.handler指定的处理器,和ServerBootstrapAcceptor处理器,但是这个时候Channel还没有注册到Selector,eventLoop还没绑定
    • 异步任务中会为当前监听的channel添加一个ServerBootstrapAcceptor处理器,该处理器用于处理client的连接(会将当前监听的channel,childEventLoopGroup,childOptions,childAttrs都通过构造传给这个处理器,这些东西都是在启动类的时候通过ServerBootstrap设置的)
    • ServerBootstrapAcceptor的逻辑:
      • 当client发送来连接请求时,会触发channelRead()方法的执行(channelRead的msg就是客户端对应的Channel
      • 然后初始化这个子channel,为该channel添加childHandler,设置childOptions和childAttrs
      • 将当前子channel注册到selector(这个selector是childGroup中的selector)

2.3 将 Channel 注册给 Selector

入口:
io.netty.bootstrap.AbstractBootstrap#bind(int)
io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
io.netty.bootstrap.AbstractBootstrap#doBind
io.netty.bootstrap.AbstractBootstrap#initAndRegister
回到io.netty.bootstrap.AbstractBootstrap#initAndRegister方法:

final ChannelFuture initAndRegister() {Channel channel = null;try {// 创建一个channelchannel = channelFactory.newChannel();// 初始化channelinit(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// 将当前channel注册给selectorChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {//如果异常,已注册,则直接关闭channel.close();} else {//否则强制关闭channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loop's task queue for later execution.// i.e. It's safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread.return regFuture;
}

注册核心方法就是 ChannelFuture regFuture = config().group().register(channel)
config().group()返回的就是NioEventLoopGroup,即我们ServerBootstrap里设置的parentGroup

//group就是通过io.netty.bootstrap.ServerBootstrap#group设置的parentGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (childGroup == null) {throw new NullPointerException("childGroup");}if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = childGroup;return this;
}

注意NioEventLoopGroup继承MultithreadEventLoopGroup,register方法默认实现在MultithreadEventLoopGroup中,我们看下MultithreadEventLoopGroup.register方法:

//io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
public ChannelFuture register(Channel channel) {// 从parentGroup中根据算法选择一个eventLoop来完成注册return next().register(channel);
}//io.netty.channel.MultithreadEventLoopGroup#next
public EventLoop next() {return (EventLoop) super.next();
}//io.netty.util.concurrent.MultithreadEventExecutorGroup#next
public EventExecutor next() {return chooser.next();
}

chooser.next()返回一个EventExecutor(就是EventLoop后面详细介绍),而chooser是一个算法选择器,有两个实现,通常执行的是GenericEventExecutorChooser里面维护了一个EventExecutor的数组(后面也会详细讲,不在本篇重点)
在这里插入图片描述

private static final class GenericEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();//EventExecutor的数组private final EventExecutor[] executors;GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];}
}

继续追踪EventLoop的register方法,MultithreadEventLoopGroup是从EventLoopGroup里面选,返回一个EventLoop,所以现在看SingleThreadEventLoop
在这里插入图片描述

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {```@Overridepublic ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));}@Overridepublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");//Unsafe!!!之前说过底层最终通过它来处理各种连接,之前初始化已经跟过创建逻辑了promise.channel().unsafe().register(this, promise);return promise;}...
}

NioSocketChannel里创建的Unsafe,是NioSocketChannelUnsafe

//io.netty.channel.AbstractChannel#AbstractChannel
protected AbstractChannel(Channel parent) {this.parent = parent;// 为Netty的channel生成idid = newId();// 底层操作对象unsafe = newUnsafe();// 创建当前channel所绑定的channelPipelinepipeline = newChannelPipeline();
}//io.netty.channel.AbstractChannel#newUnsafe
protected abstract AbstractUnsafe newUnsafe();//io.netty.channel.socket.nio.NioSocketChannel#newUnsafe
protected AbstractNioUnsafe newUnsafe() {return new NioSocketChannelUnsafe();
}

NioSocketChannelUnsafe是NioSocketChannel的内部类:
在这里插入图片描述

private final class NioSocketChannelUnsafe extends NioByteUnsafe {@Overrideprotected Executor prepareToClose() {try {if (javaChannel().isOpen() && config().getSoLinger() > 0) {// We need to cancel this key of the channel so we may not end up in a eventloop spin// because we try to read or write until the actual close happens which may be later due// SO_LINGER handling.// See https://github.com/netty/netty/issues/4449doDeregister();return GlobalEventExecutor.INSTANCE;}} catch (Throwable ignore) {// Ignore the error as the underlying channel may be closed in the meantime and so// getSoLinger() may produce an exception. In this case we just return null.// See https://github.com/netty/netty/issues/4449}return null;}
}

NioSocketChannelUnsafe 的register方法是继承io.netty.channel.AbstractChannel.AbstractUnsafe的默认实现:

protected abstract class AbstractUnsafe implements Unsafe {...@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {//空报错if (eventLoop == null) {throw new NullPointerException("eventLoop");}//已注册,修改ChannelFuture的结果if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}//不匹配if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}// 这里实现了channel与eventLoop的绑定!!!AbstractChannel.this.eventLoop = eventLoop;// 判断当前正在执行的线程是否是当前eventLoop所绑定的线程if (eventLoop.inEventLoop()) {// 若当前线程是eventLoop绑定线程,则直接让这个线程来完成注册操作register0(promise);} else {// 当前线程不是eventLoop绑定线程,则首先会创建一个线程// 创建线程逻辑后面会讲,不在本篇范围内// 然后使用这个新创建的eventLoop线程来完成注册try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}
}

几个核心关注点:

  • Channel和EventLoop的绑定方法就在这!
    AbstractChannel.this.eventLoop = eventLoop;
    一个eventLoop可以绑定多个channel

  • eventLoop.inEventLoop()方法:

    • 作用:判断当前正在执行的线程是否是当前eventLoop所绑定的线程
    • 目的:第一次走到该方法的时候,当前线程肯定是主线程,不希望让主线程做注册操作,比较耗时,让parent的EventLoopGroup里的线程处理,即最终注册的操作线程都是EventLoopGroup里的线程
    • eventLoop里面绑定的那个线程,不是一开始就初始化好了这个线程,只有在第一次注册的时候才会创建新的线程(后面讲,不是这次的重点)
  • 判断当前线程是否就是eventLoop的线程:

    //io.netty.util.concurrent.AbstractEventExecutor#inEventLoop
    public boolean inEventLoop() {return inEventLoop(Thread.currentThread());
    }//io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop
    public boolean inEventLoop(Thread thread) {return thread == this.thread;
    }
    

    断点调试看一下:
    在这里插入图片描述
    可以看到当前线程并不是EventLoop绑定的线程,EventLoop绑定的线程还是null
    eventLoop.execute第一次执行的时候是没有线程,是如何创建的?这个问题就要从NioEventLoopGroup的构造中开始看了
    简单介绍一下,不是本次重点,后面会专门说:
    io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()
    …各种构造一直走
    io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup

    //io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}if (executor == null) {//核心地方,这个就是具有创建线程功能的总executorexecutor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}//chilren维护的就是当前group中的所有子eventLoopchildren = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {//构建子EventLoopchildren[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {...}}//chooser就是上面提到过的算法选择器,实现类会根据指定算法从children中返回一个eventLoopchooser = chooserFactory.newChooser(children);...
    }
    

    看下newChild方法,具体实现在其子类NioEventLoopGroup

    //io.netty.channel.nio.NioEventLoopGroup#newChild
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
    

    这里可以知道NioEventLoopGroup里的子线程类型都是NioEventLoop,而NioEventLoop.execute方法执行底层其实是调用构造传入的executor的execute方法,即ThreadPerTaskExecutor(newDefaultThreadFactory())(底层怎么调用下一篇会介绍)

    看下ThreadPerTaskExecutor,其中就会有创建线程的逻辑:

    public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory == null) {throw new NullPointerException("threadFactory");}this.threadFactory = threadFactory;}@Overridepublic void execute(Runnable command) {threadFactory.newThread(command).start();}
    }
    

回到我们真正关注的重点,继续看AbstractUnsafe(具体类是NioSocketChannelUnsafe)具体的注册方法io.netty.channel.AbstractChannel.AbstractUnsafe#register0:

//io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}//状态,是不是还从没有注册过,是的话就是第一次注册boolean firstRegistration = neverRegistered;// 完成注册doRegister();// 修改状态值neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.// 触发handlerAdded()方法的执行pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);// 触发channelRegistered()方法的执行pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.// 若当前channel是激活状态,且是第一次注册,// 则触发channelActive()的执行if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805// 执行到这里到时候Channel已经注册成功了// 此时SelectionKey的兴趣集还是0// beginRead方法会将SelectionKey的兴趣集// 设置为之前NioServerSocketChannel构造中设置的值beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}
}

注意点:

  • pipeline.invokeHandlerAddedIfNeeded():触发handlerAdded()方法的执行
    pipeline.fireChannelRegistered():触发channelRegistered()方法的执行
    pipeline.fireChannelActive():触发channelActive()的执行
    以上几个方法都会触发pipline链上的处理器对应的方法的执行:
    (后期会讲,先混个眼熟)
    在这里插入图片描述
    pipeline的分析,也是后期再讲,不是本篇重点

继续看doRegister()方法,其具体实现在AbstractNioChannel

//io.netty.channel.nio.AbstractNioChannel#doRegister
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// 其实netty的channel的注册,本质上是原生的nio的channel的注册// 第二个参数0是形参ops的值,表示当前对任何事件都暂不关注selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}
}

细心的人会发现,这里注册到Selector的时候,ops的值是0,表示对任何事件都不关注,但是我们此时的Channel是NioServerSocketChannel,不是应该需要关注ACCEPT事件吗?
其实默认都是先注册为0,而注册ACCEPT事件的逻辑是在注册成功之后的beginRead方法里

private void register0(ChannelPromise promise) {try {...// 完成注册doRegister();...if (isActive()) {if (firstRegistration) {...} else if (config().isAutoRead()) {// 执行到这里到时候Channel已经注册成功了// 此时SelectionKey的兴趣集还是0// beginRead方法会将SelectionKey的兴趣集// 设置为之前NioServerSocketChannel构造中设置的值beginRead();}}} catch (Throwable t) {...}
}//io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead
public final void beginRead() {assertEventLoop();if (!isActive()) {return;}try {doBeginRead();} catch (final Exception e) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireExceptionCaught(e);}});close(voidPromise());}
}//io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was called// 因为已经注册到Selector了,所以肯定是有SelectionKey的final SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;//这里interestOps应该是0final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {//其中readInterestOp就是之前在NioServerSocketChannel构造里赋值的//当时赋值是SelectionKey.OP_ACCEPTselectionKey.interestOps(interestOps | readInterestOp);}
}//回顾一下NioServerSocketChannel的构造:
public NioServerSocketChannel(ServerSocketChannel channel) {// 封装super(null, channel, SelectionKey.OP_ACCEPT);// 获取channel的配置对象config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
//super构造
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {...this.readInterestOp = readInterestOp;...
}

总结

NioServerSocketChannel 的注册,实际上是由 Channel 所绑定的 EventLoop 中的线程来完成的,而注册的本质是将原生的 NIO 的 Channel 注册到了 Seletor。

步骤:

  • ServerBootstrap调用bind方法后,触发initAndRegister时,会调用config().group().register(channel)方法实现注册,本质是调用了NioEventLoopGroup的register方法
  • NioEventLoopGroup的register方法,会根据算法选择其中一个NioEventLoop,继续调用register方法,即NioEventLoop.register,它是继承SingleThreadEventLoop,具体实现在SingleThreadEventLoop里面
  • SingleThreadEventLoop.register方法实际上是调用了promise.channel().unsafe().register(this, promise)方法,即依靠Unsafe完成注册的,即NioSocketChannelUnsafe,register的具体实现是在io.netty.channel.AbstractChannel.AbstractUnsafe
  • AbstractUnsafe中,会用上面NioEventLoop绑定的线程(而不是主线程,并且第一次调用的时候才创建并绑定线程的,一开始NioEventLoop绑定的线程是null)来进行注册操作,底层还是利用Nio的api注册到了Selector,此时还会触发一系列pipline的事件(注册的时候兴趣集是0,之后在beginRead方法里修改了SelectionKey的兴趣集为ACCEPT)

2.4 端口绑定

现在我们看最后一个流程,端口绑定,回到doBind:
io.netty.bootstrap.AbstractBootstrap#bind(int)
io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
io.netty.bootstrap.AbstractBootstrap#doBind

private ChannelFuture doBind(final SocketAddress localAddress) {// 创建、初始化channel,并将其注册到Selectorfinal ChannelFuture regFuture = initAndRegister();// 从异步结果中获取channelfinal Channel channel = regFuture.channel();// 获取异步操作执行过程中发生的异常if (regFuture.cause() != null) {return regFuture;}// 判断当前异步操作是否完成:或者是成功,或者是异常if (regFuture.isDone()) {   // 若异步操作成功// At this point we know that the registration was complete and successful.// 创建一个可修改的异步结果对象channelFutureChannelPromise promise = channel.newPromise();// 绑定端口号doBind0(regFuture, channel, localAddress, promise);return promise;} else {  // 若异步操作未完成// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);// 为异步操作添加监听器regFuture.addListener(new ChannelFutureListener() {// 当异步操作完成(成功,异常),就会触发该方法的执行@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// 获取异步操作执行过程中发生的异常Throwable cause = future.cause();if (cause != null) {  // 异步执行过程发生异常// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.// 修改异步结果为:失败promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();// 绑定端口号doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}

之前我们分析过,一但注册的操作完成,就会触发doBind0进行端口号绑定:

//io.netty.bootstrap.AbstractBootstrap#doBind0
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {//异步操作是成功才会触发绑定端口逻辑//这里异步操作就是注册selector的操作if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});
}

可以看到,绑定端口的工作,也是让eventLoop的异步处理的,并且只有在channel注册成功的时候才会进行绑定端口的工作,看下channel.bind(localAddress, promise)方法,实际上是调用NioServerSocketChannel.bind,其具体实现是其父类AbstractChannel中
在这里插入图片描述

//io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {//调用了pipline的bind方法return pipeline.bind(localAddress, promise);
}//继续走
//io.netty.channel.DefaultChannelPipeline#bind
//tail是AbstractChannelHandlerContext,代表的是pipline链上的节点(封装了处理器)中的最后一个节点,后面再说,不是这次本文重点
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);
}

注意:tail代表的是pipline链上的节点(封装了处理器的节点,不是处理器)中的最后一个节点,head代表的是第一个节点,head和tail是Netty定义好的,中间那些节点,是我们添加进去的处理器对应的节点,我们只要添加处理器就行了,Netty会自动封装成节点放到pipline上

//io.netty.channel.AbstractChannelHandlerContext#bind
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {if (localAddress == null) {throw new NullPointerException("localAddress");}if (isNotValidPromise(promise, false)) {// cancelledreturn promise;}//这个方法先不讲,后面讲Channel 的 inBound 与 outBound 处理器时会专门讲//简单理解为获取当前节点的下一个节点final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);EventExecutor executor = next.executor();if (executor.inEventLoop()) {//执行绑定next.invokeBind(localAddress, promise);} else {safeExecute(executor, new Runnable() {@Overridepublic void run() {//执行绑定next.invokeBind(localAddress, promise);}}, promise, null);}return promise;
}

直接跟invokeBind:

//io.netty.channel.AbstractChannelHandlerContext#invokeBind
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {if (invokeHandler()) {try {((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}} else {bind(localAddress, promise);}
}

ChannelOutboundHandler,后面讲Channel 的 inBound 与 outBound 处理器时会专门讲,现在直接跟bind方法,只要知道这里最终会调io.netty.channel.DefaultChannelPipeline.HeadContext.bind()方法即可,即最终会调到头结点的bind方法:

final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {private final Unsafe unsafe;HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, HeadContext.class);unsafe = pipeline.channel().unsafe();setAddComplete();}//io.netty.channel.DefaultChannelPipeline.HeadContext#bindpublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);}...
}

可以看到又调了unsafe,底层代码了:
继续跟

//io.netty.channel.AbstractChannel.AbstractUnsafe#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {assertEventLoop();if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}// See: https://github.com/netty/netty/issues/576if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&localAddress instanceof InetSocketAddress &&!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {// Warn a user about the fact that a non-root user can't receive a// broadcast packet on *nix if the socket is bound on non-wildcard address.logger.warn("A non-root user can't receive a broadcast packet if the socket " +"is not bound to a wildcard address; binding to a non-wildcard " +"address (" + localAddress + ") anyway as requested.");}// 若当前channel未被激活,则该方法返回falseboolean wasActive = isActive();try {// 绑定// 一旦端口被绑定了,则channel就被激活了doBind(localAddress);} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}//if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {// 触发pipline中的处理器中的channelActive()方法的执行pipeline.fireChannelActive();}});}safeSetSuccess(promise);
}

doBind绑定,是个抽象方法,具体实现我们需要看NioServerSocketChannel

//io.netty.channel.socket.nio.NioServerSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}
}

如果是客户端NioSocketChannel也差不多:

//io.netty.channel.socket.nio.NioSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {doBind0(localAddress);
}private void doBind0(SocketAddress localAddress) throws Exception {// 若当前JDK平台使用的版本>=7,则...if (PlatformDependent.javaVersion() >= 7) {SocketUtils.bind(javaChannel(), localAddress);} else {SocketUtils.bind(javaChannel().socket(), localAddress);}
}//io.netty.util.internal.SocketUtils#bind(java.nio.channels.SocketChannel, java.net.SocketAddress)
public static void bind(final SocketChannel socketChannel, final SocketAddress address) throws IOException {try {AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {@Overridepublic Void run() throws IOException {socketChannel.bind(address);return null;}});} catch (PrivilegedActionException e) {throw (IOException) e.getCause();}
}