// server/storage/mvcc/watchable_store.go 73 行funcNew(lg*zap.Logger,bbackend.Backend,lelease.Lessor,cfgStoreConfig)WatchableKV{returnnewWatchableStore(lg,b,le,cfg)}funcnewWatchableStore(lg*zap.Logger,bbackend.Backend,lelease.Lessor,cfgStoreConfig)*watchableStore{iflg==nil{lg=zap.NewNop()}s:=&watchableStore{store:NewStore(lg,b,le,cfg),victimc:make(chanstruct{},1),unsynced:newWatcherGroup(),synced:newWatcherGroup(),stopc:make(chanstruct{}),}s.store.ReadView=&readView{s}s.store.WriteView=&writeView{s}ifs.le!=nil{// use this store as the deleter so revokes trigger watch eventss.le.SetRangeDeleter(func()lease.TxnDelete{returns.Write(traceutil.TODO())})}s.wg.Add(2)// 这里启动了两个 goroutine,分别是 syncWatchersLoop 和 syncVictimsLoopgos.syncWatchersLoop()gos.syncVictimsLoop()returns}
// server/storage/mvcc/watchable_store.go 211行func(s*watchableStore)syncWatchersLoop(){defers.wg.Done()for{s.mu.RLock()st:=time.Now()lastUnsyncedWatchers:=s.unsynced.size()s.mu.RUnlock()unsyncedWatchers:=0iflastUnsyncedWatchers>0{unsyncedWatchers=s.syncWatchers()}syncDuration:=time.Since(st)waitDuration:=100*time.Millisecond// more work pending?ifunsyncedWatchers!=0&&lastUnsyncedWatchers>unsyncedWatchers{// be fair to other store operations by yielding time takenwaitDuration=syncDuration}select{case<-time.After(waitDuration):case<-s.stopc:return}}}
// server/storage/mvcc/watchable_store.go 326行func(s*watchableStore)syncWatchers()int{s.mu.Lock()defers.mu.Unlock()ifs.unsynced.size()==0{return0}s.store.revMu.RLock()defers.store.revMu.RUnlock()curRev:=s.store.currentRevcompactionRev:=s.store.compactMainRevwg,minRev:=s.unsynced.choose(maxWatchersPerSync,curRev,compactionRev)minBytes,maxBytes:=newRevBytes(),newRevBytes()revToBytes(revision{main:minRev},minBytes)revToBytes(revision{main:curRev+1},maxBytes)tx:=s.store.b.ReadTx()tx.RLock()revs,vs:=tx.UnsafeRange(schema.Key,minBytes,maxBytes,0)evs:=kvsToEvents(s.store.lg,wg,revs,vs)tx.RUnlock()victims:=make(watcherBatch)wb:=newWatcherBatch(wg,evs)forw:=rangewg.watchers{w.minRev=curRev+1eb,ok:=wb[w]if!ok{s.synced.add(w)s.unsynced.delete(w)continue}ifeb.moreRev!=0{w.minRev=eb.moreRev}ifw.send(WatchResponse{WatchID:w.id,Events:eb.evs,Revision:curRev}){pendingEventsGauge.Add(float64(len(eb.evs)))}else{w.victim=true}ifw.victim{victims[w]=eb}else{ifeb.moreRev!=0{// stay unsynced; more to readcontinue}s.synced.add(w)}s.unsynced.delete(w)}s.addVictim(victims)vsz:=0for_,v:=ranges.victims{vsz+=len(v)}slowWatcherGauge.Set(float64(s.unsynced.size()+vsz))returns.unsynced.size()}
// server/storage/mvcc/watchable_store.go 243行func(s*watchableStore)syncVictimsLoop(){defers.wg.Done()for{fors.moveVictims()!=0{// try to update all victim watchers}s.mu.RLock()isEmpty:=len(s.victims)==0s.mu.RUnlock()vartickc<-chantime.Timeif!isEmpty{tickc=time.After(10*time.Millisecond)}select{case<-tickc:case<-s.victimc:case<-s.stopc:return}}}
// pkg/adt/interval_tree.go 185行typeIntervalTreeinterface{// Insert adds a node with the given interval into the tree.Insert(ivlInterval,valinterface{})// Delete removes the node with the given interval from the tree, returning// true if a node is in fact removed.Delete(ivlInterval)bool// Len gives the number of elements in the tree.Len()int// Height is the number of levels in the tree; one node has height 1.Height()int// MaxHeight is the expected maximum tree height given the number of nodes.MaxHeight()int// Visit calls a visitor function on every tree node intersecting the given interval.// It will visit each interval [x, y) in ascending order sorted on x.Visit(ivlInterval,ivvIntervalVisitor)// Find gets the IntervalValue for the node matching the given intervalFind(ivlInterval)*IntervalValue// Intersects returns true if there is some tree node intersecting the given interval.Intersects(ivInterval)bool// Contains returns true if the interval tree's keys cover the entire given interval.Contains(ivlInterval)bool// Stab returns a slice with all elements in the tree intersecting the interval.Stab(ivInterval)[]*IntervalValue// Union merges a given interval tree into the receiver.Union(inIvtIntervalTree,ivlInterval)}