nginx upstream 사용 및 소스 코드 분석



nginx upstream 메커니즘 으로 인해 nginx 는 역방향 프 록 시 서버 가 될 수 있 습 니 다. nginx 는 한편, 하류 클 라 이언 트 로부터 http 요청 을 받 고 요청 을 처리 하 며 요청 에 따라 tcp 메 시 지 를 상류 서버 로 보 내 고 상류 서버 의 반환 메시지 에 따라 하류 클 라 이언 트 에 응답 메 시 지 를 보 냅 니 다.upstream 메커니즘 도 부하 분담 기능 을 제공 하여 클 러 스 터 서버 의 한 서버 에 부하 분담 을 요청 할 수 있 습 니 다.
2.1upstream 의 절차 소개
1. 클 라 이언 트 요청 메 시 지 를 분석 하고 상류 서버 에 보 내 는 요청 메 시 지 를 구축한다.2 호출 ngxhttp_upstream_init 는 상위 서버 와 tcp 연결 을 시작 합 니 다.  3. 첫 번 째 단계 에서 구 성 된 요청 메 시 지 를 보 냅 니 다.4 상류 서버 로부터 응답 헤드 를 받 아 분석 하여 하류 로 전송 한다.5 상위 서버 에서 온 해당 체 를 받 아 리 트 윗 한다.이 5 단계 에서 upstream 체 제 는 개발 자 스스로 해당 하 는 처리 방식 을 설정 하여 자신의 목적 을 달성 하도록 허용 하 는 것 도 개발 자 들 이 upstream 을 사용 하 는 방식 이다.2.2upstream 의 개발 자 들 이 upstream 체 제 를 사용 할 때 주로 위의 5 단계 의 처리 반전 함 수 를 설정 합 니 다.http 역방향 대 리 를 예 로 들 면:
ngx_http_proxy_handler(ngx_http_request_t *r)
{
   	:
  	:
	:
	//  http proxy    upstream       
	//             
    u->create_request = ngx_http_proxy_create_request;
	//        ,       
	u->reinit_request = ngx_http_proxy_reinit_request;
	//                 
    u->process_header = ngx_http_proxy_process_status_line;
	//     
    u->abort_request = ngx_http_proxy_abort_request;
	//           
    u->finalize_request = ngx_http_proxy_finalize_request;

	//  upstream buffer   , 0 ,       ,
	//            , 1 ,   buffer,  
	//             
    u->buffering = plcf->upstream.buffering;

	// buffering 1      pipe  ,       ,       buffer         
    u->pipe = ngx_pcalloc(r->pool, sizeof(ngx_event_pipe_t));
    if (u->pipe == NULL) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    u->pipe->input_filter = ngx_event_pipe_copy_input_filter;

    u->accel = 1;
	//        ,     ,    ngx_http_upstream_init,
	//  upstream   
    rc = ngx_http_read_client_request_body(r, ngx_http_upstream_init);
	:
    :
    return NGX_DONE;
}

2.3upstream 소스 코드 분석 (1.0.15 버 전)
2.3.1 상류 서버 로 보 내 는 요청 을 구축 하고 상류 서버 와 의 연결 을 구축한다.
주요 함 수 는 ngx http upstream init request 입 니 다. 이 함 수 는 사용자 가 등록 한 요청 구축 함 수 를 호출 하여 상류 서버 로 보 내 는 요청 을 구축 하 는 동시에 상류 서버 와 의 연결 을 구축 합 니 다. 먼저 두 개의 보조 함 수 를 소개 합 니 다.
ngx http upstream rd check broken connection: 이 함 수 는 nginx 와 클 라 이언 트 간 의 링크 가 사용 가능 한 지 확인 하 는 데 사 용 됩 니 다.
ngx http upstream connect: 이 함 수 는 상위 서버 와 연결 하 는 데 사 용 됩 니 다.
2.3.2 상류 로 요청 발송 
   상위 서버 와 의 연결 이 구축 되면 상위 서버 에 요청 을 보 냅 니 다. 주요 함 수 는 ngx http upstream send request 입 니 다.
static void
ngx_http_upstream_check_broken_connection(ngx_http_request_t *r,
    ngx_event_t *ev)
{
    int                  n;
    char                 buf[1];
    ngx_err_t            err;
    ngx_int_t            event;
    ngx_connection_t     *c;
    ngx_http_upstream_t  *u;

    c = r->connection;
    u = r->upstream;

	//        , recv     0,MSG_PEEK    
	//    ,             ,     1
	//   ,            
    n = recv(c->fd, buf, 1, MSG_PEEK);

    err = ngx_socket_errno;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ev->log, err,
                   "http upstream recv(): %d", n);
	//ev->write           ,        ,
	//      NGX_eagain,        
    if (ev->write && (n >= 0 || err == NGX_EAGAIN)) {
        return;
    }

    if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && ev->active) {

        event = ev->write ? NGX_WRITE_EVENT : NGX_READ_EVENT;

        if (ngx_del_event(ev, event, 0) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
    }
	//   socket    ,        
    if (n > 0) {
        return;
    }

	//    -1,    NGX_EAGAIN,  recv      
    if (n == -1) {
        if (err == NGX_EAGAIN) {
            return;
        }
		//          
        ev->error = 1;

    } else { //n=0,          
        err = 0;
    }
	//        ,       
    ev->eof = 1;
    c->error = 1;

    if (!u->cacheable && u->peer.connection) {
        ngx_log_error(NGX_LOG_INFO, ev->log, err,
                      "client prematurely closed connection, "
                      "so upstream connection is closed too");
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_CLIENT_CLOSED_REQUEST);
        return;
    }

    ngx_log_error(NGX_LOG_INFO, ev->log, err,
                  "client prematurely closed connection");

    if (u->peer.connection == NULL) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_CLIENT_CLOSED_REQUEST);
    }
}
2 ngx_http_upstream_connect

static void
ngx_http_upstream_connect(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_int_t          rc;
    ngx_time_t        *tp;
    ngx_connection_t  *c;

    r->connection->log->action = "connecting to upstream";

    r->connection->single_connection = 0;
    //              
    if (u->state && u->state->response_sec) {
        tp = ngx_timeofday();
        u->state->response_sec = tp->sec - u->state->response_sec;
        u->state->response_msec = tp->msec - u->state->response_msec;
    }

    u->state = ngx_array_push(r->upstream_states);
    if (u->state == NULL) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t));
	
    //              
    tp = ngx_timeofday();
    u->state->response_sec = tp->sec;
    u->state->response_msec = tp->msec;

	//         
    rc = ngx_event_connect_peer(&u->peer);
	//printf("@@@@####rc is %d
", (int)rc); ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "http upstream connect: %i", rc); if (rc == NGX_ERROR) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } u->state->peer = u->peer.name; // busy declined , ngx_http_upstream_next, // connect , , // ngx_http_upstream_finalize_request if (rc == NGX_BUSY) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no live upstreams"); ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_NOLIVE); return; } if (rc == NGX_DECLINED) { ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR); return; } /* rc == NGX_OK || rc == NGX_AGAIN */ c = u->peer.connection; c->data = r; // //ngx_http_upstream_handler c->write->handler = ngx_http_upstream_handler; c->read->handler = ngx_http_upstream_handler; //ngx_http_upstream_handler u->write_event_handler read_event_handler u->write_event_handler = ngx_http_upstream_send_request_handler; u->read_event_handler = ngx_http_upstream_process_header; c->sendfile &= r->connection->sendfile; u->output.sendfile = c->sendfile; c->pool = r->pool; c->log = r->connection->log; c->read->log = c->log; c->write->log = c->log; /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */ u->writer.out = NULL; u->writer.last = &u->writer.out; u->writer.connection = c; u->writer.limit = 0; if (u->request_sent) { if (ngx_http_upstream_reinit(r, u) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } if (r->request_body && r->request_body->buf && r->request_body->temp_file && r == r->main) { /* * the r->request_body->buf can be reused for one request only, * the subrequests should allocate their own temporay bufs */ u->output.free = ngx_alloc_chain_link(r->pool); if (u->output.free == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } u->output.free->buf = r->request_body->buf; u->output.free->next = NULL; u->output.allocated = 1; r->request_body->buf->pos = r->request_body->buf->start; r->request_body->buf->last = r->request_body->buf->start; r->request_body->buf->tag = u->output.tag; } u->request_sent = 0; // , , // if (rc == NGX_AGAIN) { ngx_add_timer(c->write, u->conf->connect_timeout); return; } #if (NGX_HTTP_SSL) if (u->ssl && c->ssl == NULL) { ngx_http_upstream_ssl_init_connection(r, u, c); return; } #endif // , ngx_http_upstream_send_request(r, u); } 3 ngx_http_upstream_init_request static void ngx_http_upstream_init_request(ngx_http_request_t *r) { ngx_str_t *host; ngx_uint_t i; ngx_resolver_ctx_t *ctx, temp; ngx_http_cleanup_t *cln; ngx_http_upstream_t *u; ngx_http_core_loc_conf_t *clcf; ngx_http_upstream_srv_conf_t *uscf, **uscfp; ngx_http_upstream_main_conf_t *umcf; if (r->aio) { return; } u = r->upstream; u->store = (u->conf->store || u->conf->store_lengths); //ignore_client_abort 0 nginx if (!u->store && !r->post_action && !u->conf->ignore_client_abort) { r->read_event_handler = ngx_http_upstream_rd_check_broken_connection; r->write_event_handler = ngx_http_upstream_wr_check_broken_connection; } // ,request_bufs create_request if (r->request_body) { u->request_bufs = r->request_body->bufs; } // create_request if (u->create_request(r) != NGX_OK) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } //u->conf->local u->peer.local = u->conf->local; // http core loc clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); // upstream , //buf u->output.alignment = clcf->directio_alignment; u->output.pool = r->pool; u->output.bufs.num = 1; u->output.bufs.size = clcf->client_body_buffer_size; // u->output.output_filter = ngx_chain_writer; u->output.filter_ctx = &u->writer; u->writer.pool = r->pool; if (r->upstream_states == NULL) { r->upstream_states = ngx_array_create(r->pool, 1, sizeof(ngx_http_upstream_state_t)); if (r->upstream_states == NULL) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } } else { u->state = ngx_array_push(r->upstream_states); if (u->state == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_memzero(u->state, sizeof(ngx_http_upstream_state_t)); } // ngx_http_upstream_cleanup request cleanup , // request , cln = ngx_http_cleanup_add(r, 0); if (cln == NULL) { ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } //ngx_http_upstream_cleanup resolve , ngx_http_upstream_finalize cln->handler = ngx_http_upstream_cleanup; cln->data = r; u->cleanup = &cln->handler; //u->resolved , // , , // if (u->resolved == NULL) { uscf = u->conf->upstream; } else { //upstream , ok if (u->resolved->sockaddr) { if (ngx_http_upstream_create_round_robin_peer(r, u->resolved) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } ngx_http_upstream_connect(r, u); return; } // host upstream host = &u->resolved->host; umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module); uscfp = umcf->upstreams.elts; // upstream , upstream for (i = 0; i < umcf->upstreams.nelts; i++) { uscf = uscfp[i]; if (uscf->host.len == host->len && ((uscf->port == 0 && u->resolved->no_port) || uscf->port == u->resolved->port) && ngx_memcmp(uscf->host.data, host->data, host->len) == 0) { goto found; } } if (u->resolved->port == 0) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no port in upstream \"%V\"", host); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } temp.name = *host; // ctx = ngx_resolve_start(clcf->resolver, &temp); if (ctx == NULL) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } if (ctx == NGX_NO_RESOLVER) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "no resolver defined to resolve %V", host); ngx_http_upstream_finalize_request(r, u, NGX_HTTP_BAD_GATEWAY); return; } //ngx_http_upstream_resolve_handler ctx->name = *host; ctx->type = NGX_RESOLVE_A; ctx->handler = ngx_http_upstream_resolve_handler; ctx->data = r; ctx->timeout = clcf->resolver_timeout; u->resolved->ctx = ctx; if (ngx_resolve_name(ctx) != NGX_OK) { u->resolved->ctx = NULL; ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } return; } found: //peer.init() upstream , //for example:us->peer.init = ngx_http_upstream_init_ip_hash_peer; if (uscf->peer.init(r, uscf) != NGX_OK) { ngx_http_upstream_finalize_request(r, u, NGX_HTTP_INTERNAL_SERVER_ERROR); return; } // ngx_http_upstream_connect(r, u); }

2.3.3 상류 서버 가 되 돌아 오 는 응답 헤드 처리
상위 서버 에 요청 을 보 낸 후 서버 의 응답 을 기 다 려 야 합 니 다. 먼저 서버 가 보 낸 응답 헤드 를 처리 합 니 다. 처리 함 수 는 ngx http upstream process header 입 니 다.
static void
ngx_http_upstream_send_request(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_int_t          rc;
    ngx_connection_t  *c;

	//peer.connection  nginx         connection
    c = u->peer.connection;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "http upstream send request");
    
    if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }

    c->log->action = "sending request to upstream";
	//  ngx_output_chain            ,request_sent
	//              ,     ,    
	//      OK ,            u->output  
    rc = ngx_output_chain(&u->output, u->request_sent ? NULL : u->request_bufs);
    //  request_sent  ,         
    u->request_sent = 1;

    if (rc == NGX_ERROR) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }
	//               ,   ,    
	//     
    if (c->write->timer_set) {
        ngx_del_timer(c->write);
    }
	//NGX_AGAIN          ,           
	//        ,        。
	if (rc == NGX_AGAIN) {
        ngx_add_timer(c->write, u->conf->send_timeout);
		//                
        if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        return;
    }

    /* rc == NGX_OK */

    if (c->tcp_nopush == NGX_TCP_NOPUSH_SET) {
        if (ngx_tcp_push(c->fd) == NGX_ERROR) {
            ngx_log_error(NGX_LOG_CRIT, c->log, ngx_socket_errno,
                          ngx_tcp_push_n " failed");
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

        c->tcp_nopush = NGX_TCP_NOPUSH_UNSET;
    }
	//      ,          
    ngx_add_timer(c->read, u->conf->read_timeout);

#if 1
	//       ,       ready
    if (c->read->ready) {

        /* post aio operation */

        /*
         * TODO comment
         * although we can post aio operation just in the end
         * of ngx_http_upstream_connect() CHECK IT !!!
         * it's better to do here because we postpone header buffer allocation
         */
		//     ready ,        
        ngx_http_upstream_process_header(r, u);
        return;
    }
#endif
	//          dummy  ,         ,   
	//    
    u->write_event_handler = ngx_http_upstream_dummy_handler;

    if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }
}

2.3.4 처리 응답 패키지
되 돌아 오 는 응답 헤드 를 처리 하면 응답 패 키 지 를 처리 해 야 합 니 다. 응답 패 키 지 를 처리 하 는 것 이 비교적 복잡 합 니 다. 서브 요청 을 처리 할 때 응답 패 키 지 를 전달 하지 않 고 처리 하면 됩 니 다. upstream 모드 에서 받 은 요청 을 전달 해 야 합 니 다. 이때 하류 네트워크 속도 우선 과 상류 네트워크 속도 우선 두 가지 가 있 습 니 다. 하류 네트워크 속도 가 우선 이 고 하류 네트워크 속도 가 상류 보다 빠르다 고 가정 하면 분 류 됩 니 다.고정 크기 의 buffer 버퍼 를 설치 하여 데 이 터 를 수신 하 는 동시에 퍼 가기 합 니 다. 상류 네트워크 속도 가 우선 이 고 상류 네트워크 속도 가 하류 보다 빠르다 고 가정 하기 때문에 여러 개의 buffer 버퍼 를 사용 하여 데 이 터 를 캐 시 하 는 동시에 필요 할 때 임시 파일 을 사용 하여 받 은 데 이 터 를 캐 시 합 니 다.
ngx http upstream process body in memory: 이 함 수 는 하위 요청 상황 을 처리 하고 응답 패 키 지 를 전달 하지 않 습 니 다.
ngx http upstream send response: 이 함 수 는 퍼 가기 응답 패키지 의 상황 을 처리 합 니 다. 이 함 수 는 퍼 가기 응답 헤드 와 응답 체 를 전달 합 니 다. 퍼 가기 응답 체 는 상류 네트워크 속도 우선 과 하류 네트워크 속도 우선 두 가지 상황 을 동시에 고려 합 니 다.
static void
ngx_http_upstream_process_header(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ssize_t            n;
    ngx_int_t          rc;
    ngx_connection_t  *c;

    c = u->peer.connection;
    c->log->action = "reading response header from upstream";
    //     
    if (c->read->timedout) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);
        return;
    }
	//   upstream       
    if (!u->request_sent && ngx_http_upstream_test_connect(c) != NGX_OK) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
        return;
    }
	//           
    if (u->buffer.start == NULL) {
		//       
        u->buffer.start = ngx_palloc(r->pool, u->conf->buffer_size);
        if (u->buffer.start == NULL) {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }
		//            
        u->buffer.pos = u->buffer.start;
        u->buffer.last = u->buffer.start;
        u->buffer.end = u->buffer.start + u->conf->buffer_size;
        u->buffer.temporary = 1;

        u->buffer.tag = u->output.tag;
		//          ngx_list  ,        
		//       
        if (ngx_list_init(&u->headers_in.headers, r->pool, 8,
                          sizeof(ngx_table_elt_t))
            != NGX_OK)
        {
            ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
            return;
        }

    }

	//       !!!      
    for ( ;; ) {

        n = c->recv(c, u->buffer.last, u->buffer.end - u->buffer.last);
		//        
        if (n == NGX_AGAIN) {

            if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
                ngx_http_upstream_finalize_request(r, u,
                                               NGX_HTTP_INTERNAL_SERVER_ERROR);
                return;
            }

            return;
        }
		//    0,  upstream        
        if (n == 0) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "upstream prematurely closed connection");
        }
	
        if (n == NGX_ERROR || n == 0) {
            ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
            return;
        }

        u->buffer.last += n;

		//          
        rc = u->process_header(r);
		//         	
        if (rc == NGX_AGAIN) {
			//buffer    ,           
            if (u->buffer.last == u->buffer.end) {
                ngx_log_error(NGX_LOG_ERR, c->log, 0,
                              "upstream sent too big header");

                ngx_http_upstream_next(r, u,
                                       NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
                return;
            }

            continue;
        }

        break;
    }
	//         ,     
    if (rc == NGX_HTTP_UPSTREAM_INVALID_HEADER) {
        ngx_http_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_INVALID_HEADER);
        return;
    }

    if (rc == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }

    /* rc == NGX_OK */
	//      ,       300
    if (u->headers_in.status_n > NGX_HTTP_SPECIAL_RESPONSE) {
        if (r->subrequest_in_memory) {
            u->buffer.last = u->buffer.pos;
        }

        if (ngx_http_upstream_test_next(r, u) == NGX_OK) {
            return;
        }
        //       300     ,  404  ,
        //     
        if (ngx_http_upstream_intercept_errors(r, u) == NGX_OK) {
            return;
        }
		
    }
	// u->headers_in          , u->headers_in  
	//        r->headers_out  ,     
    if (ngx_http_upstream_process_headers(r, u) != NGX_OK) {
        return;
    }
	//     ,       
    if (!r->subrequest_in_memory) {
		//     ,      ,      
        ngx_http_upstream_send_response(r, u);
        return;
    }

    /* subrequest content in memory */
	//           ,     input_filter    ,
	//    input_filter   ngx_http_upstream_non_buffered_filter,
	//         
    if (u->input_filter == NULL) {
        u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
        u->input_filter = ngx_http_upstream_non_buffered_filter;
        u->input_filter_ctx = r;
    }

    if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
        ngx_http_upstream_finalize_request(r, u,
                                           NGX_HTTP_INTERNAL_SERVER_ERROR);
        return;
    }
	//buffer.last buffer.pos        
    n = u->buffer.last - u->buffer.pos;
	//                
    if (n) {
        u->buffer.last -= n;

        u->state->response_length += n;

        if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }

		//            ,       
        if (u->length == 0) {
            ngx_http_upstream_finalize_request(r, u, 0);
            return;
        }
    }
	//           
    u->read_event_handler = ngx_http_upstream_process_body_in_memory;
	//       u->input_filter         ,
	//            ,     ,    
	//       
    ngx_http_upstream_process_body_in_memory(r, u);

위 에서 보 듯 이 ngx http upstream send response 에서 bufferring 의 태그 에 따라 패키지 의 퍼 가기 에 대해 서로 다른 처 리 를 했 습 니 다.
1. 패 킷 메 시 지 를 받 기 위해 고정 buffer 를 사용 합 니 다. 상류 메 시 지 를 읽 기 위해 함수 ngx http upstrea m process non buffered upstream 을 사용 합 니 다. 아래 로 내 려 가서 메 시 지 를 쓸 때 함수 ngx http upstre am process non buffered downstream 을 사용 합 니 다. 실제로 이 두 함 수 는 마지막 으로 ngx http upstrea m process non buffered request 를 호출 하여 읽 기와 쓰기 방향 을 제어 합 니 다.。
2. 여러 개의 buffer 와 임시 파일 을 사용 하여 패키지 메 시 지 를 받 습 니 다. 상류 메 시 지 를 읽 을 때 ngx http upstream pr ocess upstream 을 사용 하고, 아래 에 메 시 지 를 쓸 때 ngx http upstream process downstream 을 사용 합 니 다. 두 함 수 는 마지막 으로 ngx event pipe 를 호출 하여 하나의 매개 변수 표 시 를 통 해 읽 기와 쓰기 방향 을 제어 합 니 다.
1ngx_http_upstream_process_body_in_memory
static void
ngx_http_upstream_process_body_in_memory(ngx_http_request_t *r,
    ngx_http_upstream_t *u)
{
    size_t             size;
    ssize_t            n;
    ngx_buf_t         *b;
    ngx_event_t       *rev;
    ngx_connection_t  *c;
	//c nginx upstream            
    c = u->peer.connection;
    rev = c->read;

	//     ,  upstream
    if (rev->timedout) {
        return;
    }
	//u->buffer         
    b = &u->buffer;
    for ( ;; ) {
        size = b->end - b->last;
        if (size == 0) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }
		//       u->buffer 
        n = c->recv(c, b->last, size);
		//       ,       
        if (n == NGX_AGAIN) {
            break;
        }
		//                 
        if (n == 0 || n == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, n);
            return;
        }
		//response_length           
        u->state->response_length += n;
		//           ,           ,
		//     ,     ngx_http_upstream_non_buffered_filter, 
		//         buffer        
        if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
            return;
        }
		//     ready  
        if (!rev->ready) {
            break;
        }
    }
	//     
    if (ngx_handle_read_event(rev, 0) != NGX_OK) {
        ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
        return;
    }

    if (rev->active) {
        ngx_add_timer(rev, u->conf->read_timeout);

    } else if (rev->timer_set) {
        ngx_del_timer(rev);
    }
}

2 ngx_http_upstream_send_response
	                   ,                 。
static void
ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    int                        tcp_nodelay;
    ssize_t                    n;
    ngx_int_t                  rc;
    ngx_event_pipe_t          *p;
    ngx_connection_t          *c;
    ngx_http_core_loc_conf_t  *clcf;

	//      ,       request headers_in  ,
	//   ,           ,   ,   
	//     request out       ,       
	//      
    rc = ngx_http_send_header(r);

    if (rc == NGX_ERROR || rc > NGX_OK || r->post_action) {
        ngx_http_upstream_finalize_request(r, u, rc);
        return;
    }
	//c     nginx       
    c = r->connection;

    if (r->header_only) {

        if (u->cacheable || u->store) {

            if (ngx_shutdown_socket(c->fd, NGX_WRITE_SHUTDOWN) == -1) {
                ngx_connection_error(c, ngx_socket_errno,
                                     ngx_shutdown_socket_n " failed");
            }

            r->read_event_handler = ngx_http_request_empty_handler;
            r->write_event_handler = ngx_http_request_empty_handler;
            c->error = 1;

        } else {
            ngx_http_upstream_finalize_request(r, u, rc);
            return;
        }
    }
	// header_sent  ,           
    u->header_sent = 1;
	//       ,              ,
	//            ,      ,OK,
	//  ,         ,      
    if (r->request_body && r->request_body->temp_file) {
        ngx_pool_run_cleanup_file(r->pool, r->request_body->temp_file->file.fd);
        r->request_body->temp_file->file.fd = NGX_INVALID_FILE;
    }
	//  http core  loc    
    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
	//u->buffering 0        ,           
	//         
    if (!u->buffering) {
		//   input_filter  ,         ,input_filter  
		// buffer             ,         
		//     buffer  out  ,         
        if (u->input_filter == NULL) {
			//    
            u->input_filter_init = ngx_http_upstream_non_buffered_filter_init;
			//      buffer        ,     ngx_buf  
			//    
			u->input_filter = ngx_http_upstream_non_buffered_filter;
            u->input_filter_ctx = r;
        }
		//  upstream          
        u->read_event_handler = ngx_http_upstream_process_non_buffered_upstream;
		//  request          
		r->write_event_handler =
                             ngx_http_upstream_process_non_buffered_downstream;

        r->limit_rate = 0;
		//  input_filter       
        if (u->input_filter_init(u->input_filter_ctx) == NGX_ERROR) {
            ngx_http_upstream_finalize_request(r, u, 0);
            return;
        }

        if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
            ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");

            tcp_nodelay = 1;

            if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
                               (const void *) &tcp_nodelay, sizeof(int)) == -1)
            {
                ngx_connection_error(c, ngx_socket_errno,
                                     "setsockopt(TCP_NODELAY) failed");
                ngx_http_upstream_finalize_request(r, u, 0);
                return;
            }

            c->tcp_nodelay = NGX_TCP_NODELAY_SET;
        }
		//buffer.last buffer.pos            
        n = u->buffer.last - u->buffer.pos;
		//n>0,  buffer           
        if (n) {
			//      last    input_filter    ,  
			//    
            u->buffer.last = u->buffer.pos;
			//          n
            u->state->response_length += n;
			// input_filter           
            if (u->input_filter(u->input_filter_ctx, n) == NGX_ERROR) {
                ngx_http_upstream_finalize_request(r, u, 0);
                return;
            }
			//     ,              ,
			//              
            ngx_http_upstream_process_non_buffered_downstream(r);

        } else {
         	// buffer       ,      ,   
         	//         
            u->buffer.pos = u->buffer.start;
            u->buffer.last = u->buffer.start;

            if (ngx_http_send_special(r, NGX_HTTP_FLUSH) == NGX_ERROR) {
                ngx_http_upstream_finalize_request(r, u, 0);
                return;
            }
			//         ,      ,     
			//               。
            if (u->peer.connection->read->ready) {
                ngx_http_upstream_process_non_buffered_upstream(r, u);
            }
        }

        return;
    }

    /* TODO: preallocate event_pipe bufs, look "Content-Length" */

	//      buffer 1   ,     nginx     buffer
	//       ,           
    p = u->pipe;
	// pipe       ,               
	//              	
	p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
    p->output_ctx = r;
    p->tag = u->output.tag;
	//             
    p->bufs = u->conf->bufs;
	//  busy               
    p->busy_size = u->conf->busy_buffers_size;
    p->upstream = u->peer.connection;
    p->downstream = c;
    p->pool = r->pool;
    p->log = c->log;

    p->cacheable = u->cacheable || u->store;

    p->temp_file = ngx_pcalloc(r->pool, sizeof(ngx_temp_file_t));
    if (p->temp_file == NULL) {
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }
	//              ,fd 
    p->temp_file->file.fd = NGX_INVALID_FILE;
    p->temp_file->file.log = c->log;
    p->temp_file->path = u->conf->temp_path;
    p->temp_file->pool = r->pool;

    if (p->cacheable) {
        p->temp_file->persistent = 1;

    } else {
        p->temp_file->log_level = NGX_LOG_WARN;
        p->temp_file->warn = "an upstream response is buffered "
                             "to a temporary file";
    }
	//                
    p->max_temp_file_size = u->conf->max_temp_file_size;
    p->temp_file_write_size = u->conf->temp_file_write_size;

    p->preread_bufs = ngx_alloc_chain_link(r->pool);
    if (p->preread_bufs == NULL) {
        ngx_http_upstream_finalize_request(r, u, 0);
        return;
    }
	//          ,           
	//         
    p->preread_bufs->buf = &u->buffer;
    p->preread_bufs->next = NULL;
    u->buffer.recycled = 1;

    p->preread_size = u->buffer.last - u->buffer.pos;

    if (u->cacheable) {

        p->buf_to_file = ngx_calloc_buf(r->pool);
        if (p->buf_to_file == NULL) {
            ngx_http_upstream_finalize_request(r, u, 0);
            return;
        }

        p->buf_to_file->pos = u->buffer.start;
        p->buf_to_file->last = u->buffer.pos;
        p->buf_to_file->temporary = 1;
    }

    if (ngx_event_flags & NGX_USE_AIO_EVENT) {
        /* the posted aio operation may corrupt a shadow buffer */
        p->single_buf = 1;
    }

    /* TODO: p->free_bufs = 0 if use ngx_create_chain_of_bufs() */
    p->free_bufs = 1;
    u->buffer.last = u->buffer.pos;

    if (u->conf->cyclic_temp_file) {
        p->cyclic_temp_file = 1;
        c->sendfile = 0;

    } else {
        p->cyclic_temp_file = 0;
    }
	//    ,       ,        
    p->read_timeout = u->conf->read_timeout;
    p->send_timeout = clcf->send_timeout;
    p->send_lowat = clcf->send_lowat;
	//  upstreadm        ,  request        
    u->read_event_handler = ngx_http_upstream_process_upstream;
    r->write_event_handler = ngx_http_upstream_process_downstream;
	//      ,          
    ngx_http_upstream_process_upstream(r, u);
}

좋은 웹페이지 즐겨찾기