syncbase/vsync: Initiator module.
Change-Id: I3dfc95ee8f9a6cd6a40558bf1dfab3a2282c3e3e
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index fed4c9a..f025327 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -34,12 +34,12 @@
"container/heap"
"fmt"
"sort"
+ "strconv"
"strings"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
-
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/verror"
@@ -47,9 +47,11 @@
// dbSyncStateInMem represents the in-memory sync state of a Database.
type dbSyncStateInMem struct {
- gen uint64
- pos uint64
- genvec interfaces.GenVector // Note: Generation vector contains state from remote devices only.
+ gen uint64
+ pos uint64
+
+ // ckPtGen is the value of gen captured by checkPtLocalGen. getDbGenInfo
+ // reports it as the local generation instead of the live gen.
+ ckPtGen uint64
+ genvec interfaces.GenVector // Note: Generation vector contains state from remote devices only.
}
// initSync initializes the sync module during startup. It scans all the
@@ -136,6 +138,44 @@
return gen, pos
}
+// checkPtLocalGen freezes the local generation number for the responder's use
+// by recording the Database's current gen in its ckPtGen field. The update is
+// made while holding syncStateLock. It returns an ErrInternal verror if there
+// is no in-memory sync state for the (appName, dbName) Database.
+func (s *syncService) checkPtLocalGen(ctx *context.T, appName, dbName string) error {
+ s.syncStateLock.Lock()
+ defer s.syncStateLock.Unlock()
+
+ name := appDbName(appName, dbName)
+ ds, ok := s.syncState[name]
+ if !ok {
+ return verror.New(verror.ErrInternal, ctx, "db state not found", name)
+ }
+
+ // Snapshot the current generation; getDbGenInfo hands out this value.
+ ds.ckPtGen = ds.gen
+ return nil
+}
+
+// getDbSyncStateInMem returns a copy of the current in memory sync state of the Database.
+// The copy is taken while holding syncStateLock and includes gen, pos, ckPtGen
+// and a deep copy of genvec, so the result shares no maps with the state kept
+// in s.syncState. It returns an ErrInternal verror if there is no in-memory
+// sync state for the (appName, dbName) Database.
+func (s *syncService) getDbSyncStateInMem(ctx *context.T, appName, dbName string) (*dbSyncStateInMem, error) {
+ s.syncStateLock.Lock()
+ defer s.syncStateLock.Unlock()
+
+ name := appDbName(appName, dbName)
+ ds, ok := s.syncState[name]
+ if !ok {
+ return nil, verror.New(verror.ErrInternal, ctx, "db state not found", name)
+ }
+
+ // The scalar fields are copied by value.
+ dsCopy := &dbSyncStateInMem{
+ gen: ds.gen,
+ pos: ds.pos,
+ ckPtGen: ds.ckPtGen,
+ }
+
+ // Make a copy of the genvec; copyGenVec also copies each per-prefix map.
+ dsCopy.genvec = copyGenVec(ds.genvec)
+
+ return dsCopy, nil
+}
+
// getDbGenInfo returns a copy of the current generation information of the Database.
func (s *syncService) getDbGenInfo(ctx *context.T, appName, dbName string) (interfaces.GenVector, uint64, error) {
s.syncStateLock.Lock()
@@ -146,16 +186,33 @@
if !ok {
return nil, 0, verror.New(verror.ErrInternal, ctx, "db state not found", name)
}
+
// Make a copy of the genvec.
- genvec := make(interfaces.GenVector)
- for p, dspgv := range ds.genvec {
- pgv := make(interfaces.PrefixGenVector)
- for id, gen := range dspgv {
- pgv[id] = gen
- }
- genvec[p] = pgv
+ genvec := copyGenVec(ds.genvec)
+
+ // Add local generation information to the genvec.
+ for _, gv := range genvec {
+ gv[s.id] = ds.ckPtGen
}
- return genvec, ds.gen, nil
+
+ return genvec, ds.ckPtGen, nil
+}
+
+// putDbGenInfoRemote replaces the in-memory generation vector of the Database
+// with a copy of the supplied genvec. The replacement is made while holding
+// syncStateLock. It returns an ErrInternal verror if there is no in-memory
+// sync state for the (appName, dbName) Database.
+func (s *syncService) putDbGenInfoRemote(ctx *context.T, appName, dbName string, genvec interfaces.GenVector) error {
+ s.syncStateLock.Lock()
+ defer s.syncStateLock.Unlock()
+
+ name := appDbName(appName, dbName)
+ ds, ok := s.syncState[name]
+ if !ok {
+ return verror.New(verror.ErrInternal, ctx, "db state not found", name)
+ }
+
+ // Store a copy of the genvec so that the in-memory state does not share
+ // maps with the caller's value.
+ ds.genvec = copyGenVec(genvec)
+
+ return nil
}
// appDbName combines the app and db names to return a globally unique name for
@@ -165,6 +222,28 @@
return util.JoinKeyParts(appName, dbName)
}
+// splitAppDbName is the inverse of appDbName and returns app and db name from a
+// globally unique name for a Database. It returns an ErrInternal verror when
+// util.SplitKeyParts does not yield exactly two parts for name.
+func splitAppDbName(ctx *context.T, name string) (string, string, error) {
+ parts := util.SplitKeyParts(name)
+ if len(parts) != 2 {
+ return "", "", verror.New(verror.ErrInternal, ctx, "invalid appDbName", name)
+ }
+ // parts[0] is the app name and parts[1] the db name, matching the order in
+ // which appDbName joins them.
+ return parts[0], parts[1], nil
+}
+
+// copyGenVec returns a deep copy of in: each prefix is mapped to a newly
+// allocated PrefixGenVector holding the same (device id, generation) entries,
+// so the result shares no maps with in. A nil or empty input yields an empty,
+// non-nil GenVector.
+func copyGenVec(in interfaces.GenVector) interfaces.GenVector {
+	// The final sizes are known up front, so size the maps once instead of
+	// letting them grow incrementally.
+	genvec := make(interfaces.GenVector, len(in))
+	for p, inpgv := range in {
+		pgv := make(interfaces.PrefixGenVector, len(inpgv))
+		for id, gen := range inpgv {
+			pgv[id] = gen
+		}
+		genvec[p] = pgv
+	}
+	return genvec
+}
+
////////////////////////////////////////////////////////////
// Low-level utility functions to access sync state.
@@ -204,6 +283,24 @@
return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id), fmt.Sprintf("%016x", gen))
}
+// splitLogRecKey is the inverse of logRecKey and returns device id and generation number.
+//
+// A log record key has four parts: the sync prefix, "log", the device id and
+// the generation number. logRecKey formats the last two with hexadecimal
+// verbs ("%x" and "%016x"), so they must be parsed as base 16 here; parsing
+// them as decimal rejects any value containing a-f and silently mis-decodes
+// the rest.
+//
+// It returns an ErrInternal verror if the key does not have exactly four parts
+// or if either numeric part is not a valid 64-bit hexadecimal number.
+func splitLogRecKey(ctx *context.T, key string) (uint64, uint64, error) {
+	parts := util.SplitKeyParts(key)
+	if len(parts) != 4 {
+		return 0, 0, verror.New(verror.ErrInternal, ctx, "invalid logreckey", key)
+	}
+	// The error is built only on failure paths so that successful parses do
+	// not pay for constructing it.
+	id, err := strconv.ParseUint(parts[2], 16, 64)
+	if err != nil {
+		return 0, 0, verror.New(verror.ErrInternal, ctx, "invalid logreckey", key)
+	}
+	gen, err := strconv.ParseUint(parts[3], 16, 64)
+	if err != nil {
+		return 0, 0, verror.New(verror.ErrInternal, ctx, "invalid logreckey", key)
+	}
+	return id, gen, nil
+}
+
// hasLogRec returns true if the log record for (devid, gen) exists.
func hasLogRec(st store.StoreReader, id, gen uint64) bool {
// TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
@@ -399,9 +496,6 @@
// prefix genvectors to adjust the delta bound and
// include in outVec.
respgv = respVec[rp]
- // Add local generation information to
- // responder's genvec and returned genvec.
- respgv[s.id] = respGen
s.diffPrefixGenVectors(respgv, initpgv, diff)
outVec[rp] = respgv
}
@@ -411,10 +505,10 @@
if rpStart == "" {
// No matching prefixes for pfx were found.
respgv = make(interfaces.PrefixGenVector)
+ respgv[s.id] = respGen
} else {
respgv = respVec[rpStart]
}
- respgv[s.id] = respGen
s.diffPrefixGenVectors(respgv, initpgv, diff)
outVec[pfx] = respgv
}