DRA (Dynamic Resource Allocation) 工作流程分析
基于 Kubernetes master 分支代码分析
目录
1. 概述
DRA (Dynamic Resource Allocation) 是 Kubernetes 中用于分配异构硬件资源(如 GPU、FPGA、NIC 等)的通用框架,替代了传统的 Device Plugin 机制。与 Device Plugin 相比,DRA 具有以下优势:
结构化参数 :通过 CEL 表达式进行设备选择,支持跨设备约束动态分配 :调度器在调度时实时分配设备,而非依赖节点级的 allocatable 上报灵活性 :支持共享设备、可分区设备、管理访问等高级特性设备污点 :支持类似 Node Taint 的设备级污点和容忍机制核心组件:
组件 路径 职责 API 类型 pkg/apis/resource/types.go定义 ResourceSlice、ResourceClaim、DeviceClass 等 调度器插件 pkg/scheduler/framework/plugins/dynamicresources/在调度周期中分配设备 ResourceClaim 控制器 pkg/controller/resourceclaim/controller.go从模板创建 Claim,管理预留 污点驱逐控制器 pkg/controller/devicetainteviction/device_taint_eviction.go驱逐使用污点设备的 Pod Kubelet DRA 管理器 pkg/kubelet/cm/dra/manager.go调用驱动 gRPC 接口准备资源 结构化分配器 staging/src/k8s.io/dynamic-resource-allocation/structured/核心分配算法 驱动辅助库 staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/驱动实现框架
2. 核心数据模型
2.1 API 对象关系
1
2
3
4
5
6
7
8
9
10
11
12
13
ResourceSlice (驱动发布可用设备)
│
│ 引用
▼
DeviceClass (管理员定义设备选择器和配置)
│
│ 引用
▼
ResourceClaimTemplate → ResourceClaim (请求设备)
│
│ 绑定
▼
Pod (消费已分配的设备)
2.2 ResourceSlice
ResourceSlice 由 DRA 驱动发布,声明可用的设备池。
文件: pkg/apis/resource/types.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// ResourceSlice 代表由同一驱动管理的一个或多个资源
type ResourceSlice struct {
metav1 . TypeMeta
metav1 . ObjectMeta
Spec ResourceSliceSpec
}
type ResourceSliceSpec struct {
Driver string // DRA 驱动名称,不可变
Pool ResourcePool // 资源池标识
// 节点选择(三选一)
NodeName * string
NodeSelector * core . NodeSelector
AllNodes * bool
// 设备列表(最多 128 个,启用高级特性时最多 64 个)
Devices [] Device
// 每设备节点选择(DRAPartitionableDevices)
PerDeviceNodeSelection * bool
// 共享计数器集(DRAPartitionableDevices)
SharedCounters [] CounterSet
}
Device 结构 表示单个硬件实例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Device struct {
Name string
Attributes map [ QualifiedName ] DeviceAttribute // 设备属性(最多 32 个)
Capacity map [ QualifiedName ] DeviceCapacity // 设备容量
ConsumesCounters [] DeviceCounterConsumption // 消费的共享计数器
// 每设备节点选择(DRAPartitionableDevices)
NodeName * string
NodeSelector * core . NodeSelector
AllNodes * bool
// 污点(DRADeviceTaints)
Taints [] DeviceTaint
// 绑定条件(DRADeviceBindingConditions)
BindsToNode * bool
BindingConditions [] string
BindingFailureConditions [] string
// 允许多次分配(DRAConsumableCapacity)
AllowMultipleAllocations * bool
// 节点可分配资源映射(DRANodeAllocatableResources)
NodeAllocatableResourceMappings map [ v1 . ResourceName ] NodeAllocatableResourceMapping
}
设备的全局唯一标识为三元组:<driver name>, <pool name>, <device name>。
2.3 ResourceClaim
ResourceClaim 描述对资源的请求,跟踪分配状态。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type ResourceClaim struct {
metav1 . TypeMeta
metav1 . ObjectMeta
Spec ResourceClaimSpec // 请求内容,不可变
Status ResourceClaimStatus // 分配状态
}
type ResourceClaimSpec struct {
Devices DeviceClaim // 设备请求
}
type DeviceClaim struct {
Requests [] DeviceRequest // 设备请求列表(最多 32 个)
Constraints [] DeviceConstraint // 跨设备约束
Config [] DeviceClaimConfiguration // 驱动配置
}
DeviceRequest 支持两种模式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type DeviceRequest struct {
Name string // DNS 标签,被 Pod 的 containers[].resources.claims 引用
// 两种请求模式(二选一)
Exactly * ExactDeviceRequest // 精确请求
FirstAvailable [] DeviceSubRequest // 优先级列表(DRAPrioritizedList)
}
type ExactDeviceRequest struct {
DeviceClassName string // 引用 DeviceClass
Selectors [] DeviceSelector // 设备选择器
AllocationMode DeviceAllocationMode // ExactCount 或 All
Count int64 // ExactCount 模式的数量
AdminAccess * bool // 管理访问(DRAAdminAccess)
Tolerations [] DeviceToleration // 污点容忍(DRADeviceTaints)
Capacity * CapacityRequirements // 容量需求(DRAConsumableCapacity)
}
ResourceClaimStatus 跟踪分配结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type ResourceClaimStatus struct {
Allocation * AllocationResult // 分配结果
ReservedFor [] ResourceClaimConsumerReference // 预留消费者(最多 256 个)
Devices [] AllocatedDeviceStatus // 每设备状态
}
type AllocationResult struct {
Devices DeviceAllocationResult
NodeSelector * core . NodeSelector // 分配设备可用的节点
AllocationTimestamp * metav1 . Time
}
type DeviceRequestAllocationResult struct {
Request string // 回引 DeviceRequest.Name
Driver string // 驱动名称
Pool string // 池名称
Device string // 设备名称
AdminAccess * bool
Tolerations [] DeviceToleration // 从请求复制的容忍
ShareID * types . UID // 共享设备标识
ConsumedCapacity map [ QualifiedName ] resource . Quantity
BindingConditions [] string // 从 ResourceSlice 复制
BindingFailureConditions [] string // 从 ResourceSlice 复制
}
2.4 DeviceClass
DeviceClass 是集群范围的管理员资源,提供默认选择器和配置:
1
2
3
4
5
6
7
8
9
10
11
type DeviceClass struct {
metav1 . TypeMeta
metav1 . ObjectMeta
Spec DeviceClassSpec
}
type DeviceClassSpec struct {
Selectors [] DeviceSelector // 设备必须满足的选择器
Config [] DeviceClassConfiguration // 传递给驱动的配置
ExtendedResourceName * string // 映射到扩展资源(DRAExtendedResource)
}
2.5 ResourceClaimTemplate
ResourceClaimTemplate 用于为每个 Pod 生成 ResourceClaim:
1
2
3
4
5
6
7
8
9
10
type ResourceClaimTemplate struct {
metav1 . TypeMeta
metav1 . ObjectMeta
Spec ResourceClaimTemplateSpec
}
type ResourceClaimTemplateSpec struct {
ObjectMeta metav1 . ObjectMeta // 标签/注解复制到 ResourceClaim
Spec ResourceClaimSpec // 不变地复制到 ResourceClaim
}
2.6 DeviceTaintRule
DeviceTaintRule 为匹配选择器的设备添加污点(仅内部 API,尚未晋升 v1):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type DeviceTaintRule struct {
Spec DeviceTaintRuleSpec
Status DeviceTaintRuleStatus
}
type DeviceTaintRuleSpec struct {
DeviceSelector * DeviceTaintSelector // 驱动/池/设备过滤
Taint DeviceTaint
}
type DeviceTaint struct {
Key string
Value string
Effect DeviceTaintEffect // None, NoSchedule, NoExecute
TimeAdded * metav1 . Time
}
3. DRA 驱动发布资源
3.1 驱动辅助库
文件: staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go
DRA 驱动必须实现 DRAPlugin 接口:
1
2
3
4
5
type DRAPlugin interface {
PrepareResourceClaims ( ctx context . Context , claims [] * resourceapi . ResourceClaim ) ( map [ types . UID ] PrepareResult , error )
UnprepareResourceClaims ( ctx context . Context , claims [] NamespacedObject ) ( map [ types . UID ] error , error )
HandleError ( ctx context . Context , err error , msg string )
}
驱动通过 Helper.Start() 启动,它会:
启动 gRPC DRA 服务器(注册 v1 和 v1beta1 服务) 启动注册服务器(通知 kubelet 插件存在) 设置上下文取消时的清理逻辑 3.2 发布 ResourceSlice
文件: staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.go
驱动调用 PublishResources 来声明可用设备:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func ( d * Helper ) PublishResources ( _ context . Context , resources resourceslice . DriverResources ) error {
d . mutex . Lock ()
defer d . mutex . Unlock ()
owner := resourceslice . Owner { APIVersion : "v1" , Kind : "Node" , Name : d . nodeName , UID : d . nodeUID }
if d . resourceSliceController == nil {
d . resourceSliceController , err = resourceslice . StartController ( controllerCtx , resourceslice . Options {
DriverName : d . driverName ,
KubeClient : d . kubeClient ,
Owner : & owner ,
Resources : driverResources ,
ErrorHandler : func ( ctx context . Context , err error , msg string ) { ... },
})
} else {
d . resourceSliceController . Update ( driverResources )
}
}
DriverResources 结构:
1
2
3
4
5
6
7
8
9
10
type DriverResources struct {
Pools map [ string ] Pool
}
type Pool struct {
NodeSelector * v1 . NodeSelector
AllNodes bool
Generation int64
Slices [] Slice
}
Pool 同步逻辑 (syncPool):
收集现有 ResourceSlice,按池名过滤 按名称中的编码索引匹配期望切片 如果有变化,提升 generation(旧 generation 的切片被忽略) 删除过时切片、更新变更切片、创建新切片 切片名格式:<hex-index>-<driver>-<owner>-,索引按十六进制编码以支持字典序排序 3.3 gRPC 服务接口
文件: staging/src/k8s.io/kubelet/pkg/apis/dra/v1/api_grpc.pb.go
1
2
3
4
type DRAPluginClient interface {
NodePrepareResources ( ctx context . Context , in * NodePrepareResourcesRequest , ... ) ( * NodePrepareResourcesResponse , error )
NodeUnprepareResources ( ctx context . Context , in * NodeUnprepareResourcesRequest , ... ) ( * NodeUnprepareResourcesResponse , error )
}
请求/响应消息 (api.pb.go):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type NodePrepareResourcesRequest struct {
Claims [] * Claim
}
type NodePrepareResourcesResponse struct {
Claims map [ string ] * NodePrepareResourceResponse // key: claim UID
}
type NodePrepareResourceResponse struct {
Devices [] * Device
Error string
}
type Device struct {
RequestNames [] string // 对应 ResourceClaim 中的请求名
PoolName string
DeviceName string
CdiDeviceIds [] string // CDI 设备 ID,传递给容器运行时
ShareId * string
}
4. Pod 创建与 ResourceClaim 生命周期
4.1 ResourceClaim 控制器
文件: pkg/controller/resourceclaim/controller.go
控制器负责三件事:
从 ResourceClaimTemplate 创建 ResourceClaim 管理预留(ReservedFor) 清理已终止 Pod 的 Claim 4.1.1 从模板创建 Claim
当 Pod 引用 ResourceClaimTemplate 时,syncPod 为每个 claim 调用 handleClaim:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// pkg/controller/resourceclaim/controller.go
func ( ec * Controller ) handleClaim ( ctx context . Context , pod * v1 . Pod , podClaim * v1 . PodResourceClaim , ... ) error {
// 1. 解析 claim 名称
claimName , checkOwner , err := resourceclaim . Name ( pod , & podClaim )
// 2. 如果 claim 已存在,直接返回
if claim != nil && checkOwner == nil { return nil }
// 3. 从模板创建 ResourceClaim
template , err := ec . templateLister . ResourceClaimTemplates ( pod . Namespace ). Get ( * templateName )
claim = & resourceapi . ResourceClaim {
ObjectMeta : metav1 . ObjectMeta {
GenerateName : pod . Name + "-" + podClaim . Name + "-" ,
OwnerReferences : [] metav1 . OwnerReference {{
APIVersion : "v1" , Kind : "Pod" ,
Name : pod . Name , UID : pod . UID ,
Controller : & isTrue ,
}},
Annotations : annotations , // 包含 PodResourceClaimAnnotation
Labels : template . Spec . ObjectMeta . Labels ,
},
Spec : template . Spec . Spec , // 直接复制模板 Spec
}
claim , err = ec . kubeClient . ResourceV1 (). ResourceClaims ( pod . Namespace ). Create ( ctx , claim , metav1 . CreateOptions {})
}
关键设计:
Pod 作为 OwnerReference,垃圾回收自动清理 PodResourceClaimAnnotation 注解绑定 claim 到 pod 的特定 claim 名GenerateName 避免命名冲突4.1.2 管理预留
Pod 被调度后(NodeName != ""),控制器为已分配但未预留的 claim 添加预留:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func ( ec * Controller ) syncPod ( ctx context . Context , pod * v1 . Pod ) error {
if pod . Spec . NodeName == "" {
return nil // 调度器负责预留
}
for _ , podClaim := range pod . Spec . ResourceClaims {
if claim . Status . Allocation != nil &&
! resourceclaim . IsReservedForPod ( pod , claim , ec . features . WorkloadResourceClaims ) &&
resourceclaim . CanBeReserved ( claim ) {
if err := ec . reserveFor ( ctx , bindTo , claim ); err != nil {
return err
}
}
}
}
func ( ec * Controller ) reserveFor ( ctx context . Context , consumer resourceapi . ResourceClaimConsumerReference , claim * resourceapi . ResourceClaim ) error {
claim = claim . DeepCopy ()
claim . Status . ReservedFor = append ( claim . Status . ReservedFor , consumer )
_ , err := ec . kubeClient . ResourceV1 (). ResourceClaims ( claim . Namespace ). UpdateStatus ( ctx , claim , metav1 . UpdateOptions {})
return err
}
4.1.3 清理终止 Pod 的 Claim
syncClaim 迭代 ReservedFor 条目,清理不再需要的预留:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func ( ec * Controller ) syncClaim ( ctx context . Context , claim * resourceapi . ResourceClaim ) error {
for _ , reservedFor := range claim . Status . ReservedFor {
keepEntry := true
if ec . deletedObjects . Has ( reservedFor . UID ) {
keepEntry = false // 已删除
} else {
pod , err := ec . podLister . Pods ( claim . Namespace ). Get ( reservedFor . Name )
if err != nil {
// 不在缓存中,查询 API Server 确认
pod , err = ec . kubeClient . CoreV1 (). Pods ( claim . Namespace ). Get ( ctx , reservedFor . Name , metav1 . GetOptions {})
if pod == nil || pod . UID != reservedFor . UID {
keepEntry = false
}
} else if isPodDone ( pod ) {
keepEntry = false // Pod 已终止
}
}
// ... 移除不需要的条目 ...
}
// 当 ReservedFor 为空时:
// 1. 清除分配 (Status.Allocation = nil)
// 2. 移除 Finalizer
// 3. 删除从模板生成的 Claim
}
5. 调度器工作流
5.1 调度器插件概览
文件: pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go
DynamicResources 插件实现了调度框架的 9 个扩展点:
1
2
3
4
5
6
7
8
9
10
type DynamicResources struct {
enabled bool
fts feature . Features
filterTimeout time . Duration
bindingTimeout time . Duration
fh fwk . Handle
clientset kubernetes . Interface
celCache * cel . Cache
draManager fwk . SharedDRAManager
}
调度周期状态数据:
1
2
3
4
5
6
7
8
9
type stateData struct {
claims claimStore // 所有 Claim
draExtendedResource draExtendedResource // 扩展资源
allocator structured . Allocator // 结构化分配器
mutex sync . Mutex
unavailableClaims sets . Set [ int ] // 不可用 Claim 索引
informationsForClaim [] informationForClaim // 每 Claim 的元数据
nodeAllocations map [ string ] nodeAllocation // 每节点分配结果缓存
}
5.2 PreEnqueue
验证 Pod 引用的所有 ResourceClaim 存在且未被删除:
1
2
3
4
5
6
7
func ( pl * DynamicResources ) PreEnqueue ( ctx context . Context , pod * v1 . Pod ) * fwk . Status {
if ! pl . enabled { return nil }
if err := pl . foreachPodResourceClaim ( pod , nil ); err != nil {
return statusUnschedulable ( klog . FromContext ( ctx ), err . Error ())
}
return nil
}
不满足条件的 Pod 留在不可调度队列,等待相关集群事件触发重试。
5.3 PreFilter
这是最复杂的准备阶段,负责收集状态并构建分配器:
收集用户 Claim :获取 Pod 引用的所有 ResourceClaim扩展资源处理 :检查 Pod 是否请求了 DRA 支持的扩展资源,如有则创建内存中的特殊 Claim验证 Claim :已分配的 Claim:提取 NodeSelector,确定可调度节点 未分配的 Claim:验证 DeviceClass 存在 创建分配器 :1
2
3
4
5
6
7
8
9
// 收集当前已分配状态
allocatedState , err := pl . draManager . ResourceClaims (). GatherAllocatedState ()
// 列出所有 ResourceSlice
slices , err := pl . draManager . ResourceSlices (). ListWithDeviceTaintRules ()
// 构建结构化分配器
allocator , err := structured . NewAllocator ( ctx , features , * allocatedState ,
pl . draManager . DeviceClasses (), slices , pl . celCache )
5.4 Filter
对每个候选节点,判断 Pod 的 Claim 能否满足:
扩展资源检查 :计算节点特定的 Claim 规范已分配 Claim 检查 :NodeSelector 是否匹配节点 绑定条件是否满足 分配器分配 :1
2
3
4
5
6
7
8
// 构建待分配 Claim 列表
claimsToAllocate := buildClaimsToAllocate ( ... )
// 调用核心分配算法
allocationResult , err := state . allocator . Allocate ( allocCtx , node , claimsToAllocate )
if err != nil {
// 分配失败,过滤掉此节点
}
节点可分配资源检查 (DRANodeAllocatableResources):验证 DRA 分配的资源是否超出节点剩余容量
存储结果 :将分配结果缓存到 state.nodeAllocations[node.Name]
5.5 PostFilter
当所有节点都不满足时,尝试释放被占用的资源:
1
2
3
4
5
6
7
8
9
10
11
12
func ( pl * DynamicResources ) PostFilter ( ctx context . Context , state * fwk . CycleState , pod * v1 . Pod , ... ) ( * fwk . PostFilterResult , * fwk . Status ) {
// 对每个不可用的 Claim:
// 如果未被其他 Pod 预留,清除其分配
for i := range state . unavailableClaims {
claim := state . claims . claims [ idx ]
if claim . Status . Allocation != nil && ! reservedByOthers {
claim . Status . Allocation = nil
claim . Status . ReservedFor = nil
ec . kubeClient . ResourceV1 (). ResourceClaims ( claim . Namespace ). UpdateStatus ( ctx , claim , ... )
}
}
}
5.6 Score
基于 FirstAvailable 优先级列表的偏好评分:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func computeScore ( iterator iter . Seq2 [ int , * resourceapi . ResourceClaim ], allocations nodeAllocation ) ( int64 , error ) {
var score int64
for i , claim := range iterator {
allocatedSubRequests := sets . New [ string ]()
for _ , res := range allocations . allocationResults [ i ]. Devices . Results {
if resourceclaim . IsSubRequestRef ( res . Request ) {
allocatedSubRequests . Insert ( res . Request )
}
}
for _ , req := range claim . Spec . Devices . Requests {
if req . Exactly != nil { continue }
for i , subReq := range req . FirstAvailable {
subRequestRef := resourceclaim . CreateSubRequestRef ( req . Name , subReq . Name )
if allocatedSubRefs . Has ( subRequestRef ) {
// 优先级越高的子请求(索引越小),得分越高
score += int64 ( resourceapi . FirstAvailableDeviceRequestMaxSize - i )
}
}
}
}
return score , nil
}
5.7 Reserve
选定节点后,记录分配结果(不写入 API Server):
1
2
3
4
5
6
7
8
9
10
11
12
13
func ( pl * DynamicResources ) Reserve ( ctx context . Context , state * fwk . CycleState , pod * v1 . Pod , nodeName string ) * fwk . Status {
// 从缓存获取分配结果
allocations := state . nodeAllocations [ nodeName ]
for i , claim := range claims . toAllocate () {
// 存储分配结果到 state
state . informationsForClaim [ i ]. allocation = & allocations . allocationResults [ i ]
// 标记为"进行中"分配
claim . Status . Allocation = allocations . allocationResults [ i ]
pl . draManager . ResourceClaims (). SignalClaimPendingAllocation ( claim . UID , claim )
}
}
关键设计 — 三层 Claim 跟踪:
层级 机制 作用 1 Informer + AssumeCache PreBind 后立即更新,不等 Informer 同步 2 In-flight Allocations Reserve 到 PreBind 期间防止并发冲突 3 allocatedDevices 响应式维护所有已分配设备 ID 集合
SignalClaimPendingAllocation 将 Claim 标记为"进行中",防止其他 Pod 看到未更新的 Claim 而尝试分配相同设备。
5.8 Unreserve
调度失败时回滚:
1
2
3
4
5
6
7
8
9
10
11
func ( pl * DynamicResources ) Unreserve ( ctx context . Context , state * fwk . CycleState , pod * v1 . Pod , nodeName string ) {
for _ , claim := range state . claims . allUserClaims () {
// 1. 移除进行中分配
pl . draManager . ResourceClaims (). MaybeRemoveClaimPendingAllocation ( claim . UID , false )
// 2. 恢复 AssumeCache
pl . draManager . ResourceClaims (). AssumedClaimRestore ( claim )
// 3. 从 ReservedFor 移除 Pod(通过 strategic merge patch)
}
// 4. 清理扩展资源 Claim
// 5. 清理节点可分配资源状态
}
5.9 PreBind
将分配结果持久化到 API Server:
1
2
3
4
5
6
7
8
9
10
func ( pl * DynamicResources ) PreBind ( ctx context . Context , state * fwk . CycleState , pod * v1 . Pod , nodeName string ) * fwk . Status {
// 1. 绑定每个 Claim
for i , claim := range claims . all () {
if alreadyReservedForPod { continue }
pl . bindClaim ( ctx , pod , i , claim , state )
}
// 2. 绑定节点可分配资源状态
// 3. 等待绑定条件(如果有)
}
bindClaim 的核心逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func ( pl * DynamicResources ) bindClaim ( ctx context . Context , pod * v1 . Pod , ... ) error {
// 如果是扩展资源 Claim(内存中的特殊名称),先创建到 API Server
if claim . Name == "<extended-resources>" {
claim , err = pl . createExtendedResourceClaimInAPI ( ctx , claim )
}
// 设置分配结果和预留
claim . Status . Allocation = allocation
claim . Status . ReservedFor = append ( claim . Status . ReservedFor , consumerRef )
// 更新 API Server(带冲突重试)
_ , err = pl . clientset . ResourceV1 (). ResourceClaims ( claim . Namespace ). UpdateStatus ( ctx , claim , metav1 . UpdateOptions {})
// 更新 AssumeCache
pl . draManager . ResourceClaims (). AssumeClaimAfterAPICall ( claim )
// 移除进行中分配
pl . draManager . ResourceClaims (). MaybeRemoveClaimPendingAllocation ( claim . UID , false )
}
5.10 SignPod
DRA Pod 不参与 Pod 签名:
1
2
3
4
5
6
func ( pl * DynamicResources ) SignPod ( ctx context . Context , pod * v1 . Pod ) ([] fwk . SignFragment , * fwk . Status ) {
if len ( pod . Spec . ResourceClaims ) > 0 {
return nil , fwk . NewStatus ( fwk . Unschedulable , "pods with dra resource claims are not signable" )
}
return nil , nil
}
5.11 DRA Manager
文件: pkg/scheduler/framework/plugins/dynamicresources/dra_manager.go
1
2
3
4
5
6
7
type DefaultDRAManager struct {
resourceClaimTracker * claimTracker // Claim 跟踪器
resourceSliceLister * resourceSliceLister // ResourceSlice 列表器
deviceClassLister * deviceClassLister // DeviceClass 列表器
podGroupLister * podGroupLister // PodGroup 列表器
extendedResourceCache * extendedresourcecache . ExtendedResourceCache
}
allocatedDevices (allocateddevices.go) 响应式维护所有已分配设备:
1
2
3
4
5
6
7
8
9
type allocatedDevices struct {
logger klog . Logger
mutex sync . RWMutex
revision int64 // 乐观并发控制
ids sets . Set [ structured . DeviceID ]
shareIDs sets . Set [ structured . SharedDeviceID ]
capacities structured . ConsumedCapacityCollection
enabledConsumableCapacity bool
}
Informer 事件处理器在 Claim 增加/更新/删除时调用 addDevices/removeDevices。revision 计数器允许调用者检测并发修改并重试。
6. Kubelet 资源准备
6.1 DRA Manager
文件: pkg/kubelet/cm/dra/manager.go
Kubelet DRA 管理器负责在节点上准备和清理 DRA 资源。
6.1.1 PrepareResources
入口点,为 Pod 准备资源:
阶段 1 — 验证(无变更):
1
2
3
4
5
6
7
8
9
10
11
12
func ( m * Manager ) prepareResources ( ctx context . Context , pod * v1 . Pod ) error {
// 获取每个 ResourceClaim,验证 Pod 在 ReservedFor 中
for i , podClaim := range pod . Spec . ResourceClaims {
resourceClaim , err := m . kubeClient . ResourceV1 (). ResourceClaims ( pod . Namespace ). Get ( ctx , claimName , metav1 . GetOptions {})
claimInfo , err := newClaimInfoFromClaim ( resourceClaim )
// 解析每个需要的 DRA 驱动
for driverName := range claimInfo . DriverState {
plugin , err := m . draPlugins . GetPlugin ( driverName )
infos [ i ]. plugins [ driverName ] = plugin
}
}
}
阶段 2 — 缓存更新 + 检查点:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
err = m . cache . withLock ( logger , func () error {
for i := range podResourceClaims {
claimInfo , exists := m . cache . get ( resourceClaim . Name , resourceClaim . Namespace )
if ! exists {
m . cache . add ( claimInfo )
}
claimInfo . addPodReference ( pod . UID )
m . cache . syncToCheckpoint ()
if claimInfo . isPrepared () { continue }
// 构建每驱动的 gRPC 批次
for driverName := range claimInfo . DriverState {
batches [ plugin ] = append ( batches [ plugin ], claim )
}
}
})
阶段 3 — gRPC 调用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
for plugin , claims := range batches {
response , err := plugin . NodePrepareResources ( ctx , & drapb . NodePrepareResourcesRequest { Claims : claims })
for claimUID , result := range response . Claims {
for _ , device := range result . GetDevices () {
info . addDevice ( plugin . DriverName (), state . Device {
PoolName : device . PoolName ,
DeviceName : device . DeviceName ,
ShareID : ( * types . UID )( device . ShareId ),
RequestNames : device . RequestNames ,
CDIDeviceIDs : device . CdiDeviceIds ,
})
}
}
}
阶段 4 — 标记已准备 + 最终检查点:
1
2
3
4
5
6
m . cache . withLock ( logger , func () error {
for _ , claim := range resourceClaims {
info . setPrepared ()
}
m . cache . syncToCheckpoint ()
})
6.1.2 GetResources
为容器运行时提供 CDI 设备:
1
2
3
4
5
6
7
8
9
10
func ( m * Manager ) GetResources ( pod * v1 . Pod , container * v1 . Container ) ( * ContainerInfo , error ) {
cdiDevices := [] kubecontainer . CDIDevice {}
for claimName , requestNames := range claimRequests {
claimInfo , exists := m . cache . get ( claimName , pod . Namespace )
for _ , requestName := range requestNames {
cdiDevices = append ( cdiDevices , claimInfo . cdiDevicesAsList ( requestName ) ... )
}
}
return & ContainerInfo { CDIDevices : cdiDevices }, nil
}
CDI 设备传递给容器运行时,在创建容器时注入。
6.1.3 UnprepareResources
Pod 终止时清理资源:
1
2
3
4
5
6
7
func ( m * Manager ) UnprepareResources ( ctx context . Context , pod * v1 . Pod ) error {
// 移除 Pod 引用
// 当没有 Pod 引用 Claim 时,调用 NodeUnprepareResources
for plugin , claims := range batches {
response , err := plugin . NodeUnprepareResources ( ctx , & drapb . NodeUnprepareResourcesRequest { Claims : claims })
}
}
6.2 Claim 信息缓存
文件: pkg/kubelet/cm/dra/claiminfo.go
1
2
3
4
5
6
7
8
9
10
type claimInfoCache struct {
sync . RWMutex
checkpointer state . Checkpointer
claimInfo map [ string ] * ClaimInfo // key: namespace/claimname
}
type ClaimInfo struct {
state . ClaimInfoState
prepared bool
}
检查点机制保证重启后恢复状态。syncToCheckpoint 在准备/取消准备的关键点写入磁盘。
6.3 插件注册与发现
文件: pkg/kubelet/cm/dra/plugin/dra_plugin_manager.go
1
2
3
4
var servicesSupportedByKubelet = [] string {
drapbv1 . DRAPluginService , // 优先 v1
drapbv1beta1 . DRAPluginService , // 回退 v1beta1
}
注册流程:
驱动在 /var/lib/kubelet/plugins_registry/ 创建注册 socket Kubelet 发现 socket,调用 ValidatePlugin 验证服务版本 Kubelet 调用 RegisterPlugin,建立 gRPC 连接 启动健康监控流(如果 ResourceHealthStatus 启用) 驱动注销时的 ResourceSlice 清理:
defaultWipingDelay 常量定义在 manager.go(30 秒),传递给 DRAPluginManager:
1
2
3
4
5
6
7
8
9
10
11
12
// pkg/kubelet/cm/dra/manager.go:72
const defaultWipingDelay = 30 * time . Second
// 当驱动断开连接 30 秒后,清理其 ResourceSlice
func ( pm * DRAPluginManager ) sync ( driverName string ) {
if pm . usable ( driverName ) {
pm . pendingWipes . CancelWork ( ... )
return
}
fireAt := now . Add ( pm . wipingDelay ) // 30 秒延迟
pm . pendingWipes . AddWork ( ctx , workArgs , now , fireAt )
}
6.4 健康监控
文件: pkg/kubelet/cm/dra/manager.go
当 ResourceHealthStatus feature gate 启用时,为每个插件启动健康监控 goroutine:
1
2
3
4
5
6
7
go func () {
streamCtx , streamCancel := context . WithCancel ( p . backgroundCtx )
wait . UntilWithContext ( streamCtx , func ( ctx context . Context ) {
stream , err := p . NodeWatchResources ( ctx )
err = pm . streamHandler . HandleWatchResourcesStream ( ctx , stream , driverName )
}, 5 * time . Second )
}()
HandleWatchResourcesStream 接收健康更新,更新 healthInfoCache,通知受影响的 Pod:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
for {
resp , err := stream . Recv ()
devices := make ([] state . DeviceHealth , len ( resp . GetDevices ()))
changedDevices , _ := m . healthInfoCache . updateHealthInfo ( logger , pluginName , devices )
if len ( changedDevices ) > 0 {
// 查找使用变更设备的 Pod
for _ , dev := range changedDevices {
for _ , cInfo := range m . cache . claimInfo {
for _ , allocatedDevice := range driverState . Devices {
if allocatedDevice . PoolName == dev . PoolName && allocatedDevice . DeviceName == dev . DeviceName {
podsToUpdate . Insert ( cInfo . PodUIDs . UnsortedList () ... )
}
}
}
}
// 发送更新通知
m . update <- resourceupdates . Update { PodUIDs : podUIDs }
}
}
6.5 协调循环
每 60 秒运行一次,清理不活跃 Pod 的 Claim 信息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func ( m * Manager ) reconcileLoop ( ctx context . Context ) {
activePods := sets . New [ string ]()
for _ , p := range m . activePods () {
activePods . Insert ( string ( p . UID ))
}
// 查找仍被不活跃 Pod 引用的 Claim
for _ , claimInfo := range m . cache . claimInfo {
for podUID := range claimInfo . PodUIDs {
if ! activePods . Has ( podUID ) {
// 收集不活跃 Pod 的 Claim
}
}
}
// Unprepare 不活跃 Pod 的资源
for _ , podClaims := range inactivePodClaims {
m . unprepareResources ( ctx , podClaims . uid , podClaims . namespace , podClaims . claimNames )
}
}
7. 结构化分配器
7.1 分配器接口与分层
文件: staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go
1
2
3
type Allocator interface {
Allocate ( ctx context . Context , node * v1 . Node , claims [] * resourceapi . ResourceClaim ) ([] resourceapi . AllocationResult , error )
}
三个实现层:
层级 包路径 支持的特性 stable internal/stable/AdminAccess, PrioritizedList, PartitionableDevices, DeviceTaints incubating internal/incubating/stable + DeviceBindingAndStatus, ConsumableCapacity experimental internal/experimental/incubating + ListTypeAttributes
选择逻辑: 按稳定度排序,选择第一个支持所需特性集的分配器:
1
2
3
4
5
6
7
func NewAllocator ( ctx context . Context , features Features , allocatedState AllocatedState , ... ) ( Allocator , error ) {
for _ , allocator := range availableAllocators {
if allocator . supportedFeatures . Set (). IsSuperset ( features . Set ()) {
return allocator . newAllocator ( ctx , features , allocatedState , classLister , slices , celCache )
}
}
}
各层完全独立,不共享代码。当孵化层代码足够成熟时,可以整体复制到稳定层。
7.2 分配算法
文件: staging/src/k8s.io/dynamic-resource-allocation/structured/internal/stable/allocator_stable.go
Phase 1 — 收集池:
1
pools , err := GatherPools ( ctx , alloc . slices , node , a . features )
GatherPools 收集与目标节点相关的 ResourceSlice,按 (Driver, PoolName) 分组,验证完整性。
Phase 2 — 验证请求:
检查所有选择器使用 CEL 验证 DeviceClass 引用 确定 numDevices(ExactCount 模式)或预计算 allDevices 列表(All 模式) Phase 3 — 递归搜索:
1
done , err := alloc . allocateOne ( deviceIndices {}, false )
allocateOne 是核心递归算法:
基线情况 :所有 Claim 已分配 → 返回成功FirstAvailable :按优先级顺序尝试子请求,第一个成功的胜出All 模式 :预计算的设备列表逐一尝试ExactCount 模式 :遍历池/切片/设备,对每个候选设备:跳过已使用的设备 检查 CEL 选择器 检查约束和污点 如果分配成功,递归尝试下一个设备索引 失败时回溯(deallocate()) 7.3 CEL 选择器评估
文件: staging/src/k8s.io/dynamic-resource-allocation/structured/internal/stable/allocator_stable.go
1
2
3
4
5
6
7
8
9
10
11
12
func ( alloc * allocator ) selectorsMatch ( r requestIndices , device * draapi . Device , deviceID DeviceID , ... ) ( bool , error ) {
for _ , selector := range selectors {
expr := alloc . celCache . GetOrCompile ( selector . CEL . Expression )
matches , _ , err := expr . DeviceMatches ( alloc . ctx , cel . Device {
Driver : deviceID . Driver . String (),
Attributes : d . Attributes ,
Capacity : d . Capacity ,
})
if ! matches { return false , nil }
}
return true , nil
}
关键特性:
编译缓存 :每个 CEL 表达式只编译一次设备匹配缓存 :deviceMatchesRequest 缓存每个 (设备, 请求) 对的布尔结果AND 语义 :所有选择器必须匹配7.4 约束检查
1
2
3
4
type constraint interface {
add ( requestName , subRequestName string , device * draapi . Device , deviceID DeviceID ) bool
remove ( requestName , subRequestName string , device * draapi . Device , deviceID DeviceID )
}
matchAttributeConstraint :约束集中的所有设备在指定属性上必须有相同值。
distinctAttributeConstraint (incubating+):约束集中的设备在指定属性上必须有不同值。
分配设备时,所有约束逐一检查;如果某个约束失败,回滚之前已添加的约束:
1
2
3
4
5
6
7
8
9
for i , constraint := range alloc . constraints [ r . claimIndex ] {
added := constraint . add ( baseRequestName , subRequestName , device . Device , device . id )
if ! added {
for e := 0 ; e < i ; e ++ {
alloc . constraints [ r . claimIndex ][ e ]. remove ( ... )
}
return false , nil , nil
}
}
8. 设备污点与驱逐
8.1 污点驱逐控制器
文件: pkg/controller/devicetainteviction/device_taint_eviction.go
控制器监控设备污点,驱逐使用不可容忍 NoExecute 污点设备的 Pod。
8.1.1 污点来源
污点有两个来源:
ResourceSlice 中设备的直接 Taints 字段 DeviceTaintRule 匹配选择器动态添加的污点 8.1.2 驱逐时间计算
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func ( tc * Controller ) claimEvictionTime ( allocatedDevice DeviceRequestAllocationResult , slice * ResourceSlice , device * Device ) * metav1 . Time {
for taint := range tc . allEvictingDeviceTaints ( allocatedDevice , slice , device ) {
newEvictionTime := taint . deviceTaint (). TimeAdded
for _ , toleration := range allocatedDevice . Tolerations {
if resourceclaim . ToleratesTaint ( toleration , * taint . deviceTaint ()) {
if toleration . TolerationSeconds == nil {
continue nextTaint // 永久容忍
}
newEvictionTime = taint . deviceTaint (). TimeAdded + tolerationSeconds
}
}
// 无匹配容忍 → 立即驱逐
// 取所有污点中最早的驱逐时间
}
}
8.1.3 驱逐执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func ( tc * Controller ) evictPod ( podRef tainteviction . NamespacedObject , eviction evictionAndReason ) {
tc . deletePodAt [ podRef ] = eviction
delay := eviction . when . Sub ( time . Now ())
if delay <= 0 {
tc . workqueue . Add ( workItem { podRef : podRef })
} else {
tc . workqueue . AddAfter ( workItem { podRef : podRef }, delay )
}
}
func ( tc * Controller ) maybeDeletePod ( ctx context . Context , podRef tainteviction . NamespacedObject ) error {
// 设置 DisruptionTarget 条件(原因:DeletionByDeviceTaintManager)
// 删除 Pod(带 UID 前提条件避免竞态)
addConditionAndDeletePod ( ctx , pod )
}
8.1.4 Generation 管理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
type pool struct {
slices map [ string ] * resourceapi . ResourceSlice
maxGeneration int64
}
// 只考虑当前 generation 的切片
func ( p pool ) getTaintedDevices () [] taintedDevice {
for _ , slice := range p . slices {
if slice . Spec . Pool . Generation != p . maxGeneration {
continue // 跳过过时切片
}
// ... 收集污点设备 ...
}
}
9. Feature Gates
文件: pkg/features/kube_features.go
Feature Gate 描述 DynamicResourceAllocationDRA 基础功能 DRAAdminAccess管理访问(监控/管理设备) DRAConsumableCapacity可消费容量设备(部分分配) DRADeviceBindingConditions设备绑定条件 DRADeviceTaintRulesDeviceTaintRule API 支持 DRADeviceTaintsResourceSlice 设备污点 DRAExtendedResourceDRA 支持的扩展资源 DRAListTypeAttributes列表类型设备属性 DRANodeAllocatableResourcesDRA 设备映射到节点可分配资源 DRAPartitionableDevices可分区(共享)设备 DRAPrioritizedListFirstAvailable 优先级列表 DRAResourceClaimDeviceStatusResourceClaim 中每设备状态 DRAResourceClaimGranularStatusAuthorization细粒度设备状态授权 DRAResourcePoolStatusResourcePoolStatusRequest API DRASchedulerFilterTimeout调度器 DRA Filter 超时 DRAWorkloadResourceClaimsPodGroup 基础 ResourceClaims
10. 端到端流程总结
10.1 完整生命周期
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
┌─────────────────────────────────────────────────────────────┐
│ 1. DRA 驱动启动 │
│ ├─ 发布 ResourceSlice(声明可用设备) │
│ ├─ 注册到 kubelet(plugins_registry socket) │
│ └─ 启动 gRPC 服务器 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 2. 用户创建 Pod(引用 ResourceClaimTemplate) │
│ ├─ ResourceClaim Controller 创建 ResourceClaim │
│ └─ 更新 Pod Status 中的 Claim 名称 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 3. 调度器处理 │
│ ├─ PreEnqueue: 验证 Claim 存在 │
│ ├─ PreFilter: 构建 ClaimStore, 创建 Allocator │
│ ├─ Filter: 对每个节点运行分配算法 │
│ ├─ Score: 基于 FirstAvailable 偏好评分 │
│ ├─ Reserve: 标记进行中分配 │
│ └─ PreBind: 持久化分配结果到 API Server │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 4. Kubelet 准备资源 │
│ ├─ PrepareResources: 验证预留 → 缓存更新 → gRPC 调用 │
│ │ └─ NodePrepareResources → 返回 CDI 设备 ID │
│ ├─ GetResources: 为容器提供 CDI 设备 │
│ └─ 容器运行时通过 CDI 规范注入设备 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 5. Pod 运行 │
│ ├─ 健康监控: WatchResources 流持续更新设备状态 │
│ ├─ 污点驱逐: 如设备被污染且无容忍,Pod 被驱逐 │
│ └─ 绑定条件: 如设备需要绑定条件,等待就绪 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 6. Pod 终止清理 │
│ ├─ Kubelet: NodeUnprepareResources → 移除 CDI 设备 │
│ ├─ ResourceClaim Controller: │
│ │ ├─ 移除 ReservedFor 条目 │
│ │ ├─ 清除分配(ReservedFor 为空时) │
│ │ └─ 删除模板生成的 Claim │
│ └─ 设备变为可用,供其他 Pod 分配 │
└─────────────────────────────────────────────────────────────┘
10.2 关键交互时序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Pod创建 ──→ ResourceClaim Controller ──→ 创建 ResourceClaim
│
调度器 ◀──────────────────────────────────────┘
│
├─ PreFilter: 收集 Cluster 状态
│ ├─ ResourceSlice (可用设备)
│ ├─ AllocatedState (已分配设备)
│ └─ DeviceClass (选择器+配置)
│
├─ Filter: 对每个节点
│ └─ Allocator.Allocate(node, claims)
│ ├─ GatherPools (收集节点相关切片)
│ ├─ 验证请求 + 初始化约束
│ └─ allocateOne (递归搜索+回溯)
│ ├─ CEL 选择器匹配
│ ├─ 污点/容忍检查
│ └─ 约束检查 (matchAttribute/distinctAttribute)
│
├─ Reserve: SignalClaimPendingAllocation
│
└─ PreBind: UpdateStatus (allocation + reservedFor)
│
Kubelet ◀───────────┘
│
├─ PrepareResources
│ └─ NodePrepareResources gRPC
│ └─ 返回 CDI Device IDs
│
├─ GetResources → ContainerInfo
│
└─ Container Runtime → CDI 注入
10.3 核心文件索引
文件 描述 pkg/apis/resource/types.go内部 API 类型定义 staging/src/k8s.io/api/resource/v1/types.gov1 版本化 API 类型 pkg/apis/resource/validation/validation.goAPI 验证逻辑 pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go调度器插件主体 pkg/scheduler/framework/plugins/dynamicresources/dra_manager.goDRA 管理器(Claim/Slice/Class 缓存) pkg/scheduler/framework/plugins/dynamicresources/allocateddevices.go已分配设备跟踪 pkg/scheduler/framework/plugins/dynamicresources/claims.goClaim 存储结构 pkg/scheduler/framework/plugins/dynamicresources/extendeddynamicresources.go扩展资源支持 pkg/scheduler/framework/plugins/dynamicresources/nodeallocatabledynamicresources.go节点可分配资源 pkg/controller/resourceclaim/controller.goResourceClaim 控制器 pkg/controller/devicetainteviction/device_taint_eviction.go污点驱逐控制器 pkg/kubelet/cm/dra/manager.goKubelet DRA 管理器 pkg/kubelet/cm/dra/claiminfo.goClaim 信息缓存+检查点 pkg/kubelet/cm/dra/plugin/dra_plugin_manager.go插件注册管理 pkg/kubelet/cm/dra/plugin/dra_plugin.go单个插件 gRPC 封装 staging/src/k8s.io/dynamic-resource-allocation/structured/allocator.go分配器接口与工厂 staging/src/k8s.io/dynamic-resource-allocation/structured/internal/stable/allocator_stable.go稳定层分配算法 staging/src/k8s.io/dynamic-resource-allocation/kubeletplugin/draplugin.go驱动辅助库 staging/src/k8s.io/dynamic-resource-allocation/resourceslice/resourceslicecontroller.goResourceSlice 控制器 staging/src/k8s.io/kubelet/pkg/apis/dra/v1/api.pb.gogRPC 消息类型 staging/src/k8s.io/kubelet/pkg/apis/dra/v1/api_grpc.pb.gogRPC 服务定义 pkg/features/kube_features.goFeature Gate 定义