syncbase/vsync: Initiator module.

Change-Id: I3dfc95ee8f9a6cd6a40558bf1dfab3a2282c3e3e
diff --git a/v23/syncbase/testutil/util.go b/v23/syncbase/testutil/util.go
index 415b0a6..ceae1e6 100644
--- a/v23/syncbase/testutil/util.go
+++ b/v23/syncbase/testutil/util.go
@@ -190,7 +190,7 @@
 	if err != nil {
 		vlog.Fatal("ioutil.TempDir() failed: ", err)
 	}
-	service, err := server.NewService(nil, nil, server.ServiceOptions{
+	service, err := server.NewService(serverCtx, nil, server.ServiceOptions{
 		Perms:   perms,
 		RootDir: rootDir,
 		Engine:  "leveldb",
diff --git a/x/ref/services/syncbase/server/interfaces/sync.vdl b/x/ref/services/syncbase/server/interfaces/sync.vdl
index 082d645..410371e 100644
--- a/x/ref/services/syncbase/server/interfaces/sync.vdl
+++ b/x/ref/services/syncbase/server/interfaces/sync.vdl
@@ -12,15 +12,22 @@
 // Sync defines methods for data exchange between Syncbases.
 // TODO(hpucha): Flesh this out further.
 type Sync interface {
-	// GetDeltas returns the responder's current generation vector
-	// and all the missing log records when compared to the
-	// initiator's generation vector.
-	GetDeltas() error {access.Read}
+	// GetDeltas returns the responder's current generation vector and all
+	// the missing log records when compared to the initiator's generation
+	// vector. This process happens one Database at a time encompassing all
+	// the SyncGroups common to the initiator and the responder. For each
+	// Database, the initiator sends a DeltaReq. In response, the
+	// responder sends a "Start" DeltaResp record, all the missing log
+	// records, the responder's genvector, and a "Finish" DeltaResp
+	// record. The initiator parses the stream between a Start and a Finish
+	// record as the response to its DeltaReq, and then moves on to the
+	// next Database, in common with this responder.
+	GetDeltas() stream<DeltaReq, DeltaResp> error {access.Read}
 
 	// SyncGroup-related methods.
 
-	// PublishSyncGroup is typically invoked on a "central" peer
-	// to publish the SyncGroup.
+	// PublishSyncGroup is typically invoked on a "central" peer to publish
+	// the SyncGroup.
 	PublishSyncGroup(sg SyncGroup) error {access.Write}
 
 	// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
diff --git a/x/ref/services/syncbase/server/interfaces/sync.vdl.go b/x/ref/services/syncbase/server/interfaces/sync.vdl.go
index 594ab0f..0ca399f 100644
--- a/x/ref/services/syncbase/server/interfaces/sync.vdl.go
+++ b/x/ref/services/syncbase/server/interfaces/sync.vdl.go
@@ -9,6 +9,7 @@
 
 import (
 	// VDL system imports
+	"io"
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
@@ -25,12 +26,19 @@
 // Sync defines methods for data exchange between Syncbases.
 // TODO(hpucha): Flesh this out further.
 type SyncClientMethods interface {
-	// GetDeltas returns the responder's current generation vector
-	// and all the missing log records when compared to the
-	// initiator's generation vector.
-	GetDeltas(*context.T, ...rpc.CallOpt) error
-	// PublishSyncGroup is typically invoked on a "central" peer
-	// to publish the SyncGroup.
+	// GetDeltas returns the responder's current generation vector and all
+	// the missing log records when compared to the initiator's generation
+	// vector. This process happens one Database at a time encompassing all
+	// the SyncGroups common to the initiator and the responder. For each
+	// Database, the initiator sends a DeltaReq. In response, the
+	// responder sends a "Start" DeltaResp record, all the missing log
+	// records, the responder's genvector, and a "Finish" DeltaResp
+	// record. The initiator parses the stream between a Start and a Finish
+	// record as the response to its DeltaReq, and then moves on to the
+	// next Database, in common with this responder.
+	GetDeltas(*context.T, ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
+	// PublishSyncGroup is typically invoked on a "central" peer to publish
+	// the SyncGroup.
 	PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
 	// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
 	// Syncbase on a SyncGroup admin. It checks whether the requestor is
@@ -57,8 +65,12 @@
 	name string
 }
 
-func (c implSyncClientStub) GetDeltas(ctx *context.T, opts ...rpc.CallOpt) (err error) {
-	err = v23.GetClient(ctx).Call(ctx, c.name, "GetDeltas", nil, nil, opts...)
+func (c implSyncClientStub) GetDeltas(ctx *context.T, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
+	var call rpc.ClientCall
+	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", nil, opts...); err != nil {
+		return
+	}
+	ocall = &implSyncGetDeltasClientCall{ClientCall: call}
 	return
 }
 
@@ -77,18 +89,127 @@
 	return
 }
 
+// SyncGetDeltasClientStream is the client stream for Sync.GetDeltas.
+type SyncGetDeltasClientStream interface {
+	// RecvStream returns the receiver side of the Sync.GetDeltas client stream.
+	RecvStream() interface {
+		// Advance stages an item so that it may be retrieved via Value.  Returns
+		// true iff there is an item to retrieve.  Advance must be called before
+		// Value is called.  May block if an item is not available.
+		Advance() bool
+		// Value returns the item that was staged by Advance.  May panic if Advance
+		// returned false or was not called.  Never blocks.
+		Value() DeltaResp
+		// Err returns any error encountered by Advance.  Never blocks.
+		Err() error
+	}
+	// SendStream returns the send side of the Sync.GetDeltas client stream.
+	SendStream() interface {
+		// Send places the item onto the output stream.  Returns errors
+		// encountered while sending, or if Send is called after Close or
+		// the stream has been canceled.  Blocks if there is no buffer
+		// space; will unblock when buffer space is available or after
+		// the stream has been canceled.
+		Send(item DeltaReq) error
+		// Close indicates to the server that no more items will be sent;
+		// server Recv calls will receive io.EOF after all sent items.
+		// This is an optional call - e.g. a client might call Close if it
+		// needs to continue receiving items from the server after it's
+		// done sending.  Returns errors encountered while closing, or if
+		// Close is called after the stream has been canceled.  Like Send,
+		// blocks if there is no buffer space available.
+		Close() error
+	}
+}
+
+// SyncGetDeltasClientCall represents the call returned from Sync.GetDeltas.
+type SyncGetDeltasClientCall interface {
+	SyncGetDeltasClientStream
+	// Finish performs the equivalent of SendStream().Close, then blocks until
+	// the server is done, and returns the positional return values for the call.
+	//
+	// Finish returns immediately if the call has been canceled; depending on the
+	// timing the output could either be an error signaling cancelation, or the
+	// valid positional return values from the server.
+	//
+	// Calling Finish is mandatory for releasing stream resources, unless the call
+	// has been canceled or any of the other methods return an error.  Finish should
+	// be called at most once.
+	Finish() error
+}
+
+type implSyncGetDeltasClientCall struct {
+	rpc.ClientCall
+	valRecv DeltaResp
+	errRecv error
+}
+
+func (c *implSyncGetDeltasClientCall) RecvStream() interface {
+	Advance() bool
+	Value() DeltaResp
+	Err() error
+} {
+	return implSyncGetDeltasClientCallRecv{c}
+}
+
+type implSyncGetDeltasClientCallRecv struct {
+	c *implSyncGetDeltasClientCall
+}
+
+func (c implSyncGetDeltasClientCallRecv) Advance() bool {
+	c.c.errRecv = c.c.Recv(&c.c.valRecv)
+	return c.c.errRecv == nil
+}
+func (c implSyncGetDeltasClientCallRecv) Value() DeltaResp {
+	return c.c.valRecv
+}
+func (c implSyncGetDeltasClientCallRecv) Err() error {
+	if c.c.errRecv == io.EOF {
+		return nil
+	}
+	return c.c.errRecv
+}
+func (c *implSyncGetDeltasClientCall) SendStream() interface {
+	Send(item DeltaReq) error
+	Close() error
+} {
+	return implSyncGetDeltasClientCallSend{c}
+}
+
+type implSyncGetDeltasClientCallSend struct {
+	c *implSyncGetDeltasClientCall
+}
+
+func (c implSyncGetDeltasClientCallSend) Send(item DeltaReq) error {
+	return c.c.Send(item)
+}
+func (c implSyncGetDeltasClientCallSend) Close() error {
+	return c.c.CloseSend()
+}
+func (c *implSyncGetDeltasClientCall) Finish() (err error) {
+	err = c.ClientCall.Finish()
+	return
+}
+
 // SyncServerMethods is the interface a server writer
 // implements for Sync.
 //
 // Sync defines methods for data exchange between Syncbases.
 // TODO(hpucha): Flesh this out further.
 type SyncServerMethods interface {
-	// GetDeltas returns the responder's current generation vector
-	// and all the missing log records when compared to the
-	// initiator's generation vector.
-	GetDeltas(*context.T, rpc.ServerCall) error
-	// PublishSyncGroup is typically invoked on a "central" peer
-	// to publish the SyncGroup.
+	// GetDeltas returns the responder's current generation vector and all
+	// the missing log records when compared to the initiator's generation
+	// vector. This process happens one Database at a time encompassing all
+	// the SyncGroups common to the initiator and the responder. For each
+	// Database, the initiator sends a DeltaReq. In response, the
+	// responder sends a "Start" DeltaResp record, all the missing log
+	// records, the responder's genvector, and a "Finish" DeltaResp
+	// record. The initiator parses the stream between a Start and a Finish
+	// record as the response to its DeltaReq, and then moves on to the
+	// next Database, in common with this responder.
+	GetDeltas(*context.T, SyncGetDeltasServerCall) error
+	// PublishSyncGroup is typically invoked on a "central" peer to publish
+	// the SyncGroup.
 	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
 	// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
 	// Syncbase on a SyncGroup admin. It checks whether the requestor is
@@ -102,9 +223,32 @@
 
 // SyncServerStubMethods is the server interface containing
 // Sync methods, as expected by rpc.Server.
-// There is no difference between this interface and SyncServerMethods
-// since there are no streaming methods.
-type SyncServerStubMethods SyncServerMethods
+// The only difference between this interface and SyncServerMethods
+// is the streaming methods.
+type SyncServerStubMethods interface {
+	// GetDeltas returns the responder's current generation vector and all
+	// the missing log records when compared to the initiator's generation
+	// vector. This process happens one Database at a time encompassing all
+	// the SyncGroups common to the initiator and the responder. For each
+	// Database, the initiator sends a DeltaReq. In response, the
+	// responder sends a "Start" DeltaResp record, all the missing log
+	// records, the responder's genvector, and a "Finish" DeltaResp
+	// record. The initiator parses the stream between a Start and a Finish
+	// record as the response to its DeltaReq, and then moves on to the
+	// next Database, in common with this responder.
+	GetDeltas(*context.T, *SyncGetDeltasServerCallStub) error
+	// PublishSyncGroup is typically invoked on a "central" peer to publish
+	// the SyncGroup.
+	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
+	// JoinSyncGroupAtAdmin is invoked by a prospective SyncGroup member's
+	// Syncbase on a SyncGroup admin. It checks whether the requestor is
+	// allowed to join the named SyncGroup, and if so, adds the requestor to
+	// the SyncGroup.
+	JoinSyncGroupAtAdmin(ctx *context.T, call rpc.ServerCall, sgName string, joinerName string, myInfo nosql.SyncGroupMemberInfo) (SyncGroup, error)
+	// BlobSync methods.
+	// FetchBlob returns the requested blob.
+	FetchBlob(*context.T, rpc.ServerCall) error
+}
 
 // SyncServerStub adds universal methods to SyncServerStubMethods.
 type SyncServerStub interface {
@@ -135,7 +279,7 @@
 	gs   *rpc.GlobState
 }
 
-func (s implSyncServerStub) GetDeltas(ctx *context.T, call rpc.ServerCall) error {
+func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub) error {
 	return s.impl.GetDeltas(ctx, call)
 }
 
@@ -170,12 +314,12 @@
 	Methods: []rpc.MethodDesc{
 		{
 			Name: "GetDeltas",
-			Doc:  "// GetDeltas returns the responder's current generation vector\n// and all the missing log records when compared to the\n// initiator's generation vector.",
+			Doc:  "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector. This process happens one Database at a time encompassing all\n// the SyncGroups common to the initiator and the responder. For each\n// Database, the initiator sends a DeltaReq. In response, the\n// responder sends a \"Start\" DeltaResp record, all the missing log\n// records, the responder's genvector, and a \"Finish\" DeltaResp\n// record. The initiator parses the stream between a Start and a Finish\n// record as the response to its DeltaReq, and then moves on to the\n// next Database, in common with this responder.",
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
 		{
 			Name: "PublishSyncGroup",
-			Doc:  "// PublishSyncGroup is typically invoked on a \"central\" peer\n// to publish the SyncGroup.",
+			Doc:  "// PublishSyncGroup is typically invoked on a \"central\" peer to publish\n// the SyncGroup.",
 			InArgs: []rpc.ArgDesc{
 				{"sg", ``}, // SyncGroup
 			},
@@ -201,3 +345,88 @@
 		},
 	},
 }
+
+// SyncGetDeltasServerStream is the server stream for Sync.GetDeltas.
+type SyncGetDeltasServerStream interface {
+	// RecvStream returns the receiver side of the Sync.GetDeltas server stream.
+	RecvStream() interface {
+		// Advance stages an item so that it may be retrieved via Value.  Returns
+		// true iff there is an item to retrieve.  Advance must be called before
+		// Value is called.  May block if an item is not available.
+		Advance() bool
+		// Value returns the item that was staged by Advance.  May panic if Advance
+		// returned false or was not called.  Never blocks.
+		Value() DeltaReq
+		// Err returns any error encountered by Advance.  Never blocks.
+		Err() error
+	}
+	// SendStream returns the send side of the Sync.GetDeltas server stream.
+	SendStream() interface {
+		// Send places the item onto the output stream.  Returns errors encountered
+		// while sending.  Blocks if there is no buffer space; will unblock when
+		// buffer space is available.
+		Send(item DeltaResp) error
+	}
+}
+
+// SyncGetDeltasServerCall represents the context passed to Sync.GetDeltas.
+type SyncGetDeltasServerCall interface {
+	rpc.ServerCall
+	SyncGetDeltasServerStream
+}
+
+// SyncGetDeltasServerCallStub is a wrapper that converts rpc.StreamServerCall into
+// a typesafe stub that implements SyncGetDeltasServerCall.
+type SyncGetDeltasServerCallStub struct {
+	rpc.StreamServerCall
+	valRecv DeltaReq
+	errRecv error
+}
+
+// Init initializes SyncGetDeltasServerCallStub from rpc.StreamServerCall.
+func (s *SyncGetDeltasServerCallStub) Init(call rpc.StreamServerCall) {
+	s.StreamServerCall = call
+}
+
+// RecvStream returns the receiver side of the Sync.GetDeltas server stream.
+func (s *SyncGetDeltasServerCallStub) RecvStream() interface {
+	Advance() bool
+	Value() DeltaReq
+	Err() error
+} {
+	return implSyncGetDeltasServerCallRecv{s}
+}
+
+type implSyncGetDeltasServerCallRecv struct {
+	s *SyncGetDeltasServerCallStub
+}
+
+func (s implSyncGetDeltasServerCallRecv) Advance() bool {
+	s.s.valRecv = DeltaReq{}
+	s.s.errRecv = s.s.Recv(&s.s.valRecv)
+	return s.s.errRecv == nil
+}
+func (s implSyncGetDeltasServerCallRecv) Value() DeltaReq {
+	return s.s.valRecv
+}
+func (s implSyncGetDeltasServerCallRecv) Err() error {
+	if s.s.errRecv == io.EOF {
+		return nil
+	}
+	return s.s.errRecv
+}
+
+// SendStream returns the send side of the Sync.GetDeltas server stream.
+func (s *SyncGetDeltasServerCallStub) SendStream() interface {
+	Send(item DeltaResp) error
+} {
+	return implSyncGetDeltasServerCallSend{s}
+}
+
+type implSyncGetDeltasServerCallSend struct {
+	s *SyncGetDeltasServerCallStub
+}
+
+func (s implSyncGetDeltasServerCallSend) Send(item DeltaResp) error {
+	return s.s.Send(item)
+}
diff --git a/x/ref/services/syncbase/server/interfaces/sync_types.vdl b/x/ref/services/syncbase/server/interfaces/sync_types.vdl
index bb6f4d4..caf0339 100644
--- a/x/ref/services/syncbase/server/interfaces/sync_types.vdl
+++ b/x/ref/services/syncbase/server/interfaces/sync_types.vdl
@@ -32,8 +32,8 @@
 type PrefixGenVector map[uint64]uint64
 
 // GenVector is the generation vector for a Database, and maps prefixes to their
-// generation vectors. Note that the prefixes in a GenVector are global prefixes
-// that include the appropriate Application and Database name.
+// generation vectors. Note that the prefixes in a GenVector are relative to the
+// the Application and Database name.
 type GenVector map[string]PrefixGenVector
 
 // LogRecMetadata represents the metadata of a single log record that is
@@ -49,7 +49,7 @@
 	RecType   byte   // type of log record.
 
         // Object related information.
-        ObjId       string      // id of the object that was updated.
+        ObjId       string      // id of the object that was updated. This id is relative to Application and Database names.
         CurVers     string      // current version number of the object.
         Parents     []string    // 0, 1 or 2 parent versions that the current version is derived from.
 	UpdTime     time.Time   // timestamp when the update is generated.
@@ -94,3 +94,19 @@
 	Status      SyncGroupStatus                     // Status of the SyncGroup
 	Joiners     map[string]wire.SyncGroupMemberInfo // map of joiners to their metadata
 }
+
+// DeltaReq contains the initiator's genvector and the set of SyncGroups it is
+// interested with in a Database when requesting deltas for that Database.
+type DeltaReq struct {
+	SgIds   set[GroupId]
+	InitVec GenVector
+}
+
+// DeltaResp contains the responder's genvector or the missing log records
+// returned in response to an initiator's request for deltas for a Database.
+type DeltaResp union {
+	Start   bool
+	Finish  bool
+	Rec     LogRec
+	RespVec GenVector
+}
diff --git a/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go b/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go
index a22b37d..a22731d 100644
--- a/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/x/ref/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -28,8 +28,8 @@
 }
 
 // GenVector is the generation vector for a Database, and maps prefixes to their
-// generation vectors. Note that the prefixes in a GenVector are global prefixes
-// that include the appropriate Application and Database name.
+// generation vectors. Note that the prefixes in a GenVector are relative to the
+// the Application and Database name.
 type GenVector map[string]PrefixGenVector
 
 func (GenVector) __VDLReflect(struct {
@@ -49,7 +49,7 @@
 	Gen     uint64 // generation number for the log record.
 	RecType byte   // type of log record.
 	// Object related information.
-	ObjId      string    // id of the object that was updated.
+	ObjId      string    // id of the object that was updated. This id is relative to Application and Database names.
 	CurVers    string    // current version number of the object.
 	Parents    []string  // 0, 1 or 2 parent versions that the current version is derived from.
 	UpdTime    time.Time // timestamp when the update is generated.
@@ -155,6 +155,74 @@
 }) {
 }
 
+// DeltaReq contains the initiator's genvector and the set of SyncGroups it is
+// interested with in a Database when requesting deltas for that Database.
+type DeltaReq struct {
+	SgIds   map[GroupId]struct{}
+	InitVec GenVector
+}
+
+func (DeltaReq) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.DeltaReq"`
+}) {
+}
+
+type (
+	// DeltaResp represents any single field of the DeltaResp union type.
+	//
+	// DeltaResp contains the responder's genvector or the missing log records
+	// returned in response to an initiator's request for deltas for a Database.
+	DeltaResp interface {
+		// Index returns the field index.
+		Index() int
+		// Interface returns the field value as an interface.
+		Interface() interface{}
+		// Name returns the field name.
+		Name() string
+		// __VDLReflect describes the DeltaResp union type.
+		__VDLReflect(__DeltaRespReflect)
+	}
+	// DeltaRespStart represents field Start of the DeltaResp union type.
+	DeltaRespStart struct{ Value bool }
+	// DeltaRespFinish represents field Finish of the DeltaResp union type.
+	DeltaRespFinish struct{ Value bool }
+	// DeltaRespRec represents field Rec of the DeltaResp union type.
+	DeltaRespRec struct{ Value LogRec }
+	// DeltaRespRespVec represents field RespVec of the DeltaResp union type.
+	DeltaRespRespVec struct{ Value GenVector }
+	// __DeltaRespReflect describes the DeltaResp union type.
+	__DeltaRespReflect struct {
+		Name  string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.DeltaResp"`
+		Type  DeltaResp
+		Union struct {
+			Start   DeltaRespStart
+			Finish  DeltaRespFinish
+			Rec     DeltaRespRec
+			RespVec DeltaRespRespVec
+		}
+	}
+)
+
+func (x DeltaRespStart) Index() int                      { return 0 }
+func (x DeltaRespStart) Interface() interface{}          { return x.Value }
+func (x DeltaRespStart) Name() string                    { return "Start" }
+func (x DeltaRespStart) __VDLReflect(__DeltaRespReflect) {}
+
+func (x DeltaRespFinish) Index() int                      { return 1 }
+func (x DeltaRespFinish) Interface() interface{}          { return x.Value }
+func (x DeltaRespFinish) Name() string                    { return "Finish" }
+func (x DeltaRespFinish) __VDLReflect(__DeltaRespReflect) {}
+
+func (x DeltaRespRec) Index() int                      { return 2 }
+func (x DeltaRespRec) Interface() interface{}          { return x.Value }
+func (x DeltaRespRec) Name() string                    { return "Rec" }
+func (x DeltaRespRec) __VDLReflect(__DeltaRespReflect) {}
+
+func (x DeltaRespRespVec) Index() int                      { return 3 }
+func (x DeltaRespRespVec) Interface() interface{}          { return x.Value }
+func (x DeltaRespRespVec) Name() string                    { return "RespVec" }
+func (x DeltaRespRespVec) __VDLReflect(__DeltaRespReflect) {}
+
 func init() {
 	vdl.Register((*PrefixGenVector)(nil))
 	vdl.Register((*GenVector)(nil))
@@ -163,6 +231,8 @@
 	vdl.Register((*GroupId)(nil))
 	vdl.Register((*SyncGroupStatus)(nil))
 	vdl.Register((*SyncGroup)(nil))
+	vdl.Register((*DeltaReq)(nil))
+	vdl.Register((*DeltaResp)(nil))
 }
 
 const NoGroupId = GroupId(0)
diff --git a/x/ref/services/syncbase/server/watchable/util.go b/x/ref/services/syncbase/server/watchable/util.go
index 5eb1bb4..3dc3e78 100644
--- a/x/ref/services/syncbase/server/watchable/util.go
+++ b/x/ref/services/syncbase/server/watchable/util.go
@@ -16,6 +16,7 @@
 
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/context"
 	"v.io/v23/verror"
 )
 
@@ -32,6 +33,34 @@
 	return []byte(join(string(key), string(version)))
 }
 
+// GetVersion returns the current version of a managed key. This method is used
+// by the Sync module when the initiator is attempting to add new versions of
+// objects. Reading the version key is used for optimistic concurrency control.
+func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error) {
+	wtx := st.(*transaction)
+
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	if wtx.err != nil {
+		return nil, convertError(wtx.err)
+	}
+	return getVersion(wtx.itx, key)
+}
+
+// GetAtVersion returns the value of a managed key at the requested
+// version. This method is used by the Sync module when the responder needs to
+// send objects over the wire.
+func GetAtVersion(ctx *context.T, st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
+	wtx := st.(*transaction)
+
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	if wtx.err != nil {
+		return valbuf, convertError(wtx.err)
+	}
+	return getAtVersion(wtx.itx, key, valbuf, version)
+}
+
 func getVersion(st store.StoreReader, key []byte) ([]byte, error) {
 	return st.Get(makeVersionKey(key), nil)
 }
@@ -49,6 +78,41 @@
 	return getAtVersion(st, key, valbuf, version)
 }
 
+// PutAtVersion puts a value for the managed key at the requested version. This
+// method is used by the Sync module exclusively when the initiator adds objects
+// with versions created on other Syncbases.
+func PutAtVersion(ctx *context.T, tx store.Transaction, key, valbuf, version []byte) error {
+	wtx := tx.(*transaction)
+
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	if wtx.err != nil {
+		return convertError(wtx.err)
+	}
+
+	return wtx.itx.Put(makeAtVersionKey(key, version), valbuf)
+}
+
+// PutVersion updates the version of a managed key to the requested
+// version. This method is used by the Sync module exclusively when the
+// initiator selects which of the already stored versions (via PutAtVersion
+// calls) becomes the current version.
+func PutVersion(ctx *context.T, tx store.Transaction, key, version []byte) error {
+	wtx := tx.(*transaction)
+
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	if wtx.err != nil {
+		return convertError(wtx.err)
+	}
+
+	if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
+		return err
+	}
+	wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Value: version}})
+	return nil
+}
+
 func putVersioned(tx store.Transaction, key, value []byte) error {
 	rngLock.Lock()
 	num := rng.Int63()
diff --git a/x/ref/services/syncbase/syncbased/main.go b/x/ref/services/syncbase/syncbased/main.go
index d158752..7e1b52b 100644
--- a/x/ref/services/syncbase/syncbased/main.go
+++ b/x/ref/services/syncbase/syncbased/main.go
@@ -56,7 +56,7 @@
 		perms = defaultPerms(security.DefaultBlessingPatterns(v23.GetPrincipal(ctx)))
 	}
 	vlog.Infof("Perms: %v", perms)
-	service, err := server.NewService(nil, nil, server.ServiceOptions{
+	service, err := server.NewService(ctx, nil, server.ServiceOptions{
 		Perms:   perms,
 		RootDir: *rootDir,
 		Engine:  *engine,
diff --git a/x/ref/services/syncbase/vsync/cr.go b/x/ref/services/syncbase/vsync/cr.go
new file mode 100644
index 0000000..f7b1827
--- /dev/null
+++ b/x/ref/services/syncbase/vsync/cr.go
@@ -0,0 +1,135 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+	"v.io/v23/context"
+	"v.io/v23/verror"
+)
+
+// Policies for conflict resolution.
+// TODO(hpucha): Move relevant parts to client-facing vdl.
+const (
+	// Resolves conflicts by picking the update with the most recent timestamp.
+	useTime = iota
+
+	// TODO(hpucha): implement other policies.
+	// Resolves conflicts by using the app conflict resolver callbacks via store.
+	useCallback
+)
+
+var (
+	// conflictResolutionPolicy is the policy used to resolve conflicts.
+	conflictResolutionPolicy = useTime
+)
+
+// resolutionType represents how a conflict is resolved.
+type resolutionType byte
+
+const (
+	pickLocal  resolutionType = iota // local update was chosen as the resolution.
+	pickRemote                       // remote update was chosen as the resolution.
+	createNew                        // new update was created as the resolution.
+)
+
+// conflictResolution represents the state of a conflict resolution.
+type conflictResolution struct {
+	ty  resolutionType
+	rec *localLogRec // Valid only if ty == createNew.
+	val []byte       // Valid only if ty == createNew.
+}
+
+// resolveConflicts resolves conflicts for updated objects. Conflicts may be
+// resolved by adding new versions or picking either the local or the remote
+// version.
+func (iSt *initiationState) resolveConflicts(ctx *context.T) error {
+	for obj, st := range iSt.updObjects {
+		if !st.isConflict {
+			continue
+		}
+
+		// TODO(hpucha): Look up policy from the schema. Currently,
+		// hardcoded to time.
+		var err error
+		st.res, err = iSt.resolveObjConflict(ctx, obj, st.oldHead, st.newHead, st.ancestor)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// resolveObjConflict resolves a conflict for an object given its ID and the 3
+// versions that express the conflict: the object's local version, its remote
+// version (from the device contacted), and the common ancestor from which both
+// versions branched away.  The function returns the new object value according
+// to the conflict resolution policy.
+func (iSt *initiationState) resolveObjConflict(ctx *context.T, oid, local, remote, ancestor string) (*conflictResolution, error) {
+	// Fetch the log records of the 3 object versions.
+	versions := []string{local, remote, ancestor}
+	lrecs, err := iSt.getLogRecsBatch(ctx, oid, versions)
+	if err != nil {
+		return nil, err
+	}
+
+	// Resolve the conflict according to the resolution policy.
+	switch conflictResolutionPolicy {
+	case useTime:
+		return iSt.resolveObjConflictByTime(ctx, oid, lrecs[0], lrecs[1], lrecs[2])
+	default:
+		return nil, verror.New(verror.ErrInternal, ctx, "unknown conflict resolution policy", conflictResolutionPolicy)
+	}
+}
+
+// resolveObjConflictByTime resolves conflicts using the timestamps of the
+// conflicting mutations.  It picks a mutation with the larger timestamp,
+// i.e. the most recent update.  If the timestamps are equal, it uses the
+// mutation version numbers as a tie-breaker, picking the mutation with the
+// larger version.  Instead of creating a new version that resolves the
+// conflict, we pick an existing version as the conflict resolution.
+func (iSt *initiationState) resolveObjConflictByTime(ctx *context.T, oid string, local, remote, ancestor *localLogRec) (*conflictResolution, error) {
+	var res conflictResolution
+	switch {
+	case local.Metadata.UpdTime.After(remote.Metadata.UpdTime):
+		res.ty = pickLocal
+	case local.Metadata.UpdTime.Before(remote.Metadata.UpdTime):
+		res.ty = pickRemote
+	case local.Metadata.CurVers > remote.Metadata.CurVers:
+		res.ty = pickLocal
+	case local.Metadata.CurVers < remote.Metadata.CurVers:
+		res.ty = pickRemote
+	}
+
+	return &res, nil
+}
+
+// getLogRecsBatch gets the log records for an array of versions.
+func (iSt *initiationState) getLogRecsBatch(ctx *context.T, obj string, versions []string) ([]*localLogRec, error) {
+	lrecs := make([]*localLogRec, len(versions))
+	var err error
+	for p, v := range versions {
+		lrecs[p], err = iSt.getLogRec(ctx, obj, v)
+		if err != nil {
+			return nil, err
+		}
+	}
+	return lrecs, nil
+}
+
+// getLogRec returns the log record corresponding to a given object and its version.
+func (iSt *initiationState) getLogRec(ctx *context.T, obj, vers string) (*localLogRec, error) {
+	// TODO(hpucha): May be the change the name in dag for getLogrec. We now
+	// have a few functions of this name.
+	logKey, err := getLogrec(ctx, iSt.tx, obj, vers)
+	if err != nil {
+		return nil, err
+	}
+	dev, gen, err := splitLogRecKey(ctx, logKey)
+	if err != nil {
+		return nil, err
+	}
+	return getLogRec(ctx, iSt.tx, dev, gen)
+}
diff --git a/x/ref/services/syncbase/vsync/dag_test.go b/x/ref/services/syncbase/vsync/dag_test.go
index c3f135a..6177079 100644
--- a/x/ref/services/syncbase/vsync/dag_test.go
+++ b/x/ref/services/syncbase/vsync/dag_test.go
@@ -108,7 +108,7 @@
 	st := svc.St()
 	s := svc.sync
 
-	oid, version := "1234", "7"
+	oid, version := "foo1", "7"
 
 	tx := st.NewTransaction()
 	if err := s.addParent(nil, tx, oid, version, "haha", nil); err == nil {
@@ -236,7 +236,7 @@
 	st := svc.St()
 	s := svc.sync
 
-	oid := "1234"
+	oid := "foo1"
 
 	if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
 		t.Fatal(err)
@@ -293,7 +293,7 @@
 	st := svc.St()
 	s := svc.sync
 
-	oid := "1234"
+	oid := "foo1"
 
 	graft, err := s.dagReplayCommands(nil, "remote-init-00.log.sync")
 	if err != nil {
@@ -335,7 +335,7 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
 	}
 
@@ -367,7 +367,7 @@
 	st := svc.St()
 	s := svc.sync
 
-	oid := "1234"
+	oid := "foo1"
 
 	if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
 		t.Fatal(err)
@@ -411,10 +411,10 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
 		t.Errorf("invalid logrec for oldhead object %s:%s: %v", oid, oldHead, logrec)
 	}
-	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %v", oid, newHead, logrec)
 	}
 
@@ -450,7 +450,7 @@
 	st := svc.St()
 	s := svc.sync
 
-	oid := "1234"
+	oid := "foo1"
 
 	if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
 		t.Fatal(err)
@@ -494,13 +494,13 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
 		t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
 	}
-	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
 	}
-	if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "logrec-01" {
+	if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
 		t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
 	}
 
@@ -543,7 +543,7 @@
 	st := svc.St()
 	s := svc.sync
 
-	oid := "1234"
+	oid := "foo1"
 
 	if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
 		t.Fatal(err)
@@ -587,13 +587,13 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
 		t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
 	}
-	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:1" {
+	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:2" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
 	}
-	if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "logrec-01" {
+	if logrec, err := getLogrec(nil, st, oid, ancestor); err != nil || logrec != "$sync:log:10:2" {
 		t.Errorf("invalid logrec for ancestor object %s:%s: %s", oid, ancestor, logrec)
 	}
 
@@ -630,7 +630,7 @@
 	st := svc.St()
 	s := svc.sync
 
-	oid := "1234"
+	oid := "foo1"
 
 	if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
 		t.Fatal(err)
@@ -674,10 +674,10 @@
 			oid, isConflict, newHead, oldHead, ancestor, errConflict)
 	}
 
-	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "logrec-02" {
+	if logrec, err := getLogrec(nil, st, oid, oldHead); err != nil || logrec != "$sync:log:10:3" {
 		t.Errorf("invalid logrec for oldhead object %s:%s: %s", oid, oldHead, logrec)
 	}
-	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "VeyronPhone:10:1:2" {
+	if logrec, err := getLogrec(nil, st, oid, newHead); err != nil || logrec != "$sync:log:11:3" {
 		t.Errorf("invalid logrec for newhead object %s:%s: %s", oid, newHead, logrec)
 	}
 
@@ -919,7 +919,7 @@
 	st := svc.St()
 	s := svc.sync
 
-	oid := "1234"
+	oid := "foo1"
 
 	if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
 		t.Fatal(err)
@@ -989,7 +989,7 @@
 	st := svc.St()
 	s := svc.sync
 
-	oid := "1234"
+	oid := "foo1"
 
 	if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
 		t.Fatal(err)
@@ -1049,7 +1049,7 @@
 	st := svc.St()
 	s := svc.sync
 
-	oid := "1234"
+	oid := "foo1"
 
 	if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
 		t.Fatal(err)
@@ -1111,7 +1111,7 @@
 	st := svc.St()
 	s := svc.sync
 
-	oid := "1234"
+	oid := "foo1"
 
 	if _, err := s.dagReplayCommands(nil, "local-init-00.log.sync"); err != nil {
 		t.Fatal(err)
diff --git a/x/ref/services/syncbase/vsync/initiator.go b/x/ref/services/syncbase/vsync/initiator.go
index f836cb2..1624301 100644
--- a/x/ref/services/syncbase/vsync/initiator.go
+++ b/x/ref/services/syncbase/vsync/initiator.go
@@ -4,18 +4,59 @@
 
 package vsync
 
-import "time"
+import (
+	"sort"
+	"strings"
+	"time"
 
-var (
-	// peerSyncInterval is the duration between two consecutive
-	// peer contacts.  During every peer contact, the initiator
-	// obtains any pending updates from that peer.
-	peerSyncInterval = 50 * time.Millisecond
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/verror"
+	"v.io/x/lib/set"
+	"v.io/x/lib/vlog"
 )
 
-// contactPeers wakes up every peerSyncInterval to select a peer, and
-// get updates from it.
-func (s *syncService) contactPeers() {
+// Policies to pick a peer to sync with.
+const (
+	// Picks a peer at random from the available set.
+	selectRandom = iota
+
+	// TODO(hpucha): implement other policies.
+	// Picks a peer with most differing generations.
+	selectMostDiff
+
+	// Picks a peer that was synced with the furthest in the past.
+	selectOldest
+)
+
+var (
+	// peerSyncInterval is the duration between two consecutive peer
+	// contacts. During every peer contact, the initiator obtains any
+	// pending updates from that peer.
+	peerSyncInterval = 50 * time.Millisecond
+
+	// peerSelectionPolicy is the policy used to select a peer when
+	// the initiator gets a chance to sync.
+	peerSelectionPolicy = selectRandom
+)
+
+// syncer wakes up every peerSyncInterval to do work: (1) Act as an initiator
+// for SyncGroup metadata by selecting a SyncGroup Admin, and syncing Syncgroup
+// metadata with it (getting updates from the remote peer, detecting and
+// resolving conflicts); (2) Refresh memberView if needed and act as an
+// initiator for data by selecting a peer, and syncing data corresponding to all
+// common SyncGroups across all Databases; (3) Act as a SyncGroup publisher to
+// publish pending SyncGroups; (4) Garbage collect older generations.
+//
+// TODO(hpucha): Currently only does initiation. Add rest.
+func (s *syncService) syncer(ctx *context.T) {
+	// TODO(hpucha): Do we need context per initiator round?
+	ctx, cancel := context.WithRootCancel(ctx)
+	defer cancel()
+
 	ticker := time.NewTicker(peerSyncInterval)
 	for {
 		select {
@@ -27,5 +68,735 @@
 		}
 
 		// Do work.
+		peer, err := s.pickPeer(ctx)
+		if err != nil {
+			continue
+		}
+		s.getDeltasFromPeer(ctx, peer)
+	}
+}
+
+// getDeltasFromPeer performs an initiation round to the specified
+// peer. An initiation round consists of:
+// * Contacting the peer to receive all the deltas based on the local genvector.
+// * Processing those deltas to discover objects which have been updated.
+// * Processing updated objects to detect and resolve any conflicts if needed.
+// * Communicating relevant object updates to the Database.
+// The processing of the deltas is done one Database at a time.
+func (s *syncService) getDeltasFromPeer(ctx *context.T, peer string) {
+	info := s.allMembers.members[peer]
+	if info == nil {
+		return
+	}
+	connected := false
+	var stream interfaces.SyncGetDeltasClientCall
+
+	// Sync each Database that may have SyncGroups common with this peer,
+	// one at a time.
+	for gdbName, sgInfo := range info.db2sg {
+
+		// Initialize initiation state for syncing this Database.
+		iSt, err := newInitiationState(ctx, s, peer, gdbName, sgInfo)
+		if err != nil {
+			vlog.Errorf("getDeltasFromPeer:: couldn't initialize initiator state for peer %s, gdb %s, err %v", peer, gdbName, err)
+			continue
+		}
+
+		if len(iSt.sgIds) == 0 || len(iSt.sgPfxs) == 0 {
+			vlog.Errorf("getDeltasFromPeer:: didn't find any SyncGroups for peer %s, gdb %s, err %v", peer, gdbName, err)
+			continue
+		}
+
+		// Make contact with the peer once.
+		if !connected {
+			stream, connected = iSt.connectToPeer(ctx)
+			if !connected {
+				// Try a different Database. Perhaps there are
+				// different mount tables.
+				continue
+			}
+		}
+
+		// Create local genvec so that it contains knowledge only about common prefixes.
+		if err := iSt.createLocalGenVec(ctx); err != nil {
+			vlog.Errorf("getDeltasFromPeer:: error creating local genvec for gdb %s, err %v", gdbName, err)
+			continue
+		}
+
+		iSt.stream = stream
+		req := interfaces.DeltaReq{SgIds: iSt.sgIds, InitVec: iSt.local}
+		sender := iSt.stream.SendStream()
+		sender.Send(req)
+
+		// Obtain deltas from the peer over the network.
+		if err := iSt.recvAndProcessDeltas(ctx); err != nil {
+			vlog.Errorf("getDeltasFromPeer:: error receiving deltas for gdb %s, err %v", gdbName, err)
+			// Returning here since something could be wrong with
+			// the connection, and no point in attempting the next
+			// Database.
+			return
+		}
+
+		if err := iSt.processUpdatedObjects(ctx); err != nil {
+			vlog.Errorf("getDeltasFromPeer:: error processing objects for gdb %s, err %v", gdbName, err)
+			// Move to the next Database even if processing updates
+			// failed.
+			continue
+		}
+	}
+
+	if connected {
+		stream.Finish()
+	}
+}
+
+// initiationState is accumulated for each Database during an initiation round.
+type initiationState struct {
+	// Relative name of the peer to sync with.
+	peer string
+
+	// Collection of mount tables that this peer may have registered with.
+	mtTables map[string]struct{}
+
+	// SyncGroups being requested in the initiation round.
+	sgIds map[interfaces.GroupId]struct{}
+
+	// SyncGroup prefixes being requested in the initiation round.
+	sgPfxs map[string]struct{}
+
+	// Local generation vector.
+	local interfaces.GenVector
+
+	// Generation vector from the remote peer.
+	remote interfaces.GenVector
+
+	// Updated local generation vector at the end of the initiation round.
+	updLocal interfaces.GenVector
+
+	// State to track updated objects during a log replay.
+	updObjects map[string]*objConflictState
+
+	// DAG state that tracks conflicts and common ancestors.
+	dagGraft graftMap
+
+	sync    *syncService
+	appName string
+	dbName  string
+	st      store.Store                        // Store handle to the Database.
+	stream  interfaces.SyncGetDeltasClientCall // Stream handle for the GetDeltas RPC.
+
+	// Transaction handle for the initiation round. Used during the update
+	// of objects in the Database.
+	tx store.Transaction
+}
+
+// objConflictState contains the conflict state for an object that is updated
+// during an initiator round.
+type objConflictState struct {
+	isConflict bool
+	newHead    string
+	oldHead    string
+	ancestor   string
+	res        *conflictResolution
+}
+
+// newInitiationState creates new initiation state.
+func newInitiationState(ctx *context.T, s *syncService, peer string, name string, sgInfo sgMemberInfo) (*initiationState, error) {
+	iSt := &initiationState{}
+	iSt.peer = peer
+	iSt.updObjects = make(map[string]*objConflictState)
+	iSt.dagGraft = newGraft()
+	iSt.sync = s
+
+	// TODO(hpucha): Would be nice to standardize on the combined "app:db"
+	// name across sync (not syncbase) so we only join split/join them at
+	// the boundary with the store part.
+	var err error
+	iSt.appName, iSt.dbName, err = splitAppDbName(ctx, name)
+	if err != nil {
+		return nil, err
+	}
+
+	// TODO(hpucha): nil rpc.ServerCall ok?
+	iSt.st, err = s.getDbStore(ctx, nil, iSt.appName, iSt.dbName)
+	if err != nil {
+		return nil, err
+	}
+
+	iSt.peerMtTblsAndSgInfo(ctx, peer, sgInfo)
+
+	return iSt, nil
+}
+
+// peerMtTblsAndSgInfo computes the possible mount tables, the SyncGroup Ids and
+// prefixes common with a remote peer in a particular Database by consulting the
+// SyncGroups in the specified Database.
+func (iSt *initiationState) peerMtTblsAndSgInfo(ctx *context.T, peer string, info sgMemberInfo) {
+	iSt.mtTables = make(map[string]struct{})
+	iSt.sgIds = make(map[interfaces.GroupId]struct{})
+	iSt.sgPfxs = make(map[string]struct{})
+
+	for id := range info {
+		sg, err := getSyncGroupById(ctx, iSt.st, id)
+		if err != nil {
+			continue
+		}
+		if _, ok := sg.Joiners[peer]; !ok {
+			// Peer is no longer part of the SyncGroup.
+			continue
+		}
+		for _, mt := range sg.Spec.MountTables {
+			iSt.mtTables[mt] = struct{}{}
+		}
+		iSt.sgIds[id] = struct{}{}
+
+		for _, p := range sg.Spec.Prefixes {
+			iSt.sgPfxs[p] = struct{}{}
+		}
+	}
+}
+
+// connectToPeer attempts to connect to the remote peer using the mount tables
+// obtained from the SyncGroups being synced in the current Database.
+func (iSt *initiationState) connectToPeer(ctx *context.T) (interfaces.SyncGetDeltasClientCall, bool) {
+	if len(iSt.mtTables) < 1 {
+		vlog.Errorf("getDeltasFromPeer:: no mount tables found to connect to peer %s, app %s db %s", iSt.peer, iSt.appName, iSt.dbName)
+		return nil, false
+	}
+	for mt := range iSt.mtTables {
+		c := interfaces.SyncClient(naming.Join(mt, iSt.peer))
+		stream, err := c.GetDeltas(ctx)
+		if err == nil {
+			return stream, true
+		}
+	}
+	return nil, false
+}
+
+// createLocalGenVec creates the generation vector with local knowledge for the
+// initiator to send to the responder.
+//
+// TODO(hpucha): Refactor this code with computeDelta code in sync_state.go.
+func (iSt *initiationState) createLocalGenVec(ctx *context.T) error {
+	// Freeze the most recent batch of local changes before fetching
+	// remote changes from a peer. This frozen state is used by the
+	// responder when responding to GetDeltas RPC.
+	//
+	// We only allow an initiator to freeze local generations (not
+	// responders/watcher) in order to maintain a static baseline
+	// for the duration of a sync. This addresses the following race
+	// condition: If we allow responders to use newer local
+	// generations while the initiator is in progress, they may beat
+	// the initiator and send these new generations to remote
+	// devices.  These remote devices in turn can send these
+	// generations back to the initiator in progress which was
+	// started with older generation information.
+	if err := iSt.sync.checkPtLocalGen(ctx, iSt.appName, iSt.dbName); err != nil {
+		return err
+	}
+
+	local, lgen, err := iSt.sync.getDbGenInfo(ctx, iSt.appName, iSt.dbName)
+	if err != nil {
+		return err
+	}
+	localPfxs := extractAndSortPrefixes(local)
+
+	sgPfxs := set.String.ToSlice(iSt.sgPfxs)
+	sort.Strings(sgPfxs)
+
+	iSt.local = make(interfaces.GenVector)
+
+	if len(sgPfxs) == 0 {
+		return verror.New(verror.ErrInternal, ctx, "no syncgroups for syncing")
+	}
+
+	pfx := sgPfxs[0]
+	for _, p := range sgPfxs {
+		if strings.HasPrefix(p, pfx) && p != pfx {
+			continue
+		}
+
+		// Process this prefix as this is the start of a new set of
+		// nested prefixes.
+		pfx = p
+		var lpStart string
+		for _, lp := range localPfxs {
+			if !strings.HasPrefix(lp, pfx) && !strings.HasPrefix(pfx, lp) {
+				// No relationship with pfx.
+				continue
+			}
+			if strings.HasPrefix(pfx, lp) {
+				lpStart = lp
+			} else {
+				iSt.local[lp] = local[lp]
+			}
+		}
+		// Deal with the starting point.
+		if lpStart == "" {
+			// No matching prefixes for pfx were found.
+			iSt.local[pfx] = make(interfaces.PrefixGenVector)
+			iSt.local[pfx][iSt.sync.id] = lgen
+		} else {
+			iSt.local[pfx] = local[lpStart]
+		}
+	}
+	return nil
+}
+
+// recvAndProcessDeltas first receives the log records and generation vector
+// from the GetDeltas RPC and puts them in the Database. It also replays the
+// entire log stream as the log records arrive. These records span multiple
+// generations from different devices. It does not perform any conflict
+// resolution during replay.  This avoids resolving conflicts that have already
+// been resolved by other devices.
+func (iSt *initiationState) recvAndProcessDeltas(ctx *context.T) error {
+	// TODO(hpucha): This works for now, but figure out a long term solution
+	// as this may be implementation dependent. It currently works because
+	// the RecvStream call is stateless, and grabbing a handle to it
+	// repeatedly doesn't affect what data is seen next.
+	rcvr := iSt.stream.RecvStream()
+	start, finish := false, false
+	tx := iSt.st.NewTransaction()
+	committed := false
+
+	defer func() {
+		if !committed {
+			tx.Abort()
+		}
+	}()
+
+	// Track received batches.
+	batchMap := make(map[uint64]uint64)
+
+	for rcvr.Advance() {
+		resp := rcvr.Value()
+		switch v := resp.(type) {
+		case interfaces.DeltaRespStart:
+			if start {
+				return verror.New(verror.ErrInternal, ctx, "received start followed by start in delta response stream")
+			}
+			start = true
+
+		case interfaces.DeltaRespFinish:
+			if finish {
+				return verror.New(verror.ErrInternal, ctx, "received finish followed by finish in delta response stream")
+			}
+			finish = true
+			break
+
+		case interfaces.DeltaRespRespVec:
+			iSt.remote = v.Value
+
+		case interfaces.DeltaRespRec:
+			// Insert log record in Database.
+			// TODO(hpucha): Should we reserve more postions in a batch?
+			// TODO(hpucha): Handle if SyncGroup is left/destroyed while sync is in progress.
+			pos := iSt.sync.reservePosInDbLog(ctx, iSt.appName, iSt.dbName, 1)
+			rec := &localLogRec{Metadata: v.Value.Metadata, Pos: pos}
+			batchId := rec.Metadata.BatchId
+			if batchId != NoBatchId {
+				if cnt, ok := batchMap[batchId]; !ok {
+					if iSt.sync.startBatch(ctx, tx, batchId) != batchId {
+						return verror.New(verror.ErrInternal, ctx, "failed to create batch info")
+					}
+					batchMap[batchId] = rec.Metadata.BatchCount
+				} else if cnt != rec.Metadata.BatchCount {
+					return verror.New(verror.ErrInternal, ctx, "inconsistent counts for tid", batchId, cnt, rec.Metadata.BatchCount)
+				}
+			}
+
+			if err := iSt.insertRecInLogDagAndDb(ctx, rec, batchId, v.Value.Value, tx); err != nil {
+				return err
+			}
+			// Mark object dirty.
+			iSt.updObjects[rec.Metadata.ObjId] = &objConflictState{}
+		}
+	}
+
+	if !(start && finish) {
+		return verror.New(verror.ErrInternal, ctx, "didn't receive start/finish delimiters in delta response stream")
+	}
+
+	if err := rcvr.Err(); err != nil {
+		return err
+	}
+
+	// End the started batches if any.
+	for bid, cnt := range batchMap {
+		if err := iSt.sync.endBatch(ctx, tx, bid, cnt); err != nil {
+			return err
+		}
+	}
+
+	// Commit this transaction. We do not retry this transaction since it
+	// should not conflict with any other keys. So if it fails, it is a
+	// non-retriable error.
+	err := tx.Commit()
+	if err == nil {
+		committed = true
+	}
+	return err
+}
+
+// insertRecInLogDagAndDb adds a new log record to log and dag data structures,
+// and inserts the versioned value in the Database.
+func (iSt *initiationState) insertRecInLogDagAndDb(ctx *context.T, rec *localLogRec, batchId uint64, valbuf []byte, tx store.Transaction) error {
+	if err := putLogRec(ctx, tx, rec); err != nil {
+		return err
+	}
+
+	m := rec.Metadata
+	logKey := logRecKey(m.Id, m.Gen)
+
+	var err error
+	switch m.RecType {
+	case interfaces.NodeRec:
+		err = iSt.sync.addNode(ctx, tx, m.ObjId, m.CurVers, logKey, m.Delete, m.Parents, m.BatchId, iSt.dagGraft)
+	case interfaces.LinkRec:
+		err = iSt.sync.addParent(ctx, tx, m.ObjId, m.CurVers, m.Parents[0], iSt.dagGraft)
+	default:
+		err = verror.New(verror.ErrInternal, ctx, "unknown log record type")
+	}
+
+	if err != nil {
+		return err
+	}
+	// TODO(hpucha): Hack right now. Need to change Database's handling of
+	// deleted objects.
+	if !rec.Metadata.Delete && rec.Metadata.RecType == interfaces.NodeRec {
+		return watchable.PutAtVersion(ctx, tx, []byte(m.ObjId), valbuf, []byte(m.CurVers))
+	}
+	return nil
+}
+
+// processUpdatedObjects processes all the updates received by the initiator,
+// one object at a time. For each updated object, we first check if the object
+// has any conflicts, resulting in three possibilities:
+//
+// * There is no conflict, and no updates are needed to the Database
+// (isConflict=false, newHead == oldHead). All changes received convey
+// information that still keeps the local head as the most recent version. This
+// occurs when conflicts are resolved by picking the existing local version.
+//
+// * There is no conflict, but a remote version is discovered that builds on the
+// local head (isConflict=false, newHead != oldHead). In this case, we generate
+// a Database update to simply update the Database to the latest value.
+//
+// * There is a conflict and we call into the app or use a well-known policy to
+// resolve the conflict, resulting in three possibilties: (a) conflict was
+// resolved by picking the local version. In this case, Database need not be
+// updated, but a link is added to record the choice. (b) conflict was resolved
+// by picking the remote version. In this case, Database is updated with the
+// remote version and a link is added as well. (c) conflict was resolved by
+// generating a new Database update. In this case, Database is updated with the
+// new version.
+//
+// We collect all the updates to the Database in a transaction. In addition, as
+// part of the same transaction, we update the log and dag state suitably (move
+// the head ptr of the object in the dag to the latest version, and create a new
+// log record reflecting conflict resolution if any). Finally, we update the
+// sync state first on storage. This transaction's commit can fail since
+// preconditions on the objects may have been violated. In this case, we wait to
+// get the latest versions of objects from the Database, and recheck if the object
+// has any conflicts and repeat the above steps, until the transaction commits
+// successfully. Upon commit, we also update the in-memory sync state of the
+// Database.
+func (iSt *initiationState) processUpdatedObjects(ctx *context.T) error {
+	// Note that the tx handle in initiation state is cached for the scope of
+	// this function only as different stages in the pipeline add to the
+	// transaction.
+	committed := false
+	defer func() {
+		if !committed {
+			iSt.tx.Abort()
+		}
+	}()
+
+	for {
+		iSt.tx = iSt.st.NewTransaction()
+
+		if err := iSt.detectConflicts(ctx); err != nil {
+			return err
+		}
+
+		if err := iSt.resolveConflicts(ctx); err != nil {
+			return err
+		}
+
+		if err := iSt.updateDbAndSyncSt(ctx); err != nil {
+			return err
+		}
+
+		err := iSt.tx.Commit()
+		if err == nil {
+			committed = true
+			// Update in-memory genvector since commit is successful.
+			if err := iSt.sync.putDbGenInfoRemote(ctx, iSt.appName, iSt.dbName, iSt.updLocal); err != nil {
+				vlog.Fatalf("processUpdatedObjects:: putting geninfo in memory failed for app %s db %s, err %v", iSt.appName, iSt.dbName, err)
+			}
+			return nil
+		}
+
+		if verror.ErrorID(err) != store.ErrConcurrentTransaction.ID {
+			return err
+		}
+
+		// TODO(hpucha): Sleeping and retrying is a temporary
+		// solution. Next iteration will have coordination with watch
+		// thread to intelligently retry. Hence this value is not a
+		// config param.
+		iSt.tx.Abort()
+		time.Sleep(1 * time.Second)
+	}
+}
+
+// detectConflicts iterates through all the updated objects to detect conflicts.
+func (iSt *initiationState) detectConflicts(ctx *context.T) error {
+	for obj, st := range iSt.updObjects {
+		// Check if object has a conflict.
+		var err error
+		st.isConflict, st.newHead, st.oldHead, st.ancestor, err = hasConflict(ctx, iSt.tx, obj, iSt.dagGraft)
+		if err != nil {
+			return err
+		}
+
+		if !st.isConflict {
+			st.res = &conflictResolution{ty: pickRemote}
+		}
+	}
+	return nil
+}
+
+// updateDbAndSync updates the Database, and if that is successful, updates log,
+// dag and genvector data structures as needed.
+func (iSt *initiationState) updateDbAndSyncSt(ctx *context.T) error {
+	for obj, st := range iSt.updObjects {
+		// If the local version is picked, no further updates to the
+		// Database are needed.
+		if st.res.ty == pickLocal {
+			continue
+		}
+
+		// If the remote version is picked or if a new version is
+		// created, we put it in the Database.
+
+		// TODO(hpucha): Hack right now. Need to change Database's
+		// handling of deleted objects.
+		oldVersDeleted := true
+		if st.oldHead != NoVersion {
+			oldDagNode, err := getNode(ctx, iSt.tx, obj, st.oldHead)
+			if err != nil {
+				return err
+			}
+			oldVersDeleted = oldDagNode.Deleted
+		}
+
+		var newVersion string
+		var newVersDeleted bool
+		switch st.res.ty {
+		case pickRemote:
+			newVersion = st.newHead
+			newDagNode, err := getNode(ctx, iSt.tx, obj, newVersion)
+			if err != nil {
+				return err
+			}
+			newVersDeleted = newDagNode.Deleted
+		case createNew:
+			newVersion = st.res.rec.Metadata.CurVers
+			newVersDeleted = st.res.rec.Metadata.Delete
+		}
+
+		// Skip delete followed by a delete.
+		if oldVersDeleted && newVersDeleted {
+			continue
+		}
+
+		if !oldVersDeleted {
+			// Read current version to enter it in the readset of the transaction.
+			version, err := watchable.GetVersion(ctx, iSt.tx, []byte(obj))
+			if err != nil {
+				return err
+			}
+			if string(version) != st.oldHead {
+				return store.NewErrConcurrentTransaction(ctx)
+			}
+		} else {
+			// Ensure key doesn't exist.
+			if _, err := watchable.GetVersion(ctx, iSt.tx, []byte(obj)); verror.ErrorID(err) != store.ErrUnknownKey.ID {
+				return store.NewErrConcurrentTransaction(ctx)
+			}
+		}
+
+		if !newVersDeleted {
+			if st.res.ty == createNew {
+				if err := watchable.PutAtVersion(ctx, iSt.tx, []byte(obj), st.res.val, []byte(newVersion)); err != nil {
+					return err
+				}
+			}
+			if err := watchable.PutVersion(ctx, iSt.tx, []byte(obj), []byte(newVersion)); err != nil {
+				return err
+			}
+		} else {
+			if err := iSt.tx.Delete([]byte(obj)); err != nil {
+				return err
+			}
+		}
+
+		if err := iSt.updateLogAndDag(ctx, obj); err != nil {
+			return err
+		}
+	}
+
+	return iSt.updateSyncSt(ctx)
+}
+
+// updateLogAndDag updates the log and dag data structures.
+func (iSt *initiationState) updateLogAndDag(ctx *context.T, obj string) error {
+	st, ok := iSt.updObjects[obj]
+	if !ok {
+		return verror.New(verror.ErrInternal, ctx, "object state not found", obj)
+	}
+	var newVersion string
+
+	if !st.isConflict {
+		newVersion = st.newHead
+	} else {
+		// Object had a conflict. Create a log record to reflect resolution.
+		var rec *localLogRec
+
+		switch {
+		case st.res.ty == pickLocal:
+			// Local version was picked as the conflict resolution.
+			rec = iSt.createLocalLinkLogRec(ctx, obj, st.oldHead, st.newHead)
+			newVersion = st.oldHead
+		case st.res.ty == pickRemote:
+			// Remote version was picked as the conflict resolution.
+			rec = iSt.createLocalLinkLogRec(ctx, obj, st.newHead, st.oldHead)
+			newVersion = st.newHead
+		default:
+			// New version was created to resolve the conflict.
+			rec = st.res.rec
+			newVersion = st.res.rec.Metadata.CurVers
+		}
+
+		if err := putLogRec(ctx, iSt.tx, rec); err != nil {
+			return err
+		}
+
+		// Add a new DAG node.
+		var err error
+		m := rec.Metadata
+		switch m.RecType {
+		case interfaces.NodeRec:
+			err = iSt.sync.addNode(ctx, iSt.tx, obj, m.CurVers, logRecKey(m.Id, m.Gen), m.Delete, m.Parents, NoBatchId, nil)
+		case interfaces.LinkRec:
+			err = iSt.sync.addParent(ctx, iSt.tx, obj, m.CurVers, m.Parents[0], nil)
+		default:
+			return verror.New(verror.ErrInternal, ctx, "unknown log record type")
+		}
+		if err != nil {
+			return err
+		}
+	}
+
+	// Move the head. This should be idempotent. We may move head to the
+	// local head in some cases.
+	return moveHead(ctx, iSt.tx, obj, newVersion)
+}
+
+func (iSt *initiationState) createLocalLinkLogRec(ctx *context.T, obj, vers, par string) *localLogRec {
+	gen, pos := iSt.sync.reserveGenAndPosInDbLog(ctx, iSt.appName, iSt.dbName, 1)
+
+	rec := &localLogRec{
+		Metadata: interfaces.LogRecMetadata{
+			Id:      iSt.sync.id,
+			Gen:     gen,
+			RecType: interfaces.LinkRec,
+
+			ObjId:      obj,
+			CurVers:    vers,
+			Parents:    []string{par},
+			UpdTime:    time.Now().UTC(),
+			BatchId:    NoBatchId,
+			BatchCount: 1,
+			// TODO(hpucha): What is its batchid and count?
+		},
+		Pos: pos,
+	}
+	return rec
+}
+
+// updateSyncSt updates local sync state at the end of an initiator cycle.
+func (iSt *initiationState) updateSyncSt(ctx *context.T) error {
+	// Get the current local sync state.
+	dsInMem, err := iSt.sync.getDbSyncStateInMem(ctx, iSt.appName, iSt.dbName)
+	if err != nil {
+		return err
+	}
+	ds := &dbSyncState{
+		Gen:     dsInMem.gen,
+		CkPtGen: dsInMem.ckPtGen,
+		GenVec:  dsInMem.genvec,
+	}
+
+	// remote can be a subset of local.
+	for rpfx, respgv := range iSt.remote {
+		for lpfx, lpgv := range ds.GenVec {
+			if strings.HasPrefix(lpfx, rpfx) {
+				mergePrefixGenVectors(lpgv, respgv)
+			}
+		}
+		if _, ok := ds.GenVec[rpfx]; !ok {
+			ds.GenVec[rpfx] = respgv
+		}
+	}
+
+	iSt.updLocal = ds.GenVec
+	// Clean the genvector of any local state. Note that local state is held
+	// in gen/ckPtGen in sync state struct.
+	for _, pgv := range iSt.updLocal {
+		delete(pgv, iSt.sync.id)
+	}
+
+	// TODO(hpucha): Add knowledge compaction.
+
+	return putDbSyncState(ctx, iSt.tx, ds)
+}
+
+// mergePrefixGenVectors merges responder prefix genvector into local genvector.
+func mergePrefixGenVectors(lpgv, respgv interfaces.PrefixGenVector) {
+	for devid, rgen := range respgv {
+		gen, ok := lpgv[devid]
+		if !ok || gen < rgen {
+			lpgv[devid] = rgen
+		}
+	}
+}
+
+////////////////////////////////////////
+// Peer selection policies.
+
+// pickPeer picks a Syncbase to sync with.
+func (s *syncService) pickPeer(ctx *context.T) (string, error) {
+	switch peerSelectionPolicy {
+	case selectRandom:
+		members := s.getMembers(ctx)
+		// Remove myself from the set.
+		delete(members, s.name)
+		if len(members) == 0 {
+			return "", verror.New(verror.ErrInternal, ctx, "no useful peer")
+		}
+
+		// Pick a peer at random.
+		ind := randIntn(len(members))
+		for m := range members {
+			if ind == 0 {
+				return m, nil
+			}
+			ind--
+		}
+		return "", verror.New(verror.ErrInternal, ctx, "random selection didn't succeed")
+	default:
+		return "", verror.New(verror.ErrInternal, ctx, "unknown peer selection policy")
 	}
 }
diff --git a/x/ref/services/syncbase/vsync/initiator_test.go b/x/ref/services/syncbase/vsync/initiator_test.go
new file mode 100644
index 0000000..83fc8f4
--- /dev/null
+++ b/x/ref/services/syncbase/vsync/initiator_test.go
@@ -0,0 +1,295 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+	"time"
+
+	"v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
+)
+
+// TestLogStreamRemoteOnly tests processing of a remote log stream. Commands are
+// in file testdata/remote-init-00.log.sync.
+func TestLogStreamRemoteOnly(t *testing.T) {
+	svc, iSt, cleanup := testInit(t, "", "remote-init-00.log.sync")
+	defer cleanup(t, svc)
+
+	// Check all log records.
+	objid := util.JoinKeyParts(util.RowPrefix, "foo1")
+	var gen uint64
+	var parents []string
+	for gen = 1; gen < 4; gen++ {
+		gotRec, err := getLogRec(nil, svc.St(), 11, gen)
+		if err != nil || gotRec == nil {
+			t.Fatalf("getLogRec can not find object 11 %d, err %v", gen, err)
+		}
+		vers := fmt.Sprintf("%d", gen)
+		wantRec := &localLogRec{
+			Metadata: interfaces.LogRecMetadata{
+				Id:         11,
+				Gen:        gen,
+				RecType:    interfaces.NodeRec,
+				ObjId:      objid,
+				CurVers:    vers,
+				Parents:    parents,
+				UpdTime:    constTime,
+				BatchCount: 1,
+			},
+			Pos: gen - 1,
+		}
+
+		if !reflect.DeepEqual(gotRec, wantRec) {
+			t.Fatalf("Data mismatch in log record got %v, want %v", gotRec, wantRec)
+		}
+		// Verify DAG state.
+		if _, err := getNode(nil, svc.St(), objid, vers); err != nil {
+			t.Fatalf("getNode can not find object %s vers %s in DAG, err %v", objid, vers, err)
+		}
+		// Verify Database state.
+		tx := svc.St().NewTransaction()
+		if _, err := watchable.GetAtVersion(nil, tx, []byte(objid), nil, []byte(vers)); err != nil {
+			t.Fatalf("GetAtVersion can not find object %s vers %s in Database, err %v", objid, vers, err)
+		}
+		tx.Abort()
+		parents = []string{vers}
+	}
+
+	// Verify conflict state.
+	if len(iSt.updObjects) != 1 {
+		t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
+	}
+	st := iSt.updObjects[objid]
+	if st.isConflict {
+		t.Fatalf("Detected a conflict %v", st)
+	}
+	if st.newHead != "3" || st.oldHead != NoVersion {
+		t.Fatalf("Conflict detection didn't succeed %v", st)
+	}
+
+	// Verify genvec state.
+	wantVec := interfaces.GenVector{
+		"foo1": interfaces.PrefixGenVector{11: 3},
+		"bar":  interfaces.PrefixGenVector{11: 0},
+	}
+	if !reflect.DeepEqual(iSt.updLocal, wantVec) {
+		t.Fatalf("Final local gen vec mismatch got %v, want %v", iSt.updLocal, wantVec)
+	}
+
+	// Verify DAG state.
+	if head, err := getHead(nil, svc.St(), objid); err != nil || head != "3" {
+		t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
+	}
+
+	// Verify Database state.
+	valbuf, err := svc.St().Get([]byte(objid), nil)
+	if err != nil || string(valbuf) != "abc" {
+		t.Fatalf("Invalid object %s in Database %v, err %v", objid, string(valbuf), err)
+	}
+	tx := svc.St().NewTransaction()
+	version, err := watchable.GetVersion(nil, tx, []byte(objid))
+	if err != nil || string(version) != "3" {
+		t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(version), err)
+	}
+	tx.Abort()
+}
+
+// TestLogStreamNoConflict tests that a local and a remote log stream can be
+// correctly applied (when there are no conflicts). Commands are in files
+// testdata/<local-init-00.log.sync,remote-noconf-00.log.sync>.
+func TestLogStreamNoConflict(t *testing.T) {
+	svc, iSt, cleanup := testInit(t, "local-init-00.log.sync", "remote-noconf-00.log.sync")
+	defer cleanup(t, svc)
+
+	objid := util.JoinKeyParts(util.RowPrefix, "foo1")
+
+	// Check all log records.
+	var version uint64 = 1
+	var parents []string
+	for _, devid := range []uint64{10, 11} {
+		var gen uint64
+		for gen = 1; gen < 4; gen++ {
+			gotRec, err := getLogRec(nil, svc.St(), devid, gen)
+			if err != nil || gotRec == nil {
+				t.Fatalf("getLogRec can not find object %d:%d, err %v",
+					devid, gen, err)
+			}
+			vers := fmt.Sprintf("%d", version)
+			wantRec := &localLogRec{
+				Metadata: interfaces.LogRecMetadata{
+					Id:         devid,
+					Gen:        gen,
+					RecType:    interfaces.NodeRec,
+					ObjId:      objid,
+					CurVers:    vers,
+					Parents:    parents,
+					UpdTime:    constTime,
+					BatchCount: 1,
+				},
+				Pos: gen - 1,
+			}
+
+			if !reflect.DeepEqual(gotRec, wantRec) {
+				t.Fatalf("Data mismatch in log record got %v, want %v", gotRec, wantRec)
+			}
+
+			// Verify DAG state.
+			if _, err := getNode(nil, svc.St(), objid, vers); err != nil {
+				t.Fatalf("getNode can not find object %s vers %s in DAG, err %v", objid, vers, err)
+			}
+			// Verify Database state.
+			tx := svc.St().NewTransaction()
+			if _, err := watchable.GetAtVersion(nil, tx, []byte(objid), nil, []byte(vers)); err != nil {
+				t.Fatalf("GetAtVersion can not find object %s vers %s in Database, err %v", objid, vers, err)
+			}
+			tx.Abort()
+			parents = []string{vers}
+			version++
+		}
+	}
+
+	// Verify conflict state.
+	if len(iSt.updObjects) != 1 {
+		t.Fatalf("Unexpected number of updated objects %d", len(iSt.updObjects))
+	}
+	st := iSt.updObjects[objid]
+	if st.isConflict {
+		t.Fatalf("Detected a conflict %v", st)
+	}
+	if st.newHead != "6" || st.oldHead != "3" {
+		t.Fatalf("Conflict detection didn't succeed %v", st)
+	}
+
+	// Verify genvec state.
+	wantVec := interfaces.GenVector{
+		"foo1": interfaces.PrefixGenVector{11: 3},
+		"bar":  interfaces.PrefixGenVector{11: 0},
+	}
+	if !reflect.DeepEqual(iSt.updLocal, wantVec) {
+		t.Fatalf("Final local gen vec failed got %v, want %v", iSt.updLocal, wantVec)
+	}
+
+	// Verify DAG state.
+	if head, err := getHead(nil, svc.St(), objid); err != nil || head != "6" {
+		t.Fatalf("Invalid object %s head in DAG %v, err %v", objid, head, err)
+	}
+
+	// Verify Database state.
+	valbuf, err := svc.St().Get([]byte(objid), nil)
+	if err != nil || string(valbuf) != "abc" {
+		t.Fatalf("Invalid object %s in Database %v, err %v", objid, string(valbuf), err)
+	}
+	tx := svc.St().NewTransaction()
+	versbuf, err := watchable.GetVersion(nil, tx, []byte(objid))
+	if err != nil || string(versbuf) != "6" {
+		t.Fatalf("Invalid object %s head in Database %v, err %v", objid, string(versbuf), err)
+	}
+	tx.Abort()
+}
+
+//////////////////////////////
+// Helpers.
+
+func testInit(t *testing.T, lfile, rfile string) (*mockService, *initiationState, func(*testing.T, *mockService)) {
+	// Set a large value to prevent the threads from firing.
+	// Test is not thread safe.
+	peerSyncInterval = 1 * time.Hour
+	conflictResolutionPolicy = useTime
+	svc := createService(t)
+	cleanup := destroyService
+	s := svc.sync
+	s.id = 10 // initiator
+
+	sgId1 := interfaces.GroupId(1234)
+	nullInfo := nosql.SyncGroupMemberInfo{}
+	sgInfo := sgMemberInfo{
+		sgId1: nullInfo,
+	}
+
+	sg1 := &interfaces.SyncGroup{
+		Name:        "sg1",
+		Id:          sgId1,
+		AppName:     "mockapp",
+		DbName:      "mockdb",
+		Creator:     "mockCreator",
+		SpecVersion: "etag-0",
+		Spec: nosql.SyncGroupSpec{
+			Prefixes:    []string{"foo", "bar"},
+			MountTables: []string{"1/2/3/4", "5/6/7/8"},
+		},
+		Joiners: map[string]nosql.SyncGroupMemberInfo{
+			"a": nullInfo,
+			"b": nullInfo,
+		},
+	}
+
+	tx := svc.St().NewTransaction()
+	if err := addSyncGroup(nil, tx, sg1); err != nil {
+		t.Fatalf("cannot add SyncGroup ID %d, err %v", sg1.Id, err)
+	}
+	if err := tx.Commit(); err != nil {
+		t.Fatalf("cannot commit adding SyncGroup ID %d, err %v", sg1.Id, err)
+	}
+
+	if lfile != "" {
+		replayLocalCommands(t, svc, lfile)
+	}
+
+	if rfile == "" {
+		return svc, nil, cleanup
+	}
+
+	gdb := appDbName("mockapp", "mockdb")
+	iSt, err := newInitiationState(nil, s, "b", gdb, sgInfo)
+	if err != nil {
+		t.Fatalf("newInitiationState failed with err %v", err)
+	}
+
+	testIfMapArrEqual(t, iSt.sgPfxs, sg1.Spec.Prefixes)
+	testIfMapArrEqual(t, iSt.mtTables, sg1.Spec.MountTables)
+
+	s.syncState[gdb] = &dbSyncStateInMem{}
+
+	// Create local genvec so that it contains knowledge only about common prefixes.
+	if err := iSt.createLocalGenVec(nil); err != nil {
+		t.Fatalf("createLocalGenVec failed with err %v", err)
+	}
+
+	wantVec := interfaces.GenVector{
+		"foo": interfaces.PrefixGenVector{10: 0},
+		"bar": interfaces.PrefixGenVector{10: 0},
+	}
+	if !reflect.DeepEqual(iSt.local, wantVec) {
+		t.Fatalf("createLocalGenVec failed got %v, want %v", iSt.local, wantVec)
+	}
+
+	iSt.stream = createReplayStream(t, rfile)
+
+	if err := iSt.recvAndProcessDeltas(nil); err != nil {
+		t.Fatalf("recvAndProcessDeltas failed with err %v", err)
+	}
+
+	if err := iSt.processUpdatedObjects(nil); err != nil {
+		t.Fatalf("processUpdatedObjects failed with err %v", err)
+	}
+	return svc, iSt, cleanup
+}
+
+func testIfMapArrEqual(t *testing.T, m map[string]struct{}, a []string) {
+	if len(a) != len(m) {
+		t.Fatalf("testIfMapArrEqual diff lengths, got %v want %v", a, m)
+	}
+
+	for _, p := range a {
+		if _, ok := m[p]; !ok {
+			t.Fatalf("testIfMapArrEqual want %v", p)
+		}
+	}
+}
diff --git a/x/ref/services/syncbase/vsync/replay_test.go b/x/ref/services/syncbase/vsync/replay_test.go
index 470d086..98b7d20 100644
--- a/x/ref/services/syncbase/vsync/replay_test.go
+++ b/x/ref/services/syncbase/vsync/replay_test.go
@@ -10,11 +10,17 @@
 
 import (
 	"bufio"
+	"container/list"
 	"fmt"
 	"os"
 	"strconv"
 	"strings"
+	"testing"
+	"time"
 
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
 	"v.io/v23/context"
 )
 
@@ -23,15 +29,23 @@
 	addRemote
 	linkLocal
 	linkRemote
+	genvec
+)
+
+var (
+	constTime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
 )
 
 type syncCommand struct {
-	cmd     int
-	oid     string
-	version string
-	parents []string
-	logrec  string
-	deleted bool
+	cmd        int
+	oid        string
+	version    string
+	parents    []string
+	logrec     string
+	deleted    bool
+	batchId    uint64
+	batchCount uint64
+	genVec     interfaces.GenVector
 }
 
 // parseSyncCommands parses a sync test file and returns its commands.
@@ -70,16 +84,26 @@
 				}
 			}
 
+			batchId, err := strconv.ParseUint(args[6], 10, 64)
+			if err != nil {
+				return nil, fmt.Errorf("%s:%d: invalid batchId: %s", file, lineno, args[6])
+			}
+			batchCount, err := strconv.ParseUint(args[7], 10, 64)
+			if err != nil {
+				return nil, fmt.Errorf("%s:%d: invalid batch count: %s", file, lineno, args[7])
+			}
 			del, err := strconv.ParseBool(args[8])
 			if err != nil {
 				return nil, fmt.Errorf("%s:%d: invalid deleted bit: %s", file, lineno, args[8])
 			}
 			cmd := syncCommand{
-				oid:     args[1],
-				version: args[2],
-				parents: parents,
-				logrec:  args[5],
-				deleted: del,
+				oid:        args[1],
+				version:    args[2],
+				parents:    parents,
+				logrec:     args[5],
+				batchId:    batchId,
+				batchCount: batchCount,
+				deleted:    del,
 			}
 			if args[0] == "addl" {
 				cmd.cmd = addLocal
@@ -116,6 +140,33 @@
 			}
 			cmds = append(cmds, cmd)
 
+		case "genvec":
+			cmd := syncCommand{
+				cmd:    genvec,
+				genVec: make(interfaces.GenVector),
+			}
+			for i := 1; i < len(args); i = i + 2 {
+				pfx := args[i]
+				genVec := make(interfaces.PrefixGenVector)
+				for _, elem := range strings.Split(args[i+1], ",") {
+					kv := strings.Split(elem, ":")
+					if len(kv) != 2 {
+						return nil, fmt.Errorf("%s:%d: invalid gen vector key/val: %s", file, lineno, elem)
+					}
+					dev, err := strconv.ParseUint(kv[0], 10, 64)
+					if err != nil {
+						return nil, fmt.Errorf("%s:%d: invalid devid: %s", file, lineno, args[i+1])
+					}
+					gen, err := strconv.ParseUint(kv[1], 10, 64)
+					if err != nil {
+						return nil, fmt.Errorf("%s:%d: invalid gen: %s", file, lineno, args[i+1])
+					}
+					genVec[dev] = gen
+				}
+				cmd.genVec[pfx] = genVec
+			}
+			cmds = append(cmds, cmd)
+
 		default:
 			return nil, fmt.Errorf("%s:%d: invalid operation: %s", file, lineno, args[0])
 		}
@@ -179,3 +230,163 @@
 
 	return graft, nil
 }
+
+// dummyStream emulates stream of log records received from RPC.
+type dummyStream struct {
+	l     *list.List
+	entry interfaces.DeltaResp
+}
+
+func newStream() *dummyStream {
+	ds := &dummyStream{
+		l: list.New(),
+	}
+	return ds
+}
+
+func (ds *dummyStream) add(entry interfaces.DeltaResp) {
+	ds.l.PushBack(entry)
+}
+
+func (ds *dummyStream) Advance() bool {
+	if ds.l.Len() > 0 {
+		ds.entry = ds.l.Remove(ds.l.Front()).(interfaces.DeltaResp)
+		return true
+	}
+	return false
+}
+
+func (ds *dummyStream) Value() interfaces.DeltaResp {
+	return ds.entry
+}
+
+func (ds *dummyStream) RecvStream() interface {
+	Advance() bool
+	Value() interfaces.DeltaResp
+	Err() error
+} {
+	return ds
+}
+
+func (*dummyStream) Err() error { return nil }
+
+func (ds *dummyStream) Finish() error {
+	return nil
+}
+
+func (ds *dummyStream) Cancel() {
+}
+
+func (ds *dummyStream) SendStream() interface {
+	Send(item interfaces.DeltaReq) error
+	Close() error
+} {
+	return ds
+}
+
+func (ds *dummyStream) Send(item interfaces.DeltaReq) error {
+	return nil
+}
+
+func (ds *dummyStream) Close() error {
+	return nil
+}
+
+// replayLocalCommands replays local log records parsed from the input file.
+func replayLocalCommands(t *testing.T, s *mockService, syncfile string) {
+	cmds, err := parseSyncCommands(syncfile)
+	if err != nil {
+		t.Fatalf("parseSyncCommands failed with err %v", err)
+	}
+
+	tx := s.St().NewTransaction()
+	var pos uint64
+	for _, cmd := range cmds {
+		switch cmd.cmd {
+		case addLocal:
+			rec := &localLogRec{
+				Metadata: createMetadata(t, interfaces.NodeRec, cmd),
+				Pos:      pos,
+			}
+			err = s.sync.processLocalLogRec(nil, tx, rec)
+			if err != nil {
+				t.Fatalf("processLocalLogRec failed with err %v", err)
+			}
+
+			// Add to Store.
+			err = watchable.PutVersion(nil, tx, []byte(rec.Metadata.ObjId), []byte(rec.Metadata.CurVers))
+			if err != nil {
+				t.Fatalf("PutVersion failed with err %v", err)
+			}
+			err = watchable.PutAtVersion(nil, tx, []byte(rec.Metadata.ObjId), []byte("abc"), []byte(rec.Metadata.CurVers))
+			if err != nil {
+				t.Fatalf("PutAtVersion failed with err %v", err)
+			}
+
+		default:
+			t.Fatalf("replayLocalCommands failed with unknown command %v", cmd)
+		}
+		pos++
+	}
+	if err := tx.Commit(); err != nil {
+		t.Fatalf("cannot commit local log records %s, err %v", syncfile, err)
+	}
+}
+
+// createReplayStream creates a dummy stream of log records parsed from the input file.
+func createReplayStream(t *testing.T, syncfile string) *dummyStream {
+	cmds, err := parseSyncCommands(syncfile)
+	if err != nil {
+		t.Fatalf("parseSyncCommands failed with err %v", err)
+	}
+
+	stream := newStream()
+	start := interfaces.DeltaRespStart{true}
+	stream.add(start)
+
+	for _, cmd := range cmds {
+		var ty byte
+		switch cmd.cmd {
+		case genvec:
+			gv := interfaces.DeltaRespRespVec{cmd.genVec}
+			stream.add(gv)
+			continue
+		case addRemote:
+			ty = interfaces.NodeRec
+		case linkRemote:
+			ty = interfaces.LinkRec
+		default:
+			t.Fatalf("createReplayStream unknown command %v", cmd)
+		}
+
+		rec := interfaces.DeltaRespRec{interfaces.LogRec{
+			Metadata: createMetadata(t, ty, cmd),
+			Value:    []byte("abc"),
+		}}
+
+		stream.add(rec)
+	}
+	fin := interfaces.DeltaRespFinish{true}
+	stream.add(fin)
+	return stream
+}
+
+func createMetadata(t *testing.T, ty byte, cmd syncCommand) interfaces.LogRecMetadata {
+	id, gen, err := splitLogRecKey(nil, cmd.logrec)
+	if err != nil {
+		t.Fatalf("createReplayStream splitLogRecKey failed, key %s, err %v", cmd.logrec, gen)
+	}
+	m := interfaces.LogRecMetadata{
+		Id:         id,
+		Gen:        gen,
+		RecType:    ty,
+		ObjId:      util.JoinKeyParts(util.RowPrefix, cmd.oid),
+		CurVers:    cmd.version,
+		Parents:    cmd.parents,
+		UpdTime:    constTime,
+		Delete:     cmd.deleted,
+		BatchId:    cmd.batchId,
+		BatchCount: cmd.batchCount,
+	}
+	return m
+}
diff --git a/x/ref/services/syncbase/vsync/sync.go b/x/ref/services/syncbase/vsync/sync.go
index 775d29f..ca2e3fc 100644
--- a/x/ref/services/syncbase/vsync/sync.go
+++ b/x/ref/services/syncbase/vsync/sync.go
@@ -82,6 +82,13 @@
 	return (uint64(rng.Int63()) << 1) | uint64(rng.Int63n(2))
 }
 
+// randIntn generates as an int, a non-negative pseudo-random number in [0,n).
+func randIntn(n int) int {
+	rngLock.Lock()
+	defer rngLock.Unlock()
+	return rng.Intn(n)
+}
+
 // New creates a new sync module.
 //
 // Concurrency: sync initializes two goroutines at startup: a "watcher" and an
@@ -125,11 +132,18 @@
 	go s.watchStore()
 
 	// Start initiator thread to periodically get deltas from peers.
-	go s.contactPeers()
+	go s.syncer(ctx)
 
 	return s, nil
 }
 
+// Close cleans up sync state.
+// TODO(hpucha): Hook it up to server shutdown of syncbased.
+func (s *syncService) Close() {
+	close(s.closed)
+	s.pending.Wait()
+}
+
 func NewSyncDatabase(db interfaces.Database) *syncDatabase {
 	return &syncDatabase{db: db}
 }
@@ -137,7 +151,7 @@
 ////////////////////////////////////////
 // Core sync method.
 
-func (s *syncService) GetDeltas(ctx *context.T, call rpc.ServerCall) error {
+func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
 	return verror.NewErrNotImplemented(ctx)
 }
 
diff --git a/x/ref/services/syncbase/vsync/sync_state.go b/x/ref/services/syncbase/vsync/sync_state.go
index fed4c9a..f025327 100644
--- a/x/ref/services/syncbase/vsync/sync_state.go
+++ b/x/ref/services/syncbase/vsync/sync_state.go
@@ -34,12 +34,12 @@
 	"container/heap"
 	"fmt"
 	"sort"
+	"strconv"
 	"strings"
 
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
-
 	"v.io/v23/context"
 	"v.io/v23/rpc"
 	"v.io/v23/verror"
@@ -47,9 +47,11 @@
 
 // dbSyncStateInMem represents the in-memory sync state of a Database.
 type dbSyncStateInMem struct {
-	gen    uint64
-	pos    uint64
-	genvec interfaces.GenVector // Note: Generation vector contains state from remote devices only.
+	gen uint64
+	pos uint64
+
+	ckPtGen uint64
+	genvec  interfaces.GenVector // Note: Generation vector contains state from remote devices only.
 }
 
 // initSync initializes the sync module during startup. It scans all the
@@ -136,6 +138,44 @@
 	return gen, pos
 }
 
+// checkPtLocalGen freezes the local generation number for the responder's use.
+func (s *syncService) checkPtLocalGen(ctx *context.T, appName, dbName string) error {
+	s.syncStateLock.Lock()
+	defer s.syncStateLock.Unlock()
+
+	name := appDbName(appName, dbName)
+	ds, ok := s.syncState[name]
+	if !ok {
+		return verror.New(verror.ErrInternal, ctx, "db state not found", name)
+	}
+
+	ds.ckPtGen = ds.gen
+	return nil
+}
+
+// getDbSyncStateInMem returns a copy of the current in memory sync state of the Database.
+func (s *syncService) getDbSyncStateInMem(ctx *context.T, appName, dbName string) (*dbSyncStateInMem, error) {
+	s.syncStateLock.Lock()
+	defer s.syncStateLock.Unlock()
+
+	name := appDbName(appName, dbName)
+	ds, ok := s.syncState[name]
+	if !ok {
+		return nil, verror.New(verror.ErrInternal, ctx, "db state not found", name)
+	}
+
+	dsCopy := &dbSyncStateInMem{
+		gen:     ds.gen,
+		pos:     ds.pos,
+		ckPtGen: ds.ckPtGen,
+	}
+
+	// Make a copy of the genvec.
+	dsCopy.genvec = copyGenVec(ds.genvec)
+
+	return dsCopy, nil
+}
+
 // getDbGenInfo returns a copy of the current generation information of the Database.
 func (s *syncService) getDbGenInfo(ctx *context.T, appName, dbName string) (interfaces.GenVector, uint64, error) {
 	s.syncStateLock.Lock()
@@ -146,16 +186,33 @@
 	if !ok {
 		return nil, 0, verror.New(verror.ErrInternal, ctx, "db state not found", name)
 	}
+
 	// Make a copy of the genvec.
-	genvec := make(interfaces.GenVector)
-	for p, dspgv := range ds.genvec {
-		pgv := make(interfaces.PrefixGenVector)
-		for id, gen := range dspgv {
-			pgv[id] = gen
-		}
-		genvec[p] = pgv
+	genvec := copyGenVec(ds.genvec)
+
+	// Add local generation information to the genvec.
+	for _, gv := range genvec {
+		gv[s.id] = ds.ckPtGen
 	}
-	return genvec, ds.gen, nil
+
+	return genvec, ds.ckPtGen, nil
+}
+
+// putDbGenInfoRemote puts the current remote generation information of the Database.
+func (s *syncService) putDbGenInfoRemote(ctx *context.T, appName, dbName string, genvec interfaces.GenVector) error {
+	s.syncStateLock.Lock()
+	defer s.syncStateLock.Unlock()
+
+	name := appDbName(appName, dbName)
+	ds, ok := s.syncState[name]
+	if !ok {
+		return verror.New(verror.ErrInternal, ctx, "db state not found", name)
+	}
+
+	// Make a copy of the genvec.
+	ds.genvec = copyGenVec(genvec)
+
+	return nil
 }
 
 // appDbName combines the app and db names to return a globally unique name for
@@ -165,6 +222,28 @@
 	return util.JoinKeyParts(appName, dbName)
 }
 
+// splitAppDbName is the inverse of appDbName and returns app and db name from a
+// globally unique name for a Database.
+func splitAppDbName(ctx *context.T, name string) (string, string, error) {
+	parts := util.SplitKeyParts(name)
+	if len(parts) != 2 {
+		return "", "", verror.New(verror.ErrInternal, ctx, "invalid appDbName", name)
+	}
+	return parts[0], parts[1], nil
+}
+
+func copyGenVec(in interfaces.GenVector) interfaces.GenVector {
+	genvec := make(interfaces.GenVector)
+	for p, inpgv := range in {
+		pgv := make(interfaces.PrefixGenVector)
+		for id, gen := range inpgv {
+			pgv[id] = gen
+		}
+		genvec[p] = pgv
+	}
+	return genvec
+}
+
 ////////////////////////////////////////////////////////////
 // Low-level utility functions to access sync state.
 
@@ -204,6 +283,24 @@
 	return util.JoinKeyParts(util.SyncPrefix, "log", fmt.Sprintf("%x", id), fmt.Sprintf("%016x", gen))
 }
 
+// splitLogRecKey is the inverse of logRecKey and returns device id and generation number.
+func splitLogRecKey(ctx *context.T, key string) (uint64, uint64, error) {
+	parts := util.SplitKeyParts(key)
+	verr := verror.New(verror.ErrInternal, ctx, "invalid logreckey", key)
+	if len(parts) != 4 {
+		return 0, 0, verr
+	}
+	id, err := strconv.ParseUint(parts[2], 10, 64)
+	if err != nil {
+		return 0, 0, verr
+	}
+	gen, err := strconv.ParseUint(parts[3], 10, 64)
+	if err != nil {
+		return 0, 0, verr
+	}
+	return id, gen, nil
+}
+
 // hasLogRec returns true if the log record for (devid, gen) exists.
 func hasLogRec(st store.StoreReader, id, gen uint64) bool {
 	// TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
@@ -399,9 +496,6 @@
 				// prefix genvectors to adjust the delta bound and
 				// include in outVec.
 				respgv = respVec[rp]
-				// Add local generation information to
-				// responder's genvec and returned genvec.
-				respgv[s.id] = respGen
 				s.diffPrefixGenVectors(respgv, initpgv, diff)
 				outVec[rp] = respgv
 			}
@@ -411,10 +505,10 @@
 		if rpStart == "" {
 			// No matching prefixes for pfx were found.
 			respgv = make(interfaces.PrefixGenVector)
+			respgv[s.id] = respGen
 		} else {
 			respgv = respVec[rpStart]
 		}
-		respgv[s.id] = respGen
 		s.diffPrefixGenVectors(respgv, initpgv, diff)
 		outVec[pfx] = respgv
 	}
diff --git a/x/ref/services/syncbase/vsync/sync_state_test.go b/x/ref/services/syncbase/vsync/sync_state_test.go
index 4a41e17..57c1f66 100644
--- a/x/ref/services/syncbase/vsync/sync_state_test.go
+++ b/x/ref/services/syncbase/vsync/sync_state_test.go
@@ -21,6 +21,7 @@
 // Database log.
 func TestReserveGenAndPos(t *testing.T) {
 	svc := createService(t)
+	defer destroyService(t, svc)
 	s := svc.sync
 
 	var wantGen, wantPos uint64 = 1, 0
@@ -42,6 +43,7 @@
 // TestPutGetDbSyncState tests setting and getting sync metadata.
 func TestPutGetDbSyncState(t *testing.T) {
 	svc := createService(t)
+	defer destroyService(t, svc)
 	st := svc.St()
 
 	checkDbSyncState(t, st, false, nil)
@@ -67,6 +69,7 @@
 // TestPutGetDelLogRec tests setting, getting, and deleting a log record.
 func TestPutGetDelLogRec(t *testing.T) {
 	svc := createService(t)
+	defer destroyService(t, svc)
 	st := svc.St()
 
 	var id uint64 = 10
@@ -113,6 +116,7 @@
 // TestDiffPrefixGenVectors tests diffing prefix gen vectors.
 func TestDiffPrefixGenVectors(t *testing.T) {
 	svc := createService(t)
+	defer destroyService(t, svc)
 	s := svc.sync
 	s.id = 10 //responder. Initiator is id 11.
 
@@ -437,7 +441,7 @@
 		s.id = 10 //responder.
 
 		wantDiff, wantVec := test.genDiff, test.outVec
-		s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, genvec: test.respVec}
+		s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, ckPtGen: test.respGen, genvec: test.respVec}
 
 		gotDiff, gotVec, err := s.computeDeltaBound(nil, appName, dbName, test.initVec)
 		if err != nil || !reflect.DeepEqual(gotVec, wantVec) {
@@ -487,6 +491,8 @@
 			t.Fatalf("sendDeltasPerDatabase failed (I: %v), (R: %v, %v), got %v, want %v err %v", test.initVec, test.respGen, test.respVec, gotVec, wantVec, err)
 		}
 		ts.diffLogRecs(t, wantRecs)
+
+		destroyService(t, svc)
 	}
 }
 
diff --git a/x/ref/services/syncbase/vsync/syncgroup_test.go b/x/ref/services/syncbase/vsync/syncgroup_test.go
index 31ad4c2..c37dba7 100644
--- a/x/ref/services/syncbase/vsync/syncgroup_test.go
+++ b/x/ref/services/syncbase/vsync/syncgroup_test.go
@@ -9,6 +9,7 @@
 import (
 	"reflect"
 	"testing"
+	"time"
 
 	"v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
@@ -41,6 +42,8 @@
 
 // TestAddSyncGroup tests adding SyncGroups.
 func TestAddSyncGroup(t *testing.T) {
+	// Set a large value to prevent the threads from firing.
+	peerSyncInterval = 1 * time.Hour
 	svc := createService(t)
 	defer destroyService(t, svc)
 	st := svc.St()
@@ -180,6 +183,8 @@
 
 // TestInvalidAddSyncGroup tests adding SyncGroups.
 func TestInvalidAddSyncGroup(t *testing.T) {
+	// Set a large value to prevent the threads from firing.
+	peerSyncInterval = 1 * time.Hour
 	svc := createService(t)
 	defer destroyService(t, svc)
 	st := svc.St()
@@ -214,6 +219,8 @@
 
 // TestDeleteSyncGroup tests deleting a SyncGroup.
 func TestDeleteSyncGroup(t *testing.T) {
+	// Set a large value to prevent the threads from firing.
+	peerSyncInterval = 1 * time.Hour
 	svc := createService(t)
 	defer destroyService(t, svc)
 	st := svc.St()
@@ -300,6 +307,8 @@
 
 // TestMultiSyncGroups tests creating multiple SyncGroups.
 func TestMultiSyncGroups(t *testing.T) {
+	// Set a large value to prevent the threads from firing.
+	peerSyncInterval = 1 * time.Hour
 	svc := createService(t)
 	defer destroyService(t, svc)
 	st := svc.St()
diff --git a/x/ref/services/syncbase/vsync/testdata/local-init-00.log.sync b/x/ref/services/syncbase/vsync/testdata/local-init-00.log.sync
index 46b3502..7435348 100644
--- a/x/ref/services/syncbase/vsync/testdata/local-init-00.log.sync
+++ b/x/ref/services/syncbase/vsync/testdata/local-init-00.log.sync
@@ -1,6 +1,6 @@
 # Create an object locally and update it twice (linked-list).
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addl|1234|1|||logrec-00|0|1|false
-addl|1234|2|1||logrec-01|0|1|false
-addl|1234|3|2||logrec-02|0|1|false
+addl|foo1|1|||$sync:log:10:1|0|1|false
+addl|foo1|2|1||$sync:log:10:2|0|1|false
+addl|foo1|3|2||$sync:log:10:3|0|1|false
diff --git a/x/ref/services/syncbase/vsync/testdata/local-resolve-00.sync b/x/ref/services/syncbase/vsync/testdata/local-resolve-00.sync
index 2c87de5..1666cf0 100644
--- a/x/ref/services/syncbase/vsync/testdata/local-resolve-00.sync
+++ b/x/ref/services/syncbase/vsync/testdata/local-resolve-00.sync
@@ -1,4 +1,4 @@
 # Create an object locally and update it twice (linked-list).
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addl|1234|7|3|6|logrec-06|0|1|false
+addl|foo1|7|3|6|logrec-06|0|1|false
diff --git a/x/ref/services/syncbase/vsync/testdata/remote-conf-00.log.sync b/x/ref/services/syncbase/vsync/testdata/remote-conf-00.log.sync
index 1f9bb5b..060cf0c 100644
--- a/x/ref/services/syncbase/vsync/testdata/remote-conf-00.log.sync
+++ b/x/ref/services/syncbase/vsync/testdata/remote-conf-00.log.sync
@@ -3,6 +3,6 @@
 # it from the local sync at v2, then updated separately).
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addr|1234|4|2||VeyronPhone:10:1:0|0|1|false
-addr|1234|5|4||VeyronPhone:10:1:1|0|1|false
-addr|1234|6|5||VeyronPhone:10:1:2|0|1|false
+addr|foo1|4|2||$sync:log:11:1|0|1|false
+addr|foo1|5|4||$sync:log:11:2|0|1|false
+addr|foo1|6|5||$sync:log:11:3|0|1|false
diff --git a/x/ref/services/syncbase/vsync/testdata/remote-conf-01.log.sync b/x/ref/services/syncbase/vsync/testdata/remote-conf-01.log.sync
index 9581f69..2053157 100644
--- a/x/ref/services/syncbase/vsync/testdata/remote-conf-01.log.sync
+++ b/x/ref/services/syncbase/vsync/testdata/remote-conf-01.log.sync
@@ -5,6 +5,6 @@
 # sees 2 graft points: v1-v4 and v2-v5.
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addr|1234|4|1||VeyronLaptop:10:1:0|0|1|false
-addr|1234|5|2|4|VeyronPhone:10:1:0|0|1|false
-addr|1234|6|5||VeyronPhone:10:1:1|0|1|false
+addr|foo1|4|1||$sync:log:12:1|0|1|false
+addr|foo1|5|2|4|$sync:log:11:1|0|1|false
+addr|foo1|6|5||$sync:log:11:2|0|1|false
diff --git a/x/ref/services/syncbase/vsync/testdata/remote-conf-03.log.sync b/x/ref/services/syncbase/vsync/testdata/remote-conf-03.log.sync
index c293656..673405e 100644
--- a/x/ref/services/syncbase/vsync/testdata/remote-conf-03.log.sync
+++ b/x/ref/services/syncbase/vsync/testdata/remote-conf-03.log.sync
@@ -1,6 +1,6 @@
 # Create the same object remotely from scratch and update it twice (linked-list).
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addr|1234|4|||VeyronPhone:10:1:0|0|1|false
-addr|1234|5|4||VeyronPhone:10:1:1|0|1|false
-addr|1234|6|5||VeyronPhone:10:1:2|0|1|false
+addr|foo1|4|||$sync:log:11:1|0|1|false
+addr|foo1|5|4||$sync:log:11:2|0|1|false
+addr|foo1|6|5||$sync:log:11:3|0|1|false
diff --git a/x/ref/services/syncbase/vsync/testdata/remote-conf-link.log.sync b/x/ref/services/syncbase/vsync/testdata/remote-conf-link.log.sync
index a324e4f..bdf0331 100644
--- a/x/ref/services/syncbase/vsync/testdata/remote-conf-link.log.sync
+++ b/x/ref/services/syncbase/vsync/testdata/remote-conf-link.log.sync
@@ -1,5 +1,5 @@
 # Update an object remotely, detect conflict, and bless the local version.
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addr|1234|4|1||VeyronPhone:10:1:0|0|1|false
-linkr|1234|4|2||VeyronPhone:10:1:1
+addr|foo1|4|1||$sync:log:11:1|0|1|false
+linkr|foo1|4|2||$sync:log:11:2
diff --git a/x/ref/services/syncbase/vsync/testdata/remote-init-00.log.sync b/x/ref/services/syncbase/vsync/testdata/remote-init-00.log.sync
index 9795d53..2546c47 100644
--- a/x/ref/services/syncbase/vsync/testdata/remote-init-00.log.sync
+++ b/x/ref/services/syncbase/vsync/testdata/remote-init-00.log.sync
@@ -1,6 +1,7 @@
 # Create an object remotely and update it twice (linked-list).
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addr|1234|1|||VeyronPhone:10:1:0|0|1|false
-addr|1234|2|1||VeyronPhone:10:1:1|0|1|false
-addr|1234|3|2||VeyronPhone:10:1:2|0|1|false
+addr|foo1|1|||$sync:log:11:1|0|1|false
+addr|foo1|2|1||$sync:log:11:2|0|1|false
+addr|foo1|3|2||$sync:log:11:3|0|1|false
+genvec|foo1|10:0,11:3|bar|11:0
\ No newline at end of file
diff --git a/x/ref/services/syncbase/vsync/testdata/remote-noconf-00.log.sync b/x/ref/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
index e2e2afa..6adf5dd 100644
--- a/x/ref/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
+++ b/x/ref/services/syncbase/vsync/testdata/remote-noconf-00.log.sync
@@ -3,6 +3,7 @@
 # received it from the local sync first, then updated it).
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addr|1234|4|3||VeyronPhone:10:1:0|0|1|false
-addr|1234|5|4||VeyronPhone:10:1:1|0|1|false
-addr|1234|6|5||VeyronPhone:10:1:2|0|1|false
+addr|foo1|4|3||$sync:log:11:1|0|1|false
+addr|foo1|5|4||$sync:log:11:2|0|1|false
+addr|foo1|6|5||$sync:log:11:3|0|1|false
+genvec|foo1|10:0,11:3|bar|11:0
\ No newline at end of file
diff --git a/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync b/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync
index 6945bb2..a06bec5 100644
--- a/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync
+++ b/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-00.log.sync
@@ -1,5 +1,5 @@
 # Update an object remotely, detect conflict, and bless the remote version.
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addr|1234|4|1||VeyronPhone:10:1:0|0|1|false
-linkr|1234|2|4||VeyronPhone:10:1:1
+addr|foo1|4|1||$sync:log:11:1|0|1|false
+linkr|foo1|2|4||$sync:log:11:2
diff --git a/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync b/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync
index 0c6969e..1271e23 100644
--- a/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync
+++ b/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-01.log.sync
@@ -1,5 +1,5 @@
 # Update an object remotely, detect conflict, and bless the local version.
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addr|1234|4|1||VeyronPhone:10:1:0|0|1|false
-linkr|1234|4|3||VeyronPhone:10:1:1
+addr|foo1|4|1||$sync:log:11:1|0|1|false
+linkr|foo1|4|3||$sync:log:11:2
diff --git a/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync b/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync
index df9e128..890d2bc 100644
--- a/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync
+++ b/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-02.log.sync
@@ -1,6 +1,6 @@
 # Update an object remotely, detect conflict, and bless the remote version, and continue updating.
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-addr|1234|4|1||VeyronPhone:10:1:0|0|1|false
-linkr|1234|3|4||VeyronPhone:10:1:1
-addr|1234|5|3||VeyronPhone:10:2:0|0|1|false
+addr|foo1|4|1||$sync:log:11:1|0|1|false
+linkr|foo1|3|4||$sync:log:11:2
+addr|foo1|5|3||$sync:log:11:3|0|1|false
diff --git a/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync b/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync
index 82e11c6..31e85a9 100644
--- a/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync
+++ b/x/ref/services/syncbase/vsync/testdata/remote-noconf-link-repeat.log.sync
@@ -1,4 +1,4 @@
 # Resolve the same conflict on two different devices.
 # The format is: <cmd>|<objid>|<version>|<parent1>|<parent2>|<logrec>|<txid>|<txcount>|<deleted>
 
-linkr|1234|3|4||VeyronLaptop:10:1:0
+linkr|foo1|3|4||$sync:log:12:1
\ No newline at end of file
diff --git a/x/ref/services/syncbase/vsync/types.vdl b/x/ref/services/syncbase/vsync/types.vdl
index d061f4a..226f739 100644
--- a/x/ref/services/syncbase/vsync/types.vdl
+++ b/x/ref/services/syncbase/vsync/types.vdl
@@ -15,8 +15,9 @@
 
 // dbSyncState represents the persistent sync state of a Database.
 type dbSyncState struct {
-	Gen    uint64               // local generation number incremented on every local update.
-	GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
+	Gen     uint64               // local generation number incremented on every local update.
+	CkPtGen uint64               // local generation number advertised to remote peers (used by the responder).
+	GenVec  interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
 }
 
 // localLogRec represents the persistent local state of a log record. Metadata
diff --git a/x/ref/services/syncbase/vsync/types.vdl.go b/x/ref/services/syncbase/vsync/types.vdl.go
index 54c0466..ae51caa 100644
--- a/x/ref/services/syncbase/vsync/types.vdl.go
+++ b/x/ref/services/syncbase/vsync/types.vdl.go
@@ -27,8 +27,9 @@
 
 // dbSyncState represents the persistent sync state of a Database.
 type dbSyncState struct {
-	Gen    uint64               // local generation number incremented on every local update.
-	GenVec interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
+	Gen     uint64               // local generation number incremented on every local update.
+	CkPtGen uint64               // local generation number advertised to remote peers (used by the responder).
+	GenVec  interfaces.GenVector // generation vector capturing the locally-known generations of remote peers.
 }
 
 func (dbSyncState) __VDLReflect(struct {
diff --git a/x/ref/services/syncbase/vsync/util_test.go b/x/ref/services/syncbase/vsync/util_test.go
index cbaad6c..0848d35 100644
--- a/x/ref/services/syncbase/vsync/util_test.go
+++ b/x/ref/services/syncbase/vsync/util_test.go
@@ -14,20 +14,24 @@
 
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
 	"v.io/v23/verror"
+	_ "v.io/x/ref/runtime/factories/generic"
+	"v.io/x/ref/test"
 )
 
 // mockService emulates a Syncbase service that includes store and sync.
 // It is used to access a mock application.
 type mockService struct {
-	engine string
-	path   string
-	st     store.Store
-	sync   *syncService
+	engine   string
+	path     string
+	st       store.Store
+	sync     *syncService
+	shutdown func()
 }
 
 func (s *mockService) St() store.Store {
@@ -114,6 +118,7 @@
 
 // createService creates a mock Syncbase service used for testing sync functionality.
 func createService(t *testing.T) *mockService {
+	ctx, shutdown := test.V23Init()
 	engine := "leveldb"
 	path := fmt.Sprintf("%s/vsync_test_%d_%d", os.TempDir(), os.Getpid(), time.Now().UnixNano())
 
@@ -121,13 +126,17 @@
 	if err != nil {
 		t.Fatalf("cannot create store %s (%s): %v", engine, path, err)
 	}
+	st, err = watchable.Wrap(st, &watchable.Options{
+		ManagedPrefixes: []string{util.RowPrefix},
+	})
 
 	s := &mockService{
-		st:     st,
-		engine: engine,
-		path:   path,
+		st:       st,
+		engine:   engine,
+		path:     path,
+		shutdown: shutdown,
 	}
-	if s.sync, err = New(nil, nil, s); err != nil {
+	if s.sync, err = New(ctx, nil, s); err != nil {
 		util.DestroyStore(engine, path)
 		t.Fatalf("cannot create sync service: %v", err)
 	}
@@ -136,6 +145,8 @@
 
 // destroyService cleans up the mock Syncbase service.
 func destroyService(t *testing.T, s *mockService) {
+	defer s.shutdown()
+	defer s.sync.Close()
 	if err := util.DestroyStore(s.engine, s.path); err != nil {
 		t.Fatalf("cannot destroy store %s (%s): %v", s.engine, s.path, err)
 	}