Merge "services/groups: Authorization fixes."
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index a3d7f7f..6dd6f25 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -14,15 +14,8 @@
 type Sync interface {
 	// GetDeltas returns the responder's current generation vector and all
 	// the missing log records when compared to the initiator's generation
-	// vector. This process happens one Database at a time encompassing all
-	// the SyncGroups common to the initiator and the responder. For each
-	// Database, the initiator sends a DeltaReq. In response, the
-	// responder sends a "Start" DeltaResp record, all the missing log
-	// records, the responder's genvector, and a "Finish" DeltaResp
-	// record. The initiator parses the stream between a Start and a Finish
-	// record as the response to its DeltaReq, and then moves on to the
-	// next Database in common with this responder.
-	GetDeltas(initiator string) stream<DeltaReq, DeltaResp> error {access.Read}
+	// vector for one Database for either SyncGroup metadata or data.
+	GetDeltas(req DeltaReq, initiator string) stream<_, DeltaResp> error {access.Read}
 
 	// SyncGroup-related methods.
 
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index a281543..3006f3c 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -28,15 +28,8 @@
 type SyncClientMethods interface {
 	// GetDeltas returns the responder's current generation vector and all
 	// the missing log records when compared to the initiator's generation
-	// vector. This process happens one Database at a time encompassing all
-	// the SyncGroups common to the initiator and the responder. For each
-	// Database, the initiator sends a DeltaReq. In response, the
-	// responder sends a "Start" DeltaResp record, all the missing log
-	// records, the responder's genvector, and a "Finish" DeltaResp
-	// record. The initiator parses the stream between a Start and a Finish
-	// record as the response to its DeltaReq, and then moves on to the
-	// next Database in common with this responder.
-	GetDeltas(ctx *context.T, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
+	// vector for one Database for either SyncGroup metadata or data.
+	GetDeltas(ctx *context.T, req DeltaReq, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
 	PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
@@ -78,9 +71,9 @@
 	name string
 }
 
-func (c implSyncClientStub) GetDeltas(ctx *context.T, i0 string, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
+func (c implSyncClientStub) GetDeltas(ctx *context.T, i0 DeltaReq, i1 string, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
 	var call rpc.ClientCall
-	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", []interface{}{i0}, opts...); err != nil {
+	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", []interface{}{i0, i1}, opts...); err != nil {
 		return
 	}
 	ocall = &implSyncGetDeltasClientCall{ClientCall: call}
@@ -143,30 +136,13 @@
 		// Err returns any error encountered by Advance.  Never blocks.
 		Err() error
 	}
-	// SendStream returns the send side of the Sync.GetDeltas client stream.
-	SendStream() interface {
-		// Send places the item onto the output stream.  Returns errors
-		// encountered while sending, or if Send is called after Close or
-		// the stream has been canceled.  Blocks if there is no buffer
-		// space; will unblock when buffer space is available or after
-		// the stream has been canceled.
-		Send(item DeltaReq) error
-		// Close indicates to the server that no more items will be sent;
-		// server Recv calls will receive io.EOF after all sent items.
-		// This is an optional call - e.g. a client might call Close if it
-		// needs to continue receiving items from the server after it's
-		// done sending.  Returns errors encountered while closing, or if
-		// Close is called after the stream has been canceled.  Like Send,
-		// blocks if there is no buffer space available.
-		Close() error
-	}
 }
 
 // SyncGetDeltasClientCall represents the call returned from Sync.GetDeltas.
 type SyncGetDeltasClientCall interface {
 	SyncGetDeltasClientStream
-	// Finish performs the equivalent of SendStream().Close, then blocks until
-	// the server is done, and returns the positional return values for the call.
+	// Finish blocks until the server is done, and returns the positional return
+	// values for call.
 	//
 	// Finish returns immediately if the call has been canceled; depending on the
 	// timing the output could either be an error signaling cancelation, or the
@@ -209,23 +185,6 @@
 	}
 	return c.c.errRecv
 }
-func (c *implSyncGetDeltasClientCall) SendStream() interface {
-	Send(item DeltaReq) error
-	Close() error
-} {
-	return implSyncGetDeltasClientCallSend{c}
-}
-
-type implSyncGetDeltasClientCallSend struct {
-	c *implSyncGetDeltasClientCall
-}
-
-func (c implSyncGetDeltasClientCallSend) Send(item DeltaReq) error {
-	return c.c.Send(item)
-}
-func (c implSyncGetDeltasClientCallSend) Close() error {
-	return c.c.CloseSend()
-}
 func (c *implSyncGetDeltasClientCall) Finish() (err error) {
 	err = c.ClientCall.Finish()
 	return
@@ -479,15 +438,8 @@
 type SyncServerMethods interface {
 	// GetDeltas returns the responder's current generation vector and all
 	// the missing log records when compared to the initiator's generation
-	// vector. This process happens one Database at a time encompassing all
-	// the SyncGroups common to the initiator and the responder. For each
-	// Database, the initiator sends a DeltaReq. In response, the
-	// responder sends a "Start" DeltaResp record, all the missing log
-	// records, the responder's genvector, and a "Finish" DeltaResp
-	// record. The initiator parses the stream between a Start and a Finish
-	// record as the response to its DeltaReq, and then moves on to the
-	// next Database in common with this responder.
-	GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, initiator string) error
+	// vector for one Database for either SyncGroup metadata or data.
+	GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, req DeltaReq, initiator string) error
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
 	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -521,15 +473,8 @@
 type SyncServerStubMethods interface {
 	// GetDeltas returns the responder's current generation vector and all
 	// the missing log records when compared to the initiator's generation
-	// vector. This process happens one Database at a time encompassing all
-	// the SyncGroups common to the initiator and the responder. For each
-	// Database, the initiator sends a DeltaReq. In response, the
-	// responder sends a "Start" DeltaResp record, all the missing log
-	// records, the responder's genvector, and a "Finish" DeltaResp
-	// record. The initiator parses the stream between a Start and a Finish
-	// record as the response to its DeltaReq, and then moves on to the
-	// next Database in common with this responder.
-	GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, initiator string) error
+	// vector for one Database for either SyncGroup metadata or data.
+	GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, req DeltaReq, initiator string) error
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
 	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -585,8 +530,8 @@
 	gs   *rpc.GlobState
 }
 
-func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, i0 string) error {
-	return s.impl.GetDeltas(ctx, call, i0)
+func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, i0 DeltaReq, i1 string) error {
+	return s.impl.GetDeltas(ctx, call, i0, i1)
 }
 
 func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 SyncGroup) error {
@@ -632,8 +577,9 @@
 	Methods: []rpc.MethodDesc{
 		{
 			Name: "GetDeltas",
-			Doc:  "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector. This process happens one Database at a time encompassing all\n// the SyncGroups common to the initiator and the responder. For each\n// Database, the initiator sends a DeltaReq. In response, the\n// responder sends a \"Start\" DeltaResp record, all the missing log\n// records, the responder's genvector, and a \"Finish\" DeltaResp\n// record. The initiator parses the stream between a Start and a Finish\n// record as the response to its DeltaReq, and then moves on to the\n// next Database in common with this responder.",
+			Doc:  "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector for one Database for either SyncGroup metadata or data.",
 			InArgs: []rpc.ArgDesc{
+				{"req", ``},       // DeltaReq
 				{"initiator", ``}, // string
 			},
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
@@ -691,18 +637,6 @@
 
 // SyncGetDeltasServerStream is the server stream for Sync.GetDeltas.
 type SyncGetDeltasServerStream interface {
-	// RecvStream returns the receiver side of the Sync.GetDeltas server stream.
-	RecvStream() interface {
-		// Advance stages an item so that it may be retrieved via Value.  Returns
-		// true iff there is an item to retrieve.  Advance must be called before
-		// Value is called.  May block if an item is not available.
-		Advance() bool
-		// Value returns the item that was staged by Advance.  May panic if Advance
-		// returned false or was not called.  Never blocks.
-		Value() DeltaReq
-		// Err returns any error encountered by Advance.  Never blocks.
-		Err() error
-	}
 	// SendStream returns the send side of the Sync.GetDeltas server stream.
 	SendStream() interface {
 		// Send places the item onto the output stream.  Returns errors encountered
@@ -722,8 +656,6 @@
 // a typesafe stub that implements SyncGetDeltasServerCall.
 type SyncGetDeltasServerCallStub struct {
 	rpc.StreamServerCall
-	valRecv DeltaReq
-	errRecv error
 }
 
 // Init initializes SyncGetDeltasServerCallStub from rpc.StreamServerCall.
@@ -731,34 +663,6 @@
 	s.StreamServerCall = call
 }
 
-// RecvStream returns the receiver side of the Sync.GetDeltas server stream.
-func (s *SyncGetDeltasServerCallStub) RecvStream() interface {
-	Advance() bool
-	Value() DeltaReq
-	Err() error
-} {
-	return implSyncGetDeltasServerCallRecv{s}
-}
-
-type implSyncGetDeltasServerCallRecv struct {
-	s *SyncGetDeltasServerCallStub
-}
-
-func (s implSyncGetDeltasServerCallRecv) Advance() bool {
-	s.s.valRecv = DeltaReq{}
-	s.s.errRecv = s.s.Recv(&s.s.valRecv)
-	return s.s.errRecv == nil
-}
-func (s implSyncGetDeltasServerCallRecv) Value() DeltaReq {
-	return s.s.valRecv
-}
-func (s implSyncGetDeltasServerCallRecv) Err() error {
-	if s.s.errRecv == io.EOF {
-		return nil
-	}
-	return s.s.errRecv
-}
-
 // SendStream returns the send side of the Sync.GetDeltas server stream.
 func (s *SyncGetDeltasServerCallStub) SendStream() interface {
 	Send(item DeltaResp) error
diff --git a/services/syncbase/server/interfaces/sync_types.go b/services/syncbase/server/interfaces/sync_types.go
new file mode 100644
index 0000000..39732b1
--- /dev/null
+++ b/services/syncbase/server/interfaces/sync_types.go
@@ -0,0 +1,21 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package interfaces
+
+func (in GenVector) DeepCopy() GenVector {
+	out := make(GenVector)
+	for p, inpgv := range in {
+		out[p] = inpgv.DeepCopy()
+	}
+	return out
+}
+
+func (in PrefixGenVector) DeepCopy() PrefixGenVector {
+	out := make(PrefixGenVector)
+	for id, gen := range in {
+		out[id] = gen
+	}
+	return out
+}
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index 692a731..65a7f32 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -29,6 +29,7 @@
 
 // PrefixGenVector is the generation vector for a data prefix, which maps each
 // device id to its last locally known generation in the scope of that prefix.
+// TODO(hpucha): Rename this type.
 type PrefixGenVector map[uint64]uint64
 
 // GenVector is the generation vector for a Database, and maps prefixes to their
@@ -70,6 +71,7 @@
 }
 
 // GroupId is a globally unique SyncGroup ID.
+// TODO(hpucha): Make this a string since now the syncgroup id is an object id.
 type GroupId uint64
 
 // Possible states for a SyncGroup.
@@ -99,21 +101,35 @@
 	Joiners     map[string]wire.SyncGroupMemberInfo // map of joiners to their metadata
 }
 
-// DeltaReq contains the initiator's genvector and the set of SyncGroups it is
-// interested in within a Database (specified by the AppName/DbName) when
+// DeltaReq contains a request to sync either data or SyncGroup metadata for a
+// Database.
+type DeltaReq union {
+	Sgs SgDeltaReq
+	Data DataDeltaReq
+}
+
+// DataDeltaReq contains the initiator's genvector and the set of SyncGroups it
+// is interested in within a Database (specified by the AppName/DbName) when
 // requesting deltas for that Database.
-type DeltaReq struct {
+type DataDeltaReq struct {
 	AppName string
 	DbName string
 	SgIds   set[GroupId]
 	InitVec GenVector
 }
 
+// SgDeltaReq contains the initiator's genvector for the SyncGroups it is
+// interested in within a Database (specified by the AppName/DbName) when
+// requesting deltas for those SyncGroups.
+type SgDeltaReq struct {
+	AppName string
+	DbName string
+	InitVec GenVector // Contains a genvector per SyncGroup.
+}
+
 // DeltaResp contains the responder's genvector or the missing log records
 // returned in response to an initiator's request for deltas for a Database.
 type DeltaResp union {
-	Start   bool
-	Finish  bool
 	Rec     LogRec
 	RespVec GenVector
 }
diff --git a/services/syncbase/server/interfaces/sync_types.vdl.go b/services/syncbase/server/interfaces/sync_types.vdl.go
index 1fb1396..ca29abc 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -20,6 +20,7 @@
 
 // PrefixGenVector is the generation vector for a data prefix, which maps each
 // device id to its last locally known generation in the scope of that prefix.
+// TODO(hpucha): Rename this type.
 type PrefixGenVector map[uint64]uint64
 
 func (PrefixGenVector) __VDLReflect(struct {
@@ -78,6 +79,7 @@
 }
 
 // GroupId is a globally unique SyncGroup ID.
+// TODO(hpucha): Make this a string since now the syncgroup id is an object id.
 type GroupId uint64
 
 func (GroupId) __VDLReflect(struct {
@@ -157,18 +159,72 @@
 }) {
 }
 
-// DeltaReq contains the initiator's genvector and the set of SyncGroups it is
-// interested in within a Database (specified by the AppName/DbName) when
+type (
+	// DeltaReq represents any single field of the DeltaReq union type.
+	//
+	// DeltaReq contains a request to sync either data or SyncGroup metadata for a
+	// Database.
+	DeltaReq interface {
+		// Index returns the field index.
+		Index() int
+		// Interface returns the field value as an interface.
+		Interface() interface{}
+		// Name returns the field name.
+		Name() string
+		// __VDLReflect describes the DeltaReq union type.
+		__VDLReflect(__DeltaReqReflect)
+	}
+	// DeltaReqSgs represents field Sgs of the DeltaReq union type.
+	DeltaReqSgs struct{ Value SgDeltaReq }
+	// DeltaReqData represents field Data of the DeltaReq union type.
+	DeltaReqData struct{ Value DataDeltaReq }
+	// __DeltaReqReflect describes the DeltaReq union type.
+	__DeltaReqReflect struct {
+		Name  string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DeltaReq"`
+		Type  DeltaReq
+		Union struct {
+			Sgs  DeltaReqSgs
+			Data DeltaReqData
+		}
+	}
+)
+
+func (x DeltaReqSgs) Index() int                     { return 0 }
+func (x DeltaReqSgs) Interface() interface{}         { return x.Value }
+func (x DeltaReqSgs) Name() string                   { return "Sgs" }
+func (x DeltaReqSgs) __VDLReflect(__DeltaReqReflect) {}
+
+func (x DeltaReqData) Index() int                     { return 1 }
+func (x DeltaReqData) Interface() interface{}         { return x.Value }
+func (x DeltaReqData) Name() string                   { return "Data" }
+func (x DeltaReqData) __VDLReflect(__DeltaReqReflect) {}
+
+// DataDeltaReq contains the initiator's genvector and the set of SyncGroups it
+// is interested in within a Database (specified by the AppName/DbName) when
 // requesting deltas for that Database.
-type DeltaReq struct {
+type DataDeltaReq struct {
 	AppName string
 	DbName  string
 	SgIds   map[GroupId]struct{}
 	InitVec GenVector
 }
 
-func (DeltaReq) __VDLReflect(struct {
-	Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DeltaReq"`
+func (DataDeltaReq) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DataDeltaReq"`
+}) {
+}
+
+// SgDeltaReq contains the initiator's genvector for the SyncGroups it is
+// interested in within a Database (specified by the AppName/DbName) when
+// requesting deltas for those SyncGroups.
+type SgDeltaReq struct {
+	AppName string
+	DbName  string
+	InitVec GenVector // Contains a genvector per SyncGroup.
+}
+
+func (SgDeltaReq) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.SgDeltaReq"`
 }) {
 }
 
@@ -187,10 +243,6 @@
 		// __VDLReflect describes the DeltaResp union type.
 		__VDLReflect(__DeltaRespReflect)
 	}
-	// DeltaRespStart represents field Start of the DeltaResp union type.
-	DeltaRespStart struct{ Value bool }
-	// DeltaRespFinish represents field Finish of the DeltaResp union type.
-	DeltaRespFinish struct{ Value bool }
 	// DeltaRespRec represents field Rec of the DeltaResp union type.
 	DeltaRespRec struct{ Value LogRec }
 	// DeltaRespRespVec represents field RespVec of the DeltaResp union type.
@@ -200,30 +252,18 @@
 		Name  string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DeltaResp"`
 		Type  DeltaResp
 		Union struct {
-			Start   DeltaRespStart
-			Finish  DeltaRespFinish
 			Rec     DeltaRespRec
 			RespVec DeltaRespRespVec
 		}
 	}
 )
 
-func (x DeltaRespStart) Index() int                      { return 0 }
-func (x DeltaRespStart) Interface() interface{}          { return x.Value }
-func (x DeltaRespStart) Name() string                    { return "Start" }
-func (x DeltaRespStart) __VDLReflect(__DeltaRespReflect) {}
-
-func (x DeltaRespFinish) Index() int                      { return 1 }
-func (x DeltaRespFinish) Interface() interface{}          { return x.Value }
-func (x DeltaRespFinish) Name() string                    { return "Finish" }
-func (x DeltaRespFinish) __VDLReflect(__DeltaRespReflect) {}
-
-func (x DeltaRespRec) Index() int                      { return 2 }
+func (x DeltaRespRec) Index() int                      { return 0 }
 func (x DeltaRespRec) Interface() interface{}          { return x.Value }
 func (x DeltaRespRec) Name() string                    { return "Rec" }
 func (x DeltaRespRec) __VDLReflect(__DeltaRespReflect) {}
 
-func (x DeltaRespRespVec) Index() int                      { return 3 }
+func (x DeltaRespRespVec) Index() int                      { return 1 }
 func (x DeltaRespRespVec) Interface() interface{}          { return x.Value }
 func (x DeltaRespRespVec) Name() string                    { return "RespVec" }
 func (x DeltaRespRespVec) __VDLReflect(__DeltaRespReflect) {}
@@ -257,6 +297,8 @@
 	vdl.Register((*SyncGroupStatus)(nil))
 	vdl.Register((*SyncGroup)(nil))
 	vdl.Register((*DeltaReq)(nil))
+	vdl.Register((*DataDeltaReq)(nil))
+	vdl.Register((*SgDeltaReq)(nil))
 	vdl.Register((*DeltaResp)(nil))
 	vdl.Register((*ChunkHash)(nil))
 	vdl.Register((*ChunkData)(nil))
diff --git a/services/syncbase/vsync/blob.go b/services/syncbase/vsync/blob.go
index 51a8f7c..eb78d00 100644
--- a/services/syncbase/vsync/blob.go
+++ b/services/syncbase/vsync/blob.go
@@ -380,33 +380,7 @@
 func (sd *syncDatabase) getMountTables(ctx *context.T, peer string) (map[string]struct{}, error) {
 	ss := sd.sync.(*syncService)
 	mInfo := ss.copyMemberInfo(ctx, peer)
-
-	mtTables := make(map[string]struct{})
-	for gdbName, sgInfo := range mInfo.db2sg {
-		appName, dbName, err := splitAppDbName(ctx, gdbName)
-		if err != nil {
-			return nil, err
-		}
-		st, err := ss.getDbStore(ctx, nil, appName, dbName)
-		if err != nil {
-			return nil, err
-		}
-
-		for id := range sgInfo {
-			sg, err := getSyncGroupById(ctx, st, id)
-			if err != nil {
-				continue
-			}
-			if _, ok := sg.Joiners[peer]; !ok {
-				// Peer is no longer part of the SyncGroup.
-				continue
-			}
-			for _, mt := range sg.Spec.MountTables {
-				mtTables[mt] = struct{}{}
-			}
-		}
-	}
-	return mtTables, nil
+	return mInfo.mtTables, nil
 }
 
 // TODO(hpucha): Persist the blob directory periodically.
diff --git a/services/syncbase/vsync/conflict_resolution.go b/services/syncbase/vsync/conflict_resolution.go
index a8e41f6..689e2d3 100644
--- a/services/syncbase/vsync/conflict_resolution.go
+++ b/services/syncbase/vsync/conflict_resolution.go
@@ -131,11 +131,7 @@
 		if err != nil {
 			return nil, err
 		}
-		dev, gen, err := splitLogRecKey(ctx, logKey)
-		if err != nil {
-			return nil, err
-		}
-		lrecs[p], err = getLogRec(ctx, iSt.tx, dev, gen)
+		lrecs[p], err = getLogRecByKey(ctx, iSt.tx, logKey)
 		if err != nil {
 			return nil, err
 		}
diff --git a/services/syncbase/vsync/dag_test.go b/services/syncbase/vsync/dag_test.go
index 2644655..c72b3a7 100644
--- a/services/syncbase/vsync/dag_test.go
+++ b/services/syncbase/vsync/dag_test.go
@@ -335,7 +335,7 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:data:11:3" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
 	}
 
@@ -411,10 +411,10 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:data:10:3" {
 		t.Errorf("invalid logrec for oldhead object %s:%s: %v", oid, oldHead, logrec)
 	}
-	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:data:11:3" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
 	}
 
@@ -494,13 +494,13 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:data:10:3" {
 		t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
 	}
-	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:data:11:3" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
 	}
-	if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
+	if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:data:10:2" {
 		t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
 	}
 
@@ -587,13 +587,13 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:data:10:3" {
 		t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
 	}
-	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:2" {
+	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:data:11:2" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
 	}
-	if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
+	if logrec, err := getLogRecKey(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:data:10:2" {
 		t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
 	}
 
@@ -674,10 +674,10 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:data:10:3" {
 		t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
 	}
-	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
+	if logrec, err := getLogRecKey(nil, st, oid, newHead); err != nil || logrec != "$sync:log:data:11:3" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
 	}
 
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index 9fa297f..5875e21 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -4,11 +4,10 @@
 
 package vsync
 
-// Initiator is a goroutine that periodically picks a peer from all the known
-// remote peers, and requests deltas from that peer for all the SyncGroups in
-// common across all apps/databases. It then modifies the sync metadata (DAG and
-// local log records) based on the deltas, detects and resolves conflicts if
-// any, and suitably updates the local Databases.
+// Initiator requests deltas from a chosen peer for all the SyncGroups in common
+// across all apps/databases. It then modifies the sync metadata (DAG and local
+// log records) based on the deltas, detects and resolves conflicts if any, and
+// suitably updates the local Databases.
 
 import (
 	"sort"
@@ -21,6 +20,7 @@
 	"v.io/v23/vdl"
 	"v.io/v23/verror"
 	"v.io/v23/vom"
+	"v.io/x/lib/set"
 	"v.io/x/lib/vlog"
 	"v.io/x/ref/services/syncbase/server/interfaces"
 	"v.io/x/ref/services/syncbase/server/util"
@@ -28,204 +28,167 @@
 	"v.io/x/ref/services/syncbase/store"
 )
 
-// Policies to pick a peer to sync with.
-const (
-	// Picks a peer at random from the available set.
-	selectRandom = iota
-
-	// TODO(hpucha): implement other policies.
-	// Picks a peer with most differing generations.
-	selectMostDiff
-
-	// Picks a peer that was synced with the furthest in the past.
-	selectOldest
-)
-
-var (
-	// peerSyncInterval is the duration between two consecutive peer
-	// contacts. During every peer contact, the initiator obtains any
-	// pending updates from that peer.
-	peerSyncInterval = 50 * time.Millisecond
-
-	// peerSelectionPolicy is the policy used to select a peer when
-	// the initiator gets a chance to sync.
-	peerSelectionPolicy = selectRandom
-)
-
-// syncer wakes up every peerSyncInterval to do work: (1) Act as an initiator
-// for SyncGroup metadata by selecting a SyncGroup Admin, and syncing Syncgroup
-// metadata with it (getting updates from the remote peer, detecting and
-// resolving conflicts); (2) Refresh memberView if needed and act as an
-// initiator for data by selecting a peer, and syncing data corresponding to all
-// common SyncGroups across all Databases; (3) Act as a SyncGroup publisher to
-// publish pending SyncGroups; (4) Garbage collect older generations.
-//
-// TODO(hpucha): Currently only does initiation. Add rest.
-func (s *syncService) syncer(ctx *context.T) {
-	defer s.pending.Done()
-
-	ticker := time.NewTicker(peerSyncInterval)
-	defer ticker.Stop()
-
-	for {
-		select {
-		case <-s.closed:
-			vlog.VI(1).Info("sync: syncer: channel closed, stop work and exit")
-			return
-
-		case <-ticker.C:
-		}
-
-		// TODO(hpucha): Cut a gen for the responder even if there is no
-		// one to initiate to?
-
-		// Do work.
-		peer, err := s.pickPeer(ctx)
-		if err != nil {
-			continue
-		}
-		s.getDeltasFromPeer(ctx, peer)
-	}
-}
-
-// getDeltasFromPeer performs an initiation round to the specified
-// peer. An initiation round consists of:
+// getDeltas performs an initiation round to the specified peer. An
+// initiation round consists of two sync rounds:
+// * Sync SyncGroup metadata.
+// * Sync data.
+// Each sync round involves:
 // * Contacting the peer to receive all the deltas based on the local genvector.
 // * Processing those deltas to discover objects which have been updated.
 // * Processing updated objects to detect and resolve any conflicts if needed.
-// * Communicating relevant object updates to the Database, and updating local
-// genvector to catch up to the received remote genvector.
+// * Communicating relevant object updates to the Database in case of data.
+// * Updating local genvector to catch up to the received remote genvector.
 //
-// The processing of the deltas is done one Database at a time. If a local error
-// is encountered during the processing of a Database, that Database is skipped
-// and the initiator continues on to the next one. If the connection to the peer
+// The processing of the deltas is done one Database at a time, encompassing all
+// the SyncGroups common to the initiator and the responder. If a local error is
+// encountered during the processing of a Database, that Database is skipped and
+// the initiator continues on to the next one. If the connection to the peer
 // encounters an error, this initiation round is aborted. Note that until the
 // local genvector is updated based on the received deltas (the last step in an
 // initiation round), the work done by the initiator is idempotent.
 //
 // TODO(hpucha): Check the idempotence, esp in addNode in DAG.
-func (s *syncService) getDeltasFromPeer(ctxIn *context.T, peer string) {
-	vlog.VI(2).Infof("sync: getDeltasFromPeer: begin: contacting peer %s", peer)
-	defer vlog.VI(2).Infof("sync: getDeltasFromPeer: end: contacting peer %s", peer)
-
-	ctx, cancel := context.WithRootCancel(ctxIn)
+func (s *syncService) getDeltas(ctx *context.T, peer string) {
+	vlog.VI(2).Infof("sync: getDeltas: begin: contacting peer %s", peer)
+	defer vlog.VI(2).Infof("sync: getDeltas: end: contacting peer %s", peer)
 
 	info := s.copyMemberInfo(ctx, peer)
 	if info == nil {
-		vlog.Fatalf("sync: getDeltasFromPeer: missing information in member view for %q", peer)
+		vlog.Fatalf("sync: getDeltas: missing information in member view for %q", peer)
 	}
-	connected := false
-	var stream interfaces.SyncGetDeltasClientCall
+
+	// Preferred mount tables for this peer.
+	prfMtTbls := set.String.ToSlice(info.mtTables)
 
 	// Sync each Database that may have SyncGroups common with this peer,
 	// one at a time.
-	for gdbName, sgInfo := range info.db2sg {
+	for gdbName := range info.db2sg {
+		vlog.VI(4).Infof("sync: getDeltas: started for peer %s db %s", peer, gdbName)
 
-		// Initialize initiation state for syncing this Database.
-		iSt, err := newInitiationState(ctx, s, peer, gdbName, sgInfo)
-		if err != nil {
-			vlog.Errorf("sync: getDeltasFromPeer: couldn't initialize initiator state for peer %s, gdb %s, err %v", peer, gdbName, err)
-			continue
-		}
-
-		if len(iSt.sgIds) == 0 || len(iSt.sgPfxs) == 0 {
-			vlog.Errorf("sync: getDeltasFromPeer: didn't find any SyncGroups for peer %s, gdb %s, err %v", peer, gdbName, err)
-			continue
-		}
-
-		// Make contact with the peer once.
-		if !connected {
-			stream, connected = iSt.connectToPeer(ctx)
-			if !connected {
-				// Try a different Database. Perhaps there are
-				// different mount tables.
-				continue
-			}
-		}
-
-		// Create local genvec so that it contains knowledge only about common prefixes.
-		if err := iSt.createLocalGenVec(ctx); err != nil {
-			vlog.Errorf("sync: getDeltasFromPeer: error creating local genvec for gdb %s, err %v", gdbName, err)
-			continue
-		}
-
-		iSt.stream = stream
-		req := interfaces.DeltaReq{
-			AppName: iSt.appName,
-			DbName:  iSt.dbName,
-			SgIds:   iSt.sgIds,
-			InitVec: iSt.local,
-		}
-
-		vlog.VI(3).Infof("sync: getDeltasFromPeer: send request: %v", req)
-		sender := iSt.stream.SendStream()
-		sender.Send(req)
-
-		// Obtain deltas from the peer over the network.
-		if err := iSt.recvAndProcessDeltas(ctx); err != nil {
-			vlog.Errorf("sync: getDeltasFromPeer: error receiving deltas for gdb %s, err %v", gdbName, err)
-			// Returning here since something could be wrong with
-			// the connection, and no point in attempting the next
-			// Database.
-			cancel()
-			stream.Finish()
+		if len(prfMtTbls) < 1 {
+			vlog.Errorf("sync: getDeltas: no mount tables found to connect to peer %s", peer)
 			return
 		}
-		vlog.VI(3).Infof("sync: getDeltasFromPeer: got reply: %v", iSt.remote)
 
-		if err := iSt.processUpdatedObjects(ctx); err != nil {
-			vlog.Errorf("sync: getDeltasFromPeer: error processing objects for gdb %s, err %v", gdbName, err)
-			// Move to the next Database even if processing updates
-			// failed.
+		c, err := newInitiationConfig(ctx, s, peer, gdbName, info, prfMtTbls)
+		if err != nil {
+			vlog.Errorf("sync: getDeltas: couldn't initialize initiator config for peer %s, gdb %s, err %v", peer, gdbName, err)
 			continue
 		}
+
+		if err := s.getDBDeltas(ctx, peer, c, true); err == nil {
+			if err := s.getDBDeltas(ctx, peer, c, false); err != nil {
+				vlog.Errorf("sync: getDeltas: failed for data sync, err %v", err)
+			}
+		} else {
+			// If SyncGroup sync fails, abort data sync as well.
+			vlog.Errorf("sync: getDeltas: failed for SyncGroup sync, err %v", err)
+		}
+
+		// Cache the pruned mount table list for the next Database.
+		prfMtTbls = c.mtTables
+
+		vlog.VI(4).Infof("sync: getDeltas: done for peer %s db %s", peer, gdbName)
+	}
+}
+
+// getDBDeltas gets the deltas from the chosen peer. If sg flag is set to true,
+// it will sync SyncGroup metadata. If sg flag is false, it will sync data.
+func (s *syncService) getDBDeltas(ctxIn *context.T, peer string, c *initiationConfig, sg bool) error {
+	vlog.VI(2).Infof("sync: getDBDeltas: begin: contacting peer sg %v %s", sg, peer)
+	defer vlog.VI(2).Infof("sync: getDBDeltas: end: contacting peer sg %v %s", sg, peer)
+
+	ctx, cancel := context.WithRootCancel(ctxIn)
+	// cancel() is idempotent.
+	defer cancel()
+
+	// Initialize initiation state for syncing this Database.
+	iSt := newInitiationState(ctx, c, sg)
+
+	// Initialize SyncGroup prefixes for data syncing.
+	if !sg {
+		iSt.peerSgInfo(ctx)
+		if len(iSt.config.sgPfxs) == 0 {
+			return verror.New(verror.ErrInternal, ctx, "no SyncGroup prefixes found", peer, iSt.config.appName, iSt.config.dbName)
+		}
 	}
 
-	if connected {
-		stream.Finish()
+	if sg {
+		// Create local genvec so that it contains knowledge about
+		// common SyncGroups and then send the SyncGroup metadata sync
+		// request.
+		if err := iSt.prepareSGDeltaReq(ctx); err != nil {
+			return err
+		}
+	} else {
+		// Create local genvec so that it contains knowledge only about common
+		// prefixes and then send the data sync request.
+		if err := iSt.prepareDataDeltaReq(ctx); err != nil {
+			return err
+		}
 	}
-	cancel()
+
+	// Make contact with the peer.
+	if !iSt.connectToPeer(ctx) {
+		return verror.New(verror.ErrInternal, ctx, "couldn't connect to peer", peer)
+	}
+
+	// Obtain deltas from the peer over the network.
+	if err := iSt.recvAndProcessDeltas(ctx); err != nil {
+		cancel()
+		iSt.stream.Finish()
+		return err
+	}
+
+	vlog.VI(4).Infof("sync: getDBDeltas: got reply: %v", iSt.remote)
+
+	if err := iSt.processUpdatedObjects(ctx); err != nil {
+		return err
+	}
+
+	return iSt.stream.Finish()
 }
 
 type sgSet map[interfaces.GroupId]struct{}
 
-// initiationState is accumulated for each Database during an initiation round.
-type initiationState struct {
-	// Relative name of the peer to sync with.
-	peer string
+// initiationConfig is the configuration information for a Database in an
+// initiation round.
+type initiationConfig struct {
+	peer string // relative name of the peer to sync with.
 
-	// Collection of mount tables that this peer may have registered with.
-	mtTables map[string]struct{}
+	// Mount tables that this peer may have registered with. The first entry
+	// in this array is the mount table where the peer was successfully
+	// reached the last time.
+	mtTables []string
 
-	// SyncGroups being requested in the initiation round.
-	sgIds sgSet
-
-	// SyncGroup prefixes being requested in the initiation round, and their
-	// corresponding SyncGroup ids.
-	sgPfxs map[string]sgSet
-
-	// Local generation vector.
-	local interfaces.GenVector
-
-	// Generation vector from the remote peer.
-	remote interfaces.GenVector
-
-	// Updated local generation vector at the end of the initiation round.
-	updLocal interfaces.GenVector
-
-	// State to track updated objects during a log replay.
-	updObjects map[string]*objConflictState
-
-	// DAG state that tracks conflicts and common ancestors.
-	dagGraft graftMap
-
+	sgIds   sgSet            // SyncGroups being requested in the initiation round.
+	sgPfxs  map[string]sgSet // SyncGroup prefixes and their ids being requested in the initiation round.
 	sync    *syncService
 	appName string
 	dbName  string
-	st      store.Store                        // Store handle to the Database.
-	stream  interfaces.SyncGetDeltasClientCall // Stream handle for the GetDeltas RPC.
+	st      store.Store // Store handle to the Database.
+}
 
-	// Transaction handle for the initiation round. Used during the update
+// initiationState is accumulated for a Database in each sync round in an
+// initiation round.
+type initiationState struct {
+	// Config information.
+	config *initiationConfig
+
+	// Accumulated sync state.
+	local      interfaces.GenVector         // local generation vector.
+	remote     interfaces.GenVector         // generation vector from the remote peer.
+	updLocal   interfaces.GenVector         // updated local generation vector at the end of sync round.
+	updObjects map[string]*objConflictState // tracks updated objects during a log replay.
+	dagGraft   graftMap                     // DAG state that tracks conflicts and common ancestors.
+
+	req    interfaces.DeltaReq                // GetDeltas RPC request.
+	stream interfaces.SyncGetDeltasClientCall // stream handle for the GetDeltas RPC.
+
+	// Flag to indicate if this is SyncGroup metadata sync.
+	sg bool
+
+	// Transaction handle for the sync round. Used during the update
 	// of objects in the Database.
 	tx store.Transaction
 }
@@ -240,93 +203,90 @@
 	res        *conflictResolution
 }
 
-// newInitiationState creates new initiation state.
-func newInitiationState(ctx *context.T, s *syncService, peer string, name string, sgInfo sgMemberInfo) (*initiationState, error) {
-	iSt := &initiationState{}
-	iSt.peer = peer
-	iSt.updObjects = make(map[string]*objConflictState)
-	iSt.dagGraft = newGraft()
-	iSt.sync = s
+// newInitiatonConfig creates new initiation config. This will be shared between
+// the two sync rounds in the initiation round of a Database.
+func newInitiationConfig(ctx *context.T, s *syncService, peer string, name string, info *memberInfo, mtTables []string) (*initiationConfig, error) {
+	c := &initiationConfig{}
+	c.peer = peer
+	c.mtTables = mtTables
+	c.sgIds = make(sgSet)
+	for id := range info.db2sg[name] {
+		c.sgIds[id] = struct{}{}
+	}
+	if len(c.sgIds) == 0 {
+		return nil, verror.New(verror.ErrInternal, ctx, "no SyncGroups found", peer, name)
+	}
+	// Note: sgPfxs will be inited when needed by the data sync.
+
+	c.sync = s
 
 	// TODO(hpucha): Would be nice to standardize on the combined "app:db"
 	// name across sync (not syncbase) so we only join split/join them at
 	// the boundary with the store part.
 	var err error
-	iSt.appName, iSt.dbName, err = splitAppDbName(ctx, name)
+	c.appName, c.dbName, err = splitAppDbName(ctx, name)
 	if err != nil {
 		return nil, err
 	}
 
 	// TODO(hpucha): nil rpc.ServerCall ok?
-	iSt.st, err = s.getDbStore(ctx, nil, iSt.appName, iSt.dbName)
+	c.st, err = s.getDbStore(ctx, nil, c.appName, c.dbName)
 	if err != nil {
 		return nil, err
 	}
 
-	iSt.peerMtTblsAndSgInfo(ctx, peer, sgInfo)
-
-	return iSt, nil
+	return c, nil
 }
 
-// peerMtTblsAndSgInfo computes the possible mount tables, the SyncGroup Ids and
-// prefixes common with a remote peer in a particular Database by consulting the
-// SyncGroups in the specified Database.
-func (iSt *initiationState) peerMtTblsAndSgInfo(ctx *context.T, peer string, info sgMemberInfo) {
-	iSt.mtTables = make(map[string]struct{})
-	iSt.sgIds = make(sgSet)
-	iSt.sgPfxs = make(map[string]sgSet)
+// newInitiationState creates new initiation state.
+func newInitiationState(ctx *context.T, c *initiationConfig, sg bool) *initiationState {
+	iSt := &initiationState{}
+	iSt.config = c
+	iSt.updObjects = make(map[string]*objConflictState)
+	iSt.dagGraft = newGraft()
+	iSt.sg = sg
+	return iSt
+}
 
-	for id := range info {
-		sg, err := getSyncGroupById(ctx, iSt.st, id)
+// peerSgInfo computes the SyncGroup Ids and prefixes common with a remote peer
+// in a particular Database by consulting the SyncGroups in the specified
+// Database.
+func (iSt *initiationState) peerSgInfo(ctx *context.T) {
+	sgs := iSt.config.sgIds
+	iSt.config.sgIds = make(sgSet) // regenerate the sgids since we are going through the SyncGroups in any case.
+	iSt.config.sgPfxs = make(map[string]sgSet)
+
+	for id := range sgs {
+		sg, err := getSyncGroupById(ctx, iSt.config.st, id)
 		if err != nil {
 			continue
 		}
-		if _, ok := sg.Joiners[peer]; !ok {
+		if _, ok := sg.Joiners[iSt.config.peer]; !ok {
 			// Peer is no longer part of the SyncGroup.
 			continue
 		}
-		for _, mt := range sg.Spec.MountTables {
-			iSt.mtTables[mt] = struct{}{}
-		}
-		iSt.sgIds[id] = struct{}{}
+
+		iSt.config.sgIds[id] = struct{}{}
 
 		for _, p := range sg.Spec.Prefixes {
-			sgs, ok := iSt.sgPfxs[p]
+			sgs, ok := iSt.config.sgPfxs[p]
 			if !ok {
 				sgs = make(sgSet)
-				iSt.sgPfxs[p] = sgs
+				iSt.config.sgPfxs[p] = sgs
 			}
 			sgs[id] = struct{}{}
 		}
 	}
 }
 
-// connectToPeer attempts to connect to the remote peer using the mount tables
-// obtained from the SyncGroups being synced in the current Database.
-func (iSt *initiationState) connectToPeer(ctx *context.T) (interfaces.SyncGetDeltasClientCall, bool) {
-	if len(iSt.mtTables) < 1 {
-		vlog.Errorf("sync: connectToPeer: no mount tables found to connect to peer %s, app %s db %s", iSt.peer, iSt.appName, iSt.dbName)
-		return nil, false
-	}
-	for mt := range iSt.mtTables {
-		absName := naming.Join(mt, iSt.peer, util.SyncbaseSuffix)
-		c := interfaces.SyncClient(absName)
-		stream, err := c.GetDeltas(ctx, iSt.sync.name)
-		if err == nil {
-			vlog.VI(3).Infof("sync: connectToPeer: established on %s", absName)
-			return stream, true
-		}
-	}
-	return nil, false
-}
-
-// createLocalGenVec creates the generation vector with local knowledge for the
-// initiator to send to the responder.
+// prepareDataDeltaReq creates the generation vector with local knowledge for the
+// initiator to send to the responder, and creates the request to start the data
+// sync.
 //
 // TODO(hpucha): Refactor this code with computeDelta code in sync_state.go.
-func (iSt *initiationState) createLocalGenVec(ctx *context.T) error {
-	iSt.sync.thLock.Lock()
-	defer iSt.sync.thLock.Unlock()
+func (iSt *initiationState) prepareDataDeltaReq(ctx *context.T) error {
+	iSt.config.sync.thLock.Lock()
+	defer iSt.config.sync.thLock.Unlock()
 
 	// Freeze the most recent batch of local changes before fetching
 	// remote changes from a peer. This frozen state is used by the
@@ -341,19 +301,20 @@
 	// devices.  These remote devices in turn can send these
 	// generations back to the initiator in progress which was
 	// started with older generation information.
-	if err := iSt.sync.checkptLocalGen(ctx, iSt.appName, iSt.dbName); err != nil {
+	if err := iSt.config.sync.checkptLocalGen(ctx, iSt.config.appName, iSt.config.dbName, nil); err != nil {
 		return err
 	}
 
-	local, lgen, err := iSt.sync.copyDbGenInfo(ctx, iSt.appName, iSt.dbName)
+	local, lgen, err := iSt.config.sync.copyDbGenInfo(ctx, iSt.config.appName, iSt.config.dbName, nil)
 	if err != nil {
 		return err
 	}
+
 	localPfxs := extractAndSortPrefixes(local)
 
-	sgPfxs := make([]string, len(iSt.sgPfxs))
+	sgPfxs := make([]string, len(iSt.config.sgPfxs))
 	i := 0
-	for p := range iSt.sgPfxs {
+	for p := range iSt.config.sgPfxs {
 		sgPfxs[i] = p
 		i++
 	}
@@ -390,14 +351,84 @@
 		if lpStart == "" {
 			// No matching prefixes for pfx were found.
 			iSt.local[pfx] = make(interfaces.PrefixGenVector)
-			iSt.local[pfx][iSt.sync.id] = lgen
+			iSt.local[pfx][iSt.config.sync.id] = lgen
 		} else {
 			iSt.local[pfx] = local[lpStart]
 		}
 	}
+
+	// Send request.
+	req := interfaces.DataDeltaReq{
+		AppName: iSt.config.appName,
+		DbName:  iSt.config.dbName,
+		SgIds:   iSt.config.sgIds,
+		InitVec: iSt.local,
+	}
+
+	iSt.req = interfaces.DeltaReqData{req}
+
+	vlog.VI(4).Infof("sync: prepareDataDeltaReq: request: %v", req)
+
 	return nil
 }
 
+// prepareSGDeltaReq creates the SyncGroup generation vector with local knowledge
+// for the initiator to send to the responder, and prepares the request to start
+// the SyncGroup sync.
+func (iSt *initiationState) prepareSGDeltaReq(ctx *context.T) error {
+	iSt.config.sync.thLock.Lock()
+	defer iSt.config.sync.thLock.Unlock()
+
+	if err := iSt.config.sync.checkptLocalGen(ctx, iSt.config.appName, iSt.config.dbName, iSt.config.sgIds); err != nil {
+		return err
+	}
+
+	var err error
+	iSt.local, _, err = iSt.config.sync.copyDbGenInfo(ctx, iSt.config.appName, iSt.config.dbName, iSt.config.sgIds)
+	if err != nil {
+		return err
+	}
+
+	// Send request.
+	req := interfaces.SgDeltaReq{
+		AppName: iSt.config.appName,
+		DbName:  iSt.config.dbName,
+		InitVec: iSt.local,
+	}
+
+	iSt.req = interfaces.DeltaReqSgs{req}
+
+	vlog.VI(4).Infof("sync: prepareSGDeltaReq: request: %v", req)
+
+	return nil
+}
+
+// connectToPeer attempts to connect to the remote peer using the mount tables
+// obtained from all the common SyncGroups.
+func (iSt *initiationState) connectToPeer(ctx *context.T) bool {
+	if len(iSt.config.mtTables) < 1 {
+		vlog.Errorf("sync: connectToPeer: no mount tables found to connect to peer %s, app %s db %s", iSt.config.peer, iSt.config.appName, iSt.config.dbName)
+		return false
+	}
+
+	for i, mt := range iSt.config.mtTables {
+		absName := naming.Join(mt, iSt.config.peer, util.SyncbaseSuffix)
+		c := interfaces.SyncClient(absName)
+		var err error
+		iSt.stream, err = c.GetDeltas(ctx, iSt.req, iSt.config.sync.name)
+		if err == nil {
+			vlog.VI(4).Infof("sync: connectToPeer: established on %s", absName)
+
+			// Prune out the unsuccessful mount tables.
+			iSt.config.mtTables = iSt.config.mtTables[i:]
+			return true
+		}
+	}
+	iSt.config.mtTables = nil
+	vlog.Errorf("sync: connectToPeer: couldn't connect to peer %s", iSt.config.peer)
+	return false
+}
+
 // recvAndProcessDeltas first receives the log records and generation vector
 // from the GetDeltas RPC and puts them in the Database. It also replays the
 // entire log stream as the log records arrive. These records span multiple
@@ -405,20 +436,28 @@
 // resolution during replay.  This avoids resolving conflicts that have already
 // been resolved by other devices.
 func (iSt *initiationState) recvAndProcessDeltas(ctx *context.T) error {
-	iSt.sync.thLock.Lock()
-	defer iSt.sync.thLock.Unlock()
+	// This is to handle issues with graftMap in DAG. Ideally, the
+	// transaction created to store all the deltas received over the network
+	// should not contend with any other store changes since this is all
+	// brand new information. However, as log records are received over the
+	// network, they are also incrementally processed. To enable incremental
+	// processing, the current head of each dirty object is read to populate
+	// the graftMap. This read can potentially contend with the watcher
+	// updating the head of an object. This lock prevents that contention in
+	// order to avoid retrying the whole transaction.
+	iSt.config.sync.thLock.Lock()
+	defer iSt.config.sync.thLock.Unlock()
 
 	// TODO(hpucha): This works for now, but figure out a long term solution
 	// as this may be implementation dependent. It currently works because
 	// the RecvStream call is stateless, and grabbing a handle to it
 	// repeatedly doesn't affect what data is seen next.
 	rcvr := iSt.stream.RecvStream()
-	start, finish := false, false
 
 	// TODO(hpucha): See if we can avoid committing the entire delta stream
 	// as one batch. Currently the dependency is between the log records and
 	// the batch info.
-	tx := iSt.st.NewTransaction()
+	tx := iSt.config.st.NewTransaction()
 	committed := false
 
 	defer func() {
@@ -433,18 +472,6 @@
 	for rcvr.Advance() {
 		resp := rcvr.Value()
 		switch v := resp.(type) {
-		case interfaces.DeltaRespStart:
-			if start {
-				return verror.New(verror.ErrInternal, ctx, "received start followed by start in delta response stream")
-			}
-			start = true
-
-		case interfaces.DeltaRespFinish:
-			if finish {
-				return verror.New(verror.ErrInternal, ctx, "received finish followed by finish in delta response stream")
-			}
-			finish = true
-
 		case interfaces.DeltaRespRespVec:
 			iSt.remote = v.Value
 
@@ -452,12 +479,18 @@
 			// Insert log record in Database.
 			// TODO(hpucha): Should we reserve more positions in a batch?
 			// TODO(hpucha): Handle if SyncGroup is left/destroyed while sync is in progress.
-			pos := iSt.sync.reservePosInDbLog(ctx, iSt.appName, iSt.dbName, 1)
+			var pos uint64
+			if iSt.sg {
+				pos = iSt.config.sync.reservePosInDbLog(ctx, iSt.config.appName, iSt.config.dbName, v.Value.Metadata.ObjId, 1)
+			} else {
+				pos = iSt.config.sync.reservePosInDbLog(ctx, iSt.config.appName, iSt.config.dbName, "", 1)
+			}
+
 			rec := &localLogRec{Metadata: v.Value.Metadata, Pos: pos}
 			batchId := rec.Metadata.BatchId
 			if batchId != NoBatchId {
 				if cnt, ok := batchMap[batchId]; !ok {
-					if iSt.sync.startBatch(ctx, tx, batchId) != batchId {
+					if iSt.config.sync.startBatch(ctx, tx, batchId) != batchId {
 						return verror.New(verror.ErrInternal, ctx, "failed to create batch info")
 					}
 					batchMap[batchId] = rec.Metadata.BatchCount
@@ -467,27 +500,25 @@
 			}
 
 			vlog.VI(4).Infof("sync: recvAndProcessDeltas: processing rec %v", rec)
-			if err := iSt.insertRecInLogDagAndDb(ctx, rec, batchId, v.Value.Value, tx); err != nil {
+			if err := iSt.insertRecInLogAndDag(ctx, rec, batchId, tx); err != nil {
 				return err
 			}
 
-			// Check for BlobRefs, and process them.
-			if err := iSt.processBlobRefs(ctx, &rec.Metadata, v.Value.Value); err != nil {
-				return err
+			if iSt.sg {
+				// Add the SyncGroup value to the Database.
+			} else {
+				if err := iSt.insertRecInDb(ctx, rec, v.Value.Value, tx); err != nil {
+					return err
+				}
+				// Check for BlobRefs, and process them.
+				if err := iSt.processBlobRefs(ctx, &rec.Metadata, v.Value.Value); err != nil {
+					return err
+				}
 			}
 
 			// Mark object dirty.
 			iSt.updObjects[rec.Metadata.ObjId] = &objConflictState{}
 		}
-
-		// Break out of the stream.
-		if finish {
-			break
-		}
-	}
-
-	if !(start && finish) {
-		return verror.New(verror.ErrInternal, ctx, "didn't receive start/finish delimiters in delta response stream")
 	}
 
 	if err := rcvr.Err(); err != nil {
@@ -496,7 +527,7 @@
 
 	// End the started batches if any.
 	for bid, cnt := range batchMap {
-		if err := iSt.sync.endBatch(ctx, tx, bid, cnt); err != nil {
+		if err := iSt.config.sync.endBatch(ctx, tx, bid, cnt); err != nil {
 			return err
 		}
 	}
@@ -516,6 +547,48 @@
 	return err
 }
 
+// insertRecInLogAndDag adds a new log record to log and dag data structures.
+func (iSt *initiationState) insertRecInLogAndDag(ctx *context.T, rec *localLogRec, batchId uint64, tx store.Transaction) error {
+	var pfx string
+	m := rec.Metadata
+
+	if iSt.sg {
+		pfx = m.ObjId
+	} else {
+		pfx = logDataPrefix
+	}
+
+	if err := putLogRec(ctx, tx, pfx, rec); err != nil {
+		return err
+	}
+	logKey := logRecKey(pfx, m.Id, m.Gen)
+
+	var err error
+	switch m.RecType {
+	case interfaces.NodeRec:
+		err = iSt.config.sync.addNode(ctx, tx, m.ObjId, m.CurVers, logKey, m.Delete, m.Parents, m.BatchId, iSt.dagGraft)
+	case interfaces.LinkRec:
+		err = iSt.config.sync.addParent(ctx, tx, m.ObjId, m.CurVers, m.Parents[0], iSt.dagGraft)
+	default:
+		err = verror.New(verror.ErrInternal, ctx, "unknown log record type")
+	}
+
+	return err
+}
+
+// insertRecInDb inserts the versioned value in the Database.
+func (iSt *initiationState) insertRecInDb(ctx *context.T, rec *localLogRec, valbuf []byte, tx store.Transaction) error {
+	m := rec.Metadata
+	// TODO(hpucha): Hack right now. Need to change Database's handling of
+	// deleted objects. Currently, the initiator needs to treat deletions
+	// specially since deletions do not get a version number or a special
+	// value in the Database.
+	if !rec.Metadata.Delete && rec.Metadata.RecType == interfaces.NodeRec {
+		return watchable.PutAtVersion(ctx, tx, []byte(m.ObjId), valbuf, []byte(m.CurVers))
+	}
+	return nil
+}
+
 func (iSt *initiationState) processBlobRefs(ctx *context.T, m *interfaces.LogRecMetadata, valbuf []byte) error {
 	objid := m.ObjId
 	srcPeer := syncbaseIdToName(m.Id)
@@ -538,16 +611,16 @@
 	}
 	sgIds := make(sgSet)
 	for br := range brs {
-		for p, sgs := range iSt.sgPfxs {
+		for p, sgs := range iSt.config.sgPfxs {
 			if strings.HasPrefix(extractAppKey(objid), p) {
 				for sg := range sgs {
 					sgIds[sg] = struct{}{}
 				}
 			}
 		}
-		vlog.VI(4).Infof("sync: processBlobRefs: Found blobref %v peer %v, source %v, sgs %v", br, iSt.peer, srcPeer, sgIds)
-		info := &blobLocInfo{peer: iSt.peer, source: srcPeer, sgIds: sgIds}
-		if err := iSt.sync.addBlobLocInfo(ctx, br, info); err != nil {
+		vlog.VI(4).Infof("sync: processBlobRefs: Found blobref %v peer %v, source %v, sgs %v", br, iSt.config.peer, srcPeer, sgIds)
+		info := &blobLocInfo{peer: iSt.config.peer, source: srcPeer, sgIds: sgIds}
+		if err := iSt.config.sync.addBlobLocInfo(ctx, br, info); err != nil {
 			return err
 		}
 	}
@@ -578,39 +651,6 @@
 	return nil
 }
 
-// insertRecInLogDagAndDb adds a new log record to log and dag data structures,
-// and inserts the versioned value in the Database.
-func (iSt *initiationState) insertRecInLogDagAndDb(ctx *context.T, rec *localLogRec, batchId uint64, valbuf []byte, tx store.Transaction) error {
-	if err := putLogRec(ctx, tx, rec); err != nil {
-		return err
-	}
-
-	m := rec.Metadata
-	logKey := logRecKey(m.Id, m.Gen)
-
-	var err error
-	switch m.RecType {
-	case interfaces.NodeRec:
-		err = iSt.sync.addNode(ctx, tx, m.ObjId, m.CurVers, logKey, m.Delete, m.Parents, m.BatchId, iSt.dagGraft)
-	case interfaces.LinkRec:
-		err = iSt.sync.addParent(ctx, tx, m.ObjId, m.CurVers, m.Parents[0], iSt.dagGraft)
-	default:
-		err = verror.New(verror.ErrInternal, ctx, "unknown log record type")
-	}
-
-	if err != nil {
-		return err
-	}
-	// TODO(hpucha): Hack right now. Need to change Database's handling of
-	// deleted objects. Currently, the initiator needs to treat deletions
-	// specially since deletions do not get a version number or a special
-	// value in the Database.
-	if !rec.Metadata.Delete && rec.Metadata.RecType == interfaces.NodeRec {
-		return watchable.PutAtVersion(ctx, tx, []byte(m.ObjId), valbuf, []byte(m.CurVers))
-	}
-	return nil
-}
-
 // processUpdatedObjects processes all the updates received by the initiator,
 // one object at a time. Conflict detection and resolution is carried out after
 // the entire delta of log records is replayed, instead of incrementally after
@@ -660,15 +700,15 @@
 	}()
 
 	for {
-		vlog.VI(3).Infof("sync: processUpdatedObjects: begin: %d objects updated", len(iSt.updObjects))
+		vlog.VI(4).Infof("sync: processUpdatedObjects: begin: %d objects updated", len(iSt.updObjects))
 
-		iSt.tx = iSt.st.NewTransaction()
+		iSt.tx = iSt.config.st.NewTransaction()
 		watchable.SetTransactionFromSync(iSt.tx) // for echo-suppression
 
 		if count, err := iSt.detectConflicts(ctx); err != nil {
 			return err
 		} else {
-			vlog.VI(3).Infof("sync: processUpdatedObjects: %d conflicts detected", count)
+			vlog.VI(4).Infof("sync: processUpdatedObjects: %d conflicts detected", count)
 		}
 
 		if err := iSt.resolveConflicts(ctx); err != nil {
@@ -682,10 +722,10 @@
 		if err == nil {
 			committed = true
 			// Update in-memory genvector since commit is successful.
-			if err := iSt.sync.putDbGenInfoRemote(ctx, iSt.appName, iSt.dbName, iSt.updLocal); err != nil {
-				vlog.Fatalf("sync: processUpdatedObjects: putting geninfo in memory failed for app %s db %s, err %v", iSt.appName, iSt.dbName, err)
+			if err := iSt.config.sync.putDbGenInfoRemote(ctx, iSt.config.appName, iSt.config.dbName, iSt.sg, iSt.updLocal); err != nil {
+				vlog.Fatalf("sync: processUpdatedObjects: putting geninfo in memory failed for app %s db %s, err %v", iSt.config.appName, iSt.config.dbName, err)
 			}
-			vlog.VI(3).Info("sync: processUpdatedObjects: end: changes committed")
+			vlog.VI(4).Info("sync: processUpdatedObjects: end: changes committed")
 			return nil
 		}
 		if verror.ErrorID(err) != store.ErrConcurrentTransaction.ID {
@@ -699,7 +739,7 @@
 		// solution. Next iteration will have coordination with watch
 		// thread to intelligently retry. Hence this value is not a
 		// config param.
-		vlog.VI(3).Info("sync: processUpdatedObjects: retry due to local mutations")
+		vlog.VI(4).Info("sync: processUpdatedObjects: retry due to local mutations")
 		iSt.tx.Abort()
 		time.Sleep(1 * time.Second)
 	}
@@ -736,7 +776,7 @@
 		// If the local version is picked, no further updates to the
 		// Database are needed. If the remote version is picked or if a
 		// new version is created, we put it in the Database.
-		if confSt.res.ty != pickLocal {
+		if confSt.res.ty != pickLocal && !iSt.sg {
 
 			// TODO(hpucha): Hack right now. Need to change Database's
 			// handling of deleted objects.
@@ -815,10 +855,10 @@
 }
 
 // updateLogAndDag updates the log and dag data structures.
-func (iSt *initiationState) updateLogAndDag(ctx *context.T, obj string) error {
-	confSt, ok := iSt.updObjects[obj]
+func (iSt *initiationState) updateLogAndDag(ctx *context.T, objid string) error {
+	confSt, ok := iSt.updObjects[objid]
 	if !ok {
-		return verror.New(verror.ErrInternal, ctx, "object state not found", obj)
+		return verror.New(verror.ErrInternal, ctx, "object state not found", objid)
 	}
 	var newVersion string
 
@@ -831,11 +871,11 @@
 		switch {
 		case confSt.res.ty == pickLocal:
 			// Local version was picked as the conflict resolution.
-			rec = iSt.createLocalLinkLogRec(ctx, obj, confSt.oldHead, confSt.newHead)
+			rec = iSt.createLocalLinkLogRec(ctx, objid, confSt.oldHead, confSt.newHead)
 			newVersion = confSt.oldHead
 		case confSt.res.ty == pickRemote:
 			// Remote version was picked as the conflict resolution.
-			rec = iSt.createLocalLinkLogRec(ctx, obj, confSt.newHead, confSt.oldHead)
+			rec = iSt.createLocalLinkLogRec(ctx, objid, confSt.newHead, confSt.oldHead)
 			newVersion = confSt.newHead
 		default:
 			// New version was created to resolve the conflict.
@@ -843,7 +883,13 @@
 			newVersion = confSt.res.rec.Metadata.CurVers
 		}
 
-		if err := putLogRec(ctx, iSt.tx, rec); err != nil {
+		var pfx string
+		if iSt.sg {
+			pfx = objid
+		} else {
+			pfx = logDataPrefix
+		}
+		if err := putLogRec(ctx, iSt.tx, pfx, rec); err != nil {
 			return err
 		}
 
@@ -852,9 +898,9 @@
 		m := rec.Metadata
 		switch m.RecType {
 		case interfaces.NodeRec:
-			err = iSt.sync.addNode(ctx, iSt.tx, obj, m.CurVers, logRecKey(m.Id, m.Gen), m.Delete, m.Parents, NoBatchId, nil)
+			err = iSt.config.sync.addNode(ctx, iSt.tx, objid, m.CurVers, logRecKey(pfx, m.Id, m.Gen), m.Delete, m.Parents, NoBatchId, nil)
 		case interfaces.LinkRec:
-			err = iSt.sync.addParent(ctx, iSt.tx, obj, m.CurVers, m.Parents[0], nil)
+			err = iSt.config.sync.addParent(ctx, iSt.tx, objid, m.CurVers, m.Parents[0], nil)
 		default:
 			return verror.New(verror.ErrInternal, ctx, "unknown log record type")
 		}
@@ -865,21 +911,26 @@
 
 	// Move the head. This should be idempotent. We may move head to the
 	// local head in some cases.
-	return moveHead(ctx, iSt.tx, obj, newVersion)
+	return moveHead(ctx, iSt.tx, objid, newVersion)
 }
 
-func (iSt *initiationState) createLocalLinkLogRec(ctx *context.T, obj, vers, par string) *localLogRec {
-	gen, pos := iSt.sync.reserveGenAndPosInDbLog(ctx, iSt.appName, iSt.dbName, 1)
+func (iSt *initiationState) createLocalLinkLogRec(ctx *context.T, objid, vers, par string) *localLogRec {
+	vlog.VI(4).Infof("sync: createLocalLinkLogRec: obj %s vers %s par %s", objid, vers, par)
 
-	vlog.VI(4).Infof("sync: createLocalLinkLogRec: obj %s vers %s par %s", obj, vers, par)
+	var gen, pos uint64
+	if iSt.sg {
+		gen, pos = iSt.config.sync.reserveGenAndPosInDbLog(ctx, iSt.config.appName, iSt.config.dbName, objid, 1)
+	} else {
+		gen, pos = iSt.config.sync.reserveGenAndPosInDbLog(ctx, iSt.config.appName, iSt.config.dbName, "", 1)
+	}
 
 	rec := &localLogRec{
 		Metadata: interfaces.LogRecMetadata{
-			Id:      iSt.sync.id,
+			Id:      iSt.config.sync.id,
 			Gen:     gen,
 			RecType: interfaces.LinkRec,
 
-			ObjId:      obj,
+			ObjId:      objid,
 			CurVers:    vers,
 			Parents:    []string{par},
 			UpdTime:    time.Now().UTC(),
@@ -895,35 +946,53 @@
 // updateSyncSt updates local sync state at the end of an initiator cycle.
 func (iSt *initiationState) updateSyncSt(ctx *context.T) error {
 	// Get the current local sync state.
-	dsInMem, err := iSt.sync.copyDbSyncStateInMem(ctx, iSt.appName, iSt.dbName)
+	dsInMem, err := iSt.config.sync.copyDbSyncStateInMem(ctx, iSt.config.appName, iSt.config.dbName)
 	if err != nil {
 		return err
 	}
+	// Create the state to be persisted.
 	ds := &dbSyncState{
-		Gen:        dsInMem.gen,
-		CheckptGen: dsInMem.checkptGen,
-		GenVec:     dsInMem.genvec,
+		Data: localGenInfo{
+			Gen:        dsInMem.data.gen,
+			CheckptGen: dsInMem.data.checkptGen,
+		},
+		Sgs:      make(map[interfaces.GroupId]localGenInfo),
+		GenVec:   dsInMem.genvec,
+		SgGenVec: dsInMem.sggenvec,
 	}
 
+	for id, info := range dsInMem.sgs {
+		ds.Sgs[id] = localGenInfo{
+			Gen:        info.gen,
+			CheckptGen: info.checkptGen,
+		}
+	}
+
+	genvec := ds.GenVec
+	if iSt.sg {
+		genvec = ds.SgGenVec
+	}
 	// remote can be a subset of local.
 	for rpfx, respgv := range iSt.remote {
-		for lpfx, lpgv := range ds.GenVec {
+		for lpfx, lpgv := range genvec {
 			if strings.HasPrefix(lpfx, rpfx) {
 				mergePrefixGenVectors(lpgv, respgv)
 			}
 		}
-		if _, ok := ds.GenVec[rpfx]; !ok {
-			ds.GenVec[rpfx] = respgv
+		if _, ok := genvec[rpfx]; !ok {
+			genvec[rpfx] = respgv
 		}
 	}
 
-	iSt.updLocal = ds.GenVec
+	iSt.updLocal = genvec
 	// Clean the genvector of any local state. Note that local state is held
 	// in gen/ckPtGen in sync state struct.
 	for _, pgv := range iSt.updLocal {
-		delete(pgv, iSt.sync.id)
+		delete(pgv, iSt.config.sync.id)
 	}
 
+	// TODO(hpucha): Flip join pending if needed.
+
 	// TODO(hpucha): Add knowledge compaction.
 
 	return putDbSyncState(ctx, iSt.tx, ds)
@@ -938,31 +1007,3 @@
 		}
 	}
 }
-
-////////////////////////////////////////
-// Peer selection policies.
-
-// pickPeer picks a Syncbase to sync with.
-func (s *syncService) pickPeer(ctx *context.T) (string, error) {
-	switch peerSelectionPolicy {
-	case selectRandom:
-		members := s.getMembers(ctx)
-		// Remove myself from the set.
-		delete(members, s.name)
-		if len(members) == 0 {
-			return "", verror.New(verror.ErrInternal, ctx, "no useful peer")
-		}
-
-		// Pick a peer at random.
-		ind := randIntn(len(members))
-		for m := range members {
-			if ind == 0 {
-				return m, nil
-			}
-			ind--
-		}
-		return "", verror.New(verror.ErrInternal, ctx, "random selection didn't succeed")
-	default:
-		return "", verror.New(verror.ErrInternal, ctx, "unknown peer selection policy")
-	}
-}
diff --git a/services/syncbase/vsync/initiator_test.go b/services/syncbase/vsync/initiator_test.go
index 01a63ad..935539b 100644
--- a/services/syncbase/vsync/initiator_test.go
+++ b/services/syncbase/vsync/initiator_test.go
@@ -15,12 +15,14 @@
 import (
 	"fmt"
 	"reflect"
+	"sort"
 	"testing"
 	"time"
 
 	"v.io/v23/services/syncbase/nosql"
 	"v.io/v23/vdl"
 	"v.io/v23/vom"
+	"v.io/x/lib/set"
 	_ "v.io/x/ref/runtime/factories/generic"
 	"v.io/x/ref/services/syncbase/server/interfaces"
 	"v.io/x/ref/services/syncbase/server/util"
@@ -84,7 +86,7 @@
 // TestLogStreamRemoteOnly tests processing of a remote log stream. Commands are
 // in file testdata/remote-init-00.log.sync.
 func TestLogStreamRemoteOnly(t *testing.T) {
-	svc, iSt, cleanup := testInit(t, "", "remote-init-00.log.sync")
+	svc, iSt, cleanup := testInit(t, "", "remote-init-00.log.sync", false)
 	defer cleanup(t, svc)
 
 	// Check all log records.
@@ -92,9 +94,9 @@
 	var gen uint64
 	var parents []string
 	for gen = 1; gen < 4; gen++ {
-		gotRec, err := getLogRec(nil, svc.St(), 11, gen)
+		gotRec, err := getLogRec(nil, svc.St(), logDataPrefix, 11, gen)
 		if err != nil || gotRec == nil {
-			t.Fatalf("getLogRec can not find object 11 %d, err %v", gen, err)
+			t.Fatalf("getLogRec can not find object %s 11 %d, err %v", logDataPrefix, gen, err)
 		}
 		vers := fmt.Sprintf("%d", gen)
 		wantRec := &localLogRec{
@@ -174,7 +176,7 @@
 // correctly applied (when there are no conflicts). Commands are in files
 // testdata/<local-init-00.log.sync,remote-noconf-00.log.sync>.
 func TestLogStreamNoConflict(t *testing.T) {
-	svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-noconf-00.log.sync")
+	svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-noconf-00.log.sync", false)
 	defer cleanup(t, svc)
 
 	objid := util.JoinKeyParts(util.RowPrefix, "foo1")
@@ -185,10 +187,10 @@
 	for _, devid := range []uint64{10, 11} {
 		var gen uint64
 		for gen = 1; gen < 4; gen++ {
-			gotRec, err := getLogRec(nil, svc.St(), devid, gen)
+			gotRec, err := getLogRec(nil, svc.St(), logDataPrefix, devid, gen)
 			if err != nil || gotRec == nil {
-				t.Fatalf("getLogRec can not find object %d:%d, err %v",
-					devid, gen, err)
+				t.Fatalf("getLogRec can not find object %s:%d:%d, err %v",
+					logDataPrefix, devid, gen, err)
 			}
 			vers := fmt.Sprintf("%d", version)
 			wantRec := &localLogRec{
@@ -271,7 +273,7 @@
 // correctly applied when there are conflicts. Commands are in files
 // testdata/<local-init-00.log.sync,remote-conf-00.log.sync>.
 func TestLogStreamConflict(t *testing.T) {
-	svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-conf-00.log.sync")
+	svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-conf-00.log.sync", false)
 	defer cleanup(t, svc)
 
 	objid := util.JoinKeyParts(util.RowPrefix, "foo1")
@@ -318,7 +320,7 @@
 // two versions of an object have no common ancestor. Commands are in files
 // testdata/<local-init-00.log.sync,remote-conf-03.log.sync>.
 func TestLogStreamConflictNoAncestor(t *testing.T) {
-	svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-conf-03.log.sync")
+	svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-conf-03.log.sync", false)
 	defer cleanup(t, svc)
 
 	objid := util.JoinKeyParts(util.RowPrefix, "foo1")
@@ -360,10 +362,59 @@
 	tx.Abort()
 }
 
+// TestSgSync tests that a local and a remote log stream can be correctly
+// applied when there are conflicts on a SyncGroup update. Commands are in files
+// testdata/<local-sginit-00.log.sync,remote-sgconf-00.log.sync>. Note that
+// since data syncing tests are covering different scenarios in detail, a single
+// sniff test is added for SyncGroup sync.
+// TODO(hpucha): Add this test.
+/* func TestSgSync(t *testing.T) {
+	svc, iSt, cleanup := testInit(t, "local-sginit-00.log.sync", "remote-sgconf-00.log.sync", false)
+	defer cleanup(t, svc)
+
+	objid := util.JoinKeyParts(util.RowPrefix, "foo1")
+
+	// Verify conflict state.
+	if len(iSt.updObjects) != 1 {
+		t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
+	}
+	st := iSt.updObjects[objid]
+	if !st.isConflict {
+		t.Fatalf("Didn't detect a conflict %v", st)
+	}
+	if st.newHead != "6" || st.oldHead != "3" || st.ancestor != "2" {
+		t.Fatalf("Conflict detection didn't succeed %v", st)
+	}
+	if st.res.ty != pickRemote {
+		t.Fatalf("Conflict resolution did not pick remote: %v", st.res.ty)
+	}
+
+	// Verify DAG state.
+	if head, err := getHead(nil, svc.St(), objid); err != nil || head != "6" {
+		t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
+	}
+
+	// Verify Database state.
+	valbuf, err := svc.St().Get([]byte(objid), nil)
+	var val string
+	if err := vom.Decode(valbuf, &val); err != nil {
+		t.Fatalf("Value decode failed, err %v", err)
+	}
+	if err != nil || val != "abc" {
+		t.Fatalf("Invalid object %s in Database %v, err %v", objid, string(valbuf), err)
+	}
+	tx := svc.St().NewTransaction()
+	versbuf, err := watchable.GetVersion(nil, tx, []byte(objid))
+	if err != nil || string(versbuf) != "6" {
+		t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(versbuf), err)
+	}
+	tx.Abort()
+} */
+
 //////////////////////////////
 // Helpers.
 
-func testInit(t *testing.T, lfile, rfile string) (*mockService, *initiationState, func(*testing.T, *mockService)) {
+func testInit(t *testing.T, lfile, rfile string, sg bool) (*mockService, *initiationState, func(*testing.T, *mockService)) {
 	// Set a large value to prevent the initiator from running.
 	peerSyncInterval = 1 * time.Hour
 	conflictResolutionPolicy = useTime
@@ -373,10 +424,20 @@
 	s.id = 10 // initiator
 
 	sgId1 := interfaces.GroupId(1234)
+	gdb := appDbName("mockapp", "mockdb")
 	nullInfo := nosql.SyncGroupMemberInfo{}
 	sgInfo := sgMemberInfo{
 		sgId1: nullInfo,
 	}
+	info := &memberInfo{
+		db2sg: map[string]sgMemberInfo{
+			gdb: sgInfo,
+		},
+		mtTables: map[string]struct{}{
+			"1/2/3/4": struct{}{},
+			"5/6/7/8": struct{}{},
+		},
+	}
 
 	sg1 := &interfaces.SyncGroup{
 		Name:        "sg1",
@@ -411,32 +472,52 @@
 		return svc, nil, cleanup
 	}
 
-	gdb := appDbName("mockapp", "mockdb")
-	iSt, err := newInitiationState(nil, s, "b", gdb, sgInfo)
+	c, err := newInitiationConfig(nil, s, "b", gdb, info, set.String.ToSlice(info.mtTables))
 	if err != nil {
-		t.Fatalf("newInitiationState failed with err %v", err)
+		t.Fatalf("newInitiationConfig failed with err %v", err)
 	}
 
-	testIfSgPfxsEqual(t, iSt.sgPfxs, sg1.Spec.Prefixes)
-	testIfMapArrEqual(t, iSt.mtTables, sg1.Spec.MountTables)
+	iSt := newInitiationState(nil, c, sg)
 
-	s.initDbSyncStateInMem(nil, "mockapp", "mockdb")
-
-	// Create local genvec so that it contains knowledge only about common prefixes.
-	if err := iSt.createLocalGenVec(nil); err != nil {
-		t.Fatalf("createLocalGenVec failed with err %v", err)
+	if !sg {
+		iSt.peerSgInfo(nil)
+		testIfSgPfxsEqual(t, iSt.config.sgPfxs, sg1.Spec.Prefixes)
 	}
 
-	wantVec := interfaces.GenVector{
-		"foo": interfaces.PrefixGenVector{10: 0},
-		"bar": interfaces.PrefixGenVector{10: 0},
+	sort.Strings(iSt.config.mtTables)
+	sort.Strings(sg1.Spec.MountTables)
+	if !reflect.DeepEqual(iSt.config.mtTables, sg1.Spec.MountTables) {
+		t.Fatalf("Mount tables are not equal config %v, spec %v", iSt.config.mtTables, sg1.Spec.MountTables)
 	}
+
+	s.initSyncStateInMem(nil, "mockapp", "mockdb", sgId1)
+
+	iSt.stream = createReplayStream(t, rfile)
+
+	var wantVec interfaces.GenVector
+	if sg {
+		if err := iSt.prepareSGDeltaReq(nil); err != nil {
+			t.Fatalf("prepareSGDeltaReq failed with err %v", err)
+		}
+		sg := fmt.Sprintf("%d", sgId1)
+		wantVec = interfaces.GenVector{
+			sg: interfaces.PrefixGenVector{10: 0},
+		}
+	} else {
+		if err := iSt.prepareDataDeltaReq(nil); err != nil {
+			t.Fatalf("prepareDataDeltaReq failed with err %v", err)
+		}
+
+		wantVec = interfaces.GenVector{
+			"foo": interfaces.PrefixGenVector{10: 0},
+			"bar": interfaces.PrefixGenVector{10: 0},
+		}
+	}
+
 	if !reflect.DeepEqual(iSt.local, wantVec) {
 		t.Fatalf("createLocalGenVec failed got %v, want %v", iSt.local, wantVec)
 	}
 
-	iSt.stream = createReplayStream(t, rfile)
-
 	if err := iSt.recvAndProcessDeltas(nil); err != nil {
 		t.Fatalf("recvAndProcessDeltas failed with err %v", err)
 	}
@@ -461,13 +542,6 @@
 	}
 }
 
-func testIfMapArrEqual(t *testing.T, m map[string]struct{}, a []string) {
-	aMap := arrToMap(a)
-	if !reflect.DeepEqual(m, aMap) {
-		t.Fatalf("testIfMapArrEqual failed map %v, arr %v", m, aMap)
-	}
-}
-
 func arrToMap(a []string) map[string]struct{} {
 	m := make(map[string]struct{})
 	for _, s := range a {
diff --git a/services/syncbase/vsync/replay_test.go b/services/syncbase/vsync/replay_test.go
index 1a4e2f1..e13eab6 100644
--- a/services/syncbase/vsync/replay_test.go
+++ b/services/syncbase/vsync/replay_test.go
@@ -278,21 +278,6 @@
 func (ds *dummyStream) Cancel() {
 }
 
-func (ds *dummyStream) SendStream() interface {
-	Send(item interfaces.DeltaReq) error
-	Close() error
-} {
-	return ds
-}
-
-func (ds *dummyStream) Send(item interfaces.DeltaReq) error {
-	return nil
-}
-
-func (ds *dummyStream) Close() error {
-	return nil
-}
-
 // replayLocalCommands replays local log records parsed from the input file.
 func replayLocalCommands(t *testing.T, s *mockService, syncfile string) {
 	cmds, err := parseSyncCommands(syncfile)
@@ -342,9 +327,6 @@
 	}
 
 	stream := newStream()
-	start := interfaces.DeltaRespStart{true}
-	stream.add(start)
-
 	for _, cmd := range cmds {
 		var ty byte
 		switch cmd.cmd {
@@ -373,13 +355,11 @@
 
 		stream.add(rec)
 	}
-	fin := interfaces.DeltaRespFinish{true}
-	stream.add(fin)
 	return stream
 }
 
 func createMetadata(t *testing.T, ty byte, cmd syncCommand) interfaces.LogRecMetadata {
-	id, gen, err := splitLogRecKey(nil, cmd.logrec)
+	_, id, gen, err := splitLogRecKey(nil, cmd.logrec)
 	if err != nil {
 		t.Fatalf("createReplayStream splitLogRecKey failed, key %s, err %v", cmd.logrec, gen)
 	}
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 0290b41..46d03a1 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -6,7 +6,9 @@
 
 import (
 	"container/heap"
+	"fmt"
 	"sort"
+	"strconv"
 	"strings"
 
 	"v.io/v23/context"
@@ -19,41 +21,62 @@
 )
 
 // GetDeltas implements the responder side of the GetDeltas RPC.
-func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall, initiator string) error {
+func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall, req interfaces.DeltaReq, initiator string) error {
 	vlog.VI(2).Infof("sync: GetDeltas: begin: from initiator %s", initiator)
 	defer vlog.VI(2).Infof("sync: GetDeltas: end: from initiator %s", initiator)
 
-	recvr := call.RecvStream()
-	for recvr.Advance() {
-		req := recvr.Value()
-		// Ignoring errors since if one Database fails for any reason,
-		// it is fine to continue to the next one. In fact, sometimes
-		// the failure might be genuine. For example, the responder is
-		// no longer part of the requested SyncGroups, or the app/db is
-		// locally deleted, or a permission change has denied access.
-		rSt := newResponderState(ctx, call, s, req, initiator)
-		rSt.sendDeltasPerDatabase(ctx)
-	}
-
-	// TODO(hpucha): Is there a need to call finish or some such?
-	return recvr.Err()
+	rSt := newResponderState(ctx, call, s, req, initiator)
+	return rSt.sendDeltasPerDatabase(ctx)
 }
 
 // responderState is state accumulated per Database by the responder during an
 // initiation round.
 type responderState struct {
-	req       interfaces.DeltaReq
+	// Parameters from the request.
+	appName string
+	dbName  string
+	sgIds   sgSet
+	initVec interfaces.GenVector
+	sg      bool
+
 	call      interfaces.SyncGetDeltasServerCall // Stream handle for the GetDeltas RPC.
 	initiator string
-	errState  error // Captures the error from the first two phases of the responder.
 	sync      *syncService
 	st        store.Store // Store handle to the Database.
-	diff      genRangeVector
-	outVec    interfaces.GenVector
+
+	diff   genRangeVector
+	outVec interfaces.GenVector
 }
 
 func newResponderState(ctx *context.T, call interfaces.SyncGetDeltasServerCall, sync *syncService, req interfaces.DeltaReq, initiator string) *responderState {
-	rSt := &responderState{call: call, sync: sync, req: req, initiator: initiator}
+	rSt := &responderState{
+		call:      call,
+		sync:      sync,
+		initiator: initiator,
+	}
+
+	switch v := req.(type) {
+	case interfaces.DeltaReqData:
+		rSt.appName = v.Value.AppName
+		rSt.dbName = v.Value.DbName
+		rSt.sgIds = v.Value.SgIds
+		rSt.initVec = v.Value.InitVec
+
+	case interfaces.DeltaReqSgs:
+		rSt.sg = true
+		rSt.appName = v.Value.AppName
+		rSt.dbName = v.Value.DbName
+		rSt.initVec = v.Value.InitVec
+		rSt.sgIds = make(sgSet)
+		// Populate the sgids from the initvec.
+		for id := range rSt.initVec {
+			gid, err := strconv.ParseUint(id, 10, 64)
+			if err != nil {
+				vlog.Fatalf("sync: newResponderState: invalid syncgroup id", gid)
+			}
+			rSt.sgIds[interfaces.GroupId(gid)] = struct{}{}
+		}
+	}
 	return rSt
 }
 
@@ -97,43 +120,60 @@
 	// (see http://goo.gl/mEa4L0) but only incur that overhead when the
 	// logging level specified is enabled.
 	vlog.VI(3).Infof("sync: sendDeltasPerDatabase: %s, %s: sgids %v, genvec %v",
-		rSt.req.AppName, rSt.req.DbName, rSt.req.SgIds, rSt.req.InitVec)
+		rSt.appName, rSt.dbName, rSt.sgIds, rSt.initVec)
 
 	// Phase 1 of sendDeltas: Authorize the initiator and respond to the
 	// caller only for the SyncGroups that allow access.
-	rSt.authorizeAndFilterSyncGroups(ctx)
+	err := rSt.authorizeAndFilterSyncGroups(ctx)
 
-	// Phase 2 of sendDeltas: diff contains the bound on the
+	// Check error from phase 1.
+	if err != nil {
+		return err
+	}
+
+	if len(rSt.initVec) == 0 {
+		return verror.New(verror.ErrInternal, ctx, "empty initiator generation vector")
+	}
+
+	// Phase 2 and 3 of sendDeltas: diff contains the bound on the
 	// generations missing from the initiator per device.
-	rSt.computeDeltaBound(ctx)
+	if rSt.sg {
+		err = rSt.sendSgDeltas(ctx)
+	} else {
+		err = rSt.sendDataDeltas(ctx)
+	}
 
-	// Phase 3 of sendDeltas: Process the diff, filtering out records that
-	// are not needed, and send the remainder on the wire ordered.
-	return rSt.filterAndSendDeltas(ctx)
+	return err
 }
 
 // authorizeAndFilterSyncGroups authorizes the initiator against the requested
 // SyncGroups and filters the initiator's prefixes to only include those from
 // allowed SyncGroups (phase 1 of sendDeltas).
-func (rSt *responderState) authorizeAndFilterSyncGroups(ctx *context.T) {
-	rSt.st, rSt.errState = rSt.sync.getDbStore(ctx, nil, rSt.req.AppName, rSt.req.DbName)
-	if rSt.errState != nil {
-		return
+func (rSt *responderState) authorizeAndFilterSyncGroups(ctx *context.T) error {
+	var err error
+	rSt.st, err = rSt.sync.getDbStore(ctx, nil, rSt.appName, rSt.dbName)
+	if err != nil {
+		return err
 	}
 
 	allowedPfxs := make(map[string]struct{})
-	for sgid := range rSt.req.SgIds {
+	for sgid := range rSt.sgIds {
 		// Check permissions for the SyncGroup.
 		var sg *interfaces.SyncGroup
-		sg, rSt.errState = getSyncGroupById(ctx, rSt.st, sgid)
-		if rSt.errState != nil {
-			return
-		}
-		rSt.errState = authorize(ctx, rSt.call.Security(), sg)
-		if verror.ErrorID(rSt.errState) == verror.ErrNoAccess.ID {
+		sg, err = getSyncGroupById(ctx, rSt.st, sgid)
+		if err != nil {
+			vlog.Errorf("sync: authorizeAndFilterSyncGroups: accessing SyncGroup information failed %v, err %v", sgid, err)
 			continue
-		} else if rSt.errState != nil {
-			return
+		}
+		err = authorize(ctx, rSt.call.Security(), sg)
+		if verror.ErrorID(err) == verror.ErrNoAccess.ID {
+			if rSt.sg {
+				id := fmt.Sprintf("%d", sgid)
+				delete(rSt.initVec, id)
+			}
+			continue
+		} else if err != nil {
+			return err
 		}
 
 		for _, p := range sg.Spec.Prefixes {
@@ -147,8 +187,16 @@
 		rSt.addInitiatorToSyncGroup(ctx, sgid)
 	}
 
+	if err != nil {
+		return err
+	}
+
+	if rSt.sg {
+		return nil
+	}
+
 	// Filter the initiator's prefixes to what is allowed.
-	for pfx := range rSt.req.InitVec {
+	for pfx := range rSt.initVec {
 		if _, ok := allowedPfxs[pfx]; ok {
 			continue
 		}
@@ -160,10 +208,10 @@
 		}
 
 		if !allowed {
-			delete(rSt.req.InitVec, pfx)
+			delete(rSt.initVec, pfx)
 		}
 	}
-	return
+	return nil
 }
 
 // addInitiatorToSyncGroup adds the request initiator to the membership of the
@@ -199,27 +247,58 @@
 	}
 }
 
-// computeDeltaBound computes the bound on missing generations across all
-// requested prefixes (phase 2 of sendDeltas).
-func (rSt *responderState) computeDeltaBound(ctx *context.T) {
-	// Check error from phase 1.
-	if rSt.errState != nil {
-		return
+// sendSgDeltas computes the bound on missing generations, and sends the missing
+// log records across all requested SyncGroups (phases 2 and 3 of sendDeltas).
+func (rSt *responderState) sendSgDeltas(ctx *context.T) error {
+	vlog.VI(3).Infof("sync: sendSgDeltas: %s, %s: sgids %v, genvec %v",
+		rSt.appName, rSt.dbName, rSt.sgIds, rSt.initVec)
+
+	respVec, _, err := rSt.sync.copyDbGenInfo(ctx, rSt.appName, rSt.dbName, rSt.sgIds)
+	if err != nil {
+		return err
 	}
 
-	if len(rSt.req.InitVec) == 0 {
-		rSt.errState = verror.New(verror.ErrInternal, ctx, "empty initiator generation vector")
-		return
+	rSt.outVec = make(interfaces.GenVector)
+
+	for sg, initpgv := range rSt.initVec {
+		respgv, ok := respVec[sg]
+		if !ok {
+			continue
+		}
+		rSt.diff = make(genRangeVector)
+		rSt.diffPrefixGenVectors(respgv, initpgv)
+		rSt.outVec[sg] = respgv
+
+		if err := rSt.filterAndSendDeltas(ctx, sg); err != nil {
+			return err
+		}
+	}
+	return rSt.sendGenVec(ctx)
+}
+
+// sendDataDeltas computes the bound on missing generations across all requested
+// prefixes, and sends the missing log records (phases 2 and 3 of sendDeltas).
+func (rSt *responderState) sendDataDeltas(ctx *context.T) error {
+	// Phase 2 of sendDeltas: Compute the missing generations.
+	if err := rSt.computeDataDeltas(ctx); err != nil {
+		return err
 	}
 
-	var respVec interfaces.GenVector
-	var respGen uint64
-	respVec, respGen, rSt.errState = rSt.sync.copyDbGenInfo(ctx, rSt.req.AppName, rSt.req.DbName)
-	if rSt.errState != nil {
-		return
+	// Phase 3 of sendDeltas: Process the diff, filtering out records that
+	// are not needed, and send the remainder on the wire ordered.
+	if err := rSt.filterAndSendDeltas(ctx, logDataPrefix); err != nil {
+		return err
+	}
+	return rSt.sendGenVec(ctx)
+}
+
+func (rSt *responderState) computeDataDeltas(ctx *context.T) error {
+	respVec, respGen, err := rSt.sync.copyDbGenInfo(ctx, rSt.appName, rSt.dbName, nil)
+	if err != nil {
+		return err
 	}
 	respPfxs := extractAndSortPrefixes(respVec)
-	initPfxs := extractAndSortPrefixes(rSt.req.InitVec)
+	initPfxs := extractAndSortPrefixes(rSt.initVec)
 
 	rSt.outVec = make(interfaces.GenVector)
 	rSt.diff = make(genRangeVector)
@@ -235,7 +314,7 @@
 		pfx = p
 
 		// Lower bound on initiator's knowledge for this prefix set.
-		initpgv := rSt.req.InitVec[pfx]
+		initpgv := rSt.initVec[pfx]
 
 		// Find the relevant responder prefixes and add the corresponding knowledge.
 		var respgv interfaces.PrefixGenVector
@@ -285,27 +364,17 @@
 	}
 
 	vlog.VI(3).Infof("sync: computeDeltaBound: %s, %s: diff %v, outvec %v",
-		rSt.req.AppName, rSt.req.DbName, rSt.diff, rSt.outVec)
-	return
+		rSt.appName, rSt.dbName, rSt.diff, rSt.outVec)
+	return nil
 }
 
 // filterAndSendDeltas filters the computed delta to remove records already
 // known by the initiator, and sends the resulting records to the initiator
 // (phase 3 of sendDeltas).
-func (rSt *responderState) filterAndSendDeltas(ctx *context.T) error {
-	// Always send a start and finish response so that the initiator can
-	// move on to the next Database.
-	//
+func (rSt *responderState) filterAndSendDeltas(ctx *context.T, pfx string) error {
 	// TODO(hpucha): Although ok for now to call SendStream once per
 	// Database, would like to make this implementation agnostic.
 	sender := rSt.call.SendStream()
-	sender.Send(interfaces.DeltaRespStart{true})
-	defer sender.Send(interfaces.DeltaRespFinish{true})
-
-	// Check error from phase 2.
-	if rSt.errState != nil {
-		return rSt.errState
-	}
 
 	// First two phases were successful. So now on to phase 3. We now visit
 	// every log record in the generation range as obtained from phase 1 in
@@ -316,7 +385,7 @@
 	mh := make(minHeap, 0, len(rSt.diff))
 	for dev, r := range rSt.diff {
 		r.cur = r.min
-		rec, err := getNextLogRec(ctx, rSt.st, dev, r)
+		rec, err := getNextLogRec(ctx, rSt.st, pfx, dev, r)
 		if err != nil {
 			return err
 		}
@@ -329,11 +398,14 @@
 	heap.Init(&mh)
 
 	// Process the log records in order.
-	initPfxs := extractAndSortPrefixes(rSt.req.InitVec)
+	var initPfxs []string
+	if !rSt.sg {
+		initPfxs = extractAndSortPrefixes(rSt.initVec)
+	}
 	for mh.Len() > 0 {
 		rec := heap.Pop(&mh).(*localLogRec)
 
-		if !filterLogRec(rec, rSt.req.InitVec, initPfxs) {
+		if rSt.sg || !filterLogRec(rec, rSt.initVec, initPfxs) {
 			// Send on the wire.
 			wireRec, err := makeWireLogRec(ctx, rSt.st, rec)
 			if err != nil {
@@ -344,7 +416,7 @@
 
 		// Add a new record from the same device if not done.
 		dev := rec.Metadata.Id
-		rec, err := getNextLogRec(ctx, rSt.st, dev, rSt.diff[dev])
+		rec, err := getNextLogRec(ctx, rSt.st, pfx, dev, rSt.diff[dev])
 		if err != nil {
 			return err
 		}
@@ -354,7 +426,11 @@
 			delete(rSt.diff, dev)
 		}
 	}
+	return nil
+}
 
+func (rSt *responderState) sendGenVec(ctx *context.T) error {
+	sender := rSt.call.SendStream()
 	sender.Send(interfaces.DeltaRespRespVec{rSt.outVec})
 	return nil
 }
@@ -426,9 +502,9 @@
 
 // TODO(hpucha): This can be optimized using a scan instead of "gets" in a for
 // loop.
-func getNextLogRec(ctx *context.T, st store.Store, dev uint64, r *genRange) (*localLogRec, error) {
+func getNextLogRec(ctx *context.T, st store.Store, pfx string, dev uint64, r *genRange) (*localLogRec, error) {
 	for i := r.cur; i <= r.max; i++ {
-		rec, err := getLogRec(ctx, st, dev, i)
+		rec, err := getLogRec(ctx, st, pfx, dev, i)
 		if err == nil {
 			r.cur = i + 1
 			return rec, nil
diff --git a/services/syncbase/vsync/responder_test.go b/services/syncbase/vsync/responder_test.go
index dbd0fea..f1c42e0 100644
--- a/services/syncbase/vsync/responder_test.go
+++ b/services/syncbase/vsync/responder_test.go
@@ -111,7 +111,7 @@
 	for _, test := range tests {
 		want := test.genDiffWant
 		got := test.genDiffIn
-		rSt := newResponderState(nil, nil, s, interfaces.DeltaReq{}, "fakeInitiator")
+		rSt := newResponderState(nil, nil, s, interfaces.DeltaReqData{}, "fakeInitiator")
 		rSt.diff = got
 		rSt.diffPrefixGenVectors(test.respPVec, test.initPVec)
 		checkEqualDevRanges(t, got, want)
@@ -121,7 +121,7 @@
 // TestSendDeltas tests the computation of the delta bound (computeDeltaBound)
 // and if the log records on the wire are correctly ordered (phases 2 and 3 of
 // SendDeltas).
-func TestSendDeltas(t *testing.T) {
+func TestSendDataDeltas(t *testing.T) {
 	appName := "mockapp"
 	dbName := "mockdb"
 
@@ -350,16 +350,13 @@
 		s.id = 10 //responder.
 
 		wantDiff, wantVec := test.genDiff, test.outVec
-		s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, checkptGen: test.respGen, genvec: test.respVec}
-
-		req := interfaces.DeltaReq{AppName: appName, DbName: dbName, InitVec: test.initVec}
-		rSt := newResponderState(nil, nil, s, req, "fakeInitiator")
-
-		rSt.computeDeltaBound(nil)
-		if rSt.errState != nil || !reflect.DeepEqual(rSt.outVec, wantVec) {
-			t.Fatalf("computeDeltaBound failed (I: %v), (R: %v, %v), got %v, want %v err %v", test.initVec, test.respGen, test.respVec, rSt.outVec, wantVec, rSt.errState)
+		s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{
+			data: &localGenInfoInMem{
+				gen:        test.respGen,
+				checkptGen: test.respGen,
+			},
+			genvec: test.respVec,
 		}
-		checkEqualDevRanges(t, rSt.diff, wantDiff)
 
 		////////////////////////////////////////
 		// Test sending deltas.
@@ -385,7 +382,7 @@
 					Metadata: interfaces.LogRecMetadata{Id: id, Gen: k, ObjId: okey, CurVers: vers, UpdTime: time.Now().UTC()},
 					Pos:      pos + k,
 				}
-				if err := putLogRec(nil, tx, rec); err != nil {
+				if err := putLogRec(nil, tx, logDataPrefix, rec); err != nil {
 					t.Fatalf("putLogRec(%d:%d) failed rec %v err %v", id, k, rec, err)
 				}
 				value := fmt.Sprintf("value_%s", okey)
@@ -404,16 +401,31 @@
 			t.Fatalf("cannot commit putting log rec, err %v", err)
 		}
 
+		req := interfaces.DataDeltaReq{
+			AppName: appName,
+			DbName:  dbName,
+			InitVec: test.initVec,
+		}
+
+		rSt := newResponderState(nil, nil, s, interfaces.DeltaReqData{req}, "fakeInitiator")
 		d := &dummyResponder{}
 		rSt.call = d
-		rSt.st, rSt.errState = rSt.sync.getDbStore(nil, nil, rSt.req.AppName, rSt.req.DbName)
-		if rSt.errState != nil {
-			t.Fatalf("filterAndSendDeltas failed to get store handle for app/db %v %v", rSt.req.AppName, rSt.req.DbName)
-		}
-		err := rSt.filterAndSendDeltas(nil)
+		var err error
+		rSt.st, err = rSt.sync.getDbStore(nil, nil, rSt.appName, rSt.dbName)
 		if err != nil {
-			t.Fatalf("filterAndSendDeltas failed (I: %v), (R: %v, %v) err %v", test.initVec, test.respGen, test.respVec, err)
+			t.Fatalf("getDbStore failed to get store handle for app/db %v %v", rSt.appName, rSt.dbName)
 		}
+
+		err = rSt.computeDataDeltas(nil)
+		if err != nil || !reflect.DeepEqual(rSt.outVec, wantVec) {
+			t.Fatalf("computeDataDeltas failed (I: %v), (R: %v, %v), got %v, want %v err %v", test.initVec, test.respGen, test.respVec, rSt.outVec, wantVec, err)
+		}
+		checkEqualDevRanges(t, rSt.diff, wantDiff)
+
+		if err = rSt.sendDataDeltas(nil); err != nil {
+			t.Fatalf("sendDataDeltas failed, err %v", err)
+		}
+
 		d.diffLogRecs(t, wantRecs, wantVec)
 
 		destroyService(t, svc)
@@ -424,29 +436,10 @@
 // Helpers
 
 type dummyResponder struct {
-	start, finish int
-	gotRecs       []*localLogRec
-	outVec        interfaces.GenVector
+	gotRecs []*localLogRec
+	outVec  interfaces.GenVector
 }
 
-func (d *dummyResponder) RecvStream() interface {
-	Advance() bool
-	Value() interfaces.DeltaReq
-	Err() error
-} {
-	return d
-}
-
-func (d *dummyResponder) Advance() bool {
-	return false
-}
-
-func (d *dummyResponder) Value() interfaces.DeltaReq {
-	return interfaces.DeltaReq{}
-}
-
-func (d *dummyResponder) Err() error { return nil }
-
 func (d *dummyResponder) SendStream() interface {
 	Send(item interfaces.DeltaResp) error
 } {
@@ -455,10 +448,6 @@
 
 func (d *dummyResponder) Send(item interfaces.DeltaResp) error {
 	switch v := item.(type) {
-	case interfaces.DeltaRespStart:
-		d.start++
-	case interfaces.DeltaRespFinish:
-		d.finish++
 	case interfaces.DeltaRespRespVec:
 		d.outVec = v.Value
 	case interfaces.DeltaRespRec:
@@ -492,9 +481,6 @@
 }
 
 func (d *dummyResponder) diffLogRecs(t *testing.T, wantRecs []*localLogRec, wantVec interfaces.GenVector) {
-	if d.start != 1 || d.finish != 1 {
-		t.Fatalf("diffLogRecs incorrect start/finish records (%v, %v)", d.start, d.finish)
-	}
 	if len(d.gotRecs) != len(wantRecs) {
 		t.Fatalf("diffLogRecs failed, gotLen %v, wantLen %v\n", len(d.gotRecs), len(wantRecs))
 	}
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index 94ab6eb..1eebf88 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -29,6 +29,26 @@
 // generation vector for a Database. Log records are indexed such that they can
 // be selectively retrieved from the store for any missing generation from any
 // device.
+//
+// Sync also tracks the current generation number and the current local log
+// position for each mutation of a SyncGroup, created on a Database. Similar to
+// the data log records, these log records are used to sync SyncGroup metadata.
+//
+// The generations for the data mutations and mutations for each SyncGroup are
+// in separate spaces. Data mutations in a Database start at gen 1, and
+// grow. Mutations for each SyncGroup start at gen 1, and grow. Thus, for the
+// local data log records, the keys are of the form
+// $sync:log:data:<devid>:<gen>, and the keys for local SyncGroup log record are
+// of the form $sync:log:<sgid>:<devid>:<gen>.
+
+// TODO(hpucha): Should this space be separate from the data or not? If it is
+// not, it can provide consistency between data and SyncGroup metadata. For
+// example, lets say we mutate the data in a SyncGroup and soon after change the
+// SyncGroup ACL to prevent syncing with a device. This device may not get the
+// last batch of updates since the next time it will try to sync, it will be
+// rejected. However implementing consistency is not straightforward. Even if we
+// had SyncGroup updates in the same space as the data, we need to switch to the
+// right SyncGroup ACL at the responder based on the requested generations.
 
 import (
 	"fmt"
@@ -36,24 +56,58 @@
 
 	"v.io/v23/context"
 	"v.io/v23/verror"
+	"v.io/x/lib/vlog"
 	"v.io/x/ref/services/syncbase/server/interfaces"
 	"v.io/x/ref/services/syncbase/server/util"
 	"v.io/x/ref/services/syncbase/store"
 )
 
-// dbSyncStateInMem represents the in-memory sync state of a Database.
-type dbSyncStateInMem struct {
-	gen uint64
-	pos uint64
-
+// localGenInfoInMem represents the state corresponding to local generations.
+type localGenInfoInMem struct {
+	gen        uint64
+	pos        uint64
 	checkptGen uint64
-	genvec     interfaces.GenVector // Note: Generation vector contains state from remote devices only.
+}
+
+func (in *localGenInfoInMem) deepCopy() *localGenInfoInMem {
+	out := &localGenInfoInMem{
+		gen:        in.gen,
+		pos:        in.pos,
+		checkptGen: in.checkptGen,
+	}
+	return out
+}
+
+// dbSyncStateInMem represents the in-memory sync state of a Database and all
+// its SyncGroups.
+type dbSyncStateInMem struct {
+	data *localGenInfoInMem                        // info for data.
+	sgs  map[interfaces.GroupId]*localGenInfoInMem // info for SyncGroups.
+
+	// Note: Generation vector contains state from remote devices only.
+	genvec   interfaces.GenVector
+	sggenvec interfaces.GenVector
+}
+
+func (in *dbSyncStateInMem) deepCopy() *dbSyncStateInMem {
+	out := &dbSyncStateInMem{}
+	out.data = in.data.deepCopy()
+
+	out.sgs = make(map[interfaces.GroupId]*localGenInfoInMem)
+	for id, info := range in.sgs {
+		out.sgs[id] = info.deepCopy()
+	}
+
+	out.genvec = in.genvec.DeepCopy()
+	out.sggenvec = in.sggenvec.DeepCopy()
+
+	return out
 }
 
 // initSync initializes the sync module during startup. It scans all the
 // databases across all apps to initialize the following:
-// a) in-memory sync state of a Database consisting of the current generation
-//    number, log position and generation vector.
+// a) in-memory sync state of a Database and all its SyncGroups consisting of
+// the current generation number, log position and generation vector.
 // b) watcher map of prefixes currently being synced.
 // c) republish names in mount tables for all syncgroups.
 //
@@ -94,14 +148,14 @@
 			if verror.ErrorID(err) == verror.ErrNoExist.ID {
 				scanStart, scanLimit = util.ScanPrefixArgs(logRecsPerDeviceScanPrefix(s.id), "")
 			} else {
-				scanStart, scanLimit = util.ScanPrefixArgs(logRecKey(s.id, ds.Gen), "")
+				scanStart, scanLimit = util.ScanPrefixArgs(logRecKey(logDataPrefix, s.id, ds.Data.Gen), "")
 			}
 			var maxpos uint64
 			var dbName string
 			// Scan local log records to find the most recent one.
 			st.Scan(scanStart, scanLimit)
 			// Scan remote log records using the persisted GenVector.
-			s.syncState[dbName] = &dbSyncStateInMem{pos: maxpos + 1}
+			s.syncState[dbName] = &dbSyncStateInMem{data: &localGenInfoInMem{pos: maxpos + 1}}
 		}
 
 		return false
@@ -110,42 +164,68 @@
 	return errFinal
 }
 
+// Note: For all the utilities below, if the sgid parameter is non-nil, the
+// operation is performed in the SyncGroup space. If nil, it is performed in the
+// data space for the Database.
+//
+// TODO(hpucha): Once GroupId is changed to string, clean up these function
+// signatures.
+
 // reserveGenAndPosInDbLog reserves a chunk of generation numbers and log
 // positions in a Database's log. Used when local updates result in log
 // entries.
-func (s *syncService) reserveGenAndPosInDbLog(ctx *context.T, appName, dbName string, count uint64) (uint64, uint64) {
-	return s.reserveGenAndPosInternal(appName, dbName, count, count)
+func (s *syncService) reserveGenAndPosInDbLog(ctx *context.T, appName, dbName, sgid string, count uint64) (uint64, uint64) {
+	return s.reserveGenAndPosInternal(appName, dbName, sgid, count, count)
 }
 
 // reservePosInDbLog reserves a chunk of log positions in a Database's log. Used
 // when remote log records are received.
-func (s *syncService) reservePosInDbLog(ctx *context.T, appName, dbName string, count uint64) uint64 {
-	_, pos := s.reserveGenAndPosInternal(appName, dbName, 0, count)
+func (s *syncService) reservePosInDbLog(ctx *context.T, appName, dbName, sgid string, count uint64) uint64 {
+	_, pos := s.reserveGenAndPosInternal(appName, dbName, sgid, 0, count)
 	return pos
 }
 
-func (s *syncService) reserveGenAndPosInternal(appName, dbName string, genCount, posCount uint64) (uint64, uint64) {
+func (s *syncService) reserveGenAndPosInternal(appName, dbName, sgid string, genCount, posCount uint64) (uint64, uint64) {
 	s.syncStateLock.Lock()
 	defer s.syncStateLock.Unlock()
 
 	name := appDbName(appName, dbName)
 	ds, ok := s.syncState[name]
 	if !ok {
-		ds = &dbSyncStateInMem{gen: 1}
+		ds = &dbSyncStateInMem{
+			data: &localGenInfoInMem{gen: 1},
+			sgs:  make(map[interfaces.GroupId]*localGenInfoInMem),
+		}
 		s.syncState[name] = ds
 	}
 
-	gen := ds.gen
-	pos := ds.pos
+	var info *localGenInfoInMem
+	if sgid != "" {
+		id, err := strconv.ParseUint(sgid, 10, 64)
+		if err != nil {
+			vlog.Fatalf("sync: reserveGenAndPosInternal: invalid syncgroup id", sgid)
+		}
 
-	ds.gen += genCount
-	ds.pos += posCount
+		var ok bool
+		info, ok = ds.sgs[interfaces.GroupId(id)]
+		if !ok {
+			info = &localGenInfoInMem{gen: 1}
+			ds.sgs[interfaces.GroupId(id)] = info
+		}
+	} else {
+		info = ds.data
+	}
+	gen := info.gen
+	pos := info.pos
+
+	info.gen += genCount
+	info.pos += posCount
 
 	return gen, pos
 }
 
 // checkptLocalGen freezes the local generation number for the responder's use.
-func (s *syncService) checkptLocalGen(ctx *context.T, appName, dbName string) error {
+func (s *syncService) checkptLocalGen(ctx *context.T, appName, dbName string, sgs sgSet) error {
 	s.syncStateLock.Lock()
 	defer s.syncStateLock.Unlock()
 
@@ -157,19 +237,40 @@
 
 	// The frozen generation is the last generation number used, i.e. one
 	// below the next available one to use.
-	ds.checkptGen = ds.gen - 1
+	if len(sgs) > 0 {
+		// Checkpoint requested SyncGroups.
+		for id := range sgs {
+			info, ok := ds.sgs[id]
+			if !ok {
+				return verror.New(verror.ErrInternal, ctx, "sg state not found", name, id)
+			}
+			info.checkptGen = info.gen - 1
+		}
+	} else {
+		ds.data.checkptGen = ds.data.gen - 1
+	}
 	return nil
 }
 
-// initDbSyncStateInMem initializes the in memory sync state of the Database if needed.
-func (s *syncService) initDbSyncStateInMem(ctx *context.T, appName, dbName string) {
+// initSyncStateInMem initializes the in memory sync state of the Database/SyncGroup if needed.
+func (s *syncService) initSyncStateInMem(ctx *context.T, appName, dbName string, sgid interfaces.GroupId) {
 	s.syncStateLock.Lock()
 	defer s.syncStateLock.Unlock()
 
 	name := appDbName(appName, dbName)
 	if s.syncState[name] == nil {
-		s.syncState[name] = &dbSyncStateInMem{gen: 1}
+		s.syncState[name] = &dbSyncStateInMem{
+			data: &localGenInfoInMem{gen: 1},
+			sgs:  make(map[interfaces.GroupId]*localGenInfoInMem),
+		}
 	}
+	if sgid != interfaces.NoGroupId {
+		ds := s.syncState[name]
+		if _, ok := ds.sgs[sgid]; !ok {
+			ds.sgs[sgid] = &localGenInfoInMem{gen: 1}
+		}
+	}
+	return
 }
 
 // copyDbSyncStateInMem returns a copy of the current in memory sync state of the Database.
@@ -182,20 +283,11 @@
 	if !ok {
 		return nil, verror.New(verror.ErrInternal, ctx, "db state not found", name)
 	}
-
-	dsCopy := &dbSyncStateInMem{
-		gen:        ds.gen,
-		pos:        ds.pos,
-		checkptGen: ds.checkptGen,
-	}
-
-	dsCopy.genvec = copyGenVec(ds.genvec)
-
-	return dsCopy, nil
+	return ds.deepCopy(), nil
 }
 
 // copyDbGenInfo returns a copy of the current generation information of the Database.
-func (s *syncService) copyDbGenInfo(ctx *context.T, appName, dbName string) (interfaces.GenVector, uint64, error) {
+func (s *syncService) copyDbGenInfo(ctx *context.T, appName, dbName string, sgs sgSet) (interfaces.GenVector, uint64, error) {
 	s.syncStateLock.Lock()
 	defer s.syncStateLock.Unlock()
 
@@ -205,18 +297,30 @@
 		return nil, 0, verror.New(verror.ErrInternal, ctx, "db state not found", name)
 	}
 
-	genvec := copyGenVec(ds.genvec)
+	var genvec interfaces.GenVector
+	var gen uint64
+	if len(sgs) > 0 {
+		genvec = make(interfaces.GenVector)
+		for id := range sgs {
+			sid := fmt.Sprintf("%d", id)
+			gv := ds.sggenvec[sid]
+			genvec[sid] = gv.DeepCopy()
+			genvec[sid][s.id] = ds.sgs[id].checkptGen
+		}
+	} else {
+		genvec = ds.genvec.DeepCopy()
 
-	// Add local generation information to the genvec.
-	for _, gv := range genvec {
-		gv[s.id] = ds.checkptGen
+		// Add local generation information to the genvec.
+		for _, gv := range genvec {
+			gv[s.id] = ds.data.checkptGen
+		}
+		gen = ds.data.checkptGen
 	}
-
-	return genvec, ds.checkptGen, nil
+	return genvec, gen, nil
 }
 
 // putDbGenInfoRemote puts the current remote generation information of the Database.
-func (s *syncService) putDbGenInfoRemote(ctx *context.T, appName, dbName string, genvec interfaces.GenVector) error {
+func (s *syncService) putDbGenInfoRemote(ctx *context.T, appName, dbName string, sg bool, genvec interfaces.GenVector) error {
 	s.syncStateLock.Lock()
 	defer s.syncStateLock.Unlock()
 
@@ -226,7 +330,11 @@
 		return verror.New(verror.ErrInternal, ctx, "db state not found", name)
 	}
 
-	ds.genvec = copyGenVec(genvec)
+	if sg {
+		ds.sggenvec = genvec.DeepCopy()
+	} else {
+		ds.genvec = genvec.DeepCopy()
+	}
 
 	return nil
 }
@@ -248,18 +356,6 @@
 	return parts[0], parts[1], nil
 }
 
-func copyGenVec(in interfaces.GenVector) interfaces.GenVector {
-	genvec := make(interfaces.GenVector)
-	for p, inpgv := range in {
-		pgv := make(interfaces.PrefixGenVector)
-		for id, gen := range inpgv {
-			pgv[id] = gen
-		}
-		genvec[p] = pgv
-	}
-	return genvec
-}
-
 ////////////////////////////////////////////////////////////
 // Low-level utility functions to access sync state.
 
@@ -291,36 +387,42 @@
 }
 
 // logRecKey returns the key used to access a specific log record.
-func logRecKey(id, gen uint64) string {
-	return util.JoinKeyParts(util.SyncPrefix, logPrefix, fmt.Sprintf("%d", id), fmt.Sprintf("%016x", gen))
+func logRecKey(pfx string, id, gen uint64) string {
+	return util.JoinKeyParts(util.SyncPrefix, logPrefix, pfx, fmt.Sprintf("%d", id), fmt.Sprintf("%016x", gen))
 }
 
-// splitLogRecKey is the inverse of logRecKey and returns device id and generation number.
-func splitLogRecKey(ctx *context.T, key string) (uint64, uint64, error) {
+// splitLogRecKey is the inverse of logRecKey and returns the prefix, device id
+// and generation number.
+func splitLogRecKey(ctx *context.T, key string) (string, uint64, uint64, error) {
 	parts := util.SplitKeyParts(key)
 	verr := verror.New(verror.ErrInternal, ctx, "invalid logreckey", key)
-	if len(parts) != 4 {
-		return 0, 0, verr
+	if len(parts) != 5 {
+		return "", 0, 0, verr
 	}
 	if parts[0] != util.SyncPrefix || parts[1] != logPrefix {
-		return 0, 0, verr
+		return "", 0, 0, verr
 	}
-	id, err := strconv.ParseUint(parts[2], 10, 64)
+	if parts[2] != logDataPrefix {
+		if _, err := strconv.ParseUint(parts[2], 10, 64); err != nil {
+			return "", 0, 0, verr
+		}
+	}
+	id, err := strconv.ParseUint(parts[3], 10, 64)
 	if err != nil {
-		return 0, 0, verr
+		return "", 0, 0, verr
 	}
-	gen, err := strconv.ParseUint(parts[3], 16, 64)
+	gen, err := strconv.ParseUint(parts[4], 16, 64)
 	if err != nil {
-		return 0, 0, verr
+		return "", 0, 0, verr
 	}
-	return id, gen, nil
+	return parts[2], id, gen, nil
 }
 
 // hasLogRec returns true if the log record for (devid, gen) exists.
-func hasLogRec(st store.StoreReader, id, gen uint64) (bool, error) {
+func hasLogRec(st store.StoreReader, pfx string, id, gen uint64) (bool, error) {
 	// TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
 	var rec localLogRec
-	if err := util.Get(nil, st, logRecKey(id, gen), &rec); err != nil {
+	if err := util.Get(nil, st, logRecKey(pfx, id, gen), &rec); err != nil {
 		if verror.ErrorID(err) == verror.ErrNoExist.ID {
 			err = nil
 		}
@@ -330,20 +432,25 @@
 }
 
 // putLogRec stores the log record.
-func putLogRec(ctx *context.T, tx store.Transaction, rec *localLogRec) error {
-	return util.Put(ctx, tx, logRecKey(rec.Metadata.Id, rec.Metadata.Gen), rec)
+func putLogRec(ctx *context.T, tx store.Transaction, pfx string, rec *localLogRec) error {
+	return util.Put(ctx, tx, logRecKey(pfx, rec.Metadata.Id, rec.Metadata.Gen), rec)
 }
 
 // getLogRec retrieves the log record for a given (devid, gen).
-func getLogRec(ctx *context.T, st store.StoreReader, id, gen uint64) (*localLogRec, error) {
+func getLogRec(ctx *context.T, st store.StoreReader, pfx string, id, gen uint64) (*localLogRec, error) {
+	return getLogRecByKey(ctx, st, logRecKey(pfx, id, gen))
+}
+
+// getLogRecByKey retrieves the log record for a given log record key.
+func getLogRecByKey(ctx *context.T, st store.StoreReader, key string) (*localLogRec, error) {
 	var rec localLogRec
-	if err := util.Get(ctx, st, logRecKey(id, gen), &rec); err != nil {
+	if err := util.Get(ctx, st, key, &rec); err != nil {
 		return nil, err
 	}
 	return &rec, nil
 }
 
 // delLogRec deletes the log record for a given (devid, gen).
-func delLogRec(ctx *context.T, tx store.Transaction, id, gen uint64) error {
-	return util.Delete(ctx, tx, logRecKey(id, gen))
+func delLogRec(ctx *context.T, tx store.Transaction, pfx string, id, gen uint64) error {
+	return util.Delete(ctx, tx, logRecKey(pfx, id, gen))
 }
diff --git a/services/syncbase/vsync/sync_state_test.go b/services/syncbase/vsync/sync_state_test.go
index b621fdc..3036611 100644
--- a/services/syncbase/vsync/sync_state_test.go
+++ b/services/syncbase/vsync/sync_state_test.go
@@ -6,6 +6,7 @@
 
 import (
 	"reflect"
+	"strconv"
 	"testing"
 	"time"
 
@@ -23,18 +24,31 @@
 	defer destroyService(t, svc)
 	s := svc.sync
 
-	var wantGen, wantPos uint64 = 1, 0
-	for i := 0; i < 5; i++ {
-		gotGen, gotPos := s.reserveGenAndPosInternal("mockapp", "mockdb", 5, 10)
-		if gotGen != wantGen || gotPos != wantPos {
-			t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", gotGen, wantGen, gotPos, wantPos)
-		}
-		wantGen += 5
-		wantPos += 10
+	sgids := []string{"", "100", "200"}
+	for _, sgid := range sgids {
+		var wantGen, wantPos uint64 = 1, 0
+		for i := 0; i < 5; i++ {
+			gotGen, gotPos := s.reserveGenAndPosInternal("mockapp", "mockdb", sgid, 5, 10)
+			if gotGen != wantGen || gotPos != wantPos {
+				t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", gotGen, wantGen, gotPos, wantPos)
+			}
+			wantGen += 5
+			wantPos += 10
 
-		name := appDbName("mockapp", "mockdb")
-		if s.syncState[name].gen != wantGen || s.syncState[name].pos != wantPos {
-			t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", s.syncState[name].gen, wantGen, s.syncState[name].pos, wantPos)
+			name := appDbName("mockapp", "mockdb")
+			var info *localGenInfoInMem
+			if sgid == "" {
+				info = s.syncState[name].data
+			} else {
+				id, err := strconv.ParseUint(sgid, 10, 64)
+				if err != nil {
+					t.Fatalf("reserveGenAndPosInternal failed, invalid sgid %v", sgid)
+				}
+				info = s.syncState[name].sgs[interfaces.GroupId(id)]
+			}
+			if info.gen != wantGen || info.pos != wantPos {
+				t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", info.gen, wantGen, info.pos, wantPos)
+			}
 		}
 	}
 }
@@ -52,9 +66,25 @@
 			1: 2, 3: 4, 5: 6,
 		},
 	}
+	sggv := interfaces.GenVector{
+		"mocksg1": interfaces.PrefixGenVector{
+			10: 20, 30: 40, 50: 60,
+		},
+		"mocksg2": interfaces.PrefixGenVector{
+			100: 200, 300: 400, 500: 600,
+		},
+	}
+	localsgs := make(map[interfaces.GroupId]localGenInfo)
+	localsgs[interfaces.GroupId(8888)] = localGenInfo{Gen: 56, CheckptGen: 2000}
+	localsgs[interfaces.GroupId(1008888)] = localGenInfo{Gen: 25890, CheckptGen: 100}
 
 	tx := st.NewTransaction()
-	wantSt := &dbSyncState{Gen: 40, GenVec: gv}
+	wantSt := &dbSyncState{
+		Data:     localGenInfo{Gen: 40},
+		Sgs:      localsgs,
+		GenVec:   gv,
+		SgGenVec: sggv,
+	}
 	if err := putDbSyncState(nil, tx, wantSt); err != nil {
 		t.Fatalf("putDbSyncState failed, err %v", err)
 	}
@@ -92,8 +122,8 @@
 		},
 		Pos: 10,
 	}
-	if err := putLogRec(nil, tx, wantRec); err != nil {
-		t.Fatalf("putLogRec(%d:%d) failed err %v", id, gen, err)
+	if err := putLogRec(nil, tx, logDataPrefix, wantRec); err != nil {
+		t.Fatalf("putLogRec(%s:%d:%d) failed err %v", logDataPrefix, id, gen, err)
 	}
 	if err := tx.Commit(); err != nil {
 		t.Fatalf("cannot commit putting log rec, err %v", err)
@@ -102,8 +132,8 @@
 	checkLogRec(t, st, id, gen, true, wantRec)
 
 	tx = st.NewTransaction()
-	if err := delLogRec(nil, tx, id, gen); err != nil {
-		t.Fatalf("delLogRec(%d:%d) failed err %v", id, gen, err)
+	if err := delLogRec(nil, tx, logDataPrefix, id, gen); err != nil {
+		t.Fatalf("delLogRec(%s:%d:%d) failed err %v", logDataPrefix, id, gen, err)
 	}
 	if err := tx.Commit(); err != nil {
 		t.Fatalf("cannot commit deleting log rec, err %v", err)
@@ -113,27 +143,28 @@
 }
 
 func TestLogRecKeyUtils(t *testing.T) {
-	invalid := []string{"$sync:aa:bb", "log:aa:bb", "$sync:log:aa:xx", "$sync:log:x:bb"}
+	invalid := []string{"$sync:100:bb", "log:100:bb", "$sync:log:data:100:xx", "$sync:log:data:aa:bb", "$sync:log:xx:100:bb"}
 
 	for _, k := range invalid {
-		if _, _, err := splitLogRecKey(nil, k); err == nil {
+		if _, _, _, err := splitLogRecKey(nil, k); err == nil {
 			t.Fatalf("splitting log rec key didn't fail %q", k)
 		}
 	}
 
 	valid := []struct {
+		pfx string
 		id  uint64
 		gen uint64
 	}{
-		{10, 20},
-		{190, 540},
-		{9999, 999999},
+		{logDataPrefix, 10, 20},
+		{"2500", 190, 540},
+		{"4200", 9999, 999999},
 	}
 
 	for _, v := range valid {
-		gotId, gotGen, err := splitLogRecKey(nil, logRecKey(v.id, v.gen))
-		if gotId != v.id || gotGen != v.gen || err != nil {
-			t.Fatalf("failed key conversion id got %v want %v, gen got %v want %v, err %v", gotId, v.id, gotGen, v.gen, err)
+		gotPfx, gotId, gotGen, err := splitLogRecKey(nil, logRecKey(v.pfx, v.id, v.gen))
+		if gotPfx != v.pfx || gotId != v.id || gotGen != v.gen || err != nil {
+			t.Fatalf("failed key conversion pfx got %v want %v, id got %v want %v, gen got %v want %v, err %v", gotPfx, v.pfx, gotId, v.id, gotGen, v.gen, err)
 		}
 	}
 }
@@ -158,7 +189,7 @@
 }
 
 func checkLogRec(t *testing.T, st store.StoreReader, id, gen uint64, exists bool, wantRec *localLogRec) {
-	gotRec, err := getLogRec(nil, st, id, gen)
+	gotRec, err := getLogRec(nil, st, logDataPrefix, id, gen)
 
 	if (!exists && err == nil) || (exists && err != nil) {
 		t.Fatalf("getLogRec(%d:%d) failed, exists %v err %v", id, gen, exists, err)
@@ -168,7 +199,7 @@
 		t.Fatalf("getLogRec(%d:%d) failed, got %v, want %v", id, gen, gotRec, wantRec)
 	}
 
-	if ok, err := hasLogRec(st, id, gen); err != nil || ok != exists {
+	if ok, err := hasLogRec(st, logDataPrefix, id, gen); err != nil || ok != exists {
 		t.Fatalf("hasLogRec(%d:%d) failed, want %v", id, gen, exists)
 	}
 }
diff --git a/services/syncbase/vsync/syncer.go b/services/syncbase/vsync/syncer.go
new file mode 100644
index 0000000..22f485b
--- /dev/null
+++ b/services/syncbase/vsync/syncer.go
@@ -0,0 +1,105 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+	"time"
+
+	"v.io/v23/context"
+	"v.io/v23/verror"
+	"v.io/x/lib/vlog"
+)
+
+// Policies to pick a peer to sync with.
+const (
+	// Picks a peer at random from the available set.
+	selectRandom = iota
+
+	// TODO(hpucha): implement other policies.
+	// Picks a peer with most differing generations.
+	selectMostDiff
+
+	// Picks a peer that was synced with the furthest in the past.
+	selectOldest
+)
+
+var (
+	// peerSyncInterval is the duration between two consecutive peer
+	// contacts. During every peer contact, the initiator obtains any
+	// pending updates from that peer.
+	peerSyncInterval = 50 * time.Millisecond
+
+	// peerSelectionPolicy is the policy used to select a peer when
+	// the initiator gets a chance to sync.
+	peerSelectionPolicy = selectRandom
+)
+
+// syncer wakes up every peerSyncInterval to do work: (1) Refresh memberView if
+// needed and pick a peer from all the known remote peers to sync with. (2) Act
+// as an initiator and sync Syncgroup metadata for all common SyncGroups with
+// the chosen peer (getting updates from the remote peer, detecting and
+// resolving conflicts) (3) Act as an initiator and sync data corresponding to
+// all common SyncGroups across all Apps/Databases with the chosen peer; (4)
+// Fetch any queued blobs. (5) Transfer ownership of blobs if needed. (6) Act as
+// a SyncGroup publisher to publish pending SyncGroups; (6) Garbage collect
+// older generations.
+//
+// TODO(hpucha): Currently only does initiation. Add rest.
+func (s *syncService) syncer(ctx *context.T) {
+	defer s.pending.Done()
+
+	ticker := time.NewTicker(peerSyncInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-s.closed:
+			vlog.VI(1).Info("sync: syncer: channel closed, stop work and exit")
+			return
+
+		case <-ticker.C:
+		}
+
+		// TODO(hpucha): Cut a gen for the responder even if there is no
+		// one to initiate to?
+
+		// Do work.
+		peer, err := s.pickPeer(ctx)
+		if err != nil {
+			continue
+		}
+
+		// Sync Syncgroup metadata and data.
+		s.getDeltas(ctx, peer)
+	}
+}
+
+////////////////////////////////////////
+// Peer selection policies.
+
+// pickPeer picks a Syncbase to sync with.
+func (s *syncService) pickPeer(ctx *context.T) (string, error) {
+	switch peerSelectionPolicy {
+	case selectRandom:
+		members := s.getMembers(ctx)
+		// Remove myself from the set.
+		delete(members, s.name)
+		if len(members) == 0 {
+			return "", verror.New(verror.ErrInternal, ctx, "no useful peer")
+		}
+
+		// Pick a peer at random.
+		ind := randIntn(len(members))
+		for m := range members {
+			if ind == 0 {
+				return m, nil
+			}
+			ind--
+		}
+		return "", verror.New(verror.ErrInternal, ctx, "random selection didn't succeed")
+	default:
+		return "", verror.New(verror.ErrInternal, ctx, "unknown peer selection policy")
+	}
+}
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 7225404..91ba499 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -54,10 +54,13 @@
 }
 
 // memberInfo holds the member metadata for each SyncGroup this member belongs
-// to within each App/Database (i.e. global database name).  It's a mapping of
-// global DB names to sets of SyncGroup member information.
+// to within each App/Database (i.e. global database name). It's a mapping of
+// global DB names to sets of SyncGroup member information. It also maintains
+// all the mount table candidates that could be used to reach this peer, learned
+// from the SyncGroup metadata.
 type memberInfo struct {
-	db2sg map[string]sgMemberInfo
+	db2sg    map[string]sgMemberInfo
+	mtTables map[string]struct{}
 }
 
 // sgMemberInfo maps SyncGroups to their member metadata.
@@ -220,12 +223,20 @@
 			// A member's info is different across SyncGroups, so gather all of them.
 			for member, info := range sg.Joiners {
 				if _, ok := newMembers[member]; !ok {
-					newMembers[member] = &memberInfo{db2sg: make(map[string]sgMemberInfo)}
+					newMembers[member] = &memberInfo{
+						db2sg:    make(map[string]sgMemberInfo),
+						mtTables: make(map[string]struct{}),
+					}
 				}
 				if _, ok := newMembers[member].db2sg[name]; !ok {
 					newMembers[member].db2sg[name] = make(sgMemberInfo)
 				}
 				newMembers[member].db2sg[name][sg.Id] = info
+
+				// Collect mount tables.
+				for _, mt := range sg.Spec.MountTables {
+					newMembers[member].mtTables[mt] = struct{}{}
+				}
 			}
 			return false
 		})
@@ -291,13 +302,19 @@
 	}
 
 	// Make a copy.
-	infoCopy := &memberInfo{make(map[string]sgMemberInfo)}
+	infoCopy := &memberInfo{
+		db2sg:    make(map[string]sgMemberInfo),
+		mtTables: make(map[string]struct{}),
+	}
 	for gdbName, sgInfo := range info.db2sg {
 		infoCopy.db2sg[gdbName] = make(sgMemberInfo)
 		for gid, mi := range sgInfo {
 			infoCopy.db2sg[gdbName][gid] = mi
 		}
 	}
+	for mt := range info.mtTables {
+		infoCopy.mtTables[mt] = struct{}{}
+	}
 
 	return infoCopy
 }
@@ -407,6 +424,10 @@
 	vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName)
 	defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName)
 
+	// Get this Syncbase's sync module handle.
+	ss := sd.sync.(*syncService)
+	var sg *interfaces.SyncGroup
+
 	err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
 		// Check permissions on Database.
 		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
@@ -419,11 +440,8 @@
 		// TODO(hpucha): Do some SG ACL checking. Check creator
 		// has Admin privilege.
 
-		// Get this Syncbase's sync module handle.
-		ss := sd.sync.(*syncService)
-
 		// Instantiate sg. Add self as joiner.
-		sg := &interfaces.SyncGroup{
+		sg = &interfaces.SyncGroup{
 			Id:          newSyncGroupId(),
 			Name:        sgName,
 			SpecVersion: newSyncGroupVersion(),
@@ -449,6 +467,7 @@
 		return err
 	}
 
+	ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
 	// Local SG create succeeded. Publish the SG at the chosen server.
 	sd.publishSyncGroup(ctx, call, sgName)
 
@@ -542,6 +561,8 @@
 		return nullSpec, err
 	}
 
+	ss.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
+
 	// Publish at the chosen mount table and in the neighborhood.
 	sd.publishInMountTables(ctx, call, sg.Spec)
 
@@ -841,7 +862,11 @@
 		return addSyncGroup(ctx, tx, &sg)
 	})
 
-	return err
+	if err != nil {
+		return err
+	}
+	s.initSyncStateInMem(ctx, sg.AppName, sg.DbName, sg.Id)
+	return nil
 }
 
 func (s *syncService) JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName, joinerName string, joinerInfo wire.SyncGroupMemberInfo) (interfaces.SyncGroup, error) {
diff --git a/services/syncbase/vsync/syncgroup_test.go b/services/syncbase/vsync/syncgroup_test.go
index e48e1f6..4dc5845 100644
--- a/services/syncbase/vsync/syncgroup_test.go
+++ b/services/syncbase/vsync/syncgroup_test.go
@@ -329,7 +329,8 @@
 		Creator:     "mockCreator",
 		SpecVersion: "etag-1",
 		Spec: nosql.SyncGroupSpec{
-			Prefixes: []string{"foo"},
+			MountTables: []string{"mt1"},
+			Prefixes:    []string{"foo"},
 		},
 		Joiners: map[string]nosql.SyncGroupMemberInfo{
 			"phone":  nosql.SyncGroupMemberInfo{SyncPriority: 10},
@@ -345,7 +346,8 @@
 		Creator:     "mockCreator",
 		SpecVersion: "etag-2",
 		Spec: nosql.SyncGroupSpec{
-			Prefixes: []string{"bar"},
+			MountTables: []string{"mt2", "mt3"},
+			Prefixes:    []string{"bar"},
 		},
 		Joiners: map[string]nosql.SyncGroupMemberInfo{
 			"tablet": nosql.SyncGroupMemberInfo{SyncPriority: 111},
@@ -383,6 +385,11 @@
 		t.Errorf("invalid SyncGroup members: got %v instead of %v", members, expMembers)
 	}
 
+	mt2and3 := map[string]struct{}{
+		"mt2": struct{}{},
+		"mt3": struct{}{},
+	}
+
 	expMemberInfo := map[string]*memberInfo{
 		"phone": &memberInfo{
 			db2sg: map[string]sgMemberInfo{
@@ -390,6 +397,7 @@
 					sgId1: sg1.Joiners["phone"],
 				},
 			},
+			mtTables: map[string]struct{}{"mt1": struct{}{}},
 		},
 		"tablet": &memberInfo{
 			db2sg: map[string]sgMemberInfo{
@@ -398,6 +406,11 @@
 					sgId2: sg2.Joiners["tablet"],
 				},
 			},
+			mtTables: map[string]struct{}{
+				"mt1": struct{}{},
+				"mt2": struct{}{},
+				"mt3": struct{}{},
+			},
 		},
 		"cloud": &memberInfo{
 			db2sg: map[string]sgMemberInfo{
@@ -405,6 +418,7 @@
 					sgId1: sg1.Joiners["cloud"],
 				},
 			},
+			mtTables: map[string]struct{}{"mt1": struct{}{}},
 		},
 		"door": &memberInfo{
 			db2sg: map[string]sgMemberInfo{
@@ -412,6 +426,7 @@
 					sgId2: sg2.Joiners["door"],
 				},
 			},
+			mtTables: mt2and3,
 		},
 		"lamp": &memberInfo{
 			db2sg: map[string]sgMemberInfo{
@@ -419,6 +434,7 @@
 					sgId2: sg2.Joiners["lamp"],
 				},
 			},
+			mtTables: mt2and3,
 		},
 	}
 
@@ -430,7 +446,7 @@
 		}
 		expInfo := expMemberInfo[mm]
 		if !reflect.DeepEqual(mi, expInfo) {
-			t.Errorf("invalid Info for SyncGroup member %s: got %v instead of %v", mm, mi, expInfo)
+			t.Errorf("invalid Info for SyncGroup member %s: got %#v instead of %#v", mm, mi, expInfo)
 		}
 	}
 
@@ -462,6 +478,7 @@
 					sgId2: sg2.Joiners["tablet"],
 				},
 			},
+			mtTables: mt2and3,
 		},
 		"door": &memberInfo{
 			db2sg: map[string]sgMemberInfo{
@@ -469,6 +486,7 @@
 					sgId2: sg2.Joiners["door"],
 				},
 			},
+			mtTables: mt2and3,
 		},
 		"lamp": &memberInfo{
 			db2sg: map[string]sgMemberInfo{
@@ -476,6 +494,7 @@
 					sgId2: sg2.Joiners["lamp"],
 				},
 			},
+			mtTables: mt2and3,
 		},
 	}
 
diff --git a/services/syncbase/vsync/testdata/local-init-00.log.sync b/services/syncbase/vsync/testdata/local-init-00.log.sync
index 7435348..7f655eb 100644
--- a/services/syncbase/vsync/testdata/local-init-00.log.sync
+++ b/services/syncbase/vsync/testdata/local-init-00.log.sync
@@ -1,6 +1,6 @@
 # Create an object locally and update it twice (linked-list).
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addl|foo1|1|||$sync:log:10:1|0|1|false
-addl|foo1|2|1||$sync:log:10:2|0|1|false
-addl|foo1|3|2||$sync:log:10:3|0|1|false
+addl|foo1|1|||$sync:log:data:10:1|0|1|false
+addl|foo1|2|1||$sync:log:data:10:2|0|1|false
+addl|foo1|3|2||$sync:log:data:10:3|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-00.log.sync b/services/syncbase/vsync/testdata/remote-conf-00.log.sync
index 060cf0c..66cdce3 100644
--- a/services/syncbase/vsync/testdata/remote-conf-00.log.sync
+++ b/services/syncbase/vsync/testdata/remote-conf-00.log.sync
@@ -3,6 +3,6 @@
 # it from the local sync at v2, then updated separately).
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addr|foo1|4|2||$sync:log:11:1|0|1|false
-addr|foo1|5|4||$sync:log:11:2|0|1|false
-addr|foo1|6|5||$sync:log:11:3|0|1|false
+addr|foo1|4|2||$sync:log:data:11:1|0|1|false
+addr|foo1|5|4||$sync:log:data:11:2|0|1|false
+addr|foo1|6|5||$sync:log:data:11:3|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-01.log.sync b/services/syncbase/vsync/testdata/remote-conf-01.log.sync
index 2053157..5f440b5 100644
--- a/services/syncbase/vsync/testdata/remote-conf-01.log.sync
+++ b/services/syncbase/vsync/testdata/remote-conf-01.log.sync
@@ -5,6 +5,6 @@
 # sees 2 graft points: v1-v4 and v2-v5.
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addr|foo1|4|1||$sync:log:12:1|0|1|false
-addr|foo1|5|2|4|$sync:log:11:1|0|1|false
-addr|foo1|6|5||$sync:log:11:2|0|1|false
+addr|foo1|4|1||$sync:log:data:12:1|0|1|false
+addr|foo1|5|2|4|$sync:log:data:11:1|0|1|false
+addr|foo1|6|5||$sync:log:data:11:2|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-conf-03.log.sync b/services/syncbase/vsync/testdata/remote-conf-03.log.sync
index 673405e..edf91a0 100644
--- a/services/syncbase/vsync/testdata/remote-conf-03.log.sync
+++ b/services/syncbase/vsync/testdata/remote-conf-03.log.sync
@@ -1,6 +1,6 @@
 # Create the same object remotely from scratch and update it twice (linked-list).
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addr|foo1|4|||$sync:log:11:1|0|1|false
-addr|foo1|5|4||$sync:log:11:2|0|1|false
-addr|foo1|6|5||$sync:log:11:3|0|1|false
+addr|foo1|4|||$sync:log:data:11:1|0|1|false
+addr|foo1|5|4||$sync:log:data:11:2|0|1|false
+addr|foo1|6|5||$sync:log:data:11:3|0|1|false
diff --git a/services/syncbase/vsync/testdata/remote-init-00.log.sync b/services/syncbase/vsync/testdata/remote-init-00.log.sync
index 35b1511..a5a4864 100644
--- a/services/syncbase/vsync/testdata/remote-init-00.log.sync
+++ b/services/syncbase/vsync/testdata/remote-init-00.log.sync
@@ -1,7 +1,7 @@
 # Create an object remotely and update it twice (linked-list).
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addr|foo1|1|||$sync:log:11:1|0|1|false
-addr|foo1|2|1||$sync:log:11:2|0|1|false
-addr|foo1|3|2||$sync:log:11:3|0|1|false
+addr|foo1|1|||$sync:log:data:11:1|0|1|false
+addr|foo1|2|1||$sync:log:data:11:2|0|1|false
+addr|foo1|3|2||$sync:log:data:11:3|0|1|false
 genvec|foo1|10:0,11:3|bar|11:0
diff --git a/services/syncbase/vsync/testdata/remote-noconf-00.log.sync b/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
index ef52b0a..94e144e 100644
--- a/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
+++ b/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
@@ -2,8 +2,7 @@
 # after it was created locally up to v3 (i.e. assume the remote sync
 # received it from the local sync first, then updated it).
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
-
-addr|foo1|4|3||$sync:log:11:1|0|1|false
-addr|foo1|5|4||$sync:log:11:2|0|1|false
-addr|foo1|6|5||$sync:log:11:3|0|1|false
+addr|foo1|4|3||$sync:log:data:11:1|0|1|false
+addr|foo1|5|4||$sync:log:data:11:2|0|1|false
+addr|foo1|6|5||$sync:log:data:11:3|0|1|false
 genvec|foo1|10:0,11:3|bar|11:0
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index f34d415..9cf53d3 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -11,10 +11,11 @@
 // Key prefixes for sync data structures. All these prefixes are prepended with
 // util.SyncPrefix.
 const (
-	logPrefix  = "log"
-	dbssPrefix = "dbss"
-	dagPrefix  = "dag"
-	sgPrefix   = "sg"
+	logPrefix     = "log"  // log state.
+	logDataPrefix = "data" // data log state.
+	dbssPrefix    = "dbss" // database sync state.
+	dagPrefix     = "dag"  // dag state.
+	sgPrefix      = "sg"   // syncgroup state.
 )
 
 // syncData represents the persistent state of the sync module.
@@ -22,11 +23,18 @@
 	Id uint64
 }
 
+// localGenInfo represents the persistent state corresponding to local generations.
+type localGenInfo struct {
+	Gen        uint64 // local generation number incremented on every local update.
+	CheckptGen uint64 // local generation number advertised to remote peers (used by the responder).
+}
+
 // dbSyncState represents the persistent sync state of a Database.
 type dbSyncState struct {
-	Gen        uint64               // local generation number incremented on every local update.
-	CheckptGen uint64               // local generation number advertised to remote peers (used by the responder).
-	GenVec     interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
+	Data     localGenInfo
+	Sgs      map[interfaces.GroupId]localGenInfo
+	GenVec   interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for data in Database.
+	SgGenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for SyncGroups in Database.
 }
 
 // localLogRec represents the persistent local state of a log record. Metadata
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index f305083..c02d2d4 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -25,11 +25,23 @@
 }) {
 }
 
+// localGenInfo represents the persistent state corresponding to local generations.
+type localGenInfo struct {
+	Gen        uint64 // local generation number incremented on every local update.
+	CheckptGen uint64 // local generation number advertised to remote peers (used by the responder).
+}
+
+func (localGenInfo) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/services/syncbase/vsync.localGenInfo"`
+}) {
+}
+
 // dbSyncState represents the persistent sync state of a Database.
 type dbSyncState struct {
-	Gen        uint64               // local generation number incremented on every local update.
-	CheckptGen uint64               // local generation number advertised to remote peers (used by the responder).
-	GenVec     interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
+	Data     localGenInfo
+	Sgs      map[interfaces.GroupId]localGenInfo
+	GenVec   interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for data in Database.
+	SgGenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers for SyncGroups in Database.
 }
 
 func (dbSyncState) __VDLReflect(struct {
@@ -51,14 +63,17 @@
 
 func init() {
 	vdl.Register((*syncData)(nil))
+	vdl.Register((*localGenInfo)(nil))
 	vdl.Register((*dbSyncState)(nil))
 	vdl.Register((*localLogRec)(nil))
 }
 
-const logPrefix = "log"
+const logPrefix = "log" // log state.
 
-const dbssPrefix = "dbss"
+const logDataPrefix = "data" // data log state.
 
-const dagPrefix = "dag"
+const dbssPrefix = "dbss" // database sync state.
 
-const sgPrefix = "sg"
+const dagPrefix = "dag" // dag state.
+
+const sgPrefix = "sg" // syncgroup state.
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index 2e9e6fc..9c16294 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -120,7 +120,7 @@
 	}
 
 	// Initialize Database sync state if needed.
-	s.initDbSyncStateInMem(ctx, appName, dbName)
+	s.initSyncStateInMem(ctx, appName, dbName, interfaces.NoGroupId)
 
 	// Get a batch of watch log entries, if any, after this resume marker.
 	logs, nextResmark, err := watchable.ReadBatchFromLog(st, resMark)
@@ -212,7 +212,7 @@
 		}
 	}
 
-	gen, pos := s.reserveGenAndPosInDbLog(ctx, appName, dbName, count)
+	gen, pos := s.reserveGenAndPosInDbLog(ctx, appName, dbName, "", count)
 
 	vlog.VI(3).Infof("sync: processBatch: %s, %s: len %d, total %d, btid %x, gen %d, pos %d",
 		appName, dbName, count, totalCount, batchId, gen, pos)
@@ -251,12 +251,12 @@
 // suitably updating the DAG metadata.
 func (s *syncService) processLocalLogRec(ctx *context.T, tx store.Transaction, rec *localLogRec) error {
 	// Insert the new log record into the log.
-	if err := putLogRec(ctx, tx, rec); err != nil {
+	if err := putLogRec(ctx, tx, logDataPrefix, rec); err != nil {
 		return err
 	}
 
 	m := rec.Metadata
-	logKey := logRecKey(m.Id, m.Gen)
+	logKey := logRecKey(logDataPrefix, m.Id, m.Gen)
 
 	// Insert the new log record into dag.
 	if err := s.addNode(ctx, tx, m.ObjId, m.CurVers, logKey, m.Delete, m.Parents, m.BatchId, nil); err != nil {