syncbase/vsync: responder discovers initiator

Make the responder learn the name of the initiator during a p2p sync
operation and add the initiator to the SyncGroup memberships of all
common SyncGroups that pass the ACL checks, if the initiator is not
already a member.

This is a temporary workaround until the SyncGroup metadata (which
includes the membership set) starts being synchronized as well.  Until
then, early joiners who are not SyncGroup admins cannot discover later
joiners and start synchronizing with them (initiating to them).  This
issue shows up when testing with 3 or more Syncbases.  This workaround
allows the early joiners to auto-learn of the late joiners when they
receive GetDeltas() requests from them.

This code will be removed when p2p SyncGroup sync is implemented and all
SyncGroup members learn of all SyncGroup mutations (membership updates
as well as SG ACL changes) through the regular p2p sync protocol.

Change-Id: I9cf58cb4291e3443e7519d21630d71827e80ba9f
diff --git a/x/ref/services/syncbase/server/interfaces/sync.vdl b/x/ref/services/syncbase/server/interfaces/sync.vdl
index 470d5b9..ee15a16 100644
--- a/x/ref/services/syncbase/server/interfaces/sync.vdl
+++ b/x/ref/services/syncbase/server/interfaces/sync.vdl
@@ -22,7 +22,7 @@
 	// record. The initiator parses the stream between a Start and a Finish
 	// record as the response to its DeltaReq, and then moves on to the
 	// next Database in common with this responder.
-	GetDeltas() stream<DeltaReq, DeltaResp> error {access.Read}
+	GetDeltas(initiator string) stream<DeltaReq, DeltaResp> error {access.Read}
 
 	// SyncGroup-related methods.
 
diff --git a/x/ref/services/syncbase/server/interfaces/sync.vdl.go b/x/ref/services/syncbase/server/interfaces/sync.vdl.go
index adb964f..d378170 100644
--- a/x/ref/services/syncbase/server/interfaces/sync.vdl.go
+++ b/x/ref/services/syncbase/server/interfaces/sync.vdl.go
@@ -36,7 +36,7 @@
 	// record. The initiator parses the stream between a Start and a Finish
 	// record as the response to its DeltaReq, and then moves on to the
 	// next Database in common with this responder.
-	GetDeltas(*context.T, ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
+	GetDeltas(ctx *context.T, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
 	PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
@@ -65,9 +65,9 @@
 	name string
 }
 
-func (c implSyncClientStub) GetDeltas(ctx *context.T, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
+func (c implSyncClientStub) GetDeltas(ctx *context.T, i0 string, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
 	var call rpc.ClientCall
-	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", nil, opts...); err != nil {
+	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", []interface{}{i0}, opts...); err != nil {
 		return
 	}
 	ocall = &implSyncGetDeltasClientCall{ClientCall: call}
@@ -207,7 +207,7 @@
 	// record. The initiator parses the stream between a Start and a Finish
 	// record as the response to its DeltaReq, and then moves on to the
 	// next Database in common with this responder.
-	GetDeltas(*context.T, SyncGetDeltasServerCall) error
+	GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, initiator string) error
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
 	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -236,7 +236,7 @@
 	// record. The initiator parses the stream between a Start and a Finish
 	// record as the response to its DeltaReq, and then moves on to the
 	// next Database in common with this responder.
-	GetDeltas(*context.T, *SyncGetDeltasServerCallStub) error
+	GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, initiator string) error
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
 	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -279,8 +279,8 @@
 	gs   *rpc.GlobState
 }
 
-func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub) error {
-	return s.impl.GetDeltas(ctx, call)
+func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, i0 string) error {
+	return s.impl.GetDeltas(ctx, call, i0)
 }
 
 func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 SyncGroup) error {
@@ -315,6 +315,9 @@
 		{
 			Name: "GetDeltas",
 			Doc:  "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector. This process happens one Database at a time encompassing all\n// the SyncGroups common to the initiator and the responder. For each\n// Database, the initiator sends a DeltaReq. In response, the\n// responder sends a \"Start\" DeltaResp record, all the missing log\n// records, the responder's genvector, and a \"Finish\" DeltaResp\n// record. The initiator parses the stream between a Start and a Finish\n// record as the response to its DeltaReq, and then moves on to the\n// next Database in common with this responder.",
+			InArgs: []rpc.ArgDesc{
+				{"initiator", ``}, // string
+			},
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
 		{
diff --git a/x/ref/services/syncbase/vsync/initiator.go b/x/ref/services/syncbase/vsync/initiator.go
index e237277..5d3d9b8 100644
--- a/x/ref/services/syncbase/vsync/initiator.go
+++ b/x/ref/services/syncbase/vsync/initiator.go
@@ -300,7 +300,7 @@
 	for mt := range iSt.mtTables {
 		absName := naming.Join(mt, iSt.peer, util.SyncbaseSuffix)
 		c := interfaces.SyncClient(absName)
-		stream, err := c.GetDeltas(ctx)
+		stream, err := c.GetDeltas(ctx, iSt.sync.name)
 		if err == nil {
 			vlog.VI(3).Infof("sync: connectToPeer: established on %s", absName)
 			return stream, true
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
index 2211da4..6b4fad1 100644
--- a/x/ref/services/syncbase/vsync/responder.go
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -9,6 +9,7 @@
 	"sort"
 	"strings"
 
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -19,9 +20,9 @@
 )
 
 // GetDeltas implements the responder side of the GetDeltas RPC.
-func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
-	vlog.VI(2).Infof("sync: GetDeltas: begin")
-	defer vlog.VI(2).Infof("sync: GetDeltas: end")
+func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall, initiator string) error {
+	vlog.VI(2).Infof("sync: GetDeltas: begin: from initiator %s", initiator)
+	defer vlog.VI(2).Infof("sync: GetDeltas: end: from initiator %s", initiator)
 
 	recvr := call.RecvStream()
 	for recvr.Advance() {
@@ -31,7 +32,7 @@
 		// the failure might be genuine. For example, the responder is
 		// no longer part of the requested SyncGroups, or the app/db is
 		// locally deleted, or a permission change has denied access.
-		rSt := newResponderState(ctx, call, s, req)
+		rSt := newResponderState(ctx, call, s, req, initiator)
 		rSt.sendDeltasPerDatabase(ctx)
 	}
 
@@ -42,17 +43,18 @@
 // responderState is state accumulated per Database by the responder during an
 // initiation round.
 type responderState struct {
-	req      interfaces.DeltaReq
-	call     interfaces.SyncGetDeltasServerCall // Stream handle for the GetDeltas RPC.
-	errState error                              // Captures the error from the first two phases of the responder.
-	sync     *syncService
-	st       store.Store // Store handle to the Database.
-	diff     genRangeVector
-	outVec   interfaces.GenVector
+	req       interfaces.DeltaReq
+	call      interfaces.SyncGetDeltasServerCall // Stream handle for the GetDeltas RPC.
+	initiator string
+	errState  error // Captures the error from the first two phases of the responder.
+	sync      *syncService
+	st        store.Store // Store handle to the Database.
+	diff      genRangeVector
+	outVec    interfaces.GenVector
 }
 
-func newResponderState(ctx *context.T, call interfaces.SyncGetDeltasServerCall, sync *syncService, req interfaces.DeltaReq) *responderState {
-	rSt := &responderState{call: call, sync: sync, req: req}
+func newResponderState(ctx *context.T, call interfaces.SyncGetDeltasServerCall, sync *syncService, req interfaces.DeltaReq, initiator string) *responderState {
+	rSt := &responderState{call: call, sync: sync, req: req, initiator: initiator}
 	return rSt
 }
 
@@ -138,6 +140,12 @@
 		for _, p := range sg.Spec.Prefixes {
 			allowedPfxs[p] = struct{}{}
 		}
+
+		// Add the initiator to the SyncGroup membership if not already
+		// in it.  It is a temporary solution until SyncGroup metadata
+		// is synchronized peer to peer.
+		// TODO(rdaoud): remove this when SyncGroups are synced.
+		rSt.addInitiatorToSyncGroup(ctx, sgid)
 	}
 
 	// Filter the initiator's prefixes to what is allowed.
@@ -159,6 +167,39 @@
 	return
 }
 
+// addInitiatorToSyncGroup adds the request initiator to the membership of the
+// given SyncGroup if the initiator is not already a member.  It is a temporary
+// solution until SyncGroup metadata starts being synchronized, at which time
+// peers will learn of new members through mutations of the SyncGroup metadata
+// by the SyncGroup administrators.
+// Note: the joiner metadata is fake because the responder does not have it.
+func (rSt *responderState) addInitiatorToSyncGroup(ctx *context.T, gid interfaces.GroupId) {
+	if rSt.initiator == "" {
+		return
+	}
+
+	err := store.RunInTransaction(rSt.st, func(tx store.StoreReadWriter) error {
+		sg, err := getSyncGroupById(ctx, tx, gid)
+		if err != nil {
+			return err
+		}
+
+		// If the initiator is already a member of the SyncGroup abort
+		// the transaction with a special error code.
+		if _, ok := sg.Joiners[rSt.initiator]; ok {
+			return verror.New(verror.ErrExist, ctx, "member already in SyncGroup")
+		}
+
+		vlog.VI(4).Infof("sync: addInitiatorToSyncGroup: add %s to sgid %d", rSt.initiator, gid)
+		sg.Joiners[rSt.initiator] = wire.SyncGroupMemberInfo{SyncPriority: 1}
+		return setSGDataEntry(ctx, tx, gid, sg)
+	})
+
+	if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
+		vlog.Errorf("sync: addInitiatorToSyncGroup: initiator %s, sgid %d: %v", rSt.initiator, gid, err)
+	}
+}
+
 // computeDeltaBound computes the bound on missing generations across all
 // requested prefixes (phase 2 of sendDeltas).
 func (rSt *responderState) computeDeltaBound(ctx *context.T) {
diff --git a/x/ref/services/syncbase/vsync/responder_test.go b/x/ref/services/syncbase/vsync/responder_test.go
index cf67ca8..986f9b3 100644
--- a/x/ref/services/syncbase/vsync/responder_test.go
+++ b/x/ref/services/syncbase/vsync/responder_test.go
@@ -111,7 +111,7 @@
 	for _, test := range tests {
 		want := test.genDiffWant
 		got := test.genDiffIn
-		rSt := newResponderState(nil, nil, s, interfaces.DeltaReq{})
+		rSt := newResponderState(nil, nil, s, interfaces.DeltaReq{}, "fakeInitiator")
 		rSt.diff = got
 		rSt.diffPrefixGenVectors(test.respPVec, test.initPVec)
 		checkEqualDevRanges(t, got, want)
@@ -353,7 +353,7 @@
 		s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, checkptGen: test.respGen, genvec: test.respVec}
 
 		req := interfaces.DeltaReq{AppName: appName, DbName: dbName, InitVec: test.initVec}
-		rSt := newResponderState(nil, nil, s, req)
+		rSt := newResponderState(nil, nil, s, req, "fakeInitiator")
 
 		rSt.computeDeltaBound(nil)
 		if rSt.errState != nil || !reflect.DeepEqual(rSt.outVec, wantVec) {
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 1b158f6..961121c 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -628,16 +628,15 @@
 			return err
 		}
 
-		gid, err := getSyncGroupId(ctx, tx, sgName)
+		sg, err := getSyncGroupByName(ctx, tx, sgName)
 		if err != nil {
 			return err
 		}
-		sg, err := getSyncGroupById(ctx, tx, gid)
 
 		// TODO(hpucha): Check SyncGroup ACL. Perform version checking.
 
 		sg.Spec = spec
-		return setSGDataEntry(ctx, tx, gid, sg)
+		return setSGDataEntry(ctx, tx, sg.Id, sg)
 	})
 	return err
 }