本文主要记录 k8s 中的 kubedns 是如何工作的,即如何将 Service 的 DNS 记录解析为 IP 地址。内容包括:CoreDNS 如何解析请求、具体的数据来源与处理逻辑,以及 Pod 是如何知道要把 DNS 解析请求发送给 CoreDNS 的等问题。
1. CoreDNS 与 KubeDNS
CoreDNS 是一个用 Go 语言实现的 DNS Server,其功能通过插件链(plugin chain)的方式组织,因此用户可以很轻松地以添加插件的方式实现自定义逻辑。同时 CoreDNS 也是一个 CNCF 的毕业项目,稳定性、可用性方面不用担心。
为什么需要 KubeDNS 组件
Kubernetes Service 通过虚拟 IP 地址或者节点端口为用户应用提供访问入口,然而这些 IP 地址和端口是动态分配的,实际项目中无法把一个可变的入口发布出去供用户访问。为了解决这个问题,Kubernetes 提供了内置的域名服务,用户定义的服务会自动获取域名,通过域名解析,可以对外向用户提供一个固定的服务访问地址。
// plugin/kubernetes/setup.go#33funcsetup(c*caddy.Controller)error{// Do not call klog.InitFlags(nil) here. It will cause reload to panic.klog.SetOutput(os.Stdout)// 检查了 corefile 中 kubernetes 配置的定义,并配置了一些缺省值k,err:=kubernetesParse(c)iferr!=nil{returnplugin.Error(pluginName,err)}// 启动了对 pod, service, endpoint 三种资源增、删、改的 watch,并注册了一些回调// 注意:pod 是否启动 watch 是根据配置文件中 pod 的值来决定的,如果值不是 verified 就不会启动 pod 的 watch// 这里的 watch 方法观测到变化后,仅仅只改变 dns.modified 这个值,它会将该值设置为当前时间戳onStart,onShut,err:=k.InitKubeCache(context.Background())iferr!=nil{returnplugin.Error(pluginName,err)}ifonStart!=nil{c.OnStartup(onStart)}ifonShut!=nil{c.OnShutdown(onShut)}// 添加插件到插件链里dnsserver.GetConfig(c).AddPlugin(func(nextplugin.Handler)plugin.Handler{k.Next=nextreturnk})// get locally bound addressesc.OnStartup(func()error{k.localIPs=boundIPs(c)returnnil})returnnil}
//plugin/kubernetes/controller.go#563func(dns*dnsControl)Add(objinterface{}){dns.updateModified()}func(dns*dnsControl)Delete(objinterface{}){dns.updateModified()}func(dns*dnsControl)Update(oldObj,newObjinterface{}){dns.detectChanges(oldObj,newObj)}// updateModified set dns.modified to the current time.func(dns*dnsControl)updateModified(){unix:=time.Now().Unix()atomic.StoreInt64(&dns.modified,unix)}// updateExtModified set dns.extModified to the current time.func(dns*dnsControl)updateExtModifed(){unix:=time.Now().Unix()atomic.StoreInt64(&dns.extModified,unix)}// detectChanges detects changes in objects, and updates the modified timestampfunc(dns*dnsControl)detectChanges(oldObj,newObjinterface{}){// If both objects have the same resource version, they are identical.ifnewObj!=nil&&oldObj!=nil&&(oldObj.(meta.Object).GetResourceVersion()==newObj.(meta.Object).GetResourceVersion()){return}obj:=newObjifobj==nil{obj=oldObj}switchob:=obj.(type){case*object.Service:imod,emod:=serviceModified(oldObj,newObj)ifimod{dns.updateModified()}ifemod{dns.updateExtModifed()}case*object.Pod:dns.updateModified()case*object.Endpoints:if!endpointsEquivalent(oldObj.(*object.Endpoints),newObj.(*object.Endpoints)){dns.updateModified()}default:log.Warningf("Updates for %T not supported.",ob)}}
// plugin/kubernetes/kubernetes.go#95func(k*Kubernetes)Services(ctxcontext.Context,staterequest.Request,exactbool,optplugin.Options)(svcs[]msg.Service,errerror){// We're looking again at types, which we've already done in ServeDNS, but there are some types k8s just can't answer.switchstate.QType(){// 如果是插件 DNS 服务版本的话就返回编译时的版本号casedns.TypeTXT:// 1 label + zone, label must be "dns-version".t,_:=dnsutil.TrimZone(state.Name(),state.Zone)segs:=dns.SplitDomainName(t)iflen(segs)!=1{returnnil,nil}ifsegs[0]!="dns-version"{returnnil,nil}svc:=msg.Service{Text:DNSSchemaVersion,TTL:28800,Key:msg.Path(state.QName(),coredns)}return[]msg.Service{svc},nil// 如果查询的是 NS 类型记录,就返回 CoreDNS service 自身在集群里的记录casedns.TypeNS:// We can only get here if the qname equals the zone, see ServeDNS in handler.go.nss:=k.nsAddrs(false,state.Zone)varsvcs[]msg.Servicefor_,ns:=rangenss{ifns.Header().Rrtype==dns.TypeA{svcs=append(svcs,msg.Service{Host:ns.(*dns.A).A.String(),Key:msg.Path(ns.Header().Name,coredns),TTL:k.ttl})continue}ifns.Header().Rrtype==dns.TypeAAAA{svcs=append(svcs,msg.Service{Host:ns.(*dns.AAAA).AAAA.String(),Key:msg.Path(ns.Header().Name,coredns),TTL:k.ttl})}}returnsvcs,nil}ifisDefaultNS(state.Name(),state.Zone){nss:=k.nsAddrs(false,state.Zone)varsvcs[]msg.Servicefor_,ns:=rangenss{ifns.Header().Rrtype==dns.TypeA&&state.QType()==dns.TypeA{svcs=append(svcs,msg.Service{Host:ns.(*dns.A).A.String(),Key:msg.Path(state.QName(),coredns),TTL:k.ttl})continue}ifns.Header().Rrtype==dns.TypeAAAA&&state.QType()==dns.TypeAAAA{svcs=append(svcs,msg.Service{Host:ns.(*dns.AAAA).AAAA.String(),Key:msg.Path(state.QName(),coredns),TTL:k.ttl})}}returnsvcs,nil}// 到这里才正式开始解析 DNS 请求s,e:=k.Records(ctx,state,false)// SRV for external services is not yet implemented, so remove those records.ifstate.QType()!=dns.TypeSRV{returns,e}internal:=[]msg.Service{}for_,svc:=ranges{ift,_:=svc.HostType();t!=dns.TypeCNAME{internal=append(internal,svc)}}returninternal,e}
// plugin/kubernetes/kubernetes.go#412func(k*Kubernetes)findPods(rrecordRequest,zonestring)(pods[]msg.Service,errerror){ifk.podMode==podModeDisabled{returnnil,errNoItems}namespace:=r.namespaceif!k.namespaceExposed(namespace){returnnil,errNoItems}podname:=r.service// handle empty pod nameifpodname==""{ifk.namespaceExposed(namespace){// NODATAreturnnil,nil}// NXDOMAINreturnnil,errNoItems}zonePath:=msg.Path(zone,coredns)ip:=""ifstrings.Count(podname,"-")==3&&!strings.Contains(podname,"--"){ip=strings.ReplaceAll(podname,"-",".")}else{ip=strings.ReplaceAll(podname,"-",":")}ifk.podMode==podModeInsecure{if!k.namespaceExposed(namespace){// namespace does not existreturnnil,errNoItems}// If ip does not parse as an IP address, we return an error, otherwise we assume a CNAME and will try to resolve it in backend_lookup.goifnet.ParseIP(ip)==nil{returnnil,errNoItems}return[]msg.Service{{Key:strings.Join([]string{zonePath,Pod,namespace,podname},"/"),Host:ip,TTL:k.ttl}},err}// PodModeVerifiederr=errNoItemsfor_,p:=rangek.APIConn.PodIndex(ip){// check for matching ip and namespaceifip==p.PodIP&&match(namespace,p.Namespace){s:=msg.Service{Key:strings.Join([]string{zonePath,Pod,namespace,podname},"/"),Host:ip,TTL:k.ttl}pods=append(pods,s)err=nil}}returnpods,err}
// findServices returns the services matching r from the cache.
//
// It returns errNoItems when the namespace is not exposed or when no record
// could be produced, and (nil, nil) when the service name is empty but the
// namespace is exposed. Services and endpoints are looked up through
// k.APIConn using the key built from r.service and r.namespace.
func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.Service, err error) {
	if !k.namespaceExposed(r.namespace) {
		return nil, errNoItems
	}

	// handle empty service name
	if r.service == "" {
		if k.namespaceExposed(r.namespace) {
			// NODATA
			return nil, nil
		}
		// NXDOMAIN
		return nil, errNoItems
	}

	// Assume nothing matches until a record is actually produced below.
	err = errNoItems

	var (
		endpointsListFunc func() []*object.Endpoints
		endpointsList     []*object.Endpoints
		serviceList       []*object.Service
	)

	idx := object.ServiceKey(r.service, r.namespace)
	serviceList = k.APIConn.SvcIndex(idx)
	// Endpoints are fetched lazily: only the branches that need them call this.
	endpointsListFunc = func() []*object.Endpoints { return k.APIConn.EpIndex(idx) }

	zonePath := msg.Path(zone, coredns)
	for _, svc := range serviceList {
		if !(match(r.namespace, svc.Namespace) && match(r.service, svc.Name)) {
			continue
		}

		// If "ignore empty_service" option is set and no endpoints exist, return NXDOMAIN unless
		// it's a headless or externalName service (covered below).
		if k.opts.ignoreEmptyService && svc.Type != api.ServiceTypeExternalName && !svc.Headless() {
			// serve NXDOMAIN if no endpoint is able to answer
			podsCount := 0
			for _, ep := range endpointsListFunc() {
				for _, eps := range ep.Subsets {
					podsCount += len(eps.Addresses)
				}
			}

			if podsCount == 0 {
				continue
			}
		}

		// External service: only emitted when the external name has the
		// CNAME host type.
		if svc.Type == api.ServiceTypeExternalName {
			s := msg.Service{Key: strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/"), Host: svc.ExternalName, TTL: k.ttl}
			if t, _ := s.HostType(); t == dns.TypeCNAME {
				s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/")
				services = append(services, s)

				err = nil
			}
			continue
		}

		// Endpoint query or headless service: one record per matching
		// endpoint address and port.
		if svc.Headless() || r.endpoint != "" {
			// Fetch the endpoints once and reuse them for later services in this loop.
			if endpointsList == nil {
				endpointsList = endpointsListFunc()
			}

			for _, ep := range endpointsList {
				if object.EndpointsKey(svc.Name, svc.Namespace) != ep.Index {
					continue
				}

				for _, eps := range ep.Subsets {
					for _, addr := range eps.Addresses {

						// See comments in parse.go parseRequest about the endpoint handling.
						if r.endpoint != "" {
							if !match(r.endpoint, endpointHostname(addr, k.endpointNameMode)) {
								continue
							}
						}

						for _, p := range eps.Ports {
							if !(matchPortAndProtocol(r.port, p.Name, r.protocol, p.Protocol)) {
								continue
							}
							s := msg.Service{Host: addr.IP, Port: int(p.Port), TTL: k.ttl}
							s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name, endpointHostname(addr, k.endpointNameMode)}, "/")

							err = nil

							services = append(services, s)
						}
					}
				}
			}
			continue
		}

		// ClusterIP service: one record per matching port and cluster IP.
		for _, p := range svc.Ports {
			if !(matchPortAndProtocol(r.port, p.Name, r.protocol, string(p.Protocol))) {
				continue
			}

			// A matching port clears the error even if the service has no cluster IPs.
			err = nil

			for _, ip := range svc.ClusterIPs {
				s := msg.Service{Host: ip, Port: int(p.Port), TTL: k.ttl}
				s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/")
				services = append(services, s)
			}
		}
	}
	return services, err
}
可以看到 pod 和 service 逻辑其实差不多,最终都是去 client-go 的 Indexer 缓存里查数据并返回,比如 service 是这样从 Indexer 里拿数据的:
nameservers:将用作 Pod 的 DNS 服务器的 IP 地址列表。 最多可以指定 3 个 IP 地址。当 Pod 的 dnsPolicy 设置为 “None” 时, 列表必须至少包含一个 IP 地址,否则此属性是可选的。 所列出的服务器将合并到从指定的 DNS 策略生成的基本名称服务器,并删除重复的地址。
searches:用于在 Pod 中查找主机名的 DNS 搜索域的列表。此属性是可选的。 指定此属性时,所提供的列表将合并到根据所选 DNS 策略生成的基本搜索域名中。 重复的域名将被删除。Kubernetes 最多允许 6 个搜索域。
options:可选的对象列表,其中每个对象可能具有 name 属性(必需)和 value 属性(可选)。 此属性中的内容将合并到从指定的 DNS 策略生成的选项。 重复的条目将被删除。
// pkg/kubelet/network/dns/dns.go#L384

// GetPodDNS returns DNS settings for the pod.
//
// It starts from the host DNS configuration read via c.ResolverConfig,
// adjusts it according to the pod's DNS type, merges in pod.Spec.DNSConfig
// when present, and returns the result after passing it through
// formDNSConfigFitsLimits. An error is returned only when the host DNS
// configuration cannot be obtained.
func (c *Configurer) GetPodDNS(pod *v1.Pod) (*runtimeapi.DNSConfig, error) {
	dnsConfig, err := c.getHostDNSConfig(c.ResolverConfig)
	if err != nil {
		return nil, err
	}

	// Determine the DNS type first; each type is handled differently below.
	dnsType, err := getPodDNSType(pod)
	if err != nil {
		klog.ErrorS(err, "Failed to get DNS type for pod. Falling back to DNSClusterFirst policy.", "pod", klog.KObj(pod))
		dnsType = podDNSCluster // on error, fall back to the cluster-first type
	}
	switch dnsType {
	case podDNSNone:
		// DNSNone should use empty DNS settings as the base.
		dnsConfig = &runtimeapi.DNSConfig{}
	case podDNSCluster:
		if len(c.clusterDNS) != 0 {
			// For a pod with DNSClusterFirst policy, the cluster DNS server is
			// the only nameserver configured for the pod. The cluster DNS server
			// itself will forward queries to other nameservers that is configured
			// to use, in case the cluster DNS server cannot resolve the DNS query
			// itself.
			dnsConfig.Servers = []string{}
			for _, ip := range c.clusterDNS {
				dnsConfig.Servers = append(dnsConfig.Servers, ip.String())
			}
			dnsConfig.Searches = c.generateSearchesForDNSClusterFirst(dnsConfig.Searches, pod)
			dnsConfig.Options = defaultDNSOptions
			// Leave the switch here so the fallback path below is skipped.
			break
		}
		// clusterDNS is not known. Pod with ClusterDNSFirst Policy cannot be created.
		nodeErrorMsg := fmt.Sprintf("kubelet does not have ClusterDNS IP configured and cannot create Pod using %q policy. Falling back to %q policy.", v1.DNSClusterFirst, v1.DNSDefault)
		c.recorder.Eventf(c.nodeRef, v1.EventTypeWarning, "MissingClusterDNS", nodeErrorMsg)
		c.recorder.Eventf(pod, v1.EventTypeWarning, "MissingClusterDNS", "pod: %q. %s", format.Pod(pod), nodeErrorMsg)
		// Fallback to DNSDefault.
		fallthrough
	case podDNSHost:
		// When the kubelet --resolv-conf flag is set to the empty string, use
		// DNS settings that override the docker default (which is to use
		// /etc/resolv.conf) and effectively disable DNS lookups. According to
		// the bind documentation, the behavior of the DNS client library when
		// "nameservers" are not specified is to "use the nameserver on the
		// local machine". A nameserver setting of localhost is equivalent to
		// this documented behavior.
		if c.ResolverConfig == "" {
			// One loopback nameserver per node IP, matching its address family.
			for _, nodeIP := range c.nodeIPs {
				if utilnet.IsIPv6(nodeIP) {
					dnsConfig.Servers = append(dnsConfig.Servers, "::1")
				} else {
					dnsConfig.Servers = append(dnsConfig.Servers, "127.0.0.1")
				}
			}
			// Still nothing configured: default to the IPv4 loopback address.
			if len(dnsConfig.Servers) == 0 {
				dnsConfig.Servers = append(dnsConfig.Servers, "127.0.0.1")
			}
			dnsConfig.Searches = []string{"."}
		}
	}

	// Settings given explicitly in the pod spec are merged on top of the base.
	if pod.Spec.DNSConfig != nil {
		dnsConfig = appendDNSConfig(dnsConfig, pod.Spec.DNSConfig)
	}
	return c.formDNSConfigFitsLimits(dnsConfig, pod), nil
}
首先获取 Pod 的 DNS 类型,然后根据不同的类型执行不同的逻辑;如果获取类型失败(getPodDNSType 返回错误),则记录错误日志并回退为 DNSClusterFirst:
1
2
3
4
5
6
// Excerpt from GetPodDNS: determine the pod's DNS type; if that fails, log
// the error and fall back to the cluster-first type.
dnsType, err := getPodDNSType(pod)
if err != nil {
	klog.ErrorS(err, "Failed to get DNS type for pod. Falling back to DNSClusterFirst policy.", "pod", klog.KObj(pod))
	dnsType = podDNSCluster // fall back to the cluster-first type
}
// pkg/kubelet/network/dns/dns.go#430// SetupDNSinContainerizedMounter replaces the nameserver in containerized-mounter's rootfs/etc/resolv.conf with kubelet.ClusterDNSfunc(c*Configurer)SetupDNSinContainerizedMounter(mounterPathstring){resolvePath:=filepath.Join(strings.TrimSuffix(mounterPath,"/mounter"),"rootfs","etc","resolv.conf")dnsString:=""for_,dns:=rangec.clusterDNS{dnsString=dnsString+fmt.Sprintf("nameserver %s\n",dns)}ifc.ResolverConfig!=""{f,err:=os.Open(c.ResolverConfig)iferr!=nil{klog.ErrorS(err,"Could not open resolverConf file")}else{deferf.Close()_,hostSearch,_,err:=parseResolvConf(f)iferr!=nil{klog.ErrorS(err,"Error for parsing the resolv.conf file")}else{dnsString=dnsString+"search"for_,search:=rangehostSearch{dnsString=dnsString+fmt.Sprintf(" %s",search)}dnsString=dnsString+"\n"}}}iferr:=ioutil.WriteFile(resolvePath,[]byte(dnsString),0600);err!=nil{klog.ErrorS(err,"Could not write dns nameserver in the file","path",resolvePath)}}