  • 加载配置(命令行或者配置文件)

  • 启动TCP监听,客户端的列表保存在redisServer结构体(全局变量server)的clients字段中

  • 启动AE Event Loop事件,异步处理客户请求



/* Run the event loop until eventLoop->stop becomes non-zero.
 *
 * Each iteration first invokes the optional beforesleep callback (when one
 * is registered) and then handles all kinds of pending events through
 * aeProcessEvents(eventLoop, AE_ALL_EVENTS). */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        /* If a function must run before events are processed, run it. */
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);   /* Start processing events. */
    }
}

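事件处理框架非常简单,从初始化、服务到结束,分别对应函数 aeCreateEventLoop、aeMain、aeDeleteEventLoop。其中,aeMain 是事件循环的主体函数,它在循环中反复调用 aeProcessEvents 函数。这三个阶段会分别调用 aeApiCreate、aeApiPoll、aeApiFree 三个接口函数进行处理(aeCreateEventLoop 调用 aeApiCreate,aeProcessEvents 调用 aeApiPoll,aeDeleteEventLoop 调用 aeApiFree)。这三个接口函数又会映射到某一种具体的 I/O 多路复用模型(如 epoll、kqueue、select)的实现中。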


/* Release an event loop obtained from aeCreateEventLoop().
 *
 * Frees, in order:
 *   1. the polling backend state (aeApiFree);
 *   2. the file-event and fired-event arrays;
 *   3. every time event still linked from timeEventHead;
 *   4. the loop structure itself.
 *
 * NOTE(review): time events are freed without calling their finalizerProc
 * (later upstream Redis does the same); confirm no caller relies on
 * finalizers running at teardown. */
void aeDeleteEventLoop(aeEventLoop *eventLoop) {
    aeTimeEvent *te, *next_te;

    aeApiFree(eventLoop);
    zfree(eventLoop->events);
    zfree(eventLoop->fired);

    /* Registered time events are only reachable through this list; once
     * the loop is freed they could never be released, so free them now. */
    te = eventLoop->timeEventHead;
    while (te) {
        next_te = te->next;
        zfree(te);
        te = next_te;
    }
    zfree(eventLoop);
}


aeEventLoop *aeCreateEventLoop(int setsize) {    aeEventLoop *eventLoop;    int i;     if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;    eventLoop->setsize = setsize;    eventLoop->lastTime = time(NULL);    eventLoop->timeEventHead = NULL;    eventLoop->timeEventNextId = 0;    eventLoop->stop = 0;    eventLoop->maxfd = -1;    eventLoop->beforesleep = NULL;    if (aeApiCreate(eventLoop) == -1) goto err;    /* Events with mask == AE_NONE are not set. So let's initialize the     * vector with it. */    for (i = 0; i < setsize; i++)        eventLoop->events[i].mask = AE_NONE;    return eventLoop; err:    if (eventLoop) {        zfree(eventLoop->events);        zfree(eventLoop->fired);        zfree(eventLoop);    }                                                                                                                                                                                            return NULL;}

处理事件的时候,aeMain函数调用aeProcessEvents函数,在一次循环中处理文件事件和到期的时间事件。aeProcessEvents函数调用aeSearchNearestTimer函数,查询事件循环中最早到期的时间事件(时间复杂度为O(N),N为时间事件的数量),并据此计算aeApiPoll的阻塞超时时间。先处理文件事件,然后再处理时间事件。


/* Process pending file events and then, if requested, due time events.
 *
 * flags:
 *   - if neither AE_TIME_EVENTS nor AE_FILE_EVENTS is set, return 0 at once;
 *   - AE_TIME_EVENTS: run processTimeEvents() after the file events, and
 *     (unless AE_DONT_WAIT) bound the poll timeout by the nearest timer;
 *   - AE_DONT_WAIT: never block in the poll.
 * Note that the poll runs whenever at least one fd is registered
 * (maxfd != -1), so fired file events are dispatched even when only
 * AE_TIME_EVENTS was requested.
 *
 * Poll timeout:
 *   - time to the nearest timer, or zero if it is already due;
 *   - zero with AE_DONT_WAIT;
 *   - otherwise infinite.
 *
 * Returns the number of fired file events handled plus the value returned
 * by processTimeEvents(). */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want to call the poll API even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;
            long long ms;

            /* Calculate the time missing for the nearest timer to fire.
             * Use a single millisecond count so that an overdue timer
             * always yields a zero timeout: clamping seconds and
             * microseconds separately would turn a timer that is 500 ms
             * late into a 500 ms wait. */
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            ms = (long long)(shortest->when_sec - now_sec)*1000 +
                 shortest->when_ms - now_ms;
            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

            /* Note the fe->mask & mask & ... code: maybe an already
             * processed event removed an element that fired and we still
             * didn't process, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                /* Refresh: the read handler may have resized (and so
                 * reallocated) eventLoop->events. */
                fe = &eventLoop->events[fd];
            }
            if (fe->mask & mask & AE_WRITABLE) {
                /* Don't call the same handler twice for one event. */
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}




/* Readable-event handler for the listening TCP socket 'fd'.
 *
 * Accepts at most MAX_ACCEPTS_PER_CALL connections per call and passes
 * each accepted socket to acceptCommonHandler(). Returns as soon as
 * anetTcpAccept() fails. That failure is logged at REDIS_WARNING level
 * unless errno is EWOULDBLOCK, presumably meaning no connection is
 * pending on the (non-blocking) listener. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char cip[REDIS_IP_STR_LEN];
    int budget, cfd, cport;

    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    for (budget = MAX_ACCEPTS_PER_CALL; budget != 0; budget--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0);
    }
}

client输入get命令,redis server最终会调用getCommand函数,client输入set命令,redis最终会调用setCommand函数

redis执行完用户的一个命令后,会将结果写入redisClient对象的输出缓冲区(缓冲区buf以及reply链表)中,而sendReplyToClient函数会先发送buf中的数据,再不断地从reply链表中取出数据,异步地发送给client。需要注意的是,sendReplyToClient函数也是通过aeCreateFileEvent注册的(注册为AE_WRITABLE可写事件,数据全部发送完毕后再通过aeDeleteFileEvent注销)


通过processInputBuffer()来解析querybuf, 若c->querybuf存在多条命令,则依次解析并处理这些命令


/* Parse and execute as many complete commands as c->querybuf contains.
 *
 * Returns early, leaving the rest of the buffer untouched, in any of
 * these cases:
 *   - clients are paused and 'c' is not a slave;
 *   - 'c' is blocked;
 *   - 'c' is flagged REDIS_CLOSE_AFTER_REPLY.
 * Also stops once the protocol parser does not return REDIS_OK. */
void processInputBuffer(redisClient *c) {
    while (sdslen(c->querybuf)) {
        int parsed = REDIS_OK;

        /* The checks short-circuit in this order, so clientsArePaused()
         * is consulted only for non-slave clients. With
         * REDIS_CLOSE_AFTER_REPLY set, the connection is closed once the
         * reply is written, so the reply must not keep growing. */
        if ((!(c->flags & REDIS_SLAVE) && clientsArePaused()) ||
            (c->flags & REDIS_BLOCKED) ||
            (c->flags & REDIS_CLOSE_AFTER_REPLY))
            return;

        /* Pick the protocol on the first byte: '*' means multibulk,
         * anything else is an inline command. */
        if (!c->reqtype)
            c->reqtype = (c->querybuf[0] == '*') ?
                REDIS_REQ_MULTIBULK : REDIS_REQ_INLINE;

        switch (c->reqtype) {
        case REDIS_REQ_INLINE:
            parsed = processInlineBuffer(c);
            break;
        case REDIS_REQ_MULTIBULK:
            parsed = processMultibulkBuffer(c);
            break;
        default:
            redisPanic("Unknown request type");
            break;
        }
        if (parsed != REDIS_OK) break;

        /* A multibulk length <= 0 leaves argc at 0: just reset. Otherwise
         * reset only when the command was actually executed. */
        if (c->argc == 0 || processCommand(c) == REDIS_OK)
            resetClient(c);
    }
}
  • 如果是telnet等发送的裸协议(inline)数据,即开头没有表示参数个数的'*'辅助信息,则用processInlineBuffer()函数解析输入

  • 其他则通过processMultibulkBuffer()函数解析

  • 若解析函数返回REDIS_ERR,通常说明客户端缓冲区中的数据还不足以构成一条完整的命令(即尚不满足Redis协议格式)。此时跳出循环,等待下一次read()补充数据。若返回REDIS_OK,则调用processCommand()处理命令


数据读取 readQueryFromClient

调用系统函数read()读取客户端发送过来的数据,并根据read()的返回值分以下几种情况处理:

  • 暂无数据可读(nread == -1 && errno == EAGAIN):对非阻塞socket而言这并不是错误,直接返回,等待下一次可读事件

  • 读取出错(nread == -1 && errno != EAGAIN) freeClient()

  • 客户端关闭(nread == 0) freeClient()

  • 查询缓冲区长度超过server.client_max_querybuf_len(默认1GB)时,记录警告日志并freeClient()关闭该客户端。正常读取完后进入processInputBuffer进行协议解析

/* Read handler for a client socket.
 *
 * Reads available data into c->querybuf and hands the buffer to
 * processInputBuffer() to parse and execute commands.
 *
 * The client is freed (and the function returns) when:
 *   - read() fails with an errno other than EAGAIN;
 *   - read() returns 0 (peer closed the connection);
 *   - the query buffer grows beyond server.client_max_querybuf_len.
 * An EAGAIN read is simply ignored until the next readable event.
 * server.current_client is set to 'c' while the client is being served. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    int nread, readlen;
    size_t qblen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    server.current_client = c;
    readlen = REDIS_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultibulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= REDIS_MBULK_BIG_ARG)
    {
        /* Signed on purpose: the buffer may already hold the whole bulk
         * plus its trailing CRLF (or more), so this can be zero or
         * negative. */
        long long remaining = (long long)(c->bulklen+2) -
                              (long long)sdslen(c->querybuf);

        /* Only shrink the read when bytes are actually missing. A zero
         * readlen would make read() return 0, which is then mistaken for
         * the client closing the connection. A negative readlen would
         * become a huge size_t in sdsMakeRoomFor()/read(). */
        if (remaining > 0 && remaining < readlen) readlen = (int)remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            nread = 0;
        } else {
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }

    if (nread) {
        sdsIncrLen(c->querybuf,nread);
        c->lastinteraction = server.unixtime;
        if (c->flags & REDIS_MASTER) c->reploff += nread;
        server.stat_net_input_bytes += nread;
    } else {
        /* EAGAIN: nothing was read this time. */
        server.current_client = NULL;
        return;
    }

    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    processInputBuffer(c);
    server.current_client = NULL;
}




如果客户端传送的数据的第一个字符是'*',那么传送数据将被当做multibulk协议处理,否则将被当做inline协议处理。Inline协议的具体解析函数是processInlineBuffer(),multibulk协议的具体解析函数是processMultibulkBuffer()。当协议解析完毕,即客户端传送的数据已经解析出命令字段和参数字段,接下来进行命令处理,命令处理函数是processCommand。

发送数据 sendReplyToClient

/* Writable-event handler for a client socket.
 *
 * Sends as much pending reply data as the socket accepts:
 *   1. the c->buf buffer (c->bufpos bytes, c->sentlen of them already
 *      sent);
 *   2. the objects queued in the c->reply list, one at a time.
 * Stops after more than REDIS_MAX_WRITE_PER_EVENT bytes in one call,
 * unless maxmemory is set and already exceeded.
 *
 * On a write error other than EAGAIN the client is freed. Once everything
 * has been sent:
 *   - the AE_WRITABLE event is removed;
 *   - a client flagged REDIS_CLOSE_AFTER_REPLY is freed. */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    size_t objmem;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    while(c->bufpos > 0 || listLength(c->reply)) {
        if (c->bufpos > 0) {
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if (c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = sdslen(o->ptr);
            objmem = getStringObjectSdsUsedMemory(o);

            if (objlen == 0) {
                listDelNode(c->reply,listFirst(c->reply));
                c->reply_bytes -= objmem;
                continue;
            }

            nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                c->reply_bytes -= objmem;
            }
        }
        /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver. */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory)) break;
    }
    /* totwritten is the running total for this whole call. Account it
     * exactly once; adding it on every loop iteration would count the bytes
     * of earlier iterations again and again. */
    server.stat_net_output_bytes += totwritten;
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            redisLog(REDIS_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) {
        /* For clients representing masters we don't count sending data
         * as an interaction, since we always send REPLCONF ACK commands
         * that take some time to just fill the socket output buffer.
         * We just rely on data / pings received for timeout detection. */
        if (!(c->flags & REDIS_MASTER)) c->lastinteraction = server.unixtime;
    }
    if (c->bufpos == 0 && listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

        /* Close connection after entire reply has been sent. */
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c);
    }
}


  • 发送缓冲区(c->buf)的内容

  • 发送回复链表(c->reply)的内容


  • 暂时不可写(nwritten == -1 && errno == EAGAIN):socket发送缓冲区已满,这不是错误,剩余数据保留在缓冲区中,等待下一次可写事件再发送

  • 写数据出错(nwritten == -1 && errno != EAGAIN),释放客户端freeClient()





