当前位置: 代码迷 >> 综合 >> Netty详解之五:EventLoop
  详细解决方案

Netty详解之五:EventLoop

热度:74   发布时间:2024-02-02 05:45:21.0

上一篇介绍了Netty的EventExecutor框架,这一篇还是围绕EventExecutor,介绍它在通信模块里是如何运用的。

EventLoopGroup和EventLoop

EventExecutor到了通信模块内,就变成了EventLoop,表明它是一个事件处理循环;EventLoop相关类型全部定义在io.netty.channel这个package下面。

EventLoopGroup接口扩展自EventExecutorGroup,增加了注册Channel的方法:

public interface EventLoopGroup extends EventExecutorGroup {ChannelFuture register(Channel channel);ChannelFuture register(ChannelPromise promise);
}

EventLoop接口扩展自EventExecutor和EventExecutorGroup:

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {@OverrideEventLoopGroup parent();
}

EventLoop为啥扩展EventExecutorGroup?难道就是为了继承register方法,对此本人持保留意见。

EventLoop和EventLoopGroup的关系,与EventExecutor和EventExecutorGroup的关系,如出一辙。

MultithreadEventLoopGroup和SingleThreadEventLoop

在实现层面,也有一对相互配合的抽象类,MultithreadEventLoopGroup和SingleThreadEventLoop。

先看MultithreadEventLoopGroup,它的代码比较少,下面只贴出值得注意的方法。

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {@Overridepublic ChannelFuture register(Channel channel) {//channel注册请求转发给下一个child EventLoopreturn next().register(channel);}@Overridepublic ChannelFuture register(ChannelPromise promise) {return next().register(promise);}
}

再看SingleThreadEventLoop:

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {//又加了一个任务队列private final Queue<Runnable> tailTasks;//注册Channel到EventLoop@Overridepublic ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));}//注册Channel到EventLoop,将注册请求转发给了channel@Overridepublic ChannelFuture register(final ChannelPromise promise) {promise.channel().unsafe().register(this, promise);return promise;}//添加了一个任务到tailTasks队列@UnstableApipublic final void executeAfterEventLoopIteration(Runnable task) {if (!tailTasks.offer(task)) {reject(task);}if (!(task instanceof LazyRunnable) && wakesUpForTask(task)) {wakeup(inEventLoop());}}//意思应该是,SingleThreadEventExecutor的任务全部执行完了,执行tailTasks内的任务@Overrideprotected void afterRunningAllTasks() {runAllTasksFrom(tailTasks);}//修改判断是否还有任务的条件,将tailTasks也考虑进去@Overrideprotected boolean hasTasks() {return super.hasTasks() || !tailTasks.isEmpty();}//修改判断待执行任务数量,将tailTasks.zie也计算进去@Overridepublic int pendingTasks() {return super.pendingTasks() + tailTasks.size();}
}

SingleThreadEventLoop做了两件事:

  • 增加了一个任务队列:tailTasks,从变量和相关方法的命名上来看,应该是每次事件循环末尾执行的任务;
  • 将注册Channel请求,又转发给了Channel;

NioEventLoopGroup和NioEventLoop

使用Netty进行TCP通信,底层可选用Java NIO,或平台本地接口(Epoll或Kqueue);从研究源码的角度,基于NIO的方式更加容易理解一些,我们就先易后难。如果底层使用Java NIO,就需要使用NioEventLoopGroup和NioEventLoop。

NioEventLoopGroup

NioEventLoopGroup没多少内容,最主要一点就是:重写了EventLoopGroup的newChild方法,创建NioEventLoop。

public class NioEventLoopGroup extends MultithreadEventLoopGroup {@Overrideprotected EventLoop newChild(Executor executor, Object... args) throws Exception {EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);}
}

NioEventLoop

重头戏在NioEventLoop,它驱动了NIO Channel的IO操作。

先看它的成员字段:

public final class NioEventLoop extends SingleThreadEventLoop {//如果在一个EventLoop周期内,有超过CLEANUP_INTERVAL个Channel关闭,需要重新执行select操作private static final int CLEANUP_INTERVAL = 256; 	//这两个常量是为应对JAVA NIO BUG而设置的,后面会解释private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;private static final int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;//将select操作封装在一个接口内//之所以这么麻烦,是因为不同的IO模型有不同的select操作,Netty将他们的外观统一起来private final IntSupplier selectNowSupplier = new IntSupplier() {@Overridepublic int get() throws Exception {return selectNow();}};//unwrappedSelector是Java NIO的Selectorprivate Selector unwrappedSelector;//selector是的unwrappedSelector一个代理,实现了一些性能优化private Selector selector;//selectedKeys是存放select结果的容器private SelectedSelectionKeySet selectedKeys;//java NIO 对象private final SelectorProvider provider;//nextWakeupNanos记录当前select操作阻塞的状态//=AWAKE表示无阻塞,=NONE表示无限制阻塞,其他值代表阻塞时间private static final long AWAKE = -1L;private static final long NONE = Long.MAX_VALUE;private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);//select执行策略接口private final SelectStrategy selectStrategy;//ioRatio表示事件循环中IO操作(对比任务)占据的时间比例private volatile int ioRatio = 50;//累计取消的SelectionKeyprivate int cancelledKeys;//一个标记位,表示需要重新执行select操作private boolean needsToSelectAgain;
}

构造方法

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,EventLoopTaskQueueFactory queueFactory) {super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),rejectedExecutionHandler);this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");final SelectorTuple selectorTuple = openSelector();this.selector = selectorTuple.selector;this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

构造参数有两个点需要解释下:

  1. super调用的第三个参数对应基类SingleThreadEventExecutor.addTaskWakesUp,由于NioEventLoop并不依靠基类的taskQueue来阻塞,而是阻塞在select操作上,所以是false;
  2. queueFactory用来创建taskQueue,netty为了优化新能,有一些自定义、且平台相关的并发容器,我们暂不关注;

构造方法最重要的是调用了openSelector()方法,它创建java NIO Selector,也就是unwrappedSelector;并通过反射对unwrappedSelector的性能进行优化。openSelector的实现是一大堆java反射相关的代码,这里就不贴了。

Netty优化Selector的方式是:如果原生Selector的实现类是sun.nio.ch.SelectorImpl,它成员字段SelectorImpl.selectedKeys是一个HashSet,访问性能不够好(NioEventLoop需要频繁地访问它),尝试将它替换成一个SelectedSelectionKeySet类型的对象,也就是NioEventLoop.selectedKeys指向的对象。同时NioEventLoop.selector指向一个SelectedSelectionKeySetSelector对象(是unwrappedSelector的代理),执行相关操作时顺便维护NioEventLop.selectedKeys。

如果优化失败(这个优化操作依赖SelectorImpl内部实现细节,可能发生兼容性失败),那么NioEventLoop.selector和unwrappedSelecto就是同一个对象,而NioEventLoop.selectedKeys则是null。

SelectStrategy

SelectStrategy是用来控制EventLoop的select策略的,它的定义如下:

public interface SelectStrategy {/策略常量,指示EventLoop执行一次阻塞的selectint SELECT = -1;/策略常量,指示EventLoop应当立即重新执行事件循环,不要执行任何阻塞操作int CONTINUE = -2;/策略常量,指示EventLoop应当忙等(selector状态变化)int BUSY_WAIT = -3;//计算strategy//@param selectSupplier提供一次select的结果//@hasTasks EventLoop内是否有待执行的任务//@return 如果返回是上面三个常量之一,那么EventLoop按指示执行>=0 则是Selector.select的正常返回值,按SelectionKey结果进行IO操作即可int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}

IntSupplier这个接口名有点浪,它的实现需要提供一次select操作的结果。

一般使用默认的DefaultSelectStrategy即可:

final class DefaultSelectStrategy implements SelectStrategy {static final SelectStrategy INSTANCE = new DefaultSelectStrategy();//如果EventLoop有待执行的任务,执行selectSupplier并返回其结果//否则返回SelectStrategy.SELECT,指示EventLoop可以阻塞@Overridepublic int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;}
}

再结合NioEventLoop.selectNowSupplier的实现,NioEventLoop.selectStrategy的行为就是:

  1. 如果NioEventLoop内有任务等待执行,那么执行非阻塞的selectNow();
  2. 否则指示NioEventLoop执行一次阻塞的select。

NioEventLoop.run

终于到了重头戏:NioEventLoop的事件循环逻辑;我们已经预见它干的活是处理IO+执行任务,现在就来看看它是怎么做的。

protected void run() {int selectCnt = 0;for (;;) {try {int strategy;try {strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.CONTINUE:continue;//nio 不支持忙等,按SELECT处理case SelectStrategy.BUSY_WAIT:case SelectStrategy.SELECT://看下一个调度任务的执行时间,来决定阻塞多长时间long curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE; }nextWakeupNanos.set(curDeadlineNanos);try {if (!hasTasks()) {/执行一次限时的阻塞selectstrategy = select(curDeadlineNanos);}} finally {nextWakeupNanos.lazySet(AWAKE);}default:}} catch (IOException e) {//如果发生IOException,说明Selector出问题了,重建Selector//参见:https://github.com/netty/netty/issues/8566rebuildSelector0();selectCnt = 0;handleLoopException(e);continue;}//selectCnt记录的是无功而返的select次数,即eventLoop空转的次数,为解决NIO BUGselectCnt++;cancelledKeys = 0;needsToSelectAgain = false;//注意,执行到这里,strategy=selectedKeys数量//依据设定ioRatio,不同的执行任务策略final int ioRatio = this.ioRatio;boolean ranTasks;if (ioRatio == 100) {try {//有IO事件需要处理if (strategy > 0) {processSelectedKeys();}} finally {//注意,将ioRatio设置为100,并不代表任务不执行,反而是每次将任务队列执行完ranTasks = runAllTasks();}} else if (strategy > 0) {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {//基于本次循环处理IO的时间,ioRatio,计算出执行任务耗时的上限final long ioTime = System.nanoTime() - ioStartTime;ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}} else {//这个分支代表:strategy=0,ioRatio<100,此时任务限时=0,意为:尽量少地执行一下任务//这个分支和strategy>0实际是一码事,代码简化了一下而已ranTasks = runAllTasks(0);}if (ranTasks || strategy > 0) {//ranTasks=true,或strategy>0,说明eventLoop干活了,没有空转,清空selectCntselectCnt = 0;} //unexpectedSelectorWakeup处理NIO BUG,待会再分析else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)			   //到里实际已经重建了SelectorselectCnt = 0;}} catch (CancelledKeyException e) {//这里忽略了异常处理代码...}try {//如果进入了ST_SHUTTING_DOWN状态,执行关闭流程if (isShuttingDown()) {//关闭所有ChannelcloseAll();//执行基类的confirmShutdown,尝试优雅关闭if (confirmShutdown()) {//尝试关闭成功退出eventLoop,否则下个循环再试return;}}} catch (Throwable t) {handleLoopException(t);}}
}

有了前面对SelectStrategy的学习,NioEventLoop.run方法就显得没有那么恐怖了,基本上就是几个步骤:

  • 按Selecttrategy的结果,确定本次循环的执行策略;
  • 如果执行策略是SelectStrategy.SELECT,阻塞时间参考下一次调度任务的触发时间;
  • 处理IO事件(processSelectedKeys方法);
  • 执行任务队列内的任务,执行任务的耗时受限于ioRatio;
  • 一次循环发生了空转(既没有IO需要处理、也没有执行任何任务),那么记录下来(selectCnt);
  • 如果连续发生空转(selectCnt达到一定值),netty认为触发了NIO的BUG(unexpectedSelectorWakeup处理);
  • 最后,检查一下是否应该关闭EventLoop。

其中ioRatio取值范围是(0,100],100是一个特殊值,代表不控制“处理IO事件”和“执行任务”的耗时比例,每次循环执行所有的任务。

阻塞唤醒

如果EventLoop阻塞在selector.select操作上,如果此时有任务加入,岂不是没有机会执行了?答案当然是不会,上一章介绍SingleThreadEventExecutor的时候已经解释过这个问题,NioEventLoop既然有自己的阻塞逻辑,那么就应该有相应的唤醒逻辑,它重写了SingleThreadEventExecutor的wakeup方法如下:

@Override
protected void wakeup(boolean inEventLoop) {if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {selector.wakeup();}
}

处理IO事件

NioEventLoop.run方法里调用processSelectedKeys来处理IO事件:

private void processSelectedKeys() {if (selectedKeys != null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());}
}

成员字段selectedKeys=null,说明Netty对Java Nio Selector的优化失败了;优化成功与否,SelectionKeys的遍历方式不一样,倒也没有本质区别,这里仅给出未优化selector的处理方式:

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {if (selectedKeys.isEmpty()) {return;}Iterator<SelectionKey> i = selectedKeys.iterator();for (;;) {final SelectionKey k = i.next();final Object a = k.attachment();i.remove();//SelectionKey的附件对象a,有可能就是Channel//还有可能是一个NioTask,这个是什么鬼暂时不知道,先忽略if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {@SuppressWarnings("unchecked")NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;processSelectedKey(k, task);}if (!i.hasNext()) {break;}//在遍历selectedKeys过程中,如果需要重新selectif (needsToSelectAgain) {selectAgain();selectedKeys = selector.selectedKeys();if (selectedKeys.isEmpty()) {break;} else {i = selectedKeys.iterator();}}}
}

上面中有两个疑问,一是NioTask的作用;二是needsToSelectAgain这个标记是如何工作的。我们暂且放下,继续追踪正常流程,来到processSelectedKey:

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();//检查逻辑,是否Channel对应的SelectionKey已经失效if (!k.isValid()) {final EventLoop eventLoop;try {eventLoop = ch.eventLoop();} catch (Throwable ignored) {return;}//只有channel确实归属eventLoop,才能关闭它//如果用户主动将Channel从eventLoop取消注册,Channel有可能还是健康可用的,只是不再归属这个eventLoopif (eventLoop == this) {unsafe.close(unsafe.voidPromise());}return;}try {int readyOps = k.readyOps();//处理OP_CONNECT:连接建立,OP_CONNECT只需要处理一次,立即将该操作码从SelectionKey擦掉if ((readyOps & SelectionKey.OP_CONNECT) != 0) {int ops = k.interestOps();ops &= ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}//处理OP_WRITE(channel可写),将缓存刷入底层socketif ((readyOps & SelectionKey.OP_WRITE) != 0) {ch.unsafe().forceFlush();}//处理OP_READ和OP_ACCEPT都当做读来处理,前者是通信Socket的读,后者是监听Socket的读if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}
}

底层IO操作还是由具体的Channel(ch.unsafe)来执行的,NioEventLoop通过一个事件循环来驱动Channel工作。

重建Selector

Java NIO有一个Bug,一旦触发会导致Selector.select操作不阻塞,且一直返回0,这样EventLoop就变成空转了。为了解决这个问题,Netty会检测空转,并重建Selector。

触发重建的方法是unexpectedSelectorWakeup:

private boolean unexpectedSelectorWakeup(int selectCnt) {//如果EventLoop空转次数达到一个门限值,触发Selector重建if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {rebuildSelector();return true;}return false;
}

重建代码如下:

private void rebuildSelector0() {final Selector oldSelector = selector;final SelectorTuple newSelectorTuple;if (oldSelector == null) {return;}//打开一个新的Selectortry {newSelectorTuple = openSelector();} catch (Exception e) {return;}//然后遍历oldSelector上的注册信息,在新Selector上重新注册int nChannels = 0;for (SelectionKey key: oldSelector.keys()) {Object a = key.attachment();try {//已经失效的SelectionKey,就不用搞了if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {continue;}int interestOps = key.interestOps();key.cancel();重新注册SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);if (a instanceof AbstractNioChannel) {((AbstractNioChannel) a).selectionKey = newKey;}nChannels ++;} catch (Exception e) {//异常处理略过去...}}//新selector取代老selectorselector = newSelectorTuple.selector;unwrappedSelector = newSelectorTuple.unwrappedSelector;//关闭老selectortry {oldSelector.close();} catch (Throwable t) {//...}
}

总结

对Netty来说,EventLoop处在一个核心的位置,所有的IO操作都是在EventLoop中完成的,可以说EventLoop是Netty的发动机。

当Channel注册到EventLoop之后,EventLoop会监视Channel的状态,并驱动Channel的IO操作。Channel和EventLoop是多对一的关系,对单个Channel来说,它的IO操作都是非并发执行的。

EventLoop具体实现会依赖所使用的底层通信接口,如果底层使用Java NIO,那么对应的EventLoop就是NioEventLoop;NioEventLoop通过一个事件循环,反复执行Selector.select,并将捕获的事件转发给对应的Channel。

道理说明白了,确实也简单;关键还是在于细节,NioEventLoop帮我们做了大量诸如性能优化、屏蔽底层BUG的工作。