一、EventLoop的继承关系
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
?在使用Netty时,上面的代码是必不可少的,即创建一个EventLoop对象,EventLoop的类继承关系如下:
?①ScheduledExecutorService接口表示是一个定时任务接口,即EventLoop可以接受定时任务。
?②EventLoop接口:Netty接口文档说明了该接口的作用,一旦Channel注册了,就处理该Channel对应的所有IO操作。
?③SingleThreadEventExecutor表示这是一个单个线程的线程池。
?④EventLoop是一个单例的线程池,里面含有一个死循环的线程不断的做着3件事情:监听端口、处理端口事件、处理队列事件。每个EventLoop都可以绑定多个Channel,而每个Channel始终只能由一个EventLoop来处理。
二、NioEventLoop的使用
?对于EventLoop的使用,主要就是eventLoop.excute(task)的调用过程,该方法的实现在SingleThreadEventExecutor类中:
@Override
public void execute(Runnable task){
if(task == null){
throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if(inEventLoop ){
addTask(task);}else{
startThead();addTask(task);if(isShutdown() && removeTask(task)){
reject();}}if(!addTaskWakesUp && wakesUpForTask(task)){
wakeup(inEventLoop);}
}
?①首先判断该EventLoop的线程是否是当前线程,如果是,直接添加到任务队列中,否则则尝试启动线程(由于线程是单个的,因此只能启动一次),随后再将任务添加到队列中。
?②如果线程已经停止,并且删除任务失败,则执行拒绝策略,默认的拒绝策略是抛出异常。
?③如果addTaskWakesUp是false,且任务不是NonWakeupRunnable类型,就尝试唤醒selector,这时阻塞在selector的线程就会立即返回。
?addTask(task)的源码:
protected void addTask(Runnable task){
if(task == null){
throw new NullPointerException("task");}if(!offerTask(task)){
reject(task);}
}final boolean offerTask(Runnable task){
if(isShutdown()){
reject();}// 将任务加入队列return taskQueue.offer(task);
}
三、NioEventLoop的父类SingleThreadEventExecutor的startThread方法
?当执行execute()方法的时候,如果当前线程不是EventLoop所属线程,则尝试启动线程,也就是会调用startThread():
private void startThread(){
if(state == ST_NOT_STARTED){
try{
doStartThread();}catch(Throwable cause){
STATE_UPDATER.set(this,ST_NOT_STARTED);PlatformDependent.throwException(cause);}}
}
?该方法首先判断线程是否启动过了,保证EventLoop只有一个线程,如果没有启动过,则尝试使用CAS将state的状态改为ST_STARTED,也就是将线程的启动状态改为已启动。然后调用doStartThread方法,若失败则回滚线程状态为未启动。doStartThread()的代码如下:
private void doStartThread(){
executor.execute(new Runnable(){
@Overridepublic void run(){
boolean success = false;updateLastExecutionTime();try{
SingleThreadEventExecutor.this.run();success = true;}finally{
for(;;){
int oldState = state;if(oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,oldState,ST_SHUTTING_DOWN)){
break;}}try{
for(;;){
if(confirmShutdown()){
break;}}}finally{
try{
cleanup();}finally{
STATE_UPDATER.set(SingleThreadEventExecutor.this,ST_TERMINATED);threadLock.release();terminationFuture.setSuccess(null);}}}}});
}
?①首先调用executor的execute()方法,这个executor就是在创建EventLoopGroup的时候创建的ThreadPerTaskExecutor类,该execute方法会将Runnable包装成Netty的FastThreadLocalThread。
?②任务中首先判断线程中断状态,然后设置最后一次的执行时间。
?③执行当前NioEventLoop的run方法,注意:这个方法是个死循环,是整个EventLoop的核心。
?④在finally块中,使用CAS不断修改state的状态,尝试将其改成ST_SHUTTING_DOWN,也就是当线程Loop结束的时候,关闭线程。最后还要死循环确认是否关闭,否则不会break,然后执行cleanup操作,更新状态为ST_TERMINATED,并释放当前线程锁。如果任务队列不是空,则打印队列中还有多少个未完成的任务,并回调terminationFuture的setSuccess方法。
四、NioEventLoop的run()
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));if (wakenUp.get()) {
selector.wakeup();}// fall throughdefault:}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {
try {
processSelectedKeys();} finally {
// Ensure we always run tasks.runAllTasks();}} else {
final long ioStartTime = System.nanoTime();try {
processSelectedKeys();} finally {
// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {
handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {
if (isShuttingDown()) {
closeAll();if (confirmShutdown()) {
return;}}} catch (Throwable t) {
handleLoopException(t);}}
}
?整个run()方法做了3件事:
??select获取感兴趣的事件;
??processSelectedKeys处理事件;
??runAllTasks执行队列中的任务;
?select()方法的代码如下:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;try {
int selectCnt = 0;long currentTimeNanos = System.nanoTime();long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();selectCnt = 1;}break;}if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();selectCnt = 1;break;}int selectedKeys = selector.select(timeoutMillis);selectCnt ++;if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;}if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",selectCnt, selector);rebuildSelector();selector = this.selector;// Select again to populate selectedKeys.selector.selectNow();selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}}} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}// Harmless exception - log anyway}
}
?调用selector的select()方法默认阻塞1秒钟,如果有定时任务,则在定时任务剩余时间的基础上再加上0.5秒的阻塞时间。当执行execute方法的时候,也就是添加任务的时候,唤醒selector,防止selector阻塞时间过长。
五、EventLoop运行机制小结
?每次执行execute方法都是向队列中添加任务,当第一次添加时就启动线程,执行run方法。而run方法是整个EventLoop的核心,在run方法中一直不停的循环做3件事:
?1、调用selector的select方法,默认阻塞1秒钟,如果有定时任务,则在定时任务剩余时间的基础上再加上0.5秒进行阻塞。当执行execute方法的时候,也就是添加任务的时候,唤醒selector,防止selector阻塞时间过长。
?2、当selector返回的时候,会调用processSelectedKeys方法对selectKey进行处理。
?3、当processSelectedKeys方法执行结束后,则按照ioRatio的比例执行runAllTasks方法,默认是IO任务时间和非IO任务时间是相同的,用户可以根据自己的应用特点进行调整。比如非IO任务比较多,那么就将ioRatio调小一点,这样非IO任务就能执行长一点时间,防止队列积攒过多的任务。