Merge "internal storage engine: improve compile-time typing"
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index 470d5b9..ee15a16 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -22,7 +22,7 @@
// record. The initiator parses the stream between a Start and a Finish
// record as the response to its DeltaReq, and then moves on to the
// next Database in common with this responder.
- GetDeltas() stream<DeltaReq, DeltaResp> error {access.Read}
+ GetDeltas(initiator string) stream<DeltaReq, DeltaResp> error {access.Read}
// SyncGroup-related methods.
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index adb964f..d378170 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -36,7 +36,7 @@
// record. The initiator parses the stream between a Start and a Finish
// record as the response to its DeltaReq, and then moves on to the
// next Database in common with this responder.
- GetDeltas(*context.T, ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
+ GetDeltas(ctx *context.T, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
@@ -65,9 +65,9 @@
name string
}
-func (c implSyncClientStub) GetDeltas(ctx *context.T, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
+func (c implSyncClientStub) GetDeltas(ctx *context.T, i0 string, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
var call rpc.ClientCall
- if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", nil, opts...); err != nil {
+ if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", []interface{}{i0}, opts...); err != nil {
return
}
ocall = &implSyncGetDeltasClientCall{ClientCall: call}
@@ -207,7 +207,7 @@
// record. The initiator parses the stream between a Start and a Finish
// record as the response to its DeltaReq, and then moves on to the
// next Database in common with this responder.
- GetDeltas(*context.T, SyncGetDeltasServerCall) error
+ GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, initiator string) error
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -236,7 +236,7 @@
// record. The initiator parses the stream between a Start and a Finish
// record as the response to its DeltaReq, and then moves on to the
// next Database in common with this responder.
- GetDeltas(*context.T, *SyncGetDeltasServerCallStub) error
+ GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, initiator string) error
// PublishSyncGroup is typically invoked on a "central" peer to publish
// the SyncGroup.
PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -279,8 +279,8 @@
gs *rpc.GlobState
}
-func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub) error {
- return s.impl.GetDeltas(ctx, call)
+func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, i0 string) error {
+ return s.impl.GetDeltas(ctx, call, i0)
}
func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 SyncGroup) error {
@@ -315,6 +315,9 @@
{
Name: "GetDeltas",
Doc: "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector. This process happens one Database at a time encompassing all\n// the SyncGroups common to the initiator and the responder. For each\n// Database, the initiator sends a DeltaReq. In response, the\n// responder sends a \"Start\" DeltaResp record, all the missing log\n// records, the responder's genvector, and a \"Finish\" DeltaResp\n// record. The initiator parses the stream between a Start and a Finish\n// record as the response to its DeltaReq, and then moves on to the\n// next Database in common with this responder.",
+ InArgs: []rpc.ArgDesc{
+ {"initiator", ``}, // string
+ },
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
},
{
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index e237277..5d3d9b8 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -300,7 +300,7 @@
for mt := range iSt.mtTables {
absName := naming.Join(mt, iSt.peer, util.SyncbaseSuffix)
c := interfaces.SyncClient(absName)
- stream, err := c.GetDeltas(ctx)
+ stream, err := c.GetDeltas(ctx, iSt.sync.name)
if err == nil {
vlog.VI(3).Infof("sync: connectToPeer: established on %s", absName)
return stream, true
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 8d5b384..8ba9cd9 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -9,6 +9,7 @@
"sort"
"strings"
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -19,9 +20,9 @@
)
// GetDeltas implements the responder side of the GetDeltas RPC.
-func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
- vlog.VI(2).Infof("sync: GetDeltas: begin")
- defer vlog.VI(2).Infof("sync: GetDeltas: end")
+func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall, initiator string) error {
+ vlog.VI(2).Infof("sync: GetDeltas: begin: from initiator %s", initiator)
+ defer vlog.VI(2).Infof("sync: GetDeltas: end: from initiator %s", initiator)
recvr := call.RecvStream()
for recvr.Advance() {
@@ -31,7 +32,7 @@
// the failure might be genuine. For example, the responder is
// no longer part of the requested SyncGroups, or the app/db is
// locally deleted, or a permission change has denied access.
- rSt := newResponderState(ctx, call, s, req)
+ rSt := newResponderState(ctx, call, s, req, initiator)
rSt.sendDeltasPerDatabase(ctx)
}
@@ -42,17 +43,18 @@
// responderState is state accumulated per Database by the responder during an
// initiation round.
type responderState struct {
- req interfaces.DeltaReq
- call interfaces.SyncGetDeltasServerCall // Stream handle for the GetDeltas RPC.
- errState error // Captures the error from the first two phases of the responder.
- sync *syncService
- st store.Store // Store handle to the Database.
- diff genRangeVector
- outVec interfaces.GenVector
+ req interfaces.DeltaReq
+ call interfaces.SyncGetDeltasServerCall // Stream handle for the GetDeltas RPC.
+ initiator string // Name of the initiator, as received in the GetDeltas RPC.
+ errState error // Captures the error from the first two phases of the responder.
+ sync *syncService
+ st store.Store // Store handle to the Database.
+ diff genRangeVector
+ outVec interfaces.GenVector
}
-func newResponderState(ctx *context.T, call interfaces.SyncGetDeltasServerCall, sync *syncService, req interfaces.DeltaReq) *responderState {
- rSt := &responderState{call: call, sync: sync, req: req}
+func newResponderState(ctx *context.T, call interfaces.SyncGetDeltasServerCall, sync *syncService, req interfaces.DeltaReq, initiator string) *responderState {
+ rSt := &responderState{call: call, sync: sync, req: req, initiator: initiator}
return rSt
}
@@ -138,6 +140,12 @@
for _, p := range sg.Spec.Prefixes {
allowedPfxs[p] = struct{}{}
}
+
+ // Add the initiator to the SyncGroup membership if not already
+ // in it. It is a temporary solution until SyncGroup metadata
+ // is synchronized peer to peer.
+ // TODO(rdaoud): remove this when SyncGroups are synced.
+ rSt.addInitiatorToSyncGroup(ctx, sgid)
}
// Filter the initiator's prefixes to what is allowed.
@@ -159,6 +167,48 @@
return
}
+// addInitiatorToSyncGroup adds the request initiator to the membership of the
+// given SyncGroup if the initiator is not already a member. It is a temporary
+// solution until SyncGroup metadata starts being synchronized, at which time
+// peers will learn of new members through mutations of the SyncGroup metadata
+// by the SyncGroup administrators.
+// Note: the joiner metadata is fake because the responder does not have it.
+//
+// The update is best-effort: an empty initiator name is ignored, and any
+// failure other than "already a member" is logged rather than returned.
+func (rSt *responderState) addInitiatorToSyncGroup(ctx *context.T, gid interfaces.GroupId) {
+	if rSt.initiator == "" {
+		return
+	}
+
+	err := store.RunInTransaction(rSt.st, func(tx store.StoreReadWriter) error {
+		sg, err := getSyncGroupById(ctx, tx, gid)
+		if err != nil {
+			return err
+		}
+
+		// If the initiator is already a member of the SyncGroup, abort
+		// the transaction with a special error code.
+		if _, ok := sg.Joiners[rSt.initiator]; ok {
+			return verror.New(verror.ErrExist, ctx, "member already in SyncGroup")
+		}
+
+		// Assigning into a nil map panics, so allocate the membership
+		// map if the stored SyncGroup does not have one.
+		if sg.Joiners == nil {
+			sg.Joiners = make(map[string]wire.SyncGroupMemberInfo)
+		}
+
+		vlog.VI(4).Infof("sync: addInitiatorToSyncGroup: add %s to sgid %d", rSt.initiator, gid)
+		sg.Joiners[rSt.initiator] = wire.SyncGroupMemberInfo{SyncPriority: 1}
+		return setSGDataEntry(ctx, tx, gid, sg)
+	})
+
+	if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
+		vlog.Errorf("sync: addInitiatorToSyncGroup: initiator %s, sgid %d: %v", rSt.initiator, gid, err)
+	}
+}
+
// computeDeltaBound computes the bound on missing generations across all
// requested prefixes (phase 2 of sendDeltas).
func (rSt *responderState) computeDeltaBound(ctx *context.T) {
diff --git a/services/syncbase/vsync/responder_test.go b/services/syncbase/vsync/responder_test.go
index cf67ca8..986f9b3 100644
--- a/services/syncbase/vsync/responder_test.go
+++ b/services/syncbase/vsync/responder_test.go
@@ -111,7 +111,7 @@
for _, test := range tests {
want := test.genDiffWant
got := test.genDiffIn
- rSt := newResponderState(nil, nil, s, interfaces.DeltaReq{})
+ rSt := newResponderState(nil, nil, s, interfaces.DeltaReq{}, "fakeInitiator")
rSt.diff = got
rSt.diffPrefixGenVectors(test.respPVec, test.initPVec)
checkEqualDevRanges(t, got, want)
@@ -353,7 +353,7 @@
s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, checkptGen: test.respGen, genvec: test.respVec}
req := interfaces.DeltaReq{AppName: appName, DbName: dbName, InitVec: test.initVec}
- rSt := newResponderState(nil, nil, s, req)
+ rSt := newResponderState(nil, nil, s, req, "fakeInitiator")
rSt.computeDeltaBound(nil)
if rSt.errState != nil || !reflect.DeepEqual(rSt.outVec, wantVec) {
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 2f341e4..bf4e3aa 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -618,16 +618,15 @@
return err
}
- gid, err := getSyncGroupId(ctx, tx, sgName)
+ sg, err := getSyncGroupByName(ctx, tx, sgName)
if err != nil {
return err
}
- sg, err := getSyncGroupById(ctx, tx, gid)
// TODO(hpucha): Check SyncGroup ACL. Perform version checking.
sg.Spec = spec
- return setSGDataEntry(ctx, tx, gid, sg)
+ return setSGDataEntry(ctx, tx, sg.Id, sg)
})
return err
}