syncbase/vsync: Integrate SyncGroup syncing, and add e2e tests.

MultiPart: 2/2
Change-Id: Ie3e669bb0357fe42b8853e29dd4f08f0eac92a15
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 2fb5479..98e0e84 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -15,6 +15,7 @@
 
 import (
 	"fmt"
+	"strconv"
 	"strings"
 	"time"
 
@@ -212,28 +213,30 @@
 		version = newSyncGroupVersion()
 	}
 
+	oid := sgOID(sg.Id)
+
 	// Add the SyncGroup versioned data entry.
-	if err := setSGDataEntry(ctx, tx, sg.Id, version, sg); err != nil {
+	if err := setSGDataEntryByOID(ctx, tx, oid, version, sg); err != nil {
 		return err
 	}
 
-	// Add a sync log record for the SyncGroup if needed.
-	oid := sgIdKey(sg.Id)
-	logKey := ""
-	if withLog {
-		if err := addSyncGroupLogRec(ctx, tx, sg.Id, version, servId, gen, pos); err != nil {
-			return err
-		}
-		logKey = logRecKey(oid, servId, gen)
-	}
-
-	// Add the SyncGroup to the DAG.
 	var parents []string
 	if head, err := getHead(ctx, tx, oid); err == nil {
 		parents = []string{head}
 	} else if verror.ErrorID(err) != verror.ErrNoExist.ID {
 		return err
 	}
+
+	// Add a sync log record for the SyncGroup if needed.
+	logKey := ""
+	if withLog {
+		if err := addSyncGroupLogRec(ctx, tx, oid, version, parents, servId, gen, pos); err != nil {
+			return err
+		}
+		logKey = logRecKey(oid, servId, gen)
+	}
+
+	// Add the SyncGroup to the DAG.
 	if err := s.addNode(ctx, tx, oid, version, logKey, false, parents, NoBatchId, nil); err != nil {
 		return err
 	}
@@ -241,12 +244,12 @@
 }
 
 // addSyncGroupLogRec adds a new local log record for a SyncGroup.
-func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, servId, gen, pos uint64) error {
-	oid := sgIdKey(gid)
+func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, oid, version string, parents []string, servId, gen, pos uint64) error {
 	rec := &localLogRec{
 		Metadata: interfaces.LogRecMetadata{
 			ObjId:   oid,
 			CurVers: version,
+			Parents: parents,
 			Delete:  false,
 			UpdTime: watchable.GetStoreTime(ctx, tx),
 			Id:      servId,
@@ -267,7 +270,7 @@
 
 // getSyncGroupVersion retrieves the current version of the SyncGroup.
 func getSyncGroupVersion(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
-	return getHead(ctx, st, sgIdKey(gid))
+	return getHead(ctx, st, sgOID(gid))
 }
 
 // getSyncGroupById retrieves the SyncGroup given its ID.
@@ -321,7 +324,7 @@
 	// nodes).  This is done separately from pruning the DAG nodes because
 	// some nodes may have no log record pointing back to the SyncGroup data
 	// entries (loose coupling to support the pending SyncGroup state).
-	oid := sgIdKey(gid)
+	oid := sgOID(gid)
 	err = forEachAncestor(ctx, tx, oid, []string{version}, func(v string, nd *dagNode) error {
 		return delSGDataEntry(ctx, tx, gid, v)
 	})
@@ -491,21 +494,11 @@
 	// Note: as with other syncable objects, the DAG "heads" table contains
 	// a reference to the current SyncGroup version, and the DAG "nodes"
 	// table tracks its history of mutations.
-	// TODO(rdaoud): change the data key prefix to use the SG OID instead
-	// of its ID, to be similar to the versioned user data keys.  The OID
-	// would use another SG-data prefix: "$sync:sgd:<gid>" and the data
-	// entry: "$sync:sgd:<gid>:<version>" (i.e. <oid>:<version>).
 	sgNameKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
 	sgIdKeyPrefix   = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "i")
-	sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
+	sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgDataPrefix)
 )
 
-// sgIdStr returns the SyncGroup ID in string format.
-// TODO(rdaoud): delete when the SG ID becomes a string throughout.
-func sgIdStr(gid interfaces.GroupId) string {
-	return fmt.Sprintf("%d", uint64(gid))
-}
-
 // sgNameKey returns the key used to access the SyncGroup name entry.
 func sgNameKey(name string) string {
 	return util.JoinKeyParts(sgNameKeyPrefix, name)
@@ -516,11 +509,35 @@
 	return util.JoinKeyParts(sgIdKeyPrefix, fmt.Sprintf("%d", gid))
 }
 
+// sgOID converts a group id into an oid string: sgDataKeyPrefix joined with the decimal group id.
+func sgOID(gid interfaces.GroupId) string {
+	return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid))
+}
+
+// sgID is the inverse of sgOID: it requires exactly three key parts and parses the last as a base-10 group id (the prefix parts are not validated).
+func sgID(oid string) (interfaces.GroupId, error) {
+	parts := util.SplitKeyParts(oid)
+	if len(parts) != 3 {
+		return 0, fmt.Errorf("invalid sgoid %s", oid)
+	}
+
+	id, err := strconv.ParseUint(parts[2], 10, 64)
+	if err != nil {
+		return 0, err
+	}
+	return interfaces.GroupId(id), nil
+}
+
 // sgDataKey returns the key used to access a version of the SyncGroup data.
 func sgDataKey(gid interfaces.GroupId, version string) string {
 	return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid), version)
 }
 
+// sgDataKeyByOID returns the key used to access a version of the SyncGroup data, built by joining the given oid and version.
+func sgDataKeyByOID(oid, version string) string {
+	return util.JoinKeyParts(oid, version)
+}
+
 // splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
 func splitSgNameKey(ctx *context.T, key string) (string, error) {
 	// Note that the actual SyncGroup name may contain ":" as a separator.
@@ -558,9 +575,9 @@
 	return util.Put(ctx, tx, sgIdKey(gid), state)
 }
 
-// setSGDataEntry stores the SyncGroup versioned data entry.
-func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, sg *interfaces.SyncGroup) error {
-	return util.Put(ctx, tx, sgDataKey(gid, version), sg)
+// setSGDataEntryByOID stores sg as the SyncGroup versioned data entry under the key sgDataKeyByOID(sgoid, version).
+func setSGDataEntryByOID(ctx *context.T, tx store.Transaction, sgoid, version string, sg *interfaces.SyncGroup) error {
+	return util.Put(ctx, tx, sgDataKeyByOID(sgoid, version), sg)
+}
 
 // getSGNameEntry retrieves the SyncGroup ID for a given name.
@@ -590,6 +607,15 @@
 	return &sg, nil
 }
 
+// getSGDataEntryByOID retrieves the SyncGroup data stored under sgDataKeyByOID(sgoid, version); on failure it returns nil and the util.Get error.
+func getSGDataEntryByOID(ctx *context.T, st store.StoreReader, sgoid string, version string) (*interfaces.SyncGroup, error) {
+	var sg interfaces.SyncGroup
+	if err := util.Get(ctx, st, sgDataKeyByOID(sgoid, version), &sg); err != nil {
+		return nil, err
+	}
+	return &sg, nil
+}
+
 // delSGNameEntry deletes the SyncGroup name entry.
 func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
 	return util.Delete(ctx, tx, sgNameKey(name))
@@ -642,8 +668,7 @@
 		// has Admin privilege.
 
 		// Reserve a log generation and position counts for the new SyncGroup.
-		//gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
-		gen, pos := uint64(1), uint64(1)
+		gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(gid), 1)
 
 		if err := ss.addSyncGroup(ctx, tx, version, true, "", nil, ss.id, gen, pos, sg); err != nil {
 			return err
@@ -657,7 +682,7 @@
 		return err
 	}
 
-	ss.initSyncStateInMem(ctx, appName, dbName, gid)
+	ss.initSyncStateInMem(ctx, appName, dbName, sgOID(gid))
 
 	// Local SG create succeeded. Publish the SG at the chosen server, or if
 	// that fails, enqueue it for later publish retries.
@@ -780,7 +805,7 @@
 		return nullSpec, err
 	}
 
-	ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sg2.Id)
+	ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sgOID(sg2.Id))
 
 	// Publish at the chosen mount table and in the neighborhood.
 	sd.publishInMountTables(ctx, call, sg2.Spec)
@@ -878,7 +903,7 @@
 	}
 
 	ss := sd.sync.(*syncService)
-	//appName, dbName := sd.db.App().Name(), sd.db.Name()
+	appName, dbName := sd.db.App().Name(), sd.db.Name()
 
 	err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Check permissions on Database.
@@ -909,8 +934,7 @@
 		}
 
 		// Reserve a log generation and position counts for the new SyncGroup.
-		//gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(sg.Id), 1)
-		gen, pos := uint64(1), uint64(1)
+		gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(sg.Id), 1)
 
 		// TODO(hpucha): Check SyncGroup ACL.
 
@@ -935,7 +959,7 @@
 func (sd *syncDatabase) publishSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
 	st := sd.db.St()
 	ss := sd.sync.(*syncService)
-	//appName, dbName := sd.db.App().Name(), sd.db.Name()
+	appName, dbName := sd.db.App().Name(), sd.db.Name()
 
 	gid, err := getSyncGroupId(ctx, st, sgName)
 	if err != nil {
@@ -959,11 +983,18 @@
 	// and joiner list of the SyncGroup get updated.  This is functionally
 	// correct, just not symmetrical with what happens at joiner, which
 	// receives the SyncGroup state post-join.
-	// TODO(rdaoud): send the SyncGroup genvec to the remote peer.
 	status := interfaces.SyncGroupStatusPublishRejected
 
+	sgs := sgSet{gid: struct{}{}}
+	gv, _, err := ss.copyDbGenInfo(ctx, appName, dbName, sgs)
+	if err != nil {
+		return err
+	}
+	// TODO(hpucha): Do we want to pick the head version corresponding to
+	// the local gen of the sg? It appears that it shouldn't matter.
+
 	c := interfaces.SyncClient(sgName)
-	peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, nil)
+	peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, gv[sgOID(gid)])
 
 	if err == nil {
 		status = interfaces.SyncGroupStatusRunning
@@ -1007,8 +1038,7 @@
 
 		// Reserve a log generation and position counts for the new
 		// SyncGroup version.
-		//gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
-		gen, pos := uint64(1), uint64(1)
+		gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(gid), 1)
 
 		sg.Status = status
 		if status == interfaces.SyncGroupStatusRunning {
@@ -1176,12 +1206,15 @@
 	})
 
 	if err == nil {
-		s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
+		s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sgOID(sg.Id))
 	}
 	return s.name, err
 }
 
 func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
+	vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: begin: %s from peer %s", sgName, joinerName)
+	defer vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: end: %s from peer %s", sgName, joinerName)
+
 	var dbSt store.Store
 	var gid interfaces.GroupId
 	var err error
@@ -1212,6 +1245,7 @@
 
 	version := newSyncGroupVersion()
 	var sg *interfaces.SyncGroup
+	var gen, pos uint64
 
 	err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
 		var err error
@@ -1235,8 +1269,7 @@
 		}
 
 		// Reserve a log generation and position counts for the new SyncGroup.
-		//gen, pos := s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgIdStr(gid), 1)
-		gen, pos := uint64(1), uint64(1)
+		gen, pos = s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgOID(gid), 1)
 
 		// Add to joiner list.
 		sg.Joiners[joinerName] = joinerInfo
@@ -1246,6 +1279,17 @@
 	if err != nil {
 		return nullSG, "", nullGV, err
 	}
-	// TODO(rdaoud): return the SyncGroup genvec
-	return *sg, version, nullGV, nil
+
+	sgs := sgSet{gid: struct{}{}}
+	gv, _, err := s.copyDbGenInfo(ctx, stAppName, stDbName, sgs)
+	if err != nil {
+		return nullSG, "", nullGV, err
+	}
+	// The retrieved genvector does not contain the mutation that adds the
+	// joiner to the list, since the initiator is the one checkpointing the
+	// generations. Add that generation to this genvector.
+	gv[sgOID(gid)][s.id] = gen
+
+	vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: returning: sg %v, vers %v, genvec %v", sg, version, gv[sgOID(gid)])
+	return *sg, version, gv[sgOID(gid)], nil
 }