Socket编程深入理解:OS->BIO->NIO->Netty
本文假设读者具有一定的socket编程基础并且已经了解并会使用基本的socket编程。本文旨在从我的角度梳理从BIO到NIO再到多路复用器等的发展过程。有兴趣的同学强烈推荐这个视频教程:https://link.zhihu.com/?target=https%3A//links.jianshu.com/go%3Fto%3Dhttps%253A%252F%252Fwww.bilibili.com%252Fvideo%252FBV1RV411r72B%253Fp%253D1
首先从操作系统讲起:
一、操作系统基本知识
上图是一个基本的计算机基本工作图,注意下列几点:
- 计算机主要有处理器和内存两个部分,对于内存空间,计算机将它分为了内核空间和用户空间。内核空间是计算机指定的一块空间,这个空间只能由操作系统访问,用户程序是无法访问的,同时CPU中的一些特定的指令和寄存器也是只有操作系统可以访问的,这是第一点。
- 一个CPU同一时间只能执行一个程序,要么操作系统、要么App进程。计算机通过时钟中断来达到计算机完成操作系统和各个用户程序间的切换,例如当前CPU正在执行APP1,当下一个中断到来后CPU会先将App1在CPU中的寄存器值写回App1内存空间,同时清空缓存,然后执行操作系统之前注册的回调函数进行进程调度,然后开始执行下一个App进程。如果内存中进程或线程非常多的话,那么这个分时时间片就非常小,系统就会进行这种反复的进程切换或线程切换,消耗资源。因此我们希望进程和线程尽可能少。
- 内核程序即操作系统,不仅负责了用户程序的调度,同时也负责了与外部设备的交互,如网卡、鼠标等。如果用户程序需要与外设交互,则需要调用内核程序的功能,这叫做系统调用,和函数调用是不同的。系统调用会使CPU执行内核程序,用户程序使通过系统的软中断实现的,在中断中执行内核的程序,因此也需要CPU保护用户程序的现场数据,消耗资源。这表明App每触发一次系统调用,就会进行一次现场数据保护。
- 值得注意的一点是:在linux系统中这个内核程序就对应了Linux系统,Linux系统并没用直接采用机器指令实现,而是通过C语言进行编程实现的。因此用户程序的系统调用实际调用的就是Linux中对应的C语言程序。
二、BIO编程
BIO又称Block IO,即阻塞IO编程。在jdk1.4之前JVM虚拟机只提供了BIO这种网络实现。
在jdk中,BIO的实现主要通过ServerSocket和Socket来实现的,以服务端ServerSocket为例:
- 首先创建该对象,并且绑定监听端口, ServerSocket ss=new ServerSocket(9999); 对应系统调用:socket(…)=3;listen(3,50)
- 然后最后开始等待客户端连接 Socket s = ss.accept(); 对应系统调用:accept(3, 此时注意,由于还没有客户端连接,因此系统调用方法会一直等待在这个地方,导致了阻塞,因此BIO是一种阻塞的IO实现。
- 阻塞不仅体现在等待连接,同时如果连接上了系统通过 InputStream is = s.getInputStream();来读取二进制数据也是堵塞的,因为对应系统调用 recv(),该函数在没有数据过来是也会一直堵塞在这里直到有数据进来。
对于客户端而言,从Socket连接中读取服务器数据也是一种阻塞方法。
优点:
- 显然,操作简单,理解容易。
缺点:
- 首先由于BIO是阻塞的,因此对于服务器每建立一个连接就必须新开一个线程,这样当并发高的时候线程过多,就导致CPU大量的时间用在了线程切换上,浪费资源。
- 效率低下,当客户端连接后没有发送数据的话,服务器的每个线程都会陷入一种空等待状态,占用整个进程资源。
三、NIO编程
基于上述BIO的确定,jdk在1.4之后引入了NIO编程,又叫New IO,即新的IO,这种IO提供了非阻塞和阻塞两种方式作为选择,一般采用非阻塞方式。注意,NIO的非阻塞其实产生的系统调用和BIO是一样的,但是在系统函数socket创建套接字文件描述符fd时存在一个参数供以选择,叫做sock_NONBLOCK,即在这个地方就可以选择该Socket调用是非阻塞还是阻塞的。因此,NIO在jdk层面体现为New IO,因为它是阻塞或非阻塞的,在操作系统的层面可以体现为非阻塞IO。
NIO特点如下:
-
与BIO不同,NIO不采用byte数组作为数据传输渠道,而是采用一种新的容器Buffer(缓冲区),同时两个核心API也分别变成了SocketChannel和ServerSoketChannel,操作上其实和之前的BIO很相似
-
相较于BIO,NIO非阻塞变成的最根本不同在于,为操作系统socket fd指定为非阻塞后,其accept方法和recv方法如果没能立刻建立连接或接收到数据它不会阻塞,而是返回一个-1或null,因此用户程序不需要阻塞在哪儿,而是可以干自己的事情,然后时不时来确定有没有连接或数据即可。在Java中通过channel.configureBlocking(false);指定是否阻塞。
-
一个正常的NIO程序的服务器端编程流程如下:
- 1、创建ServerSoketChannel对象,指定端口号和非阻塞属性
- 2、建立while循环,在循环中首先通过系统调用accpet,如果返回非空,那么说明存在新的连接,获得客户端连接通道socketChannel,加入到通道集合中。
- 3、对所有的客户端通道集合进行读取遍历,如果有数据则进行处理。
-
从上诉可以看到,NIO是一种同步非阻塞的方法,同步和非同步的概念首次出现在多线程的sync中,在并发场景中,由于多个线程需要操作同一个共享资源,可能就会出现错误,因此每个线程都要相互之间获得对方的信息,借助该资源的锁机制让每个线程可以在使用该资源时先判断是否有其它线程正在使用该资源,达到串行化同步的效果,总结起来就是每个线程取用该资源时先判断该资源是否可用,可以用我在用。这也完美体现在NIO中,对于accept,我先判断是否有连接可用,可用我才用,不能我就等待着或者先去干其它事情,对应了BIO和NIO。因此BIO和NIO都是一种同步方法。
优点:
- 非阻塞可以使用一个线程处理多个连接的消息,可以解决线程过多的问题。
- 非阻塞不需要阻塞等待,白白消耗CPU资源,效率更高。
缺点:
- 加入当前存在了10000个客户端连接,并且只有3个连接发送消息到服务器,但是NIO程序还是需要产生10000次recv的系统调用来进行遍历,也需要产生10000次线程保护,这样是及其消耗CPU资源的事情。
- 解决办法:一次性将10000个客户端连接通道的fd给到内核,让内核自身进行遍历来返回结果。
四、NIO+多路复用器编程
针对单独使用NIO需要反复系统调用recv的缺点,采用的解决方案就是采用系统调用select一次性把所有fd丢给内核去调用,从而实现多路客户端连接服用同一次系统调用,因此叫做多路复用器。对于多路服用器,linux操作系统提供了三种解决方案,分别是select、poll 和epoll,其中select和poll基本是一类,只是select有限制一次遍历的大小而poll没有,但epoll是一种更好的实现方案,当前用的最多的也是epoll,很多框架都是优先选用epoll。
首先介绍select:
-
在JDK层面,JDK封装提供了一个类叫做selector,作为监视器对象,以服务端为例,工作流程如下:
- 服务端先创建channel通道,绑定端口,然后设置非组塞。 channel = ServerSocketChannel.open();
- 然后创建selector选择器对象,在操作系统中对应了多路复用器对象select。 selector = Selector.open();
- 然后channel需要不断的监听端口,接收客户端连接,这时channel在selector中注册自己需要监听的事件,通过语句channel.register(selector, SelectionKey.OP_ACCEPT);
- 然后建立大循环,在循环中首先通过selector.select方法检测是否出现了对应的事件,如果出现了连接事件,则通过selector的selectionKeys获得对应的客户端通道,然后为该通道继续注册读监听事件,socket.register(selector,SelectionKey.OP_READ);监听每个通道的数据读取。
-
上述步骤的逻辑很简单,假如JVM选择了操作系统中的select实现的话,当服务器channel注册监听连接事件或客户端通道注册监听读取事件时,JVM会开辟一片内存空间用于存储在多路复用器中组测的各种fd(socket+关注的事件)。然后当用户程序调用selector.select时会JVM会产生系统调用select将之前注册的所有fd丢给内核去遍历,然后如果某个fd出现事件,他会返回该fd给JVM。
-
注意该select方法其实是一种阻塞方法,如果没有事件的话会一直阻塞,所以一般设置一个超时时间。selector.select(2000)==0
-
最后需要注意的一点就是NIO+多路复用器其实也是一种同步的方法,因为复用器只会提供给你每个通道是否有连接或者可读的状态,还是需要自己去读取,即自己读取之前还是通过selector去判断是否可读的,可读在读,不可读就算了,因此是同步方法。
然后就是重点epoll:
首先在JDK的操作层面上两者的操作是没有区别的,但是在系统调用上是完全不同的。在上述select的解决方法中存在两个问题,
- 每次系统select调用都需要传输所有的注册过的通道文件描述符fd
- 每次内核都需要去完整的遍历所有的fds
因此epoll给出了另外一种解决方案,解决方法如下:
- 如果采用epoll,在通道通过selector组测事件监听时,会产生系统调用epoll_ctl(),该函数在内核中开辟一个内存空间1,将对应通道的fd存放到内核空间中,从而方便之后的内核遍历,不需要直接传输。
- 在内核中对于那些放进来的通道fds,如果出现了连接或数据读取等事件,内核会通过中断自动的将这些出现事件的fds转移到另一块内存空间2,因此当用户程序调用selector.select(2000)==0方法时,对应了epoll的epoll.wait(),该方法只需要在内存空间2中直接读取即可,基本实现了时间复杂度O(1),效率极高。
基于NIO+多路复用器的聊天程序
总结:其实对于多路复用器的操作就两三步:1、建立多路复用器selector 2、向selector注册事件 3、通过select遍历读取有事件的通道
服务器代码:
public class NioChatServer {
ServerSocketChannel channel;Selector selector;public void start() throws IOException {
//1、得到服务器套接字socket,// 调用内核函数得到系统给的socket文件描述符fd,即对象channel = ServerSocketChannel.open();//2、得到选择器对象用于中间监视,// 调用系统内核的select\poll\epoll函数,以epoll为例,调用epoll_create()函数为例,得到多路复用器对象,// 同时在内核中开辟一个空间用于存储在多路复用器中组测的各种fd(socket+关注的事件)selector = Selector.open(); //得到一个连接器//3、为socket绑定端口//调用内核函数bind绑定端口channel.bind(new InetSocketAddress(9999));//4、设置非阻塞//在内核函数中,accept提供了非阻塞的输入参数来指定,在这个地方配置了那么socket在调用accept方法的话就会调用非阻塞方式channel.configureBlocking(false);//5、为socket在多路复用器中注册对应事件// 如果采用epoll,在内核中对应了epoll_ctl()函数,会将socket的fd存放到内核空间中,方便内核遍历,// 如果用的select或者poll,JVM会自己开辟一块内存空间,然后把对应的 fd 放进去channel.register(selector, SelectionKey.OP_ACCEPT);//6、开始判断有没有事件到来while(true){
//如果虚拟机采用复用器 select。则调用select方法,并传入所有所有注册过的fds,返回有状态的fd//如果采用epoll,则直接调用epoll.wait()方法进行读取,这是一种阻塞方法,可以设置超时事件,返回待操作fd的数目if(selector.select(2000)==0){
//阻塞检测两秒System.out.println("服务器在干自己的事");}//7、得到所有事件的集合然后遍历//直接得到之前从内核中得到的所有fd,记得遍历完了要删除,不然虚拟机是不会帮忙删除的Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while(iterator.hasNext()){
SelectionKey selectionKey = iterator.next();if(selectionKey.isAcceptable()){
System.out.println("客户端连接事件..");System.out.println("获取客户端socket");//得到客户端socketSocketChannel socket = channel.accept();SocketAddress remoteAddress = socket.getRemoteAddress();String username = remoteAddress.toString().substring(1);System.out.println("客户端"+username+"建立连接");//设置为非阻塞模式socket.configureBlocking(false);//为客户端socket注册输入中断,后面一个参数是有输入时该事件会提供一个buffer附件System.out.println("为"+username+"注册读事件");socket.register(selector,SelectionKey.OP_READ);}if(selectionKey.isReadable()){
//获得客户端socketSocketChannel socket = (SocketChannel)selectionKey.channel();String username = socket.getRemoteAddress().toString().substring(1);//从socket取数据ByteBuffer buf = ByteBuffer.allocate(1024);socket.read(buf);String msg = new String(buf.array()).trim();System.out.println("客户端"+username+"发来:"+msg);if(msg.equals("bye")){
socket.close();iterator.remove();//删除该集合中的该事件continue;}broadCall(socket,msg);}//清除事件,防止重复处理iterator.remove();//删除该集合中的该事件}}}public void broadCall(SocketChannel socketChannel,String msg) throws IOException {
System.out.println("开始广播消息");//获得所有在该连接器的注册过的所有键Set<SelectionKey> keys = selector.keys();String username = socketChannel.getRemoteAddress().toString().substring(1);msg="客户端"+username+"发来:"+msg;for(SelectionKey key:keys){
SelectableChannel channel = key.channel();//得到对应的通道if(channel instanceof SocketChannel&&channel!=socketChannel){
SocketChannel socket = (SocketChannel) channel;socket.write(ByteBuffer.wrap(msg.getBytes()));//发送信息}}}public static void main(String[] args) {
try {
new NioChatServer().start();} catch (IOException e) {
e.printStackTrace();}}
}
客户端代码:
public class NioChatClient implements Runnable{
SocketChannel channel;public NioChatClient() throws IOException {
//非阻塞主要体现在建立连接的时候如果还未连接上是可以去做其它操作的,// 通过connect来开启一个连接操作,如果立马连接上了就返回true,如果没有连接上就返回false,然后通过finishConnect来不断检测此次连接的状态,直到连接上//这个过程是可以干很多其它事情的//1、建立通道channel = SocketChannel.open();//2、设置阻塞方式channel.configureBlocking(false);//3、提供服务端的IP和端口号InetSocketAddress inet = new InetSocketAddress("127.0.0.1",9999);//4、连接服务器端if(!channel.connect(inet)){
//开启一次连接,如果成功,立即返回//连接失败,一直尝试结束连接while(!channel.finishConnect()){
System.out.println("我在干其他事情");}}}public void sendMsg() throws IOException {
Scanner scanner = new Scanner(System.in);while(true){
String msg = scanner.nextLine();if(msg.equals("bye")) {
channel.write(ByteBuffer.wrap(msg.getBytes()));channel.close();return;}channel.write(ByteBuffer.wrap(msg.getBytes()));}}public void readMsg() throws IOException {
while(true){
ByteBuffer buf = ByteBuffer.allocate(1024);int cnt = channel.read(buf);if(cnt>0){
System.out.println(new String(buf.array()));}}}@Overridepublic void run() {
try {
this.sendMsg();} catch (IOException e) {
e.printStackTrace();}}public static void main(String[] args) throws IOException {
NioChatClient client = new NioChatClient();new Thread(client).start();client.readMsg();}
}
五、NETTY框架
Netty框架是一款基于NIO编程的java开源框架, Netty 提供异步的、 基于事件驱动的网络应用程序框架, 用以快速开发高性能、 高可靠性的网络 IO 程序。
在NIO+多路复用器的上述代码中,我们采用的是单线程模型,即在服务器端只通过一个线程来实现客户端的连接和数据的读写。
但是我们通常采用线程池模型,在这个模型中,我们采用一个线程单独来做客户端的添加工作(对于Netty的Boss线程池),然后采用多个线程来做数据的读写和处理工作(对于Netty的Worker线程池)。该模型的工作原理为:
- 先在Boss的线程中定义一个多路复用器,为Boss线程注册接收事件,将得到的客户端连接存放到一个公共数组中,数组大小为worker线程池的连接数,数组中存放着一个个队列,队列中存放的才是这些客户端通道。
- Worker中的每个线程首先先通过自己的selector检查是否有事件进来,处理完后再在公共数组对应索引检查队列中是否有还有通道得到注册,如果有,取出该channel注册到自己的selector中。这就对应了下图的监听IO、处理数据、处理任务队列。
经过这两步该模型就可以工作起来,并且所有客户端通道都会在每个线程间平均分配,Netty的基本工作原理就如上所示。Netty的模型如下图:
除了上述工作,Netty还做了一件事情就是将用户的处理程序从Socket底层完全剥离出来,在Netty中定义了一个名叫ChannelPipeline的事件通道,该通道的工作原理如下:
- 首先,对于每个NioEventLoop都有自己的ChannelPipeline通道
- 当NioEventLoop检测到IO事件后,会自动先读取网络数据,然后将数据打包成一个Object对象交给一个个ChannelHandler去执行就可,用户只需要编写ChannelHandler对象,然后对应不同的事件编写不同的方法,最后将该类的对象交给该管道即可。这就不需要用户自己去查询然后读取数据,从而Netty在使用上为用户提供了一种异步非阻塞的操作方法。
- 每个通道都独享了自己每个channelHandler用户类实例,这样就避免了多个线程操作同一个类对象的安全问题,因此用户的类的变量如果需要多个线程共享,需要定义为实例变量。
基于Netty的聊天程序
服务器代码
用户处理类
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static Set<SocketChannel> socketChannels = new HashSet<SocketChannel>();@Override//当发生客户端连接事件时触发public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();socketChannels.add(channel);//添加该socketSystem.out.println("用户"+channel.remoteAddress().toString().substring(1)+":建立连接");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();socketChannels.remove(channel);//删除该socketSystem.out.println("用户"+channel.remoteAddress().toString().substring(1)+":断开连接");}@Override//当有socket收到读取事件时会异步调用下列代码//值得注意的是该调用还会提供读取到内容的二进制数据,而不是想多路复用+NIO一样只提供状态,然后需要自己通过socketChannel.read去读取//因此其实Netty给使用者一种异步非阻塞操作的感觉public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();System.out.println("用户"+channel.remoteAddress().toString().substring(1)+":"+((ByteBuf)msg).toString(CharsetUtil.UTF_8));for(SocketChannel sc:socketChannels){
if(sc.equals(channel)) continue;sc.writeAndFlush((Unpooled.copiedBuffer("用户"+channel.remoteAddress().toString().substring(1)+":"+((ByteBuf)msg).toString(CharsetUtil.UTF_8),CharsetUtil.UTF_8)));}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();}
}
服务器类
public class NettyChatServer {
public static void main(String[] args) {
//创建一个负责建立连接并分配事件的Boss连接池EventLoopGroup boss = new NioEventLoopGroup(1);//创建一个负责读取并处理数据的worker连接池EventLoopGroup worker = new NioEventLoopGroup(3);//创建服务器启动助手并配置参数ServerBootstrap b = new ServerBootstrap();b.group(boss,worker)//添加两个线程池.channel(NioServerSocketChannel.class)//选择非阻塞socket模型最为服务器通道实现.option(ChannelOption.SO_BACKLOG,128)//设置监听队列的最大等待连接数.childHandler(new ChannelInitializer<SocketChannel>() {
//创建通道对象public void initChannel(SocketChannel sc){
//因为不管是什么事件,都是通过内核返回给JVM的socket fd来操作实现的sc.pipeline().addLast(new NettyServerHandler());//在通道中添加处理对象的实例,用于调用}});try {
ChannelFuture cf = b.bind(9999).sync();System.out.println("服务器已就绪");cf.channel().closeFuture().sync();boss.shutdownGracefully();worker.shutdownGracefully();} catch (InterruptedException e) {
e.printStackTrace();}}
}
客户端代码
用户处理类
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override//客户端连接上后给大家打了声招呼public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();ctx.writeAndFlush(Unpooled.copiedBuffer("hello大家好,我是"+channel.remoteAddress().toString().substring(1), CharsetUtil.UTF_8));}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(((ByteBuf)msg).toString(CharsetUtil.UTF_8));}
}
用户类
public class NettyChatClient {
public static void main(String[] args) {
//创建一个负责读取并处理数据的worker连接池EventLoopGroup worker = new NioEventLoopGroup(1);//创建客户端启动助手并配置参数try {
Bootstrap b = new Bootstrap();b.group(worker)//添加两个线程池.channel(NioSocketChannel.class)//选择非阻塞socket模型最为服务器通道实现.handler(new ChannelInitializer<SocketChannel>() {
//创建通道对象@Overridepublic void initChannel(SocketChannel sc){
//因为不管是什么事件,都是通过内核返回给JVM的socket fd来操作实现的sc.pipeline().addLast(new NettyClientHandler());//在通道中添加处理对象的实例,用于调用System.out.println("客户端已就绪");}});ChannelFuture cf = b.connect("127.0.0.1", 9999).sync();SocketChannel channel = (SocketChannel) cf.channel();Scanner scanner=new Scanner(System.in);while (scanner.hasNextLine()){
String msg=scanner.nextLine();System.out.println(msg);channel.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));}} catch (InterruptedException e) {
e.printStackTrace();}finally {
worker.shutdownGracefully();}}
}