当前位置: 代码迷 >> 综合 >> Reactor — NIO原理以及Reactor模式
  详细解决方案

Reactor — NIO原理以及Reactor模式

热度:49   发布时间:2023-12-24 20:22:39.0

一、JAVA NIO

NIO主要有三大核心部分:Channel(通道),Buffer(缓冲区), Selector。传统IO基于字节流和字符流进行操作,而NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道。

NIO和传统IO(一下简称IO)之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的。 Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。NIO的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有您需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。

IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。 NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变得可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

Channel

Channel和IO中的Stream(流)是差不多一个等级的。只不过Stream是单向的,譬如:InputStream, OutputStream.而Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。
NIO中的Channel的主要实现有:分别可以对应文件IO、UDP和TCP(Server和Client)

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

Buffer

缓冲区,实际上是一个容器,一个连续数组。Channel提供从文件、网络读取数据的渠道,但是读写的数据都必须经过Buffer。NIO中的关键Buffer实现有:ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer,分别对应基本数据类型: byte, char, double, float, int, long, short。当然NIO中还有MappedByteBuffer, HeapByteBuffer, DirectByteBuffer等这里先不进行陈述。

Selector

NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,接着我们可以处理这些数据。

Selector内部原理实际是在做一个对所注册的channel的轮询访问,不断的轮询(目前就这一个算法),一旦轮询到一个channel有所注册的事情发生,比如数据来了,他就会站起来报告,交出一把钥匙,让我们通过这把钥匙来读取这个channel的内容。

Selector运行单线程处理多个Channel,如果你的应用打开了多个通道,但每个连接的流量都很低,使用Selector就会很方便。例如在一个聊天服务器中。要使用Selector, 得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新的连接进来、数据接收等。

Reactor模式原理

Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。
在这里插入图片描述
在非堵塞I/O API中,监视器是其一个重要的类Selector,被监视的源目标是可以被Selector联系的SelectableChannel(基本也属于Selector的相关部分),事件类型有:是否有接受的连接(OP_ACCEPT)、是否可以连接(OP_CONNECT)、是否可以读取(OP_READ)和是否可以写入(OP_WRITE)。

监视器Selector主要是监视这些事件,一旦发生,生成SelectionKey对象,Selector是自我触发、自我激活的,因此是典型的Reactor模式实现,但是,在非堵塞I/O API中,并不是由Selector来实现事件的处理,事件处理是由Selector激活出来后,通过其他处理器Handler来实现处理,开发者使用非堵塞I/O API需要做的工作就是:获取Selector激发的事件,然后根据相应事件类型,编制自己的处理器代码来进行具体处理。例如,如果是可读取事件,那么编制代码从SelectableChannel读取数据包,然后处理这个数据包。

在非堵塞I/O API中,使用Reactor模式将事件发生和事件处理两个部分实现分离解耦,事件发生部分只负责事件的激活,而事件处理由专门的处理器实现具体处理。

1、Reactor模式事件机制

经典的网络服务的设计如下图,在每个线程中完成对数据的处理:
在这里插入图片描述
但这种模式在用户负载增加时,性能将下降非常的快。我们需要重新寻找一个新的方案,保持数据处理的流畅,很显然,事件触发机制是最好的解决办法,当有事件发生时,会触动handler,然后开始数据的处理。Reactor模式类似于AWT中的Event处理:
在这里插入图片描述

2、Reactor模式参与者

1.Reactor 负责响应IO事件,一旦发生,广播发送给相应的Handler去处理,这类似于AWT的thread
2.Handler 是负责非堵塞行为,类似于AWT ActionListeners;同时负责将handlers与event事件绑定,类似于AWT addActionListener
在这里插入图片描述
Java的NIO为reactor模式提供了实现的基础机制,它的Selector当发现某个channel有数据时,会通过SelectorKey来告知我们,在此我们实现事件和handler的绑定。

了解以上基本原理,我们结合代码看看reactor模式代码使用:

public class Reactor implements Runnable{
    final Selector selector;final ServerSocketChannel serverSocket;Reactor(int port) throws IOException {
    selector = Selector.open();serverSocket = ServerSocketChannel.open();InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);serverSocket.socket().bind(address);serverSocket.configureBlocking(false);//向selector注册该channelSelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);logger.debug("-->Start serverSocket.register!");//利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptorsk.attach(new Acceptor());logger.debug("-->attach(new Acceptor()!");}public void run() {
     // normally in a new Threadtry {
    while (!Thread.interrupted()){
    selector.select();Set selected = selector.selectedKeys();Iterator it = selected.iterator();//Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。while (it.hasNext())//来一个事件 第一次触发一个accepter线程//以后触发SocketReadHandlerdispatch((SelectionKey)(it.next()));selected.clear();}}catch (IOException ex) {
    logger.debug("reactor stop!"+ex);}}//运行Acceptor或SocketReadHandlervoid dispatch(SelectionKey k) {
    Runnable r = (Runnable)(k.attachment());if (r != null){
    // r.run();//....}}class Acceptor implements Runnable {
     // innerpublic void run() {
    try {
    logger.debug("-->ready for accept!");SocketChannel c = serverSocket.accept();if (c != null)//调用Handler来处理channelnew SocketReadHandler(selector, c);}catch(IOException ex) {
    logger.debug("accept stop!"+ex);}}}
}

以上代码中巧妙使用了SocketChannel的attach功能,将Handler和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应的Handler。

Handler代码:


public class SocketReadHandler implements Runnable {
    public static Logger logger = Logger.getLogger(SocketReadHandler.class);private Test test=new Test();final SocketChannel socket;final SelectionKey sk;static final int READING = 0, SENDING = 1;int state = READING;public SocketReadHandler(Selector sel, SocketChannel c)throws IOException {
    socket = c;socket.configureBlocking(false);sk = socket.register(sel, 0);//将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。//参看dispatch(SelectionKey k)sk.attach(this);//同时将SelectionKey标记为可读,以便读取。sk.interestOps(SelectionKey.OP_READ);sel.wakeup();}public void run() {
    try{
    // test.read(socket,input);readRequest() ;}catch(Exception ex){
    logger.debug("readRequest error"+ex);}}/** * 处理读取data * @param key * @throws Exception */
private void readRequest() throws Exception {
    ByteBuffer input = ByteBuffer.allocate(1024);input.clear();try{
    int bytesRead = socket.read(input);......//激活线程池 处理这些requestrequestHandle(new Request(socket,btt));.....}catch(Exception e) {
    }}

Handler里面又执行了一次attach,这样,覆盖前面的Acceptor,下次该Handler又有READ事件发生时,将直接触发Handler.从而开始了数据的读、处理、写、发出等流程处理。
将数据读出后,可以将这些数据处理线程做成一个线程池,这样数据读出后,立即扔到线程池中,这样加速处理速度:
在这里插入图片描述