阻塞I/O
Linux 下的 socket 默认都是阻塞的文件描述符,因此我们在网络程序中调用的 I/O 函数(recv、send、accept 等)都是阻塞的,这样性能十分低下。
最主要的问题是:当可能同时有多个连接到来时,阻塞 I/O 根本无法正常工作。
/* This won't work. */
/*
 * Naive blocking-I/O loop over several sockets (illustrative fragment).
 * i_still_want_to_read, n_sockets, fd[], handle_close, handle_error and
 * handle_input are placeholders that this fragment does not define.
 *
 * Each recv() blocks until *that* socket has data (or is closed), so a
 * socket later in fd[] that already has data waits behind idle ones.
 */
char buf[1024];
int i, n;
while (i_still_want_to_read()) {
    for (i = 0; i < n_sockets; ++i) {
        n = recv(fd[i], buf, sizeof(buf), 0);
        if (n == 0) {
            /* Peer performed an orderly shutdown. */
            handle_close(fd[i]);
        } else if (n < 0) {
            handle_error(fd[i], errno);
        } else {
            handle_input(fd[i], buf, n);
        }
    }
}
在上面这个例子中,即使 fd[2] 上已经有数据到达,我们也无法处理它:recv 会依次阻塞在 fd[0]、fd[1] 上,直到它们的数据都到达并处理完,才能处理 fd[2]。所以在这种场景下,阻塞 I/O 根本无法工作。
为了解决这个问题,现在有两种方案:一种是在单进程/单线程中使用多路复用来处理所有连接;另一种是使用多线程/多进程,每一个线程/进程处理一个连接。
多线程/多进程处理方案
在大多数平台上,进程和线程的开销都很大,无论是创建还是上下文切换。所以有时我们会用线程池来缓解,但线程池治标不治本:当同时有数万个并发连接时,效率依然太低,因为同一时刻真正在工作的,只有每个 CPU 上正在运行的那个线程。
非阻塞处理方案
/*
 * Busy-polling loop over non-blocking sockets (illustrative fragment).
 * i_still_want_to_read, n_sockets, fd[], handle_close, handle_error and
 * handle_input are placeholders that this fragment does not define.
 */
int i, n;
char buf[1024];
for (i = 0; i < n_sockets; ++i) {
    /* Add O_NONBLOCK without clobbering the descriptor's other status flags. */
    int flags = fcntl(fd[i], F_GETFL, 0);
    if (flags >= 0) {
        fcntl(fd[i], F_SETFL, flags | O_NONBLOCK);
    }
}
while (i_still_want_to_read()) {
    for (i = 0; i < n_sockets; ++i) {
        n = recv(fd[i], buf, sizeof(buf), 0);
        if (n == 0) {
            handle_close(fd[i]);
        } else if (n < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                /* The kernel didn't have any data for us to read. */
            } else {
                handle_error(fd[i], errno);
            }
        } else {
            handle_input(fd[i], buf, n);
        }
    }
}
虽然改成了非阻塞,但这种写法的性能很差,原因有两个:
- 没有数据到达时,它会不停地自旋轮询,浪费 CPU 时间片。
- 处理多个连接时,无论连接上是否有数据,每次 recv 都会陷入一次系统调用,这也是一种浪费。
因此我们希望只在数据到达、I/O 条件就绪时才执行 I/O,这就引出了多路复用。
多路复用select
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>
/* for select */
#include <sys/select.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

/* Capacity of each connection's line buffer, in bytes. */
#define MAX_LINE 16384

/*
 * Return the ROT13 transform of an ASCII letter; any other byte is
 * returned unchanged.
 */
char
rot13_char(char c)
{
    /* We don't want to use isalpha here; setting the locale would change
     * which characters are considered alphabetical. */
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) {
        return c + 13;
    } else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) {
        return c - 13;
    } else {
        return c;
    }
}

/* Per-connection state for the select()-based ROT13 echo server. */
struct fd_state {
    char buffer[MAX_LINE];  /* ROT13'd bytes received from the peer */
    size_t buffer_used;     /* number of valid bytes in buffer */
    int writing;            /* nonzero once a '\n' was seen: watch for writability */
    size_t n_written;       /* bytes of buffer already sent back */
    size_t write_upto;      /* send buffer[n_written, write_upto) */
};

/* Allocate a connection state with all counters cleared; NULL if malloc fails. */
struct fd_state *
alloc_fd_state(void)
{
    struct fd_state *state = malloc(sizeof(*state));
    if (!state) {
        return NULL;
    }
    state->buffer_used = 0;
    state->n_written = 0;
    state->write_upto = 0;
    state->writing = 0;
    return state;
}

/* Release a state obtained from alloc_fd_state(). */
void
free_fd_state(struct fd_state *state)
{
    free(state);
}

/*
 * Put fd into non-blocking mode, preserving its other file status flags
 * (a bare F_SETFL with O_NONBLOCK would clear them).
 */
void
make_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) {
        perror("fcntl(F_GETFL)");
        return;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("fcntl(F_SETFL)");
    }
}

/*
 * Drain everything currently readable on fd into state->buffer, ROT13-ing
 * each byte. Bytes arriving after the buffer is full are dropped. When a
 * '\n' is seen, everything buffered so far becomes eligible for writing.
 *
 * Returns 0 while the connection is healthy (the socket would block),
 * 1 on orderly EOF, and -1 on a hard error. The caller closes fd on nonzero.
 */
int
do_read(int fd, struct fd_state *state)
{
    char buf[1024];
    ssize_t i;
    ssize_t result;

    while (1) {
        result = recv(fd, buf, sizeof(buf), 0);
        if (result <= 0) {
            break;
        }
        for (i = 0; i < result; ++i) {
            if (state->buffer_used < sizeof(state->buffer)) {
                state->buffer[state->buffer_used++] = rot13_char(buf[i]);
            }
            if (buf[i] == '\n') {
                state->writing = 1;
                state->write_upto = state->buffer_used;
            }
        }
    }

    if (result == 0) {
        return 1;
    }
    /* result < 0: "would block" and interrupted calls are not fatal;
     * select() will report the socket again when there is more to read. */
    if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
        return 0;
    }
    return -1;
}

/*
 * Send buffer[n_written, write_upto) to fd. Once everything buffered has
 * been sent, rewind the buffer. Clears state->writing when the pending
 * range is fully flushed.
 *
 * Returns 0 on success or when the socket would block (writing stays set),
 * -1 on a hard error. The caller closes fd on nonzero.
 */
int
do_write(int fd, struct fd_state *state)
{
    while (state->n_written < state->write_upto) {
        ssize_t result = send(fd, state->buffer + state->n_written,
                              state->write_upto - state->n_written, 0);
        if (result < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
                return 0;
            }
            return -1;
        }
        assert(result != 0);
        state->n_written += result;
    }

    if (state->n_written == state->buffer_used) {
        state->n_written = state->write_upto = state->buffer_used = 0;
    }

    state->writing = 0;
    return 0;
}

/*
 * ROT13 line-echo server on TCP port 40713 (all IPv4 interfaces), driven by
 * a select() loop. Connection states are indexed by fd in state[], so only
 * descriptors below FD_SETSIZE can be served.
 */
void
run(void)
{
    int listener;
    struct fd_state *state[FD_SETSIZE];
    struct sockaddr_in sin;
    int i, maxfd;
    fd_set readset, writeset, exset;

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    for (i = 0; i < FD_SETSIZE; ++i) {
        state[i] = NULL;
    }

    listener = socket(AF_INET, SOCK_STREAM, 0);
    if (listener < 0) {
        perror("socket");
        return;
    }
    make_nonblocking(listener);

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        close(listener);
        return;
    }

    if (listen(listener, 16) < 0) {
        perror("listen");
        close(listener);
        return;
    }

    while (1) {
        /* fd_sets are modified by select(), so rebuild them every pass. */
        maxfd = listener;
        FD_ZERO(&readset);
        FD_ZERO(&writeset);
        FD_ZERO(&exset);

        FD_SET(listener, &readset);

        for (i = 0; i < FD_SETSIZE; ++i) {
            if (state[i]) {
                if (i > maxfd) {
                    maxfd = i;
                }
                FD_SET(i, &readset);
                if (state[i]->writing) {
                    FD_SET(i, &writeset);
                }
            }
        }

        if (select(maxfd + 1, &readset, &writeset, &exset, NULL) < 0) {
            if (errno == EINTR) {
                continue;   /* interrupted by a signal: just wait again */
            }
            perror("select");
            return;
        }

        if (FD_ISSET(listener, &readset)) {
            struct sockaddr_storage ss;
            socklen_t slen = sizeof(ss);
            int fd = accept(listener, (struct sockaddr*)&ss, &slen);
            if (fd < 0) {
                perror("accept");
            } else if (fd >= FD_SETSIZE) {
                /* state[] and fd_set cannot represent this descriptor. */
                close(fd);
            } else {
                make_nonblocking(fd);
                state[fd] = alloc_fd_state();
                if (!state[fd]) {
                    fprintf(stderr, "alloc_fd_state failed; dropping connection\n");
                    close(fd);
                }
            }
        }

        for (i = 0; i < maxfd + 1; ++i) {
            int r = 0;
            if (i == listener) {
                continue;
            }

            if (FD_ISSET(i, &readset)) {
                r = do_read(i, state[i]);
            }
            if (r == 0 && FD_ISSET(i, &writeset)) {
                r = do_write(i, state[i]);
            }
            if (r) {
                free_fd_state(state[i]);
                state[i] = NULL;
                close(i);
            }
        }
    }
}

int
main(int c, char **v)
{
    (void)c;
    (void)v;
    setvbuf(stdout, NULL, _IONBF, 0);

    run();
    return 0;
}
上面的代码例子虽然避免了无谓的系统调用,但仍然没有解决轮询的开销。如果只需要处理少量连接,它的表现很好;但当连接数过多时,select 的性能就会很差。
在用户态,开销来自遍历你自己放入 select 的文件描述符数组;而在内核态,则要一直遍历到最大的文件描述符为止,这可能接近整个 fd_set 集合。例如,你可能只向 select 注册了 10 个文件描述符,但其中最大的是 4095,那么内核就要从 0 一直检查到 4095。
因此,不同平台都提供了替代 select 的接口,例如 poll、epoll(Linux)和 kqueue(BSD/macOS)。Libevent 的跨平台性很好,使用它就可以自动用上这些替代接口。
Libevent解决方案
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>#include <event2/event.h>#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>#define MAX_LINE 16384void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);char
rot13_char(char c)
{/* We don't want to use isalpha here; setting the locale would change* which characters are considered alphabetical. */if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))return c + 13;else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))return c - 13;elsereturn c;
}struct fd_state {char buffer[MAX_LINE];size_t buffer_used;size_t n_written;size_t write_upto;struct event *read_event;struct event *write_event;
};struct fd_state *
alloc_fd_state(struct event_base *base, evutil_socket_t fd)
{struct fd_state *state = malloc(sizeof(struct fd_state));if (!state)return NULL;state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);if (!state->read_event) {free(state);return NULL;}state->write_event =event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);if (!state->write_event) {event_free(state->read_event);free(state);return NULL;}state->buffer_used = state->n_written = state->write_upto = 0;assert(state->write_event);return state;
}void
free_fd_state(struct fd_state *state)
{event_free(state->read_event);event_free(state->write_event);free(state);
}void
do_read(evutil_socket_t fd, short events, void *arg)
{struct fd_state *state = arg;char buf[1024];int i;ssize_t result;while (1) {assert(state->write_event);result = recv(fd, buf, sizeof(buf), 0);if (result <= 0)break;for (i=0; i < result; ++i) {if (state->buffer_used < sizeof(state->buffer))state->buffer[state->buffer_used++] = rot13_char(buf[i]);if (buf[i] == '\n') {assert(state->write_event);event_add(state->write_event, NULL);state->write_upto = state->buffer_used;}}}if (result == 0) {free_fd_state(state);} else if (result < 0) {if (errno == EAGAIN) // XXXX use evutil macroreturn;perror("recv");free_fd_state(state);}
}void
do_write(evutil_socket_t fd, short events, void *arg)
{struct fd_state *state = arg;while (state->n_written < state->write_upto) {ssize_t result = send(fd, state->buffer + state->n_written,state->write_upto - state->n_written, 0);if (result < 0) {if (errno == EAGAIN) // XXX use evutil macroreturn;free_fd_state(state);return;}assert(result != 0);state->n_written += result;}if (state->n_written == state->buffer_used)state->n_written = state->write_upto = state->buffer_used = 1;event_del(state->write_event);
}void
do_accept(evutil_socket_t listener, short event, void *arg)
{struct event_base *base = arg;struct sockaddr_storage ss;socklen_t slen = sizeof(ss);int fd = accept(listener, (struct sockaddr*)&ss, &slen);if (fd < 0) { // XXXX eagain??perror("accept");} else if (fd > FD_SETSIZE) {close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */} else {struct fd_state *state;evutil_make_socket_nonblocking(fd);state = alloc_fd_state(base, fd);assert(state); /*XXX err*/assert(state->write_event);event_add(state->read_event, NULL);}
}void
run(void)
{evutil_socket_t listener;struct sockaddr_in sin;struct event_base *base;struct event *listener_event;base = event_base_new();if (!base)return; /*XXXerr*/sin.sin_family = AF_INET;sin.sin_addr.s_addr = 0;sin.sin_port = htons(40713);listener = socket(AF_INET, SOCK_STREAM, 0);evutil_make_socket_nonblocking(listener);#ifndef WIN32{int one = 1;setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));}
#endifif (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {perror("bind");return;}if (listen(listener, 16)<0) {perror("listen");return;}listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept,(void*)base);event_add(listener_event, NULL);event_base_dispatch(base);
}int
main(int c, char **v)
{setvbuf(stdout, NULL, _IONBF, 0);run();return 0;
}
接下来还有两个问题:
- 每个连接的缓冲区都需要我们自己在堆上申请和管理,代码写起来很繁琐;可以改用 Libevent 提供的 buffer。
- 在 Windows 上,IOCP 这类异步接口比 Libevent 这里所用的多路复用接口更快。
buffer
Libevent 自带 buffer(bufferevent / evbuffer),使用它可以让代码看起来更简单。
/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>

#include <event2/event.h>
#include <event2/buffer.h>
#include <event2/bufferevent.h>

#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>

/* Upper bound on unterminated input kept per connection, in bytes. */
#define MAX_LINE 16384

/*
 * Return the ROT13 transform of an ASCII letter; any other byte is
 * returned unchanged.
 */
char
rot13_char(char c)
{
    /* We don't want to use isalpha here; setting the locale would change
     * which characters are considered alphabetical. */
    if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M')) {
        return c + 13;
    } else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z')) {
        return c - 13;
    } else {
        return c;
    }
}

/*
 * bufferevent read callback: echo every complete LF-terminated line back
 * ROT13'd. If MAX_LINE or more bytes are pending with no newline, flush them
 * as one line so the input buffer cannot grow without bound.
 */
void
readcb(struct bufferevent *bev, void *ctx)
{
    struct evbuffer *input, *output;
    char *line;
    size_t n;
    size_t i;

    input = bufferevent_get_input(bev);
    output = bufferevent_get_output(bev);

    /* evbuffer_readln strips the EOL and returns a malloc'd copy; n is its length. */
    while ((line = evbuffer_readln(input, &n, EVBUFFER_EOL_LF))) {
        for (i = 0; i < n; ++i) {
            line[i] = rot13_char(line[i]);
        }
        evbuffer_add(output, line, n);
        evbuffer_add(output, "\n", 1);
        free(line);
    }

    if (evbuffer_get_length(input) >= MAX_LINE) {
        /* Too long; just process what there is and go on so that the buffer
         * doesn't grow infinitely long. */
        char buf[1024];
        while (evbuffer_get_length(input)) {
            int got = evbuffer_remove(input, buf, sizeof(buf));
            int k;
            if (got <= 0) {
                /* -1 must not reach evbuffer_add as a (huge) size_t. */
                break;
            }
            for (k = 0; k < got; ++k) {
                buf[k] = rot13_char(buf[k]);
            }
            evbuffer_add(output, buf, (size_t)got);
        }
        evbuffer_add(output, "\n", 1);
    }
}

/*
 * bufferevent event callback: on EOF, error or timeout, free the
 * bufferevent (which closes the socket because of BEV_OPT_CLOSE_ON_FREE).
 */
void
errorcb(struct bufferevent *bev, short error, void *ctx)
{
    if (error & BEV_EVENT_EOF) {
        /* connection has been closed, do any clean up here */
        /* ... */
    } else if (error & BEV_EVENT_ERROR) {
        /* check errno to see what error occurred */
        /* ... */
    } else if (error & BEV_EVENT_TIMEOUT) {
        /* must be a timeout event handle, handle it */
        /* ... */
    }
    bufferevent_free(bev);
}

/*
 * Accept callback: wrap each new connection in a bufferevent that owns the
 * socket. No FD_SETSIZE cap is applied, because nothing here indexes an
 * fd-sized array.
 */
void
do_accept(evutil_socket_t listener, short event, void *arg)
{
    struct event_base *base = arg;
    struct sockaddr_storage ss;
    socklen_t slen = sizeof(ss);
    evutil_socket_t fd = accept(listener, (struct sockaddr*)&ss, &slen);

    if (fd < 0) {
        perror("accept");
    } else {
        struct bufferevent *bev;
        evutil_make_socket_nonblocking(fd);
        bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
        if (!bev) {
            /* The bufferevent never took ownership, so close fd ourselves. */
            fprintf(stderr, "bufferevent_socket_new failed; dropping connection\n");
            evutil_closesocket(fd);
            return;
        }
        bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
        /* Stop reading once MAX_LINE bytes are queued; readcb drains them. */
        bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
        bufferevent_enable(bev, EV_READ|EV_WRITE);
    }
}

/*
 * ROT13 line-echo server on TCP port 40713 (all IPv4 interfaces), using
 * bufferevents. Returns only on setup failure or when the loop exits.
 */
void
run(void)
{
    evutil_socket_t listener;
    struct sockaddr_in sin;
    struct event_base *base;
    struct event *listener_event;

    base = event_base_new();
    if (!base) {
        fprintf(stderr, "event_base_new failed\n");
        return;
    }

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = 0;
    sin.sin_port = htons(40713);

    listener = socket(AF_INET, SOCK_STREAM, 0);
    if (listener < 0) {
        perror("socket");
        event_base_free(base);
        return;
    }
    evutil_make_socket_nonblocking(listener);

#ifndef WIN32
    {
        int one = 1;
        setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    }
#endif

    if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
        perror("bind");
        goto out;
    }

    if (listen(listener, 16) < 0) {
        perror("listen");
        goto out;
    }

    listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept,
                               (void*)base);
    if (!listener_event) {
        fprintf(stderr, "event_new failed\n");
        goto out;
    }
    event_add(listener_event, NULL);

    event_base_dispatch(base);
    event_free(listener_event);

out:
    evutil_closesocket(listener);
    event_base_free(base);
}

int
main(int c, char **v)
{
    (void)c;
    (void)v;
    setvbuf(stdout, NULL, _IONBF, 0);

    run();
    return 0;
}
总结
看完以上介绍,可以认为 Libevent 就是对不同平台上高效多路复用接口的一层封装。它的可移植性很高,但性能未必在所有平台上都是最优的,比如在 Windows 下。