syncbase/vsync: Integrate SyncGroup syncing, and add e2e tests.

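SyncGroup metadata now flows through the regular sync protocol:
SyncGroup versions are stored as versioned data entries keyed by an
oid of the form "$sync:sgd:<group id>", per-SyncGroup generation state
is keyed by that oid string instead of the numeric group id, the
responder VOM-encodes SyncGroup values when sending deltas, and the
initiator clears a SyncGroup's SyncPending flag once its local genvec
catches up with the genvec received at publish or join time.
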
MultiPart: 2/2
Change-Id: Ie3e669bb0357fe42b8853e29dd4f08f0eac92a15
diff --git a/services/syncbase/server/interfaces/sync_types.go b/services/syncbase/server/interfaces/sync_types.go
index 39732b1..a37ddc5 100644
--- a/services/syncbase/server/interfaces/sync_types.go
+++ b/services/syncbase/server/interfaces/sync_types.go
@@ -19,3 +19,49 @@
 	}
 	return out
 }
+
+// Compare returns an integer comparing two prefix generation vectors. The
+// result is 0 if a == b, -1 if a < b, +1 if a > b, and +2 if a and b are
+// uncomparable.
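+//
+// For example (these follow from the rules above; see also the tests in
+// sync_types_test.go):
+//
+//	{10: 2}.Compare({10: 1})               == 1  // a dominates b.
+//	{10: 1}.Compare({10: 2})               == -1 // b dominates a.
+//	{10: 1, 11: 1}.Compare({10: 1})        == 1  // b lacks an element of a.
+//	{10: 2, 11: 1}.Compare({10: 1, 11: 2}) == 2  // mixed; uncomparable.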
+func (a PrefixGenVector) Compare(b PrefixGenVector) int {
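+	// res holds the running comparison result; -2 is a sentinel meaning
+	// it has not yet been initialized by an element comparison.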
+	res := -2
+
+	if len(a) == 0 && len(b) == 0 {
+		return 0
+	}
+
+	for aid, agen := range a {
+		bgen, ok := b[aid]
+
+		resCur := 0
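+		// An id missing from b counts as a being ahead, even when agen is 0.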
+		if agen > bgen || !ok {
+			resCur = 1
+		} else if agen < bgen {
+			resCur = -1
+		}
+
+		if res == -2 || res == 0 {
+			// Initialize/overwrite safely with the current result.
+			res = resCur
+		} else if res != resCur && resCur != 0 {
+			// Uncomparable, since some elements are less and others
+			// are greater.
+			return 2
+		}
+	}
+
+	for bid := range b {
+		if _, ok := a[bid]; ok {
+			continue
+		}
+
+		if res == 1 {
+			// b has elements that a is missing, so a cannot be
+			// greater than b; since a was ahead elsewhere, the
+			// vectors are uncomparable.
+			return 2
+		}
+
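+		// b has an element that a lacks and a is not ahead anywhere, so a < b.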
+		return -1
+	}
+
+	return res
+}
diff --git a/services/syncbase/server/interfaces/sync_types_test.go b/services/syncbase/server/interfaces/sync_types_test.go
new file mode 100644
index 0000000..4386bf5
--- /dev/null
+++ b/services/syncbase/server/interfaces/sync_types_test.go
@@ -0,0 +1,105 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package interfaces
+
+import "testing"
+
+func TestPrefixGenVectorCompare(t *testing.T) {
+	tests := []struct {
+		a, b  PrefixGenVector
+		resAB int
+		resBA int
+	}{
+		{ // a = b.
+			a:     PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+			b:     PrefixGenVector{10: 1, 11: 10, 12: 20, 13: 2},
+			resAB: 0,
+			resBA: 0,
+		},
+		{ // a = b.
+			a:     PrefixGenVector{},
+			b:     PrefixGenVector{},
+			resAB: 0,
+			resBA: 0,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+			b:     PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 38, 11: 5, 12: 56, 13: 13},
+			b:     PrefixGenVector{},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+			b:     PrefixGenVector{11: 2, 12: 3, 13: 4},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+			b:     PrefixGenVector{11: 11, 12: 2, 13: 4},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+			b:     PrefixGenVector{11: 11, 12: 12, 13: 13},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 1, 11: 11, 12: 12, 13: 13},
+			b:     PrefixGenVector{10: 1, 11: 2, 12: 3, 13: 4},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 38, 11: 5, 12: 56, 13: 13},
+			b:     PrefixGenVector{10: 1, 11: 5, 12: 23, 13: 4},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a > b.
+			a:     PrefixGenVector{10: 0, 11: 5, 12: 56, 13: 13},
+			b:     PrefixGenVector{11: 5, 12: 23, 13: 4},
+			resAB: 1,
+			resBA: -1,
+		},
+		{ // a != b.
+			a:     PrefixGenVector{10: 38, 11: 5, 12: 56, 13: 13},
+			b:     PrefixGenVector{10: 56, 11: 5, 12: 23, 13: 4},
+			resAB: 2,
+			resBA: 2,
+		},
+		{ // a != b.
+			a:     PrefixGenVector{10: 38, 11: 5, 12: 56, 13: 13},
+			b:     PrefixGenVector{10: 1, 11: 50, 12: 23, 13: 4},
+			resAB: 2,
+			resBA: 2,
+		},
+		{ // a != b.
+			a:     PrefixGenVector{10: 10, 11: 11, 12: 12, 13: 13},
+			b:     PrefixGenVector{11: 11, 12: 2, 13: 4, 15: 40},
+			resAB: 2,
+			resBA: 2,
+		},
+	}
+
+	for pos, test := range tests {
+		got, want := test.a.Compare(test.b), test.resAB
+		if got != want {
+			t.Fatalf("Comparison failed for pos %d (a=%v, b=%v), got %v, want %v", pos, test.a, test.b, got, want)
+		}
+		got, want = test.b.Compare(test.a), test.resBA
+		if got != want {
+			t.Fatalf("Comparison failed for pos %d (b=%v, a=%v), got %v, want %v", pos, test.b, test.a, got, want)
+		}
+	}
+}
diff --git a/services/syncbase/testutil/v23util.go b/services/syncbase/testutil/v23util.go
index 8e370a8..42496a7 100644
--- a/services/syncbase/testutil/v23util.go
+++ b/services/syncbase/testutil/v23util.go
@@ -31,6 +31,7 @@
 		}
 		rmRootDir = true
 	}
+
 	// Start syncbased.
 	invocation := syncbased.WithStartOpts(syncbased.StartOpts().WithCustomCredentials(creds)).Start(
 		"--v23.tcp.address=127.0.0.1:0",
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index 94f79e1..0727cca 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -494,6 +494,9 @@
 
 			if iSt.sg {
 				// Add the SyncGroup value to the Database.
+				if err := iSt.insertSgRecInDb(ctx, rec, v.Value.Value, tx); err != nil {
+					return err
+				}
 			} else {
 				if err := iSt.insertRecInDb(ctx, rec, v.Value.Value, tx); err != nil {
 					return err
@@ -564,6 +567,16 @@
 	return err
 }
 
+// insertSgRecInDb inserts the versioned value of a SyncGroup in the Database.
+func (iSt *initiationState) insertSgRecInDb(ctx *context.T, rec *localLogRec, valbuf []byte, tx store.Transaction) error {
+	m := rec.Metadata
+	var sg interfaces.SyncGroup
+	if err := vom.Decode(valbuf, &sg); err != nil {
+		return err
+	}
+	return setSGDataEntryByOID(ctx, tx, m.ObjId, m.CurVers, &sg)
+}
+
 // insertRecInDb inserts the versioned value in the Database.
 func (iSt *initiationState) insertRecInDb(ctx *context.T, rec *localLogRec, valbuf []byte, tx store.Transaction) error {
 	m := rec.Metadata
@@ -944,7 +957,7 @@
 			Gen:        dsInMem.data.gen,
 			CheckptGen: dsInMem.data.checkptGen,
 		},
-		Sgs:      make(map[interfaces.GroupId]localGenInfo),
+		Sgs:      make(map[string]localGenInfo),
 		GenVec:   dsInMem.genvec,
 		SgGenVec: dsInMem.sggenvec,
 	}
@@ -970,6 +983,31 @@
 		if _, ok := genvec[rpfx]; !ok {
 			genvec[rpfx] = respgv
 		}
+
+		if iSt.sg {
+			// Flip sync pending if needed in case of SyncGroup
+			// syncing. See explanation for SyncPending flag in
+			// types.vdl.
+			gid, err := sgID(rpfx)
+			if err != nil {
+				return err
+			}
+			state, err := getSGIdEntry(ctx, iSt.tx, gid)
+			if err != nil {
+				return err
+			}
+			if state.SyncPending {
+				curgv := genvec[rpfx]
+				res := curgv.Compare(state.PendingGenVec)
+				vlog.VI(4).Infof("sync: updateSyncSt: checking join pending %v, curgv %v, res %v", state.PendingGenVec, curgv, res)
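+				// res >= 0 means the local genvec now equals or
+				// dominates the genvec pending from the join, so
+				// all data pending at join time has been synced.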
+				if res >= 0 {
+					state.SyncPending = false
+					if err := setSGIdEntry(ctx, iSt.tx, gid, state); err != nil {
+						return err
+					}
+				}
+			}
+		}
 	}
 
 	iSt.updLocal = genvec
@@ -979,8 +1017,6 @@
 		delete(pgv, iSt.config.sync.id)
 	}
 
-	// TODO(hpucha): Flip join pending if needed.
-
 	// TODO(hpucha): Add knowledge compaction.
 
 	return putDbSyncState(ctx, iSt.tx, ds)
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
index 7c1a045..c740d95 100644
--- a/services/syncbase/vsync/initiator_test.go
+++ b/services/syncbase/vsync/initiator_test.go
@@ -490,7 +490,7 @@
 		t.Fatalf("Mount tables are not equal config %v, spec %v", iSt.config.mtTables, sg1.Spec.MountTables)
 	}
 
-	s.initSyncStateInMem(nil, "mockapp", "mockdb", sgId1)
+	s.initSyncStateInMem(nil, "mockapp", "mockdb", sgOID(sgId1))
 
 	iSt.stream = createReplayStream(t, rfile)
 
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 0c78f52..8cdc47a 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -8,12 +8,11 @@
 	"container/heap"
 	"fmt"
 	"sort"
-	"strconv"
 	"strings"
 
 	"v.io/v23/context"
-	wire "v.io/v23/services/syncbase/nosql"
 	"v.io/v23/verror"
+	"v.io/v23/vom"
 	"v.io/x/lib/vlog"
 	"v.io/x/ref/services/syncbase/server/interfaces"
 	"v.io/x/ref/services/syncbase/server/watchable"
@@ -69,10 +68,10 @@
 		rSt.initVec = v.Value.InitVec
 		rSt.sgIds = make(sgSet)
 		// Populate the sgids from the initvec.
-		for id := range rSt.initVec {
-			gid, err := strconv.ParseUint(id, 10, 64)
+		for oid := range rSt.initVec {
+			gid, err := sgID(oid)
 			if err != nil {
-				vlog.Fatalf("sync: newResponderState: invalid syncgroup id", gid)
+				vlog.Fatalf("sync: newResponderState: invalid syncgroup key %q: %v", oid, err)
 			}
 			rSt.sgIds[interfaces.GroupId(gid)] = struct{}{}
 		}
@@ -119,8 +118,8 @@
 	// embedded, consider using a helper function to auto-fill it instead
 	// (see http://goo.gl/mEa4L0) but only incur that overhead when the
 	// logging level specified is enabled.
-	vlog.VI(3).Infof("sync: sendDeltasPerDatabase: %s, %s: sgids %v, genvec %v",
-		rSt.appName, rSt.dbName, rSt.sgIds, rSt.initVec)
+	vlog.VI(3).Infof("sync: sendDeltasPerDatabase: recvd %s, %s: sgids %v, genvec %v, sg %v",
+		rSt.appName, rSt.dbName, rSt.sgIds, rSt.initVec, rSt.sg)
 
 	// Phase 1 of sendDeltas: Authorize the initiator and respond to the
 	// caller only for the SyncGroups that allow access.
@@ -179,12 +178,6 @@
 		for _, p := range sg.Spec.Prefixes {
 			allowedPfxs[p] = struct{}{}
 		}
-
-		// Add the initiator to the SyncGroup membership if not already
-		// in it.  It is a temporary solution until SyncGroup metadata
-		// is synchronized peer to peer.
-		// TODO(rdaoud): remove this when SyncGroups are synced.
-		rSt.addInitiatorToSyncGroup(ctx, sgid)
 	}
 
 	if err != nil {
@@ -214,49 +207,9 @@
 	return nil
 }
 
-// addInitiatorToSyncGroup adds the request initiator to the membership of the
-// given SyncGroup if the initiator is not already a member.  It is a temporary
-// solution until SyncGroup metadata starts being synchronized, at which time
-// peers will learn of new members through mutations of the SyncGroup metadata
-// by the SyncGroup administrators.
-// Note: the joiner metadata is fake because the responder does not have it.
-func (rSt *responderState) addInitiatorToSyncGroup(ctx *context.T, gid interfaces.GroupId) {
-	if rSt.initiator == "" {
-		return
-	}
-
-	err := store.RunInTransaction(rSt.st, func(tx store.Transaction) error {
-		version, err := getSyncGroupVersion(ctx, tx, gid)
-		if err != nil {
-			return err
-		}
-		sg, err := getSGDataEntry(ctx, tx, gid, version)
-		if err != nil {
-			return err
-		}
-
-		// If the initiator is already a member of the SyncGroup abort
-		// the transaction with a special error code.
-		if _, ok := sg.Joiners[rSt.initiator]; ok {
-			return verror.New(verror.ErrExist, ctx, "member already in SyncGroup")
-		}
-
-		vlog.VI(4).Infof("sync: addInitiatorToSyncGroup: add %s to sgid %d", rSt.initiator, gid)
-		sg.Joiners[rSt.initiator] = wire.SyncGroupMemberInfo{SyncPriority: 1}
-		return setSGDataEntry(ctx, tx, gid, version, sg)
-	})
-
-	if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
-		vlog.Errorf("sync: addInitiatorToSyncGroup: initiator %s, sgid %d: %v", rSt.initiator, gid, err)
-	}
-}
-
 // sendSgDeltas computes the bound on missing generations, and sends the missing
 // log records across all requested SyncGroups (phases 2 and 3 of sendDeltas).
 func (rSt *responderState) sendSgDeltas(ctx *context.T) error {
-	vlog.VI(3).Infof("sync: sendSgDeltas: %s, %s: sgids %v, genvec %v",
-		rSt.appName, rSt.dbName, rSt.sgIds, rSt.initVec)
-
 	respVec, _, err := rSt.sync.copyDbGenInfo(ctx, rSt.appName, rSt.dbName, rSt.sgIds)
 	if err != nil {
 		return err
@@ -367,7 +320,7 @@
 		rSt.outVec[pfx] = respgv
 	}
 
-	vlog.VI(3).Infof("sync: computeDeltaBound: %s, %s: diff %v, outvec %v",
+	vlog.VI(3).Infof("sync: computeDataDeltas: %s, %s: diff %v, outvec %v",
 		rSt.appName, rSt.dbName, rSt.diff, rSt.outVec)
 	return nil
 }
@@ -411,7 +364,7 @@
 
 		if rSt.sg || !filterLogRec(rec, rSt.initVec, initPfxs) {
 			// Send on the wire.
-			wireRec, err := makeWireLogRec(ctx, rSt.st, rec)
+			wireRec, err := makeWireLogRec(ctx, rSt.sg, rSt.st, rec)
 			if err != nil {
 				return err
 			}
@@ -434,6 +387,8 @@
 }
 
 func (rSt *responderState) sendGenVec(ctx *context.T) error {
+	vlog.VI(3).Infof("sync: sendGenVec: sending genvec %v", rSt.outVec)
+
 	sender := rSt.call.SendStream()
 	sender.Send(interfaces.DeltaRespRespVec{rSt.outVec})
 	return nil
@@ -553,11 +508,20 @@
 
 // makeWireLogRec creates a sync log record to send on the wire from a given
 // local sync record.
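+// For SyncGroup records (sg is true), the value is read from the SyncGroup
+// data entries and VOM-encoded before being sent on the wire.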
-func makeWireLogRec(ctx *context.T, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) {
+func makeWireLogRec(ctx *context.T, sg bool, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) {
 	// Get the object value at the required version.
 	key, version := rec.Metadata.ObjId, rec.Metadata.CurVers
 	var value []byte
-	if !rec.Metadata.Delete {
+	if sg {
+		sgdata, err := getSGDataEntryByOID(ctx, st, key, version)
+		if err != nil {
+			return nil, err
+		}
+		value, err = vom.Encode(sgdata)
+		if err != nil {
+			return nil, err
+		}
+	} else if !rec.Metadata.Delete {
 		var err error
 		value, err = watchable.GetAtVersion(ctx, st, []byte(key), nil, []byte(version))
 		if err != nil {
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index d820d8f..953f150 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -57,7 +57,6 @@
 
 	"v.io/v23/context"
 	"v.io/v23/verror"
-	"v.io/x/lib/vlog"
 	"v.io/x/ref/services/syncbase/server/interfaces"
 	"v.io/x/ref/services/syncbase/server/util"
 	"v.io/x/ref/services/syncbase/store"
@@ -82,8 +81,11 @@
 // dbSyncStateInMem represents the in-memory sync state of a Database and all
 // its SyncGroups.
 type dbSyncStateInMem struct {
-	data *localGenInfoInMem                        // info for data.
-	sgs  map[interfaces.GroupId]*localGenInfoInMem // info for SyncGroups.
+	data *localGenInfoInMem // info for data.
+
+	// Info for SyncGroups. The key here is the SyncGroup oid of the form
+	// $sync:sgd:<group id>. More details in syncgroup.go.
+	sgs map[string]*localGenInfoInMem
 
 	// Note: Generation vector contains state from remote devices only.
 	genvec   interfaces.GenVector
@@ -94,9 +96,9 @@
 	out := &dbSyncStateInMem{}
 	out.data = in.data.deepCopy()
 
-	out.sgs = make(map[interfaces.GroupId]*localGenInfoInMem)
-	for id, info := range in.sgs {
-		out.sgs[id] = info.deepCopy()
+	out.sgs = make(map[string]*localGenInfoInMem)
+	for oid, info := range in.sgs {
+		out.sgs[oid] = info.deepCopy()
 	}
 
 	out.genvec = in.genvec.DeepCopy()
@@ -200,25 +202,22 @@
 // Note: For all the utilities below, if the sgid parameter is non-nil, the
 // operation is performed in the SyncGroup space. If nil, it is performed in the
 // data space for the Database.
-//
-// TODO(hpucha): Once GroupId is changed to string, clean up these function
-// signatures.
 
 // reserveGenAndPosInDbLog reserves a chunk of generation numbers and log
 // positions in a Database's log. Used when local updates result in log
 // entries.
-func (s *syncService) reserveGenAndPosInDbLog(ctx *context.T, appName, dbName, sgid string, count uint64) (uint64, uint64) {
-	return s.reserveGenAndPosInternal(appName, dbName, sgid, count, count)
+func (s *syncService) reserveGenAndPosInDbLog(ctx *context.T, appName, dbName, sgoid string, count uint64) (uint64, uint64) {
+	return s.reserveGenAndPosInternal(appName, dbName, sgoid, count, count)
 }
 
 // reservePosInDbLog reserves a chunk of log positions in a Database's log. Used
 // when remote log records are received.
-func (s *syncService) reservePosInDbLog(ctx *context.T, appName, dbName, sgid string, count uint64) uint64 {
-	_, pos := s.reserveGenAndPosInternal(appName, dbName, sgid, 0, count)
+func (s *syncService) reservePosInDbLog(ctx *context.T, appName, dbName, sgoid string, count uint64) uint64 {
+	_, pos := s.reserveGenAndPosInternal(appName, dbName, sgoid, 0, count)
 	return pos
 }
 
-func (s *syncService) reserveGenAndPosInternal(appName, dbName, sgid string, genCount, posCount uint64) (uint64, uint64) {
+func (s *syncService) reserveGenAndPosInternal(appName, dbName, sgoid string, genCount, posCount uint64) (uint64, uint64) {
 	s.syncStateLock.Lock()
 	defer s.syncStateLock.Unlock()
 
@@ -227,23 +226,18 @@
 	if !ok {
 		ds = &dbSyncStateInMem{
 			data: &localGenInfoInMem{gen: 1},
-			sgs:  make(map[interfaces.GroupId]*localGenInfoInMem),
+			sgs:  make(map[string]*localGenInfoInMem),
 		}
 		s.syncState[name] = ds
 	}
 
 	var info *localGenInfoInMem
-	if sgid != "" {
-		id, err := strconv.ParseUint(sgid, 10, 64)
-		if err != nil {
-			vlog.Fatalf("sync: reserveGenAndPosInternal: invalid syncgroup id", sgid)
-		}
-
+	if sgoid != "" {
 		var ok bool
-		info, ok = ds.sgs[interfaces.GroupId(id)]
+		info, ok = ds.sgs[sgoid]
 		if !ok {
 			info = &localGenInfoInMem{gen: 1}
-			ds.sgs[interfaces.GroupId(id)] = info
+			ds.sgs[sgoid] = info
 		}
 	} else {
 		info = ds.data
@@ -273,7 +267,7 @@
 	if len(sgs) > 0 {
 		// Checkpoint requested SyncGroups.
 		for id := range sgs {
-			info, ok := ds.sgs[id]
+			info, ok := ds.sgs[sgOID(id)]
 			if !ok {
 				return verror.New(verror.ErrInternal, ctx, "sg state not found", name, id)
 			}
@@ -286,7 +280,7 @@
 }
 
 // initSyncStateInMem initializes the in memory sync state of the Database/SyncGroup if needed.
-func (s *syncService) initSyncStateInMem(ctx *context.T, appName, dbName string, sgid interfaces.GroupId) {
+func (s *syncService) initSyncStateInMem(ctx *context.T, appName, dbName string, sgoid string) {
 	s.syncStateLock.Lock()
 	defer s.syncStateLock.Unlock()
 
@@ -294,13 +288,13 @@
 	if s.syncState[name] == nil {
 		s.syncState[name] = &dbSyncStateInMem{
 			data: &localGenInfoInMem{gen: 1},
-			sgs:  make(map[interfaces.GroupId]*localGenInfoInMem),
+			sgs:  make(map[string]*localGenInfoInMem),
 		}
 	}
-	if sgid != interfaces.NoGroupId {
+	if sgoid != "" {
 		ds := s.syncState[name]
-		if _, ok := ds.sgs[sgid]; !ok {
-			ds.sgs[sgid] = &localGenInfoInMem{gen: 1}
+		if _, ok := ds.sgs[sgoid]; !ok {
+			ds.sgs[sgoid] = &localGenInfoInMem{gen: 1}
 		}
 	}
 	return
@@ -335,10 +329,10 @@
 	if len(sgs) > 0 {
 		genvec = make(interfaces.GenVector)
 		for id := range sgs {
-			sid := fmt.Sprintf("%d", id)
-			gv := ds.sggenvec[sid]
-			genvec[sid] = gv.DeepCopy()
-			genvec[sid][s.id] = ds.sgs[id].checkptGen
+			sgoid := sgOID(id)
+			gv := ds.sggenvec[sgoid]
+			genvec[sgoid] = gv.DeepCopy()
+			genvec[sgoid][s.id] = ds.sgs[sgoid].checkptGen
 		}
 	} else {
 		genvec = ds.genvec.DeepCopy()
diff --git a/services/syncbase/vsync/sync_state_test.go b/services/syncbase/vsync/sync_state_test.go
index 3036611..5a3b2c8 100644
--- a/services/syncbase/vsync/sync_state_test.go
+++ b/services/syncbase/vsync/sync_state_test.go
@@ -6,7 +6,6 @@
 
 import (
 	"reflect"
-	"strconv"
 	"testing"
 	"time"
 
@@ -40,11 +39,7 @@
 			if sgid == "" {
 				info = s.syncState[name].data
 			} else {
-				id, err := strconv.ParseUint(sgid, 10, 64)
-				if err != nil {
-					t.Fatalf("reserveGenAndPosInternal failed, invalid sgid %v", sgid)
-				}
-				info = s.syncState[name].sgs[interfaces.GroupId(id)]
+				info = s.syncState[name].sgs[sgid]
 			}
 			if info.gen != wantGen || info.pos != wantPos {
 				t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", info.gen, wantGen, info.pos, wantPos)
@@ -74,9 +69,9 @@
 			100: 200, 300: 400, 500: 600,
 		},
 	}
-	localsgs := make(map[interfaces.GroupId]localGenInfo)
-	localsgs[interfaces.GroupId(8888)] = localGenInfo{Gen: 56, CheckptGen: 2000}
-	localsgs[interfaces.GroupId(1008888)] = localGenInfo{Gen: 25890, CheckptGen: 100}
+	localsgs := make(map[string]localGenInfo)
+	localsgs["8888"] = localGenInfo{Gen: 56, CheckptGen: 2000}
+	localsgs["1008888"] = localGenInfo{Gen: 25890, CheckptGen: 100}
 
 	tx := st.NewTransaction()
 	wantSt := &dbSyncState{
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 2fb5479..98e0e84 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -15,6 +15,7 @@
 
 import (
 	"fmt"
+	"strconv"
 	"strings"
 	"time"
 
@@ -212,28 +213,30 @@
 		version = newSyncGroupVersion()
 	}
 
+	oid := sgOID(sg.Id)
+
 	// Add the SyncGroup versioned data entry.
-	if err := setSGDataEntry(ctx, tx, sg.Id, version, sg); err != nil {
+	if err := setSGDataEntryByOID(ctx, tx, oid, version, sg); err != nil {
 		return err
 	}
 
-	// Add a sync log record for the SyncGroup if needed.
-	oid := sgIdKey(sg.Id)
-	logKey := ""
-	if withLog {
-		if err := addSyncGroupLogRec(ctx, tx, sg.Id, version, servId, gen, pos); err != nil {
-			return err
-		}
-		logKey = logRecKey(oid, servId, gen)
-	}
-
-	// Add the SyncGroup to the DAG.
 	var parents []string
 	if head, err := getHead(ctx, tx, oid); err == nil {
 		parents = []string{head}
 	} else if verror.ErrorID(err) != verror.ErrNoExist.ID {
 		return err
 	}
+
+	// Add a sync log record for the SyncGroup if needed.
+	logKey := ""
+	if withLog {
+		if err := addSyncGroupLogRec(ctx, tx, oid, version, parents, servId, gen, pos); err != nil {
+			return err
+		}
+		logKey = logRecKey(oid, servId, gen)
+	}
+
+	// Add the SyncGroup to the DAG.
 	if err := s.addNode(ctx, tx, oid, version, logKey, false, parents, NoBatchId, nil); err != nil {
 		return err
 	}
@@ -241,12 +244,12 @@
 }
 
 // addSyncGroupLogRec adds a new local log record for a SyncGroup.
-func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, servId, gen, pos uint64) error {
-	oid := sgIdKey(gid)
+func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, oid, version string, parents []string, servId, gen, pos uint64) error {
 	rec := &localLogRec{
 		Metadata: interfaces.LogRecMetadata{
 			ObjId:   oid,
 			CurVers: version,
+			Parents: parents,
 			Delete:  false,
 			UpdTime: watchable.GetStoreTime(ctx, tx),
 			Id:      servId,
@@ -267,7 +270,7 @@
 
 // getSyncGroupVersion retrieves the current version of the SyncGroup.
 func getSyncGroupVersion(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
-	return getHead(ctx, st, sgIdKey(gid))
+	return getHead(ctx, st, sgOID(gid))
 }
 
 // getSyncGroupById retrieves the SyncGroup given its ID.
@@ -321,7 +324,7 @@
 	// nodes).  This is done separately from pruning the DAG nodes because
 	// some nodes may have no log record pointing back to the SyncGroup data
 	// entries (loose coupling to support the pending SyncGroup state).
-	oid := sgIdKey(gid)
+	oid := sgOID(gid)
 	err = forEachAncestor(ctx, tx, oid, []string{version}, func(v string, nd *dagNode) error {
 		return delSGDataEntry(ctx, tx, gid, v)
 	})
@@ -491,21 +494,11 @@
 	// Note: as with other syncable objects, the DAG "heads" table contains
 	// a reference to the current SyncGroup version, and the DAG "nodes"
 	// table tracks its history of mutations.
-	// TODO(rdaoud): change the data key prefix to use the SG OID instead
-	// of its ID, to be similar to the versioned user data keys.  The OID
-	// would use another SG-data prefix: "$sync:sgd:<gid>" and the data
-	// entry: "$sync:sgd:<gid>:<version>" (i.e. <oid>:<version>).
 	sgNameKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
 	sgIdKeyPrefix   = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "i")
-	sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
+	sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgDataPrefix)
 )
 
-// sgIdStr returns the SyncGroup ID in string format.
-// TODO(rdaoud): delete when the SG ID becomes a string throughout.
-func sgIdStr(gid interfaces.GroupId) string {
-	return fmt.Sprintf("%d", uint64(gid))
-}
-
 // sgNameKey returns the key used to access the SyncGroup name entry.
 func sgNameKey(name string) string {
 	return util.JoinKeyParts(sgNameKeyPrefix, name)
@@ -516,11 +509,35 @@
 	return util.JoinKeyParts(sgIdKeyPrefix, fmt.Sprintf("%d", gid))
 }
 
+// sgOID converts a group id into an oid string.
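+// For example, sgOID(1234) is "$sync:sgd:1234" (see also the comment on
+// dbSyncStateInMem in sync_state.go).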
+func sgOID(gid interfaces.GroupId) string {
+	return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid))
+}
+
+// sgID is the inverse of sgOID and converts an oid string into a group id.
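+// For example, sgID("$sync:sgd:1234") yields GroupId(1234).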
+func sgID(oid string) (interfaces.GroupId, error) {
+	parts := util.SplitKeyParts(oid)
+	if len(parts) != 3 {
+		return 0, fmt.Errorf("invalid sgoid %s", oid)
+	}
+
+	id, err := strconv.ParseUint(parts[2], 10, 64)
+	if err != nil {
+		return 0, err
+	}
+	return interfaces.GroupId(id), nil
+}
+
 // sgDataKey returns the key used to access a version of the SyncGroup data.
 func sgDataKey(gid interfaces.GroupId, version string) string {
 	return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid), version)
 }
 
+// sgDataKeyByOID returns the key used to access a version of the SyncGroup data.
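+// The key has the form <oid>:<version>, i.e. "$sync:sgd:<gid>:<version>".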
+func sgDataKeyByOID(oid, version string) string {
+	return util.JoinKeyParts(oid, version)
+}
+
 // splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
 func splitSgNameKey(ctx *context.T, key string) (string, error) {
 	// Note that the actual SyncGroup name may contain ":" as a separator.
@@ -558,9 +575,9 @@
 	return util.Put(ctx, tx, sgIdKey(gid), state)
 }
 
-// setSGDataEntry stores the SyncGroup versioned data entry.
-func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, sg *interfaces.SyncGroup) error {
-	return util.Put(ctx, tx, sgDataKey(gid, version), sg)
+// setSGDataEntryByOID stores the SyncGroup versioned data entry.
+func setSGDataEntryByOID(ctx *context.T, tx store.Transaction, sgoid, version string, sg *interfaces.SyncGroup) error {
+	return util.Put(ctx, tx, sgDataKeyByOID(sgoid, version), sg)
 }
 
 // getSGNameEntry retrieves the SyncGroup ID for a given name.
@@ -590,6 +607,15 @@
 	return &sg, nil
 }
 
+// getSGDataEntryByOID retrieves the SyncGroup data for a given group OID and version.
+func getSGDataEntryByOID(ctx *context.T, st store.StoreReader, sgoid string, version string) (*interfaces.SyncGroup, error) {
+	var sg interfaces.SyncGroup
+	if err := util.Get(ctx, st, sgDataKeyByOID(sgoid, version), &sg); err != nil {
+		return nil, err
+	}
+	return &sg, nil
+}
+
 // delSGNameEntry deletes the SyncGroup name entry.
 func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
 	return util.Delete(ctx, tx, sgNameKey(name))
@@ -642,8 +668,7 @@
 		// has Admin privilege.
 
 		// Reserve a log generation and position counts for the new SyncGroup.
-		//gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
-		gen, pos := uint64(1), uint64(1)
+		gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(gid), 1)
 
 		if err := ss.addSyncGroup(ctx, tx, version, true, "", nil, ss.id, gen, pos, sg); err != nil {
 			return err
@@ -657,7 +682,7 @@
 		return err
 	}
 
-	ss.initSyncStateInMem(ctx, appName, dbName, gid)
+	ss.initSyncStateInMem(ctx, appName, dbName, sgOID(gid))
 
 	// Local SG create succeeded. Publish the SG at the chosen server, or if
 	// that fails, enqueue it for later publish retries.
@@ -780,7 +805,7 @@
 		return nullSpec, err
 	}
 
-	ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sg2.Id)
+	ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sgOID(sg2.Id))
 
 	// Publish at the chosen mount table and in the neighborhood.
 	sd.publishInMountTables(ctx, call, sg2.Spec)
@@ -878,7 +903,7 @@
 	}
 
 	ss := sd.sync.(*syncService)
-	//appName, dbName := sd.db.App().Name(), sd.db.Name()
+	appName, dbName := sd.db.App().Name(), sd.db.Name()
 
 	err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Check permissions on Database.
@@ -909,8 +934,7 @@
 		}
 
 		// Reserve a log generation and position counts for the new SyncGroup.
-		//gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(sg.Id), 1)
-		gen, pos := uint64(1), uint64(1)
+		gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(sg.Id), 1)
 
 		// TODO(hpucha): Check SyncGroup ACL.
 
@@ -935,7 +959,7 @@
 func (sd *syncDatabase) publishSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
 	st := sd.db.St()
 	ss := sd.sync.(*syncService)
-	//appName, dbName := sd.db.App().Name(), sd.db.Name()
+	appName, dbName := sd.db.App().Name(), sd.db.Name()
 
 	gid, err := getSyncGroupId(ctx, st, sgName)
 	if err != nil {
@@ -959,11 +983,18 @@
 	// and joiner list of the SyncGroup get updated.  This is functionally
 	// correct, just not symmetrical with what happens at joiner, which
 	// receives the SyncGroup state post-join.
-	// TODO(rdaoud): send the SyncGroup genvec to the remote peer.
 	status := interfaces.SyncGroupStatusPublishRejected
 
+	sgs := sgSet{gid: struct{}{}}
+	gv, _, err := ss.copyDbGenInfo(ctx, appName, dbName, sgs)
+	if err != nil {
+		return err
+	}
+	// TODO(hpucha): Do we want to pick the head version corresponding to
+	// the local gen of the sg? It appears that it shouldn't matter.
+
 	c := interfaces.SyncClient(sgName)
-	peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, nil)
+	peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, gv[sgOID(gid)])
 
 	if err == nil {
 		status = interfaces.SyncGroupStatusRunning
@@ -1007,8 +1038,7 @@
 
 		// Reserve a log generation and position counts for the new
 		// SyncGroup version.
-		//gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
-		gen, pos := uint64(1), uint64(1)
+		gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgOID(gid), 1)
 
 		sg.Status = status
 		if status == interfaces.SyncGroupStatusRunning {
@@ -1176,12 +1206,15 @@
 	})
 
 	if err == nil {
-		s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
+		s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sgOID(sg.Id))
 	}
 	return s.name, err
 }
 
 func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
+	vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: begin: %s from peer %s", sgName, joinerName)
+	defer vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: end: %s from peer %s", sgName, joinerName)
+
 	var dbSt store.Store
 	var gid interfaces.GroupId
 	var err error
@@ -1212,6 +1245,7 @@
 
 	version := newSyncGroupVersion()
 	var sg *interfaces.SyncGroup
+	var gen, pos uint64
 
 	err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
 		var err error
@@ -1235,8 +1269,7 @@
 		}
 
 		// Reserve a log generation and position counts for the new SyncGroup.
-		//gen, pos := s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgIdStr(gid), 1)
-		gen, pos := uint64(1), uint64(1)
+		gen, pos = s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgOID(gid), 1)
 
 		// Add to joiner list.
 		sg.Joiners[joinerName] = joinerInfo
@@ -1246,6 +1279,17 @@
 	if err != nil {
 		return nullSG, "", nullGV, err
 	}
-	// TODO(rdaoud): return the SyncGroup genvec
-	return *sg, version, nullGV, nil
+
+	sgs := sgSet{gid: struct{}{}}
+	gv, _, err := s.copyDbGenInfo(ctx, stAppName, stDbName, sgs)
+	if err != nil {
+		return nullSG, "", nullGV, err
+	}
+	// The retrieved genvector does not contain the mutation that adds the
+	// joiner to the list, since the initiator is the one checkpointing the
+	// generations. Add that generation to this genvector.
+	gv[sgOID(gid)][s.id] = gen
+
+	vlog.VI(2).Infof("sync: JoinSyncGroupAtAdmin: returning: sg %v, vers %v, genvec %v", sg, version, gv[sgOID(gid)])
+	return *sg, version, gv[sgOID(gid)], nil
 }
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index 820dec6..054eb20 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -15,7 +15,8 @@
 	logDataPrefix = "data" // data log state.
 	dbssPrefix    = "dbss" // database sync state.
 	dagPrefix     = "dag"  // dag state.
-	sgPrefix      = "sg"   // syncgroup state.
+	sgPrefix      = "sg"   // local syncgroup state.
+	sgDataPrefix  = "sgd"  // synced syncgroup state.
 )
 
 // syncData represents the persistent state of the sync module.
@@ -32,7 +33,7 @@
 // dbSyncState represents the persistent sync state of a Database.
 type dbSyncState struct {
 	Data     localGenInfo
-	Sgs      map[interfaces.GroupId]localGenInfo
+	Sgs      map[string]localGenInfo
 	GenVec   interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for data in Database.
 	SgGenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for SyncGroups in Database.
 }
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index e352de4..3726fb0 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -39,7 +39,7 @@
 // dbSyncState represents the persistent sync state of a Database.
 type dbSyncState struct {
 	Data     localGenInfo
-	Sgs      map[interfaces.GroupId]localGenInfo
+	Sgs      map[string]localGenInfo
 	GenVec   interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for data in Database.
 	SgGenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for SyncGroups in Database.
 }
@@ -156,4 +156,6 @@
 
 const dagPrefix = "dag" // dag state.
 
-const sgPrefix = "sg" // syncgroup state.
+const sgPrefix = "sg" // local syncgroup state.
+
+const sgDataPrefix = "sgd" // synced syncgroup state.
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index 9c16294..5b8b469 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -120,7 +120,7 @@
 	}
 
 	// Initialize Database sync state if needed.
-	s.initSyncStateInMem(ctx, appName, dbName, interfaces.NoGroupId)
+	s.initSyncStateInMem(ctx, appName, dbName, "")
 
 	// Get a batch of watch log entries, if any, after this resume marker.
 	logs, nextResmark, err := watchable.ReadBatchFromLog(st, resMark)