当前位置: 代码迷 >> J2SE >> java NIO读出数据比写下数据少
  详细解决方案

java NIO读出数据比写下数据少

热度:92   发布时间:2016-04-24 00:27:12.0
java NIO读出数据比写入数据少
简单写了个C/S模式的NIO,例子:客户端向服务器写50000次数据,服务接受到数据后打印出来。
由于首次使用NIO,碰到个问题:通过计数显示,客户端写了50000次。
但是,服务器端执行读操作时,每次读到10000次左右时就不再读了,跳回了selector.select()监听。
按理说,数据没读完,我每次读取完也执行了key.interestOps(SelectionKey.OP_READ),应该是能直到数据读取完才对,才会回到事件监听。
可是,现在就是不对了,求教。是我理解有错,代码不会,还是什么原因?
客户端代码:
Java code
public class MyMQService{    // 信道选择器    private Selector selector = null;    // 与服务器通信的信道    private SocketChannel socketChannel = null;    // 要连接的服务器Ip地址    private String hostIp = "localhost";    // 要连接的远程服务器在监听的端口    private int hostListenningPort = 33445;    private byte[] buffer = new byte[256];    private static int count;    /**     * 构造函数     */    public MyMQService() {        try {            initialize();        } catch (IOException e) {            System.out.println("初始化服务器连接异常" + e.getMessage());            e.printStackTrace();        }    }    /**     * 初始化函数     *     * @throws IOException 异常     */    private void initialize() throws IOException {        // 打开监听信道并设置为非阻塞模式        socketChannel = SocketChannel.open(new InetSocketAddress(hostIp, hostListenningPort));        socketChannel.configureBlocking(false);        // 打开并注册选择器到信道        selector = Selector.open();        socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));        // 启动读取线程        new TCPClientReadThread(selector);    }       public void put(Object obj) throws IOException {        synchronized (this) {            count++;            System.out.println("第@"+count);            Message meg = new Message(1, obj);            // 对象转数组            byte[] objarr = NioUtil.obj2Byte(meg);            // 将对象赋值给固定长度的数组            System.arraycopy(objarr, 0, buffer, 0, objarr.length);            // 发送本次对象            socketChannel.write(ByteBuffer.wrap(buffer));        }    }        public static void main(String[] args) throws IOException {        MyMQService client1 = new MyMQService();        for (int i = 0; i < 10000; i++) {            client1.put("dataxxxx");        }    }}public class TCPClientReadThread implements Runnable {        private Selector selector;        // 超时时间,单位毫秒    private static final int TimeOut = 3000;        public TCPClientReadThread(Selector selector) {        this.selector = selector;                new Thread(this).start();    }        public void run() {        // Object obj = null;        try {            while (true) {                // 等待某信道就绪(或超时)                if (selector.select(TimeOut) == 0) {                    // System.out.println("客户端运行中……");                    continue;                }                // 遍历每个有可用IO操作Channel对应的SelectionKey                for (SelectionKey key : selector.selectedKeys()) {                    // 如果该SelectionKey对应的Channel中有可读的数据                    if (key.isReadable()) {                        ClientTCPProtocol ci = new ClientTCPProtocol(1024);                        ci.handleRead(key);                        // obj = ci.getMegObj();                        // System.out.println("服务器返回消息:" + obj);                        // 为下一次读取作准备                        key.interestOps(SelectionKey.OP_READ);                    }                                        // 删除正在处理的SelectionKey                    selector.selectedKeys().remove(key);                }            }        } catch (Exception ex) {            ex.printStackTrace();        }    }}

服务器端代码:
Java code
public class MyMQServer {    // 超时时间,单位毫秒    private static final int TimeOut = 6000;    // 本地监听端口    private static final int ListenPort = 33445;    public static void main(String[] args) throws IOException {        // 创建选择器        Selector selector = Selector.open();        // 打开监听信道        ServerSocketChannel listenerChannel = ServerSocketChannel.open();        // 与本地端口绑定        listenerChannel.socket().bind(new InetSocketAddress(ListenPort));        // 设置为非阻塞模式        listenerChannel.configureBlocking(false);        // 将选择器绑定到监听信道,只有非阻塞信道才可以注册选择器.并在注册过程中指出该信道可以进行Accept操作        listenerChannel.register(selector, SelectionKey.OP_ACCEPT);        // 创建一个处理协议的实现类,由它来具体操作        TCPProtocol protocol = new ServerTCPProtocol3();        // 反复循环,等待IO        while (true) {            // 等待某信道就绪(或超时)            if (selector.select(TimeOut) == 0) {                System.out.println("服务器运行中……");                continue;            }            // 取得迭代器.selectedKeys()中包含了每个准备好某一I/O操作的信道的SelectionKey            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();            while (iter.hasNext()) {                SelectionKey key = iter.next();                iter.remove();                try {                    if (key.isAcceptable()) {                        // 有客户端连接请求时                        protocol.handleAccept(key);                    }                    if (key.isReadable()) {                        // 从客户端读取数据                        protocol.handleRead(key);                    }                    if (key.isValid() && key.isWritable()) {                        // 客户端可写时                        protocol.handleWrite(key);                    }                } catch (IOException e) {                    // 出现IO异常(如客户端断开连接)时移除处理过的键                    e.printStackTrace();                    key.channel().close();                    continue;                }            }        }    }}public class ServerTCPProtocol3 implements TCPProtocol {    private static int icount = 0;    private byte[] allBuffer = new byte[256];    // 当存在客户端访问时,判断是读,还是写    public void handleAccept(SelectionKey key) throws IOException {        SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();        clientChannel.configureBlocking(false);        clientChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(256));    }    public void handleRead(SelectionKey key) throws IOException {        readObjByChannel(key);    }    /**     * 从通道中读取对象放到服务器队列中     *     * @param clientChannel 客户端通道     */    public void readObjByChannel(SelectionKey key) throws IOException {        icount++;        System.out.println("进入1");        SocketChannel channel = (SocketChannel) key.channel();        // 拿到256长度的缓冲区,一个key一个        ByteBuffer buffer = (ByteBuffer) key.attachment();        int count = channel.read(buffer);        if (count > 0) {            buffer.flip();            Message meg = (Message)NioUtil.byte2Obj(buffer.array());            // 客户端请求类型            int type = meg.getType();            if (type == 1) {                // 向服务器发数据                try {                    System.out.println("进入2");                    // 将对象消息放到队列中                    System.out.println("接收到来自客户端:" + channel.socket().getRemoteSocketAddress() + "的消息," + meg.getMeg() + "@第" + icount + "次");                    // 向客户端写成功消息                    meg.setMeg("OK");                    byte[] objarr =  NioUtil.obj2Byte(meg);                    System.arraycopy(objarr, 0, allBuffer, 0, objarr.length);                    channel.write(ByteBuffer.wrap(allBuffer));                    buffer.clear();                } catch (Exception e) {                    e.printStackTrace();                }            }        } else if(count < 0){            System.out.println("错啦");            channel.close();        }        key.interestOps(SelectionKey.OP_READ);        System.out.println("结束4");    }    public void handleWrite(SelectionKey key) throws IOException {           }
  相关解决方案