// staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go#L59// NewAPIServiceRegistrationController returns a new APIServiceRegistrationController.funcNewAPIServiceRegistrationController(apiServiceInformerinformers.APIServiceInformer,apiHandlerManagerAPIHandlerManager)*APIServiceRegistrationController{c:=&APIServiceRegistrationController{apiHandlerManager:apiHandlerManager,apiServiceLister:apiServiceInformer.Lister(),apiServiceSynced:apiServiceInformer.Informer().HasSynced,queue:workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),"APIServiceRegistrationController"),}apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc:c.addAPIService,UpdateFunc:c.updateAPIService,DeleteFunc:c.deleteAPIService,})c.syncFn=c.sync// 核心逻辑在这个方法里returnc}
// staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go#L419// AddAPIService adds an API service. It is not thread-safe, so only call it on one thread at a time please.// It's a slow moving API, so its ok to run the controller on a single threadfunc(s*APIAggregator)AddAPIService(apiService*v1.APIService)error{// if the proxyHandler already exists, it needs to be updated. The aggregation bits do not// since they are wired against listers because they require multiple resources to respondifproxyHandler,exists:=s.proxyHandlers[apiService.Name];exists{proxyHandler.updateAPIService(apiService)ifs.openAPIAggregationController!=nil{s.openAPIAggregationController.UpdateAPIService(proxyHandler,apiService)}ifs.openAPIV3AggregationController!=nil{s.openAPIV3AggregationController.UpdateAPIService(proxyHandler,apiService)}returnnil}// 这里就是前面提到的 url 的拼接规则proxyPath:="/apis/"+apiService.Spec.Group+"/"+apiService.Spec.Version// v1. is a special case for the legacy API. It proxies to a wider set of endpoints.ifapiService.Name==legacyAPIServiceName{proxyPath="/api"}// 这里在构建 handler 了// register the proxy handlerproxyHandler:=&proxyHandler{localDelegate:s.delegateHandler,proxyCurrentCertKeyContent:s.proxyCurrentCertKeyContent,proxyTransport:s.proxyTransport,serviceResolver:s.serviceResolver,egressSelector:s.egressSelector,}proxyHandler.updateAPIService(apiService)ifs.openAPIAggregationController!=nil{s.openAPIAggregationController.AddAPIService(proxyHandler,apiService)}ifs.openAPIV3AggregationController!=nil{s.openAPIV3AggregationController.AddAPIService(proxyHandler,apiService)}s.proxyHandlers[apiService.Name]=proxyHandler// 开始注册 可以看到,这里同时注册了带/ 和不带/ 两个 paths.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath,proxyHandler)s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/",proxyHandler)// if we're dealing with the legacy group, we're done hereifapiService.Name==legacyAPIServiceName{returnnil}// if we've already registered the path with the 
handler, we don't want to do it again.ifs.handledGroups.Has(apiService.Spec.Group){returnnil}// it's time to register the group aggregation endpointgroupPath:="/apis/"+apiService.Spec.GroupgroupDiscoveryHandler:=&apiGroupHandler{codecs:aggregatorscheme.Codecs,groupName:apiService.Spec.Group,lister:s.lister,delegate:s.delegateHandler,}// aggregation is protecteds.GenericAPIServer.Handler.NonGoRestfulMux.Handle(groupPath,groupDiscoveryHandler)s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle(groupPath+"/",groupDiscoveryHandler)s.handledGroups.Insert(apiService.Spec.Group)returnnil}// RemoveAPIService removes the APIService from being handled. It is not thread-safe, so only call it on one thread at a time please.// It's a slow moving API, so it's ok to run the controller on a single thread.func(s*APIAggregator)RemoveAPIService(apiServiceNamestring){version:=v1helper.APIServiceNameToGroupVersion(apiServiceName)proxyPath:="/apis/"+version.Group+"/"+version.Version// v1. is a special case for the legacy API. It proxies to a wider set of endpoints.ifapiServiceName==legacyAPIServiceName{proxyPath="/api"}// 移除则是调用 Unregisters.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath)s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath+"/")ifs.openAPIAggregationController!=nil{s.openAPIAggregationController.RemoveAPIService(apiServiceName)}ifs.openAPIV3AggregationController!=nil{s.openAPIAggregationController.RemoveAPIService(apiServiceName)}delete(s.proxyHandlers,apiServiceName)// TODO unregister group level discovery when there are no more versions for the group// We don't need this right away because the handler properly delegates when no versions are present}
// staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go#L227func(r*proxyHandler)updateAPIService(apiService*apiregistrationv1api.APIService){ifapiService.Spec.Service==nil{r.handlingInfo.Store(proxyHandlingInfo{local:true})return}proxyClientCert,proxyClientKey:=r.proxyCurrentCertKeyContent()clientConfig:=&restclient.Config{TLSClientConfig:restclient.TLSClientConfig{Insecure:apiService.Spec.InsecureSkipTLSVerify,// 拼接 Service 的 DNS 记录ServerName:apiService.Spec.Service.Name+"."+apiService.Spec.Service.Namespace+".svc",CertData:proxyClientCert,KeyData:proxyClientKey,CAData:apiService.Spec.CABundle,},}clientConfig.Wrap(x509metrics.NewDeprecatedCertificateRoundTripperWrapperConstructor(x509MissingSANCounter,x509InsecureSHA1Counter,))newInfo:=proxyHandlingInfo{name:apiService.Name,restConfig:clientConfig,serviceName:apiService.Spec.Service.Name,serviceNamespace:apiService.Spec.Service.Namespace,servicePort:*apiService.Spec.Service.Port,serviceAvailable:apiregistrationv1apihelper.IsAPIServiceConditionTrue(apiService,apiregistrationv1api.Available),}ifr.egressSelector!=nil{networkContext:=egressselector.Cluster.AsNetworkContext()varegressDialerutilnet.DialFuncegressDialer,err:=r.egressSelector.Lookup(networkContext)iferr!=nil{klog.Warning(err.Error())}else{newInfo.restConfig.Dial=egressDialer}}elseifr.proxyTransport!=nil&&r.proxyTransport.DialContext!=nil{newInfo.restConfig.Dial=r.proxyTransport.DialContext}newInfo.proxyRoundTripper,newInfo.transportBuildingError=restclient.TransportFor(newInfo.restConfig)ifnewInfo.transportBuildingError!=nil{klog.Warning(newInfo.transportBuildingError.Error())}r.handlingInfo.Store(newInfo)}
核心逻辑如下:根据 service name + namespace 拼接出 Service 的 DNS 记录(`<name>.<namespace>.svc`),再加上 TLS 证书等信息,就构建出了一个 restclient 配置及对应的 Transport,使用它即可访问到对应的后端 Service。
// Handle registers the handler for the given pattern.// If a handler already exists for pattern, Handle panics.func(m*PathRecorderMux)Handle(pathstring,handlerhttp.Handler){m.lock.Lock()deferm.lock.Unlock()m.trackCallers(path)m.exposedPaths=append(m.exposedPaths,path)m.pathToHandler[path]=handlerm.refreshMuxLocked()}
// PathRecorderMux is a thread-safe request multiplexer that also remembers which paths
// were registered on it.
type PathRecorderMux struct {
	// name identifies this mux in log output, which helps trace a request through it.
	name string

	// lock guards every field below except mux, which is read lock-free via atomic.Value.
	lock            sync.Mutex
	notFoundHandler http.Handler
	// pathToHandler holds exact-match registrations; prefixToHandler holds prefix ones.
	pathToHandler, prefixToHandler map[string]http.Handler

	// mux holds the pathHandler that actually serves requests. Trailing slashes are
	// accepted, but a path only matches exactly unless a prefix match was explicitly
	// requested at registration time.
	mux atomic.Value

	// exposedPaths lists the paths that should be advertised at "/".
	exposedPaths []string

	// pathStacks maps each registered path to the stack of its registrant, so that a
	// clearer message can be produced before the "http: multiple registrations for %s" panic.
	pathStacks map[string]string
}
// ServeHTTP makes it an http.Handlerfunc(m*PathRecorderMux)ServeHTTP(whttp.ResponseWriter,r*http.Request){m.mux.Load().(*pathHandler).ServeHTTP(w,r)}// ServeHTTP makes it an http.Handlerfunc(h*pathHandler)ServeHTTP(whttp.ResponseWriter,r*http.Request){// 先看下能否直接匹配,也就是 不带/的pathifexactHandler,ok:=h.pathToHandler[r.URL.Path];ok{klog.V(5).Infof("%v: %q satisfied by exact match",h.muxName,r.URL.Path)exactHandler.ServeHTTP(w,r)return}for_,prefixHandler:=rangeh.prefixHandlers{// 然后在看能否前缀匹配ifstrings.HasPrefix(r.URL.Path,prefixHandler.prefix){klog.V(5).Infof("%v: %q satisfied by prefix %v",h.muxName,r.URL.Path,prefixHandler.prefix)prefixHandler.handler.ServeHTTP(w,r)return}}klog.V(5).Infof("%v: %q satisfied by NotFoundHandler",h.muxName,r.URL.Path)h.notFoundHandler.ServeHTTP(w,r)}
// NewWithDelegate returns a new instance of APIAggregator from the given config.
//
// It builds the "kube-aggregator" generic server on top of delegationTarget, installs the
// apiregistration API group and the "/apis" discovery handler, creates the APIService
// registration and availability controllers, optionally wires dynamic proxy client
// certificates, and registers the post-start hooks that start informers and controllers.
// Any construction error is returned as (nil, err).
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
	// delegationTarget is the next server in the chain: the apiservers inside kube-apiserver
	// are composed as a chain of responsibility, and whatever an earlier server cannot handle
	// is delegated to the next one. For the APIAggregator, the delegation target is the
	// apiserver.
	genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
	if err != nil {
		return nil, err
	}

	apiregistrationClient, err := clientset.NewForConfig(c.GenericConfig.LoopbackClientConfig)
	if err != nil {
		return nil, err
	}
	informerFactory := informers.NewSharedInformerFactory(
		apiregistrationClient,
		5*time.Minute, // this is effectively used as a refresh interval right now.  Might want to do something nicer later on.
	)

	// apiServiceRegistrationControllerInitiated is closed when APIServiceRegistrationController has finished "installing" all known APIServices.
	// At this point we know that the proxy handler knows about APIServices and can handle client requests.
	// Before it might have resulted in a 404 response which could have serious consequences for some controllers like  GC and NS
	//
	// Note that the APIServiceRegistrationController waits for APIServiceInformer to synced before doing its work.
	apiServiceRegistrationControllerInitiated := make(chan struct{})
	if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("APIServiceRegistrationControllerInitiated", apiServiceRegistrationControllerInitiated); err != nil {
		return nil, err
	}

	s := &APIAggregator{
		GenericAPIServer:         genericServer,
		delegateHandler:          delegationTarget.UnprotectedHandler(),
		proxyTransport:           c.ExtraConfig.ProxyTransport,
		proxyHandlers:            map[string]*proxyHandler{},
		handledGroups:            sets.String{},
		lister:                   informerFactory.Apiregistration().V1().APIServices().Lister(),
		APIRegistrationInformers: informerFactory,
		serviceResolver:          c.ExtraConfig.ServiceResolver,
		openAPIConfig:            c.GenericConfig.OpenAPIConfig,
		openAPIV3Config:          c.GenericConfig.OpenAPIV3Config,
		egressSelector:           c.GenericConfig.EgressSelector,
		// Placeholder returning no cert/key; replaced below when proxy client cert files are configured.
		proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
	}

	// used later  to filter the served resource by those that have expired.
	resourceExpirationEvaluator, err := genericapiserver.NewResourceExpirationEvaluator(*c.GenericConfig.Version)
	if err != nil {
		return nil, err
	}

	apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter, resourceExpirationEvaluator.ShouldServeForVersion(1, 22))
	if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
		return nil, err
	}

	// The apiregistration v1 version must be among the installed storage versions.
	enabledVersions := sets.NewString()
	for v := range apiGroupInfo.VersionedResourcesStorageMap {
		enabledVersions.Insert(v)
	}
	if !enabledVersions.Has(v1.SchemeGroupVersion.Version) {
		return nil, fmt.Errorf("API group/version %s must be enabled", v1.SchemeGroupVersion.String())
	}

	apisHandler := &apisHandler{
		codecs:         aggregatorscheme.Codecs,
		lister:         s.lister,
		discoveryGroup: discoveryGroup(enabledVersions),
	}
	s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
	s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)

	apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)
	if len(c.ExtraConfig.ProxyClientCertFile) > 0 && len(c.ExtraConfig.ProxyClientKeyFile) > 0 {
		aggregatorProxyCerts, err := dynamiccertificates.NewDynamicServingContentFromFiles("aggregator-proxy-cert", c.ExtraConfig.ProxyClientCertFile, c.ExtraConfig.ProxyClientKeyFile)
		if err != nil {
			return nil, err
		}
		// We are passing the context to ProxyCerts.RunOnce as it needs to implement RunOnce(ctx) however the
		// context is not used at all. So passing a empty context shouldn't be a problem
		ctx := context.TODO()
		if err := aggregatorProxyCerts.RunOnce(ctx); err != nil {
			return nil, err
		}
		// The registration controller is notified when the cert content changes.
		aggregatorProxyCerts.AddListener(apiserviceRegistrationController)
		s.proxyCurrentCertKeyContent = aggregatorProxyCerts.CurrentCertKeyContent

		s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(postStartHookContext genericapiserver.PostStartHookContext) error {
			// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
			// TODO: See if we can pass ctx to the current method
			ctx, cancel := context.WithCancel(context.Background())
			go func() {
				select {
				case <-postStartHookContext.StopCh:
					cancel() // stopCh closed, so cancel our context
				case <-ctx.Done():
				}
			}()
			go aggregatorProxyCerts.Run(ctx, 1)
			return nil
		})
	}

	// The explicit conversion passes the cert/key getter as a plain func value; it is read
	// here, after the optional reassignment above.
	availableController, err := statuscontrollers.NewAvailableConditionController(
		informerFactory.Apiregistration().V1().APIServices(),
		c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
		c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
		apiregistrationClient.ApiregistrationV1(),
		c.ExtraConfig.ProxyTransport,
		(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),
		s.serviceResolver,
		c.GenericConfig.EgressSelector,
	)
	if err != nil {
		return nil, err
	}

	// NOTE: in the three hooks below the parameter named `context` shadows the context
	// package inside the closure; none of these closures use the package.
	s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
		informerFactory.Start(context.StopCh)
		c.GenericConfig.SharedInformerFactory.Start(context.StopCh)
		return nil
	})
	s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
		go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated)
		// Block the hook until the controller signals initial installation, or until shutdown.
		select {
		case <-context.StopCh:
		case <-apiServiceRegistrationControllerInitiated:
		}

		return nil
	})
	s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
		// if we end up blocking for long periods of time, we may need to increase workers.
		go availableController.Run(5, context.StopCh)
		return nil
	})

	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
		utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
		// Spawn a goroutine in aggregator apiserver to update storage version for
		// all built-in resources
		s.GenericAPIServer.AddPostStartHookOrDie(StorageVersionPostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error {
			// Wait for apiserver-identity to exist first before updating storage
			// versions, to avoid storage version GC accidentally garbage-collecting
			// storage versions.
			kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
			if err != nil {
				return err
			}
			if err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
				_, err := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get(
					context.TODO(), s.GenericAPIServer.APIServerID, metav1.GetOptions{})
				if apierrors.IsNotFound(err) {
					return false, nil
				}
				if err != nil {
					return false, err
				}
				return true, nil
			}, hookContext.StopCh); err != nil {
				return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v",
					s.GenericAPIServer.APIServerID, err)
			}
			// Technically an apiserver only needs to update storage version once during bootstrap.
			// Reconcile StorageVersion objects every 10 minutes will help in the case that the
			// StorageVersion objects get accidentally modified/deleted by a different agent. In that
			// case, the reconciliation ensures future storage migration still works. If nothing gets
			// changed, the reconciliation update is a noop and gets short-circuited by the apiserver,
			// therefore won't change the resource version and trigger storage migration.
			go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) {
				// All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver)
				// share the same generic apiserver config. The same StorageVersion manager is used
				// to register all built-in resources when the generic apiservers install APIs.
				s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig,
					s.GenericAPIServer.APIServerID)
				return false, nil
			}, hookContext.StopCh)
			// Once the storage version updater finishes the first round of update,
			// the PostStartHook will return to unblock /healthz. The handler chain
			// won't block write requests anymore. Check every second since it's not
			// expensive.
			// NOTE(review): the error from this PollImmediateUntil is discarded, so the hook
			// returns nil even when it stops because StopCh closed — confirm this is intended.
			wait.PollImmediateUntil(1*time.Second, func() (bool, error) {
				return s.GenericAPIServer.StorageVersionManager.Completed(), nil
			}, hookContext.StopCh)
			return nil
		})
	}

	return s, nil
}
除了聚合 API,官方还提供了另一种方式来扩展标准的 Kubernetes API:CRD(Custom Resource Definition)。它能实现与聚合 API 基本相同的功能,而且更易用、开发成本更低,但相比之下聚合 API 更为灵活。对于如何在这两种扩展方式之间选择,官方也提供了相应的参考。
通常,如果存在以下情况,CRD 可能更合适:
定制资源的字段不多;
你在组织内部使用该资源或者在一个小规模的开源项目中使用该资源,而不是在商业产品中使用;
相比之下,聚合 API 可提供更多高级 API 特性,也可对其他特性进行定制,例如:定制存储层、支持 protobuf 协议、支持 logs、patch 等操作。
两种方式的核心区别是定义 api-resource 的方式不同。在 Aggregated APIServer 方式中,api-resource 是通过代码向 API 注册资源类型,而 Custom Resource 是直接通过 yaml 文件向 API 注册资源类型。
简单来说就是
CRD 是让 kube-apiserver 认识更多的对象类别(Kind)
Aggregated APIServer 是构建自己的 APIServer 服务。
虽然 CRD 更简单,但灵活性不足。关于 CRD 与 Aggregated API 更详细的对比,可参考官方文档。
不过大部分需求都可以通过 CRD 方式实现,而且官方也是比较推荐使用 CRD 进行扩展,Aggregated API 一般是用于接入已有的 apiserver。
5. 小结
本文主要分析了 Aggregated API 的大致实现,即:通过 controller watch APIService 对象,然后动态注册 handler。