컨테이너 CNI 전체 판독 calico 구현(3)
40694 단어 Kubernetes네트워킹
func cmdAdd(args *skel.CmdArgs) error {
// Unmarshal the network config, and perform validation
conf := NetConf{}
if err := json.Unmarshal(args.StdinData, &conf); err != nil {
return fmt.Errorf("failed to load netconf: %v", err)
cniVersion := conf.CNIVersion
workload, orchestrator, err := GetIdentifiers(args)
if err != nil {
return err
logger := CreateContextLogger(workload)
// Allow the nodename to be overridden by the network config
updateNodename(conf, logger)
"Orchestrator": orchestrator,
"Node": nodename,
}).Info("Extracted identifiers")
logger.WithFields(log.Fields{"NetConfg": conf}).Info("Loaded CNI NetConf")
calicoClient, err := CreateClient(conf)
if err != nil {
return err
// Always check if there's an existing endpoint.
endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{
Node: nodename,
Orchestrator: orchestrator,
Workload: workload})
if err != nil {
return err
logger.Debugf("Retrieved endpoints: %v", endpoints)
var endpoint *api.WorkloadEndpoint
if len(endpoints.Items) == 1 {
endpoint = &endpoints.Items[0]
fmt.Fprintf(os.Stderr, "Calico CNI checking for existing endpoint: %v
", endpoint)
// Collect the result in this variable - this is ultimately what gets "returned" by this function by printing
// it to stdout.
var result *current.Result
// If running under Kubernetes then branch off into the kubernetes code, otherwise handle everything in this
// function.
if orchestrator == "k8s" {
if result, err = k8s.CmdAddK8s(args, conf, nodename, calicoClient, endpoint); err != nil {
return err
} else {
// Default CNI behavior - use the CNI network name as the Calico profile.
profileID := conf.Name
if endpoint != nil {
// There is an existing endpoint - no need to create another.
// This occurs when adding an existing container to a new CNI network
// Find the IP address from the endpoint and use that in the response.
// Don't create the veth or do any networking.
// Just update the profile on the endpoint. The profile will be created if needed during the
// profile processing step.
fmt.Fprintf(os.Stderr, "Calico CNI appending profile: %s
", profileID)
endpoint.Spec.Profiles = append(endpoint.Spec.Profiles, profileID)
result, err = CreateResultFromEndpoint(endpoint)
logger.WithField("result", result).Debug("Created result from endpoint")
if err != nil {
return err
} else {
// There's no existing endpoint, so we need to do the following:
// 1) Call the configured IPAM plugin to get IP address(es)
// 2) Configure the Calico endpoint
// 3) Create the veth, configuring it on both the host and container namespace.
// 1) Run the IPAM plugin and make sure there's an IP address returned.
logger.WithFields(log.Fields{"paths": os.Getenv("CNI_PATH"),
"type": conf.IPAM.Type}).Debug("Looking for IPAM plugin in paths")
ipamResult, err := ipam.ExecAdd(conf.IPAM.Type, args.StdinData)
logger.WithField("IPAM result", ipamResult).Info("Got result from IPAM plugin")
if err != nil {
return err
// Convert IPAM result into current Result.
// IPAM result has a bunch of fields that are optional for an IPAM plugin
// but required for a CNI plugin, so this is to populate those fields.
// See CNI Spec doc for more details.
result, err = current.NewResultFromResult(ipamResult)
if err != nil {
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return err
if len(result.IPs) == 0 {
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return goerrors.New("IPAM plugin returned missing IP config")
// Parse endpoint labels passed in by Mesos, and store in a map.
labels := map[string]string{}
for _, label := range conf.Args.Mesos.NetworkInfo.Labels.Labels {
labels[label.Key] = label.Value
// 2) Create the endpoint object
endpoint = api.NewWorkloadEndpoint()
endpoint.Metadata.Name = args.IfName
endpoint.Metadata.Node = nodename
endpoint.Metadata.Orchestrator = orchestrator
endpoint.Metadata.Workload = workload
endpoint.Metadata.Labels = labels
endpoint.Spec.Profiles = []string{profileID}
logger.WithField("endpoint", endpoint).Debug("Populated endpoint (without nets)")
if err = PopulateEndpointNets(endpoint, result); err != nil {
// Cleanup IP allocation and return the error.
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return err
logger.WithField("endpoint", endpoint).Info("Populated endpoint (with nets)")
fmt.Fprintf(os.Stderr, "Calico CNI using IPs: %s
", endpoint.Spec.IPNetworks)
// 3) Set up the veth
hostVethName, contVethMac, err := DoNetworking(args, conf, result, logger, "")
if err != nil {
// Cleanup IP allocation and return the error.
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return err
"HostVethName": hostVethName,
"ContainerVethMac": contVethMac,
}).Info("Networked namespace")
mac, err := net.ParseMAC(contVethMac)
if err != nil {
// Cleanup IP allocation and return the error.
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return err
endpoint.Spec.MAC = &cnet.MAC{HardwareAddr: mac}
endpoint.Spec.InterfaceName = hostVethName
// Write the endpoint object (either the newly created one, or the updated one with a new ProfileIDs).
if _, err := calicoClient.WorkloadEndpoints().Apply(endpoint); err != nil {
// Cleanup IP allocation and return the error.
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return err
logger.WithField("endpoint", endpoint).Info("Wrote endpoint to datastore")
// Handle profile creation - this is only done if there isn't a specific policy handler.
if conf.Policy.PolicyType == "" {
logger.Debug("Handling profiles")
// Start by checking if the profile already exists. If it already exists then there is no work to do.
// The CNI plugin never updates a profile.
exists := true
_, err = calicoClient.Profiles().Get(api.ProfileMetadata{Name: conf.Name})
if err != nil {
_, ok := err.(errors.ErrorResourceDoesNotExist)
if ok {
exists = false
} else {
// Cleanup IP allocation and return the error.
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return err
if !exists {
// The profile doesn't exist so needs to be created. The rules vary depending on whether k8s is being used.
// Under k8s (without full policy support) the rule is permissive and allows all traffic.
// Otherwise, incoming traffic is only allowed from profiles with the same tag.
fmt.Fprintf(os.Stderr, "Calico CNI creating profile: %s
", conf.Name)
var inboundRules []api.Rule
if orchestrator == "k8s" {
inboundRules = []api.Rule{{Action: "allow"}}
} else {
inboundRules = []api.Rule{{Action: "allow", Source: api.EntityRule{Tag: conf.Name}}}
profile := &api.Profile{
Metadata: api.ProfileMetadata{
Name: conf.Name,
Tags: []string{conf.Name},
Spec: api.ProfileSpec{
EgressRules: []api.Rule{{Action: "allow"}},
IngressRules: inboundRules,
logger.WithField("profile", profile).Info("Creating profile")
if _, err := calicoClient.Profiles().Create(profile); err != nil {
// Cleanup IP allocation and return the error.
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return err
// Set Gateway to nil. Calico-IPAM doesn't set it, but host-local does.
// We modify IPs subnet received from the IPAM plugin (host-local),
// so Gateway isn't valid anymore. It is also not used anywhere by Calico.
for _, ip := range result.IPs {
ip.Gateway = nil
// Print result to stdout, in the format defined by the requested cniVersion.
return types.PrintResult(result, cniVersion)
창설은bridge와 비슷하지만 여기 코드가 좀 길고 한 걸음 한 걸음 해석되며calico는 표준 cni의 실현을 실현했다. 물론kubernetes 아래에서도 모든 cni의 용기 관리 플랫폼에서 실행할 수 있다.먼저 k8s 관련 실현 k8s를 보십시오.CmdAddK8s,
func CmdAddK8s(args *skel.CmdArgs, conf utils.NetConf, nodename string, calicoClient *calicoclient.Client, endpoint *api.WorkloadEndpoint) (*current.Result, error) {
var err error
var result *current.Result
k8sArgs := utils.K8sArgs{}
err = types.LoadArgs(args.Args, &k8sArgs)
if err != nil {
return nil, err
workload, orchestrator, err := utils.GetIdentifiers(args)
if err != nil {
return nil, err
logger := utils.CreateContextLogger(workload)
"Orchestrator": orchestrator,
"Node": nodename,
}).Info("Extracted identifiers for CmdAddK8s")
if endpoint != nil {
// This happens when Docker or the node restarts. K8s calls CNI with the same parameters as before.
// Do the networking (since the network namespace was destroyed and recreated).
// There's an existing endpoint - no need to create another. Find the IP address from the endpoint
// and use that in the response.
result, err = utils.CreateResultFromEndpoint(endpoint)
if err != nil {
return nil, err
logger.WithField("result", result).Debug("Created result from existing endpoint")
// If any labels changed whilst the container was being restarted, they will be picked up by the policy
// controller so there's no need to update the labels here.
} else {
client, err := newK8sClient(conf, logger)
if err != nil {
return nil, err
logger.WithField("client", client).Debug("Created Kubernetes client")
if conf.IPAM.Type == "host-local" && strings.EqualFold(conf.IPAM.Subnet, "usePodCidr") {
// We've been told to use the "host-local" IPAM plugin with the Kubernetes podCidr for this node.
// Replace the actual value in the args.StdinData as that's what's passed to the IPAM plugin.
fmt.Fprintf(os.Stderr, "Calico CNI fetching podCidr from Kubernetes
var stdinData map[string]interface{}
if err := json.Unmarshal(args.StdinData, &stdinData); err != nil {
return nil, err
podCidr, err := getPodCidr(client, conf, nodename)
if err != nil {
return nil, err
logger.WithField("podCidr", podCidr).Info("Fetched podCidr")
stdinData["ipam"].(map[string]interface{})["subnet"] = podCidr
fmt.Fprintf(os.Stderr, "Calico CNI passing podCidr to host-local IPAM: %s
", podCidr)
args.StdinData, err = json.Marshal(stdinData)
if err != nil {
return nil, err
logger.WithField("stdin", string(args.StdinData)).Debug("Updated stdin data")
labels := make(map[string]string)
annot := make(map[string]string)
// Only attempt to fetch the labels and annotations from Kubernetes
// if the policy type has been set to "k8s". This allows users to
// run the plugin under Kubernetes without needing it to access the
// Kubernetes API
if conf.Policy.PolicyType == "k8s" {
var err error
labels, annot, err = getK8sLabelsAnnotations(client, k8sArgs)
if err != nil {
return nil, err
logger.WithField("labels", labels).Debug("Fetched K8s labels")
logger.WithField("annotations", annot).Debug("Fetched K8s annotations")
// Check for calico IPAM specific annotations and set them if needed.
if conf.IPAM.Type == "calico-ipam" {
v4pools := annot["cni.projectcalico.org/ipv4pools"]
v6pools := annot["cni.projectcalico.org/ipv6pools"]
if len(v4pools) != 0 || len(v6pools) != 0 {
var stdinData map[string]interface{}
if err := json.Unmarshal(args.StdinData, &stdinData); err != nil {
return nil, err
var v4PoolSlice, v6PoolSlice []string
if len(v4pools) > 0 {
if err := json.Unmarshal([]byte(v4pools), &v4PoolSlice); err != nil {
logger.WithField("IPv4Pool", v4pools).Error("Error parsing IPv4 IPPools")
return nil, err
if _, ok := stdinData["ipam"].(map[string]interface{}); !ok {
logger.Fatal("Error asserting stdinData type")
stdinData["ipam"].(map[string]interface{})["ipv4_pools"] = v4PoolSlice
logger.WithField("ipv4_pools", v4pools).Debug("Setting IPv4 Pools")
if len(v6pools) > 0 {
if err := json.Unmarshal([]byte(v6pools), &v6PoolSlice); err != nil {
logger.WithField("IPv6Pool", v6pools).Error("Error parsing IPv6 IPPools")
return nil, err
if _, ok := stdinData["ipam"].(map[string]interface{}); !ok {
logger.Fatal("Error asserting stdinData type")
stdinData["ipam"].(map[string]interface{})["ipv6_pools"] = v6PoolSlice
logger.WithField("ipv6_pools", v6pools).Debug("Setting IPv6 Pools")
newData, err := json.Marshal(stdinData)
if err != nil {
logger.WithField("stdinData", stdinData).Error("Error Marshaling data")
return nil, err
args.StdinData = newData
logger.WithField("stdin", string(args.StdinData)).Debug("Updated stdin data")
ipAddrsNoIpam := annot["cni.projectcalico.org/ipAddrsNoIpam"]
ipAddrs := annot["cni.projectcalico.org/ipAddrs"]
// switch based on which annotations are passed or not passed.
switch {
case ipAddrs == "" && ipAddrsNoIpam == "":
// Call IPAM plugin if ipAddrsNoIpam or ipAddrs annotation is not present.
logger.Debugf("Calling IPAM plugin %s", conf.IPAM.Type)
ipamResult, err := ipam.ExecAdd(conf.IPAM.Type, args.StdinData)
if err != nil {
return nil, err
logger.Debugf("IPAM plugin returned: %+v", ipamResult)
// Convert IPAM result into current Result.
// IPAM result has a bunch of fields that are optional for an IPAM plugin
// but required for a CNI plugin, so this is to populate those fields.
// See CNI Spec doc for more details.
result, err = current.NewResultFromResult(ipamResult)
if err != nil {
utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return nil, err
if len(result.IPs) == 0 {
utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return nil, errors.New("IPAM plugin returned missing IP config")
case ipAddrs != "" && ipAddrsNoIpam != "":
// Can't have both ipAddrs and ipAddrsNoIpam annotations at the same time.
e := fmt.Errorf("Can't have both annotations: 'ipAddrs' and 'ipAddrsNoIpam' in use at the same time")
return nil, e
case ipAddrsNoIpam != "":
// ipAddrsNoIpam annotation is set so bypass IPAM, and set the IPs manually.
overriddenResult, err := overrideIPAMResult(ipAddrsNoIpam, logger)
if err != nil {
return nil, err
logger.Debugf("Bypassing IPAM to set the result to: %+v", overriddenResult)
// Convert overridden IPAM result into current Result.
// This method fill in all the empty fields necessory for CNI output according to spec.
result, err = current.NewResultFromResult(overriddenResult)
if err != nil {
return nil, err
if len(result.IPs) == 0 {
return nil, errors.New("Failed to build result")
case ipAddrs != "":
// When ipAddrs annotation is set, we call out to the configured IPAM plugin
// requesting the specific IP addresses included in the annotation.
result, err = ipAddrsResult(ipAddrs, conf, args, logger)
if err != nil {
return nil, err
logger.Debugf("IPAM result set to: %+v", result)
// Create the endpoint object and configure it.
endpoint = api.NewWorkloadEndpoint()
endpoint.Metadata.Name = args.IfName
endpoint.Metadata.Node = nodename
endpoint.Metadata.ActiveInstanceID = args.ContainerID
endpoint.Metadata.Orchestrator = orchestrator
endpoint.Metadata.Workload = workload
endpoint.Metadata.Labels = labels
// Set the profileID according to whether Kubernetes policy is required.
// If it's not, then just use the network name (which is the normal behavior)
// otherwise use one based on the Kubernetes pod's Namespace.
if conf.Policy.PolicyType == "k8s" {
endpoint.Spec.Profiles = []string{fmt.Sprintf("k8s_ns.%s", k8sArgs.K8S_POD_NAMESPACE)}
} else {
endpoint.Spec.Profiles = []string{conf.Name}
// Populate the endpoint with the output from the IPAM plugin.
if err = utils.PopulateEndpointNets(endpoint, result); err != nil {
// Cleanup IP allocation and return the error.
utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return nil, err
logger.WithField("endpoint", endpoint).Info("Populated endpoint")
fmt.Fprintf(os.Stderr, "Calico CNI using IPs: %s
", endpoint.Spec.IPNetworks)
// Whether the endpoint existed or not, the veth needs (re)creating.
hostVethName := k8sbackend.VethNameForWorkload(workload)
_, contVethMac, err := utils.DoNetworking(args, conf, result, logger, hostVethName)
if err != nil {
// Cleanup IP allocation and return the error.
logger.Errorf("Error setting up networking: %s", err)
utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return nil, err
mac, err := net.ParseMAC(contVethMac)
if err != nil {
// Cleanup IP allocation and return the error.
logger.Errorf("Error parsing MAC (%s): %s", contVethMac, err)
utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return nil, err
endpoint.Spec.MAC = &cnet.MAC{HardwareAddr: mac}
endpoint.Spec.InterfaceName = hostVethName
logger.WithField("endpoint", endpoint).Info("Added Mac and interface name to endpoint")
// Write the endpoint object (either the newly created one, or the updated one)
if _, err := calicoClient.WorkloadEndpoints().Apply(endpoint); err != nil {
// Cleanup IP allocation and return the error.
utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return nil, err
logger.Info("Wrote updated endpoint to datastore")
return result, nil
1. 먼저 newK8sClient를 통해kubernetes client를 만듭니다.conf.IPAM을 판단합니다.Type ipam의 네트워크 구현은 물론 calico 자체가 하나의 calico-ipam의 네트워크 관리 실현을 갖추고 있으며host-local 네트워크를 사용하여usePodCidr를 설정하면 k8s를 통해podCidr를 얻을 수 있다. 이 안에는 k8s에 node의podCidr를 설정해야 한다.
2. 네트워크 정책의 사용 여부를 판독한다.이 안에 k8sapi를 통해pod의 annotation을 가져오고calico/k8sns, 관련 공간의 label
3. 다음은calico-ipam의 주소 분배에 들어갑니다. 여기서 k8s의 여부와 지정한 IP 주소를 판단해야 합니다. 설정이 없으면 ipamResult,err:=ipam을 호출합니다.ExecAdd(conf.IPAM.Type,args.StdinData)에서 IP를 할당합니다.만약 ipAddrs!=""설명은 IP를 지정하면 ipam 분배가 필요하지 않습니다. ipam 주소가 되돌아오는 주소result를 차지했음을 직접 알려 줍니다.err = ipAddrsResult(ipAddrs,conf,args,logger),k8s는 ippool 내의 IP 주소를 설정하는 것 외에 IPAM이 관리하지 않는 ip주소를 스스로 정의하는 ipAddrs NoIpam도 지원합니다!"",이건 아이패드가 필요 없어요.
4. 네트워크 카드를 만듭니다. 이것은bridge의 실현과 같습니다.utils를 통해.DoNetworking은 이 함수에서netlink를 통과합니다.LinkAdd에서veth에 대한 네트워크 카드를 만들고 호스트 쪽에서cali가 개통되며, 다음 11위는 용기의 id로 시작합니다.그 다음에 카드를 용기에 삽입하여 IP와 루트를 설정합니다. 이것은bridge와 같습니다. 구체적인 코드는 다음과 같습니다.
if err = ip.AddDefaultRoute(gw, contVeth); err != nil {
return fmt.Errorf("failed to add route %v", err)
if err = netlink.AddrAdd(contVeth, &netlink.Addr{IPNet: &addr.Address}); err != nil {
return fmt.Errorf("failed to add IP addr to %q: %v", contVethName, err)
그리고 호스트 루트를 설정했다. 바로 목표 IP를 설정하면 용기의 데이터가 모두 용기에 전달된다. 호스트 측에 있는 카드는cali-xxxxxx 그 카드이다.
5. 그리고 endpoint를 만들 수 있습니다. 먼저 각종 파라미터를 조립한 다음calicoClient를 통해WorkloadEndpoints().이 endpoint를 만듭니다.하나의calico의endpoint는 하나의 네트워크 포인트를 대표하는데 인터넷 카드의 별명으로 간단하게 이해할 수 있다. 바로kubernetes에pod가 있지만 하나의pod에는endpoint가 여러 개 있는 것과 같이 결합이 풀린다.calico의 endpoint에는 다음과 같은 metadata 정보가 포함되어 있습니다.
endpoint.Metadata.Name = args.IfName
endpoint.Metadata.Node = nodename
endpoint.Metadata.ActiveInstanceID = args.ContainerID
endpoint.Metadata.Orchestrator = orchestrator
endpoint.Metadata.Workload = workload
endpoint.Metadata.Labels = labels
인터넷 카드 정보도 있어요.
endpoint.Spec.MAC = &cnet.MAC{HardwareAddr: mac}
endpoint.Spec.InterfaceName = hostVethName
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Kubernetes 배포 master 서비스1. 서버 초기화 1 방화벽 닫기 [모든 마스터 실행] 2 selinux 닫기 [모든 마스터 실행] 3 호스트 이름 구성 [모든 마스터 실행] hostnamectl set-hostname 호스트 이름 4 구성 이름 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.