typeSchedulerstruct{nodeManagerpodManagerstopChchanstruct{}kubeClientkubernetes.InterfacepodListerlisterscorev1.PodListernodeListerlisterscorev1.NodeLister//Node status returned by filtercachedstatusmap[string]*NodeUsagenodeNotifychanstruct{}//Node Overviewoverviewstatusmap[string]*NodeUsageeventRecorderrecord.EventRecorder}typenodeManagerstruct{nodesmap[string]*util.NodeInfomutexsync.RWMutex}
// returns all nodes and its device memory usage, and we filter it with nodeSelector, taints, nodeAffinity// unschedulerable and nodeName.func(s*Scheduler)getNodesUsage(nodes*[]string,task*corev1.Pod)(*map[string]*NodeUsage,map[string]string,error){overallnodeMap:=make(map[string]*NodeUsage)cachenodeMap:=make(map[string]*NodeUsage)failedNodes:=make(map[string]string)//for _, nodeID := range *nodes {allNodes,err:=s.ListNodes()iferr!=nil{return&overallnodeMap,failedNodes,err}for_,node:=rangeallNodes{nodeInfo:=&NodeUsage{}userGPUPolicy:=config.GPUSchedulerPolicyiftask!=nil&&task.Annotations!=nil{ifvalue,ok:=task.Annotations[policy.GPUSchedulerPolicyAnnotationKey];ok{userGPUPolicy=value}}nodeInfo.Devices=policy.DeviceUsageList{Policy:userGPUPolicy,DeviceLists:make([]*policy.DeviceListsScore,0),}for_,d:=rangenode.Devices{nodeInfo.Devices.DeviceLists=append(nodeInfo.Devices.DeviceLists,&policy.DeviceListsScore{Score:0,Device:&util.DeviceUsage{ID:d.ID,Index:d.Index,Used:0,Count:d.Count,Usedmem:0,Totalmem:d.Devmem,Totalcore:d.Devcore,Usedcores:0,Type:d.Type,Numa:d.Numa,Health:d.Health,},})}overallnodeMap[node.ID]=nodeInfo}podsInfo:=s.ListPodsInfo()for_,p:=rangepodsInfo{node,ok:=overallnodeMap[p.NodeID]if!ok{continue}for_,podsingleds:=rangep.Devices{for_,ctrdevs:=rangepodsingleds{for_,udevice:=rangectrdevs{for_,d:=rangenode.Devices.DeviceLists{ifd.Device.ID==udevice.UUID{d.Device.Used++d.Device.Usedmem+=udevice.Usedmemd.Device.Usedcores+=udevice.Usedcores}}}}}klog.V(5).Infof("usage: pod %v assigned %v %v",p.Name,p.NodeID,p.Devices)}s.overviewstatus=overallnodeMapfor_,nodeID:=range*nodes{node,err:=s.GetNode(nodeID)iferr!=nil{// The identified node does not have a gpu device, so the log here has no practical meaning,increase log priority.klog.V(5).InfoS("node unregistered","node",nodeID,"error",err)failedNodes[nodeID]="node unregistered"continue}cachenodeMap[node.ID]=overallnodeMap[node.ID]}s.cachedstatus=cachenodeMapreturn&cachenodeMap,failedNodes,nil}
大概长这样:
1
2
3
4
5
6
7
8
9
10
11
12
root@test:~# k get node test -oyaml
apiVersion: v1
kind: Node
metadata:
annotations:
    csi.volume.kubernetes.io/nodeid: '{"nfs.csi.k8s.io":"j99cloudvm","rbd.csi.ceph.com":"j99cloudvm"}'
    hami.io/node-handshake: Requesting_2024.11.19 03:10:32
hami.io/node-handshake-dcu: Deleted_2024.09.13 06:42:44
hami.io/node-nvidia-register: 'GPU-03f69c50-207a-2038-9b45-23cac89cb67d,10,46068,100,NVIDIA-NVIDIA
A40,0,true:GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,10,46068,100,NVIDIA-NVIDIA
      A40,0,true:'
    kubeadm.alpha.kubernetes.io/cri-socket: /run/containerd/containerd.sock
这边限制了只会处理带 hami.io/vgpu-node annotations 的 Pod,过滤掉其他 Pod,从 Pod Annotations 中解析出该 Pod 使用的 GPU UUID 以及 memory 和 core 等信息。
Pod 上的 Annotations 大概是这样的:
1
2
3
4
5
6
7
8
9
10
11
$ k get po gpu-pod -oyaml
apiVersion: v1
kind: Pod
metadata:
annotations:
hami.io/bind-phase: success
    hami.io/bind-time: "1727251686"
    hami.io/vgpu-devices-allocated: GPU-03f69c50-207a-2038-9b45-23cac89cb67d,NVIDIA,3000,30:;
    hami.io/vgpu-devices-to-allocate: ;
    hami.io/vgpu-node: test
    hami.io/vgpu-time: "1727251686"
其中hami.io/vgpu-devices-allocated 对应的 value 就是 GPU 信息,格式化后如下
func(s*Scheduler)RegisterFromNodeAnnotations(){klog.V(5).Infoln("Scheduler into RegisterFromNodeAnnotations")ticker:=time.NewTicker(time.Second*15)for{select{case<-s.nodeNotify:case<-ticker.C:case<-s.stopCh:return}// .....
// pkg/scheduler/scheduler.go#L444func(s*Scheduler)Filter(argsextenderv1.ExtenderArgs)(*extenderv1.ExtenderFilterResult,error){klog.InfoS("begin schedule filter","pod",args.Pod.Name,"uuid",args.Pod.UID,"namespaces",args.Pod.Namespace)nums:=k8sutil.Resourcereqs(args.Pod)total:=0for_,n:=rangenums{for_,k:=rangen{total+=int(k.Nums)}}iftotal==0{klog.V(1).Infof("pod %v not find resource",args.Pod.Name)s.recordScheduleFilterResultEvent(args.Pod,EventReasonFilteringFailed,[]string{},fmt.Errorf("does not request any resource"))return&extenderv1.ExtenderFilterResult{NodeNames:args.NodeNames,FailedNodes:nil,Error:"",},nil}annos:=args.Pod.Annotationss.delPod(args.Pod)nodeUsage,failedNodes,err:=s.getNodesUsage(args.NodeNames,args.Pod)iferr!=nil{s.recordScheduleFilterResultEvent(args.Pod,EventReasonFilteringFailed,[]string{},err)returnnil,err}iflen(failedNodes)!=0{klog.V(5).InfoS("getNodesUsage failed nodes","nodes",failedNodes)}nodeScores,err:=s.calcScore(nodeUsage,nums,annos,args.Pod)iferr!=nil{err:=fmt.Errorf("calcScore failed %v for pod %v",err,args.Pod.Name)s.recordScheduleFilterResultEvent(args.Pod,EventReasonFilteringFailed,[]string{},err)returnnil,err}iflen((*nodeScores).NodeList)==0{klog.V(4).Infof("All node scores do not meet for pod %v",args.Pod.Name)s.recordScheduleFilterResultEvent(args.Pod,EventReasonFilteringFailed,[]string{},fmt.Errorf("no available node, all node scores do not meet"))return&extenderv1.ExtenderFilterResult{FailedNodes:failedNodes,},nil}klog.V(4).Infoln("nodeScores_len=",len((*nodeScores).NodeList))sort.Sort(nodeScores)m:=(*nodeScores).NodeList[len((*nodeScores).NodeList)-1]klog.Infof("schedule %v/%v to %v %v",args.Pod.Namespace,args.Pod.Name,m.NodeID,m.Devices)annotations:=make(map[string]string)annotations[util.AssignedNodeAnnotations]=m.NodeIDannotations[util.AssignedTimeAnnotations]=strconv.FormatInt(time.Now().Unix(),10)for_,val:=rangedevice.GetDevices(){val.PatchAnnotations(&annotations,m.Devices)}//InRequestDevices := 
util.EncodePodDevices(util.InRequestDevices, m.devices)//supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices)//maps.Copy(annotations, InRequestDevices)//maps.Copy(annotations, supportDevices)s.addPod(args.Pod,m.NodeID,m.Devices)err=util.PatchPodAnnotations(args.Pod,annotations)iferr!=nil{s.recordScheduleFilterResultEvent(args.Pod,EventReasonFilteringFailed,[]string{},err)s.delPod(args.Pod)returnnil,err}s.recordScheduleFilterResultEvent(args.Pod,EventReasonFilteringSucceed,[]string{m.NodeID},nil)res:=extenderv1.ExtenderFilterResult{NodeNames:&[]string{m.NodeID}}return&res,nil}
对于没有申请特殊资源的 Pod 直接返回全部 Node 都可以调度,不做处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Excerpt from Filter: total up all managed-device requests; a pod that
// requests none is passed through with the full candidate node list.
nums := k8sutil.Resourcereqs(args.Pod)
total := 0
for _, n := range nums {
	for _, k := range n {
		total += int(k.Nums)
	}
}
if total == 0 {
	klog.V(1).Infof("pod %v not find resource", args.Pod.Name)
	s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))
	return &extenderv1.ExtenderFilterResult{
		NodeNames:   args.NodeNames,
		FailedNodes: nil,
		Error:       "",
	}, nil
}
// pkg/scheduler/policy/node_policy.go#L52func(ns*NodeScore)ComputeScore(devicesDeviceUsageList){// current user having request resourceused,usedCore,usedMem:=int32(0),int32(0),int32(0)for_,device:=rangedevices.DeviceLists{used+=device.Device.UsedusedCore+=device.Device.UsedcoresusedMem+=device.Device.Usedmem}klog.V(2).Infof("node %s used %d, usedCore %d, usedMem %d,",ns.NodeID,used,usedCore,usedMem)total,totalCore,totalMem:=int32(0),int32(0),int32(0)for_,deviceLists:=rangedevices.DeviceLists{total+=deviceLists.Device.CounttotalCore+=deviceLists.Device.TotalcoretotalMem+=deviceLists.Device.Totalmem}useScore:=float32(used)/float32(total)coreScore:=float32(usedCore)/float32(totalCore)memScore:=float32(usedMem)/float32(totalMem)ns.Score=float32(Weight)*(useScore+coreScore+memScore)klog.V(2).Infof("node %s computer score is %f",ns.NodeID,ns.Score)}
// pkg/scheduler/score.go#L185func(s*Scheduler)calcScore(nodes*map[string]*NodeUsage,numsutil.PodDeviceRequests,annosmap[string]string,task*corev1.Pod)(*policy.NodeScoreList,error){userNodePolicy:=config.NodeSchedulerPolicyifannos!=nil{ifvalue,ok:=annos[policy.NodeSchedulerPolicyAnnotationKey];ok{userNodePolicy=value}}res:=policy.NodeScoreList{Policy:userNodePolicy,NodeList:make([]*policy.NodeScore,0),}//func calcScore(nodes *map[string]*NodeUsage, errMap *map[string]string, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*NodeScoreList, error) {// res := make(NodeScoreList, 0, len(*nodes))fornodeID,node:=range*nodes{viewStatus(*node)score:=policy.NodeScore{NodeID:nodeID,Devices:make(util.PodDevices),Score:0}score.ComputeScore(node.Devices)//This loop is for different container requestctrfit:=falseforctrid,n:=rangenums{sums:=0for_,k:=rangen{sums+=int(k.Nums)}ifsums==0{foridx:=rangescore.Devices{iflen(score.Devices[idx])<=ctrid{score.Devices[idx]=append(score.Devices[idx],util.ContainerDevices{})}score.Devices[idx][ctrid]=append(score.Devices[idx][ctrid],util.ContainerDevice{})continue}}klog.V(5).InfoS("fitInDevices","pod",klog.KObj(task),"node",nodeID)fit,_:=fitInDevices(node,n,annos,task,&score.Devices)ctrfit=fitif!fit{klog.InfoS("calcScore:node not fit pod","pod",klog.KObj(task),"node",nodeID)break}}ifctrfit{res.NodeList=append(res.NodeList,&score)}}return&res,nil}
func(s*Scheduler)Bind(argsextenderv1.ExtenderBindingArgs)(*extenderv1.ExtenderBindingResult,error){klog.InfoS("Bind","pod",args.PodName,"namespace",args.PodNamespace,"podUID",args.PodUID,"node",args.Node)varerrerrorvarres*extenderv1.ExtenderBindingResultbinding:=&corev1.Binding{ObjectMeta:metav1.ObjectMeta{Name:args.PodName,UID:args.PodUID},Target:corev1.ObjectReference{Kind:"Node",Name:args.Node},}current,err:=s.kubeClient.CoreV1().Pods(args.PodNamespace).Get(context.Background(),args.PodName,metav1.GetOptions{})iferr!=nil{klog.ErrorS(err,"Get pod failed")}node,err:=s.kubeClient.CoreV1().Nodes().Get(context.Background(),args.Node,metav1.GetOptions{})iferr!=nil{klog.ErrorS(err,"Failed to get node","node",args.Node)s.recordScheduleBindingResultEvent(current,EventReasonBindingFailed,[]string{},fmt.Errorf("failed to get node %v",args.Node))res=&extenderv1.ExtenderBindingResult{Error:err.Error(),}returnres,nil}tmppatch:=make(map[string]string)for_,val:=rangedevice.GetDevices(){err=val.LockNode(node,current)iferr!=nil{gotoReleaseNodeLocks}}/*
err = nodelock.LockNode(args.Node)
if err != nil {
klog.ErrorS(err, "Failed to lock node", "node", args.Node)
res = &extenderv1.ExtenderBindingResult{
Error: err.Error(),
}
return res, nil
}*///defer util.ReleaseNodeLock(args.Node)tmppatch[util.DeviceBindPhase]="allocating"tmppatch[util.BindTimeAnnotations]=strconv.FormatInt(time.Now().Unix(),10)err=util.PatchPodAnnotations(current,tmppatch)iferr!=nil{klog.ErrorS(err,"patch pod annotation failed")}iferr=s.kubeClient.CoreV1().Pods(args.PodNamespace).Bind(context.Background(),binding,metav1.CreateOptions{});err!=nil{klog.ErrorS(err,"Failed to bind pod","pod",args.PodName,"namespace",args.PodNamespace,"podUID",args.PodUID,"node",args.Node)}iferr==nil{s.recordScheduleBindingResultEvent(current,EventReasonBindingSucceed,[]string{args.Node},nil)res=&extenderv1.ExtenderBindingResult{Error:"",}klog.Infoln("After Binding Process")returnres,nil}ReleaseNodeLocks:klog.InfoS("bind failed","err",err.Error())for_,val:=rangedevice.GetDevices(){val.ReleaseNodeLock(node,current)}s.recordScheduleBindingResultEvent(current,EventReasonBindingFailed,[]string{},err)return&extenderv1.ExtenderBindingResult{Error:err.Error(),},nil}
核心部分:
1
2
3
4
5
6
7
// Excerpt from Bind: construct the v1 Binding object that ties the pod to
// the chosen node, then ask the API server to perform the binding.
binding := &corev1.Binding{
	ObjectMeta: metav1.ObjectMeta{Name: args.PodName, UID: args.PodUID},
	Target:     corev1.ObjectReference{Kind: "Node", Name: args.Node},
}
if err = s.kubeClient.CoreV1().Pods(args.PodNamespace).Bind(context.Background(), binding, metav1.CreateOptions{}); err != nil {
	klog.ErrorS(err, "Failed to bind pod", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node)
}