syncbase/vsync: Integrate log and gen vector functionality (Part 2/2).
Support for diffing genvectors of nested prefixes. Required by the responder
when sending deltas to the initiator.
Change-Id: I57d88cb3a2b3666bde998bf8d9c1fb6f93ca1669
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index dce51f2..fed4c9a 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -31,13 +31,17 @@
// device.
import (
+ "container/heap"
"fmt"
+ "sort"
+ "strings"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
+ "v.io/v23/rpc"
"v.io/v23/verror"
)
@@ -45,7 +49,7 @@
type dbSyncStateInMem struct {
gen uint64
pos uint64
- genvec interfaces.GenVector
+ genvec interfaces.GenVector // Note: Generation vector contains state from remote devices only.
}
// initSync initializes the sync module during startup. It scans all the
@@ -132,6 +136,28 @@
return gen, pos
}
+// getDbGenInfo returns a copy of the current generation information of the Database.
+func (s *syncService) getDbGenInfo(ctx *context.T, appName, dbName string) (interfaces.GenVector, uint64, error) {
+ s.syncStateLock.Lock()
+ defer s.syncStateLock.Unlock()
+
+ name := appDbName(appName, dbName)
+ ds, ok := s.syncState[name]
+ if !ok {
+ return nil, 0, verror.New(verror.ErrInternal, ctx, "db state not found", name)
+ }
+ // Make a copy of the genvec.
+ genvec := make(interfaces.GenVector)
+ for p, dspgv := range ds.genvec {
+ pgv := make(interfaces.PrefixGenVector)
+ for id, gen := range dspgv {
+ pgv[id] = gen
+ }
+ genvec[p] = pgv
+ }
+ return genvec, ds.gen, nil
+}
+
// appDbName combines the app and db names to return a globally unique name for
// a Database. This relies on the fact that the app name is globally unique and
// the db name is unique within the scope of the app.
@@ -160,7 +186,7 @@
func getDbSyncState(ctx *context.T, st store.StoreReader) (*dbSyncState, error) {
var ds dbSyncState
if err := util.GetObject(st, dbSyncStateKey(), &ds); err != nil {
- return nil, verror.New(verror.ErrInternal, ctx, err)
+ return nil, translateError(ctx, err, dbSyncStateKey())
}
return &ds, nil
}
@@ -201,7 +227,7 @@
func getLogRec(ctx *context.T, st store.StoreReader, id, gen uint64) (*localLogRec, error) {
var rec localLogRec
if err := util.GetObject(st, logRecKey(id, gen), &rec); err != nil {
- return nil, verror.New(verror.ErrInternal, ctx, err)
+ return nil, translateError(ctx, err, logRecKey(id, gen))
}
return &rec, nil
}
@@ -215,3 +241,322 @@
}
return nil
}
+
+////////////////////////////////////////////////////////////
+// Genvector-related utilities.
+
+// sendDeltasPerDatabase sends to an initiator all the missing generations
+// corresponding to the prefixes requested for this Database, and a genvector
+// summarizing the knowledge transferred from the responder to the
+// initiator. This happens in two phases:
+//
+// In the first phase, for a given set of nested prefixes from the initiator,
+// the shortest prefix in that set is extracted. The initiator's prefix
+// genvector for this shortest prefix represents the lower bound on its
+// knowledge for the entire set of nested prefixes. This prefix genvector
+// (representing the lower bound) is diffed with every responder prefix
+// genvector corresponding to the same or a deeper prefix than the initiator
+// prefix. This diff produces a bound on the missing knowledge. For example, say
+// the initiator is interested in prefixes {foo, foobar}, where each prefix is
+// associated with a prefix genvector. Since the initiator necessarily has at
+// least as much knowledge for prefix "foobar" as it has for prefix "foo",
+// "foo"'s prefix genvector is chosen as the lower bound for the initiator's
+// knowledge. Similarly, say the responder has knowledge for prefixes {f,
+// foobarX, foobarY, bar}. The responder diffs the prefix genvectors for
+// prefixes f, foobarX and foobarY with the initiator's prefix genvector to
+// compute a bound on the missing generations (all responder prefixes relevant
+// to "foo"). Note that since the responder does not have a prefix genvector at
+// "foo" itself, its knowledge at "f" applies to "foo".
+//
+// Since the first phase computes an over-estimate of the missing generations,
+// containing more generation entries than the initiator strictly needs, in the
+// second phase each missing generation is sent to the initiator only if the
+// initiator is eligible for it and is not already aware of
+// it. The generations are sent to the initiator in the same order as the
+// responder learned them so that the initiator can reconstruct the DAG for the
+// objects by learning older nodes first.
+func (s *syncService) sendDeltasPerDatabase(ctx *context.T, call rpc.ServerCall, appName, dbName string, initVec interfaces.GenVector, stream logRecStream) (interfaces.GenVector, error) {
+ // Phase 1 of sendDeltas. diff contains the bound on the generations
+ // missing from the initiator per device.
+ diff, outVec, err := s.computeDeltaBound(ctx, appName, dbName, initVec)
+ if err != nil {
+ return nil, err
+ }
+
+ // Phase 2 of sendDeltas: Process the diff, filtering out records that
+ // are not needed, and send the remainder on the wire in log order.
+ st, err := s.getDbStore(ctx, call, appName, dbName)
+ if err != nil {
+ return nil, err
+ }
+
+ // We now visit every log record in the generation range as obtained
+ // from phase 1 in their log order. We use a heap to incrementally sort
+ // the log records as per their position in the log.
+ //
+ // Init the min heap, one entry per device in the diff.
+ mh := make(minHeap, 0, len(diff))
+ for dev, r := range diff {
+ r.cur = r.min
+ rec, err := getNextLogRec(ctx, st, dev, r)
+ if err != nil {
+ return nil, err
+ }
+ if rec != nil {
+ mh = append(mh, rec)
+ } else {
+ delete(diff, dev)
+ }
+ }
+ heap.Init(&mh)
+
+ // Process the log records in order.
+ initPfxs := extractAndSortPrefixes(initVec)
+
+ for mh.Len() > 0 {
+ rec := heap.Pop(&mh).(*localLogRec)
+
+ if !filterLogRec(rec, initVec, initPfxs) {
+ // Send on the wire.
+ wireRec := interfaces.LogRec{Metadata: rec.Metadata}
+ // TODO(hpucha): Hash out this fake stream stuff when
+ // defining the RPC and the rest of the responder.
+ stream.Send(wireRec)
+ }
+
+ // Add a new record from the same device if not done.
+ dev := rec.Metadata.Id
+ rec, err := getNextLogRec(ctx, st, dev, diff[dev])
+ if err != nil {
+ return nil, err
+ }
+ if rec != nil {
+ heap.Push(&mh, rec)
+ } else {
+ delete(diff, dev)
+ }
+ }
+
+ return outVec, nil
+}
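+
+// For illustration only (hypothetical sketch, see the fake-stream TODO above):
+// a minimal logRecStream that simply buffers the records handed to it could
+// look as follows, where bufferedLogRecStream is a made-up name:
+//
+//	type bufferedLogRecStream struct {
+//		recs []interfaces.LogRec
+//	}
+//
+//	func (b *bufferedLogRecStream) Send(rec interfaces.LogRec) {
+//		b.recs = append(b.recs, rec)
+//	}
+//
+// sendDeltasPerDatabase would then append every unfiltered record to b.recs in
+// log order; the real responder is expected to replace this with the RPC
+// stream once the Deltas RPC is defined.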
+
+// computeDeltaBound computes the bound on missing generations across all
+// requested prefixes (phase 1 of sendDeltas).
+func (s *syncService) computeDeltaBound(ctx *context.T, appName, dbName string, initVec interfaces.GenVector) (genRangeVector, interfaces.GenVector, error) {
+ respVec, respGen, err := s.getDbGenInfo(ctx, appName, dbName)
+ if err != nil {
+ return nil, nil, err
+ }
+ respPfxs := extractAndSortPrefixes(respVec)
+ initPfxs := extractAndSortPrefixes(initVec)
+ if len(initPfxs) == 0 {
+ return nil, nil, verror.New(verror.ErrInternal, ctx, "empty initiator generation vector")
+ }
+
+ outVec := make(interfaces.GenVector)
+ diff := make(genRangeVector)
+ pfx := initPfxs[0]
+
+ for _, p := range initPfxs {
+ if strings.HasPrefix(p, pfx) && p != pfx {
+ continue
+ }
+
+ // Process this prefix as this is the start of a new set of
+ // nested prefixes.
+ pfx = p
+
+ // Lower bound on initiator's knowledge for this prefix set.
+ initpgv := initVec[pfx]
+
+ // Find the relevant responder prefixes and add the corresponding knowledge.
+ var respgv interfaces.PrefixGenVector
+ var rpStart string
+ for _, rp := range respPfxs {
+ if !strings.HasPrefix(rp, pfx) && !strings.HasPrefix(pfx, rp) {
+ // No relationship with pfx.
+ continue
+ }
+
+ if strings.HasPrefix(pfx, rp) {
+ // If rp is a prefix of pfx, remember it because
+ // it may be a potential starting point for the
+ // responder's knowledge. The actual starting
+ // point is the deepest prefix where rp is a
+ // prefix of pfx.
+ //
+ // Say the initiator is looking for "foo", and
+ // the responder has knowledge for "f" and "fo",
+ // the responder's starting point will be the
+ // prefix genvector for "fo". Similarly, if the
+ // responder has knowledge for "foo", the
+ // starting point will be the prefix genvector
+ // for "foo".
+ rpStart = rp
+ } else {
+ // If pfx is a prefix of rp, this knowledge must
+ // definitely be sent to the initiator. Diff the
+ // prefix genvectors to adjust the delta bound and
+ // include in outVec.
+ respgv = respVec[rp]
+ // Add local generation information to
+ // responder's genvec and returned genvec.
+ respgv[s.id] = respGen
+ s.diffPrefixGenVectors(respgv, initpgv, diff)
+ outVec[rp] = respgv
+ }
+ }
+
+ // Deal with the starting point.
+ if rpStart == "" {
+ // No matching prefixes for pfx were found.
+ respgv = make(interfaces.PrefixGenVector)
+ } else {
+ respgv = respVec[rpStart]
+ }
+ respgv[s.id] = respGen
+ s.diffPrefixGenVectors(respgv, initpgv, diff)
+ outVec[pfx] = respgv
+ }
+
+ return diff, outVec, nil
+}
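+
+// A small worked example of computeDeltaBound with hypothetical values:
+// suppose the initiator sends initVec = {"foo": {A:5}, "foobar": {A:7}} and
+// the responder (device R, local generation 10) has respVec = {"f": {A:8},
+// "foobarX": {A:12}}. The shortest initiator prefix "foo" becomes the lower
+// bound, so initpgv = {A:5}, and "foobar" is skipped as a nested prefix.
+// "foobarX" extends "foo", so its genvector plus the local entry, {A:12,
+// R:10}, is diffed against {A:5}; "f" is the deepest responder prefix of
+// "foo", so {A:8, R:10} is diffed as well. The result is diff = {A:[6-12],
+// R:[1-10]} and outVec = {"foobarX": {A:12, R:10}, "foo": {A:8, R:10}}.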
+
+// genRange represents a range of generations (min and max inclusive). cur
+// tracks the next generation to fetch while scanning the range for log
+// records.
+type genRange struct {
+ min uint64
+ max uint64
+ cur uint64
+}
+
+type genRangeVector map[uint64]*genRange
+
+// diffPrefixGenVectors diffs two generation vectors, belonging to the responder
+// and the initiator, and updates the range of generations per device known to
+// the responder but not known to the initiator. "gens" (generation range) is
+// passed in as an input argument so that it can be incrementally updated as the
+// range of missing generations grows when different responder prefix genvectors
+// are used to compute the diff.
+//
+// For example, say the responder's prefix genvector is RVec = {A:10, B:5, C:1}
+// and the initiator's is IVec = {A:5, B:10, D:2}. Diffing these two vectors
+// returns {A:[6-10], C:[1-1]}: the initiator is missing generations 6-10 from
+// device A and generation 1 from device C, while it already knows everything
+// the responder knows from device B.
+//
+// TODO(hpucha): Add reclaimVec for GCing.
+func (s *syncService) diffPrefixGenVectors(respPVec, initPVec interfaces.PrefixGenVector, gens genRangeVector) {
+ // Compute missing generations for devices that are in both initiator's and responder's vectors.
+ for devid, gen := range initPVec {
+ rgen, ok := respPVec[devid]
+ // Update the range only if the responder knows of this device; otherwise skip it.
+ if ok {
+ updateDevRange(devid, rgen, gen, gens)
+ }
+ }
+
+ // Compute missing generations for devices not in initiator's vector but in responder's vector.
+ for devid, rgen := range respPVec {
+ if _, ok := initPVec[devid]; !ok {
+ updateDevRange(devid, rgen, 0, gens)
+ }
+ }
+}
+
+func updateDevRange(devid, rgen, gen uint64, gens genRangeVector) {
+ if gen < rgen {
+ // Need to include all generations in the interval [gen+1,rgen], gen+1 and rgen inclusive.
+ if r, ok := gens[devid]; !ok {
+ gens[devid] = &genRange{min: gen + 1, max: rgen}
+ } else {
+ if gen+1 < r.min {
+ r.min = gen + 1
+ }
+ if rgen > r.max {
+ r.max = rgen
+ }
+ }
+ }
+}
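+
+// For example (hypothetical values): if gens already holds A:[6-8] and a later
+// diff calls updateDevRange(A, 10, 3, gens), the entry widens to A:[4-10],
+// since gen+1 = 4 < 6 and rgen = 10 > 8.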
+
+func extractAndSortPrefixes(vec interfaces.GenVector) []string {
+ pfxs := make([]string, len(vec))
+ i := 0
+ for p := range vec {
+ pfxs[i] = p
+ i++
+ }
+ sort.Strings(pfxs)
+ return pfxs
+}
+
+// TODO(hpucha): This can be optimized using a scan instead of "gets" in a for
+// loop.
+func getNextLogRec(ctx *context.T, sn store.StoreReader, dev uint64, r *genRange) (*localLogRec, error) {
+ for i := r.cur; i <= r.max; i++ {
+ rec, err := getLogRec(ctx, sn, dev, i)
+ if err == nil {
+ r.cur = i + 1
+ return rec, nil
+ }
+ if verror.ErrorID(err) != verror.ErrNoExist.ID {
+ return nil, err
+ }
+ }
+ return nil, nil
+}
+
+// Note: initPfxs is sorted.
+func filterLogRec(rec *localLogRec, initVec interfaces.GenVector, initPfxs []string) bool {
+ filter := true
+
+ var maxGen uint64
+ for _, p := range initPfxs {
+ if strings.HasPrefix(rec.Metadata.ObjId, p) {
+ // Do not filter. Initiator is interested in this
+ // prefix.
+ filter = false
+
+ // Track if the initiator knows of this record.
+ gen := initVec[p][rec.Metadata.Id]
+ if maxGen < gen {
+ maxGen = gen
+ }
+ }
+ }
+
+ // Filter this record if the initiator already has it.
+ if maxGen >= rec.Metadata.Gen {
+ return true
+ }
+
+ return filter
+}
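+
+// For example (hypothetical values): if the initiator requested prefixes
+// {"f": {A:5}, "foo": {A:8}} and the record is for object "foobar" from
+// device A at generation 7, both prefixes match, maxGen becomes 8 >= 7, and
+// the record is filtered because the initiator already has it; at generation
+// 9 it would be sent instead.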
+
+// A minHeap implements heap.Interface and holds local log records.
+type minHeap []*localLogRec
+
+func (mh minHeap) Len() int { return len(mh) }
+
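+// Less orders the heap by each record's position in the local log so that
+// records are replayed to the initiator in the order the responder learned
+// them.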
+func (mh minHeap) Less(i, j int) bool {
+ return mh[i].Pos < mh[j].Pos
+}
+
+func (mh minHeap) Swap(i, j int) {
+ mh[i], mh[j] = mh[j], mh[i]
+}
+
+func (mh *minHeap) Push(x interface{}) {
+ item := x.(*localLogRec)
+ *mh = append(*mh, item)
+}
+
+func (mh *minHeap) Pop() interface{} {
+ old := *mh
+ n := len(old)
+ item := old[n-1]
+ *mh = old[0 : n-1]
+ return item
+}
+
+type logRecStream interface {
+ Send(interfaces.LogRec)
+}