컨테이너 CNI 전체 판독 calico 구현(3)

40694 단어 Kubernetes네트워킹
전편에서bridge의 실현을 소개했는데, 여기서calico의 실현을 소개한다.전편과 같은 구조로 먼저dd를 보고del을 보십시오. 구체적으로 카드를 추가하는 코드는 다음과 같습니다.
func cmdAdd(args *skel.CmdArgs) error {
    // Unmarshal the network config, and perform validation
    conf := NetConf{}
    if err := json.Unmarshal(args.StdinData, &conf); err != nil {
        return fmt.Errorf("failed to load netconf: %v", err)
    }

    cniVersion := conf.CNIVersion

    ConfigureLogging(conf.LogLevel)

    workload, orchestrator, err := GetIdentifiers(args)
    if err != nil {
        return err
    }

    logger := CreateContextLogger(workload)

    // Allow the nodename to be overridden by the network config
    updateNodename(conf, logger)

    logger.WithFields(log.Fields{
        "Orchestrator": orchestrator,
        "Node":         nodename,
    }).Info("Extracted identifiers")

    logger.WithFields(log.Fields{"NetConfg": conf}).Info("Loaded CNI NetConf")
    calicoClient, err := CreateClient(conf)
    if err != nil {
        return err
    }

    // Always check if there's an existing endpoint.
    endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{
        Node:         nodename,
        Orchestrator: orchestrator,
        Workload:     workload})
    if err != nil {
        return err
    }

    logger.Debugf("Retrieved endpoints: %v", endpoints)

    var endpoint *api.WorkloadEndpoint
    if len(endpoints.Items) == 1 {
        endpoint = &endpoints.Items[0]
    }

    fmt.Fprintf(os.Stderr, "Calico CNI checking for existing endpoint: %v
"
, endpoint) // Collect the result in this variable - this is ultimately what gets "returned" by this function by printing // it to stdout. var result *current.Result // If running under Kubernetes then branch off into the kubernetes code, otherwise handle everything in this // function. if orchestrator == "k8s" { if result, err = k8s.CmdAddK8s(args, conf, nodename, calicoClient, endpoint); err != nil { return err } } else { // Default CNI behavior - use the CNI network name as the Calico profile. profileID := conf.Name if endpoint != nil { // There is an existing endpoint - no need to create another. // This occurs when adding an existing container to a new CNI network // Find the IP address from the endpoint and use that in the response. // Don't create the veth or do any networking. // Just update the profile on the endpoint. The profile will be created if needed during the // profile processing step. fmt.Fprintf(os.Stderr, "Calico CNI appending profile: %s
"
, profileID) endpoint.Spec.Profiles = append(endpoint.Spec.Profiles, profileID) result, err = CreateResultFromEndpoint(endpoint) logger.WithField("result", result).Debug("Created result from endpoint") if err != nil { return err } } else { // There's no existing endpoint, so we need to do the following: // 1) Call the configured IPAM plugin to get IP address(es) // 2) Configure the Calico endpoint // 3) Create the veth, configuring it on both the host and container namespace. // 1) Run the IPAM plugin and make sure there's an IP address returned. logger.WithFields(log.Fields{"paths": os.Getenv("CNI_PATH"), "type": conf.IPAM.Type}).Debug("Looking for IPAM plugin in paths") ipamResult, err := ipam.ExecAdd(conf.IPAM.Type, args.StdinData) logger.WithField("IPAM result", ipamResult).Info("Got result from IPAM plugin") if err != nil { return err } // Convert IPAM result into current Result. // IPAM result has a bunch of fields that are optional for an IPAM plugin // but required for a CNI plugin, so this is to populate those fields. // See CNI Spec doc for more details. result, err = current.NewResultFromResult(ipamResult) if err != nil { ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData) return err } if len(result.IPs) == 0 { ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData) return goerrors.New("IPAM plugin returned missing IP config") } // Parse endpoint labels passed in by Mesos, and store in a map. labels := map[string]string{} for _, label := range conf.Args.Mesos.NetworkInfo.Labels.Labels { labels[label.Key] = label.Value } // 2) Create the endpoint object endpoint = api.NewWorkloadEndpoint() endpoint.Metadata.Name = args.IfName endpoint.Metadata.Node = nodename endpoint.Metadata.Orchestrator = orchestrator endpoint.Metadata.Workload = workload endpoint.Metadata.Labels = labels endpoint.Spec.Profiles = []string{profileID} logger.WithField("endpoint", endpoint).Debug("Populated endpoint (without nets)") if err = PopulateEndpointNets(endpoint, result); err != nil { // Cleanup IP allocation and return the error. ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData) return err } logger.WithField("endpoint", endpoint).Info("Populated endpoint (with nets)") fmt.Fprintf(os.Stderr, "Calico CNI using IPs: %s
"
, endpoint.Spec.IPNetworks) // 3) Set up the veth hostVethName, contVethMac, err := DoNetworking(args, conf, result, logger, "") if err != nil { // Cleanup IP allocation and return the error. ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData) return err } logger.WithFields(log.Fields{ "HostVethName": hostVethName, "ContainerVethMac": contVethMac, }).Info("Networked namespace") mac, err := net.ParseMAC(contVethMac) if err != nil { // Cleanup IP allocation and return the error. ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData) return err } endpoint.Spec.MAC = &cnet.MAC{HardwareAddr: mac} endpoint.Spec.InterfaceName = hostVethName } // Write the endpoint object (either the newly created one, or the updated one with a new ProfileIDs). if _, err := calicoClient.WorkloadEndpoints().Apply(endpoint); err != nil { // Cleanup IP allocation and return the error. ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData) return err } logger.WithField("endpoint", endpoint).Info("Wrote endpoint to datastore") } // Handle profile creation - this is only done if there isn't a specific policy handler. if conf.Policy.PolicyType == "" { logger.Debug("Handling profiles") // Start by checking if the profile already exists. If it already exists then there is no work to do. // The CNI plugin never updates a profile. exists := true _, err = calicoClient.Profiles().Get(api.ProfileMetadata{Name: conf.Name}) if err != nil { _, ok := err.(errors.ErrorResourceDoesNotExist) if ok { exists = false } else { // Cleanup IP allocation and return the error. ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData) return err } } if !exists { // The profile doesn't exist so needs to be created. The rules vary depending on whether k8s is being used. // Under k8s (without full policy support) the rule is permissive and allows all traffic. // Otherwise, incoming traffic is only allowed from profiles with the same tag. fmt.Fprintf(os.Stderr, "Calico CNI creating profile: %s
"
, conf.Name) var inboundRules []api.Rule if orchestrator == "k8s" { inboundRules = []api.Rule{{Action: "allow"}} } else { inboundRules = []api.Rule{{Action: "allow", Source: api.EntityRule{Tag: conf.Name}}} } profile := &api.Profile{ Metadata: api.ProfileMetadata{ Name: conf.Name, Tags: []string{conf.Name}, }, Spec: api.ProfileSpec{ EgressRules: []api.Rule{{Action: "allow"}}, IngressRules: inboundRules, }, } logger.WithField("profile", profile).Info("Creating profile") if _, err := calicoClient.Profiles().Create(profile); err != nil { // Cleanup IP allocation and return the error. ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData) return err } } } // Set Gateway to nil. Calico-IPAM doesn't set it, but host-local does. // We modify IPs subnet received from the IPAM plugin (host-local), // so Gateway isn't valid anymore. It is also not used anywhere by Calico. for _, ip := range result.IPs { ip.Gateway = nil } // Print result to stdout, in the format defined by the requested cniVersion. return types.PrintResult(result, cniVersion) }

창설은bridge와 비슷하지만 여기 코드가 좀 길고 한 걸음 한 걸음 해석되며calico는 표준 cni의 실현을 실현했다. 물론kubernetes 아래에서도 모든 cni의 용기 관리 플랫폼에서 실행할 수 있다.먼저 k8s 관련 실현 k8s를 보십시오.CmdAddK8s,
func CmdAddK8s(args *skel.CmdArgs, conf utils.NetConf, nodename string, calicoClient *calicoclient.Client, endpoint *api.WorkloadEndpoint) (*current.Result, error) {
    var err error
    var result *current.Result

    k8sArgs := utils.K8sArgs{}
    err = types.LoadArgs(args.Args, &k8sArgs)
    if err != nil {
        return nil, err
    }

    utils.ConfigureLogging(conf.LogLevel)

    workload, orchestrator, err := utils.GetIdentifiers(args)
    if err != nil {
        return nil, err
    }
    logger := utils.CreateContextLogger(workload)
    logger.WithFields(log.Fields{
        "Orchestrator": orchestrator,
        "Node":         nodename,
    }).Info("Extracted identifiers for CmdAddK8s")

    if endpoint != nil {
        // This happens when Docker or the node restarts. K8s calls CNI with the same parameters as before.
        // Do the networking (since the network namespace was destroyed and recreated).
        // There's an existing endpoint - no need to create another. Find the IP address from the endpoint
        // and use that in the response.
        result, err = utils.CreateResultFromEndpoint(endpoint)
        if err != nil {
            return nil, err
        }
        logger.WithField("result", result).Debug("Created result from existing endpoint")
        // If any labels changed whilst the container was being restarted, they will be picked up by the policy
        // controller so there's no need to update the labels here.
    } else {
        client, err := newK8sClient(conf, logger)
        if err != nil {
            return nil, err
        }
        logger.WithField("client", client).Debug("Created Kubernetes client")

        if conf.IPAM.Type == "host-local" && strings.EqualFold(conf.IPAM.Subnet, "usePodCidr") {
            // We've been told to use the "host-local" IPAM plugin with the Kubernetes podCidr for this node.
            // Replace the actual value in the args.StdinData as that's what's passed to the IPAM plugin.
            fmt.Fprintf(os.Stderr, "Calico CNI fetching podCidr from Kubernetes
"
) var stdinData map[string]interface{} if err := json.Unmarshal(args.StdinData, &stdinData); err != nil { return nil, err } podCidr, err := getPodCidr(client, conf, nodename) if err != nil { return nil, err } logger.WithField("podCidr", podCidr).Info("Fetched podCidr") stdinData["ipam"].(map[string]interface{})["subnet"] = podCidr fmt.Fprintf(os.Stderr, "Calico CNI passing podCidr to host-local IPAM: %s
"
, podCidr) args.StdinData, err = json.Marshal(stdinData) if err != nil { return nil, err } logger.WithField("stdin", string(args.StdinData)).Debug("Updated stdin data") } labels := make(map[string]string) annot := make(map[string]string) // Only attempt to fetch the labels and annotations from Kubernetes // if the policy type has been set to "k8s". This allows users to // run the plugin under Kubernetes without needing it to access the // Kubernetes API if conf.Policy.PolicyType == "k8s" { var err error labels, annot, err = getK8sLabelsAnnotations(client, k8sArgs) if err != nil { return nil, err } logger.WithField("labels", labels).Debug("Fetched K8s labels") logger.WithField("annotations", annot).Debug("Fetched K8s annotations") // Check for calico IPAM specific annotations and set them if needed. if conf.IPAM.Type == "calico-ipam" { v4pools := annot["cni.projectcalico.org/ipv4pools"] v6pools := annot["cni.projectcalico.org/ipv6pools"] if len(v4pools) != 0 || len(v6pools) != 0 { var stdinData map[string]interface{} if err := json.Unmarshal(args.StdinData, &stdinData); err != nil { return nil, err } var v4PoolSlice, v6PoolSlice []string if len(v4pools) > 0 { if err := json.Unmarshal([]byte(v4pools), &v4PoolSlice); err != nil { logger.WithField("IPv4Pool", v4pools).Error("Error parsing IPv4 IPPools") return nil, err } if _, ok := stdinData["ipam"].(map[string]interface{}); !ok { logger.Fatal("Error asserting stdinData type") os.Exit(0) } stdinData["ipam"].(map[string]interface{})["ipv4_pools"] = v4PoolSlice logger.WithField("ipv4_pools", v4pools).Debug("Setting IPv4 Pools") } if len(v6pools) > 0 { if err := json.Unmarshal([]byte(v6pools), &v6PoolSlice); err != nil { logger.WithField("IPv6Pool", v6pools).Error("Error parsing IPv6 IPPools") return nil, err } if _, ok := stdinData["ipam"].(map[string]interface{}); !ok { logger.Fatal("Error asserting stdinData type") os.Exit(0) } stdinData["ipam"].(map[string]interface{})["ipv6_pools"] = v6PoolSlice logger.WithField("ipv6_pools", v6pools).Debug("Setting IPv6 Pools") } newData, err := json.Marshal(stdinData) if err != nil { logger.WithField("stdinData", stdinData).Error("Error Marshaling data") return nil, err } args.StdinData = newData logger.WithField("stdin", string(args.StdinData)).Debug("Updated stdin data") } } } ipAddrsNoIpam := annot["cni.projectcalico.org/ipAddrsNoIpam"] ipAddrs := annot["cni.projectcalico.org/ipAddrs"] // switch based on which annotations are passed or not passed. switch { case ipAddrs == "" && ipAddrsNoIpam == "": // Call IPAM plugin if ipAddrsNoIpam or ipAddrs annotation is not present. logger.Debugf("Calling IPAM plugin %s", conf.IPAM.Type) ipamResult, err := ipam.ExecAdd(conf.IPAM.Type, args.StdinData) if err != nil { return nil, err } logger.Debugf("IPAM plugin returned: %+v", ipamResult) // Convert IPAM result into current Result. // IPAM result has a bunch of fields that are optional for an IPAM plugin // but required for a CNI plugin, so this is to populate those fields. // See CNI Spec doc for more details. result, err = current.NewResultFromResult(ipamResult) if err != nil { utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData) return nil, err } if len(result.IPs) == 0 { utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData) return nil, errors.New("IPAM plugin returned missing IP config") } case ipAddrs != "" && ipAddrsNoIpam != "": // Can't have both ipAddrs and ipAddrsNoIpam annotations at the same time. e := fmt.Errorf("Can't have both annotations: 'ipAddrs' and 'ipAddrsNoIpam' in use at the same time") logger.Error(e) return nil, e case ipAddrsNoIpam != "": // ipAddrsNoIpam annotation is set so bypass IPAM, and set the IPs manually. overriddenResult, err := overrideIPAMResult(ipAddrsNoIpam, logger) if err != nil { return nil, err } logger.Debugf("Bypassing IPAM to set the result to: %+v", overriddenResult) // Convert overridden IPAM result into current Result. // This method fill in all the empty fields necessory for CNI output according to spec. result, err = current.NewResultFromResult(overriddenResult) if err != nil { return nil, err } if len(result.IPs) == 0 { return nil, errors.New("Failed to build result") } case ipAddrs != "": // When ipAddrs annotation is set, we call out to the configured IPAM plugin // requesting the specific IP addresses included in the annotation. result, err = ipAddrsResult(ipAddrs, conf, args, logger) if err != nil { return nil, err } logger.Debugf("IPAM result set to: %+v", result) } // Create the endpoint object and configure it. endpoint = api.NewWorkloadEndpoint() endpoint.Metadata.Name = args.IfName endpoint.Metadata.Node = nodename endpoint.Metadata.ActiveInstanceID = args.ContainerID endpoint.Metadata.Orchestrator = orchestrator endpoint.Metadata.Workload = workload endpoint.Metadata.Labels = labels // Set the profileID according to whether Kubernetes policy is required. // If it's not, then just use the network name (which is the normal behavior) // otherwise use one based on the Kubernetes pod's Namespace. if conf.Policy.PolicyType == "k8s" { endpoint.Spec.Profiles = []string{fmt.Sprintf("k8s_ns.%s", k8sArgs.K8S_POD_NAMESPACE)} } else { endpoint.Spec.Profiles = []string{conf.Name} } // Populate the endpoint with the output from the IPAM plugin. if err = utils.PopulateEndpointNets(endpoint, result); err != nil { // Cleanup IP allocation and return the error. utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData) return nil, err } logger.WithField("endpoint", endpoint).Info("Populated endpoint") } fmt.Fprintf(os.Stderr, "Calico CNI using IPs: %s
"
, endpoint.Spec.IPNetworks) // Whether the endpoint existed or not, the veth needs (re)creating. hostVethName := k8sbackend.VethNameForWorkload(workload) _, contVethMac, err := utils.DoNetworking(args, conf, result, logger, hostVethName) if err != nil { // Cleanup IP allocation and return the error. logger.Errorf("Error setting up networking: %s", err) utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData) return nil, err } mac, err := net.ParseMAC(contVethMac) if err != nil { // Cleanup IP allocation and return the error. logger.Errorf("Error parsing MAC (%s): %s", contVethMac, err) utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData) return nil, err } endpoint.Spec.MAC = &cnet.MAC{HardwareAddr: mac} endpoint.Spec.InterfaceName = hostVethName logger.WithField("endpoint", endpoint).Info("Added Mac and interface name to endpoint") // Write the endpoint object (either the newly created one, or the updated one) if _, err := calicoClient.WorkloadEndpoints().Apply(endpoint); err != nil { // Cleanup IP allocation and return the error. utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData) return nil, err } logger.Info("Wrote updated endpoint to datastore") return result, nil }

1. 먼저 newK8sClient를 통해kubernetes client를 만듭니다.conf.IPAM을 판단합니다.Type ipam의 네트워크 구현은 물론 calico 자체가 하나의 calico-ipam의 네트워크 관리 실현을 갖추고 있으며host-local 네트워크를 사용하여usePodCidr를 설정하면 k8s를 통해podCidr를 얻을 수 있다. 이 안에는 k8s에 node의podCidr를 설정해야 한다.
2. 네트워크 정책의 사용 여부를 판독한다.이 안에 k8sapi를 통해pod의 annotation을 가져오고calico/k8sns, 관련 공간의 label
3. 다음은calico-ipam의 주소 분배에 들어갑니다. 여기서 k8s의 여부와 지정한 IP 주소를 판단해야 합니다. 설정이 없으면 ipamResult,err:=ipam을 호출합니다.ExecAdd(conf.IPAM.Type,args.StdinData)에서 IP를 할당합니다.만약 ipAddrs!=""설명은 IP를 지정하면 ipam 분배가 필요하지 않습니다. ipam 주소가 되돌아오는 주소result를 차지했음을 직접 알려 줍니다.err = ipAddrsResult(ipAddrs,conf,args,logger),k8s는 ippool 내의 IP 주소를 설정하는 것 외에 IPAM이 관리하지 않는 ip주소를 스스로 정의하는 ipAddrs NoIpam도 지원합니다!"",이건 아이패드가 필요 없어요.
4. 네트워크 카드를 만듭니다. 이것은bridge의 실현과 같습니다.utils를 통해.DoNetworking은 이 함수에서netlink를 통과합니다.LinkAdd에서veth에 대한 네트워크 카드를 만들고 호스트 쪽에서cali가 개통되며, 다음 11위는 용기의 id로 시작합니다.그 다음에 카드를 용기에 삽입하여 IP와 루트를 설정합니다. 이것은bridge와 같습니다. 구체적인 코드는 다음과 같습니다.

if err = ip.AddDefaultRoute(gw, contVeth); err != nil {
                    return fmt.Errorf("failed to add route %v", err)
                }

if err = netlink.AddrAdd(contVeth, &netlink.Addr{IPNet: &addr.Address}); err != nil {
                    return fmt.Errorf("failed to add IP addr to %q: %v", contVethName, err)
                }

그리고 호스트 루트를 설정했다. 바로 목표 IP를 설정하면 용기의 데이터가 모두 용기에 전달된다. 호스트 측에 있는 카드는cali-xxxxxx 그 카드이다.
5. 그리고 endpoint를 만들 수 있습니다. 먼저 각종 파라미터를 조립한 다음calicoClient를 통해WorkloadEndpoints().이 endpoint를 만듭니다.하나의calico의endpoint는 하나의 네트워크 포인트를 대표하는데 인터넷 카드의 별명으로 간단하게 이해할 수 있다. 바로kubernetes에pod가 있지만 하나의pod에는endpoint가 여러 개 있는 것과 같이 결합이 풀린다.calico의 endpoint에는 다음과 같은 metadata 정보가 포함되어 있습니다.
        endpoint.Metadata.Name = args.IfName
        endpoint.Metadata.Node = nodename
        endpoint.Metadata.ActiveInstanceID = args.ContainerID
        endpoint.Metadata.Orchestrator = orchestrator
        endpoint.Metadata.Workload = workload
        endpoint.Metadata.Labels = labels

인터넷 카드 정보도 있어요.
endpoint.Spec.MAC = &cnet.MAC{HardwareAddr: mac}
endpoint.Spec.InterfaceName = hostVethName

좋은 웹페이지 즐겨찾기