syncbase/vsync: Initiator and responder changes to support SyncGroup syncing.
Change-Id: Idfa588829d294754585047888848577e0493a738
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index a3d7f7f..6dd6f25 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -14,15 +14,8 @@
type Sync interface {
// GetDeltas returns the responder's current generation vector and all
// the missing log records when compared to the initiator's generation
- // vector. This process happens one Database at a time encompassing all
- // the SyncGroups common to the initiator and the responder. For each
- // Database, the initiator sends a DeltaReq. In response, the
- // responder sends a "Start" DeltaResp record, all the missing log
- // records, the responder's genvector, and a "Finish" DeltaResp
- // record. The initiator parses the stream between a Start and a Finish
- // record as the response to its DeltaReq, and then moves on to the
- // next Database in common with this responder.
- GetDeltas(initiator string) stream<DeltaReq, DeltaResp> error {access.Read}
+ // vector for one Database for either SyncGroup metadata or data.
+ GetDeltas(req DeltaReq, initiator string) stream<_, DeltaResp> error {access.Read}
// SyncGroup-related methods.
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index a281543..3006f3c 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -28,15 +28,8 @@
type SyncClientMethods interface {
// GetDeltas returns the responder's current generation vector and all
// the missing log records when compared to the initiator's generation
- // vector. This process happens one Database at a time encompassing all
- // the SyncGroups common to the initiator and the responder. For each
- // Database, the initiator sends a DeltaReq. In response, the
- // responder sends a "Start" DeltaResp record, all the missing log
- // records, the responder's genvector, and a "Finish" DeltaResp
- // record. The initiator parses the stream between a Start and a Finish
- // record as the response to its DeltaReq, and then moves on to the
- // next Database in common with this responder.
- GetDeltas(ctx *context.T, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
+ // vector for one Database for either SyncGroup metadata or data.
+ GetDeltas(ctx *context.T, req DeltaReq, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
@@ -78,9 +71,9 @@
name string
}
-func (c implSyncClientStub) GetDeltas(ctx *context.T, i0 string, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
+func (c implSyncClientStub) GetDeltas(ctx *context.T, i0 DeltaReq, i1 string, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
var call rpc.ClientCall
- if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", []interface{}{i0}, opts...); err != nil {
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", []interface{}{i0, i1}, opts...); err != nil {
return
}
ocall = &implSyncGetDeltasClientCall{ClientCall: call}
@@ -143,30 +136,13 @@
// Err returns any error encountered by Advance. Never blocks.
Err() error
}
- // SendStream returns the send side of the Sync.GetDeltas client stream.
- SendStream() interface {
- // Send places the item onto the output stream. Returns errors
- // encountered while sending, or if Send is called after Close or
- // the stream has been canceled. Blocks if there is no buffer
- // space; will unblock when buffer space is available or after
- // the stream has been canceled.
- Send(item DeltaReq) error
- // Close indicates to the server that no more items will be sent;
- // server Recv calls will receive io.EOF after all sent items.
- // This is an optional call - e.g. a client might call Close if it
- // needs to continue receiving items from the server after it's
- // done sending. Returns errors encountered while closing, or if
- // Close is called after the stream has been canceled. Like Send,
- // blocks if there is no buffer space available.
- Close() error
- }
}
// SyncGetDeltasClientCall represents the call returned from Sync.GetDeltas.
type SyncGetDeltasClientCall interface {
SyncGetDeltasClientStream
- // Finish performs the equivalent of SendStream().Close, then blocks until
- // the server is done, and returns the positional return values for the call.
+ // Finish blocks until the server is done, and returns the positional return
+ // values for call.
//
// Finish returns immediately if the call has been canceled; depending on the
// timing the output could either be an error signaling cancelation, or the
@@ -209,23 +185,6 @@
}
return c.c.errRecv
}
-func (c *implSyncGetDeltasClientCall) SendStream() interface {
- Send(item DeltaReq) error
- Close() error
-} {
- return implSyncGetDeltasClientCallSend{c}
-}
-
-type implSyncGetDeltasClientCallSend struct {
- c *implSyncGetDeltasClientCall
-}
-
-func (c implSyncGetDeltasClientCallSend) Send(item DeltaReq) error {
- return c.c.Send(item)
-}
-func (c implSyncGetDeltasClientCallSend) Close() error {
- return c.c.CloseSend()
-}
func (c *implSyncGetDeltasClientCall) Finish() (err error) {
err = c.ClientCall.Finish()
return
@@ -479,15 +438,8 @@
type SyncServerMethods interface {
// GetDeltas returns the responder's current generation vector and all
// the missing log records when compared to the initiator's generation
- // vector. This process happens one Database at a time encompassing all
- // the SyncGroups common to the initiator and the responder. For each
- // Database, the initiator sends a DeltaReq. In response, the
- // responder sends a "Start" DeltaResp record, all the missing log
- // records, the responder's genvector, and a "Finish" DeltaResp
- // record. The initiator parses the stream between a Start and a Finish
- // record as the response to its DeltaReq, and then moves on to the
- // next Database in common with this responder.
- GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, initiator string) error
+ // vector for one Database for either SyncGroup metadata or data.
+ GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, req DeltaReq, initiator string) error
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -521,15 +473,8 @@
type SyncServerStubMethods interface {
// GetDeltas returns the responder's current generation vector and all
// the missing log records when compared to the initiator's generation
- // vector. This process happens one Database at a time encompassing all
- // the SyncGroups common to the initiator and the responder. For each
- // Database, the initiator sends a DeltaReq. In response, the
- // responder sends a "Start" DeltaResp record, all the missing log
- // records, the responder's genvector, and a "Finish" DeltaResp
- // record. The initiator parses the stream between a Start and a Finish
- // record as the response to its DeltaReq, and then moves on to the
- // next Database in common with this responder.
- GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, initiator string) error
+ // vector for one Database for either SyncGroup metadata or data.
+ GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, req DeltaReq, initiator string) error
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -585,8 +530,8 @@
gs *rpc.GlobState
}
-func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, i0 string) error {
- return s.impl.GetDeltas(ctx, call, i0)
+func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, i0 DeltaReq, i1 string) error {
+ return s.impl.GetDeltas(ctx, call, i0, i1)
}
func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 SyncGroup) error {
@@ -632,8 +577,9 @@
Methods: []rpc.MethodDesc{
{
Name: "GetDeltas",
- Doc: "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector. This process happens one Database at a time encompassing all\n// the SyncGroups common to the initiator and the responder. For each\n// Database, the initiator sends a DeltaReq. In response, the\n// responder sends a \"Start\" DeltaResp record, all the missing log\n// records, the responder's genvector, and a \"Finish\" DeltaResp\n// record. The initiator parses the stream between a Start and a Finish\n// record as the response to its DeltaReq, and then moves on to the\n// next Database in common with this responder.",
+ Doc: "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector for one Database for either SyncGroup metadata or data.",
InArgs: []rpc.ArgDesc{
+ {"req", ``}, // DeltaReq
{"initiator", ``}, // string
},
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
@@ -691,18 +637,6 @@
// SyncGetDeltasServerStream is the server stream for Sync.GetDeltas.
type SyncGetDeltasServerStream interface {
- // RecvStream returns the receiver side of the Sync.GetDeltas server stream.
- RecvStream() interface {
- // Advance stages an item so that it may be retrieved via Value. Returns
- // true iff there is an item to retrieve. Advance must be called before
- // Value is called. May block if an item is not available.
- Advance() bool
- // Value returns the item that was staged by Advance. May panic if Advance
- // returned false or was not called. Never blocks.
- Value() DeltaReq
- // Err returns any error encountered by Advance. Never blocks.
- Err() error
- }
// SendStream returns the send side of the Sync.GetDeltas server stream.
SendStream() interface {
// Send places the item onto the output stream. Returns errors encountered
@@ -722,8 +656,6 @@
// a typesafe stub that implements SyncGetDeltasServerCall.
type SyncGetDeltasServerCallStub struct {
rpc.StreamServerCall
- valRecv DeltaReq
- errRecv error
}
// Init initializes SyncGetDeltasServerCallStub from rpc.StreamServerCall.
@@ -731,34 +663,6 @@
s.StreamServerCall = call
}
-// RecvStream returns the receiver side of the Sync.GetDeltas server stream.
-func (s *SyncGetDeltasServerCallStub) RecvStream() interface {
- Advance() bool
- Value() DeltaReq
- Err() error
-} {
- return implSyncGetDeltasServerCallRecv{s}
-}
-
-type implSyncGetDeltasServerCallRecv struct {
- s *SyncGetDeltasServerCallStub
-}
-
-func (s implSyncGetDeltasServerCallRecv) Advance() bool {
- s.s.valRecv = DeltaReq{}
- s.s.errRecv = s.s.Recv(&s.s.valRecv)
- return s.s.errRecv == nil
-}
-func (s implSyncGetDeltasServerCallRecv) Value() DeltaReq {
- return s.s.valRecv
-}
-func (s implSyncGetDeltasServerCallRecv) Err() error {
- if s.s.errRecv == io.EOF {
- return nil
- }
- return s.s.errRecv
-}
-
// SendStream returns the send side of the Sync.GetDeltas server stream.
func (s *SyncGetDeltasServerCallStub) SendStream() interface {
Send(item DeltaResp) error
diff --git a/services/syncbase/server/interfaces/sync_types.go b/services/syncbase/server/interfaces/sync_types.go
new file mode 100644
index 0000000..39732b1
--- /dev/null
+++ b/services/syncbase/server/interfaces/sync_types.go
@@ -0,0 +1,21 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package interfaces
+
+// DeepCopy returns a new GenVector in which every per-prefix genvector of the
+// receiver has itself been copied via PrefixGenVector.DeepCopy, so the result
+// shares no map storage with the receiver. The returned map is always
+// non-nil, even when the receiver is nil or empty.
+func (in GenVector) DeepCopy() GenVector {
+	// The number of prefixes is known up front, so size the map once instead
+	// of letting it grow while it is being filled.
+	out := make(GenVector, len(in))
+	for p, inpgv := range in {
+		out[p] = inpgv.DeepCopy()
+	}
+	return out
+}
+
+// DeepCopy returns a new PrefixGenVector holding the same device id to
+// generation entries as the receiver. Keys and values are plain integers, so
+// copying the entries yields a fully independent map. The returned map is
+// always non-nil, even when the receiver is nil or empty.
+func (in PrefixGenVector) DeepCopy() PrefixGenVector {
+	// The entry count is known up front, so size the map once instead of
+	// letting it grow while it is being filled.
+	out := make(PrefixGenVector, len(in))
+	for id, gen := range in {
+		out[id] = gen
+	}
+	return out
+}
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
index 692a731..65a7f32 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -29,6 +29,7 @@
// PrefixGenVector is the generation vector for a data prefix, which maps each
// device id to its last locally known generation in the scope of that prefix.
+// TODO(hpucha): Rename this type.
type PrefixGenVector map[uint64]uint64
// GenVector is the generation vector for a Database, and maps prefixes to their
@@ -70,6 +71,7 @@
}
// GroupId is a globally unique SyncGroup ID.
+// TODO(hpucha): Make this a string since now the syncgroup id is an object id.
type GroupId uint64
// Possible states for a SyncGroup.
@@ -99,21 +101,35 @@
Joiners map[string]wire.SyncGroupMemberInfo // map of joiners to their metadata
}
-// DeltaReq contains the initiator's genvector and the set of SyncGroups it is
-// interested in within a Database (specified by the AppName/DbName) when
+// DeltaReq contains a request to sync either data or SyncGroup metadata for a
+// Database.
+type DeltaReq union {
+ Sgs SgDeltaReq
+ Data DataDeltaReq
+}
+
+// DataDeltaReq contains the initiator's genvector and the set of SyncGroups it
+// is interested in within a Database (specified by the AppName/DbName) when
// requesting deltas for that Database.
-type DeltaReq struct {
+type DataDeltaReq struct {
AppName string
DbName string
SgIds set[GroupId]
InitVec GenVector
}
+// SgDeltaReq contains the initiator's genvector for the SyncGroups it is
+// interested in within a Database (specified by the AppName/DbName) when
+// requesting deltas for those SyncGroups.
+type SgDeltaReq struct {
+ AppName string
+ DbName string
+ InitVec GenVector // Contains a genvector per SyncGroup.
+}
+
// DeltaResp contains the responder's genvector or the missing log records
// returned in response to an initiator's request for deltas for a Database.
type DeltaResp union {
- Start bool
- Finish bool
Rec LogRec
RespVec GenVector
}
diff --git a/services/syncbase/server/interfaces/sync_types.vdl.go b/services/syncbase/server/interfaces/sync_types.vdl.go
index 1fb1396..ca29abc 100644
--- a/services/syncbase/server/interfaces/sync_types.vdl.go
+++ b/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -20,6 +20,7 @@
// PrefixGenVector is the generation vector for a data prefix, which maps each
// device id to its last locally known generation in the scope of that prefix.
+// TODO(hpucha): Rename this type.
type PrefixGenVector map[uint64]uint64
func (PrefixGenVector) __VDLReflect(struct {
@@ -78,6 +79,7 @@
}
// GroupId is a globally unique SyncGroup ID.
+// TODO(hpucha): Make this a string since now the syncgroup id is an object id.
type GroupId uint64
func (GroupId) __VDLReflect(struct {
@@ -157,18 +159,72 @@
}) {
}
-// DeltaReq contains the initiator's genvector and the set of SyncGroups it is
-// interested in within a Database (specified by the AppName/DbName) when
+type (
+ // DeltaReq represents any single field of the DeltaReq union type.
+ //
+ // DeltaReq contains a request to sync either data or SyncGroup metadata for a
+ // Database.
+ DeltaReq interface {
+ // Index returns the field index.
+ Index() int
+ // Interface returns the field value as an interface.
+ Interface() interface{}
+ // Name returns the field name.
+ Name() string
+ // __VDLReflect describes the DeltaReq union type.
+ __VDLReflect(__DeltaReqReflect)
+ }
+ // DeltaReqSgs represents field Sgs of the DeltaReq union type.
+ DeltaReqSgs struct{ Value SgDeltaReq }
+ // DeltaReqData represents field Data of the DeltaReq union type.
+ DeltaReqData struct{ Value DataDeltaReq }
+ // __DeltaReqReflect describes the DeltaReq union type.
+ __DeltaReqReflect struct {
+ Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DeltaReq"`
+ Type DeltaReq
+ Union struct {
+ Sgs DeltaReqSgs
+ Data DeltaReqData
+ }
+ }
+)
+
+func (x DeltaReqSgs) Index() int { return 0 }
+func (x DeltaReqSgs) Interface() interface{} { return x.Value }
+func (x DeltaReqSgs) Name() string { return "Sgs" }
+func (x DeltaReqSgs) __VDLReflect(__DeltaReqReflect) {}
+
+func (x DeltaReqData) Index() int { return 1 }
+func (x DeltaReqData) Interface() interface{} { return x.Value }
+func (x DeltaReqData) Name() string { return "Data" }
+func (x DeltaReqData) __VDLReflect(__DeltaReqReflect) {}
+
+// DataDeltaReq contains the initiator's genvector and the set of SyncGroups it
+// is interested in within a Database (specified by the AppName/DbName) when
// requesting deltas for that Database.
-type DeltaReq struct {
+type DataDeltaReq struct {
AppName string
DbName string
SgIds map[GroupId]struct{}
InitVec GenVector
}
-func (DeltaReq) __VDLReflect(struct {
- Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DeltaReq"`
+func (DataDeltaReq) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DataDeltaReq"`
+}) {
+}
+
+// SgDeltaReq contains the initiator's genvector for the SyncGroups it is
+// interested in within a Database (specified by the AppName/DbName) when
+// requesting deltas for those SyncGroups.
+type SgDeltaReq struct {
+ AppName string
+ DbName string
+ InitVec GenVector // Contains a genvector per SyncGroup.
+}
+
+func (SgDeltaReq) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.SgDeltaReq"`
}) {
}
@@ -187,10 +243,6 @@
// __VDLReflect describes the DeltaResp union type.
__VDLReflect(__DeltaRespReflect)
}
- // DeltaRespStart represents field Start of the DeltaResp union type.
- DeltaRespStart struct{ Value bool }
- // DeltaRespFinish represents field Finish of the DeltaResp union type.
- DeltaRespFinish struct{ Value bool }
// DeltaRespRec represents field Rec of the DeltaResp union type.
DeltaRespRec struct{ Value LogRec }
// DeltaRespRespVec represents field RespVec of the DeltaResp union type.
@@ -200,30 +252,18 @@
Name string `vdl:"v.io/x/ref/services/syncbase/server/interfaces.DeltaResp"`
Type DeltaResp
Union struct {
- Start DeltaRespStart
- Finish DeltaRespFinish
Rec DeltaRespRec
RespVec DeltaRespRespVec
}
}
)
-func (x DeltaRespStart) Index() int { return 0 }
-func (x DeltaRespStart) Interface() interface{} { return x.Value }
-func (x DeltaRespStart) Name() string { return "Start" }
-func (x DeltaRespStart) __VDLReflect(__DeltaRespReflect) {}
-
-func (x DeltaRespFinish) Index() int { return 1 }
-func (x DeltaRespFinish) Interface() interface{} { return x.Value }
-func (x DeltaRespFinish) Name() string { return "Finish" }
-func (x DeltaRespFinish) __VDLReflect(__DeltaRespReflect) {}
-
-func (x DeltaRespRec) Index() int { return 2 }
+func (x DeltaRespRec) Index() int { return 0 }
func (x DeltaRespRec) Interface() interface{} { return x.Value }
func (x DeltaRespRec) Name() string { return "Rec" }
func (x DeltaRespRec) __VDLReflect(__DeltaRespReflect) {}
-func (x DeltaRespRespVec) Index() int { return 3 }
+func (x DeltaRespRespVec) Index() int { return 1 }
func (x DeltaRespRespVec) Interface() interface{} { return x.Value }
func (x DeltaRespRespVec) Name() string { return "RespVec" }
func (x DeltaRespRespVec) __VDLReflect(__DeltaRespReflect) {}
@@ -257,6 +297,8 @@
vdl.Register((*SyncGroupStatus)(nil))
vdl.Register((*SyncGroup)(nil))
vdl.Register((*DeltaReq)(nil))
+ vdl.Register((*DataDeltaReq)(nil))
+ vdl.Register((*SgDeltaReq)(nil))
vdl.Register((*DeltaResp)(nil))
vdl.Register((*ChunkHash)(nil))
vdl.Register((*ChunkData)(nil))