```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: simple-vgpu-training
spec:
  schedulerName: volcano
  minAvailable: 3                      # Gang scheduling: all 3 Pods must start together
  tasks:
    - name: worker
      replicas: 3                      # Start 3 workers
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: python-trainer
              image: python:3.9-slim
              command: ["python", "-c"]
              args:
                - |
                  # Simplified training code
                  import os
                  import time
                  worker_id = os.getenv("VC_TASK_INDEX", "0")
                  print(f"Worker {worker_id} started with vGPU")
                  # Simulate the training loop
                  for epoch in range(1, 10):
                      time.sleep(6)
                      print(f"Worker {worker_id} completed epoch {epoch}")
                  print(f"Worker {worker_id} finished training!")
              env:
                # Task index (0, 1, ...) taken from the Pod annotation
                - name: VC_TASK_INDEX
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.annotations['volcano.sh/task-index']
              resources:
                limits:
                  volcano.sh/vgpu-memory: 1024   # 1024 MB of GPU memory per worker
                  volcano.sh/vgpu-number: 1      # 1 vGPU per worker
                  cpu: "1"
                  memory: "1Gi"
```
Everything works as expected:
```bash
root@node5-3:~/lixd# k get po -w
NAME                            READY   STATUS    RESTARTS   AGE
simple-vgpu-training-worker-0   1/1     Running   0          5s
simple-vgpu-training-worker-1   1/1     Running   0          5s
simple-vgpu-training-worker-2   1/1     Running   0          5s
```
```
# HELP volcano_vgpu_device_allocated_cores The percentage of gpu compute cores allocated in this card
# TYPE volcano_vgpu_device_allocated_cores gauge
volcano_vgpu_device_allocated_cores{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad"} 50
# HELP volcano_vgpu_device_allocated_memory The number of vgpu memory allocated in this card
# TYPE volcano_vgpu_device_allocated_memory gauge
volcano_vgpu_device_allocated_memory{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad"} 2048
# HELP volcano_vgpu_device_core_allocation_for_a_certain_pod The vgpu device core allocated for a certain pod
# TYPE volcano_vgpu_device_core_allocation_for_a_certain_pod gauge
volcano_vgpu_device_core_allocation_for_a_certain_pod{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad",podName="simple-vgpu-training-worker-0"} 10
volcano_vgpu_device_core_allocation_for_a_certain_pod{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad",podName="simple-vgpu-training-worker-1"} 10
volcano_vgpu_device_core_allocation_for_a_certain_pod{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad",podName="simple-vgpu-training-worker-2"} 10
volcano_vgpu_device_core_allocation_for_a_certain_pod{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad",podName="simple-vgpu-training-worker-3"} 10
volcano_vgpu_device_core_allocation_for_a_certain_pod{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad",podName="simple-vgpu-training-worker-4"} 10
# HELP volcano_vgpu_device_memory_allocation_for_a_certain_pod The vgpu device memory allocated for a certain pod
# TYPE volcano_vgpu_device_memory_allocation_for_a_certain_pod gauge
volcano_vgpu_device_memory_allocation_for_a_certain_pod{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad",podName="simple-vgpu-training-worker-0"} 128
volcano_vgpu_device_memory_allocation_for_a_certain_pod{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad",podName="simple-vgpu-training-worker-1"} 128
volcano_vgpu_device_memory_allocation_for_a_certain_pod{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad",podName="simple-vgpu-training-worker-2"} 128
volcano_vgpu_device_memory_allocation_for_a_certain_pod{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad",podName="simple-vgpu-training-worker-3"} 128
volcano_vgpu_device_memory_allocation_for_a_certain_pod{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad",podName="simple-vgpu-training-worker-4"} 128
# HELP volcano_vgpu_device_memory_limit The number of total device memory in this card
# TYPE volcano_vgpu_device_memory_limit gauge
volcano_vgpu_device_memory_limit{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad"} 4914
# HELP volcano_vgpu_device_shared_number The number of vgpu tasks sharing this card
# TYPE volcano_vgpu_device_shared_number gauge
volcano_vgpu_device_shared_number{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad"} 5
```
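These gauges are plain Prometheus text format, so they are easy to post-process. As a hedged illustration (not part of Volcano; it merely re-parses a slice of the scrape shown above with the `prometheus/common/expfmt` library), the following Go snippet computes how much of each card's memory has been handed out as vGPU quota:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/common/expfmt"
)

// A small slice of the scrape shown above, embedded so the sketch runs standalone.
const sample = `# TYPE volcano_vgpu_device_allocated_memory gauge
volcano_vgpu_device_allocated_memory{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad"} 2048
# TYPE volcano_vgpu_device_memory_limit gauge
volcano_vgpu_device_memory_limit{NodeName="node5-3",devID="GPU-542efc47-39a1-9669-3d17-3b7dec8251ad"} 4914
`

func main() {
	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(strings.NewReader(sample))
	if err != nil {
		panic(err)
	}

	// Index allocated memory and total memory by device ID.
	allocated := map[string]float64{}
	limit := map[string]float64{}
	for name, mf := range families {
		for _, m := range mf.GetMetric() {
			var devID string
			for _, l := range m.GetLabel() {
				if l.GetName() == "devID" {
					devID = l.GetValue()
				}
			}
			switch name {
			case "volcano_vgpu_device_allocated_memory":
				allocated[devID] = m.GetGauge().GetValue()
			case "volcano_vgpu_device_memory_limit":
				limit[devID] = m.GetGauge().GetValue()
			}
		}
	}

	// Print the fraction of each card's memory that is already allocated.
	for devID, total := range limit {
		fmt.Printf("%s: %.1f%% memory allocated\n", devID, allocated[devID]/total*100)
	}
}
```

On this card that works out to 2048 / 4914 ≈ 41.7% of the memory, while the 50 reported by volcano_vgpu_device_allocated_cores matches the five workers that each hold 10% of the compute cores.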
On the scheduler side, the deviceshare plugin wires vGPU filtering and scoring into each scheduling session:

```go
func (dp *deviceSharePlugin) OnSessionOpen(ssn *framework.Session) {
    // Register event handlers to update task info in PodLister & nodeMap
    ssn.AddPredicateFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
        predicateStatus := make([]*api.Status, 0)
        // Check PredicateWithCache
        for _, val := range api.RegisteredDevices {
            if dev, ok := node.Others[val].(api.Devices); ok {
                if reflect.ValueOf(dev).IsNil() {
                    // TODO When a pod requests a device of the current type, but the current node does not have such a device, an error is thrown
                    if dev == nil || dev.HasDeviceRequest(task.Pod) {
                        predicateStatus = append(predicateStatus, &api.Status{
                            Code:   devices.Unschedulable,
                            Reason: "node not initialized with device" + val,
                            Plugin: PluginName,
                        })
                        return api.NewFitErrWithStatus(task, node, predicateStatus...)
                    }
                    klog.V(4).Infof("pod %s/%s did not request device %s on %s, skipping it", task.Pod.Namespace, task.Pod.Name, val, node.Name)
                    continue
                }
                code, msg, err := dev.FilterNode(task.Pod, dp.schedulePolicy)
                if err != nil {
                    predicateStatus = append(predicateStatus, createStatus(code, msg))
                    return api.NewFitErrWithStatus(task, node, predicateStatus...)
                }
                filterNodeStatus := createStatus(code, msg)
                if filterNodeStatus.Code != api.Success {
                    predicateStatus = append(predicateStatus, filterNodeStatus)
                    return api.NewFitErrWithStatus(task, node, predicateStatus...)
                }
            } else {
                klog.Warningf("Devices %s assertion conversion failed, skip", val)
            }
        }
        klog.V(4).Infof("checkDevices predicates Task <%s/%s> on Node <%s>: fit ", task.Namespace, task.Name, node.Name)
        return nil
    })

    ssn.AddNodeOrderFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
        // DeviceScore
        nodeScore := float64(0)
        if dp.scheduleWeight > 0 {
            score, status := getDeviceScore(context.TODO(), task.Pod, node, dp.schedulePolicy)
            if !status.IsSuccess() {
                klog.Warningf("Node: %s, Calculate Device Score Failed because of Error: %v", node.Name, status.AsError())
                return 0, status.AsError()
            }
            // TODO: we should use a seperate plugin for devices, and seperate them from predicates and nodeOrder plugin.
            nodeScore = float64(score) * float64(dp.scheduleWeight)
            klog.V(5).Infof("Node: %s, task<%s/%s> Device Score weight %d, score: %f", node.Name, task.Namespace, task.Name, dp.scheduleWeight, nodeScore)
        }
        return nodeScore, nil
    })
}
```

OnSessionOpen registers two hooks: a PredicateFn that filters out nodes whose devices cannot satisfy the task, and a NodeOrderFn that scores the nodes that remain. Looking at the PredicateFn first:
```go
ssn.AddPredicateFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error {
    predicateStatus := make([]*api.Status, 0)
    // Check PredicateWithCache
    for _, val := range api.RegisteredDevices {
        if dev, ok := node.Others[val].(api.Devices); ok {
            if reflect.ValueOf(dev).IsNil() {
                // TODO When a pod requests a device of the current type, but the current node does not have such a device, an error is thrown
                if dev == nil || dev.HasDeviceRequest(task.Pod) {
                    predicateStatus = append(predicateStatus, &api.Status{
                        Code:   devices.Unschedulable,
                        Reason: "node not initialized with device" + val,
                        Plugin: PluginName,
                    })
                    return api.NewFitErrWithStatus(task, node, predicateStatus...)
                }
                klog.V(4).Infof("pod %s/%s did not request device %s on %s, skipping it", task.Pod.Namespace, task.Pod.Name, val, node.Name)
                continue
            }
            code, msg, err := dev.FilterNode(task.Pod, dp.schedulePolicy)
            if err != nil {
                predicateStatus = append(predicateStatus, createStatus(code, msg))
                return api.NewFitErrWithStatus(task, node, predicateStatus...)
            }
            filterNodeStatus := createStatus(code, msg)
            if filterNodeStatus.Code != api.Success {
                predicateStatus = append(predicateStatus, filterNodeStatus)
                return api.NewFitErrWithStatus(task, node, predicateStatus...)
            }
        } else {
            klog.Warningf("Devices %s assertion conversion failed, skip", val)
        }
    }
    klog.V(4).Infof("checkDevices predicates Task <%s/%s> on Node <%s>: fit ", task.Namespace, task.Name, node.Name)
    return nil
})
```
The core logic is in the FilterNode method:
```go
code, msg, err := dev.FilterNode(task.Pod, dp.schedulePolicy)
if err != nil {
    predicateStatus = append(predicateStatus, createStatus(code, msg))
    return api.NewFitErrWithStatus(task, node, predicateStatus...)
}

func (gs *GPUDevices) FilterNode(pod *v1.Pod, schedulePolicy string) (int, string, error) {
    if VGPUEnable {
        klog.V(4).Infoln("hami-vgpu DeviceSharing starts filtering pods", pod.Name)
        fit, _, score, err := checkNodeGPUSharingPredicateAndScore(pod, gs, true, schedulePolicy)
        if err != nil || !fit {
            klog.ErrorS(err, "Failed to fitler node to vgpu task", "pod", pod.Name)
            return devices.Unschedulable, "hami-vgpuDeviceSharing error", err
        }
        gs.Score = score
        klog.V(4).Infoln("hami-vgpu DeviceSharing successfully filters pods")
    }
    return devices.Success, "", nil
}
```

FilterNode itself only gates on VGPUEnable and delegates to checkNodeGPUSharingPredicateAndScore, which walks the node's GPUs from the back of the list and tries to place every container request:
```go
ctrdevs := []ContainerDevices{}
for _, val := range ctrReq {
    devs := []ContainerDevice{}
    if int(val.Nums) > len(gs.Device) {
        return false, []ContainerDevices{}, 0, fmt.Errorf("no enough gpu cards on node %s", gs.Name)
    }
    klog.V(3).InfoS("Allocating device for container", "request", val)
    for i := len(gs.Device) - 1; i >= 0; i-- {
        klog.V(3).InfoS("Scoring pod request", "memReq", val.Memreq, "memPercentageReq", val.MemPercentagereq, "coresReq", val.Coresreq, "Nums", val.Nums, "Index", i, "ID", gs.Device[i].ID)
        klog.V(3).InfoS("Current Device", "Index", i, "TotalMemory", gs.Device[i].Memory, "UsedMemory", gs.Device[i].UsedMem, "UsedCores", gs.Device[i].UsedCore, "replicate", replicate)
        if gs.Device[i].Number <= uint(gs.Device[i].UsedNum) {
            continue
        }
        if val.MemPercentagereq != 101 && val.Memreq == 0 {
            val.Memreq = gs.Device[i].Memory * uint(val.MemPercentagereq/100)
        }
        if int(gs.Device[i].Memory)-int(gs.Device[i].UsedMem) < int(val.Memreq) {
            continue
        }
        if gs.Device[i].UsedCore+val.Coresreq > 100 {
            continue
        }
        // Coresreq=100 indicates it want this card exclusively
        if val.Coresreq == 100 && gs.Device[i].UsedNum > 0 {
            continue
        }
        // You can't allocate core=0 job to an already full GPU
        if gs.Device[i].UsedCore == 100 && val.Coresreq == 0 {
            continue
        }
        if !checkType(pod.Annotations, *gs.Device[i], val) {
            klog.Errorln("failed checktype", gs.Device[i].Type, val.Type)
            continue
        }
        fit, uuid := gs.Sharing.TryAddPod(gs.Device[i], uint(val.Memreq), uint(val.Coresreq))
        if !fit {
            klog.V(3).Info(gs.Device[i].ID, "not fit")
            continue
        }
        //total += gs.Devices[i].Count
        //free += node.Devices[i].Count - node.Devices[i].Used
        if val.Nums > 0 {
            val.Nums--
            klog.V(3).Info("fitted uuid: ", uuid)
            devs = append(devs, ContainerDevice{
                UUID:      uuid,
                Type:      val.Type,
                Usedmem:   val.Memreq,
                Usedcores: val.Coresreq,
            })
            score += GPUScore(schedulePolicy, gs.Device[i])
        }
        if val.Nums == 0 {
            break
        }
    }
    if val.Nums > 0 {
        return false, []ContainerDevices{}, 0, fmt.Errorf("not enough gpu fitted on this node")
    }
    ctrdevs = append(ctrdevs, devs)
}
```
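Stripped of logging and score bookkeeping, the per-card admission test above reduces to a handful of capacity checks. Below is a self-contained sketch that restates those conditions for a single request; the `Device` struct and the numbers in `main` are illustrative stand-ins, not the actual Volcano/HAMi types:

```go
package main

import "fmt"

// Device mirrors the fields the allocation loop above consults; this is an
// illustrative stand-in, not the real Volcano/HAMi type.
type Device struct {
	Number   uint // vGPU slots this card exposes
	UsedNum  uint // slots already handed out
	Memory   uint // total memory (MB)
	UsedMem  uint // memory already handed out (MB)
	UsedCore uint // compute percentage already handed out (0-100)
}

// fits restates the skip conditions from checkNodeGPUSharingPredicateAndScore
// for a single request of memReq MB and coreReq percent.
func fits(d Device, memReq, coreReq uint) bool {
	switch {
	case d.UsedNum >= d.Number: // no free vGPU slot left on this card
		return false
	case d.Memory-d.UsedMem < memReq: // not enough free memory
		return false
	case d.UsedCore+coreReq > 100: // compute quota would exceed 100%
		return false
	case coreReq == 100 && d.UsedNum > 0: // exclusive request, but the card is already shared
		return false
	case d.UsedCore == 100 && coreReq == 0: // card fully reserved, nothing left for a core=0 job
		return false
	default:
		return true
	}
}

func main() {
	// Hypothetical card roughly matching the metrics above:
	// 4914 MB total, 2048 MB and 50% of the cores already allocated to 5 pods.
	card := Device{Number: 10, UsedNum: 5, Memory: 4914, UsedMem: 2048, UsedCore: 50}
	fmt.Println(fits(card, 1024, 10)) // true: 1024 MB and 10% of the cores still fit
	fmt.Println(fits(card, 1024, 60)) // false: 50% + 60% would exceed the 100% core budget
}
```

Whenever a card passes these checks, the loop records the allocation and adds `GPUScore(schedulePolicy, device)` to the running score. That score is what the NodeOrderFn registered alongside the predicate later turns into a node ranking: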
```go
ssn.AddNodeOrderFn(dp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) {
    // DeviceScore
    nodeScore := float64(0)
    if dp.scheduleWeight > 0 {
        score, status := getDeviceScore(context.TODO(), task.Pod, node, dp.schedulePolicy)
        if !status.IsSuccess() {
            klog.Warningf("Node: %s, Calculate Device Score Failed because of Error: %v", node.Name, status.AsError())
            return 0, status.AsError()
        }
        // TODO: we should use a seperate plugin for devices, and seperate them from predicates and nodeOrder plugin.
        nodeScore = float64(score) * float64(dp.scheduleWeight)
        klog.V(5).Infof("Node: %s, task<%s/%s> Device Score weight %d, score: %f", node.Name, task.Namespace, task.Name, dp.scheduleWeight, nodeScore)
    }
    return nodeScore, nil
})
```
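`getDeviceScore` surfaces the `gs.Score` accumulated during filtering, and the plugin multiplies it by its scheduleWeight, so the weight only amplifies whatever the configured policy produced. As a rough, assumption-labeled sketch of what a binpack-versus-spread device score typically looks like (this is not the actual GPUScore implementation), consider:

```go
package main

import "fmt"

// card is a hypothetical snapshot of one GPU's current memory usage.
type card struct {
	Memory, UsedMem uint // MB
}

// scoreDevice sketches how a binpack/spread policy is usually expressed:
// binpack prefers cards that are already busy (pack work tightly),
// spread prefers cards that are mostly idle (balance the load).
// The real GPUScore in Volcano/HAMi may weight things differently.
func scoreDevice(policy string, c card) float64 {
	memUsage := float64(c.UsedMem) / float64(c.Memory) // 0.0 (idle) .. 1.0 (full)
	switch policy {
	case "binpack":
		return memUsage * 100
	case "spread":
		return (1 - memUsage) * 100
	default:
		return 0
	}
}

func main() {
	busy := card{Memory: 4914, UsedMem: 2048}
	idle := card{Memory: 4914, UsedMem: 0}
	fmt.Printf("binpack: busy=%.1f idle=%.1f\n", scoreDevice("binpack", busy), scoreDevice("binpack", idle))
	fmt.Printf("spread:  busy=%.1f idle=%.1f\n", scoreDevice("spread", busy), scoreDevice("spread", idle))
}
```

With binpack the already-busy card scores higher; with spread the idle one does, which is the trade-off the schedulePolicy setting selects between.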