nginx 소스 코드 분석 - reuseport 의 사용

본 고 는 주로 nginx 에서 reuseport 의 사용 을 소개 하고 글 에서 코드 가 비교적 많 으 며 본 고 를 읽 으 려 면 독자 가 nginx 사건 모듈 과 nginx 설정 과정 에 대해 알 아야 한다.
nginx 가 비교적 복잡 하고 필자 가 nginx 에 대한 이해 가 유한 하기 때문에 글 에 소홀 한 점 이 있 을 수 있 습 니 다. 지적 해 주 십시오. 대단히 감사합니다!
1. reuseport 의 설정 및 분석
reuseport 는 listen 명령 을 통 해 설정 합 니 다. 설정 은 다음 과 같 습 니 다.
listen 443 reuseport;

listen 명령 은 ngxhttp_core_module 모듈 의 ngxhttp_core_listen () 함수 해석.ngx_http_core_module. c 에 대한 분석 명령 은 다음 과 같 습 니 다.
{ ngx_string("listen"),
      NGX_HTTP_SRV_CONF|NGX_CONF_1MORE, //listen       server  ,        (443,reuseport  )
      ngx_http_core_listen, //  listen     
      NGX_HTTP_SRV_CONF_OFFSET, 
      0,
      NULL },

편폭 제한 으로 reuseport 와 관련 된 해석 만 소개 하고 listen 명령 의 해석 을 완전히 소개 하지 않 습 니 다.
//   listen       ,        (      ip+port)
for (n = 2; n < cf->args->nelts; n++) {
	...
	if (ngx_strcmp(value[n].data, "bind") == 0) {
       lsopt.set = 1;
       lsopt.bind = 1; //    ip+port  bind  ,    bind,      ip+port,          
       continue;
    }
	
	//   reuseport  , reuseport    1,   bind   1
	if (ngx_strcmp(value[n].data, "reuseport") == 0) {
        lsopt.reuseport = 1;
        lsopt.set = 1;
        lsopt.bind = 1;
        continue;
    }
	...
}

2. listening 구조 만 들 기
master 는 http block 을 분석 한 후에 ngx 를 사용 합 니 다.http_optimize_server () 함수 가 설정 한 ip + port 에 대해 해당 하 는 처 리 를 하고 다음 두 가지 작업 을 진행 합 니 다.
1. 현재 http block 의 listen 설정 이 중복 되 는 지 확인 합 니 다. 즉, 두 개의 listen 명령 이 같은 ip + port + server 를 설정 하 였 는 지 확인 합 니 다.name。중복 설정 이 있 으 면 뒤쪽 ip + port + server 를 무시 합 니 다.name 설정.검 사 는 ngxhttp_server_names () 함수 가 완성 되 었 습 니 다. 여기 서 상세 하 게 소개 하지 않 습 니 다. 관심 이 있 으 면 해당 코드 를 찾 아 볼 수 있 습 니 다.
2. 분 석 된 설정 에 따라 listening 구 조 를 만 들 고 master 후속 감청 포트 는 모두 listening 에서 얻 을 수 있 으 며 모든 listening 구 조 는 bid + listening 작업 에 대응 합 니 다. 여기 서 listening 의 생 성 을 중점적으로 소개 합 니 다.
우선 ngxhttp_optimize_server () 함수 호출 ngxhttp_init_listening () 함수, ngxhttp_init_listening () 코드 는 다음 과 같 습 니 다.
static ngx_int_t
ngx_http_init_listening(ngx_conf_t *cf, ngx_http_conf_port_t *port)
{
    ngx_uint_t                 i, last, bind_wildcard;
    ngx_listening_t           *ls;
    ngx_http_port_t           *hport;
    ngx_http_conf_addr_t      *addr;

/*
   port     ,  port    addrs( port+ip),  addr    server( server_name)
port - > addr1----|-----> server1
         addr2    |-----> server2
         addr3
    	  ...

*/

    addr = port->addrs.elts; //  port   addr,  port      ip+port
    last = port->addrs.nelts; //port addr   

	//   port addr      (    ),ip    (INADDR_ANY) addr    ,   bind addr   
    if (addr[last - 1].opt.wildcard) {
        addr[last - 1].opt.bind = 1;
        bind_wildcard = 1;

    } else {
        bind_wildcard = 0;
    }

    i = 0;

    while (i < last) {
   
        // port     INADDR_ANY+port addr,   addr(ip+port)    bind,          ip+addr    
        if (bind_wildcard && !addr[i].opt.bind) { 
            i++;
            continue;
        }

        ls = ngx_http_add_listening(cf, &addr[i]); //   ip+port    listening  ,    
        if (ls == NULL) {
            return NGX_ERROR;
        }

        hport = ngx_pcalloc(cf->pool, sizeof(ngx_http_port_t));
        if (hport == NULL) {
            return NGX_ERROR;
        }

       /*
                       ip+port           addr(INADDR_ANY+port) ,         ,         。
               ,  ip+port    server   ,      ngx_http_init_connection()     ip+port default_server。
       */
        ls->servers = hport;

        hport->naddrs = i + 1;

        switch (ls->sockaddr->sa_family) {

#if (NGX_HAVE_INET6)
        case AF_INET6:
            if (ngx_http_add_addrs6(cf, hport, addr) != NGX_OK) {
                return NGX_ERROR;
            }
            break;
#endif
        default: /* AF_INET */
            if (ngx_http_add_addrs(cf, hport, addr) != NGX_OK) {
                return NGX_ERROR;
            }
            break;
        }
        //    reuseport ip+port,    listening    worker_processes - 1 ,    worker     fd,       
        if (ngx_clone_listening(cf, ls) != NGX_OK) {
            return NGX_ERROR;
        }

        addr++;
        last--;
    }

    return NGX_OK;
}

ngx_clone_listening () 코드 는 다음 과 같 습 니 다.
ngx_clone_listening(ngx_conf_t *cf, ngx_listening_t *ls)
{
#if (NGX_HAVE_REUSEPORT)

    ngx_int_t         n;
    ngx_core_conf_t  *ccf;
    ngx_listening_t   ols;

    if (!ls->reuseport) { //      reuseport listening
        return NGX_OK;
    }

    ols = *ls;

    ccf = (ngx_core_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
                                           ngx_core_module);

    for (n = 1; n < ccf->worker_processes; n++) {

        /* create a socket for each worker process */

        ls = ngx_array_push(&cf->cycle->listening);
        if (ls == NULL) {
            return NGX_ERROR;
        }

        *ls = ols;
        ls->worker = n; //            n worker    ,  worker     
    }

#endif

    return NGX_OK;
}

3. 감청 소켓 만 들 기
master 설정 을 분석 한 후 listening 을 만 든 후 감청 소켓 을 만 들 기 시작 합 니 다.모든 listening 구 조 는 해당 하 는 감청 소켓 을 만 들 고 reuseport 속성 을 설정 하 며 마지막 으로 bid 와 listen 작업 을 합 니 다.구체 적 인 코드 는 다음 과 같다.
우선 ngx 호출open_listening_sockets () 소켓 을 만 들 고 bid 와 listen 작업 을 진행 합 니 다.
ngx_int_t
ngx_open_listening_sockets(ngx_cycle_t *cycle)
{
    int               reuseaddr;
    ngx_uint_t        i, tries, failed;
    ngx_err_t         err;
    ngx_log_t        *log;
    ngx_socket_t      s;
    ngx_listening_t  *ls;

    reuseaddr = 1;
#if (NGX_SUPPRESS_WARN)
    failed = 0;
#endif

    log = cycle->log;

    /* TODO: configurable try number */

    for (tries = 5; tries; tries--) { //  5 
        failed = 0;

        /* for each listening socket */

        ls = cycle->listening.elts;
        for (i = 0; i < cycle->listening.nelts; i++) {

            if (ls[i].ignore) {
                continue;
            }

#if (NGX_HAVE_REUSEPORT)

            if (ls[i].add_reuseport) {  //  reuseport  ,   reload       

                /*
                 * to allow transition from a socket without SO_REUSEPORT
                 * to multiple sockets with SO_REUSEPORT, we have to set
                 * SO_REUSEPORT on the old socket before opening new ones
                 */

                int  reuseport = 1;

                if (setsockopt(ls[i].fd, SOL_SOCKET, SO_REUSEPORT,
                               (const void *) &reuseport, sizeof(int))
                    == -1)
                {
                    ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_socket_errno,
                                  "setsockopt(SO_REUSEPORT) %V failed, ignored",
                                  &ls[i].addr_text);
                }

                ls[i].add_reuseport = 0;
            }
#endif

            if (ls[i].fd != (ngx_socket_t) -1) {
                continue;
            }

            if (ls[i].inherited) {

                /* TODO: close on exit */
                /* TODO: nonblocking */
                /* TODO: deferred accept */

                continue;
            }

            s = ngx_socket(ls[i].sockaddr->sa_family, ls[i].type, 0); //     
           
           ...
           
#if (NGX_HAVE_REUSEPORT)

            if (ls[i].reuseport && !ngx_test_config) {  //  reuseport
                int  reuseport;
                reuseport = 1;
                setsockopt(s, SOL_SOCKET, SO_REUSEPORT, (const void *) &reuseport, sizeof(int);
                
            }
#endif
            bind(s, ls[i].sockaddr, ls[i].socklen); //band  
              
            if (ls[i].type != SOCK_STREAM) {
                ls[i].fd = s;
                continue;
            }

            listen(s, ls[i].backlog); //listen  
            ls[i].listen = 1;
            ls[i].fd = s;
        }

        if (!failed) {
            break;
        }

        /* TODO: delay configurable */

        ngx_log_error(NGX_LOG_NOTICE, log, 0,
                      "try again to bind() after 500ms");

        ngx_msleep(500);
    }

    if (failed) {
        ngx_log_error(NGX_LOG_EMERG, log, 0, "still could not bind()");
        return NGX_ERROR;
    }

    return NGX_OK;
}

이로써 reuseport 를 설정 하 는 ip + addr 마다 worker 를 만 들 었 습 니 다.프로 세 스 의 감청 소켓 입 니 다. 워 커 프로 세 스 가 이 소켓 을 어떻게 사용 하 는 지 소개 합 니 다.
4. 워 커 의 감청 소켓 사용
reuseport 가 설정 되 어 있 지 않 은 모든 감청 소켓 에 대해 파일 설명 부 표를 엽 니 다. 계승 을 통 해 모든 워 커 는 fd 를 유지 하지만 커 널 차원 에 서 는 표 항목 만 유지 합 니 다 (파일 설명 부 표를 열 면 모든 프로 세 스 가 공유 합 니 다).바로 이 때문에 놀 라 운 문제 가 발생 했 습 니 다. 초기 nginx 는 상호 배척 자물쇠 (공유 메모리 + 원자 조작) 를 통 해 놀 라 움 을 피 했 습 니 다.worker 호출 epollwait 전에 자 물 쇠 를 요청 합 니 다. 자 물 쇠 를 성공 적 으로 획득 해야만 epoll 을 통 해 fd 를 감청 할 수 있 습 니 다.워 커 프로 세 스 간 에 경쟁 자물쇠 가 필요 하기 때문에 성능 이 높 지 않 습 니 다.
reuseport 의 감청 소켓 을 설정 하면 모든 worker 프로 세 스 는 하나의 독립 된 fd 를 가지 고 있 으 며, worker 프로 세 스 간 에 서로 간섭 하지 않 고 커 널 차원 에서 부하 균형 을 실현 하 며 효율 이 더욱 높다.worker 프로 세 스 가 시작 되면 ngx 를 호출 합 니 다.event_process_init () 는 이벤트 모듈 을 초기 화 합 니 다. 코드 는 다음 과 같 습 니 다.
static ngx_int_t
ngx_event_process_init(ngx_cycle_t *cycle)
{
    ngx_uint_t           m, i;
    ngx_event_t         *rev, *wev;
    ngx_listening_t     *ls;
    ngx_connection_t    *c, *next, *old;
    ngx_core_conf_t     *ccf;
    ngx_event_conf_t    *ecf;
    ngx_event_module_t  *module;

    ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);
    ecf = ngx_event_get_conf(cycle->conf_ctx, ngx_event_core_module);

    if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) { //worker           
        ngx_use_accept_mutex = 1;
        ngx_accept_mutex_held = 0;
        ngx_accept_mutex_delay = ecf->accept_mutex_delay;

    } else {
        ngx_use_accept_mutex = 0;
    }
    
    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

#if (NGX_HAVE_REUSEPORT)
        if (ls[i].reuseport && ls[i].worker != ngx_worker) { //worker             
            continue;
        }
#endif

        c = ngx_get_connection(ls[i].fd, cycle->log);

        if (c == NULL) {
            return NGX_ERROR;
        }

        c->type = ls[i].type;
        c->log = &ls[i].log;

        c->listening = &ls[i];
        ls[i].connection = c;

        rev = c->read;

        rev->log = c->log;
        rev->accept = 1;

        rev->handler = (c->type == SOCK_STREAM) ? ngx_event_accept
                                                : ngx_event_recvmsg;

#if (NGX_HAVE_REUSEPORT)

        if (ls[i].reuseport) {
            if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {  //        epoll,LT  
                return NGX_ERROR;
            }

            continue;
        }

#endif

		//      ,worker                   epoll
        if (ngx_use_accept_mutex) { 
            continue;
        }

		//       ,worker            epoll
        if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}


워 커 프로 세 스 가 상호 배척 자 물 쇠 를 사용 하면 상호 배척 자 물 쇠 를 성공 적 으로 얻 었 을 때 감청 소켓 을 epoll 에 추가 합 니 다.자 물 쇠 를 가 져 오 는 데 실 패 했 을 때 감청 소켓 을 epoll 에서 제거 합 니 다.구체 적 인 코드 는 다음 과 같다.
ngx_int_t
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
    if (ngx_shmtx_trylock(&ngx_accept_mutex)) { //   

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "accept mutex locked");

        if (ngx_accept_mutex_held && ngx_accept_events == 0) {
            return NGX_OK;
        }

        if (ngx_enable_accept_events(cycle) == NGX_ERROR) { //      ,        epoll
            ngx_shmtx_unlock(&ngx_accept_mutex);
            return NGX_ERROR;
        }

        ngx_accept_events = 0;
        ngx_accept_mutex_held = 1;

        return NGX_OK;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "accept mutex lock failed: %ui", ngx_accept_mutex_held);

    if (ngx_accept_mutex_held) {
    	//      ,       epoll   (  reuseport      )
        if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }

        ngx_accept_mutex_held = 0;
    }

    return NGX_OK;
}


static ngx_int_t
ngx_enable_accept_events(ngx_cycle_t *cycle)
{
    ngx_uint_t         i;
    ngx_listening_t   *ls;
    ngx_connection_t  *c;

    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) { //          epoll, LT  

        c = ls[i].connection;

        if (c == NULL || c->read->active) {
            continue;
        }

        if (ngx_add_event(c->read, NGX_READ_EVENT, 0) == NGX_ERROR) {
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}


static ngx_int_t
ngx_disable_accept_events(ngx_cycle_t *cycle, ngx_uint_t all)
{
    ngx_uint_t         i;
    ngx_listening_t   *ls;
    ngx_connection_t  *c;

    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

        c = ls[i].connection;

        if (c == NULL || !c->read->active) {
            continue;
        }

#if (NGX_HAVE_REUSEPORT)

        /*
         * do not disable accept on worker's own sockets
         * when disabling accept events due to accept mutex
         */

        if (ls[i].reuseport && !all) { //  reuseport          epoll  
            continue;
        }

#endif

        if (ngx_del_event(c->read, NGX_READ_EVENT, NGX_DISABLE_EVENT)
            == NGX_ERROR)
        {
            return NGX_ERROR;
        }
    }

    return NGX_OK;
}


5. reload 과정 에서 reuseport 의 사용
master 가 reload 를 진행 할 때 ngx 를 호출 합 니 다.init_cycle, 여 기 는 ngx 만 드 립 니 다.init_cycle 부분 코드:
ngx_cycle_t *
ngx_init_cycle(ngx_cycle_t *old_cycle)
{
	...
	if (ngx_conf_parse(&conf, &cycle->conf_file) != NGX_CONF_OK) { //master      ,    listening  
        environ = senv;											   //    ip+port      listening 
        ngx_destroy_cycle_pools(&conf);
        return NULL;
    }	
    ...
    if (old_cycle->listening.nelts) { //    listening  ,                 listening  
        ls = old_cycle->listening.elts;
        for (i = 0; i < old_cycle->listening.nelts; i++) {
            ls[i].remain = 0;
        }

        nls = cycle->listening.elts;
        for (n = 0; n < cycle->listening.nelts; n++) { //    listening     

            for (i = 0; i < old_cycle->listening.nelts; i++) {//    listening    ,  ip+port    ,          listening
                if (ls[i].ignore) {
                    continue;
                }

                if (ls[i].remain) {
                    continue;
                }

                if (ls[i].type != nls[n].type) {
                    continue;
                }

                if (ngx_cmp_sockaddr(nls[n].sockaddr, nls[n].socklen,
                                     ls[i].sockaddr, ls[i].socklen, 1)
                    == NGX_OK)
                {
                    /*
                               listening,  old worker    ,       ,
                       worker     worker                ,             。
					*/
                    nls[n].fd = ls[i].fd; 
                    nls[n].previous = &ls[i];
                    ls[i].remain = 1;

                    if (ls[i].backlog != nls[n].backlog) { //  listen          ,    listen      
                        nls[n].listen = 1;
                    }


#if (NGX_HAVE_REUSEPORT)
                    if (nls[n].reuseport && !ls[i].reuseport) { //  reuseport  
                        nls[n].add_reuseport = 1;
                    }
#endif
                    break;
                }
            }

            if (nls[n].fd == (ngx_socket_t) -1) {
                nls[n].open = 1;
            }
        }
    } else { //  listening  ,        ip+port           
        ls = cycle->listening.elts;
        for (i = 0; i < cycle->listening.nelts; i++) {
            ls[i].open = 1;
        }
    }
    
    if (ngx_open_listening_sockets(cycle) != NGX_OK) { //       ,     bind+listen  
        goto failed;
    }
}

한 번 에 reuseport 를 설정 하면 후속 reload 작업 은 reuseport 를 설정 하지 않 았 더 라 도 대응 하 는 ip + port 는 reuseport 속성 을 가지 고 있 습 니 다. 그러나 설정 을 해석 하 는 것 은 ip + port 는 하나의 listening 구조 에 만 대응 하고 실제 적 으로 모든 worker 가 하나의 감청 소켓 을 공유 하 므 로 mutex 를 통 해 놀 라 움 을 피해 야 합 니 다.
6. 부 드 러 운 업그레이드 과정 에서 reuseport 의 사용
부 드 러 운 업 그 레이 드 는 nginx 실행 가능 한 파일 을 교체 하 는 데 사 용 됩 니 다. 오래된 master 가 신 호 를 받 아 부 드 러 운 업 그 레이 드 를 할 때 먼저 fork 새 프로 세 스, 새 프로 세 스 를 호출 한 다음 exec 를 호출 하여 프로 세 스 컨 텍스트 를 바 꾸 고 새로운 master 프로 세 스 를 얻 습 니 다. 오래된 master 의 모든 감청 소켓 은 환경 변 수 를 통 해 새로운 master 에 전 달 됩 니 다.
새로운 master 프로 세 스 가 시작 되면 main 함수 에 들 어가 면 main 은 ngx 를 호출 합 니 다.add_inherited_sockets 는 부모 프로 세 스 가 설정 한 'NGINX' 환경 변 수 를 가 져 옵 니 다.이 환경 변 수 는 부모 프로 세 스 의 감청 소켓 을 저장 하고 소켓 간 에 콜론 으로 나 누 어 마지막 으로 분 호 를 추가 합 니 다.
static ngx_int_t
ngx_add_inherited_sockets(ngx_cycle_t *cycle)
{
    u_char           *p, *v, *inherited;
    ngx_int_t         s;
    ngx_listening_t  *ls;

    inherited = (u_char *) getenv(NGINX_VAR);

    if (inherited == NULL) {
        return NGX_OK;
    }

    ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0,
                  "using inherited sockets from \"%s\"", inherited);

    if (ngx_array_init(&cycle->listening, cycle->pool, 10,
                       sizeof(ngx_listening_t))
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    for (p = inherited, v = p; *p; p++) {
        if (*p == ':' || *p == ';') {
            s = ngx_atoi(v, p - v); //        fd
            if (s == NGX_ERROR) {
                ngx_log_error(NGX_LOG_EMERG, cycle->log, 0,
                              "invalid socket number \"%s\" in " NGINX_VAR
                              " environment variable, ignoring the rest"
                              " of the variable", v);
                break;
            }

            v = p + 1;

            ls = ngx_array_push(&cycle->listening);  //    listening  
            if (ls == NULL) {
                return NGX_ERROR;
            }

            ngx_memzero(ls, sizeof(ngx_listening_t));

            ls->fd = (ngx_socket_t) s; //    fd   listening  
        }
    }

    if (v != p) {
        ngx_log_error(NGX_LOG_EMERG, cycle->log, 0,
                      "invalid socket number \"%s\" in " NGINX_VAR
                      " environment variable, ignoring", v);
    }

    ngx_inherited = 1;

    return ngx_set_inherited_sockets(cycle); //            ,            listening
}

ngx_int_t
ngx_set_inherited_sockets(ngx_cycle_t *cycle)
{
    size_t                     len;
    ngx_uint_t                 i;
    ngx_listening_t           *ls;
    socklen_t                  olen;

    ls = cycle->listening.elts;
    for (i = 0; i < cycle->listening.nelts; i++) {

	/*         ,    ,     reuseport    */

#if (NGX_HAVE_REUSEPORT)

        reuseport = 0;
        olen = sizeof(int);

        if (getsockopt(ls[i].fd, SOL_SOCKET, SO_REUSEPORT,
                       (void *) &reuseport, &olen) //  reuseport  
            == -1)
        {
            ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_socket_errno,
                          "getsockopt(SO_REUSEPORT) %V failed, ignored",
                          &ls[i].addr_text);

        } else {
            ls[i].reuseport = reuseport ? 1 : 0; //   reuseport     listening
        }

#endif

    }

    return NGX_OK;
}


부모 프로 세 스에 서 물 려 받 은 listening 은 oldlistening 전달 ngxinit_cycle,ngx_init_cycle 함수 에서 설정 을 다시 해석 하여 새로운 listening 배열 을 생 성 합 니 다. 새로운 listening 배열 은 old 에서listening 배열 에서 부모 프로 세 스 의 감청 소켓 을 가 져 옵 니 다. 이 부분 코드 는 생략 되 었 습 니 다.
다시 한 번 말씀 드 리 지만 nginx 자체 가 복잡 하기 때문에 필 자 는 잘못 이해 할 수 있 습 니 다. 지적 해 주 십시오. 대단히 감사합니다!

좋은 웹페이지 즐겨찾기