// api/etcdserverpb/rpc.proto, line 66
service Watch {
  // Watch watches for events happening or that have happened. Both input and output
  // are streams; the input stream is for creating and canceling watchers and the output
  // stream sends events. One watch RPC can watch on multiple key ranges, streaming events
  // for several watches at once. The entire event history can be watched starting from the
  // last compaction revision.
  rpc Watch(stream WatchRequest) returns (stream WatchResponse) {
    option (google.api.http) = {
      post: "/v3/watch"
      body: "*"
    };
  }
}
// server/etcdserver/api/v3rpc/watch.go 152行func(ws*watchServer)Watch(streampb.Watch_WatchServer)(errerror){sws:=serverWatchStream{lg:ws.lg,clusterID:ws.clusterID,memberID:ws.memberID,maxRequestBytes:ws.maxRequestBytes,sg:ws.sg,watchable:ws.watchable,ag:ws.ag,gRPCStream:stream,watchStream:ws.watchable.NewWatchStream(),// chan for sending control response like watcher created and canceled.ctrlStream:make(chan*pb.WatchResponse,ctrlStreamBufLen),progress:make(map[mvcc.WatchID]bool),prevKV:make(map[mvcc.WatchID]bool),fragment:make(map[mvcc.WatchID]bool),closec:make(chanstruct{}),}sws.wg.Add(1)// 第一个 goroutine sendLoop gofunc(){sws.sendLoop()sws.wg.Done()}()errc:=make(chanerror,1)// 第二个 goroutine recvLoopgofunc(){ifrerr:=sws.recvLoop();rerr!=nil{ifisClientCtxErr(stream.Context().Err(),rerr){sws.lg.Debug("failed to receive watch request from gRPC stream",zap.Error(rerr))}else{sws.lg.Warn("failed to receive watch request from gRPC stream",zap.Error(rerr))streamFailures.WithLabelValues("receive","watch").Inc()}errc<-rerr}}()select{caseerr=<-errc:iferr==context.Canceled{err=rpctypes.ErrGRPCWatchCanceled}close(sws.ctrlStream)case<-stream.Context().Done():err=stream.Context().Err()iferr==context.Canceled{err=rpctypes.ErrGRPCWatchCanceled}}sws.close()returnerr}
// server/etcdserver/api/v3rpc/watch.go 355行func(sws*serverWatchStream)sendLoop(){// watch ids that are currently activeids:=make(map[mvcc.WatchID]struct{})// watch responses pending on a watch id creation messagepending:=make(map[mvcc.WatchID][]*pb.WatchResponse)interval:=GetProgressReportInterval()progressTicker:=time.NewTicker(interval)// 省略部分逻辑for{select{casewresp,ok:=<-sws.watchStream.Chan():if!ok{return}evs:=wresp.Eventsevents:=make([]*mvccpb.Event,len(evs))sws.mu.RLock()needPrevKV:=sws.prevKV[wresp.WatchID]sws.mu.RUnlock()fori:=rangeevs{events[i]=&evs[i]ifneedPrevKV&&!IsCreateEvent(evs[i]){opt:=mvcc.RangeOptions{Rev:evs[i].Kv.ModRevision-1}r,err:=sws.watchable.Range(context.TODO(),evs[i].Kv.Key,nil,opt)iferr==nil&&len(r.KVs)!=0{events[i].PrevKv=&(r.KVs[0])}}}canceled:=wresp.CompactRevision!=0wr:=&pb.WatchResponse{Header:sws.newResponseHeader(wresp.Revision),WatchId:int64(wresp.WatchID),Events:events,CompactRevision:wresp.CompactRevision,Canceled:canceled,}if_,okID:=ids[wresp.WatchID];!okID{// buffer if id not yet announcedwrs:=append(pending[wresp.WatchID],wr)pending[wresp.WatchID]=wrscontinue}mvcc.ReportEventReceived(len(evs))sws.mu.RLock()fragmented,ok:=sws.fragment[wresp.WatchID]sws.mu.RUnlock()varserrerrorif!fragmented&&!ok{serr=sws.gRPCStream.Send(wr)}else{serr=sendFragments(wr,sws.maxRequestBytes,sws.gRPCStream.Send)}ifserr!=nil{ifisClientCtxErr(sws.gRPCStream.Context().Err(),serr){sws.lg.Debug("failed to send watch response to gRPC stream",zap.Error(serr))}else{sws.lg.Warn("failed to send watch response to gRPC stream",zap.Error(serr))streamFailures.WithLabelValues("send","watch").Inc()}return}sws.mu.Lock()iflen(evs)>0&&sws.progress[wresp.WatchID]{// elide next progress update if sent a key updatesws.progress[wresp.WatchID]=false}sws.mu.Unlock()casec,ok:=<-sws.ctrlStream:if!ok{return}iferr:=sws.gRPCStream.Send(c);err!=nil{ifisClientCtxErr(sws.gRPCStream.Context().Err(),err){sws.lg.Debug("failed to send watch control response to gRPC 
stream",zap.Error(err))}else{sws.lg.Warn("failed to send watch control response to gRPC stream",zap.Error(err))streamFailures.WithLabelValues("send","watch").Inc()}return}// track id creationwid:=mvcc.WatchID(c.WatchId)ifc.Canceled{delete(ids,wid)continue}ifc.Created{// flush buffered eventsids[wid]=struct{}{}for_,v:=rangepending[wid]{mvcc.ReportEventReceived(len(v.Events))iferr:=sws.gRPCStream.Send(v);err!=nil{ifisClientCtxErr(sws.gRPCStream.Context().Err(),err){sws.lg.Debug("failed to send pending watch response to gRPC stream",zap.Error(err))}else{sws.lg.Warn("failed to send pending watch response to gRPC stream",zap.Error(err))streamFailures.WithLabelValues("send","watch").Inc()}return}}delete(pending,wid)}case<-progressTicker.C:sws.mu.Lock()forid,ok:=rangesws.progress{ifok{sws.watchStream.RequestProgress(id)}sws.progress[id]=true}sws.mu.Unlock()case<-sws.closec:return}}}
func(s*watchableStore)progress(w*watcher){s.mu.RLock()defers.mu.RUnlock()if_,ok:=s.synced.watchers[w];ok{w.send(WatchResponse{WatchID:w.id,Revision:s.rev()})// If the ch is full, this watcher is receiving events.// We do not need to send progress at all.}}
for{select{// 从 chan 中取出 event casewresp,ok:=<-sws.watchStream.Chan():if!ok{return}evs:=wresp.Eventsevents:=make([]*mvccpb.Event,len(evs))sws.mu.RLock()needPrevKV:=sws.prevKV[wresp.WatchID]sws.mu.RUnlock()fori:=rangeevs{events[i]=&evs[i]ifneedPrevKV&&!IsCreateEvent(evs[i]){opt:=mvcc.RangeOptions{Rev:evs[i].Kv.ModRevision-1}r,err:=sws.watchable.Range(context.TODO(),evs[i].Kv.Key,nil,opt)iferr==nil&&len(r.KVs)!=0{events[i].PrevKv=&(r.KVs[0])}}}canceled:=wresp.CompactRevision!=0wr:=&pb.WatchResponse{Header:sws.newResponseHeader(wresp.Revision),WatchId:int64(wresp.WatchID),Events:events,CompactRevision:wresp.CompactRevision,Canceled:canceled,}// 如果 watcherID 还没注册到 ids 列表中,就先把这个 event 缓存起来if_,okID:=ids[wresp.WatchID];!okID{// buffer if id not yet announcedwrs:=append(pending[wresp.WatchID],wr)pending[wresp.WatchID]=wrscontinue}mvcc.ReportEventReceived(len(evs))sws.mu.RLock()fragmented,ok:=sws.fragment[wresp.WatchID]sws.mu.RUnlock()varserrerror// 然后发送给 clientif!fragmented&&!ok{serr=sws.gRPCStream.Send(wr)}else{serr=sendFragments(wr,sws.maxRequestBytes,sws.gRPCStream.Send)}ifserr!=nil{ifisClientCtxErr(sws.gRPCStream.Context().Err(),serr){sws.lg.Debug("failed to send watch response to gRPC stream",zap.Error(serr))}else{sws.lg.Warn("failed to send watch response to gRPC stream",zap.Error(serr))streamFailures.WithLabelValues("send","watch").Inc()}return}sws.mu.Lock()iflen(evs)>0&&sws.progress[wresp.WatchID]{// elide next progress update if sent a key updatesws.progress[wresp.WatchID]=false}sws.mu.Unlock()
casec,ok:=<-sws.ctrlStream:if!ok{return}iferr:=sws.gRPCStream.Send(c);err!=nil{ifisClientCtxErr(sws.gRPCStream.Context().Err(),err){sws.lg.Debug("failed to send watch control response to gRPC stream",zap.Error(err))}else{sws.lg.Warn("failed to send watch control response to gRPC stream",zap.Error(err))streamFailures.WithLabelValues("send","watch").Inc()}return}// track id creationwid:=mvcc.WatchID(c.WatchId)// 如果是被取消了,就从ids中移除ifc.Canceled{delete(ids,wid)continue}// 如果创建则把 watcherID 注册到 ids 列表中,然后把缓存的event都发送到 clientifc.Created{// flush buffered eventsids[wid]=struct{}{}for_,v:=rangepending[wid]{mvcc.ReportEventReceived(len(v.Events))iferr:=sws.gRPCStream.Send(v);err!=nil{ifisClientCtxErr(sws.gRPCStream.Context().Err(),err){sws.lg.Debug("failed to send pending watch response to gRPC stream",zap.Error(err))}else{sws.lg.Warn("failed to send pending watch response to gRPC stream",zap.Error(err))streamFailures.WithLabelValues("send","watch").Inc()}return}}delete(pending,wid)}
// server/etcdserver/api/v3rpc/watch.go 238行func(sws*serverWatchStream)recvLoop()error{for{req,err:=sws.gRPCStream.Recv()iferr==io.EOF{returnnil}iferr!=nil{returnerr}switchuv:=req.RequestUnion.(type){case*pb.WatchRequest_CreateRequest:ifuv.CreateRequest==nil{break}creq:=uv.CreateRequestiflen(creq.Key)==0{creq.Key=[]byte{0}}iflen(creq.RangeEnd)==0{creq.RangeEnd=nil}iflen(creq.RangeEnd)==1&&creq.RangeEnd[0]==0{creq.RangeEnd=[]byte{}}if!sws.isWatchPermitted(creq){wr:=&pb.WatchResponse{Header:sws.newResponseHeader(sws.watchStream.Rev()),WatchId:creq.WatchId,Canceled:true,Created:true,CancelReason:rpctypes.ErrGRPCPermissionDenied.Error(),}select{casesws.ctrlStream<-wr:continuecase<-sws.closec:returnnil}}filters:=FiltersFromRequest(creq)wsrev:=sws.watchStream.Rev()rev:=creq.StartRevisionifrev==0{rev=wsrev+1}id,err:=sws.watchStream.Watch(mvcc.WatchID(creq.WatchId),creq.Key,creq.RangeEnd,rev,filters...)iferr==nil{sws.mu.Lock()ifcreq.ProgressNotify{sws.progress[id]=true}ifcreq.PrevKv{sws.prevKV[id]=true}ifcreq.Fragment{sws.fragment[id]=true}sws.mu.Unlock()}wr:=&pb.WatchResponse{Header:sws.newResponseHeader(wsrev),WatchId:int64(id),Created:true,Canceled:err!=nil,}iferr!=nil{wr.CancelReason=err.Error()}select{casesws.ctrlStream<-wr:case<-sws.closec:returnnil}case*pb.WatchRequest_CancelRequest:ifuv.CancelRequest!=nil{id:=uv.CancelRequest.WatchIderr:=sws.watchStream.Cancel(mvcc.WatchID(id))iferr==nil{sws.ctrlStream<-&pb.WatchResponse{Header:sws.newResponseHeader(sws.watchStream.Rev()),WatchId:id,Canceled:true,}sws.mu.Lock()delete(sws.progress,mvcc.WatchID(id))delete(sws.prevKV,mvcc.WatchID(id))delete(sws.fragment,mvcc.WatchID(id))sws.mu.Unlock()}}case*pb.WatchRequest_ProgressRequest:ifuv.ProgressRequest!=nil{sws.ctrlStream<-&pb.WatchResponse{Header:sws.newResponseHeader(sws.watchStream.Rev()),WatchId:-1,// response is not associated with any WatchId and will be broadcast to all watch channels}}default:continue}}}
case *pb.WatchRequest_CreateRequest:
	if uv.CreateRequest == nil {
		break
	}

	// Assemble the watch parameters.
	creq := uv.CreateRequest
	if len(creq.Key) == 0 {
		// \x00 is the smallest key
		creq.Key = []byte{0}
	}
	switch {
	case len(creq.RangeEnd) == 0:
		// force nil since watchstream.Watch distinguishes
		// between nil and []byte{} for single key / >=
		creq.RangeEnd = nil
	case len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0:
		// support >= key queries
		creq.RangeEnd = []byte{}
	}

	// Check whether the watch request may run at all; this is mainly about
	// whether the user has permission to operate on the key.
	if !sws.isWatchPermitted(creq) {
		// Without permission, hand sendLoop a message that carries both the
		// Created and the Canceled flag.
		denied := &pb.WatchResponse{
			Header:       sws.newResponseHeader(sws.watchStream.Rev()),
			WatchId:      creq.WatchId,
			Canceled:     true,
			Created:      true,
			CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
		}
		select {
		case sws.ctrlStream <- denied:
			continue
		case <-sws.closec:
			return nil
		}
	}

	// With permission, create the watcher and hand sendLoop a message that
	// carries the Created flag.
	filters := FiltersFromRequest(creq)
	wsrev := sws.watchStream.Rev()
	rev := creq.StartRevision
	if rev == 0 {
		rev = wsrev + 1
	}

	// Create a watcher and get its id back.
	id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
	if err == nil {
		sws.mu.Lock()
		if creq.ProgressNotify {
			sws.progress[id] = true
		}
		if creq.PrevKv {
			sws.prevKV[id] = true
		}
		if creq.Fragment {
			sws.fragment[id] = true
		}
		sws.mu.Unlock()
	}

	created := &pb.WatchResponse{
		Header:  sws.newResponseHeader(wsrev),
		WatchId: int64(id),
		Created: true,
	}
	if err != nil {
		// Creation failed: the response is also marked canceled, with the reason.
		created.Canceled = true
		created.CancelReason = err.Error()
	}
	select {
	case sws.ctrlStream <- created:
	case <-sws.closec:
		return nil
	}
case *pb.WatchRequest_ProgressRequest:
	if uv.ProgressRequest != nil {
		wr := &pb.WatchResponse{
			Header: sws.newResponseHeader(sws.watchStream.Rev()),
			// response is not associated with any WatchId and will be broadcast to all watch channels
			WatchId: -1,
		}
		// Queue the response for the sender, but give up when closec fires so
		// this goroutine cannot block forever on a ctrlStream that is full and
		// no longer being drained.
		select {
		case sws.ctrlStream <- wr:
		case <-sws.closec:
			return nil
		}
	}