用异步输入输出流编写Socket进程通信程序
在Merlin中加入了用于实现异步输入输出机制的应用程序接口包:java.nio(新的输入输出包,定义了很多基本类型缓冲(Buffer)),java.nio.channels(通道及选择器等,用于异步输入输出),java.nio.charset(字符的编码解码)。通道(Channel)首先在选择器(Selector)中注册自己感兴趣的事件,当相应的事件发生时,选择器便通过选择键(SelectionKey)通知已注册的通道。然后通道将需要处理的信息,通过缓冲(Buffer)打包,编码/解码,完成输入输出控制。
通道介绍:
这里主要介绍ServerSocketChannel和 SocketChannel.它们都是可选择的(selectable)通道,分别可以工作在同步和异步两种方式下(注意,这里的可选择不是指可以选择两种工作方式,而是指可以有选择的注册自己感兴趣的事件)。可以用channel.configureBlocking(Boolean )来设置其工作方式。与以前版本的API相比较,ServerSocketChannel就相当于ServerSocket(ServerSocketChannel封装了ServerSocket),而SocketChannel就相当于Socket(SocketChannel封装了Socket)。当通道工作在同步方式时,编程方法与以前的基本相似,这里主要介绍异步工作方式。
所谓异步输入输出机制,是指在进行输入输出处理时,不必等到输入输出处理完毕才返回。所以异步的同义语是非阻塞(None Blocking)。在服务器端,ServerSocketChannel通过静态函数open()返回一个实例serverChl。然后该通道调用serverChl.socket().bind()绑定到服务器某端口,并调用register(Selector sel, SelectionKey.OP_ACCEPT)注册OP_ACCEPT事件到一个选择器中(ServerSocketChannel只可以注册OP_ACCEPT事件)。当有客户请求连接时,选择器就会通知该通道有客户连接请求,就可以进行相应的输入输出控制了;在客户端,clientChl实例注册自己感兴趣的事件后(可以是OP_CONNECT,OP_READ,OP_WRITE的组合),调用clientChl.connect(InetSocketAddress )连接服务器然后进行相应处理。注意,这里的连接是异步的,即会立即返回而继续执行后面的代码。
选择器和选择键介绍:
选择器(Selector)的作用是:将通道感兴趣的事件放入队列中,而不是马上提交给应用程序,等已注册的通道自己来请求处理这些事件。换句话说,就是选择器将会随时报告已经准备好了的通道,而且是按照先进先出的顺序。那么,选择器是通过什么来报告的呢?选择键(SelectionKey)。选择键的作用就是表明哪个通道已经做好了准备,准备干什么。你也许马上会想到,那一定是已注册的通道感兴趣的事件。不错,例如对于服务器端serverChl来说,可以调用key.isAcceptable()来通知serverChl有客户端连接请求。相应的函数还有:SelectionKey.isReadable(),SelectionKey.isWritable()。一般的,在一个循环中轮询感兴趣的事件(具体可参照下面的代码)。如果选择器中尚无通道已注册事件发生,调用Selector.select()将阻塞,直到有事件发生为止。另外,可以调用selectNow()或者select(long timeout)。前者立即返回,没有事件时返回0值;后者等待timeout时间后返回。一个选择器最多可以同时被63个通道一起注册使用。
应用实例:
下面是用异步输入输出机制实现的客户/服务器实例程序――程序清单1(限于篇幅,只给出了服务器端实现,读者可以参照着实现客户端代码):
程序类图
public class NBlockingServer {int port = 8000;int BUFFERSIZE = 1024;Selector selector = null;ServerSocketChannel serverChannel = null;HashMap clientChannelMap = null;//用来存放每一个客户连接对应的套接字和通道public NBlockingServer( int port ) {this.clientChannelMap = new HashMap();this.port = port;}public void initialize() throws IOException {//初始化,分别实例化一个选择器,一个服务器端可选择通道this.selector = Selector.open();this.serverChannel = ServerSocketChannel.open();this.serverChannel.configureBlocking(false);//设置为异步方式InetAddress localhost = InetAddress.getLocalHost();InetSocketAddress isa = new InetSocketAddress(localhost, this.port );this.serverChannel.socket().bind(isa);//将该套接字绑定到服务器某一可用端口}//结束时释放资源public void finalize() throws IOException {this.serverChannel.close();this.selector.close();}//将读入字节缓冲的信息解码public String decode( ByteBuffer byteBuffer ) throws CharacterCodingException {Charset charset = Charset.forName( "ISO-8859-1" );CharsetDecoder decoder = charset.newDecoder();CharBuffer charBuffer = decoder.decode( byteBuffer );String result = charBuffer.toString();return result;}//监听端口,当通道准备好时进行相应操作public void portListening() throws IOException, InterruptedException {//服务器端通道注册OP_ACCEPT事件SelectionKey acceptKey =this.serverChannel.register( this.selector,SelectionKey.OP_ACCEPT );//当有已注册的事件发生时,select()返回值将大于0while (acceptKey.selector().select() > 0 ) {System.out.println("event happened");//取得所有已经准备好的所有选择键Set readyKeys = this.selector.selectedKeys();//使用迭代器对选择键进行轮询Iterator i = readyKeys.iterator();while (i.hasNext()) {SelectionKey key = (SelectionKey) i.next();if ( key.isReadable() ) {//如果是通道读准备好事件System.out.println("Readable");//取得选择键对应的通道和套接字SelectableChannel nextReady =(SelectableChannel) key.channel();Socket socket = (Socket) key.attachment();//处理该事件,处理方法已封装在类ClientChInstance中this.readFromChannel( socket.getChannel(),(ClientChInstance)this.clientChannelMap.get( socket ) );}else if ( key.isWritable() ) {//如果是通道写准备好事件System.out.println("writeable");//取得套接字后处理,方法同上Socket socket = (Socket) key.attachment();SocketChannel channel = (SocketChannel) socket.getChannel();this.writeToChannel( channel,"This is from server!");}}}}//对通道的写操作public void writeToChannel( SocketChannel channel, String message ) throws IOException {ByteBuffer buf = ByteBuffer.wrap( message.getBytes() );int nbytes = channel.write( buf );}//对通道的读操作public void readFromChannel( SocketChannel channel, ClientChInstance clientInstance )throws IOException, InterruptedException {ByteBuffer byteBuffer = ByteBuffer.allocate( BUFFERSIZE );int nbytes = channel.read( byteBuffer );byteBuffer.flip();String result = this.decode( byteBuffer );//当客户端发出”@exit”退出命令时,关闭其通道if ( result.indexOf( "@exit" ) >= 0 ) {channel.close();}else {clientInstance.append( result.toString() );//读入一行完毕,执行相应操作if ( result.indexOf( "/n" ) >= 0 ){System.out.println("client input"+result);clientInstance.execute();}}}//该类封装了怎样对客户端的通道进行操作,具体实现可以通过重载execute()方法public class ClientChInstance {SocketChannel channel;StringBuffer buffer=new StringBuffer();public ClientChInstance( SocketChannel channel ) {this.channel = channel;}public void execute() throws IOException {String message = "This is response after reading from channel!";writeToChannel( this.channel, message );buffer = new StringBuffer();}//当一行没有结束时,将当前字窜置于缓冲尾public void append( String values ) {buffer.append( values );}}//主程序public static void main( String[] args ) {NBlockingServer nbServer = new NBlockingServer(8000);try {nbServer.initialize();} catch ( Exception e ) {e.printStackTrace();System.exit( -1 );}try {nbServer.portListening();}catch ( Exception e ) {e.printStackTrace();}}}