kubernetes 소스 코드 분석의 ingress (3)

14184 단어 Kubernetes
실전 을 통 해 사용자 도 ingress 가 어떻게 사용 되 는 지 대충 알 고 있 습 니 다. 다음은 내부 구조 상의 그림 에서 nginx ingress contrller 의 배치 구조 도 를 보 여 주 었 습 니 다. ingress contrller 는 돌아 가면 서 apiserver 를 감청 하 는 방식 으로 ingress 자원 의 변 화 를 얻 고 ingress 자원 을 로 컬 캐 시 에 저장 합 니 다.nginx 에 해당 하 는 설정 수정 로드 를 알 립 니 다.ingress contrller 는 ingress, service, endpoint, secret, node, configmap 의 일련의 자원 을 감시 하고 자원 에 변화 가 생기 면 (증가, 삭제 와 수정 포함) 즉시 backend, 예 를 들 어 nginx 등 을 알 립 니 다.apiserver 에 대한 요청 횟수 를 줄 이기 위해 nginx controllder 는 매번 요청 을 로 컬 에 캐 시 합 니 다. 이 캐 시 import 는 kubernetes 가 제공 하 는 'k8s. io / kubernetes / pkg / client / cache' 아래 코드 에 들 어가 서 서비스 시작 코드 를 먼저 봅 니 다. controllers / nginx / pkg / cmd / controller / main. go 는 설정 을 어떻게 업데이트 하 는 지 자세히 봅 니 다.ingress controller 에 있 는 listwatch 부터 볼 게 요.
ic.ingLister.Store, ic.ingController = cache.NewInformer(
        cache.NewListWatchFromClient(ic.cfg.Client.Extensions().RESTClient(), "ingresses", ic.cfg.Namespace, fields.Everything()),
        &extensions.Ingress{}, ic.cfg.ResyncPeriod, ingEventHandler)

    ic.endpLister.Store, ic.endpController = cache.NewInformer(
        cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "endpoints", ic.cfg.Namespace, fields.Everything()),
        &api.Endpoints{}, ic.cfg.ResyncPeriod, eventHandler)

    ic.secrLister.Store, ic.secrController = cache.NewInformer(
        cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "secrets", api.NamespaceAll, fields.Everything()),
        &api.Secret{}, ic.cfg.ResyncPeriod, secrEventHandler)

    ic.mapLister.Store, ic.mapController = cache.NewInformer(
        cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "configmaps", api.NamespaceAll, fields.Everything()),
        &api.ConfigMap{}, ic.cfg.ResyncPeriod, mapEventHandler)

    ic.svcLister.Indexer, ic.svcController = cache.NewIndexerInformer(
        cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "services", ic.cfg.Namespace, fields.Everything()),
        &api.Service{},
        ic.cfg.ResyncPeriod,
        cache.ResourceEventHandlerFuncs{},
        cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

    ic.nodeLister.Store, ic.nodeController = cache.NewInformer(
        cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "nodes", api.NamespaceAll, fields.Everything()),
        &api.Node{}, ic.cfg.ResyncPeriod, eventHandler)

모든 자원 감청 에는 자신의 이벤트 handler 가 있 습 니 다. 예 를 들 어...
    eventHandler := cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            ic.syncQueue.Enqueue(obj)
        },
        DeleteFunc: func(obj interface{}) {
            ic.syncQueue.Enqueue(obj)
        },
        UpdateFunc: func(old, cur interface{}) {
            if !reflect.DeepEqual(old, cur) {
                ic.syncQueue.Enqueue(cur)
            }
        },
    }

감청 한 메 시 지 를 syncQueue 에 넣 습 니 다. 이 syncQueue 의 실현 은 core / pkg / task / quue. go 에서 안에 있 는 sync 방법 으로 대기 열 에 있 는 요소 key 를 처리 합 니 다. 이것 은 contrller manager 의 디자인 과 일맥상통 합 니 다.
// worker processes work in the queue through sync.
func (t *Queue) worker() {
    for {
        key, quit := t.queue.Get()
        if quit {
            close(t.workerDone)
            return
        }
        glog.V(3).Infof("syncing %v", key)
        if err := t.sync(key); err != nil {
            glog.Warningf("requeuing %v, err %v", key, err)
            t.queue.AddRateLimited(key)
        } else {
            t.queue.Forget(key)
        }

        t.queue.Done(key)
    }
}

다음은 모든 이 벤트 를 어떻게 처리 하 는 지,
func (ic *GenericController) sync(key interface{}) error {
    ic.syncRateLimiter.Accept()

    if ic.syncQueue.IsShuttingDown() {
        return nil
    }

    if !ic.controllersInSync() {
        time.Sleep(podStoreSyncedPollPeriod)
        return fmt.Errorf("deferring sync till endpoints controller has synced")
    }

    upstreams, servers := ic.getBackendServers()
    var passUpstreams []*ingress.SSLPassthroughBackend
    for _, server := range servers {
        if !server.SSLPassthrough {
            continue
        }

        for _, loc := range server.Locations {
            if loc.Path != rootLocation {
                continue
            }
            passUpstreams = append(passUpstreams, &ingress.SSLPassthroughBackend{
                Backend:  loc.Backend,
                Hostname: server.Hostname,
            })
            break
        }
    }

    data, err := ic.cfg.Backend.OnUpdate(ingress.Configuration{
        Backends:            upstreams,
        Servers:             servers,
        TCPEndpoints:        ic.getStreamServices(ic.cfg.TCPConfigMapName, api.ProtocolTCP),
        UDPEndpoints:        ic.getStreamServices(ic.cfg.UDPConfigMapName, api.ProtocolUDP),
        PassthroughBackends: passUpstreams,
    })
    if err != nil {
        return err
    }

    out, reloaded, err := ic.cfg.Backend.Reload(data)
    if err != nil {
        incReloadErrorCount()
        glog.Errorf("unexpected failure restarting the backend: 
%v"
, string(out)) return err } if reloaded { glog.Infof("ingress backend successfully reloaded...") incReloadCount() } return nil }

이 방법 은 nginx 의 다양한 설정 을 구성 한 다음 에 OnUpdate 를 통 해 설정 을 업데이트 하고 마지막 으로 Reload 를 통 해 설정 을 업데이트 합 니 다.먼저 nginx 가 OnUpdate 에 대한 실현 방법 을 보고 (controllers / nginx / pkg / cmd / controller / nginx. go) 키 코드 를 캡 처 합 니 다.
content, err := n.t.Write(config.TemplateConfig{
        ProxySetHeaders:     setHeaders,
        MaxOpenFiles:        maxOpenFiles,
        BacklogSize:         sysctlSomaxconn(),
        Backends:            ingressCfg.Backends,
        PassthroughBackends: ingressCfg.PassthroughBackends,
        Servers:             ingressCfg.Servers,
        TCPBackends:         ingressCfg.TCPEndpoints,
        UDPBackends:         ingressCfg.UDPEndpoints,
        HealthzURI:          ngxHealthPath,
        CustomErrors:        len(cfg.CustomHTTPErrors) > 0,
        Cfg:                 cfg,
    })

이렇게 하면 template 템 플 릿 요 소 를 교체 하고 업 데 이 트 된 템 플 릿 으로 돌아 간 다음 nginx 에 게 설정 을 업데이트 하 라 고 알 립 니 다.
func (n NGINXController) Reload(data []byte) ([]byte, bool, error) {
    if !n.isReloadRequired(data) {
        return []byte("Reload not required"), false, nil
    }

    err := ioutil.WriteFile(cfgPath, data, 0644)
    if err != nil {
        return nil, false, err
    }

    o, e := exec.Command(n.binary, "-s", "reload").CombinedOutput()

    return o, true, e
}

isReloadRequired 는 파일 이 동일 한 지 확인 하고 업데이트 가 필요 한 지 확인 합 니 다.Write File 덮어 쓰기 프로필 이 필요 하 다 면.nginx - s reload 를 통 해 설정 을 업데이트 합 니 다. ok.nginx 의 설정 업데이트 가 끝 났 습 니 다.그럼 전체 절차 가 끝나 면

좋은 웹페이지 즐겨찾기