syncbase/vsync: SyncGroup and DAG changes for SG syncing

* Change SyncGroup storage to support versioning, with SyncGroups
  tracked in DAG and log records.
* Update the SyncGroup join & publish calls to support the asynchronous
  transition of a SyncGroup from known-but-pending to caught-up after sync.
* Add SyncGroup local state info to track the number of local joiners,
  and the watchability, remote-publishing, and sync-pending states of
  the SyncGroup.
* Add a retry loop for publishing the SyncGroup to the remote peer (see
  the sketch below).
* Improve error checking of SyncGroup prefixes passed in the Spec.
* Add a 1st-cut Exists() store util API to bypass the VOM decode.  It
  will later be optimized further for leveldb to avoid copying the
  value from the C buffer to the Go buffer.
* Add DAG support to fully prune all nodes for an object including its
  current head node.
* Remove the temporary solution of the responder adding the initiator
  to the SyncGroup joiner list.
* Improve unit test coverage.
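
A condensed sketch of the creator-side publish flow added by this change
(the surrounding context is elided; publishSyncGroup and
enqueuePublishSyncGroup are the real helpers from this change):

    // Try the remote publish once after a successful local create; on
    // failure, queue the SyncGroup so a background loop can retry later.
    if err := sd.publishSyncGroup(ctx, call, sgName); err != nil {
        ss.enqueuePublishSyncGroup(sgName, appName, dbName, true)
    }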

Change-Id: I4585e248bd9e15b8b9585ec7b1830f8225686b2a
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index 6dd6f25..80b46a0 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -19,15 +19,36 @@
 
 	// SyncGroup-related methods.
 
-	// PublishSyncGroup is typically invoked on a "central" peer to publish
-	// the SyncGroup.
-	PublishSyncGroup(sg SyncGroup) error {access.Write}
+	// PublishSyncGroup is invoked on the SyncGroup name (typically served
+	// by a "central" peer) to publish the SyncGroup.  It takes the name of
+	// Syncbase doing the publishing (the publisher) and returns the name
+	// of the Syncbase where the SyncGroup is published (the publishee).
+	// This allows the publisher and the publishee to learn of each other.
+	// When a SyncGroup is published, the publishee is given the SyncGroup
+	// metadata, its current version at the publisher, and the current
+	// SyncGroup generation vector.  The generation vector serves as a
+	// checkpoint at the time of publishing.  The publishing proceeds
+	// asynchronously, and the publishee learns the SyncGroup history
+	// through the routine p2p sync process and determines when it has
+	// caught up to the level of knowledge at the time of publishing using
+	// the checkpointed generation vector.  Until that point, the publishee
+	// locally deems the SyncGroup to be in a pending state and does not
+	// mutate it.  Thus it locally rejects SyncGroup joins or updates to
+	// its spec until it is caught up on the SyncGroup history.
+	PublishSyncGroup(publisher string, sg SyncGroup, version string, genvec PrefixGenVector) (string | error) {access.Write}
 
 	// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
 	// Syncbase on a SyncGroup admin. It checks whether the requestor is
 	// allowed to join the named SyncGroup, and if so, adds the requestor to
-	// the SyncGroup.
-	JoinSyncGroupAtAdmin(sgName, joinerName string, myInfo wire.SyncGroupMemberInfo) (SyncGroup | error) {access.Read}
+	// the SyncGroup.  It returns a copy of the updated SyncGroup metadata,
+	// its version, and the SyncGroup generation vector at the time of the
+	// join.  Similar to the PublishSyncGroup scenario, the joiner at that
+	// point does not have the SyncGroup history and locally deems it to be
+	// in a pending state and does not mutate it.  This means it rejects
+	// local updates to the SyncGroup spec or, if it were also an admin on
+	// the SyncGroup, it would reject SyncGroup joins until it is caught up
+	// on the SyncGroup history through p2p sync.
+	JoinSyncGroupAtAdmin(sgName, joinerName string, myInfo wire.SyncGroupMemberInfo) (sg SyncGroup, version string, genvec PrefixGenVector | error) {access.Read}
 
 	// BlobSync methods.
 
@@ -50,3 +71,7 @@
 	FetchBlobRecipe(br wire.BlobRef) stream<_, ChunkHash> error
 	FetchChunks() stream<ChunkHash, ChunkData> error
 }
+
+error (
+	DupSyncGroupPublish(name string) {"en": "duplicate publish on SyncGroup: {name}"}
+)
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index 3006f3c..102699b 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -12,14 +12,29 @@
 	"io"
 	"v.io/v23"
 	"v.io/v23/context"
+	"v.io/v23/i18n"
 	"v.io/v23/rpc"
 	"v.io/v23/vdl"
+	"v.io/v23/verror"
 
 	// VDL user imports
 	"v.io/v23/security/access"
 	"v.io/v23/services/syncbase/nosql"
 )
 
+var (
+	ErrDupSyncGroupPublish = verror.Register("v.io/x/ref/services/syncbase/server/interfaces.DupSyncGroupPublish", verror.NoRetry, "{1:}{2:} duplicate publish on SyncGroup: {3}")
+)
+
+func init() {
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrDupSyncGroupPublish.ID), "{1:}{2:} duplicate publish on SyncGroup: {3}")
+}
+
+// NewErrDupSyncGroupPublish returns an error with the ErrDupSyncGroupPublish ID.
+func NewErrDupSyncGroupPublish(ctx *context.T, name string) error {
+	return verror.New(ErrDupSyncGroupPublish, ctx, name)
+}
+
 // SyncClientMethods is the client interface
 // containing Sync methods.
 //
@@ -30,14 +45,35 @@
 	// the missing log records when compared to the initiator's generation
 	// vector for one Database for either SyncGroup metadata or data.
 	GetDeltas(ctx *context.T, req DeltaReq, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
-	// PublishSyncGroup is typically invoked on a "central" peer to publish
-	// the SyncGroup.
-	PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
+	// PublishSyncGroup is invoked on the SyncGroup name (typically served
+	// by a "central" peer) to publish the SyncGroup.  It takes the name of
+	// Syncbase doing the publishing (the publisher) and returns the name
+	// of the Syncbase where the SyncGroup is published (the publishee).
+	// This allows the publisher and the publishee to learn of each other.
+	// When a SyncGroup is published, the publishee is given the SyncGroup
+	// metadata, its current version at the publisher, and the current
+	// SyncGroup generation vector.  The generation vector serves as a
+	// checkpoint at the time of publishing.  The publishing proceeds
+	// asynchronously, and the publishee learns the SyncGroup history
+	// through the routine p2p sync process and determines when it has
+	// caught up to the level of knowledge at the time of publishing using
+	// the checkpointed generation vector.  Until that point, the publishee
+	// locally deems the SyncGroup to be in a pending state and does not
+	// mutate it.  Thus it locally rejects SyncGroup joins or updates to
+	// its spec until it is caught up on the SyncGroup history.
+	PublishSyncGroup(ctx *context.T, publisher string, sg SyncGroup, version string, genvec PrefixGenVector, opts ...rpc.CallOpt) (string, error)
 	// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
 	// Syncbase on a SyncGroup admin. It checks whether the requestor is
 	// allowed to join the named SyncGroup, and if so, adds the requestor to
-	// the SyncGroup.
-	JoinSyncGroupAtAdmin(ctx *context.T, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (SyncGroup, error)
+	// the SyncGroup.  It returns a copy of the updated SyncGroup metadata,
+	// its version, and the SyncGroup generation vector at the time of the
+	// join.  Similar to the PublishSyncGroup scenario, the joiner at that
+	// point does not have the SyncGroup history and locally deems it to be
+	// in a pending state and does not mutate it.  This means it rejects
+	// local updates to the SyncGroup spec or, if it were also an admin on
+	// the SyncGroup, it would reject SyncGroup joins until it is caught up
+	// on the SyncGroup history through p2p sync.
+	JoinSyncGroupAtAdmin(ctx *context.T, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (sg SyncGroup, version string, genvec PrefixGenVector, err error)
 	// HaveBlob verifies that the peer has the requested blob, and if
 	// present, returns its size.
 	HaveBlob(ctx *context.T, br nosql.BlobRef, opts ...rpc.CallOpt) (int64, error)
@@ -80,13 +116,13 @@
 	return
 }
 
-func (c implSyncClientStub) PublishSyncGroup(ctx *context.T, i0 SyncGroup, opts ...rpc.CallOpt) (err error) {
-	err = v23.GetClient(ctx).Call(ctx, c.name, "PublishSyncGroup", []interface{}{i0}, nil, opts...)
+func (c implSyncClientStub) PublishSyncGroup(ctx *context.T, i0 string, i1 SyncGroup, i2 string, i3 PrefixGenVector, opts ...rpc.CallOpt) (o0 string, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "PublishSyncGroup", []interface{}{i0, i1, i2, i3}, []interface{}{&o0}, opts...)
 	return
 }
 
-func (c implSyncClientStub) JoinSyncGroupAtAdmin(ctx *context.T, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (o0 SyncGroup, err error) {
-	err = v23.GetClient(ctx).Call(ctx, c.name, "JoinSyncGroupAtAdmin", []interface{}{i0, i1, i2}, []interface{}{&o0}, opts...)
+func (c implSyncClientStub) JoinSyncGroupAtAdmin(ctx *context.T, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo, opts ...rpc.CallOpt) (o0 SyncGroup, o1 string, o2 PrefixGenVector, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "JoinSyncGroupAtAdmin", []interface{}{i0, i1, i2}, []interface{}{&o0, &o1, &o2}, opts...)
 	return
 }
 
@@ -440,14 +476,35 @@
 	// the missing log records when compared to the initiator's generation
 	// vector for one Database for either SyncGroup metadata or data.
 	GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, req DeltaReq, initiator string) error
-	// PublishSyncGroup is typically invoked on a "central" peer to publish
-	// the SyncGroup.
-	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
+	// PublishSyncGroup is invoked on the SyncGroup name (typically served
+	// by a "central" peer) to publish the SyncGroup.  It takes the name of
+	// Syncbase doing the publishing (the publisher) and returns the name
+	// of the Syncbase where the SyncGroup is published (the publishee).
+	// This allows the publisher and the publishee to learn of each other.
+	// When a SyncGroup is published, the publishee is given the SyncGroup
+	// metadata, its current version at the publisher, and the current
+	// SyncGroup generation vector.  The generation vector serves as a
+	// checkpoint at the time of publishing.  The publishing proceeds
+	// asynchronously, and the publishee learns the SyncGroup history
+	// through the routine p2p sync process and determines when it has
+	// caught up to the level of knowledge at the time of publishing using
+	// the checkpointed generation vector.  Until that point, the publishee
+	// locally deems the SyncGroup to be in a pending state and does not
+	// mutate it.  Thus it locally rejects SyncGroup joins or updates to
+	// its spec until it is caught up on the SyncGroup history.
+	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, publisher string, sg SyncGroup, version string, genvec PrefixGenVector) (string, error)
 	// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
 	// Syncbase on a SyncGroup admin. It checks whether the requestor is
 	// allowed to join the named SyncGroup, and if so, adds the requestor to
-	// the SyncGroup.
-	JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (SyncGroup, error)
+	// the SyncGroup.  It returns a copy of the updated SyncGroup metadata,
+	// its version, and the SyncGroup generation vector at the time of the
+	// join.  Similar to the PublishSyncGroup scenario, the joiner at that
+	// point does not have the SyncGroup history and locally deems it to be
+	// in a pending state and does not mutate it.  This means it rejects
+	// local updates to the SyncGroup spec or, if it were also an admin on
+	// the SyncGroup, it would reject SyncGroup joins until it is caught up
+	// on the SyncGroup history through p2p sync.
+	JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (sg SyncGroup, version string, genvec PrefixGenVector, err error)
 	// HaveBlob verifies that the peer has the requested blob, and if
 	// present, returns its size.
 	HaveBlob(ctx *context.T, call rpc.ServerCall, br nosql.BlobRef) (int64, error)
@@ -475,14 +532,35 @@
 	// the missing log records when compared to the initiator's generation
 	// vector for one Database for either SyncGroup metadata or data.
 	GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, req DeltaReq, initiator string) error
-	// PublishSyncGroup is typically invoked on a "central" peer to publish
-	// the SyncGroup.
-	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
+	// PublishSyncGroup is invoked on the SyncGroup name (typically served
+	// by a "central" peer) to publish the SyncGroup.  It takes the name of
+	// Syncbase doing the publishing (the publisher) and returns the name
+	// of the Syncbase where the SyncGroup is published (the publishee).
+	// This allows the publisher and the publishee to learn of each other.
+	// When a SyncGroup is published, the publishee is given the SyncGroup
+	// metadata, its current version at the publisher, and the current
+	// SyncGroup generation vector.  The generation vector serves as a
+	// checkpoint at the time of publishing.  The publishing proceeds
+	// asynchronously, and the publishee learns the SyncGroup history
+	// through the routine p2p sync process and determines when it has
+	// caught up to the level of knowledge at the time of publishing using
+	// the checkpointed generation vector.  Until that point, the publishee
+	// locally deems the SyncGroup to be in a pending state and does not
+	// mutate it.  Thus it locally rejects SyncGroup joins or updates to
+	// its spec until it is caught up on the SyncGroup history.
+	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, publisher string, sg SyncGroup, version string, genvec PrefixGenVector) (string, error)
 	// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
 	// Syncbase on a SyncGroup admin. It checks whether the requestor is
 	// allowed to join the named SyncGroup, and if so, adds the requestor to
-	// the SyncGroup.
-	JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (SyncGroup, error)
+	// the SyncGroup.  It returns a copy of the updated SyncGroup metadata,
+	// its version, and the SyncGroup generation vector at the time of the
+	// join.  Similar to the PublishSyncGroup scenario, the joiner at that
+	// point does not have the SyncGroup history and locally deems it to be
+	// in a pending state and does not mutate it.  This means it rejects
+	// local updates to the SyncGroup spec or, if it were also an admin on
+	// the SyncGroup, it would reject SyncGroup joins until it is caught up
+	// on the SyncGroup history through p2p sync.
+	JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (sg SyncGroup, version string, genvec PrefixGenVector, err error)
 	// HaveBlob verifies that the peer has the requested blob, and if
 	// present, returns its size.
 	HaveBlob(ctx *context.T, call rpc.ServerCall, br nosql.BlobRef) (int64, error)
@@ -534,11 +612,11 @@
 	return s.impl.GetDeltas(ctx, call, i0, i1)
 }
 
-func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 SyncGroup) error {
-	return s.impl.PublishSyncGroup(ctx, call, i0)
+func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 string, i1 SyncGroup, i2 string, i3 PrefixGenVector) (string, error) {
+	return s.impl.PublishSyncGroup(ctx, call, i0, i1, i2, i3)
 }
 
-func (s implSyncServerStub) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo) (SyncGroup, error) {
+func (s implSyncServerStub) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, i0 string, i1 string, i2 nosql.SyncGroupMemberInfo) (SyncGroup, string, PrefixGenVector, error) {
 	return s.impl.JoinSyncGroupAtAdmin(ctx, call, i0, i1, i2)
 }
 
@@ -586,22 +664,30 @@
 		},
 		{
 			Name: "PublishSyncGroup",
-			Doc:  "// PublishSyncGroup is typically invoked on a \"central\" peer to publish\n// the SyncGroup.",
+			Doc:  "// PublishSyncGroup is invoked on the SyncGroup name (typically served\n// by a \"central\" peer) to publish the SyncGroup.  It takes the name of\n// Syncbase doing the publishing (the publisher) and returns the name\n// of the Syncbase where the SyncGroup is published (the publishee).\n// This allows the publisher and the publishee to learn of each other.\n// When a SyncGroup is published, the publishee is given the SyncGroup\n// metadata, its current version at the publisher, and the current\n// SyncGroup generation vector.  The generation vector serves as a\n// checkpoint at the time of publishing.  The publishing proceeds\n// asynchronously, and the publishee learns the SyncGroup history\n// through the routine p2p sync process and determines when it has\n// caught up to the level of knowledge at the time of publishing using\n// the checkpointed generation vector.  Until that point, the publishee\n// locally deems the SyncGroup to be in a pending state and does not\n// mutate it.  Thus it locally rejects SyncGroup joins or updates to\n// its spec until it is caught up on the SyncGroup history.",
 			InArgs: []rpc.ArgDesc{
-				{"sg", ``}, // SyncGroup
+				{"publisher", ``}, // string
+				{"sg", ``},        // SyncGroup
+				{"version", ``},   // string
+				{"genvec", ``},    // PrefixGenVector
+			},
+			OutArgs: []rpc.ArgDesc{
+				{"", ``}, // string
 			},
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
 		},
 		{
 			Name: "JoinSyncGroupAtAdmin",
-			Doc:  "// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's\n// Syncbase on a SyncGroup admin. It checks whether the requestor is\n// allowed to join the named SyncGroup, and if so, adds the requestor to\n// the SyncGroup.",
+			Doc:  "// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's\n// Syncbase on a SyncGroup admin. It checks whether the requestor is\n// allowed to join the named SyncGroup, and if so, adds the requestor to\n// the SyncGroup.  It returns a copy of the updated SyncGroup metadata,\n// its version, and the SyncGroup generation vector at the time of the\n// join.  Similar to the PublishSyncGroup scenario, the joiner at that\n// point does not have the SyncGroup history and locally deems it to be\n// in a pending state and does not mutate it.  This means it rejects\n// local updates to the SyncGroup spec or, if it were also an admin on\n// the SyncGroup, it would reject SyncGroup joins until it is caught up\n// on the SyncGroup history through p2p sync.",
 			InArgs: []rpc.ArgDesc{
 				{"sgName", ``},     // string
 				{"joinerName", ``}, // string
 				{"myInfo", ``},     // nosql.SyncGroupMemberInfo
 			},
 			OutArgs: []rpc.ArgDesc{
-				{"", ``}, // SyncGroup
+				{"sg", ``},      // SyncGroup
+				{"version", ``}, // string
+				{"genvec", ``},  // PrefixGenVector
 			},
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
diff --git a/services/syncbase/server/util/store_util.go b/services/syncbase/server/util/store_util.go
index b3d96d4..15fa69a 100644
--- a/services/syncbase/server/util/store_util.go
+++ b/services/syncbase/server/util/store_util.go
@@ -52,6 +52,21 @@
 	return nil
 }
 
+// Exists returns true if the key exists in the store.
+// TODO(rdaoud): for now it only bypasses the Get's VOM decode step.  It should
+// be optimized further by adding a st.Exists(k) API and let each implementation
+// do its best to reduce data fetching in its key lookup.
+func Exists(ctx *context.T, st store.StoreReader, k string) (bool, error) {
+	_, err := st.Get([]byte(k), nil)
+	if err != nil {
+		if verror.ErrorID(err) == store.ErrUnknownKey.ID {
+			return false, nil
+		}
+		return false, verror.New(verror.ErrInternal, ctx, err)
+	}
+	return true, nil
+}
+
 // GetWithAuth does Get followed by an auth check.
 func GetWithAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, k string, v Permser) error {
 	if err := Get(ctx, st, k, v); err != nil {
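
A minimal usage sketch of the new helper, assuming a *context.T ctx, a
store.StoreReader st, and a key k are in scope:

    if ok, err := util.Exists(ctx, st, k); err != nil {
        return err // store error other than ErrUnknownKey
    } else if ok {
        // The key is present; no VOM decode was performed.
    }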
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 22b53f4..3178cb7 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -8,6 +8,7 @@
 	"fmt"
 	"math"
 	"sync"
+	"time"
 
 	"v.io/v23/context"
 	"v.io/v23/verror"
@@ -173,6 +174,12 @@
 	return tx.itx.Abort()
 }
 
+// GetStoreTime returns the current time from the given transaction store.
+func GetStoreTime(ctx *context.T, tx store.Transaction) time.Time {
+	wtx := tx.(*transaction)
+	return wtx.st.clock.Now(ctx)
+}
+
 // AddSyncGroupOp injects a SyncGroup operation notification in the log entries
 // that the transaction writes when it is committed.  It allows the SyncGroup
 // operations (create, join, leave, destroy) to notify the sync watcher of the
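
A usage sketch of GetStoreTime, assuming st is a watchable store (the
function type-asserts the transaction to the watchable implementation):

    err := store.RunInTransaction(st, func(tx store.Transaction) error {
        // Clock-backed time read from the transaction's store, e.g. to
        // stamp a log record's UpdTime as addSyncGroupLogRec does below.
        now := watchable.GetStoreTime(ctx, tx)
        _ = now
        return nil
    })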
diff --git a/services/syncbase/vsync/dag.go b/services/syncbase/vsync/dag.go
index 40ef3b3..086296c 100644
--- a/services/syncbase/vsync/dag.go
+++ b/services/syncbase/vsync/dag.go
@@ -279,13 +279,6 @@
 		}
 	}
 
-	// The new node must not exist.
-	if ok, err := hasNode(ctx, tx, oid, version); err != nil {
-		return err
-	} else if ok {
-		return verror.New(verror.ErrInternal, ctx, "DAG node already exists", oid, version)
-	}
-
 	// Verify the parents, determine the node level.  Also save the levels
 	// of the parent nodes for later in this function in graft updates.
 	parentLevels := make(map[string]uint64)
@@ -620,10 +613,14 @@
 
 // prune trims the DAG of an object at a given version (node) by deleting all
 // its ancestor nodes, making it the new root node.  For each deleted node it
-// calls the given callback function to delete its log record.
+// calls the given callback function to delete its log record.  If NoVersion
+// is given instead, then all object nodes are deleted, including the head node.
 //
-// Note: this function should only be used when sync determines that all devices
-// that know about this object have gotten past this version.
+// Note: this function is typically used when sync determines that all devices
+// that know about this object have gotten past this version, as part of its
+// GC operations.  It can also be used when an object's history is obliterated,
+// for example when destroying a SyncGroup, which is also versioned and tracked
+// in the DAG.
 //
 // The batch set passed is used to track batches affected by the deletion of DAG
 // objects across multiple calls to prune().  It is later given to pruneDone()
@@ -633,21 +630,33 @@
 		return verror.New(verror.ErrInternal, ctx, "missing batch set")
 	}
 
-	// Get the node at the pruning point and set its parents to nil.
-	// It will become the oldest DAG node (root) for the object.
-	node, err := getNode(ctx, tx, oid, version)
-	if err != nil {
-		return err
-	}
-	if node.Parents == nil {
-		// Nothing to do, this node is already the root.
-		return nil
-	}
-
-	parents := node.Parents
-	node.Parents = nil
-	if err = setNode(ctx, tx, oid, version, node); err != nil {
-		return err
+	var parents []string
+	if version == NoVersion {
+		// Delete all object versions including its head version.
+		head, err := getHead(ctx, tx, oid)
+		if err != nil {
+			return err
+		}
+		if err := delHead(ctx, tx, oid); err != nil {
+			return err
+		}
+		parents = []string{head}
+	} else {
+		// Get the node at the pruning point and set its parents to nil.
+		// It will become the oldest DAG node (root) for the object.
+		node, err := getNode(ctx, tx, oid, version)
+		if err != nil {
+			return err
+		}
+		if node.Parents == nil {
+			// Nothing to do, this node is already the root.
+			return nil
+		}
+		parents = node.Parents
+		node.Parents = nil
+		if err = setNode(ctx, tx, oid, version, node); err != nil {
+			return err
+		}
 	}
 
 	// Delete all ancestor nodes and their log records. Delete as many as
@@ -724,7 +733,7 @@
 // setNode stores the DAG node entry.
 func setNode(ctx *context.T, tx store.Transaction, oid, version string, node *dagNode) error {
 	if version == NoVersion {
-		return verror.New(verror.ErrInternal, ctx, "invalid version", version)
+		vlog.Fatalf("sync: setNode: invalid version: %s", version)
 	}
 
 	return util.Put(ctx, tx, nodeKey(oid, version), node)
@@ -733,7 +742,7 @@
 // getNode retrieves the DAG node entry for the given (oid, version).
 func getNode(ctx *context.T, st store.StoreReader, oid, version string) (*dagNode, error) {
 	if version == NoVersion {
-		return nil, verror.New(verror.ErrInternal, ctx, "invalid version", version)
+		vlog.Fatalf("sync: getNode: invalid version: %s", version)
 	}
 
 	var node dagNode
@@ -747,7 +756,7 @@
 // delNode deletes the DAG node entry.
 func delNode(ctx *context.T, tx store.Transaction, oid, version string) error {
 	if version == NoVersion {
-		return verror.New(verror.ErrInternal, ctx, "invalid version", version)
+		vlog.Fatalf("sync: delNode: invalid version: %s", version)
 	}
 
 	return util.Delete(ctx, tx, nodeKey(oid, version))
@@ -755,14 +764,11 @@
 
 // hasNode returns true if the node (oid, version) exists in the DAG.
 func hasNode(ctx *context.T, st store.StoreReader, oid, version string) (bool, error) {
-	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
-	if _, err := getNode(ctx, st, oid, version); err != nil {
-		if verror.ErrorID(err) == verror.ErrNoExist.ID {
-			err = nil
-		}
-		return false, err
+	if version == NoVersion {
+		vlog.Fatalf("sync: hasNode: invalid version: %s", version)
 	}
-	return true, nil
+
+	return util.Exists(ctx, st, nodeKey(oid, version))
 }
 
 // headKey returns the key used to access the DAG object head.
@@ -773,7 +779,7 @@
 // setHead stores version as the DAG object head.
 func setHead(ctx *context.T, tx store.Transaction, oid, version string) error {
 	if version == NoVersion {
-		return verror.New(verror.ErrInternal, ctx, fmt.Errorf("invalid version: %s", version))
+		vlog.Fatalf("sync: setHead: invalid version: %s", version)
 	}
 
 	return util.Put(ctx, tx, headKey(oid), version)
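
A sketch of the new full-prune mode, mirroring how delSyncGroupByName
(later in this change) uses it: passing NoVersion deletes every node of
the object, including its head.

    bset := newBatchPruning()
    err := prune(ctx, tx, oid, NoVersion, bset,
        func(ctx *context.T, tx store.Transaction, lr string) error {
            if lr != "" {
                return util.Delete(ctx, tx, lr) // drop the node's log record
            }
            return nil // pending nodes may have no log record
        })
    if err == nil {
        err = pruneDone(ctx, tx, bset)
    }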
diff --git a/services/syncbase/vsync/dag_test.go b/services/syncbase/vsync/dag_test.go
index c72b3a7..57291a4 100644
--- a/services/syncbase/vsync/dag_test.go
+++ b/services/syncbase/vsync/dag_test.go
@@ -255,11 +255,7 @@
 		t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
 	}
 
-	// Make sure an existing node cannot be added again.
 	tx := st.NewTransaction()
-	if err := s.addNode(nil, tx, oid, "2", "foo", false, []string{"1", "3"}, NoBatchId, nil); err == nil {
-		t.Errorf("addNode() did not fail when given an existing node")
-	}
 
 	// Make sure a new node cannot have more than 2 parents.
 	if err := s.addNode(nil, tx, oid, "4", "foo", false, []string{"1", "2", "3"}, NoBatchId, nil); err == nil {
@@ -780,10 +776,10 @@
 	exp := map[string][]string{"1": nil, "2": {"1"}, "3": {"2"}, "4": {"2"}, "5": {"3", "4"}, "6": {"5"}, "7": {"2"}, "8": {"6", "7"}, "9": {"8"}}
 
 	// Loop pruning at an invalid version (333) then at different valid versions.
-	testVersions := []string{"333", "1", "2", "6", "8", "9", "9"}
-	delCounts := []int{0, 0, 1, 4, 2, 1, 0}
-	which := "prune-snip-"
-	remain := 9
+	// The last version used (NoVersion) deletes all remaining nodes for the
+	// object, including the head node.
+	testVersions := []string{"333", "1", "2", "6", "8", "9", "9", NoVersion}
+	delCounts := []int{0, 0, 1, 4, 2, 1, 0, 1}
 
 	for i, version := range testVersions {
 		batches := newBatchPruning()
@@ -807,11 +803,13 @@
 				oid, version, del, delCounts[i])
 		}
 
-		which += "*"
-		remain -= del
-
-		if head, err := getHead(nil, st, oid); err != nil || head != "9" {
-			t.Errorf("object %s has wrong head: %s", oid, head)
+		head, err := getHead(nil, st, oid)
+		if version != NoVersion {
+			if err != nil || head != "9" {
+				t.Errorf("object %s has wrong head: %s", oid, head)
+			}
+		} else if err == nil {
+			t.Errorf("found head %s for object %s after pruning all versions", head, oid)
 		}
 
 		tx = st.NewTransaction()
@@ -823,16 +821,20 @@
 
 		// Remove pruned nodes from the expected parent map used to validate
 		// and set the parents of the pruned node to nil.
-		intVersion, err := strconv.ParseInt(version, 10, 32)
-		if err != nil {
-			t.Errorf("invalid version: %s", version)
-		}
-
-		if intVersion < 10 {
-			for j := int64(0); j < intVersion; j++ {
-				delete(exp, fmt.Sprintf("%d", j))
+		if version == NoVersion {
+			exp = make(map[string][]string)
+		} else {
+			intVersion, err := strconv.ParseInt(version, 10, 32)
+			if err != nil {
+				t.Errorf("invalid version: %s", version)
 			}
-			exp[version] = nil
+
+			if intVersion < 10 {
+				for j := int64(0); j < intVersion; j++ {
+					delete(exp, fmt.Sprintf("%d", j))
+				}
+				exp[version] = nil
+			}
 		}
 
 		pmap := getParentMap(nil, st, oid, nil)
@@ -903,6 +905,16 @@
 	if !reflect.DeepEqual(pmap, exp) {
 		t.Errorf("invalid object %s parent map: (%v) instead of (%v)", oid, pmap, exp)
 	}
+
+	// Invalid pruning without a batch set.
+	tx = st.NewTransaction()
+	err = prune(nil, tx, oid, version, nil, func(ctx *context.T, tx store.Transaction, lr string) error {
+		return nil
+	})
+	if err == nil {
+		t.Errorf("pruning object %s:%s without a batch set did not fail", oid, version)
+	}
+	tx.Abort()
 }
 
 // TestRemoteLinkedNoConflictSameHead tests sync of remote updates that contain
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
index 935539b..7c1a045 100644
--- a/services/syncbase/vsync/initiator_test.go
+++ b/services/syncbase/vsync/initiator_test.go
@@ -457,7 +457,7 @@
 	}
 
 	tx := svc.St().NewTransaction()
-	if err := addSyncGroup(nil, tx, sg1); err != nil {
+	if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg1); err != nil {
 		t.Fatalf("cannot add SyncGroup ID %d, err %v", sg1.Id, err)
 	}
 	if err := tx.Commit(); err != nil {
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 46d03a1..0c78f52 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -226,7 +226,11 @@
 	}
 
 	err := store.RunInTransaction(rSt.st, func(tx store.Transaction) error {
-		sg, err := getSyncGroupById(ctx, tx, gid)
+		version, err := getSyncGroupVersion(ctx, tx, gid)
+		if err != nil {
+			return err
+		}
+		sg, err := getSGDataEntry(ctx, tx, gid, version)
 		if err != nil {
 			return err
 		}
@@ -239,7 +243,7 @@
 
 		vlog.VI(4).Infof("sync: addInitiatorToSyncGroup: add %s to sgid %d", rSt.initiator, gid)
 		sg.Joiners[rSt.initiator] = wire.SyncGroupMemberInfo{SyncPriority: 1}
-		return setSGDataEntry(ctx, tx, gid, sg)
+		return setSGDataEntry(ctx, tx, gid, version, sg)
 	})
 
 	if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index f2e72ae..0b73aad 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -12,6 +12,7 @@
 // records in response to a GetDeltas request, it replays those log
 // records to get in sync with the sender.
 import (
+	"container/list"
 	"fmt"
 	"math/rand"
 	"path"
@@ -73,6 +74,11 @@
 	syncState     map[string]*dbSyncStateInMem
 	syncStateLock sync.Mutex // lock to protect access to the sync state.
 
+	// In-memory queue of SyncGroups to be published.  It is reconstructed
+	// at startup from SyncGroup info so it does not need to be persisted.
+	sgPublishQueue     *list.List
+	sgPublishQueueLock sync.Mutex
+
 	// In-memory tracking of batches during their construction.
 	// The sync Initiator and Watcher build batches incrementally here
 	// and then persist them in DAG batch entries.  The mutex guards
@@ -124,8 +130,9 @@
 // sync module responds to incoming RPCs from remote sync modules.
 func New(ctx *context.T, call rpc.ServerCall, sv interfaces.Service, rootDir string) (*syncService, error) {
 	s := &syncService{
-		sv:      sv,
-		batches: make(batchSet),
+		sv:             sv,
+		batches:        make(batchSet),
+		sgPublishQueue: list.New(),
 	}
 
 	data := &syncData{}
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index 1eebf88..d820d8f 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -53,6 +53,7 @@
 import (
 	"fmt"
 	"strconv"
+	"time"
 
 	"v.io/v23/context"
 	"v.io/v23/verror"
@@ -104,12 +105,23 @@
 	return out
 }
 
+// sgPublishInfo holds information on a SyncGroup waiting to be published to a
+// remote peer.  It is an in-memory entry in a queue of pending SyncGroups.
+type sgPublishInfo struct {
+	sgName  string
+	appName string
+	dbName  string
+	queued  time.Time
+	lastTry time.Time
+}
+
 // initSync initializes the sync module during startup. It scans all the
 // databases across all apps to initialize the following:
 // a) in-memory sync state of a Database and all its SyncGroups consisting of
 // the current generation number, log position and generation vector.
 // b) watcher map of prefixes currently being synced.
 // c) republish names in mount tables for all syncgroups.
+// d) in-memory queue of SyncGroups to be published.
 //
 // TODO(hpucha): This is incomplete. Flesh this out further.
 func (s *syncService) initSync(ctx *context.T) error {
@@ -133,6 +145,10 @@
 			for _, prefix := range sg.Spec.Prefixes {
 				incrWatchPrefix(appName, dbName, prefix)
 			}
+
+			if sg.Status == interfaces.SyncGroupStatusPublishPending {
+				s.enqueuePublishSyncGroup(sg.Name, appName, dbName, false)
+			}
 			return false
 		})
 
@@ -164,6 +180,23 @@
 	return errFinal
 }
 
+// enqueuePublishSyncGroup appends the given SyncGroup to the publish queue.
+func (s *syncService) enqueuePublishSyncGroup(sgName, appName, dbName string, attempted bool) {
+	s.sgPublishQueueLock.Lock()
+	defer s.sgPublishQueueLock.Unlock()
+
+	entry := &sgPublishInfo{
+		sgName:  sgName,
+		appName: appName,
+		dbName:  dbName,
+		queued:  time.Now(),
+	}
+	if attempted {
+		entry.lastTry = entry.queued
+	}
+	s.sgPublishQueue.PushBack(entry)
+}
+
 // Note: For all the utilities below, if the sgid parameter is non-nil, the
 // operation is performed in the SyncGroup space. If nil, it is performed in the
 // data space for the Database.
@@ -420,15 +453,7 @@
 
 // hasLogRec returns true if the log record for (devid, gen) exists.
 func hasLogRec(st store.StoreReader, pfx string, id, gen uint64) (bool, error) {
-	// TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
-	var rec localLogRec
-	if err := util.Get(nil, st, logRecKey(pfx, id, gen), &rec); err != nil {
-		if verror.ErrorID(err) == verror.ErrNoExist.ID {
-			err = nil
-		}
-		return false, err
-	}
-	return true, nil
+	return util.Exists(nil, st, logRecKey(pfx, id, gen))
 }
 
 // putLogRec stores the log record.
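
The publish retry loop itself is outside this section; a hypothetical
consumer of the queue could look like this (only sgPublishQueue,
sgPublishQueueLock, and sgPublishInfo are from this change):

    // nextPublishRetry is illustrative only: it pops the oldest pending
    // SyncGroup publish entry, or returns nil if the queue is empty.
    func (s *syncService) nextPublishRetry() *sgPublishInfo {
        s.sgPublishQueueLock.Lock()
        defer s.sgPublishQueueLock.Unlock()
        if e := s.sgPublishQueue.Front(); e != nil {
            s.sgPublishQueue.Remove(e)
            return e.Value.(*sgPublishInfo)
        }
        return nil
    }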
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 91ba499..2fb5479 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -81,6 +81,7 @@
 }
 
 // verifySyncGroup verifies if a SyncGroup struct is well-formed.
+// TODO(rdaoud): define verrors for all ErrBadArg cases.
 func verifySyncGroup(ctx *context.T, sg *interfaces.SyncGroup) error {
 	if sg == nil {
 		return verror.New(verror.ErrBadArg, ctx, "group information not specified")
@@ -106,40 +107,157 @@
 	if len(sg.Joiners) == 0 {
 		return verror.New(verror.ErrBadArg, ctx, "group has no joiners")
 	}
-	if len(sg.Spec.Prefixes) == 0 {
+	return verifySyncGroupSpec(ctx, &sg.Spec)
+}
+
+// verifySyncGroupSpec verifies if a SyncGroupSpec is well-formed.
+func verifySyncGroupSpec(ctx *context.T, spec *wire.SyncGroupSpec) error {
+	if spec == nil {
+		return verror.New(verror.ErrBadArg, ctx, "group spec not specified")
+	}
+	if len(spec.Prefixes) == 0 {
 		return verror.New(verror.ErrBadArg, ctx, "group has no prefixes specified")
 	}
+
+	// Duplicate prefixes are not allowed.
+	prefixes := make(map[string]bool, len(spec.Prefixes))
+	for _, pfx := range spec.Prefixes {
+		prefixes[pfx] = true
+	}
+	if len(prefixes) != len(spec.Prefixes) {
+		return verror.New(verror.ErrBadArg, ctx, "group has duplicate prefixes specified")
+	}
 	return nil
 }
 
-// addSyncGroup adds a new SyncGroup given its information.
-func addSyncGroup(ctx *context.T, tx store.Transaction, sg *interfaces.SyncGroup) error {
-	// Verify SyncGroup before storing it since it may have been received
-	// from a remote peer.
+// samePrefixes returns true if the two sets of prefixes are the same.
+func samePrefixes(pfx1, pfx2 []string) bool {
+	pfxMap := make(map[string]uint8)
+	for _, p := range pfx1 {
+		pfxMap[p] |= 0x01
+	}
+	for _, p := range pfx2 {
+		pfxMap[p] |= 0x02
+	}
+	for _, mask := range pfxMap {
+		if mask != 0x03 {
+			return false
+		}
+	}
+	return true
+}
+
+// addSyncGroup adds a new SyncGroup given its version and information.  This
+// also includes creating a DAG node entry and updating the DAG head.  If the
+// caller is the creator of the SyncGroup, a local log record is also created
+// using the given server ID and gen and pos counters to index the log record.
+// Otherwise, it's a joiner case and the SyncGroup is put in a pending state
+// (waiting for its full metadata to be synchronized) and the log record is
+// skipped, delaying its creation until the Initiator does p2p sync.
+func (s *syncService) addSyncGroup(ctx *context.T, tx store.Transaction, version string, creator bool, remotePublisher string, genvec interfaces.PrefixGenVector, servId, gen, pos uint64, sg *interfaces.SyncGroup) error {
+	// Verify the SyncGroup information before storing it since it may have
+	// been received from a remote peer.
 	if err := verifySyncGroup(ctx, sg); err != nil {
 		return err
 	}
 
-	if ok, err := hasSGDataEntry(tx, sg.Id); err != nil {
-		return err
-	} else if ok {
-		return verror.New(verror.ErrExist, ctx, "group id already exists")
-	}
+	// Add the group name and ID entries.
 	if ok, err := hasSGNameEntry(tx, sg.Name); err != nil {
 		return err
 	} else if ok {
 		return verror.New(verror.ErrExist, ctx, "group name already exists")
 	}
+	if ok, err := hasSGIdEntry(tx, sg.Id); err != nil {
+		return err
+	} else if ok {
+		return verror.New(verror.ErrExist, ctx, "group id already exists")
+	}
 
-	// Add the group name and data entries.
+	state := sgLocalState{
+		RemotePublisher: remotePublisher,
+		SyncPending:     !creator,
+		PendingGenVec:   genvec,
+	}
+	if remotePublisher == "" {
+		state.NumLocalJoiners = 1
+	}
+
 	if err := setSGNameEntry(ctx, tx, sg.Name, sg.Id); err != nil {
 		return err
 	}
-	if err := setSGDataEntry(ctx, tx, sg.Id, sg); err != nil {
+	if err := setSGIdEntry(ctx, tx, sg.Id, &state); err != nil {
 		return err
 	}
 
-	return nil
+	// Add the SyncGroup versioned data entry.
+	if ok, err := hasSGDataEntry(tx, sg.Id, version); err != nil {
+		return err
+	} else if ok {
+		return verror.New(verror.ErrExist, ctx, "group id version already exists")
+	}
+
+	return s.updateSyncGroupVersioning(ctx, tx, version, creator, servId, gen, pos, sg)
+}
+
+// updateSyncGroupVersioning updates the per-version information of a SyncGroup.
+// It writes a new versioned copy of the SyncGroup data entry, a new DAG node,
+// and updates the DAG head.  Optionally, it also writes a new local log record
+// using the given server ID and gen and pos counters to index it.  The caller
+// can provide the version number to use; otherwise, if NoVersion is given, a
+// new version is generated by the function.
+// TODO(rdaoud): hook SyncGroup mutations (and deletions) to the watch log so
+// apps can monitor SG changes as well.
+func (s *syncService) updateSyncGroupVersioning(ctx *context.T, tx store.Transaction, version string, withLog bool, servId, gen, pos uint64, sg *interfaces.SyncGroup) error {
+	if version == NoVersion {
+		version = newSyncGroupVersion()
+	}
+
+	// Add the SyncGroup versioned data entry.
+	if err := setSGDataEntry(ctx, tx, sg.Id, version, sg); err != nil {
+		return err
+	}
+
+	// Add a sync log record for the SyncGroup if needed.
+	oid := sgIdKey(sg.Id)
+	logKey := ""
+	if withLog {
+		if err := addSyncGroupLogRec(ctx, tx, sg.Id, version, servId, gen, pos); err != nil {
+			return err
+		}
+		logKey = logRecKey(oid, servId, gen)
+	}
+
+	// Add the SyncGroup to the DAG.
+	var parents []string
+	if head, err := getHead(ctx, tx, oid); err == nil {
+		parents = []string{head}
+	} else if verror.ErrorID(err) != verror.ErrNoExist.ID {
+		return err
+	}
+	if err := s.addNode(ctx, tx, oid, version, logKey, false, parents, NoBatchId, nil); err != nil {
+		return err
+	}
+	return setHead(ctx, tx, oid, version)
+}
+
+// addSyncGroupLogRec adds a new local log record for a SyncGroup.
+func addSyncGroupLogRec(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, servId, gen, pos uint64) error {
+	oid := sgIdKey(gid)
+	rec := &localLogRec{
+		Metadata: interfaces.LogRecMetadata{
+			ObjId:   oid,
+			CurVers: version,
+			Delete:  false,
+			UpdTime: watchable.GetStoreTime(ctx, tx),
+			Id:      servId,
+			Gen:     gen,
+			RecType: interfaces.NodeRec,
+			BatchId: NoBatchId,
+		},
+		Pos: pos,
+	}
+
+	return putLogRec(ctx, tx, oid, rec)
 }
 
 // getSyncGroupId retrieves the SyncGroup ID given its name.
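
For clarity, samePrefixes above compares the two slices as sets via a
per-prefix bitmask (0x01 = present in pfx1, 0x02 = present in pfx2), so
a prefix in both sets ends with mask 0x03; for example:

    samePrefixes([]string{"a", "b"}, []string{"b", "a"}) // true: order-insensitive
    samePrefixes([]string{"a", "b"}, []string{"a"})      // false: "b" has mask 0x01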
@@ -147,18 +265,18 @@
 	return getSGNameEntry(ctx, st, name)
 }
 
-// getSyncGroupName retrieves the SyncGroup name given its ID.
-func getSyncGroupName(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
-	sg, err := getSyncGroupById(ctx, st, gid)
-	if err != nil {
-		return "", err
-	}
-	return sg.Name, nil
+// getSyncGroupVersion retrieves the current version of the SyncGroup.
+func getSyncGroupVersion(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
+	return getHead(ctx, st, sgIdKey(gid))
 }
 
 // getSyncGroupById retrieves the SyncGroup given its ID.
 func getSyncGroupById(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
-	return getSGDataEntry(ctx, st, gid)
+	version, err := getSyncGroupVersion(ctx, st, gid)
+	if err != nil {
+		return nil, err
+	}
+	return getSGDataEntry(ctx, st, gid, version)
 }
 
 // getSyncGroupByName retrieves the SyncGroup given its name.
@@ -176,19 +294,53 @@
 	if err != nil {
 		return err
 	}
-	if err = delSGNameEntry(ctx, tx, sg.Name); err != nil {
-		return err
-	}
-	return delSGDataEntry(ctx, tx, sg.Id)
+	return delSyncGroupByName(ctx, tx, sg.Name)
 }
 
 // delSyncGroupByName deletes the SyncGroup given its name.
 func delSyncGroupByName(ctx *context.T, tx store.Transaction, name string) error {
+	// Get the SyncGroup ID and current version.
 	gid, err := getSyncGroupId(ctx, tx, name)
 	if err != nil {
 		return err
 	}
-	return delSyncGroupById(ctx, tx, gid)
+	version, err := getSyncGroupVersion(ctx, tx, gid)
+	if err != nil {
+		return err
+	}
+
+	// Delete the name and ID entries.
+	if err := delSGNameEntry(ctx, tx, name); err != nil {
+		return err
+	}
+	if err := delSGIdEntry(ctx, tx, gid); err != nil {
+		return err
+	}
+
+	// Delete all versioned SyncGroup data entries (same versions as DAG
+	// nodes).  This is done separately from pruning the DAG nodes because
+	// some nodes may have no log record pointing back to the SyncGroup data
+	// entries (loose coupling to support the pending SyncGroup state).
+	oid := sgIdKey(gid)
+	err = forEachAncestor(ctx, tx, oid, []string{version}, func(v string, nd *dagNode) error {
+		return delSGDataEntry(ctx, tx, gid, v)
+	})
+	if err != nil {
+		return err
+	}
+
+	// Delete all DAG nodes and log records.
+	bset := newBatchPruning()
+	err = prune(ctx, tx, oid, NoVersion, bset, func(ctx *context.T, tx store.Transaction, lr string) error {
+		if lr != "" {
+			return util.Delete(ctx, tx, lr)
+		}
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+	return pruneDone(ctx, tx, bset)
 }
 
 // refreshMembersIfExpired updates the aggregate view of SyncGroup members
@@ -252,16 +404,23 @@
 // make forEachSyncGroup() stop the iteration earlier; otherwise the function
 // loops across all SyncGroups in the Database.
 func forEachSyncGroup(st store.StoreReader, callback func(*interfaces.SyncGroup) bool) {
-	scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix, "")
-	stream := st.Scan(scanStart, scanLimit)
+	stream := st.Scan(util.ScanPrefixArgs(sgNameKeyPrefix, ""))
+	defer stream.Cancel()
+
 	for stream.Advance() {
-		var sg interfaces.SyncGroup
-		if vom.Decode(stream.Value(nil), &sg) != nil {
-			vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup value for key %s", string(stream.Key(nil)))
+		var gid interfaces.GroupId
+		if vom.Decode(stream.Value(nil), &gid) != nil {
+			vlog.Errorf("sync: forEachSyncGroup: invalid SyncGroup ID for key %s", string(stream.Key(nil)))
 			continue
 		}
 
-		if callback(&sg) {
+		sg, err := getSyncGroupById(nil, st, gid)
+		if err != nil {
+			vlog.Errorf("sync: forEachSyncGroup: cannot get SyncGroup %d: %v", gid, err)
+			continue
+		}
+
+		if callback(sg) {
 			break // done, early exit
 		}
 	}
@@ -324,63 +483,69 @@
 // Use the functions above to manipulate SyncGroups.
 
 var (
-	// sgDataKeyScanPrefix is the prefix used to scan SyncGroup data entries.
-	sgDataKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
-
-	// sgNameKeyScanPrefix is the prefix used to scan SyncGroup name entries.
-	sgNameKeyScanPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
+	// Prefixes used to store the different mappings of a SyncGroup:
+	// sgNameKeyPrefix: name --> ID
+	// sgIdKeyPrefix: ID --> SyncGroup local state
+	// sgDataKeyPrefix: (ID, version) --> SyncGroup data (synchronized)
+	//
+	// Note: as with other syncable objects, the DAG "heads" table contains
+	// a reference to the current SyncGroup version, and the DAG "nodes"
+	// table tracks its history of mutations.
+	// TODO(rdaoud): change the data key prefix to use the SG OID instead
+	// of its ID, to be similar to the versioned user data keys.  The OID
+	// would use another SG-data prefix: "$sync:sgd:<gid>" and the data
+	// entry: "$sync:sgd:<gid>:<version>" (i.e. <oid>:<version>).
+	sgNameKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n")
+	sgIdKeyPrefix   = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "i")
+	sgDataKeyPrefix = util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d")
 )
 
-// sgDataKey returns the key used to access the SyncGroup data entry.
-func sgDataKey(gid interfaces.GroupId) string {
-	return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "d", fmt.Sprintf("%d", gid))
+// sgIdStr returns the SyncGroup ID in string format.
+// TODO(rdaoud): delete when the SG ID becomes a string throughout.
+func sgIdStr(gid interfaces.GroupId) string {
+	return fmt.Sprintf("%d", uint64(gid))
 }
 
 // sgNameKey returns the key used to access the SyncGroup name entry.
 func sgNameKey(name string) string {
-	return util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", name)
+	return util.JoinKeyParts(sgNameKeyPrefix, name)
+}
+
+// sgIdKey returns the key used to access the SyncGroup ID entry.
+func sgIdKey(gid interfaces.GroupId) string {
+	return util.JoinKeyParts(sgIdKeyPrefix, fmt.Sprintf("%d", gid))
+}
+
+// sgDataKey returns the key used to access a version of the SyncGroup data.
+func sgDataKey(gid interfaces.GroupId, version string) string {
+	return util.JoinKeyParts(sgDataKeyPrefix, fmt.Sprintf("%d", gid), version)
 }
 
 // splitSgNameKey is the inverse of sgNameKey and returns the SyncGroup name.
 func splitSgNameKey(ctx *context.T, key string) (string, error) {
-	prefix := util.JoinKeyParts(util.SyncPrefix, sgPrefix, "n", "")
-
 	// Note that the actual SyncGroup name may contain ":" as a separator.
-	if !strings.HasPrefix(key, prefix) {
+	// So don't split the key on the separator; instead, trim its prefix.
+	prefix := util.JoinKeyParts(sgNameKeyPrefix, "")
+	name := strings.TrimPrefix(key, prefix)
+	if name == key {
 		return "", verror.New(verror.ErrInternal, ctx, "invalid sgNamekey", key)
 	}
-	return strings.TrimPrefix(key, prefix), nil
-}
-
-// hasSGDataEntry returns true if the SyncGroup data entry exists.
-func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
-	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
-	var sg interfaces.SyncGroup
-	if err := util.Get(nil, sntx, sgDataKey(gid), &sg); err != nil {
-		if verror.ErrorID(err) == verror.ErrNoExist.ID {
-			err = nil
-		}
-		return false, err
-	}
-	return true, nil
+	return name, nil
 }
 
 // hasSGNameEntry returns true if the SyncGroup name entry exists.
 func hasSGNameEntry(sntx store.SnapshotOrTransaction, name string) (bool, error) {
-	// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
-	var gid interfaces.GroupId
-	if err := util.Get(nil, sntx, sgNameKey(name), &gid); err != nil {
-		if verror.ErrorID(err) == verror.ErrNoExist.ID {
-			err = nil
-		}
-		return false, err
-	}
-	return true, nil
+	return util.Exists(nil, sntx, sgNameKey(name))
 }
 
-// setSGDataEntry stores the SyncGroup data entry.
-func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, sg *interfaces.SyncGroup) error {
-	return util.Put(ctx, tx, sgDataKey(gid), sg)
+// hasSGIdEntry returns true if the SyncGroup ID entry exists.
+func hasSGIdEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
+	return util.Exists(nil, sntx, sgIdKey(gid))
+}
+
+// hasSGDataEntry returns true if the SyncGroup versioned data entry exists.
+func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId, version string) (bool, error) {
+	return util.Exists(nil, sntx, sgDataKey(gid, version))
 }
 
 // setSGNameEntry stores the SyncGroup name entry.
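
For orientation, assuming util.SyncPrefix is "$sync", sgPrefix is "sg",
and ":" joins key parts (consistent with the "$sync:sgd:<gid>" form in
the TODO above), the three mappings would be laid out as:

    $sync:sg:n:<sgName>              --> GroupId        (name entry)
    $sync:sg:i:<groupId>             --> sgLocalState   (local-only state)
    $sync:sg:d:<groupId>:<version>   --> SyncGroup data (synchronized)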
@@ -388,34 +553,58 @@
 	return util.Put(ctx, tx, sgNameKey(name), gid)
 }
 
-// getSGDataEntry retrieves the SyncGroup data for a given group ID.
-func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
+// setSGIdEntry stores the SyncGroup ID entry.
+func setSGIdEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, state *sgLocalState) error {
+	return util.Put(ctx, tx, sgIdKey(gid), state)
+}
+
+// setSGDataEntry stores the SyncGroup versioned data entry.
+func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string, sg *interfaces.SyncGroup) error {
+	return util.Put(ctx, tx, sgDataKey(gid, version), sg)
+}
+
+// getSGNameEntry retrieves the SyncGroup ID for a given name.
+func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
+	var gid interfaces.GroupId
+	if err := util.Get(ctx, st, sgNameKey(name), &gid); err != nil {
+		return interfaces.NoGroupId, err
+	}
+	return gid, nil
+}
+
+// getSGIdEntry retrieves the SyncGroup local state for a given group ID.
+func getSGIdEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*sgLocalState, error) {
+	var state sgLocalState
+	if err := util.Get(ctx, st, sgIdKey(gid), &state); err != nil {
+		return nil, err
+	}
+	return &state, nil
+}
+
+// getSGDataEntry retrieves the SyncGroup data for a given group ID and version.
+func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId, version string) (*interfaces.SyncGroup, error) {
 	var sg interfaces.SyncGroup
-	if err := util.Get(ctx, st, sgDataKey(gid), &sg); err != nil {
+	if err := util.Get(ctx, st, sgDataKey(gid, version), &sg); err != nil {
 		return nil, err
 	}
 	return &sg, nil
 }
 
-// getSGNameEntry retrieves the SyncGroup name to ID mapping.
-func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
-	var gid interfaces.GroupId
-	if err := util.Get(ctx, st, sgNameKey(name), &gid); err != nil {
-		return gid, err
-	}
-	return gid, nil
-}
-
-// delSGDataEntry deletes the SyncGroup data entry.
-func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
-	return util.Delete(ctx, tx, sgDataKey(gid))
-}
-
-// delSGNameEntry deletes the SyncGroup name to ID mapping.
+// delSGNameEntry deletes the SyncGroup name entry.
 func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
 	return util.Delete(ctx, tx, sgNameKey(name))
 }
 
+// delSGIdEntry deletes the SyncGroup ID entry.
+func delSGIdEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
+	return util.Delete(ctx, tx, sgIdKey(gid))
+}
+
+// delSGDataEntry deletes the SyncGroup versioned data entry.
+func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, version string) error {
+	return util.Delete(ctx, tx, sgDataKey(gid, version))
+}
+
 ////////////////////////////////////////////////////////////
 // SyncGroup methods between Client and Syncbase.
 
@@ -424,9 +613,22 @@
 	vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName)
 	defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName)
 
-	// Get this Syncbase's sync module handle.
 	ss := sd.sync.(*syncService)
-	var sg *interfaces.SyncGroup
+	appName, dbName := sd.db.App().Name(), sd.db.Name()
+
+	// Instantiate sg. Add self as joiner.
+	gid, version := newSyncGroupId(), newSyncGroupVersion()
+	sg := &interfaces.SyncGroup{
+		Id:          gid,
+		Name:        sgName,
+		SpecVersion: version,
+		Spec:        spec,
+		Creator:     ss.name,
+		AppName:     appName,
+		DbName:      dbName,
+		Status:      interfaces.SyncGroupStatusPublishPending,
+		Joiners:     map[string]wire.SyncGroupMemberInfo{ss.name: myInfo},
+	}
 
 	err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Check permissions on Database.
@@ -436,29 +638,17 @@
 
 		// TODO(hpucha): Check prefix ACLs on all SG prefixes.
 		// This may need another method on util.Database interface.
-
 		// TODO(hpucha): Do some SG ACL checking. Check creator
 		// has Admin privilege.
 
-		// Instantiate sg. Add self as joiner.
-		sg = &interfaces.SyncGroup{
-			Id:          newSyncGroupId(),
-			Name:        sgName,
-			SpecVersion: newSyncGroupVersion(),
-			Spec:        spec,
-			Creator:     ss.name,
-			AppName:     sd.db.App().Name(),
-			DbName:      sd.db.Name(),
-			Status:      interfaces.SyncGroupStatusPublishPending,
-			Joiners:     map[string]wire.SyncGroupMemberInfo{ss.name: myInfo},
-		}
+		// Reserve a log generation and position counts for the new SyncGroup.
+		//gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
+		gen, pos := uint64(1), uint64(1)
 
-		if err := addSyncGroup(ctx, tx, sg); err != nil {
+		if err := ss.addSyncGroup(ctx, tx, version, true, "", nil, ss.id, gen, pos, sg); err != nil {
 			return err
 		}
 
-		// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
-
 		// Take a snapshot of the data to bootstrap the SyncGroup.
 		return sd.bootstrapSyncGroup(ctx, tx, spec.Prefixes)
 	})
@@ -467,9 +657,13 @@
 		return err
 	}
 
-	ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
-	// Local SG create succeeded. Publish the SG at the chosen server.
-	sd.publishSyncGroup(ctx, call, sgName)
+	ss.initSyncStateInMem(ctx, appName, dbName, gid)
+
+	// Local SG create succeeded. Publish the SG at the chosen server, or if
+	// that fails, enqueue it for later publish retries.
+	if err := sd.publishSyncGroup(ctx, call, sgName); err != nil {
+		ss.enqueuePublishSyncGroup(sgName, appName, dbName, true)
+	}
 
 	// Publish at the chosen mount table and in the neighborhood.
 	sd.publishInMountTables(ctx, call, spec)
@@ -492,24 +686,54 @@
 			return err
 		}
 
-		// Check if SyncGroup already exists.
-		sg, sgErr = getSyncGroupByName(ctx, tx, sgName)
+		// Check if SyncGroup already exists and get its info.
+		var gid interfaces.GroupId
+		gid, sgErr = getSyncGroupId(ctx, tx, sgName)
 		if sgErr != nil {
 			return sgErr
 		}
 
-		// SyncGroup already exists. Possibilities include created
-		// locally, already joined locally or published at the device as
-		// a result of SyncGroup creation on a different device.
-		//
-		// TODO(hpucha): Handle the above cases. If the SG was published
-		// locally, but not joined, we need to bootstrap the DAG and
-		// watcher. If multiple joins are done locally, we may want to
-		// ref count the SG state and track the leaves accordingly. So
-		// we may need to add some local state for each SyncGroup.
+		sg, sgErr = getSyncGroupById(ctx, tx, gid)
+		if sgErr != nil {
+			return sgErr
+		}
 
 		// Check SG ACL.
-		return authorize(ctx, call.Security(), sg)
+		if err := authorize(ctx, call.Security(), sg); err != nil {
+			return err
+		}
+
+		// The SyncGroup already exists; increment the number of local
+		// joiners in its local state information.  This covers three
+		// scenarios:
+		// 1- An additional local joiner: the current number of local
+		//    joiners is > 0 and the SyncGroup was already bootstrapped
+		//    to the Watcher, so there is nothing else to do.
+		// 2- A new local joiner after all previous local joiners had
+		//    left: the number of local joiners is 0 and the Watcher
+		//    must be re-notified via a SyncGroup bootstrap, because
+		//    the last joiner to leave told the Watcher to stop
+		//    watching.  In this scenario the SyncGroup was not
+		//    destroyed after the last joiner left because it was also
+		//    published here by a remote peer and thus cannot be
+		//    destroyed based only on the local joiners.
+		// 3- A first local joiner for a SyncGroup that was published
+		//    here from a remote Syncbase: the number of local joiners
+		//    is also 0 (and the remote publish flag is set), and the
+		//    Watcher must be notified via a SyncGroup bootstrap.
+		// Conclusion: bootstrap if the number of local joiners is 0
+		// (restated as a predicate in the sketch after this hunk).
+		sgState, err := getSGIdEntry(ctx, tx, gid)
+		if err != nil {
+			return err
+		}
+
+		if sgState.NumLocalJoiners == 0 {
+			if err := sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes); err != nil {
+				return err
+			}
+		}
+		sgState.NumLocalJoiners++
+		return setSGIdEntry(ctx, tx, gid, sgState)
 	})
 
 	// The presented blessing is allowed to make this Syncbase instance join
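The three scenarios in the comment above reduce to a single predicate. A compact restatement, sketch only (sgLocalState is the local-state type added in types.vdl later in this change):

// needWatcherBootstrap restates the joiner-count rule: the Watcher must be
// (re)notified via a SyncGroup bootstrap exactly when no local joiners are
// currently present, whether the SyncGroup survived thanks to a remote
// publisher (scenario 2) or was only published here and never joined
// locally (scenario 3).
func needWatcherBootstrap(state sgLocalState) bool {
	return state.NumLocalJoiners == 0
}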
@@ -532,48 +756,41 @@
 	ss := sd.sync.(*syncService)
 
 	// Contact a SyncGroup Admin to join the SyncGroup.
-	sg = &interfaces.SyncGroup{}
-	*sg, err = sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo)
+	sg2, version, genvec, err := sd.joinSyncGroupAtAdmin(ctx, call, sgName, ss.name, myInfo)
 	if err != nil {
 		return nullSpec, err
 	}
 
 	// Verify that the app/db combination is valid for this SyncGroup.
-	if sg.AppName != sd.db.App().Name() || sg.DbName != sd.db.Name() {
+	appName, dbName := sd.db.App().Name(), sd.db.Name()
+	if sg2.AppName != appName || sg2.DbName != dbName {
 		return nullSpec, verror.New(verror.ErrBadArg, ctx, "bad app/db with syncgroup")
 	}
 
 	err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
-
-		// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
-
-		// TODO(hpucha): Get SG Deltas from Admin device.
-
-		if err := addSyncGroup(ctx, tx, sg); err != nil {
+		if err := ss.addSyncGroup(ctx, tx, version, false, "", genvec, 0, 0, 0, &sg2); err != nil {
 			return err
 		}
 
 		// Take a snapshot of the data to bootstrap the SyncGroup.
-		return sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes)
+		return sd.bootstrapSyncGroup(ctx, tx, sg2.Spec.Prefixes)
 	})
 
 	if err != nil {
 		return nullSpec, err
 	}
 
-	ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
+	ss.initSyncStateInMem(ctx, sg2.AppName, sg2.DbName, sg2.Id)
 
 	// Publish at the chosen mount table and in the neighborhood.
-	sd.publishInMountTables(ctx, call, sg.Spec)
+	sd.publishInMountTables(ctx, call, sg2.Spec)
 
-	return sg.Spec, nil
+	return sg2.Spec, nil
 }
 
 func (sd *syncDatabase) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
-	var sgNames []string
-
 	vlog.VI(2).Infof("sync: GetSyncGroupNames: begin")
-	defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end: %v", sgNames)
+	defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end")
 
 	sn := sd.db.St().NewSnapshot()
 	defer sn.Abort()
@@ -584,8 +801,8 @@
 	}
 
 	// Scan all the SyncGroup names found in the Database.
-	scanStart, scanLimit := util.ScanPrefixArgs(sgNameKeyScanPrefix, "")
-	stream := sn.Scan(scanStart, scanLimit)
+	stream := sn.Scan(util.ScanPrefixArgs(sgNameKeyPrefix, ""))
+	var sgNames []string
 	var key []byte
 	for stream.Advance() {
 		sgName, err := splitSgNameKey(ctx, string(stream.Key(key)))
@@ -599,18 +816,19 @@
 		return nil, err
 	}
 
+	vlog.VI(2).Infof("sync: GetSyncGroupNames: %v", sgNames)
 	return sgNames, nil
 }
 
 func (sd *syncDatabase) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
-	var spec wire.SyncGroupSpec
-
 	vlog.VI(2).Infof("sync: GetSyncGroupSpec: begin %s", sgName)
-	defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s spec %v", sgName, spec)
+	defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s", sgName)
 
 	sn := sd.db.St().NewSnapshot()
 	defer sn.Abort()
 
+	var spec wire.SyncGroupSpec
+
 	// Check permissions on Database.
 	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
 		return spec, "", err
@@ -623,41 +841,45 @@
 	}
 	// TODO(hpucha): Check SyncGroup ACL.
 
-	spec = sg.Spec
-	return spec, sg.SpecVersion, nil
+	vlog.VI(2).Infof("sync: GetSyncGroupSpec: %s spec %v", sgName, sg.Spec)
+	return sg.Spec, sg.SpecVersion, nil
 }
 
 func (sd *syncDatabase) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
-	var members map[string]wire.SyncGroupMemberInfo
-
 	vlog.VI(2).Infof("sync: GetSyncGroupMembers: begin %s", sgName)
-	defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s members %v", sgName, members)
+	defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s", sgName)
 
 	sn := sd.db.St().NewSnapshot()
 	defer sn.Abort()
 
 	// Check permissions on Database.
 	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
-		return members, err
+		return nil, err
 	}
 
 	// Get the SyncGroup information.
 	sg, err := getSyncGroupByName(ctx, sn, sgName)
 	if err != nil {
-		return members, err
+		return nil, err
 	}
 
 	// TODO(hpucha): Check SyncGroup ACL.
 
-	members = sg.Joiners
-	return members, nil
+	vlog.VI(2).Infof("sync: GetSyncGroupMembers: %s members %v", sgName, sg.Joiners)
+	return sg.Joiners, nil
 }
 
-// TODO(hpucha): Enable syncing syncgroup metadata.
 func (sd *syncDatabase) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
 	vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version)
 	defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName)
 
+	if err := verifySyncGroupSpec(ctx, &spec); err != nil {
+		return err
+	}
+
+	ss := sd.sync.(*syncService)
+	//appName, dbName := sd.db.App().Name(), sd.db.Name()
+
 	err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Check permissions on Database.
 		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
@@ -669,10 +891,33 @@
 			return err
 		}
 
-		// TODO(hpucha): Check SyncGroup ACL. Perform version checking.
+		if version != NoVersion && sg.SpecVersion != version {
+			return verror.NewErrBadVersion(ctx)
+		}
 
+		// Must not change the SyncGroup prefixes.
+		if !samePrefixes(spec.Prefixes, sg.Spec.Prefixes) {
+			return verror.New(verror.ErrBadArg, ctx, "cannot modify prefixes")
+		}
+
+		sgState, err := getSGIdEntry(ctx, tx, sg.Id)
+		if err != nil {
+			return err
+		}
+		if sgState.SyncPending {
+			return verror.NewErrBadState(ctx)
+		}
+
+		// Reserve a log generation and a position count for the new SyncGroup version.
+		//gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(sg.Id), 1)
+		gen, pos := uint64(1), uint64(1)
+
+		// TODO(hpucha): Check SyncGroup ACL.
+
+		newVersion := newSyncGroupVersion()
 		sg.Spec = spec
-		return setSGDataEntry(ctx, tx, sg.Id, sg)
+		sg.SpecVersion = newVersion
+		return ss.updateSyncGroupVersioning(ctx, tx, newVersion, true, ss.id, gen, pos, sg)
 	})
 	return err
 }
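The NoVersion check above gives SetSyncGroupSpec compare-and-swap semantics against GetSyncGroupSpec. A caller-side sketch; updateDescription is a hypothetical helper, not part of this change:

// Sketch only: read-modify-write of a SyncGroup spec guarded by its version.
func updateDescription(ctx *context.T, call rpc.ServerCall, sd *syncDatabase, sgName, desc string) error {
	spec, version, err := sd.GetSyncGroupSpec(ctx, call, sgName)
	if err != nil {
		return err
	}
	spec.Description = desc
	// Passing back the version read above makes the write fail with
	// ErrBadVersion if a concurrent update slipped in, instead of
	// silently clobbering it.
	return sd.SetSyncGroupSpec(ctx, call, sgName, spec, version)
}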
@@ -680,9 +925,27 @@
 //////////////////////////////
 // Helper functions
 
-// TODO(hpucha): Call this periodically until we are able to contact the remote peer.
+// publishSyncGroup publishes the SyncGroup at the remote peer and updates its
+// status.  If the publish operation either succeeds or is rejected by the
+// peer, the status is updated to "running" or "rejected" respectively, and
+// the function returns nil to indicate to the caller that no further
+// attempts are needed.  Otherwise an error (typically an RPC error, but
+// possibly a store error) is returned so the caller can retry later.
+// TODO(rdaoud): make all SG admins try to publish after they join.
 func (sd *syncDatabase) publishSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
-	sg, err := getSyncGroupByName(ctx, sd.db.St(), sgName)
+	st := sd.db.St()
+	ss := sd.sync.(*syncService)
+	//appName, dbName := sd.db.App().Name(), sd.db.Name()
+
+	gid, err := getSyncGroupId(ctx, st, sgName)
+	if err != nil {
+		return err
+	}
+	version, err := getSyncGroupVersion(ctx, st, gid)
+	if err != nil {
+		return err
+	}
+	sg, err := getSGDataEntry(ctx, st, gid, version)
 	if err != nil {
 		return err
 	}
@@ -691,34 +954,74 @@
 		return nil
 	}
 
+	// Note: the remote peer is given the SyncGroup version and genvec as
+	// they were before the post-publish update, which modifies the status
+	// and joiner list of the SyncGroup.  This is functionally correct,
+	// just not symmetrical with the join case, where the joiner receives
+	// the SyncGroup state post-join.
+	// TODO(rdaoud): send the SyncGroup genvec to the remote peer.
+	status := interfaces.SyncGroupStatusPublishRejected
+
 	c := interfaces.SyncClient(sgName)
-	err = c.PublishSyncGroup(ctx, *sg)
+	peer, err := c.PublishSyncGroup(ctx, ss.name, *sg, version, nil)
 
-	// Publish failed temporarily. Retry later.
-	// TODO(hpucha): Is there an RPC error that we can check here?
-	if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
-		return err
-	}
-
-	// Publish succeeded.
 	if err == nil {
-		// TODO(hpucha): Get SG Deltas from publisher. Obtaining the
-		// new version from the publisher prevents SG conflicts.
-		return err
+		status = interfaces.SyncGroupStatusRunning
+	} else {
+		errId := verror.ErrorID(err)
+		if errId == interfaces.ErrDupSyncGroupPublish.ID {
+			// Duplicate publish: another admin already published
+			// the SyncGroup, nothing else needs to happen because
+			// that other admin would have updated the SyncGroup
+			// status and p2p SG sync will propagate the change.
+			// TODO(rdaoud): what if that other admin crashes and
+			// never updates the SyncGroup status (dies permanently
+			// or is ejected before the status update)?  Eventually
+			// some admin must decide to update the SG status anyway
+			// even if that causes extra SG mutations and conflicts.
+			vlog.VI(3).Infof("sync: publishSyncGroup: %s: duplicate publish", sgName)
+			return nil
+		}
+
+		if errId != verror.ErrExist.ID {
+			// The publish operation failed with an error other
+			// than ErrExist, so it must be retried later.
+			// TODO(hpucha): Is there an RPC error that we can check here?
+			vlog.VI(3).Infof("sync: publishSyncGroup: %s: failed, retry later: %v", sgName, err)
+			return err
+		}
 	}
 
-	// Publish rejected. Persist that to avoid retrying in the
-	// future and to remember the split universe scenario.
-	err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
+	// At this point the publish operation is complete: it either succeeded
+	// or failed with ErrExist.  Update the SyncGroup status and, if the
+	// publish was successful, add the remote peer to the SyncGroup.
+	vlog.VI(3).Infof("sync: publishSyncGroup: %s: peer %s: done: status %s: %v",
+		sgName, peer, status.String(), err)
+
+	err = store.RunInTransaction(st, func(tx store.Transaction) error {
 		// Ensure SG still exists.
-		sg, err := getSyncGroupByName(ctx, tx, sgName)
+		sg, err := getSyncGroupById(ctx, tx, gid)
 		if err != nil {
 			return err
 		}
 
-		sg.Status = interfaces.SyncGroupStatusPublishRejected
-		return setSGDataEntry(ctx, tx, sg.Id, sg)
+		// Reserve a log generation and a position count for the new
+		// SyncGroup version.
+		//gen, pos := ss.reserveGenAndPosInDbLog(ctx, appName, dbName, sgIdStr(gid), 1)
+		gen, pos := uint64(1), uint64(1)
+
+		sg.Status = status
+		if status == interfaces.SyncGroupStatusRunning {
+			// TODO(hpucha): Default priority?
+			sg.Joiners[peer] = wire.SyncGroupMemberInfo{}
+		}
+
+		return ss.updateSyncGroupVersioning(ctx, tx, NoVersion, true, ss.id, gen, pos, sg)
 	})
+	if err != nil {
+		vlog.Errorf("sync: publishSyncGroup: cannot update SyncGroup %s status to %s: %v",
+			sgName, status.String(), err)
+	}
 	return err
 }
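The nil-versus-error contract above is what drives the publish retry loop added elsewhere in this change. A minimal sketch of such a loop; pendingPublishes, syncDatabaseFor, dequeuePublish, and the retry period are assumptions:

import "time"

// Sketch only: retry pending publishes until publishSyncGroup reports a
// final outcome (success or rejection) by returning nil.
func (s *syncService) publishRetryLoop(ctx *context.T) {
	ticker := time.NewTicker(2 * time.Minute) // retry period is an assumption
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		for _, sgName := range s.pendingPublishes() { // hypothetical accessor
			sd := s.syncDatabaseFor(sgName) // hypothetical lookup
			// A nil ServerCall is fine here: publishSyncGroup above
			// does not use its call argument.
			if err := sd.publishSyncGroup(ctx, nil, sgName); err == nil {
				s.dequeuePublish(sgName) // hypothetical removal
			}
		}
	}
}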
 
@@ -801,7 +1104,7 @@
 	return nil
 }
 
-func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
+func (sd *syncDatabase) joinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, name string, myInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
 	c := interfaces.SyncClient(sgName)
 	return c.JoinSyncGroupAtAdmin(ctx, sgName, name, myInfo)
 
@@ -819,36 +1122,49 @@
 ////////////////////////////////////////////////////////////
 // Methods for SyncGroup create/join between Syncbases.
 
-func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg interfaces.SyncGroup) error {
+func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, publisher string, sg interfaces.SyncGroup, version string, genvec interfaces.PrefixGenVector) (string, error) {
 	st, err := s.getDbStore(ctx, call, sg.AppName, sg.DbName)
 	if err != nil {
-		return err
+		return s.name, err
 	}
 
 	err = store.RunInTransaction(st, func(tx store.Transaction) error {
-		localSG, err := getSyncGroupByName(ctx, tx, sg.Name)
-
+		gid, err := getSyncGroupId(ctx, tx, sg.Name)
 		if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
 			return err
 		}
 
-		// SG name already claimed.
-		if err == nil && localSG.Id != sg.Id {
-			return verror.New(verror.ErrExist, ctx, sg.Name)
-		}
-
-		// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG
-		// metadata if needed.
-		//
-		// TODO(hpucha): Catch up on SG versions so far.
-
-		// SG already published. Update if needed.
-		if err == nil && localSG.Id == sg.Id {
-			if localSG.Status == interfaces.SyncGroupStatusPublishPending {
-				localSG.Status = interfaces.SyncGroupStatusRunning
-				return setSGDataEntry(ctx, tx, localSG.Id, localSG)
+		if err == nil {
+			// SG name already claimed.  Note that in this case of
+			// split-brain (same SG name, different IDs), the SG
+			// whose ID is rejected here does not benefit from the
+			// de-duping optimization below: its admins end up
+			// making duplicate SG mutations to set the status,
+			// yielding more SG conflicts.  This is functionally
+			// correct, it just bypasses the de-dup for that SG.
+			if gid != sg.Id {
+				return verror.New(verror.ErrExist, ctx, sg.Name)
 			}
-			return nil
+
+			// SG exists locally, either created/joined locally or
+			// previously published here.  Make the operation
+			// idempotent for the same publisher; any other
+			// publisher is a duplicate.
+			state, err := getSGIdEntry(ctx, tx, gid)
+			if err != nil {
+				return err
+			}
+			if state.RemotePublisher == "" {
+				// Locally created/joined SyncGroup: update its
+				// state to include the publisher.
+				state.RemotePublisher = publisher
+				return setSGIdEntry(ctx, tx, gid, state)
+			}
+			if publisher == state.RemotePublisher {
+				// Same previous publisher: nothing to change,
+				// the old genvec and version info is valid.
+				return nil
+			}
+			return interfaces.NewErrDupSyncGroupPublish(ctx, sg.Name)
 		}
 
 		// Publish the SyncGroup.
@@ -856,23 +1172,21 @@
 		// TODO(hpucha): Use some ACL check to allow/deny publishing.
 		// TODO(hpucha): Ensure node is on Admin ACL.
 
-		// TODO(hpucha): Default priority?
-		sg.Joiners[s.name] = wire.SyncGroupMemberInfo{}
-		sg.Status = interfaces.SyncGroupStatusRunning
-		return addSyncGroup(ctx, tx, &sg)
+		return s.addSyncGroup(ctx, tx, version, false, publisher, genvec, 0, 0, 0, &sg)
 	})
 
-	if err != nil {
-		return err
+	if err == nil {
+		s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
 	}
-	s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
-	return nil
+	return s.name, err
 }
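The publishee-side branches above yield five possible outcomes. They can be summarized as a decision table; this helper is a sketch for illustration and does not exist in the change:

// publishOutcome restates the PublishSyncGroup decision tree, sketch only.
func publishOutcome(haveSG, sameId bool, prevPublisher, publisher string) string {
	switch {
	case !haveSG:
		return "accept: store the SyncGroup locally in pending state"
	case !sameId:
		return "reject: ErrExist, the name is claimed by another SyncGroup"
	case prevPublisher == "":
		return "accept: locally created/joined SG, record the publisher"
	case prevPublisher == publisher:
		return "accept: idempotent retry from the same publisher"
	default:
		return "reject: ErrDupSyncGroupPublish, another admin published first"
	}
}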
 
-func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
+func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, string, interfaces.PrefixGenVector, error) {
 	var dbSt store.Store
 	var gid interfaces.GroupId
 	var err error
+	var stAppName, stDbName string
+	nullSG, nullGV := interfaces.SyncGroup{}, interfaces.PrefixGenVector{}
 
 	// Find the database store for this SyncGroup.
 	//
@@ -885,6 +1199,7 @@
 		if gid, err = getSyncGroupId(ctx, st, sgName); err == nil {
 			// Found the SyncGroup being looked for.
 			dbSt = st
+			stAppName, stDbName = appName, dbName
 			return true
 		}
 		return false
@@ -892,10 +1207,12 @@
 
 	// SyncGroup not found.
 	if err != nil {
-		return interfaces.SyncGroup{}, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName)
+		return nullSG, "", nullGV, verror.New(verror.ErrNoExist, ctx, "SyncGroup not found", sgName)
 	}
 
+	version := newSyncGroupVersion()
 	var sg *interfaces.SyncGroup
+
 	err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
 		var err error
 		sg, err = getSyncGroupById(ctx, tx, gid)
@@ -908,13 +1225,27 @@
 			return err
 		}
 
+		// Check that the SG is not in pending state.
+		state, err := getSGIdEntry(ctx, tx, gid)
+		if err != nil {
+			return err
+		}
+		if state.SyncPending {
+			return verror.NewErrBadState(ctx)
+		}
+
+		// Reserve a log generation and a position count for the new SyncGroup version.
+		//gen, pos := s.reserveGenAndPosInDbLog(ctx, stAppName, stDbName, sgIdStr(gid), 1)
+		gen, pos := uint64(1), uint64(1)
+
 		// Add to joiner list.
 		sg.Joiners[joinerName] = joinerInfo
-		return setSGDataEntry(ctx, tx, sg.Id, sg)
+		return s.updateSyncGroupVersioning(ctx, tx, version, true, s.id, gen, pos, sg)
 	})
 
 	if err != nil {
-		return interfaces.SyncGroup{}, err
+		return nullSG, "", nullGV, err
 	}
-	return *sg, nil
+	// TODO(rdaoud): return the SyncGroup genvec
+	return *sg, version, nullGV, nil
 }
diff --git a/services/syncbase/vsync/syncgroup_test.go b/services/syncbase/vsync/syncgroup_test.go
index 4dc5845..af44cfb 100644
--- a/services/syncbase/vsync/syncgroup_test.go
+++ b/services/syncbase/vsync/syncgroup_test.go
@@ -50,6 +50,7 @@
 	svc := createService(t)
 	defer destroyService(t, svc)
 	st := svc.St()
+	s := svc.sync
 
 	checkSGStats(t, svc, "add-1", 0, 0)
 
@@ -57,6 +58,7 @@
 
 	sgName := "foobar"
 	sgId := interfaces.GroupId(1234)
+	version := "v111"
 
 	sg := &interfaces.SyncGroup{
 		Name:        sgName,
@@ -76,7 +78,7 @@
 	}
 
 	tx := st.NewTransaction()
-	if err := addSyncGroup(nil, tx, sg); err != nil {
+	if err := s.addSyncGroup(nil, tx, version, true, "", nil, s.id, 1, 1, sg); err != nil {
 		t.Errorf("cannot add SyncGroup ID %d: %v", sg.Id, err)
 	}
 	if err := tx.Commit(); err != nil {
@@ -88,10 +90,6 @@
 	if id, err := getSyncGroupId(nil, st, sgName); err != nil || id != sgId {
 		t.Errorf("cannot get ID of SyncGroup %s: got %d instead of %d; err: %v", sgName, id, sgId, err)
 	}
-	if name, err := getSyncGroupName(nil, st, sgId); err != nil || name != sgName {
-		t.Errorf("cannot get name of SyncGroup %d: got %s instead of %s; err: %v",
-			sgId, name, sgName, err)
-	}
 
 	sgOut, err := getSyncGroupById(nil, st, sgId)
 	if err != nil {
@@ -150,7 +148,7 @@
 	sg.Name = "another-name"
 
 	tx = st.NewTransaction()
-	if err = addSyncGroup(nil, tx, sg); err == nil {
+	if err = s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 2, 2, sg); err == nil {
 		t.Errorf("re-adding SyncGroup %d did not fail", sgId)
 	}
 	tx.Abort()
@@ -159,7 +157,7 @@
 	sg.Id = interfaces.GroupId(5555)
 
 	tx = st.NewTransaction()
-	if err = addSyncGroup(nil, tx, sg); err == nil {
+	if err = s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 3, 3, sg); err == nil {
 		t.Errorf("adding SyncGroup %s with a different ID did not fail", sgName)
 	}
 	tx.Abort()
@@ -173,9 +171,6 @@
 	if id, err := getSyncGroupId(nil, st, badName); err == nil {
 		t.Errorf("found non-existing SyncGroup %s: got ID %d", badName, id)
 	}
-	if name, err := getSyncGroupName(nil, st, badId); err == nil {
-		t.Errorf("found non-existing SyncGroup %d: got name %s", badId, name)
-	}
 	if sg, err := getSyncGroupByName(nil, st, badName); err == nil {
 		t.Errorf("found non-existing SyncGroup %s: got %v", badName, sg)
 	}
@@ -191,10 +186,11 @@
 	svc := createService(t)
 	defer destroyService(t, svc)
 	st := svc.St()
+	s := svc.sync
 
 	checkBadAddSyncGroup := func(t *testing.T, st store.Store, sg *interfaces.SyncGroup, msg string) {
 		tx := st.NewTransaction()
-		if err := addSyncGroup(nil, tx, sg); err == nil {
+		if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg); err == nil {
 			t.Errorf("checkBadAddSyncGroup: adding bad SyncGroup (%s) did not fail", msg)
 		}
 		tx.Abort()
@@ -202,13 +198,22 @@
 
 	checkBadAddSyncGroup(t, st, nil, "nil SG")
 
-	sg := &interfaces.SyncGroup{Id: 1234}
+	sg := &interfaces.SyncGroup{}
 	checkBadAddSyncGroup(t, st, sg, "SG w/o name")
 
-	sg = &interfaces.SyncGroup{Name: "foobar"}
-	checkBadAddSyncGroup(t, st, sg, "SG w/o Id")
+	sg.Name = "foobar"
+	checkBadAddSyncGroup(t, st, sg, "SG w/o AppName")
 
-	sg.Id = 1234
+	sg.AppName = "mockApp"
+	checkBadAddSyncGroup(t, st, sg, "SG w/o DbName")
+
+	sg.DbName = "mockDb"
+	checkBadAddSyncGroup(t, st, sg, "SG w/o creator")
+
+	sg.Creator = "haha"
+	checkBadAddSyncGroup(t, st, sg, "SG w/o ID")
+
+	sg.Id = newSyncGroupId()
 	checkBadAddSyncGroup(t, st, sg, "SG w/o Version")
 
 	sg.SpecVersion = "v1"
@@ -218,6 +223,9 @@
 		"phone": nosql.SyncGroupMemberInfo{SyncPriority: 10},
 	}
 	checkBadAddSyncGroup(t, st, sg, "SG w/o Prefixes")
+
+	sg.Spec.Prefixes = []string{"foo", "bar", "foo"}
+	checkBadAddSyncGroup(t, st, sg, "SG with duplicate Prefixes")
 }
 
 // TestDeleteSyncGroup tests deleting a SyncGroup.
@@ -227,6 +235,7 @@
 	svc := createService(t)
 	defer destroyService(t, svc)
 	st := svc.St()
+	s := svc.sync
 
 	sgName := "foobar"
 	sgId := interfaces.GroupId(1234)
@@ -264,7 +273,7 @@
 	}
 
 	tx = st.NewTransaction()
-	if err := addSyncGroup(nil, tx, sg); err != nil {
+	if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg); err != nil {
 		t.Errorf("creating SyncGroup ID %d failed: %v", sgId, err)
 	}
 	if err := tx.Commit(); err != nil {
@@ -285,16 +294,24 @@
 
 	checkSGStats(t, svc, "del-3", 0, 0)
 
-	// Create it again then delete it by name.
+	// Create it again, update it, then delete it by name.
 
 	tx = st.NewTransaction()
-	if err := addSyncGroup(nil, tx, sg); err != nil {
+	if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 2, 2, sg); err != nil {
 		t.Errorf("creating SyncGroup ID %d after delete failed: %v", sgId, err)
 	}
 	if err := tx.Commit(); err != nil {
 		t.Errorf("cannot commit adding SyncGroup ID %d after delete: %v", sgId, err)
 	}
 
+	tx = st.NewTransaction()
+	if err := s.updateSyncGroupVersioning(nil, tx, NoVersion, true, s.id, 3, 3, sg); err != nil {
+		t.Errorf("updating SyncGroup ID %d version: %v", sgId, err)
+	}
+	if err := tx.Commit(); err != nil {
+		t.Errorf("cannot commit updating SyncGroup ID %d version: %v", sgId, err)
+	}
+
 	checkSGStats(t, svc, "del-4", 1, 3)
 
 	tx = st.NewTransaction()
@@ -315,6 +332,7 @@
 	svc := createService(t)
 	defer destroyService(t, svc)
 	st := svc.St()
+	s := svc.sync
 
 	sgName1, sgName2 := "foo", "bar"
 	sgId1, sgId2 := interfaces.GroupId(1234), interfaces.GroupId(8888)
@@ -357,7 +375,7 @@
 	}
 
 	tx := st.NewTransaction()
-	if err := addSyncGroup(nil, tx, sg1); err != nil {
+	if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 1, 1, sg1); err != nil {
 		t.Errorf("creating SyncGroup ID %d failed: %v", sgId1, err)
 	}
 	if err := tx.Commit(); err != nil {
@@ -367,7 +385,7 @@
 	checkSGStats(t, svc, "multi-1", 1, 3)
 
 	tx = st.NewTransaction()
-	if err := addSyncGroup(nil, tx, sg2); err != nil {
+	if err := s.addSyncGroup(nil, tx, NoVersion, true, "", nil, s.id, 2, 2, sg2); err != nil {
 		t.Errorf("creating SyncGroup ID %d failed: %v", sgId2, err)
 	}
 	if err := tx.Commit(); err != nil {
@@ -510,3 +528,21 @@
 		}
 	}
 }
+
+// TestPrefixCompare tests the prefix comparison utility.
+func TestPrefixCompare(t *testing.T) {
+	check := func(t *testing.T, pfx1, pfx2 []string, want bool, msg string) {
+		if got := samePrefixes(pfx1, pfx2); got != want {
+			t.Errorf("samePrefixes: %s: got %t instead of %t", msg, got, want)
+		}
+	}
+
+	check(t, nil, nil, true, "both nil")
+	check(t, []string{}, nil, true, "empty vs nil")
+	check(t, []string{"a", "b"}, []string{"b", "a"}, true, "different ordering")
+	check(t, []string{"a", "b", "c"}, []string{"b", "a"}, false, "p1 superset of p2")
+	check(t, []string{"a", "b"}, []string{"b", "a", "c"}, false, "p2 superset of p1")
+	check(t, []string{"a", "b", "c"}, []string{"b", "d", "a"}, false, "overlap")
+	check(t, []string{"a", "b", "c"}, []string{"x", "y"}, false, "no overlap")
+	check(t, []string{"a", "b"}, []string{"B", "a"}, false, "upper/lowercases")
+}
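samePrefixes itself is outside this diff. A minimal sketch consistent with TestPrefixCompare above, assuming the prefix slices are duplicate-free (verifySyncGroupSpec rejects duplicates, as tested earlier in this file):

// samePrefixes reports whether two prefix slices contain the same set of
// strings, ignoring order and comparing case-sensitively.  Sketch only.
func samePrefixes(p1, p2 []string) bool {
	if len(p1) != len(p2) {
		return false
	}
	seen := make(map[string]bool, len(p1))
	for _, p := range p1 {
		seen[p] = true
	}
	for _, p := range p2 {
		if !seen[p] {
			return false
		}
	}
	return true
}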
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index 9cf53d3..a0c8731 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -43,3 +43,44 @@
 	Metadata interfaces.LogRecMetadata
 	Pos      uint64 // position in the Database log.
 }
+
+// sgLocalState holds the SyncGroup local state, only relevant to this member
+// (i.e. the local Syncbase).  This is needed for crash recovery of the internal
+// state transitions of the SyncGroup.
+type sgLocalState struct {
+	// The count of local joiners to the same SyncGroup.
+	NumLocalJoiners uint32
+
+	// The SyncGroup is watched when the sync Watcher starts processing the
+	// SyncGroup data.  When a SyncGroup is created or joined, an entry is
+	// added to the Watcher queue (log) to inform it from which point to
+	// start accepting store mutations; like regular store mutations, this
+	// notification is delivered asynchronously.  When the Watcher processes
+	// that queue entry, it sets this bit to true.  When Syncbase restarts,
+	// the value of this bit allows the new sync Watcher to recreate its
+	// in-memory state and resume watching only the prefixes of SyncGroups
+	// that were previously being watched.
+	Watched bool
+
+	// If non-empty, the name of the remote peer (typically the SyncGroup
+	// creator) that published the SyncGroup here.  In that case the
+	// SyncGroup cannot be GCed locally even if it has no local joiners.
+	RemotePublisher string
+
+	// The SyncGroup is in pending state on a device that learns the current
+	// state of the SyncGroup from another device but has not yet received
+	// through peer-to-peer sync the history of the changes (DAG and logs).
+	// This happens in two cases:
+	// 1- A joiner was accepted into a SyncGroup by a SyncGroup admin and
+	//    was synchronously given only the current SyncGroup info; it will
+	//    receive the full history later via p2p sync.
+	// 2- A remote server where the SyncGroup is published was synchronously
+	//    given only the current SyncGroup info by the SyncGroup publisher;
+	//    it will receive the full history later via p2p sync.
+	// The pending state ends when the device reaches or exceeds the
+	// knowledge level indicated in the pending genvec.  While SyncPending
+	// is true, no local SyncGroup mutations are allowed (i.e. no join or
+	// set-spec requests).
+	SyncPending   bool
+	PendingGenVec interfaces.PrefixGenVector
+}
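PendingGenVec is the checkpoint that eventually clears SyncPending. The actual catch-up check lives in the p2p sync path, outside this diff; a sketch of the comparison it implies, assuming PrefixGenVector maps device IDs to generation numbers:

// caughtUp reports whether local knowledge has reached or exceeded the
// checkpoint recorded at publish/join time.  Sketch only.
func caughtUp(local, pending interfaces.PrefixGenVector) bool {
	for dev, gen := range pending {
		if local[dev] < gen {
			return false // still missing history from this device
		}
	}
	return true // the pending state can end; clear SyncPending
}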
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index c02d2d4..34f425f 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -61,11 +61,55 @@
 }) {
 }
 
+// sgLocalState holds the SyncGroup local state, only relevant to this member
+// (i.e. the local Syncbase).  This is needed for crash recovery of the internal
+// state transitions of the SyncGroup.
+type sgLocalState struct {
+	// The count of local joiners to the same SyncGroup.
+	NumLocalJoiners uint32
+	// The SyncGroup is watched when the sync Watcher starts processing the
+	// SyncGroup data.  When a SyncGroup is created or joined, an entry is
+	// added to the Watcher queue (log) to inform it from which point to
+	// start accepting store mutations; like regular store mutations, this
+	// notification is delivered asynchronously.  When the Watcher processes
+	// that queue entry, it sets this bit to true.  When Syncbase restarts,
+	// the value of this bit allows the new sync Watcher to recreate its
+	// in-memory state and resume watching only the prefixes of SyncGroups
+	// that were previously being watched.
+	Watched bool
+	// If non-empty, the name of the remote peer (typically the SyncGroup
+	// creator) that published the SyncGroup here.  In that case the
+	// SyncGroup cannot be GCed locally even if it has no local joiners.
+	RemotePublisher string
+	// The SyncGroup is in pending state on a device that learns the current
+	// state of the SyncGroup from another device but has not yet received
+	// through peer-to-peer sync the history of the changes (DAG and logs).
+	// This happens in two cases:
+	// 1- A joiner was accepted into a SyncGroup by a SyncGroup admin and
+	//    was synchronously given only the current SyncGroup info; it will
+	//    receive the full history later via p2p sync.
+	// 2- A remote server where the SyncGroup is published was synchronously
+	//    given only the current SyncGroup info by the SyncGroup publisher;
+	//    it will receive the full history later via p2p sync.
+	// The pending state ends when the device reaches or exceeds the
+	// knowledge level indicated in the pending genvec.  While SyncPending
+	// is true, no local SyncGroup mutations are allowed (i.e. no join or
+	// set-spec requests).
+	SyncPending   bool
+	PendingGenVec interfaces.PrefixGenVector
+}
+
+func (sgLocalState) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/services/syncbase/vsync.sgLocalState"`
+}) {
+}
+
 func init() {
 	vdl.Register((*syncData)(nil))
 	vdl.Register((*localGenInfo)(nil))
 	vdl.Register((*dbSyncState)(nil))
 	vdl.Register((*localLogRec)(nil))
+	vdl.Register((*sgLocalState)(nil))
 }
 
 const logPrefix = "log" // log state.