// ListAndWatch returns a stream of List of Devices// Whenever a Device state change or a Device disappears, ListAndWatch// returns the new listfunc(c*GopherDevicePlugin)ListAndWatch(_*pluginapi.Empty,srvpluginapi.DevicePlugin_ListAndWatchServer)error{devs:=c.dm.Devices()klog.Infof("find devices:%s",String(devs))err:=srv.Send(&pluginapi.ListAndWatchResponse{Devices:devs})iferr!=nil{returnerrors.WithMessage(err,"send device failed")}klog.Infoln("waiting for device update")forrangec.dm.notify{devs=c.dm.Devices()klog.Infof("device update,new device list:%s",String(devs))_=srv.Send(&pluginapi.ListAndWatchResponse{Devices:devs})}returnnil}
发现设备的部分代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func(d*DeviceMonitor)List()error{err:=filepath.Walk(d.path,func(pathstring,infofs.FileInfo,errerror)error{ifinfo.IsDir(){klog.Infof("%s is dir,skip",path)returnnil}d.devices[info.Name()]=&pluginapi.Device{ID:info.Name(),Health:pluginapi.Healthy,}returnnil})returnerrors.WithMessagef(err,"walk [%s] failed",d.path)}
// Allocate is called during container creation so that the Device// Plugin can run device specific operations and instruct Kubelet// of the steps to make the Device available in the containerfunc(c*GopherDevicePlugin)Allocate(_context.Context,reqs*pluginapi.AllocateRequest)(*pluginapi.AllocateResponse,error){ret:=&pluginapi.AllocateResponse{}for_,req:=rangereqs.ContainerRequests{klog.Infof("[Allocate] received request: %v",strings.Join(req.DevicesIDs,","))resp:=pluginapi.ContainerAllocateResponse{Envs:map[string]string{"Gopher":strings.Join(req.DevicesIDs,","),},}ret.ContainerResponses=append(ret.ContainerResponses,&resp)}returnret,nil}
简单看一下 NVIDIA 的 device plugin 是怎么实现 Allocate 的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Allocate which return list of devices.func(plugin*NvidiaDevicePlugin)Allocate(ctxcontext.Context,reqs*pluginapi.AllocateRequest)(*pluginapi.AllocateResponse,error){responses:=pluginapi.AllocateResponse{}for_,req:=rangereqs.ContainerRequests{iferr:=plugin.rm.ValidateRequest(req.DevicesIDs);err!=nil{returnnil,fmt.Errorf("invalid allocation request for %q: %w",plugin.rm.Resource(),err)}response,err:=plugin.getAllocateResponse(req.DevicesIDs)iferr!=nil{returnnil,fmt.Errorf("failed to get allocate response: %v",err)}responses.ContainerResponses=append(responses.ContainerResponses,response)}return&responses,nil}
核心其实是这个方法:
1
2
3
4
// updateResponseForDeviceListEnvvar sets the environment variable for the requested devices.func(plugin*NvidiaDevicePlugin)updateResponseForDeviceListEnvvar(response*pluginapi.ContainerAllocateResponse,deviceIDs...string){response.Envs[plugin.deviceListEnvvar]=strings.Join(deviceIDs,",")}
func(p*iluvatarDevicePlugin)allocateDevicesByDeviceID(hostminoruint,numint)*pluginapi.DeviceSpec{vardevicepluginapi.DeviceSpechostPathPrefix:="/dev/"containerPathPrefix:="/dev/"// Expose the device node for iluvatar pod.device.HostPath=hostPathPrefix+deviceName+strconv.Itoa(int(hostminor))device.ContainerPath=containerPathPrefix+deviceName+strconv.Itoa(num)device.Permissions="rw"return&device}
// GetDevicePluginOptions returns options to be communicated with Device// Managerfunc(c*GopherDevicePlugin)GetDevicePluginOptions(_context.Context,_*pluginapi.Empty)(*pluginapi.DevicePluginOptions,error){return&pluginapi.DevicePluginOptions{PreStartRequired:true},nil}// GetPreferredAllocation returns a preferred set of devices to allocate// from a list of available ones. The resulting preferred allocation is not// guaranteed to be the allocation ultimately performed by the// devicemanager. It is only designed to help the devicemanager make a more// informed allocation decision when possible.func(c*GopherDevicePlugin)GetPreferredAllocation(_context.Context,_*pluginapi.PreferredAllocationRequest)(*pluginapi.PreferredAllocationResponse,error){return&pluginapi.PreferredAllocationResponse{},nil}// PreStartContainer is called, if indicated by Device Plugin during registeration phase,// before each container start. Device plugin can run device specific operations// such as reseting the device before making devices available to the containerfunc(c*GopherDevicePlugin)PreStartContainer(_context.Context,_*pluginapi.PreStartContainerRequest)(*pluginapi.PreStartContainerResponse,error){return&pluginapi.PreStartContainerResponse{},nil}
// Register registers the device plugin for the given resourceName with Kubelet.func(c*GopherDevicePlugin)Register()error{conn,err:=connect(pluginapi.KubeletSocket,common.ConnectTimeout)iferr!=nil{returnerrors.WithMessagef(err,"connect to %s failed",pluginapi.KubeletSocket)}deferconn.Close()client:=pluginapi.NewRegistrationClient(conn)reqt:=&pluginapi.RegisterRequest{Version:pluginapi.Version,Endpoint:path.Base(common.DeviceSocket),ResourceName:common.ResourceName,}_,err=client.Register(context.Background(),reqt)iferr!=nil{returnerrors.WithMessage(err,"register to kubelet failed")}returnnil}
// /pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go#L143-L165func(s*server)Register(ctxcontext.Context,r*api.RegisterRequest)(*api.Empty,error){klog.InfoS("Got registration request from device plugin with resource","resourceName",r.ResourceName)metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()if!s.isVersionCompatibleWithPlugin(r.Version){err:=fmt.Errorf(errUnsupportedVersion,r.Version,api.SupportedVersions)klog.InfoS("Bad registration request from device plugin with resource","resourceName",r.ResourceName,"err",err)return&api.Empty{},err}if!v1helper.IsExtendedResourceName(core.ResourceName(r.ResourceName)){err:=fmt.Errorf(errInvalidResourceName,r.ResourceName)klog.InfoS("Bad registration request from device plugin","err",err)return&api.Empty{},err}iferr:=s.connectClient(r.ResourceName,filepath.Join(s.socketDir,r.Endpoint));err!=nil{klog.InfoS("Error connecting to device plugin client","err",err)return&api.Empty{},err}return&api.Empty{},nil}
核心在 connectClient 方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func(s*server)connectClient(namestring,socketPathstring)error{c:=NewPluginClient(name,socketPath,s.chandler)s.registerClient(name,c)iferr:=c.Connect();err!=nil{s.deregisterClient(name)klog.ErrorS(err,"Failed to connect to new client","resource",name)returnerr}gofunc(){s.runClient(name,c)}()returnnil}
[root@test ~]# k get po
NAME READY STATUS RESTARTS AGE
gopher-pod 1/1 Running 0 3m9s
gopher-pod2 0/1 Pending 0 2s
因为节点上只有一个 gopher 资源，且已被第一个 Pod 占用，所以第二个 Pod 无法调度，处于 Pending 状态。
1
2
3
4
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Warning FailedScheduling 7s default-scheduler 0/1 nodes are available: 1 Insufficient lixueduan.com/gopher. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..