syncbase/vsync: Initiator and responder changes to support SyncGroup syncing.

Change-Id: Idfa588829d294754585047888848577e0493a738
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index a3d7f7f..6dd6f25 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -14,15 +14,8 @@
 type Sync interface {
 	// GetDeltas returns the responder's current generation vector and all
 	// the missing log records when compared to the initiator's generation
-	// vector. This process happens one Database at a time encompassing all
-	// the SyncGroups common to the initiator and the responder. For each
-	// Database, the initiator sends a DeltaReq. In response, the
-	// responder sends a "Start" DeltaResp record, all the missing log
-	// records, the responder's genvector, and a "Finish" DeltaResp
-	// record. The initiator parses the stream between a Start and a Finish
-	// record as the response to its DeltaReq, and then moves on to the
-	// next Database in common with this responder.
-	GetDeltas(initiator string) stream<DeltaReq, DeltaResp> error {access.Read}
+	// vector for one Database for either SyncGroup metadata or data.
+	GetDeltas(req DeltaReq, initiator string) stream<_, DeltaResp> error {access.Read}
 
 	// SyncGroup-related methods.
 
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index a281543..3006f3c 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -28,15 +28,8 @@
 type SyncClientMethods interface {
 	// GetDeltas returns the responder's current generation vector and all
 	// the missing log records when compared to the initiator's generation
-	// vector. This process happens one Database at a time encompassing all
-	// the SyncGroups common to the initiator and the responder. For each
-	// Database, the initiator sends a DeltaReq. In response, the
-	// responder sends a "Start" DeltaResp record, all the missing log
-	// records, the responder's genvector, and a "Finish" DeltaResp
-	// record. The initiator parses the stream between a Start and a Finish
-	// record as the response to its DeltaReq, and then moves on to the
-	// next Database in common with this responder.
-	GetDeltas(ctx *context.T, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
+	// vector for one Database for either SyncGroup metadata or data.
+	GetDeltas(ctx *context.T, req DeltaReq, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
 	PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
@@ -78,9 +71,9 @@
 	name string
 }
 
-func (c implSyncClientStub) GetDeltas(ctx *context.T, i0 string, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
+func (c implSyncClientStub) GetDeltas(ctx *context.T, i0 DeltaReq, i1 string, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
 	var call rpc.ClientCall
-	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", []interface{}{i0}, opts...); err != nil {
+	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", []interface{}{i0, i1}, opts...); err != nil {
 		return
 	}
 	ocall = &implSyncGetDeltasClientCall{ClientCall: call}
@@ -143,30 +136,13 @@
 		// Err returns any error encountered by Advance.  Never blocks.
 		Err() error
 	}
-	// SendStream returns the send side of the Sync.GetDeltas client stream.
-	SendStream() interface {
-		// Send places the item onto the output stream.  Returns errors
-		// encountered while sending, or if Send is called after Close or
-		// the stream has been canceled.  Blocks if there is no buffer
-		// space; will unblock when buffer space is available or after
-		// the stream has been canceled.
-		Send(item DeltaReq) error
-		// Close indicates to the server that no more items will be sent;
-		// server Recv calls will receive io.EOF after all sent items.
-		// This is an optional call - e.g. a client might call Close if it
-		// needs to continue receiving items from the server after it's
-		// done sending.  Returns errors encountered while closing, or if
-		// Close is called after the stream has been canceled.  Like Send,
-		// blocks if there is no buffer space available.
-		Close() error
-	}
 }
 
 // SyncGetDeltasClientCall represents the call returned from Sync.GetDeltas.
 type SyncGetDeltasClientCall interface {
 	SyncGetDeltasClientStream
-	// Finish performs the equivalent of SendStream().Close, then blocks until
-	// the server is done, and returns the positional return values for the call.
+	// Finish blocks until the server is done, and returns the positional return
+	// values for call.
 	//
 	// Finish returns immediately if the call has been canceled; depending on the
 	// timing the output could either be an error signaling cancelation, or the
@@ -209,23 +185,6 @@
 	}
 	return c.c.errRecv
 }
-func (c *implSyncGetDeltasClientCall) SendStream() interface {
-	Send(item DeltaReq) error
-	Close() error
-} {
-	return implSyncGetDeltasClientCallSend{c}
-}
-
-type implSyncGetDeltasClientCallSend struct {
-	c *implSyncGetDeltasClientCall
-}
-
-func (c implSyncGetDeltasClientCallSend) Send(item DeltaReq) error {
-	return c.c.Send(item)
-}
-func (c implSyncGetDeltasClientCallSend) Close() error {
-	return c.c.CloseSend()
-}
 func (c *implSyncGetDeltasClientCall) Finish() (err error) {
 	err = c.ClientCall.Finish()
 	return
@@ -479,15 +438,8 @@
 type SyncServerMethods interface {
 	// GetDeltas returns the responder's current generation vector and all
 	// the missing log records when compared to the initiator's generation
-	// vector. This process happens one Database at a time encompassing all
-	// the SyncGroups common to the initiator and the responder. For each
-	// Database, the initiator sends a DeltaReq. In response, the
-	// responder sends a "Start" DeltaResp record, all the missing log
-	// records, the responder's genvector, and a "Finish" DeltaResp
-	// record. The initiator parses the stream between a Start and a Finish
-	// record as the response to its DeltaReq, and then moves on to the
-	// next Database in common with this responder.
-	GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, initiator string) error
+	// vector for one Database for either SyncGroup metadata or data.
+	GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, req DeltaReq, initiator string) error
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
 	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -521,15 +473,8 @@
 type SyncServerStubMethods interface {
 	// GetDeltas returns the responder's current generation vector and all
 	// the missing log records when compared to the initiator's generation
-	// vector. This process happens one Database at a time encompassing all
-	// the SyncGroups common to the initiator and the responder. For each
-	// Database, the initiator sends a DeltaReq. In response, the
-	// responder sends a "Start" DeltaResp record, all the missing log
-	// records, the responder's genvector, and a "Finish" DeltaResp
-	// record. The initiator parses the stream between a Start and a Finish
-	// record as the response to its DeltaReq, and then moves on to the
-	// next Database in common with this responder.
-	GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, initiator string) error
+	// vector for one Database for either SyncGroup metadata or data.
+	GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, req DeltaReq, initiator string) error
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
 	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -585,8 +530,8 @@
 	gs   *rpc.GlobState
 }
 
-func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, i0 string) error {
-	return s.impl.GetDeltas(ctx, call, i0)
+func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, i0 DeltaReq, i1 string) error {
+	return s.impl.GetDeltas(ctx, call, i0, i1)
 }
 
 func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 SyncGroup) error {
@@ -632,8 +577,9 @@
 	Methods: []rpc.MethodDesc{
 		{
 			Name: "GetDeltas",
-			Doc:  "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector. This process happens one Database at a time encompassing all\n// the SyncGroups common to the initiator and the responder. For each\n// Database, the initiator sends a DeltaReq. In response, the\n// responder sends a \"Start\" DeltaResp record, all the missing log\n// records, the responder's genvector, and a \"Finish\" DeltaResp\n// record. The initiator parses the stream between a Start and a Finish\n// record as the response to its DeltaReq, and then moves on to the\n// next Database in common with this responder.",
+			Doc:  "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector for one Database for either SyncGroup metadata or data.",
 			InArgs: []rpc.ArgDesc{
+				{"req", ``},       // DeltaReq
 				{"initiator", ``}, // string
 			},
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
@@ -691,18 +637,6 @@
 
 // SyncGetDeltasServerStream is the server stream for Sync.GetDeltas.
 type SyncGetDeltasServerStream interface {
-	// RecvStream returns the receiver side of the Sync.GetDeltas server stream.
-	RecvStream() interface {
-		// Advance stages an item so that it may be retrieved via Value.  Returns
-		// true iff there is an item to retrieve.  Advance must be called before
-		// Value is called.  May block if an item is not available.
-		Advance() bool
-		// Value returns the item that was staged by Advance.  May panic if Advance
-		// returned false or was not called.  Never blocks.
-		Value() DeltaReq
-		// Err returns any error encountered by Advance.  Never blocks.
-		Err() error
-	}
 	// SendStream returns the send side of the Sync.GetDeltas server stream.
 	SendStream() interface {
 		// Send places the item onto the output stream.  Returns errors encountered
@@ -722,8 +656,6 @@
 // a typesafe stub that implements SyncGetDeltasServerCall.
 type SyncGetDeltasServerCallStub struct {
 	rpc.StreamServerCall
-	valRecv DeltaReq
-	errRecv error
 }
 
 // Init initializes SyncGetDeltasServerCallStub from rpc.StreamServerCall.
@@ -731,34 +663,6 @@
 	s.StreamServerCall = call
 }
 
-// RecvStream returns the receiver side of the Sync.GetDeltas server stream.
-func (s *SyncGetDeltasServerCallStub) RecvStream() interface {
-	Advance() bool
-	Value() DeltaReq
-	Err() error
-} {
-	return implSyncGetDeltasServerCallRecv{s}
-}
-
-type implSyncGetDeltasServerCallRecv struct {
-	s *SyncGetDeltasServerCallStub
-}
-
-func (s implSyncGetDeltasServerCallRecv) Advance() bool {
-	s.s.valRecv = DeltaReq{}
-	s.s.errRecv = s.s.Recv(&s.s.valRecv)
-	return s.s.errRecv == nil
-}
-func (s implSyncGetDeltasServerCallRecv) Value() DeltaReq {
-	return s.s.valRecv
-}
-func (s implSyncGetDeltasServerCallRecv) Err() error {
-	if s.s.errRecv == io.EOF {
-		return nil
-	}
-	return s.s.errRecv
-}
-
 // SendStream returns the send side of the Sync.GetDeltas server stream.
 func (s *SyncGetDeltasServerCallStub) SendStream() interface {
 	Send(item DeltaResp) error
diff --git a/services/syncbase/server/interfaces/sync_types.go b/services/syncbase/server/interfaces/sync_types.go
new file mode 100644
index 0000000..39732b1
--- /dev/null
+++ b/services/syncbase/server/interfaces/sync_types.go
@@ -0,0 +1,21 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package interfaces
+
+func (in GenVector) DeepCopy() GenVector {
+	out := make(GenVector)
+	for p, inpgv := range in {
+		out[p] = inpgv.DeepCopy()
+	}
+	return out
+}
+
+func (in PrefixGenVector) DeepCopy() PrefixGenVector {
+	out := make(PrefixGenVector)
+	for id, gen := range in {
+		out[id] = gen
+	}
+	return out
+}
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index 692a731..65a7f32 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -29,6 +29,7 @@
 
 // PrefixGenVector is the generation vector for a data prefix, which maps each
 // device id to its last locally known generation in the scope of that prefix.
+// TODO(hpucha): Rename this type.
 type PrefixGenVector map[uint64]uint64
 
 // GenVector is the generation vector for a Database, and maps prefixes to their
@@ -70,6 +71,7 @@
 }
 
 // GroupId is a globally unique SyncGroup ID.
+// TODO(hpucha): Make this a string since now the syncgroup id is an object id.
 type GroupId uint64
 
 // Possible states for a SyncGroup.
@@ -99,21 +101,35 @@
 	Joiners     map[string]wire.SyncGroupMemberInfo // map of joiners to their metadata
 }
 
-// DeltaReq contains the initiator's genvector and the set of SyncGroups it is
-// interested in within a Database (specified by the AppName/DbName) when
+// DeltaReq contains a request to sync either data or SyncGroup metadata for a
+// Database.
+type DeltaReq union {
+	Sgs SgDeltaReq
+	Data DataDeltaReq
+}
+
+// DataDeltaReq contains the initiator's genvector and the set of SyncGroups it
+// is interested in within a Database (specified by the AppName/DbName) when
 // requesting deltas for that Database.
-type DeltaReq struct {
+type DataDeltaReq struct {
 	AppName string
 	DbName string
 	SgIds   set[GroupId]
 	InitVec GenVector
 }
 
+// SgDeltaReq contains the initiator's genvector for the SyncGroups it is
+// interested in within a Database (specified by the AppName/DbName) when
+// requesting deltas for those SyncGroups.
+type SgDeltaReq struct {
+	AppName string
+	DbName string
+	InitVec GenVector // Contains a genvector per SyncGroup.
+}
+
 // DeltaResp contains the responder's genvector or the missing log records
 // returned in response to an initiator's request for deltas for a Database.
 type DeltaResp union {
-	Start   bool
-	Finish  bool
 	Rec     LogRec
 	RespVec GenVector
 }
diff --git a/services/syncbase/server/interfaces/sync_types.vdl.go b/services/syncbase/server/interfaces/sync_types.vdl.go
index 1fb1396..ca29abc 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -20,6 +20,7 @@
 
 // PrefixGenVector is the generation vector for a data prefix, which maps each
 // device id to its last locally known generation in the scope of that prefix.
+// TODO(hpucha): Rename this type.
 type PrefixGenVector map[uint64]uint64
 
 func (PrefixGenVector) __VDLReflect(struct {
@@ -78,6 +79,7 @@
 }
 
 // GroupId is a globally unique SyncGroup ID.
+// TODO(hpucha): Make this a string since now the syncgroup id is an object id.
 type GroupId uint64
 
 func (GroupId) __VDLReflect(struct {
@@ -157,18 +159,72 @@
 }) {
 }
 
-// DeltaReq contains the initiator's genvector and the set of SyncGroups it is
-// interested in within a Database (specified by the AppName/DbName) when
+type (
+	// DeltaReq represents any single field of the DeltaReq union type.
+	//
+	// DeltaReq contains a request to sync either data or SyncGroup metadata for a
+	// Database.
+	DeltaReq interface {
+		// Index returns the field index.
+		Index() int
+		// Interface returns the field value as an interface.
+		Interface() interface{}
+		// Name returns the field name.
+		Name() string
+		// __VDLReflect describes the DeltaReq union type.
+		__VDLReflect(__DeltaReqReflect)
+	}
+	// DeltaReqSgs represents field Sgs of the DeltaReq union type.
+	DeltaReqSgs struct{ Value SgDeltaReq }
+	// DeltaReqData represents field Data of the DeltaReq union type.
+	DeltaReqData struct{ Value DataDeltaReq }
+	// __DeltaReqReflect describes the DeltaReq union type.
+	__DeltaReqReflect struct {
+		Name  string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DeltaReq"`
+		Type  DeltaReq
+		Union struct {
+			Sgs  DeltaReqSgs
+			Data DeltaReqData
+		}
+	}
+)
+
+func (x DeltaReqSgs) Index() int                     { return 0 }
+func (x DeltaReqSgs) Interface() interface{}         { return x.Value }
+func (x DeltaReqSgs) Name() string                   { return "Sgs" }
+func (x DeltaReqSgs) __VDLReflect(__DeltaReqReflect) {}
+
+func (x DeltaReqData) Index() int                     { return 1 }
+func (x DeltaReqData) Interface() interface{}         { return x.Value }
+func (x DeltaReqData) Name() string                   { return "Data" }
+func (x DeltaReqData) __VDLReflect(__DeltaReqReflect) {}
+
+// DataDeltaReq contains the initiator's genvector and the set of SyncGroups it
+// is interested in within a Database (specified by the AppName/DbName) when
 // requesting deltas for that Database.
-type DeltaReq struct {
+type DataDeltaReq struct {
 	AppName string
 	DbName  string
 	SgIds   map[GroupId]struct{}
 	InitVec GenVector
 }
 
-func (DeltaReq) __VDLReflect(struct {
-	Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DeltaReq"`
+func (DataDeltaReq) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DataDeltaReq"`
+}) {
+}
+
+// SgDeltaReq contains the initiator's genvector for the SyncGroups it is
+// interested in within a Database (specified by the AppName/DbName) when
+// requesting deltas for those SyncGroups.
+type SgDeltaReq struct {
+	AppName string
+	DbName  string
+	InitVec GenVector // Contains a genvector per SyncGroup.
+}
+
+func (SgDeltaReq) __VDLReflect(struct {
+	Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.SgDeltaReq"`
 }) {
 }
 
@@ -187,10 +243,6 @@
 		// __VDLReflect describes the DeltaResp union type.
 		__VDLReflect(__DeltaRespReflect)
 	}
-	// DeltaRespStart represents field Start of the DeltaResp union type.
-	DeltaRespStart struct{ Value bool }
-	// DeltaRespFinish represents field Finish of the DeltaResp union type.
-	DeltaRespFinish struct{ Value bool }
 	// DeltaRespRec represents field Rec of the DeltaResp union type.
 	DeltaRespRec struct{ Value LogRec }
 	// DeltaRespRespVec represents field RespVec of the DeltaResp union type.
@@ -200,30 +252,18 @@
 		Name  string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DeltaResp"`
 		Type  DeltaResp
 		Union struct {
-			Start   DeltaRespStart
-			Finish  DeltaRespFinish
 			Rec     DeltaRespRec
 			RespVec DeltaRespRespVec
 		}
 	}
 )
 
-func (x DeltaRespStart) Index() int                      { return 0 }
-func (x DeltaRespStart) Interface() interface{}          { return x.Value }
-func (x DeltaRespStart) Name() string                    { return "Start" }
-func (x DeltaRespStart) __VDLReflect(__DeltaRespReflect) {}
-
-func (x DeltaRespFinish) Index() int                      { return 1 }
-func (x DeltaRespFinish) Interface() interface{}          { return x.Value }
-func (x DeltaRespFinish) Name() string                    { return "Finish" }
-func (x DeltaRespFinish) __VDLReflect(__DeltaRespReflect) {}
-
-func (x DeltaRespRec) Index() int                      { return 2 }
+func (x DeltaRespRec) Index() int                      { return 0 }
 func (x DeltaRespRec) Interface() interface{}          { return x.Value }
 func (x DeltaRespRec) Name() string                    { return "Rec" }
 func (x DeltaRespRec) __VDLReflect(__DeltaRespReflect) {}
 
-func (x DeltaRespRespVec) Index() int                      { return 3 }
+func (x DeltaRespRespVec) Index() int                      { return 1 }
 func (x DeltaRespRespVec) Interface() interface{}          { return x.Value }
 func (x DeltaRespRespVec) Name() string                    { return "RespVec" }
 func (x DeltaRespRespVec) __VDLReflect(__DeltaRespReflect) {}
@@ -257,6 +297,8 @@
 	vdl.Register((*SyncGroupStatus)(nil))
 	vdl.Register((*SyncGroup)(nil))
 	vdl.Register((*DeltaReq)(nil))
+	vdl.Register((*DataDeltaReq)(nil))
+	vdl.Register((*SgDeltaReq)(nil))
 	vdl.Register((*DeltaResp)(nil))
 	vdl.Register((*ChunkHash)(nil))
 	vdl.Register((*ChunkData)(nil))