</a />
直接上代码吧,首先是我们的class based view,如下:
class K8sApiView(APIView):
    # API view that returns pod and first-container details for every pod of
    # one application, selected by the ``k8s-app`` label inside ``namespace``.
    #
    # NOTE: the class and ``post`` are documented with comments rather than
    # docstrings on purpose, so that schema generators which read view
    # docstrings keep producing the same description as before.
    authentication_classes = (BasicAuthentication, JWTAuthentication)
    permission_classes = [IsAuthenticated]

    # Namespace every query of this view is scoped to.
    namespace = "test-env"
    # Load the cluster credentials. This runs once, when the class body is
    # executed (i.e. when the module is imported), and ``v1`` is then shared
    # by all requests.
    config.load_kube_config('/data/kube-config.yaml')
    v1 = client.CoreV1Api()

    # "<number><unit>" with an optional fractional part, e.g. "250m", "0.5",
    # "256Mi", "1.5Gi".
    _QUANTITY_RE = re.compile(r'^\s*(\d+(?:\.\d+)?)\s*([A-Za-z]*)\s*$')
    # Multipliers (in bytes) for the memory suffixes accepted by Kubernetes.
    _MEMORY_UNIT_BYTES = {
        '': 1,
        'k': 10 ** 3,
        'K': 10 ** 3,
        'M': 10 ** 6,
        'G': 10 ** 9,
        'T': 10 ** 12,
        'P': 10 ** 15,
        'E': 10 ** 18,
        'Ki': 2 ** 10,
        'Mi': 2 ** 20,
        'Gi': 2 ** 30,
        'Ti': 2 ** 40,
        'Pi': 2 ** 50,
        'Ei': 2 ** 60,
    }

    @classmethod
    def _split_quantity(cls, quantity: str) -> Tuple[str, str]:
        """Split a resource quantity string into ``(number, unit)``.

        Raises:
            ValueError: if ``quantity`` is not of the form ``<number><unit>``.
        """
        match = cls._QUANTITY_RE.match(quantity)
        if match is None:
            raise ValueError(
                'unsupported resource quantity: {0!r}'.format(quantity)
            )
        return match.group(1), match.group(2)

    @classmethod
    def _cpu_to_cores(cls, quantity: str):
        """Convert a CPU quantity ("250m", "2", "0.5") to a number of cores.

        Millicore values become floats; whole-core values stay ints so that
        they are rendered as "2" rather than "2.0".

        Raises:
            ValueError: if the quantity cannot be parsed.
        """
        number, unit = cls._split_quantity(quantity)
        if unit == 'm':
            return float(number) / 1000
        if unit:
            raise ValueError('unsupported CPU quantity: {0!r}'.format(quantity))
        return float(number) if '.' in number else int(number)

    @classmethod
    def _memory_to_bytes(cls, quantity: str) -> float:
        """Convert a memory quantity ("256Mi", "2G", "1024") to bytes.

        The result is only used to order two quantities.

        Raises:
            ValueError: if the quantity or its unit cannot be parsed.
        """
        number, unit = cls._split_quantity(quantity)
        try:
            multiplier = cls._MEMORY_UNIT_BYTES[unit]
        except KeyError:
            raise ValueError(
                'unsupported memory quantity: {0!r}'.format(quantity)
            ) from None
        return float(number) * multiplier

    def request_and_limits_extraction(self, cpu_range: List, mem_range: List) -> Tuple:
        """Build ascending "low~high" range strings for CPU and memory.

        The two entries of each argument may come in any order; the smaller
        one is always placed on the left of the ``~``.

        Args:
            cpu_range: two CPU quantities, e.g. ``["500m", "250m"]``.
            mem_range: two memory quantities, e.g. ``["2300Mi", "256Mi"]``.

        Returns:
            ``(cpu_limits, mem_limits)``, e.g.
            ``("0.25核~0.5核", "256Mi~2300Mi")``. CPU values are rendered in
            cores; memory values keep their original (stripped) spelling.

        Raises:
            ValueError: if any quantity cannot be parsed.
        """
        # sorted() is stable, so equal values keep their original order.
        cpu_low, cpu_high = sorted(
            (self._cpu_to_cores(cpu_range[0]), self._cpu_to_cores(cpu_range[1]))
        )
        cpu_limits = str(cpu_low) + '核' + '~' + str(cpu_high) + '核'

        mem_low, mem_high = mem_range[0], mem_range[1]
        if self._memory_to_bytes(mem_low) > self._memory_to_bytes(mem_high):
            mem_low, mem_high = mem_high, mem_low
        mem_limits = mem_low.strip() + '~' + mem_high.strip()
        return cpu_limits, mem_limits

    def list_pod_by_labels(self, app_name) -> List[str]:
        """Return the names of the pods labelled ``k8s-app=<app_name>``."""
        ret = self.v1.list_namespaced_pod(
            namespace=self.namespace,
            label_selector="k8s-app={0}".format(app_name),
        )
        return [item.to_dict()['metadata']['name'] for item in ret.items]

    def describe_namespaced_pod(self, pod_name: str) -> dict:
        """Fetch one pod by name and return its full description as a dict."""
        ret = self.v1.read_namespaced_pod(namespace=self.namespace, name=pod_name)
        return ret.to_dict()

    def get_container_status(self, status_dict: dict) -> str:
        """Return the first key of ``status_dict`` whose value is not None.

        Returns the string ``'None'`` when every value is None (or the dict
        is empty).
        """
        return next(
            (state for state in status_dict if status_dict[state] is not None),
            'None',
        )

    def _format_resources(self, resources):
        """Render a container's requests/limits as one summary string.

        Returns None when the container does not declare both a request and
        a limit for CPU and for memory, since no range can be built then.
        """
        resources = resources or {}
        requested = resources.get('requests') or {}
        limited = resources.get('limits') or {}
        cpu_pair = [requested.get('cpu'), limited.get('cpu')]
        mem_pair = [requested.get('memory'), limited.get('memory')]
        if None in cpu_pair or None in mem_pair:
            return None
        cpu_limits, mem_limits = self.request_and_limits_extraction(cpu_pair, mem_pair)
        return "CPU: " + cpu_limits + ', ' + '内存: ' + mem_limits

    # POST body: {"app_name": "<value of the k8s-app label>"}.
    # Responds with a list holding one entry per matching pod; each entry
    # describes the pod and its first container only.
    @swagger_auto_schema(operation_summary='查询tke pod信息')
    def post(self, request):
        app_name = request.data.get("app_name")
        if not app_name:
            # Without this guard the selector would become "k8s-app=None".
            return Response({'detail': 'app_name is required'}, status=400)

        pod_details_list = []
        for pod in self.list_pod_by_labels(app_name):
            pod_info = self.describe_namespaced_pod(pod)
            pod_status = pod_info['status']
            first_spec = pod_info['spec']['containers'][0]
            resource_summary = self._format_resources(first_spec['resources'])

            # Pods that have not been scheduled/started yet may have neither
            # a start time nor container statuses.
            start_time = pod_status['start_time']
            first_status = (pod_status['container_statuses'] or [{}])[0]

            container_info = {
                'container_name': first_status.get('name', first_spec['name']),
                'container_image_url': first_status.get('image', first_spec['image']),
                'container_cpu_and_mem_request_and_limits': resource_summary,
                'container_status': self.get_container_status(
                    first_status.get('state') or {}
                ),
            }
            # Build a fresh dict per pod; sharing one dict across iterations
            # would make every list entry show the last pod's data.
            pod_details_list.append({
                'instance_name': pod,
                'pod_status': pod_status['phase'],
                'host_ip': pod_status['host_ip'],
                'instance_ip': pod_status['pod_ip'],
                'cpu_and_mem_request_and_limits': resource_summary,
                'create_date': (
                    start_time.strftime("%Y-%m-%d %H:%M:%S")
                    if start_time is not None else None
                ),
                'containers_info': container_info,
            })
        return Response(pod_details_list)
解释一下上述代码:
上述代码中你只需要修改如下内容即可:
namespace = "test-env"
config.load_kube_config('/data/kube-config.yaml')
修改一下上述的命名空间和k8s的kube config的绝对路径为你们公司实际业务的真实路径即可,其它的看一下代码中的注释即可,这个代码很好理解,没啥难的。
最后去你的Django app中的urls.py中添加一个路由即可,比如下面的:
from django.urls import path
from .views import K8sApiView
# Route POST requests for pod details to the class-based view.
urlpatterns = [
    path('k8s/fetch/pod', K8sApiView.as_view()),
]
接口的body示例如下:
{"app_name": "test-k8s-app"}
接口返回的示例如下:
{
"code": 200,
"data": [
{
"instance_name": "test-k8s-app-787sgg5-s7ujh4h",
"pod_status": "Running",
"host_ip": "192.168.0.1",
"instance_ip": "192.168.0.2",
"cpu_and_mem_request_and_limits": "CPU: 0.25核~0.5核, 内存: 256Mi~2300Mi",
"create_date": "2022-01-16 17:23:05",
"containers_info": {
"container_name": "test-k8s-app",
"container_image_url": "docker-hub.com/test-env/test-k8s-app",
"container_cpu_and_mem_request_and_limits": "CPU: 0.25核~0.5核, 内存: 256Mi~2300Mi",
"container_status": "running"
}
}
],
"msg": null
}
说明:以上代码只适用于业务的pod只有一个,pod里面只有一个容器的情况,要想支持多pod多容器的话也很简单,只需要对上面的demo代码稍加调整就行哈~