Contents

HAMi vGPU 原理分析 Part3:hami-scheduler 工作流程分析

https://img.lixueduan.com/kubernetes/cover/hami-analyze-3-hami-scheduler.png

上篇我们分析了 hami-webhook,该 Webhook 将申请了 vGPU 资源的 Pod 的调度器修改为 hami-scheduler,后续使用 hami-scheduler 进行调度。

本文为 HAMi 原理分析的第三篇,分析 hami-scheduler 工作流程。

上篇主要分析了 hami-webhook,解决了:Pod 是如何使用到 hami-scheduler,创建 Pod 时我们未指定 SchedulerName 默认会使用 default-scheduler 进行调度才对 问题。

这篇开始分析 hami-scheduler,解决另一个问题:hami-scheduler 逻辑,spread & binpark 等 高级调度策略是如何实现的

写完发现内容还是很多,spread & binpark 调度策略下一篇在分析吧,这篇主要分析调度流程。

以下分析基于 HAMi v2.4.0

省流:

HAMi Webhook 、Scheduler 工作流程如下:

https://img.lixueduan.com/kubernetes/vgpu/hami-scheduler.png

  • 1)用户创建 Pod 并在 Pod 中申请了 vGPU 资源

  • 2)kube-apiserver 根据 MutatingWebhookConfiguration 配置请求 HAMi-Webhook

  • 3)HAMi-Webhook 检测 Pod 中的 Resource,如果申请的由 HAMi 管理的 vGPU 资源,就会把 Pod 中的 SchedulerName 改成了 hami-scheduler,这样这个 Pod 就会由 hami-scheduler 进行调度了。

  • 对于特权模式的 Pod,Webhook 会直接跳过不处理

  • 对于使用 vGPU 资源但指定了 nodeName 的 Pod,Webhook 会直接拒绝

  • 4)hami-scheduler 进行 Pod 调度,不过就是用的 k8s 的默认 kube-scheduler 镜像,因此调度逻辑和默认的 default-scheduler 是一样的,但是 kube-scheduler 还会根据 KubeSchedulerConfiguration 配置,调用 Extender Scheduler 插件

  • 这个 Extender Scheduler 就是 hami-scheduler Pod 中的另一个 Container,该 Container 同时提供了 Webhook 和 Scheduler 相关 API。

  • 当 Pod 申请了 vGPU 资源时,kube-scheduler 就会根据配置以 HTTP 形式调用 Extender Scheduler 插件,这样就实现了自定义调度逻辑

  • 5)Extender Scheduler 插件包含了真正的 hami 调度逻辑, 调度时根据节点剩余资源量进行打分选择节点

    • 这里就包含了spread & binpark 等 高级调度策略的实现
  • 6)异步任务,包括 GPU 感知逻辑

    • devicePlugin 中的后台 Goroutine 定时上报 Node 上的 GPU 资源并写入到 Node 的 Annoations
    • 除了 DevicePlugin 之外,还使用异步任务以 Patch Annotation 方式提交更多信息
    • Extender Scheduler 插件根据 Node Annoations 解析出 GPU 资源总量、从 Node 上已经运行的 Pod 的 Annoations 中解析出 GPU 使用量,计算出每个 Node 剩余的可用资源保存到内存供调度时使用

1. 概述

Hami-scheduler 主要是 Pod 的调度逻辑,从集群节点中为当前 Pod 选择最合适的节点。

Hami-scheduler 也是通过 Scheduler Extender 方式实现的,可以参考上一篇文章 K8s 自定义调度器 Part1:通过 Scheduler Extender 实现自定义调度逻辑 手把手实现一个自定义调度器。

但是 HAMi 并没有直接扩展 default-scheduler,而是使用默认的 kube-scheduler 镜像额外启动了一个 scheduler,但是通过配置把名称指定为了 hami-scheduler。

然后给这个 hami-scheduler 配置了 Extender,Extender 服务就是同 Pod 中的另一个Container 启动的一个 http 服务。

ps:后续说的 hami-scheduler 一般只这部分 Extender 实现的调度插件

2. 具体部署

Deployment

Hami-scheduler 使用 Deployment 进行部署,该 Deployment 中有两个 Container,其中一个是原生的 kube-scheduler,另一个则是 HAMi 的 Scheduler 服务。

完整 yaml 如下:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: vgpu-hami-scheduler
  namespace: kube-system
spec:
  template:
    spec:
      containers:
      - command:
        - kube-scheduler
        - --config=/config/config.yaml
        - -v=4
        - --leader-elect=true
        - --leader-elect-resource-name=hami-scheduler
        - --leader-elect-resource-namespace=kube-system
        image: 192.168.116.54:5000/kube-scheduler:v1.23.17
        imagePullPolicy: IfNotPresent
        name: kube-scheduler
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /config
          name: scheduler-config
      - command:
        - scheduler
        - --resource-name=nvidia.com/vgpu
        - --resource-mem=nvidia.com/gpumem
        - --resource-cores=nvidia.com/gpucores
        - --resource-mem-percentage=nvidia.com/gpumem-percentage
        - --resource-priority=nvidia.com/priority
        - --http_bind=0.0.0.0:443
        - --cert_file=/tls/tls.crt
        - --key_file=/tls/tls.key
        - --scheduler-name=hami-scheduler
        - --metrics-bind-address=:9395
        - --default-mem=0
        - --default-gpu=1
        - --default-cores=0
        - --iluvatar-memory=iluvatar.ai/vcuda-memory
        - --iluvatar-cores=iluvatar.ai/vcuda-core
        - --cambricon-mlu-name=cambricon.com/vmlu
        - --cambricon-mlu-memory=cambricon.com/mlu.smlu.vmemory
        - --cambricon-mlu-cores=cambricon.com/mlu.smlu.vcore
        - --ascend-name=huawei.com/Ascend910
        - --ascend-memory=huawei.com/Ascend910-memory
        - --ascend310p-name=huawei.com/Ascend310P
        - --ascend310p-memory=huawei.com/Ascend310P-memory
        - --overwrite-env=false
        - --node-scheduler-policy=binpack
        - --gpu-scheduler-policy=spread
        - --debug
        - -v=4
        image: projecthami/hami:v2.3.13
        imagePullPolicy: IfNotPresent
        name: vgpu-scheduler-extender
        ports:
        - containerPort: 443
          name: http
          protocol: TCP
        volumeMounts:
        - mountPath: /tls
          name: tls-config
      dnsPolicy: ClusterFirst
      priorityClassName: system-node-critical
      restartPolicy: Always
      schedulerName: default-scheduler
      serviceAccount: vgpu-hami-scheduler
      serviceAccountName: vgpu-hami-scheduler
      terminationGracePeriodSeconds: 30
      volumes:
      - name: tls-config
        secret:
          defaultMode: 420
          secretName: vgpu-hami-scheduler-tls
      - configMap:
          defaultMode: 420
          name: vgpu-hami-scheduler-newversion
        name: scheduler-config

KubeSchedulerConfiguration

对应的 Scheduler 的配置文件存储在 Configmap 中,具体内容如下:

apiVersion: v1
data:
  config.yaml: |
    apiVersion: kubescheduler.config.k8s.io/v1beta2
    kind: KubeSchedulerConfiguration
    leaderElection:
      leaderElect: false
    profiles:
    - schedulerName: hami-scheduler
    extenders:
    - urlPrefix: "https://127.0.0.1:443"
      filterVerb: filter
      bindVerb: bind
      nodeCacheCapable: true
      weight: 1
      httpTimeout: 30s
      enableHTTPS: true
      tlsConfig:
        insecure: true
      managedResources:
      - name: nvidia.com/vgpu
        ignoredByScheduler: true
      - name: nvidia.com/gpumem
        ignoredByScheduler: true
      - name: nvidia.com/gpucores
        ignoredByScheduler: true
      - name: nvidia.com/gpumem-percentage
        ignoredByScheduler: true
      - name: nvidia.com/priority
        ignoredByScheduler: true
      - name: cambricon.com/vmlu
        ignoredByScheduler: true
      - name: hygon.com/dcunum
        ignoredByScheduler: true
      - name: hygon.com/dcumem
        ignoredByScheduler: true
      - name: hygon.com/dcucores
        ignoredByScheduler: true
      - name: iluvatar.ai/vgpu
        ignoredByScheduler: true
      - name: huawei.com/Ascend910-memory
        ignoredByScheduler: true
      - name: huawei.com/Ascend910
        ignoredByScheduler: true
      - name: huawei.com/Ascend310P-memory
        ignoredByScheduler: true
      - name: huawei.com/Ascend310P
        ignoredByScheduler: true    
kind: ConfigMap
metadata:
  name: vgpu-hami-scheduler-newversion
  namespace: kube-system

SchedulerName

首先是指定了 Scheduler 名字叫做 hami-scheduler,k8s 默认的调度器叫做 default-scheduler。

profiles:
- schedulerName: hami-scheduler

创建 Pod 的时候我们是没有指定 schedulerName 的,所以默认都会使用 default-scheduler,也就是默认 kube-scheduler 进行调度。

之前 hami-webhook 修改 SchedulerName 时就需要和这里配置的名称对应。

Extenders

调度器核心配置如下:

extenders:
- urlPrefix: "https://127.0.0.1:443"
  filterVerb: filter
  bindVerb: bind

参数解释:

  • urlPrefix: "https://127.0.0.1:443":这是一个调度器扩展器的服务地址,Kubernetes 调度器会调用这个地址来请求外部调度逻辑。可以通过 HTTPS 访问。

    • External Scheduler 因为是和 kube-scheduler 部署在一个 Pod 里的,因此使用 127.0.0.1 进行访问
  • filterVerb: filter:这个动词指示了调度器会调用这个扩展器服务来过滤节点,即决定哪些节点适合调度 Pod。

    • Filter 接口对应这个 http 服务的 url 就是 /filter
  • bindVerb: bind:调度器扩展器可以执行绑定操作,即将 Pod 绑定到特定节点。

    • 同上,bind 就要对应 /bind 这个接口

managedResources

managedResources 这部分指定这个扩展调度器 hami-cheduler 管理的资源,只有 Pod Resource 中申请了 managedResources 中指定的资源时,Scheduler 才会请求我们配置的 Extender,也就是 hami-scheduler。

即:只要没申请 vGPU 资源,就是指定使用 hami-scheduler 调度,也是由名为 hami-scheduler 的 kube-scheduler 进行调度,不会请求 Extender,真正的 HAMi 调度插件不会生效。

managedResources:
- name: nvidia.com/vgpu
  ignoredByScheduler: true
- name: nvidia.com/gpumem
  ignoredByScheduler: true
  ...
  • name: nvidia.com/vgpu:资源名称

  • ignoredByScheduler:当设置为 true 时,调度器在做节点资源匹配和资源分配时,会忽略这个资源。这些资源都由扩展的 hami-scheduler 进行调度即可。

这样配置之后,对于 nvidia.com/vgpu、nvidia.com/gpumem 等等 managedResources 中指定的资源,调度器在做节点资源匹配和资源分配时,会忽略这个资源,不会因为 Node 上没有这些虚拟资源,就直接调度失败了。

当调度器请求扩展的 hami-scheduler 进行调度时,hami-scheduler 就能够正常处理这些资源,根据 Pod 申请的 Resource 配置找到对应的节点。

接下来则分析 hami-scheduler 的具体实现,包括两个问题:

  • 1)hami-scheduler 如何感知 Node 上的 GPU 信息的,因为前面提到 gpucore、gpumem 这些都是虚拟资源, DevicePlugin 也是没有直接上报到 Node 上的

  • 2)hami-scheduler 是如何选择最合适的节点的,spark & binpark 等高级调度策略是如何实现的

3. hami 如何感知 Nod 上的 GPU 资源情况的

分为两部分:

  • 1)感知 Node 上的 GPU 资源信息

  • 2)感知 Node 上 GPU 资源使用情况

因为 gpucore、gpumem 这些都是虚拟资源,因此不能像 DevicePlugin 上报的标准第三方资源一样,由 K8s 直接维护,而是需要 hami 自行维护。

为什么需要自定义感知逻辑

到这里大家可能会有疑问,上一篇文章中介绍了 hami-device-plugin-nvidia,这里的 devicePlugin 不就已经感知了节点上的 GPU 并上报到 kube-apiserver 了吗,怎么还需要实现一个感知逻辑?

在节点上都可以看到了,例如下面节点上就有 20 个 GPU

root@j99cloudvm:~# k describe node j99cloudvm |grep Capa -A 7
Capacity:
  cpu:                64
  ephemeral-storage:  2097139692Ki
  hugepages-1Gi:      0
  hugepages-2Mi:      0
  memory:             131966216Ki
  nvidia.com/gpu:     20

因为:hami-scheduler 做了细粒度的 gpucore、gpumem 切分,那么就要知道节点上的 GPU 具体数量、每张卡的显存大小等信息,不然如果把 Pod 分配到一个所有 GPU 显存都已经消耗完的 Node 上不就出问题了。

感知节点上的 GPU 资源

Hami 是如何感知节点上的 GPU 情况的呢?也就是之前 start 方法中的这个 Goroutine 在维护,核心就是在这个 RegisterFromNodeAnnotations 方法中

go sher.RegisterFromNodeAnnotations()

精简后代码如下:

调用 kube-apiserver 获取到节点列表,然后从 node 的 Annoations 中解析出 Device 信息,并保存到内存中。

func (s *Scheduler) RegisterFromNodeAnnotations() {
    klog.V(5).Infoln("Scheduler into RegisterFromNodeAnnotations")
    ticker := time.NewTicker(time.Second * 15)
    for {
       select {
       case <-s.nodeNotify:
       case <-ticker.C:
       case <-s.stopCh:
          return
       }
       //  kube-apiserver 查询节点列表
       labelSelector := labels.Everything()
        if config.NodeLabelSelector != nil && len(config.NodeLabelSelector) > 0 {
            labelSelector = (labels.Set)(config.NodeLabelSelector).AsSelector()
        }
        rawNodes, err := s.nodeLister.List(labelSelector)
        if err != nil {
            klog.Errorln("nodes list failed", err.Error())
            continue
        }
        var nodeNames []string
        for _, val := range rawNodes {
            // 从 node 信息中解析出 GPU 信息
            nodedevices, err := devInstance.GetNodeDevices(*val)
        
            for _, deviceinfo := range nodedevices {
                found := false
                _, ok := s.nodes[val.Name]
                if ok {
                   for i1, val1 := range s.nodes[val.Name].Devices {
                      if strings.Compare(val1.ID, deviceinfo.ID) == 0 {
                         found = true
                         s.nodes[val.Name].Devices[i1].Devmem = deviceinfo.Devmem
                         s.nodes[val.Name].Devices[i1].Devcore = deviceinfo.Devcore
                         break
                      }
                   }
                }
                if !found {
                   nodeInfo.Devices = append(nodeInfo.Devices, util.DeviceInfo{
                      ID:           deviceinfo.ID,
                      Index:        uint(deviceinfo.Index),
                      Count:        deviceinfo.Count,
                      Devmem:       deviceinfo.Devmem,
                      Devcore:      deviceinfo.Devcore,
                      Type:         deviceinfo.Type,
                      Numa:         deviceinfo.Numa,
                      Health:       deviceinfo.Health,
                      DeviceVendor: devhandsk,
                   })
                }
            }
        // 最终将节点信息添加到缓存里
        s.addNode(val.Name, nodeInfo)
        }
       // .....

会将最新的 Node 数据存到内存中,便于调度时使用,就是 nodeManager 中的 nodes 这个 map 对象

type Scheduler struct {
    nodeManager
    podManager

    stopCh     chan struct{}
    kubeClient kubernetes.Interface
    podLister  listerscorev1.PodLister
    nodeLister listerscorev1.NodeLister
    //Node status returned by filter
    cachedstatus map[string]*NodeUsage
    nodeNotify   chan struct{}
    //Node Overview
    overviewstatus map[string]*NodeUsage

    eventRecorder record.EventRecorder
}
type nodeManager struct {
    nodes map[string]*util.NodeInfo
    mutex sync.RWMutex
}

重点来了:这里的数据来源是 node 的 Annoations,而这个 Annoations 就是上一篇中提到的 hami device plugin nvidia 中的一个后台 goroutine 在维护。

// returns all nodes and its device memory usage, and we filter it with nodeSelector, taints, nodeAffinity
// unschedulerable and nodeName.
func (s *Scheduler) getNodesUsage(nodes *[]string, task *corev1.Pod) (*map[string]*NodeUsage, map[string]string, error) {
    overallnodeMap := make(map[string]*NodeUsage)
    cachenodeMap := make(map[string]*NodeUsage)
    failedNodes := make(map[string]string)
    //for _, nodeID := range *nodes {
    allNodes, err := s.ListNodes()
    if err != nil {
       return &overallnodeMap, failedNodes, err
    }

    for _, node := range allNodes {
       nodeInfo := &NodeUsage{}
       userGPUPolicy := config.GPUSchedulerPolicy
       if task != nil && task.Annotations != nil {
          if value, ok := task.Annotations[policy.GPUSchedulerPolicyAnnotationKey]; ok {
             userGPUPolicy = value
          }
       }
       nodeInfo.Devices = policy.DeviceUsageList{
          Policy:      userGPUPolicy,
          DeviceLists: make([]*policy.DeviceListsScore, 0),
       }
       for _, d := range node.Devices {
          nodeInfo.Devices.DeviceLists = append(nodeInfo.Devices.DeviceLists, &policy.DeviceListsScore{
             Score: 0,
             Device: &util.DeviceUsage{
                ID:        d.ID,
                Index:     d.Index,
                Used:      0,
                Count:     d.Count,
                Usedmem:   0,
                Totalmem:  d.Devmem,
                Totalcore: d.Devcore,
                Usedcores: 0,
                Type:      d.Type,
                Numa:      d.Numa,
                Health:    d.Health,
             },
          })
       }
       overallnodeMap[node.ID] = nodeInfo
    }

    podsInfo := s.ListPodsInfo()
    for _, p := range podsInfo {
       node, ok := overallnodeMap[p.NodeID]
       if !ok {
          continue
       }
       for _, podsingleds := range p.Devices {
          for _, ctrdevs := range podsingleds {
             for _, udevice := range ctrdevs {
                for _, d := range node.Devices.DeviceLists {
                   if d.Device.ID == udevice.UUID {
                      d.Device.Used++
                      d.Device.Usedmem += udevice.Usedmem
                      d.Device.Usedcores += udevice.Usedcores
                   }
                }
             }
          }
       }
       klog.V(5).Infof("usage: pod %v assigned %v %v", p.Name, p.NodeID, p.Devices)
    }
    s.overviewstatus = overallnodeMap
    for _, nodeID := range *nodes {
       node, err := s.GetNode(nodeID)
       if err != nil {
          // The identified node does not have a gpu device, so the log here has no practical meaning,increase log priority.
          klog.V(5).InfoS("node unregistered", "node", nodeID, "error", err)
          failedNodes[nodeID] = "node unregistered"
          continue
       }
       cachenodeMap[node.ID] = overallnodeMap[node.ID]
    }
    s.cachedstatus = cachenodeMap
    return &cachenodeMap, failedNodes, nil
}

大概长这样:

root@test:~# k get node test -oyaml
apiVersion: v1
kind: Node
metadata:
  annotations:
    csi.volume.kubernetes.io/nodeid: '{"nfs.csi.k8s.io":"j99cloudvm","rbd.csi.ceph.com":"j99cloudvm"}'
    hami.io/node-handshake: Requesting_2024.11.19 03:10:32
    hami.io/node-handshake-dcu: Deleted_2024.09.13 06:42:44
    hami.io/node-nvidia-register: 'GPU-03f69c50-207a-2038-9b45-23cac89cb67d,10,46068,100,NVIDIA-NVIDIA
      A40,0,true:GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,10,46068,100,NVIDIA-NVIDIA
      A40,0,true:'
    kubeadm.alpha.kubernetes.io/cri-socket: /run/containerd/containerd.sock

其中下面这个就是 GPU 的具体信息,包括 ID、型号、内存等信息。

hami.io/node-nvidia-register: 'GPU-03f69c50-207a-2038-9b45-23cac89cb67d,10,46068,100,NVIDIA-NVIDIA
      A40,0,true:GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,10,46068,100,NVIDIA-NVIDIA
      A40,0,true:'

感知节点上 GPU 使用情况

除此之外,hami 还需要感知到 GPU 的使用情况,才能计算出还有多少 gpucore、gpumem 可以使用

这里使用的 client-go 中提供的 Informer 机制,Watch Pod 和 Node 的变化情况,获取 Node 上运行的 Pod,根据 Pod 申请的资源和 Node 上的总资源计算出剩余资源。

// pkg/scheduler/scheduler.go#L127
func (s *Scheduler) Start() {
    kubeClient, err := k8sutil.NewClient()
    check(err)
    s.kubeClient = kubeClient
    informerFactory := informers.NewSharedInformerFactoryWithOptions(s.kubeClient, time.Hour*1)
    s.podLister = informerFactory.Core().V1().Pods().Lister()
    s.nodeLister = informerFactory.Core().V1().Nodes().Lister()

    informer := informerFactory.Core().V1().Pods().Informer()
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
       AddFunc:    s.onAddPod,
       UpdateFunc: s.onUpdatePod,
       DeleteFunc: s.onDelPod,
    })
    informerFactory.Core().V1().Nodes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
       AddFunc:    s.onAddNode,
       UpdateFunc: s.onUpdateNode,
       DeleteFunc: s.onDelNode,
    })
    informerFactory.Start(s.stopCh)
    informerFactory.WaitForCacheSync(s.stopCh)
    s.addAllEventHandlers()
}

其中 Pod Create、Update 时都会j进入下面这个 onAddPod 逻辑:

AssignedNodeAnnotations = "hami.io/vgpu-node"

// pkg/scheduler/scheduler.go#L92
func (s *Scheduler) onAddPod(obj interface{}) {
    pod, ok := obj.(*corev1.Pod)
    if !ok {
       klog.Errorf("unknown add object type")
       return
    }
    nodeID, ok := pod.Annotations[util.AssignedNodeAnnotations]
    if !ok {
       return
    }
    if k8sutil.IsPodInTerminatedState(pod) {
       s.delPod(pod)
       return
    }
    podDev, _ := util.DecodePodDevices(util.SupportDevices, pod.Annotations)
    s.addPod(pod, nodeID, podDev)
}

这边限制了只会处理 hami.io/vgpu-node annoations 的 Pod,过滤掉其他 Pod,从 Pod Annoations 中解析出该 Pod 使用的 GPU UUID 以及 memory 和 core 等信息。

Pod 上的 Annoations 大概是这样的:

$ k get po gpu-pod -oyaml
apiVersion: v1
kind: Pod
metadata:
  annotations:
    hami.io/bind-phase: success
    hami.io/bind-time: "1727251686"
    hami.io/vgpu-devices-allocated: GPU-03f69c50-207a-2038-9b45-23cac89cb67d,NVIDIA,3000,30:;
    hami.io/vgpu-devices-to-allocate: ;
    hami.io/vgpu-node: test
    hami.io/vgpu-time: "1727251686"

其中hami.io/vgpu-devices-allocated 对应的 value 就是 GPU 信息,格式化后如下

GPU-03f69c50-207a-2038-9b45-23cac89cb67d,NVIDIA,3000,30:;
  • GPU-03f69c50-207a-2038-9b45-23cac89cb67d:设备 UUID

  • NVIDIA:设备类型

  • 3000:使用 3000M memory

  • 30:使用 30% core

至于 Pod 删除则是直接删除内存中缓存的 Pod 信息即可。

Node 的变化则比较简单,就是往 nodeNotify channel 发送通知

func (s *Scheduler) onUpdateNode(_, newObj interface{}) {
    s.nodeNotify <- struct{}{}
}

func (s *Scheduler) onDelNode(obj interface{}) {
    s.nodeNotify <- struct{}{}
}

func (s *Scheduler) onAddNode(obj interface{}) {
    s.nodeNotify <- struct{}{}
}

这个消息最终就会立即触发一次上面的 RegisterFromNodeAnnotations 逻辑

默认是定时的,每 15s 触发一次,增加 notify 可以在节点变化时更快感知到

func (s *Scheduler) RegisterFromNodeAnnotations() {
    klog.V(5).Infoln("Scheduler into RegisterFromNodeAnnotations")
    ticker := time.NewTicker(time.Second * 15)
    for {
       select {
       case <-s.nodeNotify:
       case <-ticker.C:
       case <-s.stopCh:
          return
       }
       
       // .....

通过这个 Informer HAMi 可以知道以下信息:

  • 集群中的 GPU 节点情况,每个节点上的 GPU 设备信息

  • Pod 对 GPU 的具体使用情况,包括 memory、core 使用量等

通过这些信息,就可以完成后续的 Scheduler 逻辑了。

4. 调度实现

上一步拿到集群中的 GPU 信息之后,就可以开始调度了,具体实现分为两个接口:

  • Filter:根据各种条件进行过滤,为当前 Pod 选择合适的节点

  • Bind:将 Pod 最终后某一个节点进行绑定,完成调度

Filter 接口

看下看 Filter 接口是怎么进行节点过滤的

// pkg/scheduler/scheduler.go#L444
func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
    klog.InfoS("begin schedule filter", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespaces", args.Pod.Namespace)
    nums := k8sutil.Resourcereqs(args.Pod)
    total := 0
    for _, n := range nums {
       for _, k := range n {
          total += int(k.Nums)
       }
    }
    if total == 0 {
       klog.V(1).Infof("pod %v not find resource", args.Pod.Name)
       s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))
       return &extenderv1.ExtenderFilterResult{
          NodeNames:   args.NodeNames,
          FailedNodes: nil,
          Error:       "",
       }, nil
    }
    annos := args.Pod.Annotations
    s.delPod(args.Pod)
    nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod)
    if err != nil {
       s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
       return nil, err
    }
    if len(failedNodes) != 0 {
       klog.V(5).InfoS("getNodesUsage failed nodes", "nodes", failedNodes)
    }
    nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)
    if err != nil {
       err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)
       s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
       return nil, err
    }
    if len((*nodeScores).NodeList) == 0 {
       klog.V(4).Infof("All node scores do not meet for pod %v", args.Pod.Name)
       s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet"))
       return &extenderv1.ExtenderFilterResult{
          FailedNodes: failedNodes,
       }, nil
    }
    klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList))
    sort.Sort(nodeScores)
    m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
    klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices)
    annotations := make(map[string]string)
    annotations[util.AssignedNodeAnnotations] = m.NodeID
    annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)

    for _, val := range device.GetDevices() {
       val.PatchAnnotations(&annotations, m.Devices)
    }

    //InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices)
    //supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices)
    //maps.Copy(annotations, InRequestDevices)
    //maps.Copy(annotations, supportDevices)
    s.addPod(args.Pod, m.NodeID, m.Devices)
    err = util.PatchPodAnnotations(args.Pod, annotations)
    if err != nil {
       s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
       s.delPod(args.Pod)
       return nil, err
    }
    s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil)
    res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
    return &res, nil
}

对于没有申请特殊资源的 Pod 直接返回全部 Node 都可以调度,不做处理

nums := k8sutil.Resourcereqs(args.Pod)
total := 0
for _, n := range nums {
    for _, k := range n {
       total += int(k.Nums)
    }
}
if total == 0 {
    klog.V(1).Infof("pod %v not find resource", args.Pod.Name)
    s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))
    return &extenderv1.ExtenderFilterResult{
       NodeNames:   args.NodeNames,
       FailedNodes: nil,
       Error:       "",
    }, nil
}

否则就根据上一步中获取到的 Node 信息进行打分,具体打分逻辑则是根据每个节点上的已经使用的 GPU Core、GPU Memory 资源和总的GPU Core、GPU Memory 的比值,根据权重归一化处理后得到最终的得分。

总的来说就是:节点上 GPU Core 和 GPU Memory 资源剩余越少,得分越高

// pkg/scheduler/policy/node_policy.go#L52
func (ns *NodeScore) ComputeScore(devices DeviceUsageList) {
    // current user having request resource
    used, usedCore, usedMem := int32(0), int32(0), int32(0)
    for _, device := range devices.DeviceLists {
       used += device.Device.Used
       usedCore += device.Device.Usedcores
       usedMem += device.Device.Usedmem
    }
    klog.V(2).Infof("node %s used %d, usedCore %d, usedMem %d,", ns.NodeID, used, usedCore, usedMem)

    total, totalCore, totalMem := int32(0), int32(0), int32(0)
    for _, deviceLists := range devices.DeviceLists {
       total += deviceLists.Device.Count
       totalCore += deviceLists.Device.Totalcore
       totalMem += deviceLists.Device.Totalmem
    }
    useScore := float32(used) / float32(total)
    coreScore := float32(usedCore) / float32(totalCore)
    memScore := float32(usedMem) / float32(totalMem)
    ns.Score = float32(Weight) * (useScore + coreScore + memScore)
    klog.V(2).Infof("node %s computer score is %f", ns.NodeID, ns.Score)
}

除了计算得分还要判断节点剩余资源是否能满足当前 Pod,如果不满足则直接忽略掉

// pkg/scheduler/score.go#L185
func (s *Scheduler) calcScore(nodes *map[string]*NodeUsage, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*policy.NodeScoreList, error) {
    userNodePolicy := config.NodeSchedulerPolicy
    if annos != nil {
       if value, ok := annos[policy.NodeSchedulerPolicyAnnotationKey]; ok {
          userNodePolicy = value
       }
    }
    res := policy.NodeScoreList{
       Policy:   userNodePolicy,
       NodeList: make([]*policy.NodeScore, 0),
    }

    //func calcScore(nodes *map[string]*NodeUsage, errMap *map[string]string, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*NodeScoreList, error) {
    // res := make(NodeScoreList, 0, len(*nodes))
    for nodeID, node := range *nodes {
       viewStatus(*node)
       score := policy.NodeScore{NodeID: nodeID, Devices: make(util.PodDevices), Score: 0}
       score.ComputeScore(node.Devices)

       //This loop is for different container request
       ctrfit := false
       for ctrid, n := range nums {
          sums := 0
          for _, k := range n {
             sums += int(k.Nums)
          }

          if sums == 0 {
             for idx := range score.Devices {
                if len(score.Devices[idx]) <= ctrid {
                   score.Devices[idx] = append(score.Devices[idx], util.ContainerDevices{})
                }
                score.Devices[idx][ctrid] = append(score.Devices[idx][ctrid], util.ContainerDevice{})
                continue
             }
          }
          klog.V(5).InfoS("fitInDevices", "pod", klog.KObj(task), "node", nodeID)
          fit, _ := fitInDevices(node, n, annos, task, &score.Devices)
          ctrfit = fit
          if !fit {
             klog.InfoS("calcScore:node not fit pod", "pod", klog.KObj(task), "node", nodeID)
             break
          }
       }

       if ctrfit {
          res.NodeList = append(res.NodeList, &score)
       }
    }
    return &res, nil
}

计算完成之后从中选了一个节点进行调度。

//计算得分,拿到所有满足条件的节点
nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)

// 排序
sort.Sort(nodeScores)
// 直接选择最后一个节点
m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]

// 返回结果
res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
return &res, nil

到这里,我们已经拿到了最终要调度的 Node 了,调度逻辑就结束了。

这里大家可能会有疑问:为什么 Filter 方法就只返回了一个节点,甚至还融合了打分的逻辑在里面。

这个问题在上一篇K8s 自定义调度器 Part1:通过 Scheduler Extender 实现自定义调度逻辑 中已经有解释了,如果按照正常逻辑实现 Filter、Score 等方法,最终 Scheduler 会汇总多个插件的打分,然后根据最终结果选择一个节点,但是 HAMi 这边是要完全控制调度结果的,因此直接将 Filter、Score 逻辑融合到一起,最终就只返回一个目标节点,这样最后肯定会调度到该节点。

Bind 接口

很简单,直接根据 Filter 的返回结果,将 Pod 和 Node 绑定即可完成调度。

func (s *Scheduler) Bind(args extenderv1.ExtenderBindingArgs) (*extenderv1.ExtenderBindingResult, error) {
    klog.InfoS("Bind", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node)
    var err error
    var res *extenderv1.ExtenderBindingResult
    binding := &corev1.Binding{
       ObjectMeta: metav1.ObjectMeta{Name: args.PodName, UID: args.PodUID},
       Target:     corev1.ObjectReference{Kind: "Node", Name: args.Node},
    }
    current, err := s.kubeClient.CoreV1().Pods(args.PodNamespace).Get(context.Background(), args.PodName, metav1.GetOptions{})
    if err != nil {
       klog.ErrorS(err, "Get pod failed")
    }

    node, err := s.kubeClient.CoreV1().Nodes().Get(context.Background(), args.Node, metav1.GetOptions{})
    if err != nil {
       klog.ErrorS(err, "Failed to get node", "node", args.Node)
       s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, fmt.Errorf("failed to get node %v", args.Node))
       res = &extenderv1.ExtenderBindingResult{
          Error: err.Error(),
       }
       return res, nil
    }

    tmppatch := make(map[string]string)
    for _, val := range device.GetDevices() {
       err = val.LockNode(node, current)
       if err != nil {
          goto ReleaseNodeLocks
       }
    }
    /*
       err = nodelock.LockNode(args.Node)
       if err != nil {
          klog.ErrorS(err, "Failed to lock node", "node", args.Node)
          res = &extenderv1.ExtenderBindingResult{
             Error: err.Error(),
          }
          return res, nil
       }*/
    //defer util.ReleaseNodeLock(args.Node)

    tmppatch[util.DeviceBindPhase] = "allocating"
    tmppatch[util.BindTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)

    err = util.PatchPodAnnotations(current, tmppatch)
    if err != nil {
       klog.ErrorS(err, "patch pod annotation failed")
    }
    if err = s.kubeClient.CoreV1().Pods(args.PodNamespace).Bind(context.Background(), binding, metav1.CreateOptions{}); err != nil {
       klog.ErrorS(err, "Failed to bind pod", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node)
    }
    if err == nil {
       s.recordScheduleBindingResultEvent(current, EventReasonBindingSucceed, []string{args.Node}, nil)
       res = &extenderv1.ExtenderBindingResult{
          Error: "",
       }
       klog.Infoln("After Binding Process")
       return res, nil
    }
ReleaseNodeLocks:
    klog.InfoS("bind failed", "err", err.Error())
    for _, val := range device.GetDevices() {
       val.ReleaseNodeLock(node, current)
    }
    s.recordScheduleBindingResultEvent(current, EventReasonBindingFailed, []string{}, err)
    return &extenderv1.ExtenderBindingResult{
       Error: err.Error(),
    }, nil
}

核心部分:

binding := &corev1.Binding{
   ObjectMeta: metav1.ObjectMeta{Name: args.PodName, UID: args.PodUID},
   Target:     corev1.ObjectReference{Kind: "Node", Name: args.Node},
}
if err = s.kubeClient.CoreV1().Pods(args.PodNamespace).Bind(context.Background(), binding, metav1.CreateOptions{}); err != nil {
   klog.ErrorS(err, "Failed to bind pod", "pod", args.PodName, "namespace", args.PodNamespace, "podUID", args.PodUID, "node", args.Node)
}

调用 api 创建了一个 binding 对象,将 Pod 调度到指定节点即可。

至此,Scheduler 的逻辑我们就分析完了,调度完成 Kubelet 就开始启动 Pod 了,然后 hami 的 device plugin 也要开始发挥作用了。

小结

这里 HAMi 是使用默认的 kube-scheduler 镜像额外启动了一个 scheduler,但是通过配置把名称指定为了 hami-scheduler。

然后给这个 hami-scheduler 配置了 Extender,Extender 服务就是同 Pod 中的另一个Container 启动的一个 http 服务。

ps:我们说的 hami-scheduler 一般只这部分 Extender 实现的调度插件

然后在调度可以分为两部分:

  • 1)获取 GPU 信息

    • 从 Node Annoations 中获取节点上的 GPU 资源信息

    • 从 Pod Annoations 中获取 GPU 的使用情况

  • 2)按配置策略进行节点选择并完成调度

    • 直接在 Filter 接口按照得分排序后返回最推荐的一个节点,以实现完全控制调度结果
  • 按照 GPU memory、core 剩余情况计算得分,剩余资源越多得分越低

5. 小结

本文主要分析了 hami-scheduler 的实现原理,其中包含两个组件:

  • Webhook:根据 Pod Resource 中的 ResourceName 判断该 Pod 是否使用的 HAMi vGPU,如果是则修改 Pod 的 SchedulerName 为 hami-scheduler,让 hami-scheduler 进行调度。

  • Scheduler:以 kube-shceduler 为镜像启动服务并改名为 hami-scheduler,然后通过配置 extender 接入真正的 hami-scheduler 逻辑。

    • 从 Node 的 Annoations 上解析拿到 GPU 资源信息,从已经运行的 Pod Annoations 上解析拿到 Pod 消耗的 GPU 资源计算出每个 Node 上真实可用的 GPU 资源

    • 根据节点剩余资源进行打分,然后根据配置的 Spread、Binpack 调度策略选择得分最高或最低的节点,将 Pod 进行调度。

HAMi Webhook 、Scheduler 工作流程如下:

https://img.lixueduan.com/kubernetes/vgpu/hami-scheduler.png

  • 1)用户创建 Pod 并在 Pod 中申请了 vGPU 资源
  • 2)kube-apiserver 根据 MutatingWebhookConfiguration 配置请求 HAMi-Webhook
  • 3)HAMi-Webhook 检测 Pod 中的 Resource,如果申请的由 HAMi 管理的 vGPU 资源,就会把 Pod 中的 SchedulerName 改成了 hami-scheduler,这样这个 Pod 就会由 hami-scheduler 进行调度了。
    • 对于特权模式的 Pod,Webhook 会直接跳过不处理

    • 对于使用 vGPU 资源但指定了 nodeName 的 Pod,Webhook 会直接拒绝

  • 4)hami-scheduler 进行 Pod 调度,不过就是用的 k8s 的默认 kube-scheduler 镜像,因此调度逻辑和默认的 default-scheduler 是一样的,但是 kube-scheduler 还会根据 KubeSchedulerConfiguration 配置,调用 Extender Scheduler 插件
    • 这个 Extender Scheduler 就是 hami-scheduler Pod 中的另一个 Container,该 Container 同时提供了 Webhook 和 Scheduler 相关 API。

    • 当 Pod 申请了 vGPU 资源时,kube-scheduler 就会根据配置以 HTTP 形式调用 Extender Scheduler 插件,这样就实现了自定义调度逻辑

  • 5)Extender Scheduler 插件包含了真正的 hami 调度逻辑, 调度时根据节点剩余资源量进行打分选择节点
    • 这里就包含了spread & binpark 等 高级调度策略的实现
  • 6)异步任务,包括 GPU 感知逻辑
    • devicePlugin 中的后台 Goroutine 定时上报 Node 上的 GPU 资源并写入到 Node 的 Annoations
    • 除了 DevicePlugin 之外,还使用异步任务以 Patch Annotation 方式提交更多信息
    • Extender Scheduler 插件根据 Node Annoations 解析出 GPU 资源总量、从 Node 上已经运行的 Pod 的 Annoations 中解析出 GPU 使用量,计算出每个 Node 剩余的可用资源保存到内存供调度时使用

至此,HAMi Webhook、Scheduler 就分析完了,spread & binpark 等 高级调度策略是如何实现的留着下篇分析~。