컨테이너 CNI 전체 판독 calico 구현(3)
40694 단어 Kubernetes네트워킹
func cmdAdd(args *skel.CmdArgs) error {
// Unmarshal the network config, and perform validation
conf := NetConf{}
if err := json.Unmarshal(args.StdinData, &conf); err != nil {
return fmt.Errorf("failed to load netconf: %v", err)
}
cniVersion := conf.CNIVersion
ConfigureLogging(conf.LogLevel)
workload, orchestrator, err := GetIdentifiers(args)
if err != nil {
return err
}
logger := CreateContextLogger(workload)
// Allow the nodename to be overridden by the network config
updateNodename(conf, logger)
logger.WithFields(log.Fields{
"Orchestrator": orchestrator,
"Node": nodename,
}).Info("Extracted identifiers")
logger.WithFields(log.Fields{"NetConfg": conf}).Info("Loaded CNI NetConf")
calicoClient, err := CreateClient(conf)
if err != nil {
return err
}
// Always check if there's an existing endpoint.
endpoints, err := calicoClient.WorkloadEndpoints().List(api.WorkloadEndpointMetadata{
Node: nodename,
Orchestrator: orchestrator,
Workload: workload})
if err != nil {
return err
}
logger.Debugf("Retrieved endpoints: %v", endpoints)
var endpoint *api.WorkloadEndpoint
if len(endpoints.Items) == 1 {
endpoint = &endpoints.Items[0]
}
fmt.Fprintf(os.Stderr, "Calico CNI checking for existing endpoint: %v
", endpoint)
// Collect the result in this variable - this is ultimately what gets "returned" by this function by printing
// it to stdout.
var result *current.Result
// If running under Kubernetes then branch off into the kubernetes code, otherwise handle everything in this
// function.
if orchestrator == "k8s" {
if result, err = k8s.CmdAddK8s(args, conf, nodename, calicoClient, endpoint); err != nil {
return err
}
} else {
// Default CNI behavior - use the CNI network name as the Calico profile.
profileID := conf.Name
if endpoint != nil {
// There is an existing endpoint - no need to create another.
// This occurs when adding an existing container to a new CNI network
// Find the IP address from the endpoint and use that in the response.
// Don't create the veth or do any networking.
// Just update the profile on the endpoint. The profile will be created if needed during the
// profile processing step.
fmt.Fprintf(os.Stderr, "Calico CNI appending profile: %s
", profileID)
endpoint.Spec.Profiles = append(endpoint.Spec.Profiles, profileID)
result, err = CreateResultFromEndpoint(endpoint)
logger.WithField("result", result).Debug("Created result from endpoint")
if err != nil {
return err
}
} else {
// There's no existing endpoint, so we need to do the following:
// 1) Call the configured IPAM plugin to get IP address(es)
// 2) Configure the Calico endpoint
// 3) Create the veth, configuring it on both the host and container namespace.
// 1) Run the IPAM plugin and make sure there's an IP address returned.
logger.WithFields(log.Fields{"paths": os.Getenv("CNI_PATH"),
"type": conf.IPAM.Type}).Debug("Looking for IPAM plugin in paths")
ipamResult, err := ipam.ExecAdd(conf.IPAM.Type, args.StdinData)
logger.WithField("IPAM result", ipamResult).Info("Got result from IPAM plugin")
if err != nil {
return err
}
// Convert IPAM result into current Result.
// IPAM result has a bunch of fields that are optional for an IPAM plugin
// but required for a CNI plugin, so this is to populate those fields.
// See CNI Spec doc for more details.
result, err = current.NewResultFromResult(ipamResult)
if err != nil {
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return err
}
if len(result.IPs) == 0 {
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return goerrors.New("IPAM plugin returned missing IP config")
}
// Parse endpoint labels passed in by Mesos, and store in a map.
labels := map[string]string{}
for _, label := range conf.Args.Mesos.NetworkInfo.Labels.Labels {
labels[label.Key] = label.Value
}
// 2) Create the endpoint object
endpoint = api.NewWorkloadEndpoint()
endpoint.Metadata.Name = args.IfName
endpoint.Metadata.Node = nodename
endpoint.Metadata.Orchestrator = orchestrator
endpoint.Metadata.Workload = workload
endpoint.Metadata.Labels = labels
endpoint.Spec.Profiles = []string{profileID}
logger.WithField("endpoint", endpoint).Debug("Populated endpoint (without nets)")
if err = PopulateEndpointNets(endpoint, result); err != nil {
// Cleanup IP allocation and return the error.
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return err
}
logger.WithField("endpoint", endpoint).Info("Populated endpoint (with nets)")
fmt.Fprintf(os.Stderr, "Calico CNI using IPs: %s
", endpoint.Spec.IPNetworks)
// 3) Set up the veth
hostVethName, contVethMac, err := DoNetworking(args, conf, result, logger, "")
if err != nil {
// Cleanup IP allocation and return the error.
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return err
}
logger.WithFields(log.Fields{
"HostVethName": hostVethName,
"ContainerVethMac": contVethMac,
}).Info("Networked namespace")
mac, err := net.ParseMAC(contVethMac)
if err != nil {
// Cleanup IP allocation and return the error.
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return err
}
endpoint.Spec.MAC = &cnet.MAC{HardwareAddr: mac}
endpoint.Spec.InterfaceName = hostVethName
}
// Write the endpoint object (either the newly created one, or the updated one with a new ProfileIDs).
if _, err := calicoClient.WorkloadEndpoints().Apply(endpoint); err != nil {
// Cleanup IP allocation and return the error.
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return err
}
logger.WithField("endpoint", endpoint).Info("Wrote endpoint to datastore")
}
// Handle profile creation - this is only done if there isn't a specific policy handler.
if conf.Policy.PolicyType == "" {
logger.Debug("Handling profiles")
// Start by checking if the profile already exists. If it already exists then there is no work to do.
// The CNI plugin never updates a profile.
exists := true
_, err = calicoClient.Profiles().Get(api.ProfileMetadata{Name: conf.Name})
if err != nil {
_, ok := err.(errors.ErrorResourceDoesNotExist)
if ok {
exists = false
} else {
// Cleanup IP allocation and return the error.
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return err
}
}
if !exists {
// The profile doesn't exist so needs to be created. The rules vary depending on whether k8s is being used.
// Under k8s (without full policy support) the rule is permissive and allows all traffic.
// Otherwise, incoming traffic is only allowed from profiles with the same tag.
fmt.Fprintf(os.Stderr, "Calico CNI creating profile: %s
", conf.Name)
var inboundRules []api.Rule
if orchestrator == "k8s" {
inboundRules = []api.Rule{{Action: "allow"}}
} else {
inboundRules = []api.Rule{{Action: "allow", Source: api.EntityRule{Tag: conf.Name}}}
}
profile := &api.Profile{
Metadata: api.ProfileMetadata{
Name: conf.Name,
Tags: []string{conf.Name},
},
Spec: api.ProfileSpec{
EgressRules: []api.Rule{{Action: "allow"}},
IngressRules: inboundRules,
},
}
logger.WithField("profile", profile).Info("Creating profile")
if _, err := calicoClient.Profiles().Create(profile); err != nil {
// Cleanup IP allocation and return the error.
ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return err
}
}
}
// Set Gateway to nil. Calico-IPAM doesn't set it, but host-local does.
// We modify IPs subnet received from the IPAM plugin (host-local),
// so Gateway isn't valid anymore. It is also not used anywhere by Calico.
for _, ip := range result.IPs {
ip.Gateway = nil
}
// Print result to stdout, in the format defined by the requested cniVersion.
return types.PrintResult(result, cniVersion)
}
창설은bridge와 비슷하지만 여기 코드가 좀 길고 한 걸음 한 걸음 해석되며calico는 표준 cni의 실현을 실현했다. 물론kubernetes 아래에서도 모든 cni의 용기 관리 플랫폼에서 실행할 수 있다.먼저 k8s 관련 실현 k8s를 보십시오.CmdAddK8s,
func CmdAddK8s(args *skel.CmdArgs, conf utils.NetConf, nodename string, calicoClient *calicoclient.Client, endpoint *api.WorkloadEndpoint) (*current.Result, error) {
var err error
var result *current.Result
k8sArgs := utils.K8sArgs{}
err = types.LoadArgs(args.Args, &k8sArgs)
if err != nil {
return nil, err
}
utils.ConfigureLogging(conf.LogLevel)
workload, orchestrator, err := utils.GetIdentifiers(args)
if err != nil {
return nil, err
}
logger := utils.CreateContextLogger(workload)
logger.WithFields(log.Fields{
"Orchestrator": orchestrator,
"Node": nodename,
}).Info("Extracted identifiers for CmdAddK8s")
if endpoint != nil {
// This happens when Docker or the node restarts. K8s calls CNI with the same parameters as before.
// Do the networking (since the network namespace was destroyed and recreated).
// There's an existing endpoint - no need to create another. Find the IP address from the endpoint
// and use that in the response.
result, err = utils.CreateResultFromEndpoint(endpoint)
if err != nil {
return nil, err
}
logger.WithField("result", result).Debug("Created result from existing endpoint")
// If any labels changed whilst the container was being restarted, they will be picked up by the policy
// controller so there's no need to update the labels here.
} else {
client, err := newK8sClient(conf, logger)
if err != nil {
return nil, err
}
logger.WithField("client", client).Debug("Created Kubernetes client")
if conf.IPAM.Type == "host-local" && strings.EqualFold(conf.IPAM.Subnet, "usePodCidr") {
// We've been told to use the "host-local" IPAM plugin with the Kubernetes podCidr for this node.
// Replace the actual value in the args.StdinData as that's what's passed to the IPAM plugin.
fmt.Fprintf(os.Stderr, "Calico CNI fetching podCidr from Kubernetes
")
var stdinData map[string]interface{}
if err := json.Unmarshal(args.StdinData, &stdinData); err != nil {
return nil, err
}
podCidr, err := getPodCidr(client, conf, nodename)
if err != nil {
return nil, err
}
logger.WithField("podCidr", podCidr).Info("Fetched podCidr")
stdinData["ipam"].(map[string]interface{})["subnet"] = podCidr
fmt.Fprintf(os.Stderr, "Calico CNI passing podCidr to host-local IPAM: %s
", podCidr)
args.StdinData, err = json.Marshal(stdinData)
if err != nil {
return nil, err
}
logger.WithField("stdin", string(args.StdinData)).Debug("Updated stdin data")
}
labels := make(map[string]string)
annot := make(map[string]string)
// Only attempt to fetch the labels and annotations from Kubernetes
// if the policy type has been set to "k8s". This allows users to
// run the plugin under Kubernetes without needing it to access the
// Kubernetes API
if conf.Policy.PolicyType == "k8s" {
var err error
labels, annot, err = getK8sLabelsAnnotations(client, k8sArgs)
if err != nil {
return nil, err
}
logger.WithField("labels", labels).Debug("Fetched K8s labels")
logger.WithField("annotations", annot).Debug("Fetched K8s annotations")
// Check for calico IPAM specific annotations and set them if needed.
if conf.IPAM.Type == "calico-ipam" {
v4pools := annot["cni.projectcalico.org/ipv4pools"]
v6pools := annot["cni.projectcalico.org/ipv6pools"]
if len(v4pools) != 0 || len(v6pools) != 0 {
var stdinData map[string]interface{}
if err := json.Unmarshal(args.StdinData, &stdinData); err != nil {
return nil, err
}
var v4PoolSlice, v6PoolSlice []string
if len(v4pools) > 0 {
if err := json.Unmarshal([]byte(v4pools), &v4PoolSlice); err != nil {
logger.WithField("IPv4Pool", v4pools).Error("Error parsing IPv4 IPPools")
return nil, err
}
if _, ok := stdinData["ipam"].(map[string]interface{}); !ok {
logger.Fatal("Error asserting stdinData type")
os.Exit(0)
}
stdinData["ipam"].(map[string]interface{})["ipv4_pools"] = v4PoolSlice
logger.WithField("ipv4_pools", v4pools).Debug("Setting IPv4 Pools")
}
if len(v6pools) > 0 {
if err := json.Unmarshal([]byte(v6pools), &v6PoolSlice); err != nil {
logger.WithField("IPv6Pool", v6pools).Error("Error parsing IPv6 IPPools")
return nil, err
}
if _, ok := stdinData["ipam"].(map[string]interface{}); !ok {
logger.Fatal("Error asserting stdinData type")
os.Exit(0)
}
stdinData["ipam"].(map[string]interface{})["ipv6_pools"] = v6PoolSlice
logger.WithField("ipv6_pools", v6pools).Debug("Setting IPv6 Pools")
}
newData, err := json.Marshal(stdinData)
if err != nil {
logger.WithField("stdinData", stdinData).Error("Error Marshaling data")
return nil, err
}
args.StdinData = newData
logger.WithField("stdin", string(args.StdinData)).Debug("Updated stdin data")
}
}
}
ipAddrsNoIpam := annot["cni.projectcalico.org/ipAddrsNoIpam"]
ipAddrs := annot["cni.projectcalico.org/ipAddrs"]
// switch based on which annotations are passed or not passed.
switch {
case ipAddrs == "" && ipAddrsNoIpam == "":
// Call IPAM plugin if ipAddrsNoIpam or ipAddrs annotation is not present.
logger.Debugf("Calling IPAM plugin %s", conf.IPAM.Type)
ipamResult, err := ipam.ExecAdd(conf.IPAM.Type, args.StdinData)
if err != nil {
return nil, err
}
logger.Debugf("IPAM plugin returned: %+v", ipamResult)
// Convert IPAM result into current Result.
// IPAM result has a bunch of fields that are optional for an IPAM plugin
// but required for a CNI plugin, so this is to populate those fields.
// See CNI Spec doc for more details.
result, err = current.NewResultFromResult(ipamResult)
if err != nil {
utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return nil, err
}
if len(result.IPs) == 0 {
utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return nil, errors.New("IPAM plugin returned missing IP config")
}
case ipAddrs != "" && ipAddrsNoIpam != "":
// Can't have both ipAddrs and ipAddrsNoIpam annotations at the same time.
e := fmt.Errorf("Can't have both annotations: 'ipAddrs' and 'ipAddrsNoIpam' in use at the same time")
logger.Error(e)
return nil, e
case ipAddrsNoIpam != "":
// ipAddrsNoIpam annotation is set so bypass IPAM, and set the IPs manually.
overriddenResult, err := overrideIPAMResult(ipAddrsNoIpam, logger)
if err != nil {
return nil, err
}
logger.Debugf("Bypassing IPAM to set the result to: %+v", overriddenResult)
// Convert overridden IPAM result into current Result.
// This method fill in all the empty fields necessory for CNI output according to spec.
result, err = current.NewResultFromResult(overriddenResult)
if err != nil {
return nil, err
}
if len(result.IPs) == 0 {
return nil, errors.New("Failed to build result")
}
case ipAddrs != "":
// When ipAddrs annotation is set, we call out to the configured IPAM plugin
// requesting the specific IP addresses included in the annotation.
result, err = ipAddrsResult(ipAddrs, conf, args, logger)
if err != nil {
return nil, err
}
logger.Debugf("IPAM result set to: %+v", result)
}
// Create the endpoint object and configure it.
endpoint = api.NewWorkloadEndpoint()
endpoint.Metadata.Name = args.IfName
endpoint.Metadata.Node = nodename
endpoint.Metadata.ActiveInstanceID = args.ContainerID
endpoint.Metadata.Orchestrator = orchestrator
endpoint.Metadata.Workload = workload
endpoint.Metadata.Labels = labels
// Set the profileID according to whether Kubernetes policy is required.
// If it's not, then just use the network name (which is the normal behavior)
// otherwise use one based on the Kubernetes pod's Namespace.
if conf.Policy.PolicyType == "k8s" {
endpoint.Spec.Profiles = []string{fmt.Sprintf("k8s_ns.%s", k8sArgs.K8S_POD_NAMESPACE)}
} else {
endpoint.Spec.Profiles = []string{conf.Name}
}
// Populate the endpoint with the output from the IPAM plugin.
if err = utils.PopulateEndpointNets(endpoint, result); err != nil {
// Cleanup IP allocation and return the error.
utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return nil, err
}
logger.WithField("endpoint", endpoint).Info("Populated endpoint")
}
fmt.Fprintf(os.Stderr, "Calico CNI using IPs: %s
", endpoint.Spec.IPNetworks)
// Whether the endpoint existed or not, the veth needs (re)creating.
hostVethName := k8sbackend.VethNameForWorkload(workload)
_, contVethMac, err := utils.DoNetworking(args, conf, result, logger, hostVethName)
if err != nil {
// Cleanup IP allocation and return the error.
logger.Errorf("Error setting up networking: %s", err)
utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return nil, err
}
mac, err := net.ParseMAC(contVethMac)
if err != nil {
// Cleanup IP allocation and return the error.
logger.Errorf("Error parsing MAC (%s): %s", contVethMac, err)
utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return nil, err
}
endpoint.Spec.MAC = &cnet.MAC{HardwareAddr: mac}
endpoint.Spec.InterfaceName = hostVethName
logger.WithField("endpoint", endpoint).Info("Added Mac and interface name to endpoint")
// Write the endpoint object (either the newly created one, or the updated one)
if _, err := calicoClient.WorkloadEndpoints().Apply(endpoint); err != nil {
// Cleanup IP allocation and return the error.
utils.ReleaseIPAllocation(logger, conf.IPAM.Type, args.StdinData)
return nil, err
}
logger.Info("Wrote updated endpoint to datastore")
return result, nil
}
1. 먼저 newK8sClient를 통해kubernetes client를 만듭니다.conf.IPAM을 판단합니다.Type ipam의 네트워크 구현은 물론 calico 자체가 하나의 calico-ipam의 네트워크 관리 실현을 갖추고 있으며host-local 네트워크를 사용하여usePodCidr를 설정하면 k8s를 통해podCidr를 얻을 수 있다. 이 안에는 k8s에 node의podCidr를 설정해야 한다.
2. 네트워크 정책의 사용 여부를 판독한다.이 안에 k8sapi를 통해pod의 annotation을 가져오고calico/k8sns, 관련 공간의 label
3. 다음은calico-ipam의 주소 분배에 들어갑니다. 여기서 k8s의 여부와 지정한 IP 주소를 판단해야 합니다. 설정이 없으면 ipamResult,err:=ipam을 호출합니다.ExecAdd(conf.IPAM.Type,args.StdinData)에서 IP를 할당합니다.만약 ipAddrs!=""설명은 IP를 지정하면 ipam 분배가 필요하지 않습니다. ipam 주소가 되돌아오는 주소result를 차지했음을 직접 알려 줍니다.err = ipAddrsResult(ipAddrs,conf,args,logger),k8s는 ippool 내의 IP 주소를 설정하는 것 외에 IPAM이 관리하지 않는 ip주소를 스스로 정의하는 ipAddrs NoIpam도 지원합니다!"",이건 아이패드가 필요 없어요.
4. 네트워크 카드를 만듭니다. 이것은bridge의 실현과 같습니다.utils를 통해.DoNetworking은 이 함수에서netlink를 통과합니다.LinkAdd에서veth에 대한 네트워크 카드를 만들고 호스트 쪽에서cali가 개통되며, 다음 11위는 용기의 id로 시작합니다.그 다음에 카드를 용기에 삽입하여 IP와 루트를 설정합니다. 이것은bridge와 같습니다. 구체적인 코드는 다음과 같습니다.
if err = ip.AddDefaultRoute(gw, contVeth); err != nil {
return fmt.Errorf("failed to add route %v", err)
}
if err = netlink.AddrAdd(contVeth, &netlink.Addr{IPNet: &addr.Address}); err != nil {
return fmt.Errorf("failed to add IP addr to %q: %v", contVethName, err)
}
그리고 호스트 루트를 설정했다. 바로 목표 IP를 설정하면 용기의 데이터가 모두 용기에 전달된다. 호스트 측에 있는 카드는cali-xxxxxx 그 카드이다.
5. 그리고 endpoint를 만들 수 있습니다. 먼저 각종 파라미터를 조립한 다음calicoClient를 통해WorkloadEndpoints().이 endpoint를 만듭니다.하나의calico의endpoint는 하나의 네트워크 포인트를 대표하는데 인터넷 카드의 별명으로 간단하게 이해할 수 있다. 바로kubernetes에pod가 있지만 하나의pod에는endpoint가 여러 개 있는 것과 같이 결합이 풀린다.calico의 endpoint에는 다음과 같은 metadata 정보가 포함되어 있습니다.
endpoint.Metadata.Name = args.IfName
endpoint.Metadata.Node = nodename
endpoint.Metadata.ActiveInstanceID = args.ContainerID
endpoint.Metadata.Orchestrator = orchestrator
endpoint.Metadata.Workload = workload
endpoint.Metadata.Labels = labels
인터넷 카드 정보도 있어요.
endpoint.Spec.MAC = &cnet.MAC{HardwareAddr: mac}
endpoint.Spec.InterfaceName = hostVethName
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Kubernetes 배포 master 서비스1. 서버 초기화 1 방화벽 닫기 [모든 마스터 실행] 2 selinux 닫기 [모든 마스터 실행] 3 호스트 이름 구성 [모든 마스터 실행] hostnamectl set-hostname 호스트 이름 4 구성 이름 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.