syncbase/vsync: SyncGroup and DAG changes for SG syncing

* Change SyncGroup storage to support their versioning and being tracked
  in DAG and log records.
* Update the SyncGroup join & publish calls to support the asynchronous
  transition from known-but-pending SyncGroup to caught-up after sync.
* Add SyncGroup local state to track the number of local peer joiners,
  the watchability, the remote-publishing status, and the sync-pending
  state of the SyncGroup.
* Add the retry loop for the SyncGroup publishing to the remote peer.
* Improve error checking of SyncGroup prefixes passed in the Spec.
* Add a first-cut HasKey() store util API to bypass the VOM decode.  This
  will be later further optimized for leveldb to avoid a copy of the
  value from the C buffer to the Go buffer.
* Add DAG support to fully prune all nodes for an object including its
  current head node.
* Remove the temporary solution in which the responder added the
  initiator to the SyncGroup joiner list.
* Improve unittest coverage.

Change-Id: I4585e248bd9e15b8b9585ec7b1830f8225686b2a
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 91ba499..2fb5479 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -81,6 +81,7 @@
 }
 
 // verifySyncGroup verifies if a SyncGroup struct is well-formed.
+// TODO(rdaoud): define verrors for all ErrBadArg cases.
 func verifySyncGroup(ctx *context.T, sg *interfaces.SyncGroup) error {
 	if sg == nil {
 		return verror.New(verror.ErrBadArg, ctx, "group information not specified")
@@ -106,40 +107,157 @@
 	if len(sg.Joiners) == 0 {
 		return verror.New(verror.ErrBadArg, ctx, "group has no joiners")
 	}
-	if len(sg.Spec.Prefixes) == 0 {
+	return verifySyncGroupSpec(ctx, &sg.Spec)
+}
+
+// verifySyncGroupSpec verifies if a SyncGroupSpec is well-formed.
+func verifySyncGroupSpec(ctx *context.T, spec *wire.SyncGroupSpec) error {
+	if spec == nil {
+		return verror.New(verror.ErrBadArg, ctx, "group spec not specified")
+	}
+	if len(spec.Prefixes) == 0 {
 		return verror.New(verror.ErrBadArg, ctx, "group has no prefixes specified")
 	}
+
+	// Duplicate prefixes are not allowed.
+	prefixes := make(map[string]bool, len(spec.Prefixes))
+	for _, pfx := range spec.Prefixes {
+		prefixes[pfx] = true
+	}
+	if len(prefixes) != len(spec.Prefixes) {
+		return verror.New(verror.ErrBadArg, ctx, "group has duplicate prefixes specified")
+	}
 	return nil
 }
 
-// addSyncGroup adds a new SyncGroup given its information.
-func addSyncGroup(ctx *context.T, tx store.Transaction, sg *interfaces.SyncGroup) error {
-	// Verify SyncGroup before storing it since it may have been received
-	// from a remote peer.
+// samePrefixes returns true if the two sets of prefixes are the same.
+func samePrefixes(pfx1, pfx2 []string) bool {
+	pfxMap := make(map[string]uint8)
+	for _, p := range pfx1 {
+		pfxMap[p] |= 0x01
+	}
+	for _, p := range pfx2 {
+		pfxMap[p] |= 0x02
+	}
+	for _, mask := range pfxMap {
+		if mask != 0x03 {
+			return false
+		}
+	}
+	return true
+}
+
+// addSyncGroup adds a new SyncGroup given its version and information.  This
+// also includes creating a DAG node entry and updating the DAG head.  If the
+// caller is the creator of the SyncGroup, a local log record is also created
+// using the given server ID and gen and pos counters to index the log record.
+// Otherwise, it's a joiner case and the SyncGroup is put in a pending state
+// (waiting for its full metadata to be synchronized) and the log record is
+// skipped, delaying its creation till the Initiator does p2p sync.
+func (s *syncService) addSyncGroup(ctx *context.T, tx store.Transaction, version string, creator bool, remotePublisher string, genvec interfaces.PrefixGenVector, servId, gen, pos uint64, sg *interfaces.SyncGroup) error {
+	// Verify the SyncGroup information before storing it since it may have
+	// been received from a remote peer.
 	if err := verifySyncGroup(ctx, sg); err != nil {
 		return err
 	}
 
-	if ok, err := hasSGDataEntry(tx, sg.Id); err != nil {
-		return err
-	} else if ok {
-		return verror.New(verror.ErrExist, ctx, "group id already exists")
-	}
+	// Add the group name and ID entries.
 	if ok, err := hasSGNameEntry(tx, sg.Name); err != nil {
 		return err
 	} else if ok {
 		return verror.New(verror.ErrExist, ctx, "group name already exists")
 	}
+	if ok, err := hasSGIdEntry(tx, sg.Id); err != nil {
+		return err
+	} else if ok {
+		return verror.New(verror.ErrExist, ctx, "group id already exists")
+	}
 
-	// Add the group name and data entries.
+	state := sgLocalState{
+		RemotePublisher: remotePublisher,
+		SyncPending:     !creator,
+		PendingGenVec:   genvec,
+	}
+	if remotePublisher == "" {
+		state.NumLocalJoiners = 1
+	}
+
 	if err := setSGNameEntry(ctx, tx, sg.Name, sg.Id); err != nil {
 		return err
 	}
-	if err := setSGDataEntry(ctx, tx, sg.Id, sg); err != nil {
+	if err := setSGIdEntry(ctx, tx, sg.Id, &state); err != nil {
 		return err
 	}
 
-	return nil
+	// Add the SyncGroup versioned data entry.
+	if ok, err := hasSGDataEntry(tx, sg.Id, version); err != nil {
+		return err
+	} else if ok {
+		return verror.New(verror.ErrExist, ctx, "group id version already exists")
+	}
+
+	return s.updateSyncGroupVersioning(ctx, tx, version, creator, servId, gen, pos, sg)
+}
+
+// updateSyncGroupVersioning updates the per-version information of a SyncGroup.
+// It writes a new versioned copy of the SyncGroup data entry, a new DAG node,
+// and updates the DAG head.  Optionally, it also writes a new local log record
+// using the given server ID and gen and pos counters to index it.  The caller
+// can provide the version number to use; otherwise, if NoVersion is given, a new
+// version is generated by the function.
+// TODO(rdaoud): hook SyncGroup mutations (and deletions) to the watch log so
+// apps can monitor SG changes as well.
+func (s *syncService) updateSyncGroupVersioning(ctx *context.T, tx store.Transaction, version string, withLog bool, servId, gen, pos uint64, sg *interfaces.SyncGroup) error {
+	if version == NoVersion {
+		version = newSyncGroupVersion()
+	}
+
+	// Add the SyncGroup versioned data entry.
+	if err := setSGDataEntry(ctx, tx, sg.Id, version, sg); err != nil {
+		return err
+	}
+
+	// Add a sync log record for the SyncGroup if needed.
+	oid := sgIdKey(sg.Id)
+	logKey := ""
+	if withLog {
+		if err := addSyncGroupLogRec(ctx, tx, sg.Id, version, servId, gen, pos); err != nil {
+			return err
+		}
+		logKey = logRecKey(oid, servId, gen)
+	}
+
+	// Add the SyncGroup to the DAG.
+	var parents []string
+	if head, err := getHead(ctx, tx, oid); err == nil {
+		parents = []string{head}
+	} else if verror.ErrorID(err) != verror.ErrNoExist.ID {
+		return err
+	}
+	if err := s.addNode(ctx, tx, oid, version, logKey, false, parents, NoBatchId, nil); err != nil {
+		return err
+	}
+	return setHead(ctx, tx, oid, version)
+}
+
+// addSyncGroupLogRec adds a new local log record for a SyncGroup.
+func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, servId, gen, pos uint64) error {
+	oid := sgIdKey(gid)
+	rec := &localLogRec{
+		Metadata: interfaces.LogRecMetadata{
+			ObjId:   oid,
+			CurVers: version,
+			Delete:  false,
+			UpdTime: watchable.GetStoreTime(ctx, tx),
+			Id:      servId,
+			Gen:     gen,
+			RecType: interfaces.NodeRec,
+			BatchId: NoBatchId,
+		},
+		Pos: pos,
+	}
+
+	return putLogRec(ctx, tx, oid, rec)
 }
 
 // getSyncGroupId retrieves the SyncGroup ID given its name.
@@ -147,18 +265,18 @@
 	return getSGNameEntry(ctx, st, name)
 }
 
-// getSyncGroupName retrieves the SyncGroup name given its ID.
-func getSyncGroupName(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
-	sg, err := getSyncGroupById(ctx, st, gid)
-	if err != nil {
-		return "", err
-	}
-	return sg.Name, nil
+// getSyncGroupVersion retrieves the current version of the SyncGroup.
+func getSyncGroupVersion(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
+	return getHead(ctx, st, sgIdKey(gid))
 }
 
 // getSyncGroupById retrieves the SyncGroup given its ID.
 func getSyncGroupById(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
-	return getSGDataEntry(ctx, st, gid)
+	version, err := getSyncGroupVersion(ctx, st, gid)
+	if err != nil {
+		return nil, err
+	}
+	return getSGDataEntry(ctx, st, gid, version)
 }
 
 // getSyncGroupByName retrieves the SyncGroup given its name.
@@ -176,19 +294,53 @@
 	if err != nil {
 		return err
 	}
-	if err = delSGNameEntry(ctx, tx, sg.Name); err != nil {
-		return err
-	}
-	return delSGDataEntry(ctx, tx, sg.Id)
+	return delSyncGroupByName(ctx, tx, sg.Name)
 }
 
 // delSyncGroupByName deletes the SyncGroup given its name.
 func delSyncGroupByName(ctx *context.T, tx store.Transaction, name string) error {
+	// Get the SyncGroup ID and current version.
 	gid, err := getSyncGroupId(ctx, tx, name)
 	if err != nil {
 		return err
 	}
-	return delSyncGroupById(ctx, tx, gid)
+	version, err := getSyncGroupVersion(ctx, tx, gid)
+	if err != nil {
+		return err
+	}
+
+	// Delete the name and ID entries.
+	if err := delSGNameEntry(ctx, tx, name); err != nil {
+		return err
+	}
+	if err := delSGIdEntry(ctx, tx, gid); err != nil {
+		return err
+	}
+
+	// Delete all versioned SyncGroup data entries (same versions as DAG
+	// nodes).  This is done separately from pruning the DAG nodes because
+	// some nodes may have no log record pointing back to the SyncGroup data
+	// entries (loose coupling to support the pending SyncGroup state).
+	oid := sgIdKey(gid)
+	err = forEachAncestor(ctx, tx, oid, []string{version}, func(v string, nd *dagNode) error {
+		return delSGDataEntry(ctx, tx, gid, v)
+	})
+	if err != nil {
+		return err
+	}
+
+	// Delete all DAG nodes and log records.
+	bset := newBatchPruning()
+	err = prune(ctx, tx, oid, NoVersion, bset, func(ctx *context.T, tx store.Transaction, lr string) error {
+		if lr != "" {
+			return util.Delete(ctx, tx, lr)
+		}
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+	return pruneDone(ctx, tx, bset)
 }
 
 // refreshMembersIfExpired updates the aggregate view of SyncGroup members
@@ -252,16 +404,23 @@
 // make forEachSyncGroup() stop the iteration earlier; otherwise the function
 // loops across all SyncGroups in the Database.
 func forEachSyncGroup(st store.StoreReader, callback func(*interfaces.SyncGroup) bool) {
-	scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix, "")
-	stream := st.Scan(scanStart, scanLimit)
+	stream := st.Scan(util.ScanPrefixArgs(sgNameKeyPrefix, ""))
+	defer stream.Cancel()
+
 	for stream.Advance() {
-		var sg interfaces.SyncGroup
-		if vom.Decode(stream.Value(nil), &sg) != nil {
-			vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup value for key %s", string(stream.Key(nil)))
+		var gid interfaces.GroupId
+		if vom.Decode(stream.Value(nil), &gid) != nil {
+			vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup ID for key %s", string(stream.Key(nil)))
 			continue
 		}
 
-		if callback(&sg) {
+		sg, err := getSyncGroupById(nil, st, gid)
+		if err != nil {
+			vlog.Errorf("sync: forEachSyncGroup: cannot get SyncGroup %d: %v", gid, err)
+			continue
+		}
+
+		if callback(sg) {
 			break // done, early exit
 		}
 	}
@@ -324,63 +483,69 @@
 // Use the functions above to manipulate SyncGroups.
 
 var (
-	// sgDataKeyScanPrefix is the prefix used to scan SyncGroup data entries.
-	sgDataKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
-
-	// sgNameKeyScanPrefix is the prefix used to scan SyncGroup name entries.
-	sgNameKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
+	// Prefixes used to store the different mappings of a SyncGroup:
+	// sgNameKeyPrefix: name --> ID
+	// sgIdKeyPrefix: ID --> SyncGroup local state
+	// sgDataKeyPrefix: (ID, version) --> SyncGroup data (synchronized)
+	//
+	// Note: as with other syncable objects, the DAG "heads" table contains
+	// a reference to the current SyncGroup version, and the DAG "nodes"
+	// table tracks its history of mutations.
+	// TODO(rdaoud): change the data key prefix to use the SG OID instead
+	// of its ID, to be similar to the versioned user data keys.  The OID
+	// would use another SG-data prefix: "$sync:sgd:<gid>" and the data
+	// entry: "$sync:sgd:<gid>:<version>" (i.e. <oid>:<version>).
+	sgNameKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
+	sgIdKeyPrefix   = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "i")
+	sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
 )
 
-// sgDataKey returns the key used to access the SyncGroup data entry.
-func sgDataKey(gid interfaces.GroupId) string {
-	return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d", fmt.Sprintf("%d", gid))
+// sgIdStr returns the SyncGroup ID in string format.
+// TODO(rdaoud): delete when the SG ID becomes a string throughout.
+func sgIdStr(gid interfaces.GroupId) string {
+	return fmt.Sprintf("%d", uint64(gid))
 }
 
 // sgNameKey returns the key used to access the SyncGroup name entry.
 func sgNameKey(name string) string {
-	return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", name)
+	return util.JoinKeyParts(sgNameKeyPrefix, name)
+}
+
+// sgIdKey returns the key used to access the SyncGroup ID entry.
+func sgIdKey(gid interfaces.GroupId) string {
+	return util.JoinKeyParts(sgIdKeyPrefix, fmt.Sprintf("%d", gid))
+}
+
+// sgDataKey returns the key used to access a version of the SyncGroup data.
+func sgDataKey(gid interfaces.GroupId, version string) string {
+	return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid), version)
 }
 
 // splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
 func splitSgNameKey(ctx *context.T, key string) (string, error) {
-	prefix := util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", "")
-
 	// Note that the actual SyncGroup name may contain ":" as a separator.
-	if !strings.HasPrefix(key, prefix) {
+	// So don't split the key on the separator, instead trim its prefix.
+	prefix := util.JoinKeyParts(sgNameKeyPrefix, "")
+	name := strings.TrimPrefix(key, prefix)
+	if name == key {
 		return "", verror.New(verror.ErrInternal, ctx, "invalid sgNamekey", key)
 	}
-	return strings.TrimPrefix(key, prefix), nil
-}
-
-// hasSGDataEntry returns true if the SyncGroup data entry exists.
-func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
-	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
-	var sg interfaces.SyncGroup
-	if err := util.Get(nil, sntx, sgDataKey(gid), &sg); err != nil {
-		if verror.ErrorID(err) == verror.ErrNoExist.ID {
-			err = nil
-		}
-		return false, err
-	}
-	return true, nil
+	return name, nil
 }
 
 // hasSGNameEntry returns true if the SyncGroup name entry exists.
 func hasSGNameEntry(sntx store.SnapshotOrTransaction, name string) (bool, error) {
-	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
-	var gid interfaces.GroupId
-	if err := util.Get(nil, sntx, sgNameKey(name), &gid); err != nil {
-		if verror.ErrorID(err) == verror.ErrNoExist.ID {
-			err = nil
-		}
-		return false, err
-	}
-	return true, nil
+	return util.Exists(nil, sntx, sgNameKey(name))
 }
 
-// setSGDataEntry stores the SyncGroup data entry.
-func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, sg *interfaces.SyncGroup) error {
-	return util.Put(ctx, tx, sgDataKey(gid), sg)
+// hasSGIdEntry returns true if the SyncGroup ID entry exists.
+func hasSGIdEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
+	return util.Exists(nil, sntx, sgIdKey(gid))
+}
+
+// hasSGDataEntry returns true if the SyncGroup versioned data entry exists.
+func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId, version string) (bool, error) {
+	return util.Exists(nil, sntx, sgDataKey(gid, version))
 }
 
 // setSGNameEntry stores the SyncGroup name entry.
@@ -388,34 +553,58 @@
 	return util.Put(ctx, tx, sgNameKey(name), gid)
 }
 
-// getSGDataEntry retrieves the SyncGroup data for a given group ID.
-func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
+// setSGIdEntry stores the SyncGroup ID entry.
+func setSGIdEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, state *sgLocalState) error {
+	return util.Put(ctx, tx, sgIdKey(gid), state)
+}
+
+// setSGDataEntry stores the SyncGroup versioned data entry.
+func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, sg *interfaces.SyncGroup) error {
+	return util.Put(ctx, tx, sgDataKey(gid, version), sg)
+}
+
+// getSGNameEntry retrieves the SyncGroup ID for a given name.
+func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
+	var gid interfaces.GroupId
+	if err := util.Get(ctx, st, sgNameKey(name), &gid); err != nil {
+		return interfaces.NoGroupId, err
+	}
+	return gid, nil
+}
+
+// getSGIdEntry retrieves the SyncGroup local state for a given group ID.
+func getSGIdEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*sgLocalState, error) {
+	var state sgLocalState
+	if err := util.Get(ctx, st, sgIdKey(gid), &state); err != nil {
+		return nil, err
+	}
+	return &state, nil
+}
+
+// getSGDataEntry retrieves the SyncGroup data for a given group ID and version.
+func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId, version string) (*interfaces.SyncGroup, error) {
 	var sg interfaces.SyncGroup
-	if err := util.Get(ctx, st, sgDataKey(gid), &sg); err != nil {
+	if err := util.Get(ctx, st, sgDataKey(gid, version), &sg); err != nil {
 		return nil, err
 	}
 	return &sg, nil
 }
 
-// getSGNameEntry retrieves the SyncGroup name to ID mapping.
-func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
-	var gid interfaces.GroupId
-	if err := util.Get(ctx, st, sgNameKey(name), &gid); err != nil {
-		return gid, err
-	}
-	return gid, nil
-}
-
-// delSGDataEntry deletes the SyncGroup data entry.
-func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
-	return util.Delete(ctx, tx, sgDataKey(gid))
-}
-
-// delSGNameEntry deletes the SyncGroup name to ID mapping.
+// delSGNameEntry deletes the SyncGroup name entry.
 func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
 	return util.Delete(ctx, tx, sgNameKey(name))
 }
 
+// delSGIdEntry deletes the SyncGroup ID entry.
+func delSGIdEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
+	return util.Delete(ctx, tx, sgIdKey(gid))
+}
+
+// delSGDataEntry deletes the SyncGroup versioned data entry.
+func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string) error {
+	return util.Delete(ctx, tx, sgDataKey(gid, version))
+}
+
 ////////////////////////////////////////////////////////////
 // SyncGroup methods between Client and Syncbase.
 
@@ -424,9 +613,22 @@
 	vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName)
 	defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName)
 
-	// Get this Syncbase's sync module handle.
 	ss := sd.sync.(*syncService)
-	var sg *interfaces.SyncGroup
+	appName, dbName := sd.db.App().Name(), sd.db.Name()
+
+	// Instantiate sg. Add self as joiner.
+	gid, version := newSyncGroupId(), newSyncGroupVersion()
+	sg := &interfaces.SyncGroup{
+		Id:          gid,
+		Name:        sgName,
+		SpecVersion: version,
+		Spec:        spec,
+		Creator:     ss.name,
+		AppName:     appName,
+		DbName:      dbName,
+		Status:      interfaces.SyncGroupStatusPublishPending,
+		Joiners:     map[string]wire.SyncGroupMemberInfo{ss.name: myInfo},
+	}
 
 	err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Check permissions on Database.
@@ -436,29 +638,17 @@
 
 		// TODO(hpucha): Check prefix ACLs on all SG prefixes.
 		// This may need another method on util.Database interface.
-
 		// TODO(hpucha): Do some SG ACL checking. Check creator
 		// has Admin privilege.
 
-		// Instantiate sg. Add self as joiner.
-		sg = &interfaces.SyncGroup{
-			Id:          newSyncGroupId(),
-			Name:        sgName,
-			SpecVersion: newSyncGroupVersion(),
-			Spec:        spec,
-			Creator:     ss.name,
-			AppName:     sd.db.App().Name(),
-			DbName:      sd.db.Name(),
-			Status:      interfaces.SyncGroupStatusPublishPending,
-			Joiners:     map[string]wire.SyncGroupMemberInfo{ss.name: myInfo},
-		}
+		// Reserve a log generation and position count for the new SyncGroup.
+		//gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
+		gen, pos := uint64(1), uint64(1)
 
-		if err := addSyncGroup(ctx, tx, sg); err != nil {
+		if err := ss.addSyncGroup(ctx, tx, version, true, "", nil, ss.id, gen, pos, sg); err != nil {
 			return err
 		}
 
-		// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
-
 		// Take a snapshot of the data to bootstrap the SyncGroup.
 		return sd.bootstrapSyncGroup(ctx, tx, spec.Prefixes)
 	})
@@ -467,9 +657,13 @@
 		return err
 	}
 
-	ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
-	// Local SG create succeeded. Publish the SG at the chosen server.
-	sd.publishSyncGroup(ctx, call, sgName)
+	ss.initSyncStateInMem(ctx, appName, dbName, gid)
+
+	// Local SG create succeeded. Publish the SG at the chosen server, or if
+	// that fails, enqueue it for later publish retries.
+	if err := sd.publishSyncGroup(ctx, call, sgName); err != nil {
+		ss.enqueuePublishSyncGroup(sgName, appName, dbName, true)
+	}
 
 	// Publish at the chosen mount table and in the neighborhood.
 	sd.publishInMountTables(ctx, call, spec)
@@ -492,24 +686,54 @@
 			return err
 		}
 
-		// Check if SyncGroup already exists.
-		sg, sgErr = getSyncGroupByName(ctx, tx, sgName)
+		// Check if SyncGroup already exists and get its info.
+		var gid interfaces.GroupId
+		gid, sgErr = getSyncGroupId(ctx, tx, sgName)
 		if sgErr != nil {
 			return sgErr
 		}
 
-		// SyncGroup already exists. Possibilities include created
-		// locally, already joined locally or published at the device as
-		// a result of SyncGroup creation on a different device.
-		//
-		// TODO(hpucha): Handle the above cases. If the SG was published
-		// locally, but not joined, we need to bootstrap the DAG and
-		// watcher. If multiple joins are done locally, we may want to
-		// ref count the SG state and track the leaves accordingly. So
-		// we may need to add some local state for each SyncGroup.
+		sg, sgErr = getSyncGroupById(ctx, tx, gid)
+		if sgErr != nil {
+			return sgErr
+		}
 
 		// Check SG ACL.
-		return authorize(ctx, call.Security(), sg)
+		if err := authorize(ctx, call.Security(), sg); err != nil {
+			return err
+		}
+
+		// SyncGroup already exists, increment the number of local
+		// joiners in its local state information.  This presents
+		// different scenarios:
+		// 1- An additional local joiner: the current number of local
+		//    joiners is > 0 and the SyncGroup was already bootstrapped
+		//    to the Watcher, so there is nothing else to do.
+		// 2- A new local joiner after all previous local joiners had
+		//    left: the number of local joiners is 0, the Watcher must
+		//    be re-notified via a SyncGroup bootstrap because the last
+		//    previous joiner to leave had un-notified the Watcher.  In
+		//    this scenario the SyncGroup was not destroyed after the
+		//    last joiner left because the SyncGroup was also published
+		//    here by a remote peer and thus cannot be destroyed only
+		//    based on the local joiners.
+		// 3- A first local joiner for a SyncGroup that was published
+		//    here from a remote Syncbase: the number of local joiners
+		//    is also 0 (and the remote publish flag is set), and the
+		//    Watcher must be notified via a SyncGroup bootstrap.
+		// Conclusion: bootstrap if the number of local joiners is 0.
+		sgState, err := getSGIdEntry(ctx, tx, gid)
+		if err != nil {
+			return err
+		}
+
+		if sgState.NumLocalJoiners == 0 {
+			if err := sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes); err != nil {
+				return err
+			}
+		}
+		sgState.NumLocalJoiners++
+		return setSGIdEntry(ctx, tx, gid, sgState)
 	})
 
 	// The presented blessing is allowed to make this Syncbase instance join
@@ -532,48 +756,41 @@
 	ss := sd.sync.(*syncService)
 
 	// Contact a SyncGroup Admin to join the SyncGroup.
-	sg = &interfaces.SyncGroup{}
-	*sg, err = sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo)
+	sg2, version, genvec, err := sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo)
 	if err != nil {
 		return nullSpec, err
 	}
 
 	// Verify that the app/db combination is valid for this SyncGroup.
-	if sg.AppName != sd.db.App().Name() || sg.DbName != sd.db.Name() {
+	appName, dbName := sd.db.App().Name(), sd.db.Name()
+	if sg2.AppName != appName || sg2.DbName != dbName {
 		return nullSpec, verror.New(verror.ErrBadArg, ctx, "bad app/db with syncgroup")
 	}
 
 	err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
-
-		// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
-
-		// TODO(hpucha): Get SG Deltas from Admin device.
-
-		if err := addSyncGroup(ctx, tx, sg); err != nil {
+		if err := ss.addSyncGroup(ctx, tx, version, false, "", genvec, 0, 0, 0, &sg2); err != nil {
 			return err
 		}
 
 		// Take a snapshot of the data to bootstrap the SyncGroup.
-		return sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes)
+		return sd.bootstrapSyncGroup(ctx, tx, sg2.Spec.Prefixes)
 	})
 
 	if err != nil {
 		return nullSpec, err
 	}
 
-	ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
+	ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sg2.Id)
 
 	// Publish at the chosen mount table and in the neighborhood.
-	sd.publishInMountTables(ctx, call, sg.Spec)
+	sd.publishInMountTables(ctx, call, sg2.Spec)
 
-	return sg.Spec, nil
+	return sg2.Spec, nil
 }
 
 func (sd *syncDatabase) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
-	var sgNames []string
-
 	vlog.VI(2).Infof("sync: GetSyncGroupNames: begin")
-	defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end: %v", sgNames)
+	defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end")
 
 	sn := sd.db.St().NewSnapshot()
 	defer sn.Abort()
@@ -584,8 +801,8 @@
 	}
 
 	// Scan all the SyncGroup names found in the Database.
-	scanStart, scanLimit := util.ScanPrefixArgs(sgNameKeyScanPrefix, "")
-	stream := sn.Scan(scanStart, scanLimit)
+	stream := sn.Scan(util.ScanPrefixArgs(sgNameKeyPrefix, ""))
+	var sgNames []string
 	var key []byte
 	for stream.Advance() {
 		sgName, err := splitSgNameKey(ctx, string(stream.Key(key)))
@@ -599,18 +816,19 @@
 		return nil, err
 	}
 
+	vlog.VI(2).Infof("sync: GetSyncGroupNames: %v", sgNames)
 	return sgNames, nil
 }
 
 func (sd *syncDatabase) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
-	var spec wire.SyncGroupSpec
-
 	vlog.VI(2).Infof("sync: GetSyncGroupSpec: begin %s", sgName)
-	defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s spec %v", sgName, spec)
+	defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s", sgName)
 
 	sn := sd.db.St().NewSnapshot()
 	defer sn.Abort()
 
+	var spec wire.SyncGroupSpec
+
 	// Check permissions on Database.
 	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
 		return spec, "", err
@@ -623,41 +841,45 @@
 	}
 	// TODO(hpucha): Check SyncGroup ACL.
 
-	spec = sg.Spec
-	return spec, sg.SpecVersion, nil
+	vlog.VI(2).Infof("sync: GetSyncGroupSpec: %s spec %v", sgName, sg.Spec)
+	return sg.Spec, sg.SpecVersion, nil
 }
 
 func (sd *syncDatabase) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
-	var members map[string]wire.SyncGroupMemberInfo
-
 	vlog.VI(2).Infof("sync: GetSyncGroupMembers: begin %s", sgName)
-	defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s members %v", sgName, members)
+	defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s", sgName)
 
 	sn := sd.db.St().NewSnapshot()
 	defer sn.Abort()
 
 	// Check permissions on Database.
 	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
-		return members, err
+		return nil, err
 	}
 
 	// Get the SyncGroup information.
 	sg, err := getSyncGroupByName(ctx, sn, sgName)
 	if err != nil {
-		return members, err
+		return nil, err
 	}
 
 	// TODO(hpucha): Check SyncGroup ACL.
 
-	members = sg.Joiners
-	return members, nil
+	vlog.VI(2).Infof("sync: GetSyncGroupMembers: %s members %v", sgName, sg.Joiners)
+	return sg.Joiners, nil
 }
 
-// TODO(hpucha): Enable syncing syncgroup metadata.
 func (sd *syncDatabase) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
 	vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version)
 	defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName)
 
+	if err := verifySyncGroupSpec(ctx, &spec); err != nil {
+		return err
+	}
+
+	ss := sd.sync.(*syncService)
+	//appName, dbName := sd.db.App().Name(), sd.db.Name()
+
 	err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Check permissions on Database.
 		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
@@ -669,10 +891,33 @@
 			return err
 		}
 
-		// TODO(hpucha): Check SyncGroup ACL. Perform version checking.
+		if version != NoVersion && sg.SpecVersion != version {
+			return verror.NewErrBadVersion(ctx)
+		}
 
+		// Must not change the SyncGroup prefixes.
+		if !samePrefixes(spec.Prefixes, sg.Spec.Prefixes) {
+			return verror.New(verror.ErrBadArg, ctx, "cannot modify prefixes")
+		}
+
+		sgState, err := getSGIdEntry(ctx, tx, sg.Id)
+		if err != nil {
+			return err
+		}
+		if sgState.SyncPending {
+			return verror.NewErrBadState(ctx)
+		}
+
+		// Reserve a log generation and position count for the SyncGroup update.
+		//gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(sg.Id), 1)
+		gen, pos := uint64(1), uint64(1)
+
+		// TODO(hpucha): Check SyncGroup ACL.
+
+		newVersion := newSyncGroupVersion()
 		sg.Spec = spec
-		return setSGDataEntry(ctx, tx, sg.Id, sg)
+		sg.SpecVersion = newVersion
+		return ss.updateSyncGroupVersioning(ctx, tx, newVersion, true, ss.id, gen, pos, sg)
 	})
 	return err
 }
@@ -680,9 +925,27 @@
 //////////////////////////////
 // Helper functions
 
-// TODO(hpucha): Call this periodically until we are able to contact the remote peer.
+// publishSyncGroup publishes the SyncGroup at the remote peer and updates its
+// status.  If the publish operation is either successful or rejected by the
+// peer, the status is updated to "running" or "rejected" respectively and the
+// function returns "nil" to indicate to the caller there is no need to make
+// further attempts.  Otherwise an error (typically RPC error, but could also
+// be a store error) is returned to the caller.
+// TODO(rdaoud): make all SG admins try to publish after they join.
 func (sd *syncDatabase) publishSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
-	sg, err := getSyncGroupByName(ctx, sd.db.St(), sgName)
+	st := sd.db.St()
+	ss := sd.sync.(*syncService)
+	//appName, dbName := sd.db.App().Name(), sd.db.Name()
+
+	gid, err := getSyncGroupId(ctx, st, sgName)
+	if err != nil {
+		return err
+	}
+	version, err := getSyncGroupVersion(ctx, st, gid)
+	if err != nil {
+		return err
+	}
+	sg, err := getSGDataEntry(ctx, st, gid, version)
 	if err != nil {
 		return err
 	}
@@ -691,34 +954,74 @@
 		return nil
 	}
 
+	// Note: the remote peer is given the SyncGroup version and genvec at
+	// the point before the post-publish update, at which time the status
+	// and joiner list of the SyncGroup get updated.  This is functionally
+	// correct, just not symmetrical with what happens at the joiner, which
+	// receives the SyncGroup state post-join.
+	// TODO(rdaoud): send the SyncGroup genvec to the remote peer.
+	status := interfaces.SyncGroupStatusPublishRejected
+
 	c := interfaces.SyncClient(sgName)
-	err = c.PublishSyncGroup(ctx, *sg)
+	peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, nil)
 
-	// Publish failed temporarily. Retry later.
-	// TODO(hpucha): Is there an RPC error that we can check here?
-	if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
-		return err
-	}
-
-	// Publish succeeded.
 	if err == nil {
-		// TODO(hpucha): Get SG Deltas from publisher. Obtaining the
-		// new version from the publisher prevents SG conflicts.
-		return err
+		status = interfaces.SyncGroupStatusRunning
+	} else {
+		errId := verror.ErrorID(err)
+		if errId == interfaces.ErrDupSyncGroupPublish.ID {
+			// Duplicate publish: another admin already published
+			// the SyncGroup.  Nothing else needs to happen here
+			// because that admin is expected to update the SG
+			// status, and p2p SG sync will propagate the change.
+			// TODO(rdaoud): what if that other admin crashes and
+			// never updates the SyncGroup status (dies permanently
+			// or is ejected before the status update)?  Eventually
+			// some admin must decide to update the SG status anyway
+			// even if that causes extra SG mutations and conflicts.
+			vlog.VI(3).Infof("sync: publishSyncGroup: %s: duplicate publish", sgName)
+			return nil
+		}
+
+		if errId != verror.ErrExist.ID {
+			// Any error other than ErrExist means the publish
+			// did not complete and must be retried later.
+			// TODO(hpucha): Is there an RPC error that we can check here?
+			vlog.VI(3).Infof("sync: publishSyncGroup: %s: failed, retry later: %v", sgName, err)
+			return err
+		}
 	}
 
-	// Publish rejected. Persist that to avoid retrying in the
-	// future and to remember the split universe scenario.
-	err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
+	// The publish operation is done because either it succeeded or it
+	// failed with the ErrExist error.  Update the SyncGroup status and, if
+	// the publish was successful, add the remote peer to the SyncGroup.
+	vlog.VI(3).Infof("sync: publishSyncGroup: %s: peer %s: done: status %s: %v",
+		sgName, peer, status.String(), err)
+
+	err = store.RunInTransaction(st, func(tx store.Transaction) error {
 		// Ensure SG still exists.
-		sg, err := getSyncGroupByName(ctx, tx, sgName)
+		sg, err := getSyncGroupById(ctx, tx, gid)
 		if err != nil {
 			return err
 		}
 
-		sg.Status = interfaces.SyncGroupStatusPublishRejected
-		return setSGDataEntry(ctx, tx, sg.Id, sg)
+		// Reserve a log generation and position count for the new
+		// SyncGroup version.
+		//gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
+		gen, pos := uint64(1), uint64(1)
+
+		sg.Status = status
+		if status == interfaces.SyncGroupStatusRunning {
+			// TODO(hpucha): Default priority?
+			sg.Joiners[peer] = wire.SyncGroupMemberInfo{}
+		}
+
+		return ss.updateSyncGroupVersioning(ctx, tx, NoVersion, true, ss.id, gen, pos, sg)
 	})
+	if err != nil {
+		vlog.Errorf("sync: publishSyncGroup: cannot update SyncGroup %s status to %s: %v",
+			sgName, status.String(), err)
+	}
 	return err
 }
 
@@ -801,7 +1104,7 @@
 	return nil
 }
 
-func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
+func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) { // RPC to the SyncGroup admin addressed by sgName
 	c := interfaces.SyncClient(sgName)
 	return c.JoinSyncGroupAtAdmin(ctx, sgName, name, myInfo)
 
@@ -819,36 +1122,49 @@
 ////////////////////////////////////////////////////////////
 // Methods for SyncGroup create/join between Syncbases.
 
-func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg interfaces.SyncGroup) error {
+func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, publisher string, sg interfaces.SyncGroup, version string, genvec interfaces.PrefixGenVector) (string, error) {
 	st, err := s.getDbStore(ctx, call, sg.AppName, sg.DbName)
 	if err != nil {
-		return err
+		return s.name, err
 	}
 
 	err = store.RunInTransaction(st, func(tx store.Transaction) error {
-		localSG, err := getSyncGroupByName(ctx, tx, sg.Name)
-
+		gid, err := getSyncGroupId(ctx, tx, sg.Name)
 		if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
 			return err
 		}
 
-		// SG name already claimed.
-		if err == nil && localSG.Id != sg.Id {
-			return verror.New(verror.ErrExist, ctx, sg.Name)
-		}
-
-		// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG
-		// metadata if needed.
-		//
-		// TODO(hpucha): Catch up on SG versions so far.
-
-		// SG already published. Update if needed.
-		if err == nil && localSG.Id == sg.Id {
-			if localSG.Status == interfaces.SyncGroupStatusPublishPending {
-				localSG.Status = interfaces.SyncGroupStatusRunning
-				return setSGDataEntry(ctx, tx, localSG.Id, localSG)
+		if err == nil {
+			// SG name already claimed.  Note that in this case of
+			// split-brain (same SG name, different IDs), members of
+			// the SG whose ID is rejected here do not benefit from
+			// the de-duping optimization below and will end up
+			// making duplicate SG mutations to set the status,
+			// yielding more SG conflicts.  It is functionally
+			// correct but bypasses the de-dup optimization.
+			if gid != sg.Id {
+				return verror.New(verror.ErrExist, ctx, sg.Name)
 			}
-			return nil
+
+			// SG exists locally, either created/joined here or
+			// previously published.  Repeated publishes by the same
+			// publisher are idempotent; others are duplicates.
+			state, err := getSGIdEntry(ctx, tx, gid)
+			if err != nil {
+				return err
+			}
+			if state.RemotePublisher == "" {
+				// Locally created/joined SyncGroup: update its
+				// state to include the publisher.
+				state.RemotePublisher = publisher
+				return setSGIdEntry(ctx, tx, gid, state)
+			}
+			if publisher == state.RemotePublisher {
+				// Same publisher as before: nothing to change, the
+				// existing genvec and version info are still valid.
+				return nil
+			}
+			return interfaces.NewErrDupSyncGroupPublish(ctx, sg.Name)
 		}
 
 		// Publish the SyncGroup.
@@ -856,23 +1172,21 @@
 		// TODO(hpucha): Use some ACL check to allow/deny publishing.
 		// TODO(hpucha): Ensure node is on Admin ACL.
 
-		// TODO(hpucha): Default priority?
-		sg.Joiners[s.name] = wire.SyncGroupMemberInfo{}
-		sg.Status = interfaces.SyncGroupStatusRunning
-		return addSyncGroup(ctx, tx, &sg)
+		return s.addSyncGroup(ctx, tx, version, false, publisher, genvec, 0, 0, 0, &sg)
 	})
 
-	if err != nil {
-		return err
+	if err == nil {
+		s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
 	}
-	s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
-	return nil
+	return s.name, err // this Syncbase's name is returned even on error
 }
 
-func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
+func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
 	var dbSt store.Store
 	var gid interfaces.GroupId
 	var err error
+	var stAppName, stDbName string
+	nullSG, nullGV := interfaces.SyncGroup{}, interfaces.PrefixGenVector{} // zero values returned on error
 
 	// Find the database store for this SyncGroup.
 	//
@@ -885,6 +1199,7 @@
 		if gid, err = getSyncGroupId(ctx, st, sgName); err == nil {
 			// Found the SyncGroup being looked for.
 			dbSt = st
+			stAppName, stDbName = appName, dbName // app/db names of dbSt
 			return true
 		}
 		return false
@@ -892,10 +1207,12 @@
 
 	// SyncGroup not found.
 	if err != nil {
-		return interfaces.SyncGroup{}, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName)
+		return nullSG, "", nullGV, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName)
 	}
 
+	version := newSyncGroupVersion() // version for the updated SyncGroup; also returned to the joiner
 	var sg *interfaces.SyncGroup
+
 	err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
 		var err error
 		sg, err = getSyncGroupById(ctx, tx, gid)
@@ -908,13 +1225,27 @@
 			return err
 		}
 
+		// Check that the SG is not in pending state.
+		state, err := getSGIdEntry(ctx, tx, gid)
+		if err != nil {
+			return err
+		}
+		if state.SyncPending {
+			return verror.NewErrBadState(ctx)
+		}
+
+		// Reserve a log generation and position count for the new SyncGroup version.
+		//gen, pos := s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgIdStr(gid), 1)
+		gen, pos := uint64(1), uint64(1)
+		_, _ = stAppName, stDbName // only read by the disabled call above; Go 1.18+ rejects vars only set in a closure
 		// Add to joiner list.
 		sg.Joiners[joinerName] = joinerInfo
-		return setSGDataEntry(ctx, tx, sg.Id, sg)
+		return s.updateSyncGroupVersioning(ctx, tx, version, true, s.id, gen, pos, sg)
 	})
 
 	if err != nil {
-		return interfaces.SyncGroup{}, err
+		return nullSG, "", nullGV, err
 	}
-	return *sg, nil
+	// TODO(rdaoud): return the SyncGroup genvec
+	return *sg, version, nullGV, nil
 }