当前位置: 代码迷 >> 综合 >> Netty(EventLoop 和线程模型)
  详细解决方案

Netty(EventLoop 和线程模型)

热度:104   发布时间:2023-11-20 01:27:41.0

EventLoop接口

    Netty的EventLoop是协同设计的一部分,它采用了两个基本的API:并发和网络编程。首先,io.netty.util.concurrent包构建在JDK的java.util.concurrent包上,用来提供线程执行器。其次,io.netty.channel包中的类,为了与Channel的事件进行交互,扩展了这些接口/类。

    在这个模型中,一个EventLoop将由一个永远都不会改变的Thread驱动,  同时任务(Runnable或者Callable)可以直接提交给EventLoop实现,  以立即执行或者调度执行。根据配置和可用核心的不同,可能会创建多个EventLoop实例用以优化资源的使用,并且单个EventLoop可能会被指派用于服务多个Channel。 

    需要注意的是,Netty的EventLoop在继承了ScheduledExecutorService的同时,只定义了一个方法,parent()。这个方法,如下面的代码片断所示,用于返回到当前EventLoop实现的实例所属的EventLoopGroup的引用。

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {@OverrideEventLoopGroup parent();
}

事件/任务的执行顺序  事件和任务是以先进先出(FIFO)的顺序执行的。这样可以通过保证字节内容总是按正确的顺序被处理,消除潜在的数据损坏的可能性。

    通过源码分析可以看到所有的任务都会在这个方法中执行,而NioEventLoop的run方法最外层套了一个for (;;),它会判断是否有任务提交,然后不断循环该内部代码。

Netty 4 中的 I/O 和事件处理

    由I/O操作触发的事件将流经安装了一个或者多个ChannelHandler的ChannelPipeline。传播这些事件的方法调用可以随后被Channel-Handler所拦截,并且可以按需地处理事件。在Netty4中,所有的I/O操作和事件都由已经被分配给了EventLoop的那个Thread来处理。

Netty 3 中的 I/O操作

    在以前的版本中所使用的线程模型只保证了入站(之前称为上游)事件会在所谓的I/O线程(对应于Netty4中的EventLoop)中执行。所有的出站(下游)事件都由调用线程处理,其可能是I/O线程也可能是别的线程。但是其需要在ChannelHandler中对出站事件进行仔细的同步。 简而言之,不可能保证多个线程不会在同一时刻尝试访问出站事件。

任务调度

JDK  的任务调度API

    在Java5之前,任务调度是建立在java.util.Timer类之上的,其使用了一个后台Thread,并且具有 与标准线程相同的限制。随后,JDK提供了java.util.concurrent包,它定义了interface ScheduledExecutorService。

java.util.concurrent.Executors类的工厂方法:

方法 描述

newScheduledThreadPool(int corePoolSize)

newScheduledThreadPool(int corePoolSize,ThreadFactory threadFactory)

创建一个ScheduledThreadExecutorService,用于调度命令在指定延迟之后运行或者周期性地执行。它使用corePoolSize参数来计算线程数

newSingleThreadScheduledExecutor()

newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

创建一个ScheduledThreadExecutorService,用于调度命令在指定延迟之后运行或者周期性地执行。它使用一个线程来执行被调度的任务

使用 EventLoop调度任务

    ScheduledExecutorService的实现具有局限性,例如,事实上作为线程池管理的一部分,将会有额外的线程创建。如果有大量任务被紧凑地调度,那么这将 成为一个瓶颈。Netty通过Channel的EventLoop实现任务调度解决了这一问题:

 ctx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {System.out.println("开启异步线程");}},1,TimeUnit.SECONDS);//调度任务在从现在开始的1秒之后执行

如果要开启一个周期性的任务,如心跳检测:

   ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {ctx.writeAndFlush(Unpooled.copiedBuffer("发送心跳"+new Date(), Charset.forName("UTF-8")));}},1,5, TimeUnit.SECONDS);//调度在1秒之后,并且以后每间隔5秒运行

如果想要取消这个任务可以这么写:

  ScheduledFuture<?> future=ctx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {System.out.println("开启异步线程");}},1,TimeUnit.SECONDS);future.cancel(false);

通过调用返回值future的cancel方法进行取消任务。

实现细节

线程管理 

    Netty线程模型的卓越性能取决于对于当前执行的Thread的身份的确定,也就是说,确定它是否是分配给当前Channel以及它的EventLoop的那一个线程。(EventLoop将负责处理一个Channel的整个生命周期内的所有事件。)

    如果(当前)调用线程正是支撑EventLoop的线程,那么所提交的代码块将会被 (直接 )执行。否则,EventLoop将调度该任务以便稍后执行,并将它放入到内部队列中。当EventLoop下次处理它的事件时,它会执行队列中的那些任务/事件。这也就解释了任何的Thread是如何与Channel直接交互而无需在ChannelHandler中进行额外同步的。

    每个EventLoop都有它自已的任务队列,独立于任何其他的EventLoop。

永远不要将一个长时间运行的任务放入到执行队列中,因为它将阻塞需要在同一线程上执行的任何其他任务。

如果必须要进行阻塞调用或者执行长时间运行的任务,建议使用一个专门的EventExecutor。

ChannelHandler的执行和阻塞

    通常ChannelPipeline中的每一个ChannelHandler都是通过它的EventLoop(I/O 线程)来处理传递给它的事件的。所以至关重要的是不要阻塞这个线程,因为这会对整体的I/O 处理产生负面的影响。但有时可能需要与那些使用阻塞API  的遗留代码进行交互。对于这种情况,ChannelPipeline有一些接受一个EventExecutorGroup的add()方法。如果一个事件被传递给一个自定义的EventExecutorGroup,它将被包含在这个EventExecutorGroup中的某个EventExecutor所处理,从而被从该Channel本身的EventLoop中移除。对于这种用例,Netty提供了一个叫DefaultEventExecutorGroup的默认实现。

EventLoop/线程的分配

    服务于Channel的I/O和事件的EventLoop包含在EventLoopGroup中。根据不同的传输实现,EventLoop的创建和分配方式也不同。

异步传输

    异步传输实现只使用了少量的EventLoop(以及和它们相关联的Thread), 而且在当前的线程模型中,它们可能会被多个Channel所共享。这使得可以通过尽可能少量的Thread来支撑大量的Channel,而不是每个Channel分配一个Thread。

    EventLoopGroup负责为每个新创建的Channel分配一个EventLoop。在当前实现中,使用顺序循环(round-robin)的方式进行分配以获取一个均衡的分布,并且相同的EventLoop可能会被分配给多个Channel。(这一点在将来的版本中可能会改变。)

    一旦一个Channel被分配给一个EventLoop,它将在它的整个生命周期中都使用这个EventLoop(以及相关联的Thread) 。

    需要注意的是,EventLoop的分配方式对ThreadLocal的使用的影响。因为一个EventLoop通常会被用于支撑多个Channel,所以对于所有相关联的Channel来说,ThreadLocal都将是一样的。这使得它对于实现状态追踪等功能来说是个糟糕的选择。然而,在一些无状态的上下文中,它仍然可以被用于在多个Channel之间共享一些重度的或者代价昂贵的对象,甚至是事件。

阻塞传输

用于像OIO(  旧的阻塞I/O)这样的其他传输的设计略有不同。这里每一个Channel都将被分配给一个EventLoop(以及它的Thread)。

    但是,正如同之前一样,得到的保证是每个Channel的I/O事件都将只会被一个Thread(用于支撑该Channel的EventLoop的那个Thread)处理。

参考《Netty实战》

附:

package netty.in.action;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;import java.nio.charset.Charset;public class EchoServer {final ByteBuf bufs= Unpooled.copiedBuffer("Hello,刘德华", Charset.forName("UTF-8"));public void bind(int port) throws Exception {//配置服务端的NIO线程组EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).childHandler(new ChildChannelHandler());// 绑定端口,同步等待成功ChannelFuture f=b.bind(port).sync();//等待服务端监听端口关闭f.channel().closeFuture().sync();} finally {//退出,释放线程池资源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {protected void initChannel(SocketChannel ch) throws Exception {System.out.println("服务端启动……");ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {ByteBuf buf = (ByteBuf) msg;if(buf.hasArray()){byte[] array=buf.array();//返回该缓冲区的备份字节数组。int offset=buf.arrayOffset()+buf.readerIndex();//计算第一个字节的偏移量int length=buf.readableBytes();//获取可读字节数String s=new String(array,offset,length);System.out.println("s="+s);}else{byte[] array = new byte[buf.readableBytes()];//获取可读字节数并分配一个新的数组来保存buf.getBytes(buf.readerIndex(),array);//将字节复制到该数组String s=new String(array,0,buf.readableBytes());System.out.println("直接缓冲区:"+s);}byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println(body);bufs.retain();//引用计数器加一ChannelFuture future=ctx.writeAndFlush(bufs);future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess())System.out.println("成功");else{System.out.println("失败");future.cause().printStackTrace();future.channel().close();}}});
//                    ctx.close();}});}}public static void main(String[] args) throws Exception {int port=8080;new EchoServer().bind(port);}
}

package netty.in.action;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufProcessor;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ByteProcessor;
import io.netty.util.concurrent.ScheduledFuture;import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.concurrent.TimeUnit;public class EchoClient {final ByteBuf buf= Unpooled.copiedBuffer("Hello,王宝强", Charset.forName("UTF-8"));public void connect(int port, String host) throws Exception {// 配置客户端NIO线程组EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChildChannelHandler2() );// 发起异步连接操作ChannelFuture f = b.connect(host, port).sync();// 等待客户端链路关闭f.channel().closeFuture().sync();} finally {// 优雅退出,释放NIO线程组group.shutdownGracefully();}}private class ChildChannelHandler2 extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {System.out.println("客户端启动……");ch.pipeline().addLast("text",new ChannelInboundHandlerAdapter() {public void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(buf);}public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println(body);ScheduledFuture<?> future=ctx.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {System.out.println("开启异步线程");}},1,TimeUnit.SECONDS);
//                    future.cancel(false);ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {ctx.writeAndFlush(Unpooled.copiedBuffer("发送心跳"+new Date(), Charset.forName("UTF-8")));}},1,5, TimeUnit.SECONDS);}});}}private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {protected void initChannel(SocketChannel ch) throws Exception {System.out.println("客户端启动……");ByteBuf bufs= Unpooled.copiedBuffer("pipeline发送的数据->", Charset.forName("UTF-8"));ch.pipeline().write(bufs);//通过调用ChannelPipeline的write方法将数据写入通道,但是不刷新ch.pipeline().addLast("text",new ChannelInboundHandlerAdapter() {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.channel().write(Unpooled.copiedBuffer("通过ChannelHandlerContext获取的channel发送的消息->",Charset.forName("UTF-8")));//通过ChannelHandlerContext获取的channel发送的消息->CompositeByteBuf messageBuf=Unpooled.compositeBuffer();ByteBuf headerBuf=buf;ByteBuf bodyBuf=buf;messageBuf.addComponent(bodyBuf);//将ByteBuf实例追加到CompositeByteBufmessageBuf.addComponent(headerBuf);for (ByteBuf buf:messageBuf){//遍历所有ByteBufSystem.out.println(buf);byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println("复合缓冲区:"+body);}ctx.writeAndFlush(buf);}public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {ByteBuf buf = (ByteBuf) msg;ByteBuf copyBuf=((ByteBuf) msg).copy();
//                    System.out.println(buf.refCnt());//返回此对象的引用计数。如果为0,则表示此对象已被释放。
//                    buf.release();//释放引用计数对象for (int i = 0; i < buf.capacity(); i++) {byte b=buf.getByte(i);if((char)b>='a'&&(char)b<='z'||(char)b>='A'&&(char)b<='Z'||(char)b==',')System.out.println("i="+(char)b);}int i=buf.forEachByte(new ByteProcessor() {@Overridepublic boolean process(byte value) throws Exception {byte[] b=",".getBytes();if (b[0]!=value)return true;elsereturn false;}});System.out.println("i="+i+" value="+(char) buf.getByte(i));ByteBuf sliced = buf.slice(0,2);sliced.setByte(0,(byte)'h');byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println(body);ctx.fireChannelRead(copyBuf);}});ch.pipeline().addLast("text2",new ChannelInboundHandlerAdapter(){public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {ByteBuf buf = (ByteBuf) msg;byte[] req = new byte[buf.readableBytes()];buf.readBytes(req);String body = new String(req, "UTF-8");System.out.println("text2:"+body);ByteBuf bufs= Unpooled.copiedBuffer("test2发送的数据", Charset.forName("UTF-8"));ctx.writeAndFlush(bufs);ctx.close();}});
//            ch.pipeline().remove("text2");}}public static void main(String[] args) throws Exception {int port = 8080;new EchoClient().connect(port, "127.0.0.1");}
}