syncbase/vsync: Initiator module.

Change-Id: I3dfc95ee8f9a6cd6a40558bf1dfab3a2282c3e3e
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index fed4c9a..f025327 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -34,12 +34,12 @@
 	"container/heap"
 	"fmt"
 	"sort"
+	"strconv"
 	"strings"
 
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
-
 	"v.io/v23/context"
 	"v.io/v23/rpc"
 	"v.io/v23/verror"
@@ -47,9 +47,11 @@
 
 // dbSyncStateInMem represents the in-memory sync state of a Database.
 type dbSyncStateInMem struct {
-	gen    uint64
-	pos    uint64
-	genvec interfaces.GenVector // Note: Generation vector contains state from remote devices only.
+	gen uint64
+	pos uint64
+
+	ckPtGen uint64               // Local generation frozen by checkPtLocalGen for the responder's use.
+	genvec  interfaces.GenVector // Note: Generation vector contains state from remote devices only.
 }
 
 // initSync initializes the sync module during startup. It scans all the
@@ -136,6 +138,44 @@
 	return gen, pos
 }
 
+// checkPtLocalGen freezes the local generation number for the responder's use by copying gen into ckPtGen.
+func (s *syncService) checkPtLocalGen(ctx *context.T, appName, dbName string) error {
+	s.syncStateLock.Lock()
+	defer s.syncStateLock.Unlock()
+
+	name := appDbName(appName, dbName)
+	ds, ok := s.syncState[name]
+	if !ok { // no in-memory sync state is registered under this app/db name
+		return verror.New(verror.ErrInternal, ctx, "db state not found", name)
+	}
+
+	ds.ckPtGen = ds.gen // checkpoint = current local generation; getDbGenInfo reports this value
+	return nil
+}
+
+// getDbSyncStateInMem returns a copy of the current in-memory sync state of the Database.
+func (s *syncService) getDbSyncStateInMem(ctx *context.T, appName, dbName string) (*dbSyncStateInMem, error) {
+	s.syncStateLock.Lock()
+	defer s.syncStateLock.Unlock()
+
+	name := appDbName(appName, dbName)
+	ds, ok := s.syncState[name]
+	if !ok { // no in-memory sync state is registered under this app/db name
+		return nil, verror.New(verror.ErrInternal, ctx, "db state not found", name)
+	}
+
+	dsCopy := &dbSyncStateInMem{ // the scalar fields are copied by value
+		gen:     ds.gen,
+		pos:     ds.pos,
+		ckPtGen: ds.ckPtGen,
+	}
+
+	// The genvec is a map of maps, so copy both levels instead of sharing it with the caller.
+	dsCopy.genvec = copyGenVec(ds.genvec)
+
+	return dsCopy, nil
+}
+
 // getDbGenInfo returns a copy of the current generation information of the Database.
 func (s *syncService) getDbGenInfo(ctx *context.T, appName, dbName string) (interfaces.GenVector, uint64, error) {
 	s.syncStateLock.Lock()
@@ -146,16 +186,33 @@
 	if !ok {
 		return nil, 0, verror.New(verror.ErrInternal, ctx, "db state not found", name)
 	}
+
 	// Make a copy of the genvec.
-	genvec := make(interfaces.GenVector)
-	for p, dspgv := range ds.genvec {
-		pgv := make(interfaces.PrefixGenVector)
-		for id, gen := range dspgv {
-			pgv[id] = gen
-		}
-		genvec[p] = pgv
+	genvec := copyGenVec(ds.genvec)
+
+	// Add local generation information to the genvec.
+	for _, gv := range genvec {
+		gv[s.id] = ds.ckPtGen
 	}
-	return genvec, ds.gen, nil
+
+	return genvec, ds.ckPtGen, nil
+}
+
+// putDbGenInfoRemote replaces the Database's in-memory genvec (remote generation information) with a copy of genvec.
+func (s *syncService) putDbGenInfoRemote(ctx *context.T, appName, dbName string, genvec interfaces.GenVector) error {
+	s.syncStateLock.Lock()
+	defer s.syncStateLock.Unlock()
+
+	name := appDbName(appName, dbName)
+	ds, ok := s.syncState[name]
+	if !ok { // no in-memory sync state is registered under this app/db name
+		return verror.New(verror.ErrInternal, ctx, "db state not found", name)
+	}
+
+	// Store a copy so that later changes to the caller's genvec do not affect the stored state.
+	ds.genvec = copyGenVec(genvec)
+
+	return nil
 }
 
 // appDbName combines the app and db names to return a globally unique name for
@@ -165,6 +222,28 @@
 	return util.JoinKeyParts(appName, dbName)
 }
 
+// splitAppDbName is the inverse of appDbName: it splits a globally unique Database
+// name into app and db names, returning ErrInternal unless it has exactly two parts.
+func splitAppDbName(ctx *context.T, name string) (string, string, error) {
+	parts := util.SplitKeyParts(name)
+	if len(parts) != 2 {
+		return "", "", verror.New(verror.ErrInternal, ctx, "invalid appDbName", name)
+	}
+	return parts[0], parts[1], nil // app name first, then db name, the order appDbName joins them in
+}
+
+func copyGenVec(in interfaces.GenVector) interfaces.GenVector { // returns a deep copy of in
+	genvec := make(interfaces.GenVector) // the result is non-nil even when in is nil or empty
+	for p, inpgv := range in {
+		pgv := make(interfaces.PrefixGenVector) // fresh inner map, so the copy shares no map with in
+		for id, gen := range inpgv {
+			pgv[id] = gen
+		}
+		genvec[p] = pgv
+	}
+	return genvec
+}
+
 ////////////////////////////////////////////////////////////
 // Low-level utility functions to access sync state.
 
@@ -204,6 +283,24 @@
 	return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id), fmt.Sprintf("%016x", gen))
 }
 
+// splitLogRecKey is the inverse of logRecKey and returns device id and generation number, both parsed as hex.
+func splitLogRecKey(ctx *context.T, key string) (uint64, uint64, error) {
+	parts := util.SplitKeyParts(key)
+	verr := verror.New(verror.ErrInternal, ctx, "invalid logreckey", key)
+	if len(parts) != 4 { // logRecKey joins four parts: util.SyncPrefix, "log", id, gen
+		return 0, 0, verr
+	}
+	id, err := strconv.ParseUint(parts[2], 16, 64) // base 16: logRecKey writes the id with %x
+	if err != nil {
+		return 0, 0, verr
+	}
+	gen, err := strconv.ParseUint(parts[3], 16, 64) // base 16: logRecKey writes the gen with %016x
+	if err != nil {
+		return 0, 0, verr
+	}
+	return id, gen, nil
+}
+
 // hasLogRec returns true if the log record for (devid, gen) exists.
 func hasLogRec(st store.StoreReader, id, gen uint64) bool {
 	// TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
@@ -399,9 +496,6 @@
 				// prefix genvectors to adjust the delta bound and
 				// include in outVec.
 				respgv = respVec[rp]
-				// Add local generation information to
-				// responder's genvec and returned genvec.
-				respgv[s.id] = respGen
 				s.diffPrefixGenVectors(respgv, initpgv, diff)
 				outVec[rp] = respgv
 			}
@@ -411,10 +505,10 @@
 		if rpStart == "" {
 			// No matching prefixes for pfx were found.
 			respgv = make(interfaces.PrefixGenVector)
+			respgv[s.id] = respGen
 		} else {
 			respgv = respVec[rpStart]
 		}
-		respgv[s.id] = respGen
 		s.diffPrefixGenVectors(respgv, initpgv, diff)
 		outVec[pfx] = respgv
 	}