Contents

HAMi 异构设备虚拟化之海光 DCU 实战:K8s 下的 vDCU 资源调度与纳管

https://img.lixueduan.com/kubernetes/cover/hami-vdcu.png

之前通过一系列文章分析了 HAMI vGPU 部署使用以及背后的实现原理,本文则是分析海光 DCU 设备如何通过 HAMi 完成虚拟化,实现统一纳管与调度。

HAMi 系列文章推荐阅读~:

1. vDCU 相关命令

在这之前简单过一下海光 DCU 自带的 vDCU 功能如何使用。

1.1 查看物理 DCU 资源

$ hy-smi virtual -show-device-info

Device 0:
 Actual Device: 0
 Compute units: 60
 Global memory: 34342961152 bytes
Device 1:
 Actual Device: 1
 Compute units: 60
 Global memory: 34342961152 bytes

1.2 拆分 vDCU

拆分 DCU 为 4 份,分别包含 5,15,20,20 个计算单元以及 4096,8192,8192,8192MiB 的显存

$ hy-smi virtual -create-vdevices 4 -d 0 \
-vdevice-compute-units 5,15,20,20 \
-vdevice-memory-size 4096,8192,8192,8192

The virtual device is created successfully!

1.3 查看 vdcu 信息

$ hy-smi virtual -show-vdevice-info
Virtual Device 0:
 Actual Device: 7
 Compute units: 5
 Global memory: 4294967296 bytes
- 5 -
Virtual Device 1:
 Actual Device: 7
 Compute units: 15
 Global memory: 8589934592 bytes
Virtual Device 2:
 Actual Device: 7
 Compute units: 20
 Global memory: 8589934592 bytes
Virtual Device 3:
 Actual Device: 7
 Compute units: 20
 Global memory: 8589934592 bytes

1.4 销毁 vDCU

# -d ${dev_id} 指定物理 DCU ID,不指定则选定所有物理 DCU
# -destroy-vdevice 销毁此物理 DCU 上所有 vDCU
hy-smi virtual -d ${dev_id} -destroy-vdevice

1.5 Docker 使用 vDCU

在容器启动的时候执行以下命令将 vDCU 挂载至容器内。以下命令表示用户在启动容器时,挂载第 0 号 vDCU 实例。

docker run -it --name container_name \ 
--device=/dev/kfd \ 
--device=/dev/dri \ 
--device=/dev/mkfd \ 
-v /etc/vdev/vdev0.conf:/etc/vdev/docker/vdev0.conf:ro \ 
${docker_image:tag} \ 
/bin/bash

可以看到除了通过 –device 方式挂载 kfd、dri、mkfd 等设备文件之外,还额外挂载了一个 /etc/vdev/vdev0.conf 文件到容器里。

实际上这个文件就是 vDCU 的配置文件,内部记录了该 vDCU 的详细信息。

虽然看不到相关源码,不过可以肯定的是,驱动程序肯定会查看该文件是否存在,如果存在则说明当前使用的是 vDCU,然后根据该文件中的信息来找到对应的物理 DCU,以及 core、mem 等信息以完成限制。

接下来看下 HAMi 中如何使用 vDCU。

2.部署 HAMi

前提条件:

  • dtk 驱动程序 >= 24.04

2.1 部署

使用 Helm 部署

# 添加 repo
helm repo add hami-charts https://project-hami.github.io/HAMi/
# 安装
helm install hami hami-charts/hami -n kube-system

ps:可以通过调整 HAMi 部署配置来自定义你的安装。

如果当前环境无法拉取在线镜像,可以指定镜像仓库

export registry=192.168.10.172:5000
# 不指定 tag,会自动使用当前集群同样版本,需要确保 Registry 有该镜像
export kubescheduler=$registry/kube-scheduler

helm upgrade --install hami hami-charts/hami -n kube-system \
--set scheduler.kubeScheduler.image=$kubescheduler \
--set scheduler.extender.image=$registry/projecthami/hami \
--set scheduler.patch.imageNew=$registry/liangjw/kube-webhook-certgen:v1.1.1

2.2 检查 Pod 运行情况

$ kubectl -n kube-system get po -l app.kubernetes.io/name=hami
NAME                              READY   STATUS    RESTARTS   AGE
hami-scheduler-6dbdf69644-mz9m5   2/2     Running   0          2m19s

如果 hami-scheduler Running 说明安装成功。

ps:对于 DCU 环境只会启动 hami-scheduler 一个 Pod。

3. 部署 hami-vdcu-device-plugin

对应的 DevicePlugin 也是用 HAMi 社区提供的版本。

先给 DCU 节点打上 label dcu=on

kubectl label nodes {nodeid} dcu=on

3.1 准备工作

然后做以下准备工作

创建 vdev 目录

# on the dcu node, create these directory:
$ mkdir /etc/vdev

将 dtk 复制到 /opt/dtk 目录

因为容器会统一挂载 /opt/dtk ,因此将其从部署目录 cp 到指定目录

# should change dtk-xx.xx.x to your installed dtk version
$ cp -r /opt/dtk-xx.xx.x /opt/dtk

注意:/opt/dtk-xx.xx.x 这个位置取决于之前部署 DTK 时指定的目录

3.2 部署

然后就可以开始部署了

https://github.com/Project-HAMi/dcu-vgpu-device-plugin 项目获取相关 yaml:

wget https://raw.githubusercontent.com/Project-HAMi/dcu-vgpu-device-plugin/refs/heads/master/k8s-dcu-plugin.yaml
wget https://raw.githubusercontent.com/Project-HAMi/dcu-vgpu-device-plugin/refs/heads/master/k8s-dcu-rbac.yaml

然后部署 device-plugin

kubectl apply -f k8s-dcu-rbac.yaml
kubectl apply -f k8s-dcu-plugin.yaml

3.3 检查 Pod 运行情况

确认相关组件都正常运行:

$ kubectl get po -n kube-system
NAMESPACE     NAME                                   READY   STATUS    RESTARTS   AGE
kube-system   hami-dcu-vgpu-device-plugin-cgwwv      1/1     Running   4          14m
kube-system   hami-dcu-vgpu-device-plugin-lp7gl      1/1     Running   0          8m36s
kube-system   hami-dcu-vgpu-device-plugin-lswbl      1/1     Running   0          8m40s
kube-system   hami-dcu-vgpu-device-plugin-svwl7      1/1     Running   0          8m38s
kube-system   hami-scheduler-bb5586989-q7bvr         2/2     Running   0          54m

可以看到,几个 DCU 节点上的 dcu-vdcu-device-plugin 都运行正常。

至此,相关组件部署完成,接下来验证下 vDCU 能否正常使用了。

4. 验证 vdcu

4.1 查看 Node 资源

接下来就是查看节点资源情况,vdcu 是否正常注册。

             
$ kubectl describe node d41gpucns41 | grep Capacity -A 8   
Capacity:
  cpu:                64
  ephemeral-storage:  934609028Ki
  hugepages-1Gi:      0
  hugepages-2Mi:      0
  hygon.com/dcu:      0
  hygon.com/dcunum:   32
  memory:             1043017856Ki
  pods:               500

这里的 hygon.com/dcunum: 32 就是 vdcu 数量,单节点 8 DCU,这里是做了 4 倍切分。

ps: 因为当前一块海光物理 DCU 只支持切分为 4 个 vDCU。

4.2 启动 Pod 使用 vdcu

完整 yaml 如下:

apiVersion: v1
kind: Pod
metadata:
  name: alexnet-tf-gpu-pod-mem
  labels:
    purpose: demo-tf-amdgpu
spec:
  containers:
    - name: alexnet-tf-gpu-container
      image: ubuntu:20.04
      workingDir: /root
      command: ["sleep","infinity"]
      resources:
        limits:
          hygon.com/dcunum: 1 # requesting a GPU
          hygon.com/dcumem: 2000 # each dcu require 2000 MiB device memory
          hygon.com/dcucores: 15 # each dcu use 15% of total compute cores

和 vGPU 类似,支持单独指定卡数、core、mem 等资源。

进入 Pod 后先 source 下环境变量

source /opt/hygondriver/env.sh

然后使用以下命令验证

hy-virtual -show-device-info

正常输出如下:

Device 0:
        Actual Device: 0
        Compute units: 9
        Global memory: 2097152000 bytes

Compute unitsGlobal memory 就是我们前面指定的 core 和 mem。

至此,说明 HAMi vdcu 已经生效了。

4.3 注意事项

一些注意事项:

  1. 如果您的镜像不是 ‘dtk-embedded-image’,则需要在任务运行后安装 pciutiilslibelf-devkmod,否则,像 hy-smihy-virtual 这样的 dcu 工具可能无法正常工作。

  2. 不支持在 init 容器中共享 DCU,init 容器中带有 “hygon.com/dcumem” 的 pod 将永远不会被调度。

  3. 每个容器只能获取一个 vdcu。如果您想挂载多个 dcu 设备,则不应设置 hygon.com/dcumemhygon.com/dcucores

5. 实现分析

简单分析一下 vdcu-device-plugin,根据第一章中使用 vDCU 的步骤可知,vdcu-device-plugin

除了正常的上报设备信息,分配设备之外,还需要处理以下事情:

  • 1)vDCU 配置文件维护
    • 创建 Pod 时根据 yaml 中申请的 core、mem 生成对应的 vdcu 配置文件
    • Pod 删除后也需要删除对应的配置文件
  • 2)挂载 vDCU 配置文件到 Pod 中
    • 只有这样驱动程序才知道将该 vDCU 限制在多少 core、mem

5.1 Register

首先是要将 DevicePlugin 注册到 Kubelet,这里用的是 device-plugin-manager 因此没有单独注册的代码,不过也可以看下在启动的时候做了哪些工作。

// Start is an optional interface that could be implemented by plugin.
// If case Start is implemented, it will be executed by Manager after
// plugin instantiation and before its registration to kubelet. This
// method could be used to prepare resources before they are offered
// to Kubernetes.
func (p *Plugin) Start() error {
	var err error

  // 初始化 device info
	dcgm.Init()
	p.devices, err = dcgm.DeviceInfos()
	if err != nil {
		log.Fatalf("dcgm DeviceInfos failed:%s", err.Error())
	}
	for idx := range p.devices {
		p.devices[idx].DevTypeName = fmt.Sprintf("%v-%v", "DCU", p.devices[idx].DevTypeName)
	}
	fmt.Println("infos=", p.devices)

	for idx, val := range p.devices {
		p.coremask[idx][0] = initCoreUsage(int(val.ComputeUnit))
		p.coremask[idx][1] = initCoreUsage(int(val.ComputeUnit))
	}
  
 
	go p.WatchAndRegister()
	return nil
}

启动时通过dcgm.DeviceInfos() 获取 DCU 信息

	dcgm.Init()
	p.devices, err = dcgm.DeviceInfos()
	if err != nil {
		log.Fatalf("dcgm DeviceInfos failed:%s", err.Error())
	}
	for idx := range p.devices {
		p.devices[idx].DevTypeName = fmt.Sprintf("%v-%v", "DCU", p.devices[idx].DevTypeName)
	}
	fmt.Println("infos=", p.devices)

	for idx, val := range p.devices {
		p.coremask[idx][0] = initCoreUsage(int(val.ComputeUnit))
		p.coremask[idx][1] = initCoreUsage(int(val.ComputeUnit))
	}

DeviceInfos

具体实现

// DeviceInfos 获取设备信息列表
// @Summary 获取设备信息列表
// @Description 返回所有设备的详细信息列表
// @Produce json
// @Success 200 {array} DeviceInfo "返回设备信息列表"
// @Failure 500 {object} error "服务器内部错误"
// @Router /DeviceInfos [get]
func DeviceInfos() (deviceInfos []DeviceInfo, err error) {
	numDevices, err := rsmiNumMonitorDevices()
	if err != nil {
		return nil, err
	}
	for i := 0; i < numDevices; i++ {
		bdfid, err := rsmiDevPciIdGet(i)
		if err != nil {
			return nil, err
		}
		// 解析BDFID
		domain := (bdfid >> 32) & 0xffffffff
		bus := (bdfid >> 8) & 0xff
		dev := (bdfid >> 3) & 0x1f
		function := bdfid & 0x7
		// 格式化PCI ID
		pciBusNumber := fmt.Sprintf("%04X:%02X:%02X.%X", domain, bus, dev, function)
		//设备序列号
		deviceId, _ := rsmiDevSerialNumberGet(i)
		//获取设备类型标识id
		devTypeId, _ := rsmiDevIdGet(i)
		devType := fmt.Sprintf("%x", devTypeId)
		//型号名称
		devTypeName := type2name[devType]
		//获取设备内存总量
		memoryTotal, _ := rsmiDevMemoryTotalGet(i, RSMI_MEM_TYPE_FIRST)
		mt, _ := strconv.ParseFloat(fmt.Sprintf("%f", float64(memoryTotal)/1.0), 64)
		glog.Info(" DCU[%v] memory total memory total: %.0f", i, mt)
		//获取设备内存使用量
		memoryUsed, _ := rsmiDevMemoryUsageGet(i, RSMI_MEM_TYPE_FIRST)
		mu, _ := strconv.ParseFloat(fmt.Sprintf("%f", float64(memoryUsed)/1.0), 64)
		glog.Info(" DCU[%v] memory used :%.0f", i, mu)
		computeUnit := computeUnitType[devTypeName]
		glog.Info(" DCU[%v] computeUnit : %.0f", i, computeUnit)
		deviceInfo := DeviceInfo{
			DvInd:        i,
			DeviceId:     deviceId,
			DevType:      devType,
			DevTypeName:  devTypeName,
			PciBusNumber: pciBusNumber,
			MemoryTotal:  mt,
			MemoryUsed:   mu,
			ComputeUnit:  computeUnit,
		}
		deviceInfos = append(deviceInfos, deviceInfo)
	}
	glog.Info("deviceInfos: ", dataToJson(deviceInfos))
	return
}

这块用的 CGO 调用 C 库实现的,大家感兴趣可以自行查看~

package dcgm

/*
#cgo CFLAGS: -Wall -I./include
#cgo LDFLAGS: -L./lib -lrocm_smi64 -lhydmi -Wl,--unresolved-symbols=ignore-in-object-files
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <kfd_ioctl.h>
#include <rocm_smi64Config.h>
#include <rocm_smi.h>
#include <dmi_virtual.h>
#include <dmi_error.h>
#include <dmi.h>
#include <dmi_mig.h>
*/
import "C"
import (
	"encoding/json"
	"fmt"
	"os"
	"unsafe"

	"github.com/golang/glog"
)

// rsmiNumMonitorDevices 获取gpu数量 *
func rsmiNumMonitorDevices() (gpuNum int, err error) {
	var p C.uint
	ret := C.rsmi_num_monitor_devices(&p)
	//glog.Info("go_rsmi_num_monitor_devices_ret:", ret)
	if err = errorString(ret); err != nil {
		return 0, fmt.Errorf("Error go_rsmi_num_monitor_devices_ret: %s", err)
	}
	gpuNum = int(p)
	//glog.Info("go_rsmi_num_monitor_devices:", gpuNum)
	return gpuNum, nil
}

// rsmiDevPciIdGet 获取唯一pci设备标识符
func rsmiDevPciIdGet(dvInd int) (bdfid int64, err error) {
	var cbdfid C.uint64_t
	ret := C.rsmi_dev_pci_id_get(C.uint32_t(dvInd), &cbdfid)
	//glog.Infof("rsmi_dev_pci_id_get ret:%v, retStr:%v", ret, errorString(ret))
	if err = errorString(ret); err != nil {
		glog.Errorf("rsmi_dev_pci_id_get err:%v", err.Error())
		return bdfid, err
	}
	bdfid = int64(cbdfid)
	//glog.Infof("rsmiDevPciIdGet bdfid:%v", bdfid)
	return
}

WatchAndRegister

该方法主要调用两个方法:

  • RefreshContainerDevices:维护 vdcu 配置信息,Pod 删除后移除对应文件
  • RegistrInAnnotation:将节点上的物理 DCU 信息记录到节点的 Annoation 上。
func (r *Plugin) WatchAndRegister() {
	klog.Info("into WatchAndRegister")
	for {
		r.RefreshContainerDevices()
		err := r.RegistrInAnnotation()
		if err != nil {
			klog.Errorf("register error, %v", err)
			time.Sleep(time.Second * 5)
		} else {
			time.Sleep(time.Second * 30)
		}
	}
}

RefreshContainerDevices

根据 /usr/local/vgpu/dcu 目录下的文件和 Kubernetes 中的 Pod 信息,更新 vdcu 使用情况,并清理不再使用的 vdcu 配置文件,以确保设备状态信息与实际使用情况保持一致。

func (p *Plugin) RefreshContainerDevices() error {
	files, err := os.ReadDir("/usr/local/vgpu/dcu")
	if err != nil {
		return err
	}
	idx := 0
	for idx < len(p.devices) {
		p.coremask[idx][0] = initCoreUsage(int(p.devices[idx].ComputeUnit))
		p.coremask[idx][1] = initCoreUsage(int(p.devices[idx].ComputeUnit))
		idx++
	}

	for _, f := range files {
		pods, err := client.GetClient().CoreV1().Pods("").List(context.Background(), metav1.ListOptions{})
		if err != nil {
			return err
		}
		found := false
		for _, val := range pods.Items {
			if strings.Contains(f.Name(), string(val.UID)) {
				found = true
				var didx, pid, vdidx int
				tmpstr := strings.Split(f.Name(), "_")
				didx, _ = strconv.Atoi(tmpstr[2])
				pid, _ = strconv.Atoi(tmpstr[3])
				vdidx, _ = strconv.Atoi(tmpstr[4])
				p.coremask[didx][0], _ = addCoreUsage(p.coremask[didx][0], tmpstr[5])
				p.coremask[didx][1], _ = addCoreUsage(p.coremask[didx][1], tmpstr[6])
				p.vidx[vdidx] = true
				p.pipeid[didx][pid] = true
			}
		}
		if !found {
			var didx, pid, vdidx int
			tmpstr := strings.Split(f.Name(), "_")
			didx, _ = strconv.Atoi(tmpstr[2])
			pid, _ = strconv.Atoi(tmpstr[3])
			vdidx, _ = strconv.Atoi(tmpstr[4])
			p.vidx[vdidx] = false
			p.pipeid[didx][pid] = false
			os.RemoveAll("/usr/local/vgpu/dcu/" + f.Name())
			os.Remove(fmt.Sprintf("/etc/vdev/vdev%d.conf", vdidx))
		}
		fmt.Println(f.Name())
	}
	fmt.Println(p.coremask)
	return nil
}

RegistrInAnnotation

将 Device 信息记录到 Node Annoation 上,这样 hami-scheduler 可以从 Annoation 中拿到每个节点上 DCU 的详细信息。

func (r *Plugin) RegistrInAnnotation() error {
	devices := r.apiDevices()
	annos := make(map[string]string)
	if len(util.NodeName) == 0 {
		util.NodeName = os.Getenv(util.NodeNameEnvName)
	}
	node, err := util.GetNode(util.NodeName)
	if err != nil {
		klog.Errorln("get node error", err.Error())
		return err
	}
	encodeddevices := util.EncodeNodeDevices(*devices)
	annos[util.HandshakeAnnosString] = "Reported " + time.Now().String()
	annos[util.RegisterAnnos] = encodeddevices
	klog.Infoln("Reporting devices", encodeddevices, "in", time.Now().String())
	err = util.PatchNodeAnnotations(node, annos)

	if err != nil {
		klog.Errorln("patch node error", err.Error())
	}
	return err
}

ListAndWatch

ListAndWatch 检测节点上的 DCU 并上报给 Kubelet,由 Kubelet 提交 kube-apiserver,最终更新到 Node 的 Resource 上。

ListAndWatch 方法内容如下:

// ListAndWatch returns a stream of List of Devices
// Whenever a Device state change or a Device disappears, ListAndWatch
// returns the new list
func (p *Plugin) ListAndWatch(e *kubeletdevicepluginv1beta1.Empty, s kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
	p.AMDGPUs = amdgpu.GetAMDGPUs()

	devs := make([]*kubeletdevicepluginv1beta1.Device, len(p.AMDGPUs))

	// limit scope for hwloc
	func() {
		var hw hwloc.Hwloc
		hw.Init()
		defer hw.Destroy()

		i := 0
		for id := range p.AMDGPUs {
			dev := &kubeletdevicepluginv1beta1.Device{
				ID:     id,
				Health: kubeletdevicepluginv1beta1.Healthy,
			}
			devs[i] = dev
			i++

			numas, err := hw.GetNUMANodes(id)
			glog.Infof("Watching GPU with bus ID: %s NUMA Node: %+v", id, numas)
			if err != nil {
				glog.Error(err)
				continue
			}

			if len(numas) == 0 {
				glog.Errorf("No NUMA for GPU ID: %s", id)
				continue
			}

			numaNodes := make([]*kubeletdevicepluginv1beta1.NUMANode, len(numas))
			for j, v := range numas {
				numaNodes[j] = &kubeletdevicepluginv1beta1.NUMANode{
					ID: int64(v),
				}
			}

			dev.Topology = &kubeletdevicepluginv1beta1.TopologyInfo{
				Nodes: numaNodes,
			}
		}
	}()

	fakedevs := p.apiDevices()
	s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: p.generateFakeDevs(fakedevs)})

	for {
		select {
		case <-p.Heartbeat:
			var health = kubeletdevicepluginv1beta1.Unhealthy

			// TODO there are no per device health check currently
			// TODO all devices on a node is used together by kfd
			if simpleHealthCheck() {
				health = kubeletdevicepluginv1beta1.Healthy
			}

			for i := 0; i < len(p.AMDGPUs); i++ {
				devs[i].Health = health
			}
			s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: p.generateFakeDevs(fakedevs)})
		}
	}
	// returning a value with this function will unregister the plugin from k8s
}

看了下有部分无效代码,有用的下面这部分:

fakedevs := p.apiDevices()
s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: p.generateFakeDevs(fakedevs)})

for {
    select {
    case <-p.Heartbeat:
       s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: p.generateFakeDevs(fakedevs)})
    }
}

apiDevices

根据前面 Start 方法中获取到的 device 信息生成 fakeDevice 提交给 kubelet。

func (r *Plugin) apiDevices() *[]*api.DeviceInfo {
	res := []*api.DeviceInfo{}
	for idx, val := range r.devices {
		if val.MemoryTotal > 0 {
			res = append(res, &api.DeviceInfo{
				Index:   idx,
				Id:      "DCU-" + fmt.Sprint(idx),
				Count:   4,
				Devmem:  int32(val.MemoryTotal / 1024 / 1024),
				Devcore: 100,
				Numa:    0,
				Type:    val.DevTypeName,
				Health:  true,
			})
		}
	}
	return &res
}

其中 Count 固定为 4,即每个 DCU 可以切分为 4 个 vdcu。

ps:因为目前海光 DCU 虚拟化功能在一张物理卡上支持最多 4 个 vDCU。

generateFakeDevs

func (p *Plugin) generateFakeDevs(devices *[]*api.DeviceInfo) []*kubeletdevicepluginv1beta1.Device {
	fakedevs := []*kubeletdevicepluginv1beta1.Device{}

	for _, val := range *devices {
		idx := 0
		for idx < int(val.Count) {
			fakedevs = append(fakedevs, &kubeletdevicepluginv1beta1.Device{
				ID:     val.Id + "-fake-" + fmt.Sprint(idx),
				Health: kubeletdevicepluginv1beta1.Healthy,
			})
			idx++
		}
	}
	return fakedevs
}

然后在 generateFakeDevs 中在根据 Count 信息复制出足够数量的 fakedev,至此对 Kubelet 来说感知到的 dcu 数量将是 物理 dcu 数量的 4 倍。

Allocate

Allocate 则是包含了真正将 vDCU 分配给 Pod 的逻辑。

ps:因为没有单独的 container runtime,因此在 Allocate 中需要额外做一些工作,比如把

kfd、mkfd、dri、hygondriver、hyhal 等等设备或者目录挂载到 Pod 中。

func (p *Plugin) Allocate(ctx context.Context, reqs *kubeletdevicepluginv1beta1.AllocateRequest) (*kubeletdevicepluginv1beta1.AllocateResponse, error) {
	var car kubeletdevicepluginv1beta1.ContainerAllocateResponse
	var dev *kubeletdevicepluginv1beta1.DeviceSpec
	responses := kubeletdevicepluginv1beta1.AllocateResponse{}
	nodename := util.NodeName
	current, err := hmutil.GetPendingPod(ctx, nodename)
	if err != nil {
		//nodelock.ReleaseNodeLock(nodename, NodeLockDCU, current, false)
		return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
	}
	klog.Infof("Allocate for pod %s/%s uid [%s] \n", current.Namespace, current.Name, current.UID)
	drmCards, drmRenders, err := util.ListDcuDrmDevices()
	if err != nil {
		util.PodAllocationFailed(nodename, current, NodeLockDCU)
		return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
	}
	for idx := range reqs.ContainerRequests {
		currentCtr, devreq, err := util.GetNextDeviceRequest(util.HygonDCUDevice, *current)
		klog.Infoln("deviceAllocateFromAnnotation=", devreq)
		if err != nil {
			util.PodAllocationFailed(nodename, current, NodeLockDCU)
			return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
		}
		if len(devreq) != len(reqs.ContainerRequests[idx].DevicesIDs) {
			util.PodAllocationFailed(nodename, current, NodeLockDCU)
			return &kubeletdevicepluginv1beta1.AllocateResponse{}, errors.New("device number not matched")
		}

		err = util.EraseNextDeviceTypeFromAnnotation(util.HygonDCUDevice, *current)
		if err != nil {
			util.PodAllocationFailed(nodename, current, NodeLockDCU)
			return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
		}

		car = kubeletdevicepluginv1beta1.ContainerAllocateResponse{}
		// Currently, there are only 1 /dev/kfd per nodes regardless of the # of GPU available
		// for compute/rocm/HSA use cases
		dev = new(kubeletdevicepluginv1beta1.DeviceSpec)
		dev.HostPath = "/dev/kfd"
		dev.ContainerPath = "/dev/kfd"
		dev.Permissions = "rwm"
		car.Devices = append(car.Devices, dev)

		dev = new(kubeletdevicepluginv1beta1.DeviceSpec)
		dev.HostPath = "/dev/mkfd"
		dev.ContainerPath = "/dev/mkfd"
		dev.Permissions = "rwm"
		car.Devices = append(car.Devices, dev)

		for _, val := range devreq {
			var devIdx = -1
			klog.Infof("Allocating device ID: %s", val.UUID)
			succeedCount, err := fmt.Sscanf(val.UUID, "DCU-%d", &devIdx)
			if err != nil || succeedCount == 0 || devIdx == -1 {
				klog.Errorf("Invalid request device uuid: %s", val.UUID)
				util.PodAllocationFailed(nodename, current, NodeLockDCU)
				return &kubeletdevicepluginv1beta1.AllocateResponse{}, fmt.Errorf("invalid request device uuid %s", val.UUID)
			}

			if devIdx > len(drmCards) || devIdx > len(drmRenders) {
				klog.Errorf("Invalid device index: %d, all devices counts is: %d, all renders count is: %d", devIdx, len(drmCards), len(drmRenders))
				util.PodAllocationFailed(nodename, current, NodeLockDCU)
				return &kubeletdevicepluginv1beta1.AllocateResponse{}, fmt.Errorf("can not match dcu dri request %s. cards %d, renders %d", val.UUID, len(drmCards), len(drmRenders))
			}

			drmCardName := drmCards[devIdx]
			klog.Infof("All dcu dri card devs: %v, mapped dri: %s", drmCards, drmCardName)
			devpath := fmt.Sprintf("/dev/dri/%s", drmCardName)
			dev = new(kubeletdevicepluginv1beta1.DeviceSpec)
			dev.HostPath = devpath
			dev.ContainerPath = devpath
			dev.Permissions = "rw"
			car.Devices = append(car.Devices, dev)

			drmRenderName := drmRenders[devIdx]
			klog.Infof("All dcu dri render devs: %v, mapped dri: %s", drmRenders, drmRenderName)
			devpath = fmt.Sprintf("/dev/dri/%s", drmRenderName)
			dev = new(kubeletdevicepluginv1beta1.DeviceSpec)
			dev.HostPath = devpath
			dev.ContainerPath = devpath
			dev.Permissions = "rw"
			car.Devices = append(car.Devices, dev)
		}
		//Create vdev file
		klog.Infoln("devreqs=", len(devreq), "usedmem=", devreq[0].Usedmem, ":", p.devices[0].MemoryTotal/1024/1024)
		if len(devreq) < 2 && devreq[0].Usedmem < int32(p.devices[0].MemoryTotal/1024/1024) {
			filename, err := p.createvdevFiles(current, &currentCtr, devreq)
			if err != nil {
				util.PodAllocationFailed(nodename, current, NodeLockDCU)
				return &responses, err
			}
			if len(filename) > 0 {
				car.Mounts = append(car.Mounts, &kubeletdevicepluginv1beta1.Mount{
					ContainerPath: "/etc/vdev/docker/",
					HostPath:      filename,
					ReadOnly:      false,
				}, &kubeletdevicepluginv1beta1.Mount{
					ContainerPath: "/opt/hygondriver",
					HostPath:      os.Getenv("HYGONPATH"),
					ReadOnly:      false,
				}, &kubeletdevicepluginv1beta1.Mount{
					ContainerPath: "/opt/hyhal",
					HostPath:      "/opt/hyhal",
					ReadOnly:      false,
				})
				car.Mounts = append(car.Mounts)
			}
		}
		responses.ContainerResponses = append(responses.ContainerResponses, &car)
	}
	klog.Infoln("response=", responses)
	util.PodAllocationTrySuccess(nodename, util.HygonDCUDevice, NodeLockDCU, current)
	return &responses, nil
}

和 hami-nvidia-device-plugin 一样的逻辑,在 hami-scheduler 的时候就已经把要分配的 device 选好了。

DecodePodDevices

device-plugin 这边只需要从 Annoation 中解析出来即可。

func DecodePodDevices(checklist map[string]string, annos map[string]string) (PodDevices, error) {
	klog.V(5).Infof("checklist is [%+v], annos is [%+v]", checklist, annos)
	if len(annos) == 0 {
		return PodDevices{}, nil
	}
	pd := make(PodDevices)
	for devID, devs := range checklist {
		str, ok := annos[devs]
		if !ok {
			continue
		}
		pd[devID] = make(PodSingleDevice, 0)
		for _, s := range strings.Split(str, OnePodMultiContainerSplitSymbol) {
			cd, err := DecodeContainerDevices(s)
			if err != nil {
				return PodDevices{}, nil
			}
			if len(cd) == 0 {
				continue
			}
			pd[devID] = append(pd[devID], cd)
		}
	}
	klog.InfoS("Decoded pod annos", "poddevices", pd)
	return pd, nil
}

createvdevFiles

最后会创建一个 vdev 配置文件并挂载到 Pod 里

			filename, err := p.createvdevFiles(current, &currentCtr, devreq)
			if err != nil {
				util.PodAllocationFailed(nodename, current, NodeLockDCU)
				return &responses, err
			}
			if len(filename) > 0 {
				car.Mounts = append(car.Mounts, &kubeletdevicepluginv1beta1.Mount{
					ContainerPath: "/etc/vdev/docker/",
					HostPath:      filename,
					ReadOnly:      false,
				})
				car.Mounts = append(car.Mounts)
			} 

之前的 RefreshContainerDevices 方法维护的就是这里创建的 vdev 配置文件,当 Pod 删除后需要清理对应的目录。

	dirName := string(current.UID) + "_" + ctr.Name + "_" + fmt.Sprint(devidx) + "_" + fmt.Sprint(pipeid) + "_" + fmt.Sprint(vdevidx) + "_" + fmt.Sprint(coremsk1) + "_" + fmt.Sprint(coremsk2)
	cacheFileHostDirectory := fmt.Sprintf("/usr/local/vgpu/dcu/%s", dirName)
	err = createvdevFile(pcibusId, coremsk1, coremsk2, reqcores, mem, 0, vdevidx, pipeid, cacheFileHostDirectory, "vdev0.conf")
	if err != nil {
		return "", err
	}

createvdevFile 内容如下:

func createvdevFile(pcibusId, coremsk1, coremsk2 string, reqcores, mem int32, deviceid, vdevidx, pipeid int, cacheFileHostDirectory, cacheFileName string) error {
	s := ""
	s = fmt.Sprintf("PciBusId: %s\n", pcibusId)
	s = s + fmt.Sprintf("cu_mask: 0x%s\n", coremsk1)
	s = s + fmt.Sprintf("cu_mask: 0x%s\n", coremsk2)
	s = s + fmt.Sprintf("cu_count: %d\n", reqcores)
	s = s + fmt.Sprintf("mem: %d MiB\n", mem)
	s = s + fmt.Sprintf("device_id: %d\n", deviceid)
	s = s + fmt.Sprintf("vdev_id: %d\n", vdevidx)
	s = s + fmt.Sprintf("pipe_id: %d\n", pipeid)
	s = s + fmt.Sprintln("enable: 1")
	klog.Infoln("s=", s)

	_, err := os.Stat(cacheFileHostDirectory)
	if os.IsNotExist(err) {
		err := os.MkdirAll(cacheFileHostDirectory, 0777)
		if err != nil {
			return err
		}
		err = os.Chmod(cacheFileHostDirectory, 0777)
		if err != nil {
			return err
		}
	}

	err = os.WriteFile(fmt.Sprintf("%s/%s", cacheFileHostDirectory, cacheFileName), []byte(s), os.ModePerm)
	if err != nil {
		return err
	}
	return nil
}

就是简单创建了一个文件,但是把相关信息都写到文件里了,后续驱动程序根据该配置文件就知道该 Pod 可以使用多少 core 多少 mem 了。

ps:和 vGPU 通过 env 传递信息的方式不同,但是作用都是一样的。

这部分逻辑和以下命令一致,都是用于创建 vDCU 配置文件。

$ hy-smi virtual -create-vdevices 4 -d 0 \
-vdevice-compute-units 5,15,20,20 \
-vdevice-memory-size 4096,8192,8192,8192