package com.tz.simple.udp;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.tz.uitl.Common;

/**
 * UDP test program: a time server that answers every received datagram with
 * the current date string. Replies are sent by a fixed-size thread pool.
 *
 * @author http://www.daimami.com
 */
public class UDPTimeServer extends Thread {

    final private static int DAYTIME_PORT = 7897;
    private DatagramSocket serverSocket = null;
    private final int THREAD_POOL = 10;
    private ExecutorService threadPool = null;

    /**
     * Binds the UDP socket to {@code DAYTIME_PORT} and creates the worker pool.
     *
     * @throws IOException if the socket cannot be opened or bound
     */
    public UDPTimeServer() throws IOException {
        serverSocket = new DatagramSocket(DAYTIME_PORT);
        threadPool = Executors.newFixedThreadPool(THREAD_POOL);
        System.out.println("时间服务器启动成功,监听端口 [" + DAYTIME_PORT + "] 线程池 [" + THREAD_POOL + "].");
    }

    /**
     * Receive loop: waits for a datagram and hands it to the pool for the reply.
     * Failures are reported instead of being silently dropped, and the loop
     * ends (shutting the pool down) once the socket has been closed.
     */
    @Override
    public void run() {
        while (!serverSocket.isClosed()) {
            try {
                // A fresh buffer per request: the packet is handed to another thread.
                byte[] buffer = new byte[256];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                serverSocket.receive(packet);
                System.out.println("收到客户端 [" + packet.getSocketAddress() + " ] 请求.");
                threadPool.execute(new HandlerThread(packet));
            } catch (Exception e) {
                if (serverSocket.isClosed()) {
                    // Nothing more can be received; leave the loop instead of spinning.
                    break;
                }
                e.printStackTrace();
            }
        }
        threadPool.shutdown();
    }

    /**
     * Task that sends the current date back to the sender of one request.
     * It is executed by the pool, so it is a plain {@link Runnable} rather
     * than a {@link Thread} that would never be started.
     *
     * @author http://www.daimami.com
     */
    private class HandlerThread implements Runnable {

        /** The request whose source address and port receive the reply. */
        private final DatagramPacket packet;

        public HandlerThread(DatagramPacket p) {
            packet = p;
        }

        @Override
        public void run() {
            try {
                String date = new Date().toString();
                byte[] buffer = date.getBytes();
                InetAddress address = packet.getAddress();
                int port = packet.getPort();
                DatagramPacket reply = new DatagramPacket(buffer, buffer.length, address, port);
                serverSocket.send(reply);
                // Log the pool thread that actually performed the send.
                System.out.println("线程 [" + Thread.currentThread().getName() + "] 向客户端 ["
                        + reply.getSocketAddress() + " ] 发送数据 ["
                        + Common.byte2string(buffer, false) + "].");
            } catch (Exception e) {
                // One failed reply must not kill the worker, but it must be visible.
                e.printStackTrace();
            }
        }
    }

    public static void main(String args[]) throws IOException {
        new UDPTimeServer().start();
    }
}

package com.tz.simple.tcp;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import
java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import com.tz.uitl.Common; /** * 非阻塞式TCP时间服务器 * * @author http://www.daimami.com * */ public class NIOTCPTimeServer extends Thread { private int listenPort = 8485; private ServerSocketChannel serverSocket = null; private Selector acceptSelector = null; private Selector readSelector = null; public NIOTCPTimeServer(int p) { listenPort = p; try { acceptSelector = Selector.open(); readSelector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.configureBlocking(false); serverSocket.socket().bind(new InetSocketAddress(listenPort)); serverSocket.register(acceptSelector, SelectionKey.OP_ACCEPT); System.out.println("服务器启动成功, 监听端口 [" + listenPort + "]"); new ReadThread().start(); } catch (Exception e) { System.out.println("服务器启动失败!"); e.printStackTrace(); } } /** * 主线程方法只处理客户端连接<BR> * 同时将连接客户端事件响应注册为读响应<BR> */ public void run() { while (true) { try { if (acceptSelector.select(100) > 0) { Set keys = acceptSelector.selectedKeys(); for (Iterator i = keys.iterator(); i.hasNext();) { SelectionKey key = (SelectionKey) i.next(); i.remove(); ServerSocketChannel readyChannel = (ServerSocketChannel) key .channel(); SocketChannel incomingChannel = readyChannel.accept(); System.out.println("客户端 [" + incomingChannel.socket() .getRemoteSocketAddress() + "] 与服务器建立连接."); incomingChannel.configureBlocking(false); incomingChannel.register(readSelector, SelectionKey.OP_READ, new StringBuffer()); } } } catch (Exception e) { e.printStackTrace(); } } } /** * 客户端数据读线程 * * @author http://www.daimami.com * */ private class ReadThread extends Thread { public void run() { int keysReady = -1; while (true) { try { /** * 检查客户数据事件,超时1秒 */ keysReady = readSelector.select(100); if (keysReady > 0) { Set readyKeys = readSelector.selectedKeys(); for (Iterator i = readyKeys.iterator(); 
i.hasNext();) { SelectionKey key = (SelectionKey) i.next(); SocketChannel incomingChannel = (SocketChannel) key .channel(); readData(incomingChannel); i.remove(); } } } catch (Exception e) { e.printStackTrace(); } } } private void readData(SocketChannel channel) { try { ByteBuffer readBuf = ByteBuffer.allocate(1024); if (channel.read(readBuf) > 0) { readBuf.flip(); System.out.println("收到客户端 [" + channel.socket().getRemoteSocketAddress() + "] 数据 [" + Common.decode(readBuf) + "]."); } } catch (Exception e) { try { channel.close(); System.out.println("关闭一个终端."); } catch (Exception ex) { } } } } /** * @param args */ public static void main(String[] args) { try { new NIOTCPTimeServer(8485).start(); } catch (Exception e) { } } } package com.tz.simple.tcp; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import com.tz.uitl.Common; /** * 非阻塞式TCP时间服务器<BR> * 同并发接收服务器数据<BR> * * @author http://www.daimami.com * */ public class TCPTimeClient extends Thread { private int serverPort = 8485; private SocketChannel client = null; private Selector readSelector = null; public TCPTimeClient(int p) throws IOException { serverPort = p; readSelector = Selector.open(); client = SocketChannel.open(); client.connect(new InetSocketAddress("192.168.100.38", serverPort)); client.configureBlocking(false); client.register(readSelector, SelectionKey.OP_READ); System.out.println("客户端 [" + client.socket().getLocalSocketAddress() + "] 启动, 连接服务器 [" + client.socket().getRemoteSocketAddress() + "]"); } public void run() { while (true) { try { int keysReady = readSelector.select(100); if (keysReady > 0) { Set readyKeys = readSelector.selectedKeys(); for (Iterator i = readyKeys.iterator(); i.hasNext();) { i.next(); i.remove(); int nbytes = -1; ByteBuffer byteBuffer = ByteBuffer.allocate(1024); nbytes = 
client.read(byteBuffer); if (nbytes > 0) { byteBuffer.flip(); System.out.println("收到服务器传来数据 " + Common.decode(byteBuffer)); } } } client.write(ByteBuffer.wrap("abcd".getBytes())); } catch (Exception e) { e.printStackTrace(); } finally { } } } /** * @param args */ public static void main(String[] args) { try { new TCPTimeClient(8485).start(); } catch (Exception e) { e.printStackTrace(); } } } ** * 将数组转成字符串 在调试或记录日志时用到 * * @param array * @return */ public static String byte2string(byte[] array) { StringBuilder sb = new StringBuilder(); sb.append("Length " + array.length + " Content "); for (int i = 0; i < leng; i++) { sb = sb.append(String.format("%02X", array[i])).append(":"); } int ind = sb.lastIndexOf(":"); sb.delete(ind, ind + 1); return sb.toString(); } /** * 对字节流进行GBK解码 * * @param byteBuffer * @return */ public static String decode(ByteBuffer byteBuffer) { Charset charset = Charset.forName("ISO-8859-1"); CharsetDecoder decoder = charset.newDecoder(); try { CharBuffer charBuffer = decoder.decode(byteBuffer); return new String(charBuffer.toString().getBytes("ISO8859_1"), "GBK").trim(); } catch (Exception e) { return null; } }