Merge "internal storage engine: improve compile-time typing"
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index 470d5b9..ee15a16 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -22,7 +22,7 @@
 	// record. The initiator parses the stream between a Start and a Finish
 	// record as the response to its DeltaReq, and then moves on to the
 	// next Database in common with this responder.
-	GetDeltas() stream<DeltaReq, DeltaResp> error {access.Read}
+	GetDeltas(initiator string) stream<DeltaReq, DeltaResp> error {access.Read}
 
 	// SyncGroup-related methods.
 
diff --git a/services/syncbase/server/interfaces/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
index adb964f..d378170 100644
--- a/services/syncbase/server/interfaces/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -36,7 +36,7 @@
 	// record. The initiator parses the stream between a Start and a Finish
 	// record as the response to its DeltaReq, and then moves on to the
 	// next Database in common with this responder.
-	GetDeltas(*context.T, ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
+	GetDeltas(ctx *context.T, initiator string, opts ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
 	PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
@@ -65,9 +65,9 @@
 	name string
 }
 
-func (c implSyncClientStub) GetDeltas(ctx *context.T, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
+func (c implSyncClientStub) GetDeltas(ctx *context.T, i0 string, opts ...rpc.CallOpt) (ocall SyncGetDeltasClientCall, err error) {
 	var call rpc.ClientCall
-	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", nil, opts...); err != nil {
+	if call, err = v23.GetClient(ctx).StartCall(ctx, c.name, "GetDeltas", []interface{}{i0}, opts...); err != nil {
 		return
 	}
 	ocall = &implSyncGetDeltasClientCall{ClientCall: call}
@@ -207,7 +207,7 @@
 	// record. The initiator parses the stream between a Start and a Finish
 	// record as the response to its DeltaReq, and then moves on to the
 	// next Database in common with this responder.
-	GetDeltas(*context.T, SyncGetDeltasServerCall) error
+	GetDeltas(ctx *context.T, call SyncGetDeltasServerCall, initiator string) error
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
 	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -236,7 +236,7 @@
 	// record. The initiator parses the stream between a Start and a Finish
 	// record as the response to its DeltaReq, and then moves on to the
 	// next Database in common with this responder.
-	GetDeltas(*context.T, *SyncGetDeltasServerCallStub) error
+	GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, initiator string) error
 	// PublishSyncGroup is typically invoked on a "central" peer to publish
 	// the SyncGroup.
 	PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
@@ -279,8 +279,8 @@
 	gs   *rpc.GlobState
 }
 
-func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub) error {
-	return s.impl.GetDeltas(ctx, call)
+func (s implSyncServerStub) GetDeltas(ctx *context.T, call *SyncGetDeltasServerCallStub, i0 string) error {
+	return s.impl.GetDeltas(ctx, call, i0)
 }
 
 func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 SyncGroup) error {
@@ -315,6 +315,9 @@
 		{
 			Name: "GetDeltas",
 			Doc:  "// GetDeltas returns the responder's current generation vector and all\n// the missing log records when compared to the initiator's generation\n// vector. This process happens one Database at a time encompassing all\n// the SyncGroups common to the initiator and the responder. For each\n// Database, the initiator sends a DeltaReq. In response, the\n// responder sends a \"Start\" DeltaResp record, all the missing log\n// records, the responder's genvector, and a \"Finish\" DeltaResp\n// record. The initiator parses the stream between a Start and a Finish\n// record as the response to its DeltaReq, and then moves on to the\n// next Database in common with this responder.",
+			InArgs: []rpc.ArgDesc{
+				{"initiator", ``}, // string
+			},
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
 		{
diff --git a/services/syncbase/vsync/initiator.go b/services/syncbase/vsync/initiator.go
index e237277..5d3d9b8 100644
--- a/services/syncbase/vsync/initiator.go
+++ b/services/syncbase/vsync/initiator.go
@@ -300,7 +300,7 @@
 	for mt := range iSt.mtTables {
 		absName := naming.Join(mt, iSt.peer, util.SyncbaseSuffix)
 		c := interfaces.SyncClient(absName)
-		stream, err := c.GetDeltas(ctx)
+		stream, err := c.GetDeltas(ctx, iSt.sync.name)
 		if err == nil {
 			vlog.VI(3).Infof("sync: connectToPeer: established on %s", absName)
 			return stream, true
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 8d5b384..8ba9cd9 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -9,6 +9,7 @@
 	"sort"
 	"strings"
 
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -19,9 +20,9 @@
 )
 
 // GetDeltas implements the responder side of the GetDeltas RPC.
-func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
-	vlog.VI(2).Infof("sync: GetDeltas: begin")
-	defer vlog.VI(2).Infof("sync: GetDeltas: end")
+func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall, initiator string) error {
+	vlog.VI(2).Infof("sync: GetDeltas: begin: from initiator %s", initiator)
+	defer vlog.VI(2).Infof("sync: GetDeltas: end: from initiator %s", initiator)
 
 	recvr := call.RecvStream()
 	for recvr.Advance() {
@@ -31,7 +32,7 @@
 		// the failure might be genuine. For example, the responder is
 		// no longer part of the requested SyncGroups, or the app/db is
 		// locally deleted, or a permission change has denied access.
-		rSt := newResponderState(ctx, call, s, req)
+		rSt := newResponderState(ctx, call, s, req, initiator)
 		rSt.sendDeltasPerDatabase(ctx)
 	}
 
@@ -42,17 +43,18 @@
 // responderState is state accumulated per Database by the responder during an
 // initiation round.
 type responderState struct {
-	req      interfaces.DeltaReq
-	call     interfaces.SyncGetDeltasServerCall // Stream handle for the GetDeltas RPC.
-	errState error                              // Captures the error from the first two phases of the responder.
-	sync     *syncService
-	st       store.Store // Store handle to the Database.
-	diff     genRangeVector
-	outVec   interfaces.GenVector
+	req       interfaces.DeltaReq
+	call      interfaces.SyncGetDeltasServerCall // Stream handle for the GetDeltas RPC.
+	initiator string
+	errState  error // Captures the error from the first two phases of the responder.
+	sync      *syncService
+	st        store.Store // Store handle to the Database.
+	diff      genRangeVector
+	outVec    interfaces.GenVector
 }
 
-func newResponderState(ctx *context.T, call interfaces.SyncGetDeltasServerCall, sync *syncService, req interfaces.DeltaReq) *responderState {
-	rSt := &responderState{call: call, sync: sync, req: req}
+func newResponderState(ctx *context.T, call interfaces.SyncGetDeltasServerCall, sync *syncService, req interfaces.DeltaReq, initiator string) *responderState {
+	rSt := &responderState{call: call, sync: sync, req: req, initiator: initiator}
 	return rSt
 }
 
@@ -138,6 +140,12 @@
 		for _, p := range sg.Spec.Prefixes {
 			allowedPfxs[p] = struct{}{}
 		}
+
+		// Add the initiator to the SyncGroup membership if not already
+		// in it.  It is a temporary solution until SyncGroup metadata
+		// is synchronized peer to peer.
+		// TODO(rdaoud): remove this when SyncGroups are synced.
+		rSt.addInitiatorToSyncGroup(ctx, sgid)
 	}
 
 	// Filter the initiator's prefixes to what is allowed.
@@ -159,6 +167,39 @@
 	return
 }
 
+// addInitiatorToSyncGroup adds the request initiator to the membership of the
+// given SyncGroup if the initiator is not already a member.  It is a temporary
+// solution until SyncGroup metadata starts being synchronized, at which time
+// peers will learn of new members through mutations of the SyncGroup metadata
+// by the SyncGroup administrators.
+// Note: the joiner metadata is fake because the responder does not have it.
+func (rSt *responderState) addInitiatorToSyncGroup(ctx *context.T, gid interfaces.GroupId) {
+	if rSt.initiator == "" {
+		return
+	}
+
+	err := store.RunInTransaction(rSt.st, func(tx store.StoreReadWriter) error {
+		sg, err := getSyncGroupById(ctx, tx, gid)
+		if err != nil {
+			return err
+		}
+
+		// If the initiator is already a member of the SyncGroup abort
+		// the transaction with a special error code.
+		if _, ok := sg.Joiners[rSt.initiator]; ok {
+			return verror.New(verror.ErrExist, ctx, "member already in SyncGroup")
+		}
+
+		vlog.VI(4).Infof("sync: addInitiatorToSyncGroup: add %s to sgid %d", rSt.initiator, gid)
+		sg.Joiners[rSt.initiator] = wire.SyncGroupMemberInfo{SyncPriority: 1}
+		return setSGDataEntry(ctx, tx, gid, sg)
+	})
+
+	if err != nil && verror.ErrorID(err) != verror.ErrExist.ID {
+		vlog.Errorf("sync: addInitiatorToSyncGroup: initiator %s, sgid %d: %v", rSt.initiator, gid, err)
+	}
+}
+
 // computeDeltaBound computes the bound on missing generations across all
 // requested prefixes (phase 2 of sendDeltas).
 func (rSt *responderState) computeDeltaBound(ctx *context.T) {
diff --git a/services/syncbase/vsync/responder_test.go b/services/syncbase/vsync/responder_test.go
index cf67ca8..986f9b3 100644
--- a/services/syncbase/vsync/responder_test.go
+++ b/services/syncbase/vsync/responder_test.go
@@ -111,7 +111,7 @@
 	for _, test := range tests {
 		want := test.genDiffWant
 		got := test.genDiffIn
-		rSt := newResponderState(nil, nil, s, interfaces.DeltaReq{})
+		rSt := newResponderState(nil, nil, s, interfaces.DeltaReq{}, "fakeInitiator")
 		rSt.diff = got
 		rSt.diffPrefixGenVectors(test.respPVec, test.initPVec)
 		checkEqualDevRanges(t, got, want)
@@ -353,7 +353,7 @@
 		s.syncState[appDbName(appName, dbName)] = &dbSyncStateInMem{gen: test.respGen, checkptGen: test.respGen, genvec: test.respVec}
 
 		req := interfaces.DeltaReq{AppName: appName, DbName: dbName, InitVec: test.initVec}
-		rSt := newResponderState(nil, nil, s, req)
+		rSt := newResponderState(nil, nil, s, req, "fakeInitiator")
 
 		rSt.computeDeltaBound(nil)
 		if rSt.errState != nil || !reflect.DeepEqual(rSt.outVec, wantVec) {
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 2f341e4..bf4e3aa 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -618,16 +618,15 @@
 			return err
 		}
 
-		gid, err := getSyncGroupId(ctx, tx, sgName)
+		sg, err := getSyncGroupByName(ctx, tx, sgName)
 		if err != nil {
 			return err
 		}
-		sg, err := getSyncGroupById(ctx, tx, gid)
 
 		// TODO(hpucha): Check SyncGroup ACL. Perform version checking.
 
 		sg.Spec = spec
-		return setSGDataEntry(ctx, tx, gid, sg)
+		return setSGDataEntry(ctx, tx, sg.Id, sg)
 	})
 	return err
 }