Python 은 Kubernetes API 를 사용 하여 클 러 스 터 에 접근 합 니 다.
5351 단어 PythonKubernetesAPI방문 군집
grep/cut 방식 사용:
# , .kubeconfig
kubectl config view -o jsonpath='{"Cluster name\tServer
"}{range .clusters[*]}{.name}{"\t"}{.cluster.server}{"
"}{end}'
#
export CLUSTER_NAME="some_server_name"
# API
APISERVER=$(kubectl config view -o jsonpath="{.clusters[?(@.name==\"$CLUSTER_NAME\")].cluster.server}")
#
TOKEN=$(kubectl get secrets -o jsonpath="{.items[?(@.metadata.annotations['kubernetes\.io/service-account\.name']=='default')].data.token}"|base64 -d)
# API
curl -X GET $APISERVER/api --header "Authorization: Bearer $TOKEN" --insecure
클 라 이언 트 라 이브 러 리:https://kubernetes.io/zh/docs/reference/using-api/client-libraries/python 예:
디 렉 터 리 구조
설정 파일 두 가지 방식
1.클 러 스 터 의~/.kube/config 를 kubeconfig.yaml 로 이름 바 꿉 니 다.
코드:
from kubernetes import client,config
from kubernetes.stream import stream
import yaml
config_file = r"D:\Users\JackHe\PycharmProjects\JJ\k8s\auth\kubeconfig.yaml"
config.kube_config.load_kube_config(config_file=config_file)
Api_Instance = client.CoreV1Api()
Api_Batch = client.BatchV1Api()
# namesapce
for ns in Api_Instance.list_namespace().items:
print(ns.metadata.name)
# nodes
def list_node():
api_response = Api_Instance.list_node()
data = {}
for i in api_response.items:
data[i.metadata.name] = {"name": i.metadata.name,
"status": i.status.conditions[-1].type if i.status.conditions[-1].status == "True" else "NotReady",
"ip": i.status.addresses[0].address,
"kubelet_version": i.status.node_info.kubelet_version,
"os_image": i.status.node_info.os_image,
}
return data
nodes = list_node()
print(nodes)
2.token 형식 을 사용 하여 명령 에 표 시 된 것 을 가 져 옵 니 다.코드:
# -*- coding: utf-8 -*-
from kubernetes.client import api_client
from kubernetes.client.apis import core_v1_api
from kubernetes import client,config
class KubernetesTools(object):
def __init__(self):
self.k8s_url = 'https://192.168.1.56:6443'
def get_token(self):
"""
token
:return:
"""
with open(r'D:\Users\JackHe\PycharmProjects\JJ\k8s\auth\token', 'r') as file:
Token = file.read().strip('
')
return Token
def get_api(self):
"""
API CoreV1Api
:return:
"""
configuration = client.Configuration()
configuration.host = self.k8s_url
configuration.verify_ssl = False
configuration.api_key = {"authorization": "Bearer " + self.get_token()}
client1 = api_client.ApiClient(configuration=configuration)
api = core_v1_api.CoreV1Api(client1)
return api
def get_namespace_list(self):
"""
:return:
"""
api = self.get_api()
namespace_list = []
for ns in api.list_namespace().items:
# print(ns.metadata.name)
namespace_list.append(ns.metadata.name)
return namespace_list
def get_pod_list(self):
api = self.get_api()
print("Listing pods with their IPs:")
ret = api.list_pod_for_all_namespaces(watch=False)
for i in ret.items:
print("%s\t%s\t%s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name))
def get_service_list(self):
api = self.get_api()
ret = api.list_service_for_all_namespaces(watch=False)
for i in ret.items:
print("%s \t%s \t%s \t%s \t%s
" %(i.kind,i.metadata.namespace,i.metadata.name,i.spec.cluster_ip,i.spec.ports))
if __name__ == '__main__':
namespace_list = KubernetesTools().get_namespace_list()
pod_list = KubernetesTools().get_pod_list()
service = KubernetesTools().get_service_list()
print(namespace_list)
print(pod_list)
print(service)
파 이 썬 이 Kubernetes API 를 사용 하여 클 러 스 터 를 방문 하 는 것 에 관 한 이 글 은 여기까지 소개 되 었 습 니 다.더 많은 파 이 썬 Kubernetes API 방문 클 러 스 터 내용 은 우리 의 이전 글 을 검색 하거나 아래 의 관련 글 을 계속 조회 하 십시오.앞으로 많은 지원 을 바 랍 니 다!
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Python의 None과 NULL의 차이점 상세 정보그래서 대상 = 속성 + 방법 (사실 방법도 하나의 속성, 데이터 속성과 구별되는 호출 가능한 속성 같은 속성과 방법을 가진 대상을 클래스, 즉 Classl로 분류할 수 있다.클래스는 하나의 청사진과 같아서 하나의 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.