Netty 源码解析
1. Reactor 模型
在解析 Netty 源码之前,我们首先要搞清楚 Reactor 模型。因为现在的网络通信框架,大多数都是基于 Reactor 模型进行设计和开发的,Netty 也不例外。
1.1 Reactor 单线程模型
Selector.select()方法查询Channel是否就绪,遍历完所有Channel作为一次轮询,一次轮询后没有任何就绪,如果设置了阻塞时间,判断是否超时,超时直接返回0,否则一直阻塞继续轮询,只要有任意一个Channel就绪了立即返回
Reactor 单线程模型,指的是当前的 Sever 会为每一个通信对端形成一个 Channel,而所有这些 Channel 都会与一个线程相绑定,该线程用于完成它们间的所有通信处理。该线程需要完成的操作有:
- 若当前为 Server,则该线程需要接收并处理 Client 的连接请求
- 若当前为 Client,则该线程需要向 Server 发起连接
- 读取通信对端的消息
- 向通信对端发送消息
1.2 Reactor 线程池模型
Reactor 单线程模型中使用一个线程处理所有通信对端的所有请求,在高并发场景中会严重影响系统性能。所以,就将单线程模型中的这一个线程替换为了一个线程池。大大提高了系统性能。
1.3 Reactor 多线程池模型
若请求连接的并发量是数以百万计的,且 IO 操作还比较耗时,此时的 Server 即使采用的是 Reactor 线程池模型,系统性能也会急剧下降。此时,可以将连接操作与 IO 操作分开处理,形成 Reactor 的多线程模型。
当客户端通过处理连接请求的 Channel 连接上 Server 后,系统会为该客户端再生成一个子 Channel 专门用于处理该客户端的 IO 请求。这两类不同的 Channel 连接着两类不同的线程池。而线程池中的线程数量,可以根据需求分别设置。提高了系统性能。
1.4 Netty-Server 的 Reactor 模型
Netty-Server 采用了多线程池模型。不过线程池是由 EventLoopGroup 充当。EventLoopGroup中的每一个 EventLoop 都绑定着一个线程,用于处理该 Channel 与当前 Server 间的操作。一个 Channel 只能与一个 EventLoop 绑定,但一个 EventLoop 可以绑定多个 Channel。即 Channel与 EventLoop 间的关系是 n:1。
经过生产下实践证明的,一般对于百万级的 QPS,parentGroup 设置为 2 ,childGroup设置为 4 ,就完全没有问题。
1.5 Netty-Client 的 Reactor 模型
Netty-Client 采用的是线程池模型。因为其只需要与 Server 连接一次即可,无需区分连接请求与 IO 请求。
强调一下:以上模型可以看出,一个EventLoop专门绑定一个Selector
Netty 框架中导入外部工程
- 1.先解压Netty源码,并用idea导入成maven工程
- 2.将之前写的Demo工程,拷贝到解压的源码目录里
- 3.idea中导入这个复制的Demo工程,也是作为maven导入
- 4.删掉Demo的pom文件(之前有编译的话target目录也删了),复制netty源码中的all的pom
- 5.第一次执行Demo,会发现报错,少了一些类
找到Netty源码的common工程中有个包templates.io.netty.util.collection,里面有很多模板文件:
再打开codegen.groovy脚本:
可以发现,我们可以通过脚本将模板文件生成类,规则就是将K替换成各种数据类型例如Byte、Character、Short…等
- 6.现在我们需要运行该脚本,需要先加个插件
<plugin><artifactId>maven-checkstyle-plugin</artifactId><configuration><skip>true</skip></configuration> </plugin>
- 7.在Netty源码common工程所在目录,执行命令进行预编译
mvn compile -Dcheckstyle.skip=true
编译成功后,可以看到,会自动生成这些class文件
- 8.然后在运行就不会报错了,接下来就可以在源码中运行,并随时写注释了
2. Netty 服务端启动
我们先来分析第一个流程:Netty 服务端启动,一共包括四个子流程
- 创建服务端 Channel
- 初始化服务端 Channel
- 将Channel注册给 Selector
- 端口绑定
入口:
Channel的创建是在bind里面触发的
public class SomeServer {public static void main(String[] args) throws InterruptedException {EventLoopGroup parentGroup = new NioEventLoopGroup();EventLoopGroup childGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(parentGroup, childGroup)// 指定要创建Channel类型.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 获取channel中的PipelineChannelPipeline pipeline = ch.pipeline();// StringDecoder:字符串解码器,将Channel中的ByteBuf数据解码为Stringpipeline.addLast(new StringDecoder());// StringEncoder:字符串编码器,将String编码为将要发送到Channel中的ByteBufpipeline.addLast(new StringEncoder());pipeline.addLast(new SomeServerHandler());}});ChannelFuture future = bootstrap.bind(8888).sync();System.out.println("服务器已启动");future.channel().closeFuture().sync();} finally {parentGroup.shutdownGracefully();childGroup.shutdownGracefully();}}
}
先跟bind方法:
//io.netty.bootstrap.AbstractBootstrap#bind(int)
public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));
}//io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
public ChannelFuture bind(SocketAddress localAddress) {validate();//做一些验证if (localAddress == null) {throw new NullPointerException("localAddress");}return doBind(localAddress);
}
- validate()做一些验证:
public B validate() {if (group == null) { // EventLoopGroup不能为空throw new IllegalStateException("group not set");}if (channelFactory == null) { //ChannelFactory不能为空throw new IllegalStateException("channel or channelFactory not set");}return self(); }
继续看doBind方法:
该方法核心逻辑就两件事:
一是创建初始化channel,并注册到Selector(其中注册到Selector这个操作是异步的
)
二就是将这个初始化和注册成功的channel绑定到指定端口
因为第一个逻辑是异步执行的,所以这里有两个分支:
- 如果异步操作已经完成,创建一个
可修改的异步结果对象
,直接进行绑定端口号
操作,返回 - 如果异步操作还未完成,创建一个
正在注册的可修改异步结果对象
直接返回,然后通过监听器,一但异步操作完成,在进行绑定端口
其中注册到Selector这个异步操作
的结果是成功还是失败的逻辑判断是在doBind0里处理的,即在绑定端口的时候先确定channel是否已经注册成功,这个在本章最后端口绑定的时候会讲
private ChannelFuture doBind(final SocketAddress localAddress) {// 创建、初始化channel,并将其注册到Selectorfinal ChannelFuture regFuture = initAndRegister();// 从异步结果中获取channelfinal Channel channel = regFuture.channel();// 获取异步操作执行过程中发生的异常直接返回// 注意:只要有异常,说明此异步操作已经完成结束了if (regFuture.cause() != null) {return regFuture;}// 通常走到这里的时候,说明channel已经创建成功并且初始化成功了(看initAndRegister逻辑可以推断)// 但是注册到Selector的操作是异步的,有可能还在处理也有可能已经完成了// 判断当前异步操作是否完成(完成不代表成功,也有可能是异常)if (regFuture.isDone()) { // 若异步操作完成// At this point we know that the registration was complete and successful.// 创建一个可修改的异步结果对象channelFutureChannelPromise promise = channel.newPromise();// 绑定端口号,regFuture的成功与否是在这个方法里面判断的doBind0(regFuture, channel, localAddress, promise);return promise;} else { // 若异步操作未完成// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);// 为异步操作添加监听器regFuture.addListener(new ChannelFutureListener() {// 当异步操作完成(成功,异常),就会触发该方法的执行@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// 获取异步操作执行过程中发生的异常Throwable cause = future.cause();if (cause != null) { // 异步执行过程发生异常// Registration on the EventLoop failed so fail // the ChannelPromise directly to not cause an// IllegalStateException once we try to access the // EventLoop of the Channel.// 修改异步结果为:失败promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586// 注册,也是修改promise.registered();// 绑定端口号doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}
一些需要注意的点:
- 如果异步操作完成,创建一个可修改的异步结果对象ChannelPromise
看到它继承ChannelFuture和Promise/** * Special {@link ChannelFuture} which is writable. * 可写的特殊的channelFuture */ public interface ChannelPromise extends ChannelFuture, Promise<Void> {...}
ChannelFuture代表的就是对通道异步操作的结果对象,可以通过它获取异步操作的结果,ChannelFuture继承自io.netty.util.concurrent.Future
其中ChannelFuture接口本身方法大多数都是添加删除监听,获取channel、同步等操作,基本都是在读,不存在写的操作
而Promise接口具有setFailure、setSuccess、trySuccess、tryFailure等方法,允许修改异步结果
/** * Special {@link Future} which is writable. * 特殊的{@link Future}是可写的。 */ public interface Promise<V> extends Future<V> {...}
即Promise是一个特殊的{@link Future}是可写的。
在异步处理过程中可以设置这个异步结果成功还是失败等信息
- 如果异步操作未完成,
还是创建一个可修改的异步结果对象
,只不过是一个特殊的实现类PendingRegistrationPromise(等待注册的Promise,继承体系中也是继承ChannelPromise)
PendingRegistrationPromise -> DefaultChannelPromise -> ChannelPromise
现在我们详细跟initAndRegister方法:
该方法内部包含了我们需要关注的四个核心流程中的其中三个:
创建一个Channel
初始化Channel
将当前channel注册给selector
final ChannelFuture initAndRegister() {Channel channel = null;try {// 创建一个channelchannel = channelFactory.newChannel();// 初始化channelinit(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(),GlobalEventExecutor.INSTANCE).setFailure(t);}// 将当前channel注册给selectorChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loop's task queue for later execution.// i.e. It's safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread.return regFuture;
}
2.1 创建服务端 Channel
先看创建一个channel的分支,看到channel = channelFactory.newChannel();
那么channelFactory是什么?什么时候创建的?
我们回到一开始的SomeServer启动类,看到channel(Class<? extends C> channelClass)方法,该方法是用来指定创建的channel的类型,实际上就是这个方法创建channelFactory的:
//io.netty.bootstrap.AbstractBootstrap#channel
public B channel(Class<? extends C> channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");}// 创建一个channelFactory然后返回自己return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}//io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.channel.ChannelFactory<? extends C>)
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {return channelFactory((ChannelFactory<C>) channelFactory);
}//io.netty.bootstrap.AbstractBootstrap#channelFactory(io.netty.bootstrap.ChannelFactory<? extends C>)
public B channelFactory(ChannelFactory<? extends C> channelFactory) {if (channelFactory == null) {throw new NullPointerException("channelFactory");}if (this.channelFactory != null) {throw new IllegalStateException("channelFactory set already");}//赋值到成员变量this.channelFactory = channelFactory;return self();
}
上面代码可以看出channelFactory就是io.netty.channel.ReflectiveChannelFactory
好我们现在看一下channelFactory.newChannel()方法的具体实现:
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {private final Constructor<? extends T> constructor;public ReflectiveChannelFactory(Class<? extends T> clazz) {ObjectUtil.checkNotNull(clazz, "clazz");try {// 我们通过io.netty.bootstrap.AbstractBootstrap#channel传进来的是// io.netty.channel.socket.nio.NioServerSocketChannel// 初始化NioServerSocketChannel的构造器this.constructor = clazz.getConstructor();} catch (NoSuchMethodException e) {throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +" does not have a public non-arg constructor", e);}}@Overridepublic T newChannel() {try {// 使用反射机制,调用其无参构造器,创建channelreturn constructor.newInstance();} catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);}}...
}
可以看出来,是通过Class的无参构造
创建Channel的,而这个Class就是io.netty.channel.socket.nio.NioServerSocketChannel
,我们接着看NioServerSocketChannel的无参构造在初始化的时候都做了什么?找到NioServerSocketChannel的无参构造:
public class NioServerSocketChannel extends AbstractNioMessageChannelimplements io.netty.channel.socket.ServerSocketChannel {...public NioServerSocketChannel() {// 我们Netty的channel实际上是对原生的NIO的channel的封装this(newSocket(DEFAULT_SELECTOR_PROVIDER));}...// 获取到一个全局性的provider,用来创建Channel或者Selector// nio讲解的时候介绍过private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();private static ServerSocketChannel newSocket(SelectorProvider provider) {try {/*** Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.** See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.*/// 通过全局性的provider,创建一个原生的NIO的channelreturn provider.openServerSocketChannel();} catch (IOException e) {throw new ChannelException("Failed to open a server socket.", e);}}public NioServerSocketChannel(ServerSocketChannel channel) {// 封装super(null, channel, SelectionKey.OP_ACCEPT);// 创建channel的配置对象config = new NioServerSocketChannelConfig(this, javaChannel().socket());}
}
可以看到通过全局性的provider,创建一个原生的NIO的ServerSocketChannel,再看super的构造都做了什么
//io.netty.channel.nio.AbstractNioMessageChannel#AbstractNioMessageChannel
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp);
}//io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);//SelectableChannel ch,此时ch就是Nio原生的ServerSocketChannel对象this.ch = ch;//存储此channel需要关注的兴趣集//此时就是SelectionKey.OP_ACCEPT,接收事件this.readInterestOp = readInterestOp;try {// 指定channel为非阻塞ch.configureBlocking(false);} catch (IOException e) {try {ch.close();} catch (IOException e2) {if (logger.isWarnEnabled()) {logger.warn("Failed to close a partially initialized socket.", e2);}}throw new ChannelException("Failed to enter non-blocking mode.", e);}
}//继续看父类构造
//io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)
protected AbstractChannel(Channel parent) {// 此时parent是nullthis.parent = parent;// 为Netty的channel生成idid = newId();// 创建一个底层操作对象unsafe = newUnsafe();// 创建当前channel所绑定的channelPipelinepipeline = newChannelPipeline();
}protected ChannelId newId() {return DefaultChannelId.newInstance();
}protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this);}
-
看下id是由什么组成的
//io.netty.channel.DefaultChannelId#newInstance public static DefaultChannelId newInstance() {return new DefaultChannelId(); }//io.netty.channel.DefaultChannelId#DefaultChannelId private DefaultChannelId() {//看到整个ID是由machine(mark地址)、process(进程)、sequence(序列)、timestamp(时间戳)、random(随机数)组成data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];int i = 0;// machineIdSystem.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);i += MACHINE_ID.length;// processIdi = writeInt(i, PROCESS_ID);// sequencei = writeInt(i, nextSequence.getAndIncrement());// timestamp (kind of)i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());// randomint random = PlatformDependent.threadLocalRandom().nextInt();i = writeInt(i, random);assert i == data.length;hashCode = Arrays.hashCode(data); }
-
看下unsafe 是什么对象
不安全操作应永不从用户代码被调用。这些方法仅用于实现实际传输
,必须从I/O线程调用,以下方法除外
unsafe 提供一堆方法用来实际传输,例如 写数据write、注册register、刷新flush、connect连接、close关闭、bind绑定…
Unsafe这些方法就是最底层了,封装的都是和Nio的交互!!!!
-
再看下创建NioServerSocketChannelConfig配置对象
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
其中javaChannel()返回的就是之前创建的ServerSocketChannel对象
NioServerSocketChannelConfig的继承体系:
NioServerSocketChannelConfig -> DefaultServerSocketChannelConfig -> ServerSocketChannelConfig -> ChannelConfig
实际上是一个channel的配置属性set集合,后面会通过它对channel进行配置
-
ChannelPipeline后面会专门讲,这里只要记住是在NioServerSocketChannel构造里初始化的
总结
这里创建了一个 channel,主要完成了以下几个流程:
- ServerBootstrap在指定Channel通道类型的时候,底层创建了一个
ReflectiveChannelFactory
,该工厂可以通过使用指定的Class的构造器来创建Channel,而我们指定的就是NioServerSocketChannel - ServerBootstrap在调用bind方法的时候,底层会通过channelFactory.newChannel()创建Channel,即ReflectiveChannelFactory
- 在利用构造器创建Channel实例的时候,会触发对应类的类初始化和构造器方法,我们主要关注的是NioServerSocketChannel构造器初始化都做了哪些事:
- 通过全局性的
SelectorProvider
,创建一个原生的NIO的channel
- 生成了 channel 的 id,
ChannelId
(包括介绍了ChannelId由哪些组成) - 创建了真正的数据传输对象
Unsafe
- 创建并绑定了
ChannelPipeline
- 设置当前Channel需要关注的事件(OP_ACCEPT事件)
- 指定channel为非阻塞
- 创建 channel 的的配置类
NioServerSocketChannelConfig
对象
- 通过全局性的
2.2 初始化服务端 Channel
继续分析源码之前,说明一个事情:
一般用Netty的时候,ServerBootstrap是可以使用option()/childOption()方法进行一些TCP相关配置
除了option可以通过attr()/childAttr()添加属性,这些属性是绑定在channel上的,可以在处理器中使用
例如:
继续分析,看初始化channel的方法io.netty.bootstrap.AbstractBootstrap#init
入口一开始分析过(忘记的看上面):
io.netty.bootstrap.AbstractBootstrap#bind(int)
io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
io.netty.bootstrap.AbstractBootstrap#doBind
io.netty.bootstrap.AbstractBootstrap#initAndRegister
io.netty.bootstrap.AbstractBootstrap#init
该方法是一个抽象方法,具体实现在其子类,我们是Server端所以看
io.netty.bootstrap.ServerBootstrap#init,主要分三大步:
- 处理bootstrap中的option设置属性
- 处理bootstrap中的attr设置属性
- 向pipeline中添加ChannelInitializer处理器
(目的是为了添加ServerBootstrapAcceptor处理器,该处理器用于处理client的连接,通过bootstrap设置的childOption和childAttr属性都会传给这个处理器,当接受连接的时候为客户端的channel设置这些属性
)
//io.netty.bootstrap.ServerBootstrap#init
void init(Channel channel) throws Exception {// 处理bootstrap中的option设置属性final Map<ChannelOption<?>, Object> options = options0();synchronized (options) {setChannelOptions(channel, options, logger);}// 处理bootstrap中的attr设置属性final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {// 将bootstrap中设置的所有attr属性配置给channelfor (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}// 向pipeline中添加处理器ChannelPipeline p = channel.pipeline();// 获取bootstrap中设置的所有child开头的属性final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}// ChannelInitializer是一个处理器,其存在的意义是,为pipeline添加其它处理器p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();// 获取bootstrap中配置的handler()ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}// ch.eventLoop()是获取到当前channel所绑定的evenLoop// 然后再使用该eventLoop所绑定的线程来执行指定的任务ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// 向pipeline中添加ServerBootstrapAcceptor处理器// 该处理器用于处理client的连接pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}
A、设置 options
先获取options,看options0()方法:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {...private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();//io.netty.bootstrap.AbstractBootstrap#options0final Map<ChannelOption<?>, Object> options0() {return options;}...
}
什么时候赋值的呢?之前说过在使用Netty入口类的时候,可以用option()/childOption()进行配置
//io.netty.bootstrap.AbstractBootstrap#option
public <T> B option(ChannelOption<T> option, T value) {if (option == null) {throw new NullPointerException("option");}if (value == null) {synchronized (options) {options.remove(option);}} else {synchronized (options) { // 初始化optionsoptions.put(option, value);}}return self();
}//io.netty.bootstrap.ServerBootstrap#childOption
//只有Server端才需要child,设置的属性是给接受到的客户端连接的Channel用的
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {if (childOption == null) {throw new NullPointerException("childOption");}if (value == null) {synchronized (childOptions) {childOptions.remove(childOption);}} else {synchronized (childOptions) {childOptions.put(childOption, value);}}return this;
}
在看下setChannelOptions方法,为Channel设置Option,Option配置最终放到哪了?注意此时处理的是为parent的option,即为接受客户端连接的Channel设置option
//io.netty.bootstrap.AbstractBootstrap#setChannelOptions
static void setChannelOptions(Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {// 遍历通过bootstrap设置的所有optionfor (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {setChannelOption(channel, e.getKey(), e.getValue(), logger);}
}//io.netty.bootstrap.AbstractBootstrap#setChannelOption
private static void setChannelOption(Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {try {// 将设置的option配置给channel的configif (!channel.config().setOption((ChannelOption<Object>) option, value)) {logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);}} catch (Throwable t) {logger.warn("Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);}
}
看下channel.config()返回的是啥,因为我们的channel是NioServerSocketChannel,所以看下io.netty.channel.socket.nio.NioServerSocketChannel#config方法
public class NioServerSocketChannel extends AbstractNioMessageChannel implements io.netty.channel.socket.ServerSocketChannel {//...private final ServerSocketChannelConfig config;public NioServerSocketChannel(ServerSocketChannel channel) {// 封装super(null, channel, SelectionKey.OP_ACCEPT);// 初始化channel的配置对象config = new NioServerSocketChannelConfig(this, javaChannel().socket());}@Overridepublic ServerSocketChannelConfig config() {return config;}//...
}
可以看出来返回的就是NioServerSocketChannelConfig,调用的就是io.netty.channel.socket.nio.NioServerSocketChannel.NioServerSocketChannelConfig#setOption
//io.netty.channel.socket.nio.NioServerSocketChannel.NioServerSocketChannelConfig#setOption
public <T> boolean setOption(ChannelOption<T> option, T value) {//我们是NioChannel,肯定走第一个分支if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {return NioChannelOption.setOption(jdkChannel(), (NioChannelOption<T>) option, value);}return super.setOption(option, value);
}//io.netty.channel.socket.nio.NioChannelOption#setOption
static <T> boolean setOption(Channel jdkChannel, NioChannelOption<T> option, T value) {java.nio.channels.NetworkChannel channel = (java.nio.channels.NetworkChannel) jdkChannel;if (!channel.supportedOptions().contains(option.option)) {return false;}if (channel instanceof ServerSocketChannel && option.option == java.net.StandardSocketOptions.IP_TOS) {// Skip IP_TOS as a workaround for a JDK bug:// See http://mail.openjdk.java.net/pipermail/nio-dev/2018-August/005365.htmlreturn false;}try {//看到底层还是用Nio的setOption方法channel.setOption(option.option, value);return true;} catch (IOException e) {throw new ChannelException(e);}
}
B、 设置 attr
和Options同理,ServerBootstrap类是可以用attr()/childAttr()方法添加属性,这些属性最终会绑定到channel上,可以通过channel获取到,一般用在处理器,看下取值和赋值的方法:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {...private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();//io.netty.bootstrap.AbstractBootstrap#attrs0 取值final Map<AttributeKey<?>, Object> attrs0() {return attrs;}...//io.netty.bootstrap.AbstractBootstrap#attr 设值public <T> B attr(AttributeKey<T> key, T value) {if (key == null) {throw new NullPointerException("key");}if (value == null) {synchronized (attrs) {attrs.remove(key);}} else {synchronized (attrs) {attrs.put(key, value);}}return self();}
}
在看下将这些值如何绑定到channel的,调用channel.attr(key).set(e.getValue())方法,注意此时是在处理parent的attr
//io.netty.bootstrap.ServerBootstrap#init
void init(Channel channel) throws Exception {// 处理bootstrap中的option设置属性final Map<ChannelOption<?>, Object> options = options0();synchronized (options) {setChannelOptions(channel, options, logger);}// 处理bootstrap中的attr设置属性final Map<AttributeKey<?>, Object> attrs = attrs0();synchronized (attrs) {// 将bootstrap中设置的所有attr属性配置给channelfor (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {@SuppressWarnings("unchecked")AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();channel.attr(key).set(e.getValue());}}...
}
我们知道channel就是NioServerSocketChannel,NioServerSocketChannel的attr方法,实际上是继承自DefaultAttributeMap,所以NioServerSocketChannel本身就是一个属性Map:
就不看了,知道就好了
C、 添加处理器
注意上面说的设置option和attr,这些参数都是为ServerSocketChannel设置使用的,ServerSocketChannel是用来接受客户端连接请求的,而childOption和childAttr设置的参数都是为了在Server端接受到客户端连接请求后,为其客户端的channel设置使用的,所以下面可以看到会先将child的option和attr先交给
专门处理client连接的处理器
@Override void init(Channel channel) throws Exception {...// 向pipeline中添加处理器ChannelPipeline p = channel.pipeline();// 获取bootstrap中设置的所有child开头的属性final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}... }
上面的childGroup和childHandler,childOptions,childAttrs都是我们之前通过ServerBootstrap设置的:
@Override
void init(Channel channel) throws Exception {...// 向pipeline中添加处理器ChannelPipeline p = channel.pipeline();// 获取bootstrap中设置的所有child开头的属性final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry<ChannelOption<?>, Object>[] currentChildOptions;final Entry<AttributeKey<?>, Object>[] currentChildAttrs;synchronized (childOptions) {currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));}synchronized (childAttrs) {currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));}// ChannelInitializer是一个处理器,其存在的意义是,为pipeline添加其它处理器// 并且initChannel执行完后pipline上会移除这个处理器封装的节点(后面专门说)// 注意,此时initChannel方法不会立即执行,只有当当前channel注册成功之后才会执行p.addLast(new ChannelInitializer<Channel>() {@Overridepublic void initChannel(final Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();// 获取bootstrap中配置的handler()ChannelHandler handler = config.handler();if (handler != null) {pipeline.addLast(handler);}// ch.eventLoop()是获取到当前channel所绑定的evenLoop// 然后再使用该eventLoop所绑定的线程来执行指定的任务ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// 向pipeline中添加ServerBootstrapAcceptor处理器// 该处理器用于处理client的连接pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}
给监听客户端连接请求的channel添加一个ChannelInitializer处理器(后面会详细介绍),为pipeline添加其它处理器,一但添加完,ChannelInitializer就没用了,就会被回收(回收的是处理器封装的节点,处理器本身不会回收,后面会讲,不是本篇重点)。
为什么不直接调用p.addLast?而是添加一个ChannelInitializer再添加ServerBootstrapAcceptor?
第一,如果是希望把添加ServerBootstrapAcceptor处理器这个逻辑异步处理的话,是一定要放在ChannelInitializer.initChannel这个方法里,因为此时channel还没有注册,也就意味着还没有和eventLoop绑定,希望通过eventLoop执行异步任务,而此时ch.eventLoop()肯定是null会报错(注册逻辑后面讲)
第二,至于为什么添加ServerBootstrapAcceptor处理器的逻辑一定要异步处理,我也不是很清楚,但是Netty几乎所有的操作都是通过异步任务处理的,如果有人明白这个异步的必要性,求赐教!!!
添加的ChannelInitializer会做哪些事?
-
获取ServerBootstrap中配置的handler(),如果配置了会加进到ChannelPipeline (服务端一般不配handle,配置childHandler)
-
获取当前channel绑定的evenLoop的线程执行指定的任务
向pipeline中添加ServerBootstrapAcceptor处理器,该处理器用于处理client的连接
接下来我们看下ServerBootstrapAcceptor处理器(是个内部类)逻辑:
继承了ChannelInboundHandlerAdapter ,当client发送来连接请求时,会触发channelRead()方法的执行
- 此时通道收到的msg就是当前Server的子channel
- 初始化这个子channel(为客户端的子channel设置handler,option,attr)
- 将当前子channel注册到selector(此时用的是childGroup进行注册的)
//io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {private final EventLoopGroup childGroup;private final ChannelHandler childHandler;private final Entry<ChannelOption<?>, Object>[] childOptions;private final Entry<AttributeKey<?>, Object>[] childAttrs;private final Runnable enableAutoReadTask;ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {//将传进来的都保存到成员变量里this.childGroup = childGroup;this.childHandler = childHandler;this.childOptions = childOptions;this.childAttrs = childAttrs;// Task which is scheduled to re-enable auto-read.// It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may// not be able to load the class because of the file limit it already reached.//// See https://github.com/netty/netty/issues/1328enableAutoReadTask = new Runnable() {@Overridepublic void run() {channel.config().setAutoRead(true);}};}// 当client发送来连接请求时,会触发channelRead()方法的执行@Override@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) {// 注意,这里client发送来的就是连接当前Server的子channelfinal Channel child = (Channel) msg;// 初始化这个子channel// 对用于处理client 读写请求的子channel设置handler,option,attrchild.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);for (Entry<AttributeKey<?>, Object> e: childAttrs) {child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());}try {// 将当前子channel注册到selector// 注意这里用的就是childGroup了childGroup.register(child).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {//如果注册失败,强制关闭forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}}private static void forceClose(Channel child, Throwable t) {child.unsafe().closeForcibly();logger.warn("Failed to register an accepted channel: {}", child, t);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {final ChannelConfig config = ctx.channel().config();if (config.isAutoRead()) {// stop accept new connections for 1 second to allow the channel to recover// See https://github.com/netty/netty/issues/1328config.setAutoRead(false);ctx.channel().eventLoop().schedule(enableAutoReadTask, 1, TimeUnit.SECONDS);}// still let the exceptionCaught event flow through the pipeline to give the user// a chance to do something with itctx.fireExceptionCaught(cause);}
}
总结
初始化服务端Channel总共有三步:
- A、设置 options
- Option参数怎么来的?在启动程序中通过ServerBootstrap.option可以设置值
- Option参数最终设置到哪了?通过io.netty.channel.socket.nio.NioServerSocketChannel.NioServerSocketChannelConfig#setOption进行设置的,底层最终调用nio的ServerSocketChannel.setOption方法
- Option参数有什么用?设置的都是TCP相关的参数
- B、 设置 attr
- Attr参数怎么来的?在启动程序中通过ServerBootstrap.attr可以赋值
- Attr参数存哪里?NioServerSocketChannel继承自DefaultAttributeMap,参数绑定在自己身上
- Attr参数有什么用?pipeline链中的处理器可以获取到这些参数
- C、 添加处理器
- 这个时候会给监听客户端连接的Channel添加一个ChannelInitializer(
大概原因是想调用Channel对应的eventLoop的线程执行异步任务,通过异步任务添加ServerBootstrap.handler指定的处理器,和ServerBootstrapAcceptor处理器,但是这个时候Channel还没有注册到Selector,eventLoop还没绑定
) - 异步任务中会为当前监听的channel添加一个
ServerBootstrapAcceptor
处理器,该处理器用于处理client的连接(会将当前监听的channel,childEventLoopGroup,childOptions,childAttrs都通过构造传给这个处理器,这些东西都是在启动类的时候通过ServerBootstrap设置的) - ServerBootstrapAcceptor的逻辑:
- 当client发送来连接请求时,会触发channelRead()方法的执行(
channelRead的msg就是客户端对应的Channel
) - 然后初始化这个子channel,为该channel添加childHandler,设置childOptions和childAttrs
- 将当前子channel注册到selector(这个selector是childGroup中的selector)
- 当client发送来连接请求时,会触发channelRead()方法的执行(
- 这个时候会给监听客户端连接的Channel添加一个ChannelInitializer(
2.3 将 Channel 注册给 Selector
入口:
io.netty.bootstrap.AbstractBootstrap#bind(int)
io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
io.netty.bootstrap.AbstractBootstrap#doBind
io.netty.bootstrap.AbstractBootstrap#initAndRegister
回到io.netty.bootstrap.AbstractBootstrap#initAndRegister方法:
final ChannelFuture initAndRegister() {Channel channel = null;try {// 创建一个channelchannel = channelFactory.newChannel();// 初始化channelinit(channel);} catch (Throwable t) {if (channel != null) {// channel can be null if newChannel crashed (eg SocketException("too many open files"))channel.unsafe().closeForcibly();// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutorreturn new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}// 将当前channel注册给selectorChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) {if (channel.isRegistered()) {//如果异常,已注册,则直接关闭channel.close();} else {//否则强制关闭channel.unsafe().closeForcibly();}}// If we are here and the promise is not failed, it's one of the following cases:// 1) If we attempted registration from the event loop, the registration has been completed at this point.// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.// 2) If we attempted registration from the other thread, the registration request has been successfully// added to the event loop's task queue for later execution.// i.e. It's safe to attempt bind() or connect() now:// because bind() or connect() will be executed *after* the scheduled registration task is executed// because register(), bind(), and connect() are all bound to the same thread.return regFuture;
}
注册核心方法就是 ChannelFuture regFuture = config().group().register(channel)
config().group()返回的就是NioEventLoopGroup,即我们ServerBootstrap里设置的parentGroup
//group就是通过io.netty.bootstrap.ServerBootstrap#group设置的parentGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {super.group(parentGroup);if (childGroup == null) {throw new NullPointerException("childGroup");}if (this.childGroup != null) {throw new IllegalStateException("childGroup set already");}this.childGroup = childGroup;return this;
}
注意NioEventLoopGroup继承MultithreadEventLoopGroup,register方法默认实现在MultithreadEventLoopGroup中,我们看下MultithreadEventLoopGroup.register方法:
//io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
public ChannelFuture register(Channel channel) {// 从parentGroup中根据算法选择一个eventLoop来完成注册return next().register(channel);
}//io.netty.channel.MultithreadEventLoopGroup#next
public EventLoop next() {return (EventLoop) super.next();
}//io.netty.util.concurrent.MultithreadEventExecutorGroup#next
public EventExecutor next() {return chooser.next();
}
chooser.next()返回一个EventExecutor(就是EventLoop后面详细介绍),而chooser是一个算法选择器,有两个实现,通常执行的是GenericEventExecutorChooser
,里面维护了一个EventExecutor的数组
(后面也会详细讲,不在本篇重点)
private static final class GenericEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();//EventExecutor的数组private final EventExecutor[] executors;GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];}
}
继续追踪EventLoop的register方法,MultithreadEventLoopGroup是从EventLoopGroup里面选,返回一个EventLoop,所以现在看SingleThreadEventLoop
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {```@Overridepublic ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));}@Overridepublic ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, "promise");//Unsafe!!!之前说过底层最终通过它来处理各种连接,之前初始化已经跟过创建逻辑了promise.channel().unsafe().register(this, promise);return promise;}...
}
NioSocketChannel里创建的Unsafe,是
NioSocketChannelUnsafe
//io.netty.channel.AbstractChannel#AbstractChannel protected AbstractChannel(Channel parent) {this.parent = parent;// 为Netty的channel生成idid = newId();// 底层操作对象unsafe = newUnsafe();// 创建当前channel所绑定的channelPipelinepipeline = newChannelPipeline(); }//io.netty.channel.AbstractChannel#newUnsafe protected abstract AbstractUnsafe newUnsafe();//io.netty.channel.socket.nio.NioSocketChannel#newUnsafe protected AbstractNioUnsafe newUnsafe() {return new NioSocketChannelUnsafe(); }
NioSocketChannelUnsafe是NioSocketChannel的内部类:
private final class NioSocketChannelUnsafe extends NioByteUnsafe {@Overrideprotected Executor prepareToClose() {try {if (javaChannel().isOpen() && config().getSoLinger() > 0) {// We need to cancel this key of the channel so we may not end up in a eventloop spin// because we try to read or write until the actual close happens which may be later due// SO_LINGER handling.// See https://github.com/netty/netty/issues/4449doDeregister();return GlobalEventExecutor.INSTANCE;}} catch (Throwable ignore) {// Ignore the error as the underlying channel may be closed in the meantime and so// getSoLinger() may produce an exception. In this case we just return null.// See https://github.com/netty/netty/issues/4449}return null;} }
NioSocketChannelUnsafe 的register方法是继承io.netty.channel.AbstractChannel.AbstractUnsafe的默认实现:
protected abstract class AbstractUnsafe implements Unsafe {...@Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {//空报错if (eventLoop == null) {throw new NullPointerException("eventLoop");}//已注册,修改ChannelFuture的结果if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}//不匹配if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}// 这里实现了channel与eventLoop的绑定!!!AbstractChannel.this.eventLoop = eventLoop;// 判断当前正在执行的线程是否是当前eventLoop所绑定的线程if (eventLoop.inEventLoop()) {// 若当前线程是eventLoop绑定线程,则直接让这个线程来完成注册操作register0(promise);} else {// 当前线程不是eventLoop绑定线程,则首先会创建一个线程// 创建线程逻辑后面会讲,不在本篇范围内// 然后使用这个新创建的eventLoop线程来完成注册try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}
}
几个核心关注点:
-
Channel和EventLoop的绑定方法就在这!
AbstractChannel.this.eventLoop = eventLoop;
一个eventLoop可以绑定多个channel -
eventLoop.inEventLoop()方法:
- 作用:判断当前正在执行的线程是否是当前eventLoop所绑定的线程
- 目的:第一次走到该方法的时候,当前线程肯定是主线程,不希望让主线程做注册操作,比较耗时,让parent的EventLoopGroup里的线程处理,
即最终注册的操作线程都是EventLoopGroup里的线程
eventLoop里面绑定的那个线程,不是一开始就初始化好了这个线程,只有在第一次注册的时候才会创建新的线程
(后面讲,不是这次的重点)
-
判断当前线程是否就是eventLoop的线程:
//io.netty.util.concurrent.AbstractEventExecutor#inEventLoop public boolean inEventLoop() {return inEventLoop(Thread.currentThread()); }//io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop public boolean inEventLoop(Thread thread) {return thread == this.thread; }
断点调试看一下:
可以看到当前线程并不是EventLoop绑定的线程,EventLoop绑定的线程还是null
eventLoop.execute第一次执行的时候是没有线程,是如何创建的?这个问题就要从NioEventLoopGroup的构造中开始看了
简单介绍一下,不是本次重点,后面会专门说:
io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup()
…各种构造一直走
io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup//io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}if (executor == null) {//核心地方,这个就是具有创建线程功能的总executorexecutor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}//chilren维护的就是当前group中的所有子eventLoopchildren = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {//构建子EventLoopchildren[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {...}}//chooser就是上面提到过的算法选择器,实现类会根据指定算法从children中返回一个eventLoopchooser = chooserFactory.newChooser(children);... }
看下newChild方法,具体实现在其子类NioEventLoopGroup
//io.netty.channel.nio.NioEventLoopGroup#newChild protected EventLoop newChild(Executor executor, Object... args) throws Exception {return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }
这里可以知道NioEventLoopGroup里的子线程类型都是
NioEventLoop
,而NioEventLoop.execute方法执行底层其实是调用构造传入的executor的execute方法,即ThreadPerTaskExecutor(newDefaultThreadFactory())
(底层怎么调用下一篇会介绍)看下ThreadPerTaskExecutor,其中就会有创建线程的逻辑:
public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory == null) {throw new NullPointerException("threadFactory");}this.threadFactory = threadFactory;}@Overridepublic void execute(Runnable command) {threadFactory.newThread(command).start();} }
回到我们真正关注的重点,继续看AbstractUnsafe(具体类是NioSocketChannelUnsafe)
具体的注册方法io.netty.channel.AbstractChannel.AbstractUnsafe#register0:
//io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {try {// check if the channel is still open as it could be closed in the mean time when the register// call was outside of the eventLoopif (!promise.setUncancellable() || !ensureOpen(promise)) {return;}//状态,是不是还从没有注册过,是的话就是第一次注册boolean firstRegistration = neverRegistered;// 完成注册doRegister();// 修改状态值neverRegistered = false;registered = true;// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the// user may already fire events through the pipeline in the ChannelFutureListener.// 触发handlerAdded()方法的执行pipeline.invokeHandlerAddedIfNeeded();safeSetSuccess(promise);// 触发channelRegistered()方法的执行pipeline.fireChannelRegistered();// Only fire a channelActive if the channel has never been registered. This prevents firing// multiple channel actives if the channel is deregistered and re-registered.// 若当前channel是激活状态,且是第一次注册,// 则触发channelActive()的执行if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {// This channel was registered before and autoRead() is set. This means we need to begin read// again so that we process inbound data.//// See https://github.com/netty/netty/issues/4805// 执行到这里到时候Channel已经注册成功了// 此时SelectionKey的兴趣集还是0// beginRead方法会将SelectionKey的兴趣集// 设置为之前NioServerSocketChannel构造中设置的值beginRead();}}} catch (Throwable t) {// Close the channel directly to avoid FD leak.closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}
}
注意点:
- pipeline.invokeHandlerAddedIfNeeded():触发handlerAdded()方法的执行
pipeline.fireChannelRegistered():触发channelRegistered()方法的执行
pipeline.fireChannelActive():触发channelActive()的执行
以上几个方法都会触发pipline链上的处理器对应的方法的执行:
(后期会讲,先混个眼熟)
pipeline的分析,也是后期再讲,不是本篇重点
继续看doRegister()方法,其具体实现在AbstractNioChannel
//io.netty.channel.nio.AbstractNioChannel#doRegister
protected void doRegister() throws Exception {boolean selected = false;for (;;) {try {// 其实netty的channel的注册,本质上是原生的nio的channel的注册// 第二个参数0是形参ops的值,表示当前对任何事件都暂不关注selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return;} catch (CancelledKeyException e) {if (!selected) {// Force the Selector to select now as the "canceled" SelectionKey may still be// cached and not removed because no Select.select(..) operation was called yet.eventLoop().selectNow();selected = true;} else {// We forced a select operation on the selector before but the SelectionKey is still cached// for whatever reason. JDK bug ?throw e;}}}
}
细心的人会发现,这里注册到Selector的时候,ops的值是0,表示对任何事件都不关注,但是我们此时的Channel是NioServerSocketChannel,不是应该需要关注ACCEPT事件吗?
其实默认都是先注册为0,而注册ACCEPT事件的逻辑是在注册成功之后的beginRead方法里
private void register0(ChannelPromise promise) {try {...// 完成注册doRegister();...if (isActive()) {if (firstRegistration) {...} else if (config().isAutoRead()) {// 执行到这里到时候Channel已经注册成功了// 此时SelectionKey的兴趣集还是0// beginRead方法会将SelectionKey的兴趣集// 设置为之前NioServerSocketChannel构造中设置的值beginRead();}}} catch (Throwable t) {...}
}//io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead
public final void beginRead() {assertEventLoop();if (!isActive()) {return;}try {doBeginRead();} catch (final Exception e) {invokeLater(new Runnable() {@Overridepublic void run() {pipeline.fireExceptionCaught(e);}});close(voidPromise());}
}//io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was called// 因为已经注册到Selector了,所以肯定是有SelectionKey的final SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;}readPending = true;//这里interestOps应该是0final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {//其中readInterestOp就是之前在NioServerSocketChannel构造里赋值的//当时赋值是SelectionKey.OP_ACCEPTselectionKey.interestOps(interestOps | readInterestOp);}
}//回顾一下NioServerSocketChannel的构造:
public NioServerSocketChannel(ServerSocketChannel channel) {// 封装super(null, channel, SelectionKey.OP_ACCEPT);// 获取channel的配置对象config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
//super构造
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {...this.readInterestOp = readInterestOp;...
}
总结
NioServerSocketChannel 的注册,实际上是由 Channel 所绑定的 EventLoop 中的线程来完成的,而注册的本质是将原生的 NIO 的 Channel 注册到了 Seletor。
步骤:
- ServerBootstrap调用bind方法后,触发initAndRegister时,会调用config().group().register(channel)方法实现注册,
本质是调用了NioEventLoopGroup的register方法
- NioEventLoopGroup的register方法,会根据算法选择其中一个NioEventLoop,继续调用register方法,即
NioEventLoop.register
,它是继承SingleThreadEventLoop,具体实现在SingleThreadEventLoop
里面 - SingleThreadEventLoop.register方法实际上是调用了promise.channel().unsafe().register(this, promise)方法,即依靠
Unsafe
完成注册的,即NioSocketChannelUnsafe
,register的具体实现是在io.netty.channel.AbstractChannel.AbstractUnsafe
- AbstractUnsafe中,会用上面NioEventLoop绑定的线程(而不是主线程,并且第一次调用的时候才创建并绑定线程的,一开始NioEventLoop绑定的线程是null)来进行注册操作,底层还是利用Nio的api注册到了Selector,此时还会触发一系列pipline的事件(注册的时候兴趣集是0,之后在beginRead方法里修改了SelectionKey的兴趣集为ACCEPT)
2.4 端口绑定
现在我们看最后一个流程,端口绑定,回到doBind:
io.netty.bootstrap.AbstractBootstrap#bind(int)
io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)
io.netty.bootstrap.AbstractBootstrap#doBind
private ChannelFuture doBind(final SocketAddress localAddress) {// 创建、初始化channel,并将其注册到Selectorfinal ChannelFuture regFuture = initAndRegister();// 从异步结果中获取channelfinal Channel channel = regFuture.channel();// 获取异步操作执行过程中发生的异常if (regFuture.cause() != null) {return regFuture;}// 判断当前异步操作是否完成:或者是成功,或者是异常if (regFuture.isDone()) { // 若异步操作成功// At this point we know that the registration was complete and successful.// 创建一个可修改的异步结果对象channelFutureChannelPromise promise = channel.newPromise();// 绑定端口号doBind0(regFuture, channel, localAddress, promise);return promise;} else { // 若异步操作未完成// Registration future is almost always fulfilled already, but just in case it's not.final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);// 为异步操作添加监听器regFuture.addListener(new ChannelFutureListener() {// 当异步操作完成(成功,异常),就会触发该方法的执行@Overridepublic void operationComplete(ChannelFuture future) throws Exception {// 获取异步操作执行过程中发生的异常Throwable cause = future.cause();if (cause != null) { // 异步执行过程发生异常// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an// IllegalStateException once we try to access the EventLoop of the Channel.// 修改异步结果为:失败promise.setFailure(cause);} else {// Registration was successful, so set the correct executor to use.// See https://github.com/netty/netty/issues/2586promise.registered();// 绑定端口号doBind0(regFuture, channel, localAddress, promise);}}});return promise;}
}
之前我们分析过,一但注册的操作完成,就会触发doBind0进行端口号绑定:
//io.netty.bootstrap.AbstractBootstrap#doBind0
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {//异步操作是成功才会触发绑定端口逻辑//这里异步操作就是注册selector的操作if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});
}
可以看到,绑定端口的工作,也是让eventLoop的异步处理的,并且只有在channel注册成功的时候才会进行绑定端口的工作,看下channel.bind(localAddress, promise)方法,实际上是调用NioServerSocketChannel.bind,其具体实现是其父类AbstractChannel中
//io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {//调用了pipline的bind方法return pipeline.bind(localAddress, promise);
}//继续走
//io.netty.channel.DefaultChannelPipeline#bind
//tail是AbstractChannelHandlerContext,代表的是pipline链上的节点(封装了处理器)中的最后一个节点,后面再说,不是这次本文重点
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);
}
注意:tail代表的是pipline链上的节点(封装了处理器的节点,不是处理器)中的最后一个节点
,head代表的是第一个节点,head和tail是Netty定义好的,中间那些节点,是我们添加进去的处理器对应的节点,我们只要添加处理器就行了,Netty会自动封装成节点放到pipline上
//io.netty.channel.AbstractChannelHandlerContext#bind
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {if (localAddress == null) {throw new NullPointerException("localAddress");}if (isNotValidPromise(promise, false)) {// cancelledreturn promise;}//这个方法先不讲,后面讲Channel 的 inBound 与 outBound 处理器时会专门讲//简单理解为获取当前节点的下一个节点final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);EventExecutor executor = next.executor();if (executor.inEventLoop()) {//执行绑定next.invokeBind(localAddress, promise);} else {safeExecute(executor, new Runnable() {@Overridepublic void run() {//执行绑定next.invokeBind(localAddress, promise);}}, promise, null);}return promise;
}
直接跟invokeBind:
//io.netty.channel.AbstractChannelHandlerContext#invokeBind
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {if (invokeHandler()) {try {((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);}} else {bind(localAddress, promise);}
}
ChannelOutboundHandler,后面讲Channel 的 inBound 与 outBound 处理器时会专门讲,现在直接跟bind方法,只要知道这里最终会调io.netty.channel.DefaultChannelPipeline.HeadContext.bind()方法即可,即最终会调到头结点的bind方法:
final class HeadContext extends AbstractChannelHandlerContextimplements ChannelOutboundHandler, ChannelInboundHandler {private final Unsafe unsafe;HeadContext(DefaultChannelPipeline pipeline) {super(pipeline, null, HEAD_NAME, HeadContext.class);unsafe = pipeline.channel().unsafe();setAddComplete();}//io.netty.channel.DefaultChannelPipeline.HeadContext#bindpublic void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {unsafe.bind(localAddress, promise);}...
}
可以看到又调了unsafe,底层代码了:
继续跟
//io.netty.channel.AbstractChannel.AbstractUnsafe#bind
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {assertEventLoop();if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}// See: https://github.com/netty/netty/issues/576if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&localAddress instanceof InetSocketAddress &&!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {// Warn a user about the fact that a non-root user can't receive a// broadcast packet on *nix if the socket is bound on non-wildcard address.logger.warn("A non-root user can't receive a broadcast packet if the socket " +"is not bound to a wildcard address; binding to a non-wildcard " +"address (" + localAddress + ") anyway as requested.");}// 若当前channel未被激活,则该方法返回falseboolean wasActive = isActive();try {// 绑定// 一旦端口被绑定了,则channel就被激活了doBind(localAddress);} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}//if (!wasActive && isActive()) {invokeLater(new Runnable() {@Overridepublic void run() {// 触发pipline中的处理器中的channelActive()方法的执行pipeline.fireChannelActive();}});}safeSetSuccess(promise);
}
doBind绑定,是个抽象方法,具体实现我们需要看NioServerSocketChannel
//io.netty.channel.socket.nio.NioServerSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());}
}
如果是客户端NioSocketChannel也差不多:
//io.netty.channel.socket.nio.NioSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {doBind0(localAddress);
}private void doBind0(SocketAddress localAddress) throws Exception {// 若当前JDK平台使用的版本>=7,则...if (PlatformDependent.javaVersion() >= 7) {SocketUtils.bind(javaChannel(), localAddress);} else {SocketUtils.bind(javaChannel().socket(), localAddress);}
}//io.netty.util.internal.SocketUtils#bind(java.nio.channels.SocketChannel, java.net.SocketAddress)
public static void bind(final SocketChannel socketChannel, final SocketAddress address) throws IOException {try {AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() {@Overridepublic Void run() throws IOException {socketChannel.bind(address);return null;}});} catch (PrivilegedActionException e) {throw (IOException) e.getCause();}
}