首页 > 阅读redis(1)|逐步分析Redis建立监听服务的过程
头像
fibonaccii
发布于 2021-05-14 00:42
+ 关注

阅读redis(1)|逐步分析Redis建立监听服务的过程

导读: 这一期来讲解下redis建立监听服务的过程。

在网络编程的前两期,讲解了一个服务器的基本设计:

redis的服务器部分自然也符合上述两期的设计,并且以redis为例进行了部分源码分析。从本期开始,将开始全面分析redis源码之路。



走近reactor设计模式,剖析高性能服务器框架核心一文中我们已经知道,高性能服务的核心在于怎么设计好事件循环eventloop结构。下面,就先来看看redis中表征eventloop的结构体 struct aeEventLoop

struct aeEventLoop

struct aeEventLoop结构体如下。

  typedef struct aeEventLoop {
      int maxfd;                        /* 当前注册的fd最大数 */
      int setsize;                       /* 能注册的最大fd数 */
      long long timeEventNextId;       /* 当前定时器事件最大id */
      time_t lastTime;                   /* Used to detect system clock skew */ 
      aeFileEvent* events;               /* 数组,已经注册的事件, events 建立了一个映射关系:fd <--> event*/
      aeFiredEvent* fired;               /* 数组,已经触发的事件,记录的是触发的fd,及其事件类型 */
      aeTimeEvent* timeEventHead;     /* 定时器链表的头节点 */
      int stop;                         /* EventLoop 是否停止 */
      void* apidata;                    /* 这个用于存放和 epollfd 相关的数据 */
      aeBeforeSleepProc *beforesleep; /* 在 epoll_wait 阻塞之前调用的函数 */
      aeBeforeSleepProc *aftersleep;  /* 在 epoll_wait 唤醒之后调用的函数 */
      int flags;                      /* 设置的标志位 */
  } aeEventLoop;

下面解释下其中几个字段。

events & fired

eventsfired 字段,实际上都是数组,events 数组的下标表达的是fd,而对应的元素是该fd 关注的事件, fired 的下标仅仅是索引 index,该 index位置上的元素是记录了触发的fd 及其对应的事件类型。

数组,可以视为一种简单的map,不过数组是在下标索引和该位置的元素之间建立了映射关系。因此,eventsfired字段,就是在fd/index和对应的事件之间建立映射关系。

其中,events中的元素类型是struct aeFileEventfired中元素的类型是struct aeFiredEvent,下面是完整的字段及其解释。

typedef struct aeFileEvent {
    int mask;                  // 关注的事件:AE_NONE、AE_READABLE 、AE_WRITABLE、AE_BARRIER 
    aeFileProc *rfileProc;   // 可读事件的回调函数 
    aeFileProc *wfileProc;     // 可写事件的回调函数
    void *clientData;     
} aeFileEvent;

typedef struct aeFiredEvent {
    int fd;        // 有事件触发的文件描述符
    int mask;   // 这个fd触发具体的事件类型
} aeFiredEvent;

timeEventHead

在redis中,是使用链表存储定时器任务,而timeEventHead字段,指向的是定时任务链表的头部节点。

由于每次定时任务都是在链表头部插入,因此 timeEventHead 记录的是最晚插入的节点(由于是直接插入,没有对定时器任务链表排序,因此头节点不一定是最早超时的任务)。

关于定时任务的设计,可以参考剖析服务器の定时器设计

typedef struct aeTimeEvent {
    long long id;       /* 由 timeEventNextId++ 赋值,自增主键*/
    long when_sec;      /* seconds */
    long when_ms;       /* milliseconds */
    aeTimeProc* timeProc;  // 时间回调函数
    aeEventFinalizerProc* finalizerProc;
    void* clientData;
    struct aeTimeEvent* prev;
    struct aeTimeEvent* next;
    int refcount; 
} aeTimeEvent;

aeCreateEventLoop

aeCreateEventLoop函数,创建aeEventLoop对象:

  • 分配内存:先给aeEventLoop对象eventloop分配内存,再给eventsfired字段分配内存,且设置events[i].mask不关注任何事件;
  • 使用aeApiCreate函数(这个函数在下面讲解),创建epollfd,为后文使用epoll_xxx函数准备;
  • 其他默认初始化

aeCreateEventLoop函数里,或者说是在redis里,大量使用了 goto 语句,这与很多书中所述的禁止使用goto相悖?

  1. 由于c语言本身的限制,不得不使用goto,或者说,不使用goto会造成代码冗余;
  2. 禁止goto使用在业务中,在c语言实现的底层框架中,还是会存在大量使用goto的情况。

因此,看到goto不要心生鄙夷,亦或胆怯。

下面,是完整的实现。

  aeEventLoop *aeCreateEventLoop(int setsize) {
      aeEventLoop *eventLoop;
      int i;

      // 1 分配内存
      if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) 
          goto err;
      eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
      eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
      if (eventLoop->events == NULL || eventLoop->fired == NULL) 
          goto err;
      // 2 初始化
      eventLoop->setsize = setsize;
      eventLoop->lastTime = time(NULL);
      eventLoop->timeEventHead = NULL;
      eventLoop->timeEventNextId = 0;
      eventLoop->stop = 0;
      eventLoop->maxfd = -1;
      eventLoop->beforesleep = NULL;
      eventLoop->aftersleep = NULL;
      eventLoop->flags = 0;
      // 3 创建 epollfd
      if (aeApiCreate(eventLoop) == -1) goto err;

      // 初始化时,什么事件也没关注
      for (i = 0; i < setsize; i++)
          eventLoop->events[i].mask = AE_NONE;
      return eventLoop;

  err:
      if (eventLoop) {
          zfree(eventLoop->events);
          zfree(eventLoop->fired);
          zfree(eventLoop);
      }
      return NULL;
  }

一个eventloop创建完成之后,下面就是要向这个eventloop中注册事件,然后等待事件的发生,最后去处理事件。下面就来介绍redis中是如何封装epoll_xxx系列函数,完成注册事件、等待事件发生。

epollfd

epoll_xxx系列函数,都是围绕epollfd进行的。在redis中,epoll_xxx系列函数被封装后以 aeApi_ 为前缀。

  • aeApiCreate:基于epoll_create函数封装,创建 epollfd
  • aeApiAddEvent:基于epoll_ctl函数封装,向epllfd中注册感兴趣的事件
  • aeApiDelEvent:基于epoll_ctl函数封装,从epollfd中删除感兴趣的事件
  • aeApiPoll:基于epoll_wait函数封装,阻塞等待事件发生

下面,依次介绍 aeApi_xxx 系列函数。

aeApiCreate

aeApiCreate函数,基于epoll_create函数创建epollfd,其中 epoll_create 函数的入口参数size没有任何含义,但必须是个大于0的正数,也可以直接使用 epoll_create1 函数来创建epollfd,不过要传入一个标志位。

struct aeApiState

epollfd的相关状态在redis中使用struct aeApiState结构体保存,而struct aeApiState对象最终存储在eventloop->apidata中,struct aeApiState字段完整如下。

  typedef struct aeApiState {
      int epfd;                // epollfd
      struct epoll_event* events;
  } aeApiState;

   struct epoll_event {
     uint32_t     events;   // epoll_wait中触发的事件
     epoll_data_t data;        // 存放触发事件对应的fd
  } __EPOLL_PACKED;

其中:

  • epfd:保存的是 aeApiCreate 函数创建的epollfd
  • events:是个数组类型,元素类型struct epoll_event对象中保存 epoll_wait 监听到的活跃事件及其对应的fdevents大小最大是eventLoop中的setsize

好嘞,介绍完相关的数据结构,下面来看看aeApiCreate函数,它的整个逻辑也很简单:

  • 分配相关数据结构的内存;
  • 使用epoll_create函数来创建efd,并使用struct aeApiState对象state来保存efd及其事件信息;
  • state存储在aeEventLoop::apidata字段处。

完整实现如下。

static int aeApiCreate(aeEventLoop *eventLoop) {
      aeApiState* state = zmalloc(sizeof(aeApiState));
      if (!state) return -1;

      state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
      if (!state->events) {
          zfree(state);
          return -1;
      }

      state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
      if (state->epfd == -1) {
          zfree(state->events);
          zfree(state);
          return -1;
      }
      eventLoop->apidata = state;     
      return 0;
  }

aeApiAddEvent

aeApiAddEvent函数,内部调用 epoll_ctl 函数, 将 fd 注册到epollfd的事件空间中。

这个行为对epollfd上已监听的事件不会产生任何影响,只是为fd多关注一个事件,下面会即将讲解 的listenfd 也是使用这个函数来注册感兴趣事情。

这个函数实现比较简单,如下。

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata; 
    struct epoll_event ee = {0}; /*avoid valgrind warning */

    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;

    mask |= eventLoop->events[fd].mask;              // 与操作,是为了不影响fd上之前关注的事件
    if (mask & AE_READABLE) ee.events |= EPOLLIN;   // 关注可读事件
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;  // 关注可写事件
    ee.data.fd = fd;
    // 将 {fd, ee} 注册到 efd 的事件空间上去
    if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
    return 0;
}

aeApiDelEvent

aeApiDelEvent函数,内部也是基于epoll_ctl函数,使文件描述符fd不再关注delmask事件。

  • fd 仅不关注delmask事件后,如果fd仍关注其他事件,epoll_ctl函数的第二个参数则使用 EPOLL_CTL_MOD
  • 否则,就需要将fdefd的事件空间中删除,此时第二个参数使用EPOLL_CTL_DEL标志位。

完整实现如下。

  static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
      aeApiState *state = eventLoop->apidata;
      struct epoll_event ee = {0}; /* avoid valgrind warning */
      int mask = eventLoop->events[fd].mask & (~delmask); // 取消对于 delmask 的关注

      ee.events = 0;
      if (mask & AE_READABLE) ee.events |= EPOLLIN;
      if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
      ee.data.fd = fd;
      // 如果取消关注事件 delmask 后还由其他事件,那么就使用 EPOLL_CTL_MOD 标志位
      // 否则就将fd从state->efd的事件空间中删除,此时使用 EPOLL_CTL_DEL 标志位
      if (mask != AE_NONE) {
          epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
      } else {
          epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
      }
  }

aeApiPoll

aeApiPoll函数,内部是基于epoll_wait函数实现,用于阻塞等待epollfd事件空间中已注册的事件发生。

如果epoll_wait函数监听到事件发生,redis就会将这些事件及其对应的fd存储到eventloop.fired字段中,在上面解释过,这个字段是个数组。

完成的实现如下。

static int aeApiPoll(aeEventLoop* eventLoop, struct timeval* tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,                                         // epollfd
                        state->events,                                       // 监听的的事件类型及其fd
                        eventLoop->setsize,                                  // 最大可监听事件大小
                        tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);  // 最长阻塞时间
    if (retval > 0) {
        int j;
        // 将所有发生的事件及其对应的fd 添加到 eventloop.fired数组中
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event* e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE; // 错误是即可读又是可写
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE; // 对端关闭
            eventLoop->fired[j].fd = e->data.fd;                      // 记录触发事件的 fd
            eventLoop->fired[j].mask = mask;                          // 记录触发的事件类型
        }
    }
    return numevents;
}

上面介绍了struct aeEventLoop结构体,以及epoll_xxx系列封装后的aeApi_xxx系列函数。下面,就来看看redis是怎么利用这些函数一步步创建监听服务。

_anetTcpServer

_anetTcpServer函数,是redis内部函数,可以用于创建ipv-4、ipv-6监听服务器。因此,经过封装变成两个对外开发函数,分别用来创建ipv4ipv6服务。

int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{
    return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);        // ipv4
}

int anetTcp6Server(char *err, int port, char *bindaddr, int backlog)
{
    return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog);        // ipv6
}

// in server.c
int listenToPort(int port, int *fds, int *count) {
    int j;

    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        if (server.bindaddr[j] == NULL) {
            //... 
        } else if (strchr(server.bindaddr[j],':')) {
            /* Bind IPv6 address. */
            fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j], server.tcp_backlog);
        } else {
            /* Bind IPv4 address. */
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], server.tcp_backlog);
        }
        //...
    }
    return C_OK;
}

下面来看看_anetTcpServer函数,是怎么创建监听状态的服务器,步骤如下:

  1. getaddrinfo函数,获取本机上的所有ip地址及对应的TCP服务。此函数参数:

    • SOCK_STREAM:指定了服务类型,是属于tcp
    • AI_PASSIVE:如果传入的地址bindaddr不是NULL,那么这个设置无效。如果bindaddr==NNULL,则getaddrinfo返回的ip地址就是通配地址。

    关于getaddrinfo的使用,可以参考《UNIX网络编程》。 这个函数,为创建的listenfd任选一个由getaddrinfo返回的本地IP地址来绑定。

  2. 设置了SO_REUSEADDR参数,更多socket参数选项,可以参考《UNIX网络编程》。

  3. 调用anetListen函数,调用了绑定到固定端口、并开启监听

_anetTcpServer 函数,可以为本地的ipv-4、ipv-6各自创建一个监听文件描述符并保存在 server.ipfd中,个数由server.ipfd_count记录,监听的端口port默认是6379

  static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
  {
      int s = -1, rv;
      char _port[6];  /* strlen("65535") */
      struct addrinfo hints, *servinfo, *p; 

      snprintf(_port,6,"%d",port);
      memset(&hints,0,sizeof(hints));
      hints.ai_family = af;               /* ipv4 or ipv6*/
      hints.ai_socktype = SOCK_STREAM; /* TCP 数据类型 */
      hints.ai_flags = AI_PASSIVE;     /* 如果 bindarry==NULL,返回的就是通配地址 */

      if ((rv = getaddrinfo(bindaddr, _port, &hints, &servinfo)) != 0) {
          anetSetError(err, "%s", gai_strerror(rv));
          return ANET_ERR;
      }

      // 绑定成功一个就会跳出for循环
      for (p = servinfo; p != NULL; p = p->ai_next) {

          if ((s = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) // 创建TCP类型的 listenfd
              continue;
          if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) 
              goto error;
          if (anetSetReuseAddr(err, s) == ANET_ERR) goto error;                  // 设置地址复用
          if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) // 绑定地址并开启监听
              s = ANET_ERR;
          goto end;
      }
      if (p == NULL) {
          anetSetError(err, "unable to bind socket, errno: %d", errno);
          goto error;
      }

  error:
      if (s != -1) close(s);
      s = ANET_ERR;
  end:
      freeaddrinfo(servinfo);
      return s;
  }

anetListen

anetListen函数,是redis用来建立监听服务的,内部涉及两个步骤:

  • 先调用bind函数,将监听文件描述符 s 绑定到地址 sd上,
  • 调用listen函数,开启监听服务

完成的实现如下。

  static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
      if (bind(s,sa,len) == -1) {
          anetSetError(err, "bind: %s", strerror(errno));
          close(s);
          return ANET_ERR;
      }

      if (listen(s, backlog) == -1) {
          anetSetError(err, "listen: %s", strerror(errno));
          close(s);
          return ANET_ERR;
      }
      return ANET_OK;
  }

稍微总结下:

  • _anetTcpServer函数,创建的是一个阻塞的listenfd
  • listenToPort函数中将listenfd设置为非阻塞IO,listenToPort函数调用结束,创建非阻塞的监听服务器才算完成。

下一步,就要将listenfd注册到epollfd中,并且关注可读事件,这样服务器就能监听客户端的连接请求了(触发可读事件),这一过程由aeCreateFileEvent函数完成。

aeCreateFileEvent

aeCreateFileEvent函数,将文件描述符fd注册到 epollfd 的事件空间中,注册感兴趣的事件mask,以及感兴趣的事件发生时要执行的回答函数。

比如,在 initServer 函数中 aeCreateFileEvent 函数将 listenfd 挂在 epollfd 上并注册可读事件,目的是监听客户端的连接请求,客户端的连接请求处理函数是 acceptTcpHandler(这个函数在下一期会讲解)。

     //in server.c 
    for (j = 0; j < server.ipfd_count; j++) {
      if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR)
          {
              serverPanic("Unrecoverable error creating server.ipfd file event.");
          }
    }

由于 server.ipfd 中保存的是根据本地的ipv4、ipv6地址,因此最终会创建的两个 listenfd。而aeCreateFileEvent函数作用是将每个监听listenfd绑定到所属的eventloop->apidata->efd中去。

因此,上面这个for循环运行结束,会有两个监听服务,一个是监听ipv4,一个监听ipv6。

截图!!!

aeCreateFileEvent函数在此处的主要作用:

  • listenfd 注册到所属的 eventLoopepfd 中去,并关注可读事件;
  • 设置可读事件的回调函数 acceptTcpHandler,这个也是新的客户端连接到来时候的处理函数;
  • 更新最大注册的 fd,即maxfd

完整的实现如下。

  int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
  {
      if (fd >= eventLoop->setsize) {
          errno = ERANGE;
          return AE_ERR;
      }
      aeFileEvent* fe = &eventLoop->events[fd];

      if (aeApiAddEvent(eventLoop, fd, mask) == -1)
          return AE_ERR;
      fe->mask |= mask;
      if (mask & AE_READABLE) fe->rfileProc = proc;
      if (mask & AE_WRITABLE) fe->wfileProc = proc;
      fe->clientData = clientData;
      if (fd > eventLoop->maxfd)
          eventLoop->maxfd = fd;
      return AE_OK;
  }

至此,建立监听TCP服务器的流程基本完成:aeCreateEventLoop --> anetTcpServer--> aeCreateFileEvent

  • 建立了非阻塞的监听端口listenfd
  • 将该listenfd注册到epollfd中。

下面,就是要将服务器运行起来,让listenfd接受客户端的连接请求,这一过程由aeMain函数完成。

aeMain

aeMain函数,内部的核心函数是 aeProcessEvents,使得aeProcessEvents函数处理每次迭代中各个请求。

  void aeMain(aeEventLoop *eventLoop) {
      eventLoop->stop = 0;
      while (!eventLoop->stop) {
          aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                     AE_CALL_BEFORE_SLEEP|
                                     AE_CALL_AFTER_SLEEP);
      }
  }

aeProcessEvents

走近reactor设计模式,剖析高性能服务器框架核心一节中讲解过它的前半部分,即如何设置epoll_Wait的阻塞时间。下面来全面地看看它的功能。这个函数,整个流程可以分为5个部分:

  1. aeApiPoll 函数之前就做了一件事:计算epoll_wait需要阻塞的时间

    • 如果设置了AE_DONT_WAIT,那么就是不阻塞,epoll_wait的超时时间为0
    • 如果没有其他任务,只有定时器任务,那么epoll_wait阻塞时间即最早超时时间,防止定时器任务等待过久
    • 如果也没有定时器任务,那么就永远等待,直到有事件到来。

    这一部分,在走近reactor设计模式,剖析高性能服务器框架核心已经详细说过。

  2. beforesleep:主要是在epoll_wait阻塞前处理一些任务,防止因阻塞长时间无法执行,或者是一些准备工作;

  3. epoll_wait等待事件发生;

  4. aftersleep:可以用于完成 epoll_wait唤醒之后的一些校验工作;

  5. 处理事件:根据是否设置了AE_BARRIER,来决定同一个fd上是先处理可读事件还是先处理可写事件;

  6. 处理完活跃的事件,最早的定时器可能已经超时了,是时候去执行了。

完成的实现如下。

int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    int processed = 0, numevents;

    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);    // 如果有时间事件存在,则获取最近的定时器超时时间
        if (shortest) {
            long now_sec, now_ms;

            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            // 最近超时的时间转为毫秒单位
            long long ms = (shortest->when_sec - now_sec)*1000 + shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* 要么是超时时间为0,即使设置了AE_DONT_WAIT,不要阻塞
             * 要么是没有定时时间,则可以阻塞 */

            // 不要阻塞,则设置超时时间为0 
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                // 没有可触发事件,则等待
                t***ULL; /* wait forever */
            }
        }

        // 再次确认下
        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }

        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop); // 在休眠之前执行

        numevents = aeApiPoll(eventLoop, tvp); // epoll_wait

        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop); // 在休眠之后执行

        // 逐个处理触发的事件
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd   = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            int invert = fe->mask & AE_BARRIER;

            // fe->mask 是之前关注的事件,mask是产生的事件,如果都有 AE_READABLE
            // 则触发可读事件
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    } 

    // 可以去处理时间事件了
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

编译运行

最后,可能再讲下怎么编译redis。在学习的时候,为了便于调试,因此会加上调试信息,并且去优化:

$ pwd
/home/codespace/redis-6.0.5
$ make FLAGS="-g -O0" -j 4   ///!!! 编译
$ cd src
$ ./redis-server             /// !!! 运行redis服务器

我们再使用lsof命令来看看redis-server打开的文件描述符,从下面也可以看出:

  • fd=5 是使用epoll_create函数创建的epollfd
  • ·fd=6fd=7是redis开启的两个listenfd,分别用于监听ipv4ipv6请求。
codespace@ubuntu:~/redis-6.0.5$ lsof -c redis-server
COMMAND    PID      USER   FD      TYPE DEVICE SIZE/OFF   NODE NAME
redis-ser 3556 codespace  cwd       DIR    8,5    12288 434667 /home/codespace/redis-6.0.5/src
redis-ser 3556 codespace  rtd       DIR    8,5     4096      2 /
redis-ser 3556 codespace  txt       REG    8,5 11793608 428898 /home/codespace/redis-6.0.5/src/redis-server
redis-ser 3556 codespace  mem       REG    8,5 16287648 534002 /usr/lib/locale/locale-archive
redis-ser 3556 codespace  mem       REG    8,5  2029224 538922 /usr/lib/x86_64-linux-gnu/libc-2.31.so
redis-ser 3556 codespace  mem       REG    8,5   157224 540595 /usr/lib/x86_64-linux-gnu/libpthread-2.31.so
redis-ser 3556 codespace  mem       REG    8,5    18816 539148 /usr/lib/x86_64-linux-gnu/libdl-2.31.so
redis-ser 3556 codespace  mem       REG    8,5  1369352 539990 /usr/lib/x86_64-linux-gnu/libm-2.31.so
redis-ser 3556 codespace  mem       REG    8,5   191472 538314 /usr/lib/x86_64-linux-gnu/ld-2.31.so
redis-ser 3556 codespace    0u      CHR  136,1      0t0      4 /dev/pts/1
redis-ser 3556 codespace    1u      CHR  136,1      0t0      4 /dev/pts/1
redis-ser 3556 codespace    2u      CHR  136,1      0t0      4 /dev/pts/1
redis-ser 3556 codespace    3r     FIFO   0,12      0t0  50175 pipe
redis-ser 3556 codespace    4w     FIFO   0,12      0t0  50175 pipe
redis-ser 3556 codespace    5u  a_inode   0,13        0  16306 [eventpoll]      //!!! epollfd
redis-ser 3556 codespace    6u     IPv6  50176      0t0    TCP *:6379 (LISTEN)    //!!! ipv6 监听,端口6379
redis-ser 3556 codespace    7u     IPv4  77825      0t0    TCP *:6379 (LISTEN)  //!!! ipv4 监听,端口6379
redis-ser 3556 codespace   99w      REG    8,5        0 428519 /home/codespace/.vscode-server/bin/cfa2e218100323074ac1948c885448fdf4de2a7f/vscode-remote-lock.codespace.cfa2e218100323074ac1948c885448fdf4de2a7f

到此,就大致讲解完毕redis建立监听服务的过程了。不出意外,会为这一期准备个直播,敬请期待。

全部评论

(0) 回帖
加载中...
话题 回帖

相关热帖

近期热帖

近期精华帖

热门推荐