NIO 编程案例
服务端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;

/**
 * Minimal single-threaded NIO chat server.
 *
 * <p>Accepts connections on {@link #PORT}, reads whatever a client sends and echoes it to
 * every connected client. The sender receives the text prefixed with
 * {@code "自己发送的消息 : "}; every other client receives it prefixed with the id stored for
 * that client in {@code clientMap}.
 *
 * <p>All channel work happens on the thread that runs {@link #main(String[])}.
 */
public class NioServer {

    /** TCP port the server listens on. */
    private static final int PORT = 8899;

    /** Size in bytes of the per-read receive buffer. */
    private static final int READ_BUFFER_SIZE = 1024;

    /** Charset used for both decoding incoming and encoding outgoing text. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /** Connected clients keyed by the random id assigned when they were accepted. */
    private static Map<String, SocketChannel> clientMap = new HashMap<>();

    /**
     * Runs the accept/read loop until the thread is interrupted.
     *
     * @param args ignored
     * @throws IOException if the server socket or selector cannot be opened, bound or selected on
     */
    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(PORT));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (!Thread.currentThread().isInterrupted()) {
                // select() returns 0 after a wakeup; that is not a reason to stop serving.
                if (selector.select() == 0) {
                    continue;
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey selectionKey : selectionKeys) {
                    // A key may have been invalidated by an earlier handler in this same pass.
                    if (!selectionKey.isValid()) {
                        continue;
                    }
                    if (selectionKey.isAcceptable()) {
                        accept(selector, selectionKey);
                    } else if (selectionKey.isReadable()) {
                        read(selectionKey);
                    }
                }
                // Clear after the loop: removing inside the for-each throws
                // ConcurrentModificationException as soon as two keys are ready at once.
                selectionKeys.clear();
            }
        }
    }

    /** Accepts one pending connection and registers it for reads. */
    private static void accept(Selector selector, SelectionKey selectionKey) throws IOException {
        System.out.println("---- NioServer Acceptable -----");
        ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // Non-blocking accept() returns null when no connection is actually pending.
            return;
        }
        try {
            sc.configureBlocking(false);
            sc.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            System.err.println("Failed to register accepted client: " + e);
            closeQuietly(sc);
            return;
        }
        String key = "【" + UUID.randomUUID() + "】";
        clientMap.put(key, sc);
    }

    /** Reads one chunk from a client and broadcasts it; drops the client on EOF or error. */
    private static void read(SelectionKey selectionKey) {
        System.out.println("---- NioServer Readable -----");
        SocketChannel sc = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        try {
            int count = sc.read(byteBuffer);
            if (count < 0) {
                // Peer closed the connection; without this the key stays readable forever.
                disconnect(selectionKey, sc);
                return;
            }
            if (count == 0) {
                return;
            }
            byteBuffer.flip();
            // toString() honours the buffer limit; array() would expose unused trailing chars.
            // NOTE: a multi-byte character split across two reads is still decoded lossily.
            String msg = UTF_8.decode(byteBuffer).toString();
            System.out.println(sc.getRemoteAddress() + " 客户端发送的信息 : " + msg);
            broadcast(sc, msg);
        } catch (Exception e) {
            // Best effort per client: one broken connection must not stop the server.
            System.err.println("Dropping client after error: " + e);
            disconnect(selectionKey, sc);
        }
    }

    /** Sends {@code msg} to every registered client, dropping those that can no longer be written. */
    private static void broadcast(SocketChannel sender, String msg) {
        // removeIf is the safe way to drop map entries while walking them.
        clientMap.entrySet().removeIf(entry -> !deliver(entry, sender, msg));
    }

    /**
     * Writes the message to a single recipient.
     *
     * @return {@code true} if the recipient is still usable, {@code false} if it was closed
     */
    private static boolean deliver(Entry<String, SocketChannel> entry, SocketChannel sender, String msg) {
        SocketChannel target = entry.getValue();
        String text = target.equals(sender)
                ? "自己发送的消息 : " + msg
                : entry.getKey() + "客户端发送的消息 : " + msg;
        // Wrap the exact bytes: prefix + message can exceed any fixed-size buffer.
        ByteBuffer out = ByteBuffer.wrap(text.getBytes(UTF_8));
        try {
            // A non-blocking write may be partial; keep going until everything is sent.
            while (out.hasRemaining()) {
                target.write(out);
            }
            return true;
        } catch (IOException e) {
            System.err.println("Dropping unreachable client " + entry.getKey() + ": " + e);
            closeQuietly(target);
            return false;
        }
    }

    /** Cancels the key, forgets the client and closes its channel. */
    private static void disconnect(SelectionKey selectionKey, SocketChannel sc) {
        selectionKey.cancel();
        clientMap.values().remove(sc);
        closeQuietly(sc);
    }

    /** Closes a channel, ignoring failures because the connection is being discarded anyway. */
    private static void closeQuietly(SocketChannel sc) {
        try {
            sc.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if close itself fails.
        }
    }
}
客户端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.*;

/**
 * Minimal NIO chat client.
 *
 * <p>Connects to {@code 127.0.0.1:8899}, sends a greeting, forwards every line typed on
 * standard input to the server and prints everything the server sends back.
 */
public class NioClient {

    /** Server host to connect to. */
    private static final String HOST = "127.0.0.1";

    /** Server port to connect to. */
    private static final int PORT = 8899;

    /** Size in bytes of the per-read receive buffer. */
    private static final int READ_BUFFER_SIZE = 1024;

    /**
     * Connects to the server and runs the selector loop until the connection is closed
     * or the calling thread is interrupted.
     *
     * @throws IOException if connecting, selecting, reading or the initial write fails
     */
    public static void client() throws IOException {
        // System.in is deliberately not closed: it belongs to the process, not to this method.
        Scanner input = new Scanner(System.in);
        try (SocketChannel socketChannel = SocketChannel.open();
             Selector selector = Selector.open()) {
            socketChannel.configureBlocking(false);
            if (socketChannel.connect(new InetSocketAddress(HOST, PORT))) {
                // A local connection may complete immediately; OP_CONNECT would then never fire.
                socketChannel.register(selector, SelectionKey.OP_READ);
                onConnected(socketChannel, input);
            } else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            }

            while (socketChannel.isOpen() && !Thread.currentThread().isInterrupted()) {
                if (selector.select() == 0) {
                    continue;
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey selectionKey : selectionKeys) {
                    if (!selectionKey.isValid()) {
                        continue;
                    }
                    SocketChannel sc = (SocketChannel) selectionKey.channel();
                    if (selectionKey.isConnectable()) {
                        System.out.println("---- NioClient Connectable -----");
                        if (sc.finishConnect()) {
                            selectionKey.interestOps(SelectionKey.OP_READ);
                            onConnected(sc, input);
                        }
                    } else if (selectionKey.isReadable()) {
                        if (!readAndPrint(sc)) {
                            // Server closed the connection; closing ends the outer loop.
                            sc.close();
                        }
                    }
                }
                // Clear after the loop: removing inside the for-each risks
                // ConcurrentModificationException.
                selectionKeys.clear();
            }
        }
    }

    /** Sends the greeting and starts the thread that forwards standard input to the server. */
    private static void onConnected(SocketChannel sc, Scanner input) throws IOException {
        send(sc, " 服务器已接受客户端的连接 !!!");
        Thread stdinPump = new Thread(() -> {
            while (input.hasNextLine()) {
                try {
                    send(sc, input.nextLine());
                } catch (IOException e) {
                    // The channel is unusable once a write fails; stop instead of failing per line.
                    e.printStackTrace();
                    return;
                }
            }
        });
        // Daemon so a blocked read on stdin does not keep the JVM alive after disconnect.
        stdinPump.setDaemon(true);
        stdinPump.start();
    }

    /** Encodes {@code text} as UTF-8 and writes all of it to the channel. */
    private static void send(SocketChannel sc, String text) throws IOException {
        // Wrap the exact bytes so a long line cannot overflow a fixed-size buffer.
        ByteBuffer out = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
        // A non-blocking write may be partial; keep going until everything is sent.
        while (out.hasRemaining()) {
            sc.write(out);
        }
    }

    /**
     * Reads one chunk from the server and prints it.
     *
     * @return {@code false} if the server has closed the connection, {@code true} otherwise
     */
    private static boolean readAndPrint(SocketChannel sc) throws IOException {
        System.out.println("---- NioClient Readable -----");
        ByteBuffer byteBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
        int count = sc.read(byteBuffer);
        if (count < 0) {
            return false;
        }
        if (count > 0) {
            String msg = new String(byteBuffer.array(), 0, count, StandardCharsets.UTF_8);
            System.out.println(msg);
        }
        return true;
    }

    /**
     * Entry point: runs the client and prints the stack trace if it fails.
     *
     * @param args ignored
     * @throws IOException declared for compatibility; failures of {@link #client()} are caught here
     */
    public static void main(String[] args) throws IOException {
        try {
            client();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Reactor
单 Reactor 单线程
/**
 * Callback attached to a {@link SelectionKey} and invoked when that key is dispatched.
 */
public interface Handler {

    /**
     * Handles a selected key.
     *
     * @param selector     the selector the key belongs to
     * @param channel      the channel the key was created for
     * @param selectionKey the key being dispatched
     */
    void handle(Selector selector, SelectableChannel channel, SelectionKey selectionKey);
}
/**
 * Base class for handlers; each instance owns its own list of {@code ClientMsg} entries.
 */
public abstract class AbstractHandler implements Handler {

    /** Per-instance list, backed by a {@link CopyOnWriteArrayList}. */
    private final List<ClientMsg> clients = new CopyOnWriteArrayList<>();

    /**
     * Returns this handler's client list.
     *
     * @return the live list instance itself, not a copy
     */
    public List<ClientMsg> getClients() {
        return this.clients;
    }
}
public class AcceptHandler extends AbstractHandler{
public AcceptHandler() {
}public AcceptHandler(Selector selector, ServerSocketChannel socketChannel) throws IOException {
socketChannel.configureBlocking(false);SelectionKey selectionKey = socketChannel.register(selector, 0);selectionKey.attach(this);selectionKey.interestOps(SelectionKey.OP_ACCEPT);selector.wakeup();}@Overridepublic void handle(Selector selector , SelectableChannel channel, SelectionKey selectionKey) {
try{
if(selectionKey.isAcceptable()){
SocketChannel socketChannel = ((ServerSocketChannel)channel).accept();if(socketChannel!=null) {
new ReaderHandler(selector,socketChannel);}}}catch (Exception e){
selectionKey.cancel();}}
}
public class ReaderHandler extends AbstractHandler{
static final int MAX_IN = 1024;static ByteBuffer input = ByteBuffer.allocate(MAX_IN);public ReaderHandler() {
}public ReaderHandler(Selector selector, SocketChannel socketChannel) throws IOException {
socketChannel.configureBlocking(false);SelectionKey selectionKey = socketChannel.register(selector, 0);selectionKey.attach(this);selectionKey.interestOps(SelectionKey.OP_READ);selector.wakeup();}public ReaderHandler(Selector selector, SocketChannel socketChannel,SelectionKey selectionKey) throws IOException {
socketChannel.configureBlocking(false);selectionKey = socketChannel.register(selector, 0);selectionKey.attach(this);selectionKey.interestOps(SelectionKey.OP_READ);selector.wakeup();}@Overridepublic void handle(Selector selector, SelectableChannel channel, SelectionKey selectionKey) {
try{
SocketChannel socketChannel = (SocketChannel) channel;input.clear();socketChannel.read(input);input.flip();String msg = String.valueOf( Charset.forName("UTF-8").decode(input).array() );System.out.println("---ReaderHandler.handle() msg : " + msg);process();if(socketChannel!=null) new WriterHandler(selector,socketChannel);}catch (Exception e){
selectionKey.cancel();}}boolean inputIsComplete() {
return true;}void process() {
}
}
public class WriterHandler extends AbstractHandler{
static final int MAX_OUT = 1024;static ByteBuffer output = ByteBuffer.allocate(MAX_OUT);public WriterHandler() {
}public WriterHandler(Selector selector, SocketChannel socketChannel) throws IOException {
socketChannel.configureBlocking(false);SelectionKey selectionKey = socketChannel.register(selector, 0);selectionKey.attach(this);selectionKey.interestOps(SelectionKey.OP_WRITE);selector.wakeup();}@Overridepublic void handle(Selector selector, SelectableChannel channel, SelectionKey selectionKey) {
try{
SocketChannel socketChannel = (SocketChannel) channel;output.clear();String msg = "默认消息\n";process();output.put(msg.getBytes());output.flip();socketChannel.write(output);System.out.println("---WriterHandler.handle() msg : " + msg);
if(socketChannel!=null) new ReaderHandler(selector,(SocketChannel)selectionKey.channel());}catch (Exception e){
selectionKey.cancel();}}boolean outputIsComplete() {
return true;}void process() {
}
}
/**
 * Single-threaded reactor: one selector loop that dispatches every selected key to the
 * {@link Handler} attached to it.
 */
public class Reactor implements Runnable {

    final Selector selector;
    final ServerSocketChannel serverSocketChannel;

    /**
     * Opens the selector and the listening channel, binds to {@code port} and registers an
     * {@link AcceptHandler} for incoming connections.
     *
     * @param port TCP port to listen on
     * @throws IOException if opening, binding or registering fails; anything opened so far is closed
     */
    Reactor(int port) throws IOException {
        System.out.println("----Reactor 构造函数");
        Selector openedSelector = Selector.open();
        ServerSocketChannel openedServer = null;
        try {
            openedServer = ServerSocketChannel.open();
            openedServer.socket().bind(new InetSocketAddress(port));
            // The constructor registers the channel; the instance lives on as the key's attachment.
            new AcceptHandler(openedSelector, openedServer);
        } catch (IOException | RuntimeException e) {
            // Do not leak the selector or the socket when construction fails half-way.
            closeQuietly(openedServer);
            closeQuietly(openedSelector);
            throw e;
        }
        selector = openedSelector;
        serverSocketChannel = openedServer;
    }

    /**
     * Runs the select/dispatch loop until the thread is interrupted or selection fails,
     * then closes the selector and the listening channel.
     */
    @Override
    public void run() {
        try {
            System.out.println("----Reactor.run() 的线程名 : " + Thread.currentThread().getName() + "\n");
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> sks = selector.selectedKeys();
                for (SelectionKey selectionKey : sks) {
                    dispatcher(selectionKey);
                }
                // Selected keys are not removed automatically; clear them for the next round.
                sks.clear();
            }
        } catch (IOException e) {
            // Report instead of swallowing: otherwise the reactor thread dies without a trace.
            System.err.println("Reactor loop terminated: " + e);
        } finally {
            closeQuietly(serverSocketChannel);
            closeQuietly(selector);
        }
    }

    /** Invokes the handler attached to the key, if there is one. */
    private void dispatcher(SelectionKey selectionKey) throws IOException {
        Selector keySelector = selectionKey.selector();
        SelectableChannel channel = selectionKey.channel();
        Handler handler = (Handler) selectionKey.attachment();
        if (null != handler) {
            handler.handle(keySelector, channel, selectionKey);
        }
    }

    /** Closes a resource if present, ignoring failures because it is being discarded anyway. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if close itself fails.
        }
    }

    /**
     * Starts a reactor on port 8899 in a new thread.
     *
     * @param args ignored
     * @throws Exception if the reactor cannot be created
     */
    public static void main(String[] args) throws Exception {
        Reactor reactor = new Reactor(8899);
        new Thread(reactor).start();
    }
}
单 Reactor 多线程
主从 Reactor 多线程