当前位置: 代码迷 >> 综合 >> NIO Reactor
  详细解决方案

NIO Reactor

热度:83   发布时间:2023-11-09 17:46:02.0

NIO 编程案例

服务端


import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;

/**
 * Minimal single-threaded NIO chat server.
 *
 * <p>One selector multiplexes the listening socket and every accepted client. Each chunk of
 * bytes read from a client is decoded as UTF-8 and relayed to every connected client; the
 * sender gets an echo prefixed with a "sent by yourself" label, everyone else gets the text
 * prefixed with the sender's generated id.
 *
 * <p>All state is touched only from the thread that runs {@link #main(String[])}.
 */
public class NioServer {
    /** TCP port the server listens on. */
    private static final int PORT = 8899;

    /** Capacity of the buffer used for a single read, in bytes. */
    private static final int BUFFER_SIZE = 1024;

    /** Charset used for every byte/String conversion, so behaviour does not depend on the platform default. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /** Connected clients keyed by their generated id. */
    private static final Map<String, SocketChannel> clientMap = new HashMap<>();

    /**
     * Starts the server and runs the selector loop until the thread is interrupted.
     *
     * @param args ignored
     * @throws IOException if the listening socket or the selector cannot be set up, or if
     *     selecting or accepting fails
     */
    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(PORT));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            // select() may legitimately return 0 (e.g. after wakeup()), so the result is not
            // used as the loop condition; only an interrupt stops the server.
            while (!Thread.currentThread().isInterrupted()) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey selectionKey : selectionKeys) {
                    // A key can be cancelled earlier in this same pass (peer closed while
                    // broadcasting); querying readiness on it would throw.
                    if (!selectionKey.isValid()) {
                        continue;
                    }
                    if (selectionKey.isAcceptable()) {
                        acceptClient(selector, selectionKey);
                    } else if (selectionKey.isReadable()) {
                        readAndBroadcast(selectionKey);
                    }
                }
                // Clear after the pass instead of removing inside the for-each, which would
                // invalidate the iterator as soon as two keys are ready at once.
                selectionKeys.clear();
            }
        } finally {
            for (SocketChannel client : clientMap.values()) {
                closeQuietly(client);
            }
            clientMap.clear();
        }
    }

    /** Accepts one pending connection, registers it for reads and records it under a fresh id. */
    private static void acceptClient(Selector selector, SelectionKey selectionKey) throws IOException {
        System.out.println("---- NioServer Acceptable -----");
        ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // Non-blocking accept: the pending connection may already be gone.
            return;
        }
        try {
            sc.configureBlocking(false);
            String key = "【" + UUID.randomUUID() + "】";
            // The id travels with the key so the read path knows who the sender is.
            sc.register(selector, SelectionKey.OP_READ, key);
            clientMap.put(key, sc);
        } catch (IOException e) {
            // Only this connection is affected; keep the server running.
            closeQuietly(sc);
        }
    }

    /**
     * Reads one chunk from the ready client and relays it to every connected client.
     * A closed or failing sender is dropped.
     */
    private static void readAndBroadcast(SelectionKey selectionKey) {
        System.out.println("---- NioServer Readable -----");
        SocketChannel sc = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        try {
            int count = sc.read(byteBuffer);
            if (count < 0) {
                // End of stream: without this the key stays readable forever and the loop spins.
                disconnect(sc);
                return;
            }
            if (count == 0) {
                return;
            }
            byteBuffer.flip();
            // toString() honours the decoded length; CharBuffer.array() exposes the whole
            // backing array, which can be longer than the decoded text for multi-byte input.
            // NOTE: a multi-byte character split across two reads is still decoded per chunk.
            String msg = UTF_8.decode(byteBuffer).toString();
            System.out.println(sc.getRemoteAddress() + " 客户端发送的信息 : " + msg);
            broadcast(sc, (String) selectionKey.attachment(), msg);
        } catch (IOException e) {
            disconnect(sc);
        }
    }

    /**
     * Sends {@code msg} to every client. Recipients whose channel fails are closed and
     * removed; a failure on one recipient never affects the sender or the other recipients.
     */
    private static void broadcast(SocketChannel sender, String senderId, String msg) {
        clientMap.entrySet().removeIf(entry -> {
            SocketChannel recipient = entry.getValue();
            String text = recipient.equals(sender)
                    ? "自己发送的消息 : " + msg
                    : senderId + "客户端发送的消息 : " + msg;
            try {
                // wrap() sizes the buffer to the message, so a prefix plus a full 1024-byte
                // payload cannot overflow. NOTE(review): a partial non-blocking write is not
                // queued for retry here.
                recipient.write(ByteBuffer.wrap(text.getBytes(UTF_8)));
                return false;
            } catch (IOException e) {
                closeQuietly(recipient);
                return true;
            }
        });
    }

    /** Closes the channel (which also cancels its key) and forgets the client. */
    private static void disconnect(SocketChannel channel) {
        closeQuietly(channel);
        clientMap.values().remove(channel);
    }

    /** Best-effort close; a failure while closing leaves nothing further to clean up. */
    private static void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if close itself fails.
        }
    }
}

客户端

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * Console chat client for the NIO sample server.
 *
 * <p>The calling thread drives a selector that completes the non-blocking connect and prints
 * everything the server sends. A background daemon thread forwards each line typed on
 * standard input to the server.
 */
public class NioClient {
    /** Server address the client connects to. */
    private static final String HOST = "127.0.0.1";

    /** Server port the client connects to. */
    private static final int PORT = 8899;

    /** Capacity of the buffer used for a single read, in bytes. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Connects to the server and processes events until the server closes the connection or
     * the thread is interrupted.
     *
     * @throws IOException if connecting, selecting or reading fails
     */
    public static void client() throws IOException {
        Scanner input = new Scanner(System.in);
        try (SocketChannel socketChannel = SocketChannel.open();
             Selector selector = Selector.open()) {
            socketChannel.configureBlocking(false);
            if (socketChannel.connect(new InetSocketAddress(HOST, PORT))) {
                // A non-blocking connect may complete immediately; OP_CONNECT would then
                // never be reported.
                socketChannel.register(selector, SelectionKey.OP_READ);
                onConnected(socketChannel, input);
            } else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            }

            while (socketChannel.isOpen() && !Thread.currentThread().isInterrupted()) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey selectionKey : selectionKeys) {
                    if (!selectionKey.isValid()) {
                        continue;
                    }
                    if (selectionKey.isConnectable()) {
                        if (!socketChannel.finishConnect()) {
                            continue;
                        }
                        // Connection established: from now on only reads are interesting.
                        selectionKey.interestOps(SelectionKey.OP_READ);
                        onConnected(socketChannel, input);
                    } else if (selectionKey.isReadable()) {
                        System.out.println("---- NioClient Readable -----");
                        if (!printIncoming(socketChannel)) {
                            // Server closed the connection; closing ends the outer loop.
                            socketChannel.close();
                        }
                    }
                }
                // Clear after the pass; removing inside the for-each would invalidate the iterator.
                selectionKeys.clear();
            }
        }
    }

    /** Sends the greeting and starts the thread that forwards console input to the server. */
    private static void onConnected(SocketChannel sc, Scanner input) throws IOException {
        System.out.println("---- NioClient Connectable -----");
        sc.write(ByteBuffer.wrap(" 服务器已接受客户端的连接 !!!".getBytes(StandardCharsets.UTF_8)));

        Thread stdinForwarder = new Thread(() -> {
            while (input.hasNextLine()) {
                String msg = input.nextLine();
                if (msg.isEmpty()) {
                    continue;
                }
                try {
                    // wrap() sizes the buffer to the line, so long input cannot overflow it.
                    sc.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
                } catch (IOException e) {
                    System.err.println("NioClient: failed to send message: " + e);
                    return;
                }
            }
        });
        // Daemon, so a thread blocked on stdin does not keep the JVM alive after disconnect.
        stdinForwarder.setDaemon(true);
        stdinForwarder.start();
    }

    /**
     * Reads one chunk from the server and prints it.
     *
     * @return {@code false} if the server has closed the connection, {@code true} otherwise
     */
    private static boolean printIncoming(SocketChannel sc) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
        int count = sc.read(byteBuffer);
        if (count < 0) {
            return false;
        }
        if (count > 0) {
            String msg = new String(byteBuffer.array(), 0, count, StandardCharsets.UTF_8);
            System.out.println(msg);
        }
        return true;
    }

    /**
     * Runs a single client and reports any I/O failure on standard error.
     *
     * @param args ignored
     * @throws IOException never in practice; declared for source compatibility
     */
    public static void main(String[] args) throws IOException {
        try {
            client();
        } catch (IOException e) {
            System.err.println("NioClient: connection failed: " + e);
        }
    }
}

Reactor

单 Reactor 单线程

/**
 * Contract for objects that process a ready I/O event (accept, read, write, ...).
 *
 * <p>Implementations are attached to a {@link SelectionKey} and invoked by the reactor when
 * that key is selected.
 */
public interface Handler {
    /**
     * Processes the event that is ready on {@code selectionKey}.
     *
     * @param selector the selector that reported the key
     * @param channel the channel the key belongs to
     * @param selectionKey the selected key
     */
    void handle(Selector selector, SelectableChannel channel, SelectionKey selectionKey);
}
/**
 * Base class for handlers; extends {@link Handler} with a list of {@code ClientMsg} entries.
 *
 * <p>The list is a {@link CopyOnWriteArrayList}, so it can be iterated while another thread
 * modifies it. {@code ClientMsg} is declared elsewhere in the project.
 */
public abstract class AbstractHandler implements Handler {

    /** Entries held by this handler instance. */
    private final List<ClientMsg> clients = new CopyOnWriteArrayList<>();

    /**
     * Returns the handler's client list.
     *
     * @return the live backing list, not a copy
     */
    public List<ClientMsg> getClients() {
        return this.clients;
    }
}
/*** 处理Acceptable 事件*/
public class AcceptHandler extends AbstractHandler{
    public AcceptHandler()  {
      }public AcceptHandler(Selector selector, ServerSocketChannel socketChannel) throws IOException {
    socketChannel.configureBlocking(false);SelectionKey selectionKey = socketChannel.register(selector, 0);//注册一个不存在的事件selectionKey.attach(this);selectionKey.interestOps(SelectionKey.OP_ACCEPT);//将此键的 interest 集合设置为给定值 : 注册读事件selector.wakeup();//唤醒前一个阻塞的socket}@Overridepublic void handle(Selector selector , SelectableChannel channel, SelectionKey selectionKey) {
    try{
    if(selectionKey.isAcceptable()){
    SocketChannel socketChannel = ((ServerSocketChannel)channel).accept();if(socketChannel!=null) {
     new ReaderHandler(selector,socketChannel);}}}catch (Exception e){
    selectionKey.cancel();}}
}/*** 处理Reader事件*/
public class ReaderHandler extends AbstractHandler{
    static final int MAX_IN = 1024;static ByteBuffer input = ByteBuffer.allocate(MAX_IN);public ReaderHandler()  {
    }public ReaderHandler(Selector selector, SocketChannel socketChannel) throws IOException {
    socketChannel.configureBlocking(false);// 也可以注解注册SelectionKey.OP_READ; 这里先不关心任何事件, 后面注册读事件SelectionKey selectionKey = socketChannel.register(selector, 0);//注册一个不存在的事件selectionKey.attach(this);selectionKey.interestOps(SelectionKey.OP_READ);//将此键的 interest 集合设置为给定值 : 注册读事件selector.wakeup();//唤醒前一个阻塞的socket}public ReaderHandler(Selector selector, SocketChannel socketChannel,SelectionKey selectionKey) throws IOException {
    socketChannel.configureBlocking(false);// 也可以注解注册SelectionKey.OP_READ; 这里先不关心任何事件, 后面注册读事件selectionKey = socketChannel.register(selector, 0);//注册一个不存在的事件selectionKey.attach(this);selectionKey.interestOps(SelectionKey.OP_READ);//将此键的 interest 集合设置为给定值 : 注册读事件selector.wakeup();//唤醒前一个阻塞的socket}@Overridepublic void handle(Selector selector, SelectableChannel channel, SelectionKey selectionKey)  {
    try{
    SocketChannel socketChannel = (SocketChannel) channel;input.clear();//读入socketChannel.read(input);input.flip();// 写出到StringString msg = String.valueOf( Charset.forName("UTF-8").decode(input).array() );System.out.println("---ReaderHandler.handle() msg : " + msg);// 处理数据process();// 读取完毕后 注入Writer 事件if(socketChannel!=null) new WriterHandler(selector,socketChannel);}catch (Exception e){
    selectionKey.cancel();}}boolean inputIsComplete() {
    // 加入实现return true;}void process() {
    // 加入实现}
}/*** 处理 Writer 事件*/
public class WriterHandler extends AbstractHandler{
    static final int MAX_OUT = 1024;static ByteBuffer output = ByteBuffer.allocate(MAX_OUT);public WriterHandler()  {
    }public WriterHandler(Selector selector, SocketChannel socketChannel) throws IOException {
    socketChannel.configureBlocking(false);SelectionKey selectionKey = socketChannel.register(selector, 0);//注册一个不存在的事件selectionKey.attach(this);selectionKey.interestOps(SelectionKey.OP_WRITE);//注册写事件selector.wakeup();//唤醒前一个阻塞的socket}@Overridepublic void handle(Selector selector, SelectableChannel channel, SelectionKey selectionKey)  {
    try{
    SocketChannel socketChannel = (SocketChannel) channel;output.clear();String msg = "默认消息\n";process();output.put(msg.getBytes());output.flip();socketChannel.write(output);System.out.println("---WriterHandler.handle() msg : " + msg);
// if (outputIsComplete()) {
    
// selectionKey.cancel();
// }// 读取完毕后 注入Reader事件if(socketChannel!=null) new ReaderHandler(selector,(SocketChannel)selectionKey.channel());}catch (Exception e){
    selectionKey.cancel();}}boolean outputIsComplete() {
    // 加入实现return true;}void process() {
    // 加入实现}
}
/*** 核心类Reactor */
public class Reactor implements Runnable{
    final Selector selector;final ServerSocketChannel serverSocketChannel;/*** reactor 1: 初始设置*/Reactor(int port) throws IOException {
    System.out.println("----Reactor 构造函数" );selector = Selector.open();serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.socket().bind(new InetSocketAddress(port));AcceptHandler acceptor= new AcceptHandler(selector,serverSocketChannel);}/*** reactor 2: 分发循环*/@Overridepublic void run() {
    try {
    System.out.println("----Reactor.run() 的线程名 : "+Thread.currentThread().getName()+"\n");while (!Thread.interrupted() ) {
    //获取注册事件selector.select(); //这个方法是阻塞方法 ,而selector.wakeup()会唤醒阻塞中的selectorSet<SelectionKey> sks = selector.selectedKeys();Iterator<SelectionKey> it = sks.iterator();while (it.hasNext()) {
    SelectionKey selectionKey = it.next();dispatcher(selectionKey);//分发每一个请求}// 也可以在while循环中使用iterator的remove方法sks.clear();}} catch (IOException ignored) {
    // ignored}}private void dispatcher(SelectionKey selectionKey) throws IOException {
    Selector selector = selectionKey.selector();SelectableChannel channel  = selectionKey.channel();Handler handler = (Handler) selectionKey.attachment();//获取之前绑定的对象if (null != handler) {
    handler.handle(selector,channel,selectionKey);}}public static void main(String[] args) throws Exception {
    Reactor reactor= new Reactor(8899);new Thread(reactor).start();//这就是 reactor 线程 服务端线程}
}

单 Reactor 多线程


主从 Reactor 多线程