目录
poll
函数原型poll
poll使用基本流程
代码
该代码存在的问题:
EMFILE处理 (太多的文件)
epoll
epoll_create
epoll_ctl
epoll_wait
epoll LT 基本流程
代码
epoll ET 基本流程
LT/ET触发条件
select/poll/epoll对比
原理
一个进程所能打开的最大连接数
FD剧增后带来的IO效率问题
消息传递方式
poll
函数原型poll
函数原形
int poll(struct pollfd fd[], nfds_t nfds, int timeout);
函数功能
把当前的文件指针挂到等待队列 (多路检测可用套接字)
所属头文件
#include <poll.h>
返回值
成功时,poll()返回结构体中revents域不为0的文件描述符个数; 如果在超时前没有任何事件发生,poll()返回0;失败时,poll()返回-1,并设置errno为下列值之一:EBADF:一个或多个结构体中指定的文件描述符无效。EFAULT:fds指针指向的地址超出进程的地址空间。EINTR:请求的事件之前产生一个信号,调用可以重新发起。EINVAL:nfds参数超出PLIMIT_NOFILE值。ENOMEM:可用内存不足,无法完成请求。
参数说明
struct pollfd的结构如下:struct pollfd{int fd; // 文件描述符short event;// 请求的事件short revent;// 返回的事件} 每个pollfd结构体指定了一个被监视的文件描述符。每个结构体的events是监视该文件描述符的事件掩码,由用户来设置。 revents是文件描述符的操作结果事件,内核在调用返回时设置。 events中请求的任何事件都可能在revents中返回。第一个参数是一个数组,即poll函数可以监视多个文件描述符。 第二个参数nfds:要监视的描述符的数目。 第三个参数timeout: 指定等待的毫秒数,无论I/O是否准备好,poll都会返回。timeout指定为负数值表示无限超时;timeout为0指示poll调用立即返回并列出准备好I/O的文件描述符,但并不等待其它的事件。这种情况下,poll()就像它的名字那样,一旦选举出来,立即返回。
poll使用基本流程
刚开始加入监听套接字
如果监听套接字的可读事件POLLIN到来,那么我们就调用accept()函数 返回一个已连接的套接字,得到一个新的文件描述符connfd
关注connfd的POLLIN事件
下一次poll()时候 我们就有两个文件描述符(listenfd,connfd)
有可能这两个文件描述符都产生了事件
如果是监听套接字,还是做上面的事情
如果是已连接的套接字产生的事件,我们就遍历连接套接字集合
看一下那些产生了可读(POLLIN)事件 产生我们就处理响应
代码
signal(SIGPIPE, SIG_IGN); //忽略SIGPIPE信号
SIGPIPE信号产生:
如果客户端关闭套接字 close
服务器端:调用了一次write,服务器会接收一个RST segment(TCP传输层接收到的)
如果服务器端再次调用了write,这个时候就会产生SIGPIPE信号 (该信号默认处理方式是退出这个进程)
TIME_WAIT状态 对大并发服务器的影响
应尽可能在服务器端避免出现TIME_WAIT状态
如果服务器端主动断开连接close,服务器就会进入TIME_WAIT
如何避免?
协议涉及上,应该让客户端主动断开连接,这样就会把TIME_WAIT状态分散到大量的客户端
如果客户端不活跃了,一些恶意客户端不断开连接,这样就会占用服务器端的连接资源
服务器端也要有个机制来踢掉不活跃的close
SOCK_NONBLOCK (非阻塞)+ I/O复用
SOCK_CLOEXEC: 进程被替换的时候 ,文件描述符处于关闭状态
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <sys/wait.h>
#include <poll.h>#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>#include <vector>
#include <iostream>#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while(0)typedef std::vector<struct pollfd> PollFdList;int main(void)
{signal(SIGPIPE, SIG_IGN);signal(SIGCHLD, SIG_IGN); //避免僵死进程int listenfd;//监听套接字 //非阻塞if ((listenfd = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP)) < 0)ERR_EXIT("socket");//填充地址相关struct sockaddr_in servaddr;memset(&servaddr, 0, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_port = htons(5188);servaddr.sin_addr.s_addr = htonl(INADDR_ANY);int on = 1;//设置地址的重复利用if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)ERR_EXIT("setsockopt");//绑定if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)ERR_EXIT("bind");//监听if (listen(listenfd, SOMAXCONN) < 0)ERR_EXIT("listen");//========================poll的使用=======================//struct pollfd pfd;pfd.fd = listenfd;pfd.events = POLLIN; //关注POLLIN(读)事件PollFdList pollfds; //pollfd队列pollfds.push_back(pfd);int nready; //待处理的事件数struct sockaddr_in peeraddr;socklen_t peerlen;int connfd;while (1){nready = poll(&*pollfds.begin(), pollfds.size(), -1); //无限超时等待if (nready == -1){if (errno == EINTR)continue;ERR_EXIT("poll");}if (nready == 0) // nothing happendedcontinue;if (pollfds[0].revents & POLLIN) //判断是否有POLLIN事件到来{peerlen = sizeof(peeraddr);connfd = accept4(listenfd, (struct sockaddr*)&peeraddr,&peerlen, SOCK_NONBLOCK | SOCK_CLOEXEC); //非阻塞的 CLOEXEC标记的if (connfd == -1)ERR_EXIT("accept4");{//把得到的已连接的套接字加入监听队列pfd.fd = connfd;pfd.events = POLLIN;pfd.revents = 0;pollfds.push_back(pfd);--nready;//连接成功std::cout<<"ip="<<inet_ntoa(peeraddr.sin_addr)<<" port="<<ntohs(peeraddr.sin_port)<<std::endl;//说明事件都处理完了 if (nready == 0) continue;}//遍历已连接字套接字子集for (PollFdList::iterator it=pollfds.begin()+1; //第一个套接字总是监听套接字it != pollfds.end() && nready >0; ++it){if (it->revents & POLLIN) //具有可读事件{--nready; //处理一个事件,待处理事件数减一connfd = it->fd;char buf[1024] = {0};int ret = read(connfd, buf, 1024);if (ret == -1)ERR_EXIT("read");if (ret == 0) //对方关闭了套接字{std::cout<<"client close"<<std::endl;it = pollfds.erase(it); //移除--it;close(connfd);continue;}std::cout<<buf;write(connfd, buf, strlen(buf));}}}return 0;
}
该代码存在的问题:
接收这边:
read 可能并没有把connfd所对应的接收缓冲区的数据都读完,那么connfd仍然是处于活跃状态
如果应用层发过来数据包刚好分包了
一个数据包,两次read
我们应该将读到的数据保存在connfd的应用层缓冲区
(我们应该对每一个已连接的套接字分配一个应用层缓存区缓冲区,读的时候把读的数据追加到缓冲区的末尾)
如何解析协议: 让协议的解析去应用层缓冲区读数据
write 对请求进行应答,假设我们应答量比较大 (假设10000字节,write只发送了1000字节)
发送缓冲区可能满了,connfd是一个非阻塞套接字,write调用可能并不能把所有的数据都发送
我们应该有一个应用层发送缓冲区
POLLOUT事件(可写事件)触发条件:connfd的发送缓冲区(内核)不满的时候(可以容纳事件)
我们不能一接收到connfd时候,就关注他的POLLOUT事件,
(如果一开始就关注POLLOUT事件,刚开始时候这个发送缓冲区是没有数据的,不满的,但是有没有要发送的数据,这个时候会一直触发这个POLLOUT事件,就出现busy-loop) 忙等待
所以我们应该在有未发完数据情况下(内核发送缓冲区满)关注这个POLLOUT事件
随着内核将数据发送出去,内核发送缓冲区的数据就会被移除,那么就会腾出空间接收更多的数据,这个时候POLLOUT事件发生,这个时候,我们遍历下来,检测到事件,遍历已连接套接字事情的时候,就有可能遍历到POLLOUT事件到来
POLL其实是LT触发模式(水平) epoll的LT模式跟POLL是完全一样的
EMFILE处理 (太多的文件)
accept(2)返回EMFILE的处理
调高进程文件描述符数目(治标不治本)
死等(效率低)
退出程序
关闭监听套接字。那什么时候重新打开呢?
如果是epoll模型,可以改用edge trigger。问题是如果漏掉了一次accept(2),程序再也不会收到新连接。
准备一个空闲的文件描述符。遇到这种情况,先关闭这个空闲文件,获得一个文件描述符名额;再accept(2)拿到socket连接的文件描述符;随后立刻close(2),这样就优雅地断开了与客户端的连接;最后重新打开空闲文件,把“坑”填上,以备再次出现这种情况时使用
epoll
epoll_create
函数原形
int epoll_create(int size); int epoll_create1(int flags);
函数功能
创建一个epoll的句柄
所属头文件
#include <sys/epoll.h>
返回值
调用成功时返回一个epoll句柄描述符,失败时返回-1。
参数说明
size 表明内核要监听的描述符数量 自从Linux 2.6.8开始,size参数被忽略,但是依然要大于0。flags: 0: 如果这个参数是0,这个函数等价于poll_create(0) EPOLL_CLOEXEC:这是这个参数唯一的有效值,如果这个参数设置为这个。那么当进程替换映像的时候会关闭这个文件描述符,这样新的映像中就无法对这个文件描述符操作,适用于多进程编程+映像替换的环境里
epoll_ctl
函数原形
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数功能
操作一个多路复用的文件描述符
所属头文件
#include <sys/epoll.h>
返回值
success:0 error:-1 errno被设置
参数说明
epfd:epoll_create1的返回值op:要执行的命令 EPOLL_CTL_ADD:向多路复用实例加入一个连接socket的文件描述符 EPOLL_CTL_MOD:改变多路复用实例中的一个socket的文件描述符的触发事件 EPOLL_CTL_DEL:移除多路复用实例中的一个socket的文件描述符fd:要操作的socket的文件描述符event: typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64;} epoll_data_t;struct epoll_event {uint32_t events; /* Epoll events */epoll_data_t data; /* User data variable */ }; events可以是下列命令的任意按位与 EPOLLIN: 对应的文件描述有可以读取的内容 EPOLLOUT:对应的文件描述符有可以写入 EPOLLRDHUP:写到一半的时候连接断开 EPOLLPRI:发生异常情况,比如所tcp连接中收到了带外消息 EPOLLET: 设置多路复用实例的文件描述符的事件触发机制为边沿触发,默认为水平触发 1、当多路复用的实例中注册了一个管道,并且设置了触发事件EPOLLIN, 2、管道对端的写入2kb的数据, 3、epoll_wait收到了一个可读事件,并向上层抛出,这个文件描述符 4、调用者调用read读取了1kb的数据, 5、再次调用epoll_wait边沿触发:上面的调用结束后,在输入缓存区中还有1kb的数据没有读取,但是epoll_wait将不会再抛出文件描述符。这就导致接受数据不全,对端得不到回应,可能会阻塞或者自己关闭 因为边沿触发的模式下,只有改变多路复用实例中某个文件描述符的状态,才会抛出事件。 相当于,边沿触发方式,内核只会在第一次通知调用者,不管对这个文件描述符做了怎么样的操作水平触发: 只要文件描述符处于可操作状态,每次调用epoll_wait,内核都会通知你EPOLLONESHOT:epoll_wait只会对该文件描述符第一个到达的事件有反应,之后的其他事件都不向调用者抛出。需要调用epoll_ctl函数,对它的事件掩码重新设置 EPOLLWAKEUP EPOLLEXCLUSIVE
epoll_wait
函数原形
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
函数功能
等待一个epoll队列中的文件描述符的I/O事件发生
所属头文件
#include <sys/epoll.h>
返回值
>=0,表示准备就绪的文件描述符个数 -1:出错,errno被设置
参数说明
epfd:目标epoll队列的描述符 events:用于放置epoll队列中准备就绪(被触发)的事件 maxevents:最大事件? timeout:指定函数等待的时间。这个函数阻塞这么长一段时间之后接触阻塞。
epoll LT 基本流程
poll模型:
每次调用poll函数的时候,都需要把监听套接字和已连接套接字所感兴趣的事件的数组拷贝到内核空间(数据拷贝)
epoll模型:
不需要进行拷贝,他只要关注过一次就可以了,只要你关注的事件没有发生改变,你也就不需要对他进行操作,他已经在内核当中有数据维护了,通过epollfd来管理了,就不需要再传递了
代码
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <signal.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <sys/epoll.h>#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>#include <vector>
#include <algorithm>
#include <iostream>typedef std::vector<struct epoll_event> EventList;#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while(0)int main(void)
{signal(SIGPIPE, SIG_IGN);signal(SIGCHLD, SIG_IGN);int idlefd = open("/dev/null", O_RDONLY | O_CLOEXEC);int listenfd;//if ((listenfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)if ((listenfd = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP)) < 0)ERR_EXIT("socket");struct sockaddr_in servaddr;memset(&servaddr, 0, sizeof(servaddr));servaddr.sin_family = AF_INET;servaddr.sin_port = htons(5188);servaddr.sin_addr.s_addr = htonl(INADDR_ANY);int on = 1;if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)ERR_EXIT("setsockopt");if (bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)ERR_EXIT("bind");if (listen(listenfd, SOMAXCONN) < 0)ERR_EXIT("listen");//===========================================epoll用法==============================================//std::vector<int> clients;int epollfd;epollfd = epoll_create1(EPOLL_CLOEXEC);struct epoll_event event;event.data.fd = listenfd; //加入监听套接字event.events = EPOLLIN //关注它的可读事件,默认的触发模式是LT模式 /* | EPOLLET*/;//添加关注事件和监听套接字到epollfdepoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event);EventList events(16); //事件列表struct sockaddr_in peeraddr;socklen_t peerlen;int connfd;int nready;while (1){nready = epoll_wait(epollfd, &*events.begin(), static_cast<int>(events.size()), -1); //-1设置为超时等待if (nready == -1){if (errno == EINTR)continue;ERR_EXIT("epoll_wait");}//无事件发生if (nready == 0) // nothing happendedcontinue;//事件列表满了 倍增if ((size_t)nready == events.size())events.resize(events.size()*2);//统一处理事件(监听和已连接)for (int i = 0; i < nready; ++i){//处理监听套接字if (events[i].data.fd == listenfd){peerlen = sizeof(peeraddr);connfd = ::accept4(listenfd, (struct sockaddr*)&peeraddr,&peerlen, SOCK_NONBLOCK | SOCK_CLOEXEC); //非阻塞 closeexec//错误处理if (connfd == -1){if (errno == EMFILE){close(idlefd);idlefd = accept(listenfd, NULL, NULL);close(idlefd);idlefd = open("/dev/null", O_RDONLY | O_CLOEXEC);continue;}elseERR_EXIT("accept4");}std::cout<<"ip="<<inet_ntoa(peeraddr.sin_addr)<<" port="<<ntohs(peeraddr.sin_port)<<std::endl;clients.push_back(connfd);//将该文件描述符加入关注 event.data.fd = connfd;event.events = EPOLLIN; //电平触发 /* | EPOLLET*/epoll_ctl(epollfd, EPOLL_CTL_ADD, connfd, &event);}//处理已连接套接字(都是活跃的套接字)else if (events[i].events & EPOLLIN){connfd = events[i].data.fd;if (connfd < 0)continue;char buf[1024] = {0};int ret = read(connfd, buf, 1024);if (ret == -1)ERR_EXIT("read");if (ret == 0) //对方关闭{std::cout<<"client close"<<std::endl;close(connfd);event = events[i];epoll_ctl(epollfd, EPOLL_CTL_DEL, connfd, &event);clients.erase(std::remove(clients.begin(), clients.end(), connfd), clients.end());continue;}std::cout<<buf;write(connfd, buf, strlen(buf));}}}return 0;
}
epoll ET 基本流程
内核发送缓冲区: 能发送(高电平)(缓冲区满) 不能发送(低电平)(缓冲区空) 高---->低
内核接受缓冲区: 能接收(低电平)(缓冲区空) 不能接收(高电平)(缓冲区满) 低---->高
边缘触发: 从高到低 或者 从低到高
对于EPOLLOUT:
缓冲区数据满可以理解成低电平状态(不可发送数据) -------> 高电平(可发送数据) 触发
所以我们一开就关注EPOLLOUT事件 不会出现busy-loop
对于EPOLLIN
然后处理已连接套接字事件,已连接套接字产生了可读事件
confd 接收缓冲区从无到有 (从低电平--->高电平) 因而触发了EPOLLIN事件
如果读完所以数据 就会产生EAGAIN事件
为什么一定要读到EAGAIN为止呢?
因为我们读的数据接收缓冲区中有1000个字节,我们只读了10个字节,剩下990个字节,这个时候他还处于高电平状态,今后,不管对方发了多少数据过来,他都不会再触发了,因为只有产生边缘,才会触发(一直处于高电平状态,不会触发)
发送的时候,如果没有发送完,我们就把未发送完的数据添加到应用层的发送缓冲区当中,这个时候内核的发送缓冲区满了,不可以发送数据了(低电平状态)一旦对方接受了数据,内核发送缓冲区就空出了位置,就变成了可发送的状态(高电平状态) EPOLLOUT事件触发 ,epoll_wait捕获到,处理EPOLLOUT事件,这个时候我们就把刚才未发送完的数据从应用层发送缓冲区取出发送,直到应用层缓冲区发送完(从应用层缓冲区拷贝到内核发送缓冲区) 或者 内核缓冲区不够大,没有办法全部发送过去,就会产生一个EAGAIN的错误
(如果不能发送完,一定要发送到EAGAIN) 为什么?
假设发送200字节 内核只能一次发100字节 ,此时只发送了50字节到内核,并没有到EAGAIN状态,那么内核发送缓冲区将一直处于高电平状态。就不会再次触发
LT/ET触发条件
LT 电平触发
EPOLLIN事件
内核中的socket接收缓冲区为空 ,低电平-->不触发
内核中的socket接收缓冲区不为空 ,高电平 -->触发
EPOLLOUT事件
内核中的socket发送缓冲区不满 , 高电平 -->触发
内核中的socket发送缓冲区满, 低电平-->不触发
ET 边缘触发
低到高->触发 或者 高到低->触发
select/poll/epoll对比
原理
原理
select
select本质上是通过设置或者检查存放fd标志位的数据结构来进行下一步处理。这样所带来的缺点是:
1 单个进程可监视的fd数量被限制
2 需要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大
3 对socket进行扫描时是线性扫描
poll
poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。
它没有最大连接数的限制,原因是它是基于链表来存储的,但是同样有一个缺点:
大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。
poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。
epoll
在前面说到的复制问题上,epoll使用mmap减少复制开销。
还有一个特点是,epoll使用“事件”的就绪通知方式,通过epoll_ctl注册fd,一旦该fd就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知
一个进程所能打开的最大连接数
一个进程所能打开的最大连接数
select
单个进程所能打开的最大连接数有FD_SETSIZE宏定义,其大小是32个整数的大小(在32位的机器上,大小就是32*32,同理64位机器上FD_SETSIZE为32*64),当然我们可以对进行修改,然后重新编译内核,但是性能可能会受到影响,这需要进一步的测试。
poll
poll本质上和select没有区别,但是它没有最大连接数的限制,原因是它是基于链表来存储的
epoll
没有上限,但是很大,1G内存的机器上可以打开10万左右的连接,2G内存的机器可以打开20万左右的连接
FD剧增后带来的IO效率问题
FD剧增后带来的IO效率问题
select
因为每次调用时都会对连接进行线性遍历,所以随着FD的增加会造成遍历速度慢的“线性下降性能问题”。
poll
同上
epoll
因为epoll内核中实现是根据每个fd上的callback函数来实现的,只有活跃的socket才会主动调用callback,所以在活跃socket较少的情况下,使用epoll没有前面两者的线性下降的性能问题,但是所有socket都很活跃的情况下,可能会有性能问题。
消息传递方式
消息传递方式
select
内核需要将消息传递到用户空间,都需要内核拷贝动作
poll
同上
epoll
epoll通过内核和用户空间共享一块内存来实现的。