// server/storage/mvcc/kvstore_txn.go 182 行func(tw*storeTxnWrite)put(key,value[]byte,leaseIDlease.LeaseID){rev:=tw.beginRev+1c:=revoldLease:=lease.NoLease// 1.查询keyIndex_,created,ver,err:=tw.s.kvindex.Get(key,rev)iferr==nil{c=created.mainoldLease=tw.s.le.GetLease(lease.LeaseItem{Key:string(key)})}ibytes:=newRevBytes()idxRev:=revision{main:rev,sub:int64(len(tw.changes))}revToBytes(idxRev,ibytes)ver=ver+1kv:=mvccpb.KeyValue{Key:key,Value:value,CreateRevision:c,ModRevision:rev,Version:ver,Lease:int64(leaseID),}d,err:=kv.Marshal()iferr!=nil{tw.storeTxnRead.s.lg.Fatal("failed to marshal mvccpb.KeyValue",zap.Error(err),)}// 2.写blotdbtw.tx.UnsafeSeqPut(schema.Key,ibytes,d)// 3.更新keyIndextw.s.kvindex.Put(key,idxRev)tw.changes=append(tw.changes,kv)// lease 相关更新// 若存在旧lease则移除ifoldLease!=lease.NoLease{iftw.s.le==nil{panic("no lessor to detach lease")}err=tw.s.le.Detach(oldLease,[]lease.LeaseItem{{Key:string(key)}})iferr!=nil{tw.storeTxnRead.s.lg.Error("failed to detach old lease from a key",zap.Error(err),)}}// 若本次指定了 lease则关联上ifleaseID!=lease.NoLease{iftw.s.le==nil{panic("no lessor to attach lease")}err=tw.s.le.Attach(leaseID,[]lease.LeaseItem{{Key:string(key)}})iferr!=nil{panic("unexpected error from lease Attach")}}}
// server/storage/mvcc/kvstore_txn.go 127行func(tr*storeTxnRead)rangeKeys(ctxcontext.Context,key,end[]byte,curRevint64,roRangeOptions)(*RangeResult,error){rev:=ro.Revifrev>curRev{return&RangeResult{KVs:nil,Count:-1,Rev:curRev},ErrFutureRev}// 若没指定或指定了错误的版本号就会默认查最新的一个版本ifrev<=0{rev=curRev}// 1.查找 revisions // 这里如果当前查询的版本号比compactMainRev小说明这个版本已经被回收了 直接返回错误ifrev<tr.s.compactMainRev{return&RangeResult{KVs:nil,Count:-1,Rev:0},ErrCompacted}ifro.Count{total:=tr.s.kvindex.CountRevisions(key,end,rev)tr.trace.Step("count revisions from in-memory index tree")return&RangeResult{KVs:nil,Count:total,Rev:curRev},nil}// 否则就查询比当前版本号大的所有版本号revpairs,total:=tr.s.kvindex.Revisions(key,end,rev,int(ro.Limit))tr.trace.Step("range keys from in-memory index tree")iflen(revpairs)==0{return&RangeResult{KVs:nil,Count:total,Rev:curRev},nil}limit:=int(ro.Limit)iflimit<=0||limit>len(revpairs){limit=len(revpairs)}kvs:=make([]mvccpb.KeyValue,limit)revBytes:=newRevBytes()// 2.查询 blotdb// 然后根据上面查到的版本号循环去blotdb中查找对应valuefori,revpair:=rangerevpairs[:len(kvs)]{select{case<-ctx.Done():returnnil,ctx.Err()default:}revToBytes(revpair,revBytes)_,vs:=tr.tx.UnsafeRange(schema.Key,revBytes,nil,0)iflen(vs)!=1{tr.s.lg.Fatal("range failed to find revision pair",zap.Int64("revision-main",revpair.main),zap.Int64("revision-sub",revpair.sub),)}iferr:=kvs[i].Unmarshal(vs[0]);err!=nil{tr.s.lg.Fatal("failed to unmarshal mvccpb.KeyValue",zap.Error(err),)}}tr.trace.Step("range keys from bolt db")return&RangeResult{KVs:kvs,Count:total,Rev:curRev},nil}
// server/storage/mvcc/kvstore_txn.go 262行func(tw*storeTxnWrite)delete(key[]byte){ibytes:=newRevBytes()idxRev:=revision{main:tw.beginRev+1,sub:int64(len(tw.changes))}revToBytes(idxRev,ibytes)// 1.标记删除 blotdb// 在 blotdb 的 key上追加tombstone标识(标记删除)ibytes=appendMarkTombstone(tw.storeTxnRead.s.lg,ibytes)kv:=mvccpb.KeyValue{Key:key}d,err:=kv.Marshal()iferr!=nil{tw.storeTxnRead.s.lg.Fatal("failed to marshal mvccpb.KeyValue",zap.Error(err),)}// 因为是标记删除,所以这里调用的是 put而不是deletetw.tx.UnsafeSeqPut(schema.Key,ibytes,d)// 2.处理keyIndexerr=tw.s.kvindex.Tombstone(key,idxRev)iferr!=nil{tw.storeTxnRead.s.lg.Fatal("failed to tombstone an existing key",zap.String("key",string(key)),zap.Error(err),)}tw.changes=append(tw.changes,kv)// 3.如果还有关联的 lease,则移除关联item:=lease.LeaseItem{Key:string(key)}leaseID:=tw.s.le.GetLease(item)ifleaseID!=lease.NoLease{err=tw.s.le.Detach(leaseID,[]lease.LeaseItem{item})iferr!=nil{tw.storeTxnRead.s.lg.Error("failed to detach old lease from a key",zap.Error(err),)}}}