Redis 네트워크 모델 의 소스 코드 에 대한 상세 한 분석

머리말
Redis 의 네트워크 모델 은 I/O 다 중 재 활용 프로그램 을 바탕 으로 이 루어 진다.원본 코드 에는 네 가지 다 중 복합 함수 라 이브 러 리 epoll,select,evport,kqueue 가 포함 되 어 있 습 니 다.프로그램 을 컴 파일 할 때 시스템 에 따라 이 네 가지 라 이브 러 리 중 하 나 를 자동 으로 선택 합 니 다.다음은 epoll 을 예 로 들 어 Redis 의 I/O 모듈 의 소스 코드 를 분석 합 니 다.
epoll 시스템 호출 방법
Redis 네트워크 이벤트 처리 모듈 의 코드 는 모두 epoll 의 세 가지 시스템 방법 을 중심 으로 작 성 된 것 입 니 다.먼저 이 세 가지 방법 을 분명히 하면 뒤 에는 어렵 지 않 을 것 이다.
epfd = epoll_create(1024);
epoll 인 스 턴 스 생 성
매개 변수:이 epoll 인 스 턴 스 의 최대 감청 가능 한 socket fd(파일 설명자)수 를 표시 합 니 다.
복귀:epoll 전용 파일 설명자.
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
epoll 의 이 벤트 를 관리 하고 이 벤트 를 등록,수정,삭제 합 니 다.
인자:
epfd:epoll 인 스 턴 스 의 파일 설명자;
op:수치 세 가지:EPOLLCTL_ADD 등록,EPOLLCTL_MOD 수정,EPOLLCTL_DEL 삭제;
fd:socket 의 파일 설명자;
epoll_이벤트*이벤트:이벤트
이 벤트 는 자바 NIO 의 채널'채널'과 유사 한 이 벤트 를 대표 합 니 다.epoll_이벤트 의 구 조 는 다음 과 같다.

typedef union epoll_data {
void *ptr;
int fd; /* socket      */
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

struct epoll_event {
__uint32_t events; /* Epoll events                   ,  EPOLLIN(fd  )、EPOLLOUT(fd  ) */
epoll_data_t data; /* User data variable */
};
int epoll_wait(int epfd, struct epoll_event * events, intmaxevents, int timeout);
자바 NIO 에서 select 방법 과 같은 이벤트 가 준비 되 었 는 지 기다 리 십시오.이벤트 가 준비 되면,준 비 된 이 벤트 를 이벤트 그룹 에 저장 합 니 다.
매개 변수
epfd:epoll 인 스 턴 스 의 파일 설명자;
이벤트:준 비 된 이벤트 그룹;
intmaxevents:매번 처리 할 수 있 는 이벤트 수;
timeout:시간 을 막 고 준비 되 어 있 는 이벤트 의 시간 초과 값 을 기다 리 고 있 습 니 다.
소스 코드 분석
이벤트
Redis 이벤트 시스템 에서 이 벤트 를 두 가지 유형 으로 나 눕 니 다.
  • 파일 이벤트;네트워크 소켓 에 대응 하 는 이벤트;
  • 시간 이벤트:Redis 의 정시 조작 이벤트,예 를 들 어 server Cron 함수.
  • 다음은 이벤트 의 등록,트리거 두 절차 에서 소스 코드 를 분석 합 니 다.
    귀속 이벤트
    이벤트 루프 만 들 기
    initServer 방법(redis.c 의 main 함수 호출)에서 RedisDb 대상 을 만 드 는 동시에'eventLoop'대상 을 초기 화 합 니 다.저 는 이벤트 프로세서 대상 이 라 고 부 릅 니 다.구조 체 의 관건 적 인 구성원 변 수 는 다음 과 같다.
    
    struct aeEventLoop{
    aeFileEvent *events;//          
    aeFiredEvent *fired;//          
    aeTimeEvent *timeEventHead;//      
    ...
    }
    eventLoop 을 ae.c 의'aeCreate EventLoop'방법 에서 초기 화 합 니 다.이 방법 은 이벤트 Loop 을 초기 화 하 는 것 외 에 다음 과 같은 방법 으로 epoll 인 스 턴 스 를 초기 화 합 니 다.
    
    /*
     * ae_epoll.c
     *        epoll   ,       eventLoop
     */
    static int aeApiCreate(aeEventLoop *eventLoop) {
    
      aeApiState *state = zmalloc(sizeof(aeApiState));
    
      if (!state) return -1;
    
      //         
      state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
      if (!state->events) {
        zfree(state);
        return -1;
      }
    
      //    epoll   
      state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
      if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
      }
    
      //     eventLoop
      eventLoop->apidata = state;
      return 0;
    }
    바로 이곳 에서 시스템 방법"epoll"을 호출 했 습 니 다.create”。이곳 의 state 는 aeApi State 구조 로 다음 과 같다.
    
    /*
     *     
     */
    typedef struct aeApiState {
    
      // epoll      
      int epfd;
    
      //    
      struct epoll_event *events;
    
    } aeApiState;
    이 state 는 이벤트 Loop->apidata 가 기록 합 니 다.
    ip 포트 와 핸들 바 인 딩
    listenToPort 방법 으로 TCP 포트 를 열 면 모든 IP 포트 는 파일 설명자 ipfd 에 대응 합 니 다(서버 에 여러 개의 ip 주소 가 있 을 수 있 기 때 문 입 니 다)
    
    //    TCP     ,            
    if (server.port != 0 &&
      listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
      exit(1);
    주의:*eventLoop 과 ipfd 는 각각 server.el 과 server.ipfd[]에 의 해 인용 되 었 습 니 다.server 는 구조 체 RedisServer 의 인 스 턴 스 로 Redis 의 전역 변수 입 니 다.
    등록 이벤트
    다음 코드 는 모든 파일 설명자 에 이벤트 함 수 를 연결 합 니 다.
    
    // initServer  :
    for (j = 0; j < server.ipfd_count; j++) {
      if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
        acceptTcpHandler,NULL) == AE_ERR)
        {
          redisPanic(
            "Unrecoverable error creating server.ipfd file event.");
        }
    }
    // ae.c    aeCreateFileEvent   
    /*
     *    mask     ,   fd      ,
     *   fd    ,   proc   
     */
    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
    {
      if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
      }
    
      if (fd >= eventLoop->setsize) return AE_ERR;
    
      //         
      aeFileEvent *fe = &eventLoop->events[fd];
    
      //      fd      
      if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    
      //         ,        
      fe->mask |= mask;
      if (mask & AE_READABLE) fe->rfileProc = proc;
      if (mask & AE_WRITABLE) fe->wfileProc = proc;
    
      //     
      fe->clientData = clientData;
    
      //      ,           fd
      if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    
      return AE_OK;
    }
    aeCreate FileEvent 함수 에서 호출 하 는 방법 이 있 습 니 다:aeApiAddEvent,코드 는 다음 과 같 습 니 다.
    
    /*
     * ae_epoll.c
     *         fd
     */
    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
      aeApiState *state = eventLoop->apidata;
      struct epoll_event ee;
    
      /* If the fd was already monitored for some event, we need a MOD
       * operation. Otherwise we need an ADD operation. 
       *
       *    fd         ,       ADD   。
       *
       *          /    ,       MOD   。
       */
      int op = eventLoop->events[fd].mask == AE_NONE ?
          EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    
      //       epoll
      ee.events = 0;
      mask |= eventLoop->events[fd].mask; /* Merge old events */
      if (mask & AE_READABLE) ee.events |= EPOLLIN;
      if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
      ee.data.u64 = 0; /* avoid valgrind warning */
      ee.data.fd = fd;
    
      if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    
      return 0;
    }
    여 기 는 실제로 시스템 호출 방법"epoll"입 니 다.ctl"이벤트(파일 설명자)를 epoll 에 등록 합 니 다.우선 epoll 을 패키지 해 야 합 니 다.이벤트 구조,즉 ee,"epollctl"이 를 epoll 에 등록 합 니 다.
    그 밖 에 aeCreate FileEvent 는 다음 두 가지 중요 한 작업 을 완 성 했 습 니 다.
  • 이벤트 함수"acceptTcpHandler"를 이벤트 Loop 에 저장 합 니 다.즉,이벤트 Loop->이벤트[fd]->rfileProc 에서 참조 합 니 다(wfileProc 일 수도 있 습 니 다.각각 읽 기와 쓰기 이 벤트 를 대표 합 니 다).
  • 이벤트 Loop->이벤트[fd]->mask 에 작업 코드 를 추가 합 니 다(mask 는 자바 NIO 의 ops 작업 코드 와 유사 하 며 이벤트 형식 을 대표 합 니 다).
  • 사건 감청 과 집행
    redis.c 의 main 함 수 는 ae.c 의 main 방법 을 호출 합 니 다.다음 과 같 습 니 다.
    
    /*
     *          
     */
    void aeMain(aeEventLoop *eventLoop) {
    
      eventLoop->stop = 0;
    
      while (!eventLoop->stop) {
    
        //                 ,     
        if (eventLoop->beforesleep != NULL)
          eventLoop->beforesleep(eventLoop);
    
        //       
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
      }
    }
    상기 코드 는 aeProcessEvents 방법 을 사용 하여 사건 을 처리 할 것 입 니 다.방법 은 다음 과 같 습 니 다.
    
    /* Process every pending time event, then every pending file event
     * (that may be registered by time event callbacks just processed).
     *
     *             ,            。
     *                
     */
     int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
      int processed = 0, numevents;
    
      /* Nothing to do? return ASAP */
      if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    
      if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
    
        //          
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
          shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
          //           
          //                                   
          long now_sec, now_ms;
    
          /* Calculate the time missing for the nearest
           * timer to fire. */
          //                    
          //           tv    
          aeGetTime(&now_sec, &now_ms);
          tvp = &tv;
          tvp->tv_sec = shortest->when_sec - now_sec;
          if (shortest->when_ms < now_ms) {
            tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
            tvp->tv_sec --;
          } else {
            tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
          }
    
          //       0 ,           ,        0 (   )
          if (tvp->tv_sec < 0) tvp->tv_sec = 0;
          if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
          
          //       ,        
          //      AE_DONT_WAIT            ,         
    
          /* If we have to check for events but need to return
           * ASAP because of AE_DONT_WAIT we need to set the timeout
           * to zero */
          if (flags & AE_DONT_WAIT) {
            //          
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
          } else {
            /* Otherwise we can block */
            //                  
            tvp = NULL; /* wait forever */
          }
        }
    
        //       ,      tvp   
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
          //            
          aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
    
          int mask = eventLoop->fired[j].mask;
          int fd = eventLoop->fired[j].fd;
          int rfired = 0;
    
          /* note the fe->mask & mask & ... code: maybe an already processed
           * event removed an element that fired and we still didn't
           * processed, so we check if the event is still valid. */
          //    
          if (fe->mask & mask & AE_READABLE) {
            // rfired    /           
            rfired = 1;
            fe->rfileProc(eventLoop,fd,fe->clientData,mask);
          }
          //    
          if (fe->mask & mask & AE_WRITABLE) {
            if (!rfired || fe->wfileProc != fe->rfileProc)
              fe->wfileProc(eventLoop,fd,fe->clientData,mask);
          }
    
          processed++;
        }
      }
    
      /* Check time events */
      //       
      if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    
      return processed; 
    }
    이 함수 에서 코드 는 크게 세 가지 주요 절차 로 나 뉜 다.
  • 시간 이벤트 와 현재 시간의 관계 에 따라 시간 tvp 를 차단 하기 로 결정 합 니 다.
  • aeApipoll 방법 을 호출 하여 준 비 된 이 벤트 를 이벤트 Loop->fired[]에 기록 하고 준 비 된 이벤트 수 를 되 돌려 줍 니 다.
  • 이벤트 Loop->fired[]를 옮 겨 다 니 며 모든 준비 이 벤트 를 옮 겨 다 니 며 이전에 연 결 된 방법 rfileProc 또는 wfileProc 를 실행 합 니 다.
  • ae_epoll.c 의 aeApipoll 방법 은 다음 과 같 습 니 다.
    
    /*
     *        
     */
    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
      aeApiState *state = eventLoop->apidata;
      int retval, numevents = 0;
    
      //     
      retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
          tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    
      //          ?
      if (retval > 0) {
        int j;
    
        //              
        //      eventLoop   fired    
        numevents = retval;
        for (j = 0; j < numevents; j++) {
          int mask = 0;
          struct epoll_event *e = state->events+j;
    
          if (e->events & EPOLLIN) mask |= AE_READABLE;
          if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
          if (e->events & EPOLLERR) mask |= AE_WRITABLE;
          if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
    
          eventLoop->fired[j].fd = e->data.fd;
          eventLoop->fired[j].mask = mask;
        }
      }
      
      //          
      return numevents;
    }
    실행 epollwait 후 준 비 된 이벤트 가 이벤트 Loop->apidata->이벤트 이벤트 슬롯 에 기 록 됩 니 다.뒤의 순환 은 이벤트 슬롯 에 있 는 이 벤트 를 이벤트 Loop->fired[]에 기록 하 는 것 입 니 다.구체 적 인 설명:모든 사건 은 하나의 epoll 입 니 다.이벤트 구 조 는 e 로 대체 하면 e.data.fd 는 파일 설명 자 를 대표 합 니 다.e->events 는 조작 코드 를 표시 하고 조작 코드 를 mask 로 바 꾸 며 마지막 으로 fd 와 mask 를 eventLoop->fired[j]에 기록 합 니 다.
    그 다음 에 외층 의 aeProcessEvents 방법 에서 함수 포인터 rfileProc 또는 wfileProc 가 가리 키 는 방법 을 실행 합 니 다.예 를 들 어 앞에서 언급 한'acceptTcpHandler'입 니 다.
    총결산
    Redis 의 네트워크 모듈 은 사실 간단 하고 쉬 운 Reactor 모델 이다.본 고 는'서버 등록 사건->클 라 이언 트 연결 받 기->감청 사건 이 준비 되 었 는 지->실행 사건'이라는 노선 을 따라 Redis 소스 코드 를 분석 하고 Redis 가 클 라 이언 트 connect 를 받 아들 이 는 과정 을 묘사 했다.사실 NIO 의 사상 은 거의 비슷 하 다.
    여기 서 Redis 네트워크 모델 의 소스 코드 에 대한 상세 한 분석 에 관 한 글 은 여기까지 소개 되 었 습 니 다.더 많은 관련 Redis 네트워크 모델 소스 코드 내용 은 우리 의 이전 글 을 검색 하거나 아래 의 관련 글 을 계속 조회 하 시기 바 랍 니 다.앞으로 저 희 를 많이 사랑 해 주세요!

    좋은 웹페이지 즐겨찾기