syncbase/vsync: track app/db in SG membership view.

* Modify the SyncGroup membership view to track the nesting of SyncGroups
  within their app/db shards.
* The DB iterator now passes the app/db names to the callback function.

Change-Id: Ie1a1bb0af92307dd7731787681333b5943f6b550
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index 4d4e271..dce51f2 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -116,7 +116,7 @@
 	s.syncStateLock.Lock()
 	defer s.syncStateLock.Unlock()
 
-	name := globalDbName(appName, dbName)
+	name := appDbName(appName, dbName)
 	ds, ok := s.syncState[name]
 	if !ok {
 		ds = &dbSyncStateInMem{gen: 1}
@@ -132,8 +132,10 @@
 	return gen, pos
 }
 
-// globalDbName returns the global name of a Database by combining the app and db names.
-func globalDbName(appName, dbName string) string {
+// appDbName combines the app and db names to return a globally unique name for
+// a Database.  This relies on the fact that the app name is globally unique and
+// the db name is unique within the scope of the app.
+func appDbName(appName, dbName string) string {
 	return util.JoinKeyParts(appName, dbName)
 }
 
diff --git a/services/syncbase/vsync/sync_state_test.go b/services/syncbase/vsync/sync_state_test.go
index 03cb268..8606472 100644
--- a/services/syncbase/vsync/sync_state_test.go
+++ b/services/syncbase/vsync/sync_state_test.go
@@ -30,7 +30,7 @@
 		wantGen += 5
 		wantPos += 10
 
-		name := globalDbName("mockapp", "mockdb")
+		name := appDbName("mockapp", "mockdb")
 		if s.syncState[name].gen != wantGen || s.syncState[name].pos != wantPos {
 			t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", s.syncState[name].gen, wantGen, s.syncState[name].pos, wantPos)
 		}
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 9e6efd4..da98994 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -54,11 +54,15 @@
 }
 
 // memberInfo holds the member metadata for each SyncGroup this member belongs
-// to.
+// to within each App/Database (i.e. global database name).  It's a mapping of
+// global DB names to sets of SyncGroup member information.
 type memberInfo struct {
-	gid2info map[interfaces.GroupId]wire.SyncGroupMemberInfo
+	db2sg map[string]sgMemberInfo
 }
 
+// sgMemberInfo maps SyncGroups to their member metadata.
+type sgMemberInfo map[interfaces.GroupId]wire.SyncGroupMemberInfo
+
 // newSyncGroupVersion generates a random SyncGroup version ("etag").
 func newSyncGroupVersion() string {
 	return fmt.Sprintf("%x", rand64())
@@ -195,7 +199,7 @@
 	if view == nil {
 		// The empty expiration time in Go is before "now" and treated as expired
 		// below.
-		view = &memberView{expiration: time.Time{}, members: make(map[string]*memberInfo)}
+		view = &memberView{expiration: time.Time{}, members: nil}
 		s.allMembers = view
 	}
 
@@ -207,11 +211,12 @@
 	newMembers := make(map[string]*memberInfo)
 	scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")
 
-	s.forEachDatabaseStore(ctx, func(st store.Store) bool {
+	s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
 		// For each database, fetch its SyncGroup data entries by scanning their
 		// prefix range.  Use a database snapshot for the scan.
 		sn := st.NewSnapshot()
 		defer sn.Close()
+		name := appDbName(appName, dbName)
 
 		stream := sn.Scan(scanStart, scanLimit)
 		for stream.Advance() {
@@ -225,11 +230,12 @@
 			// A member's info is different across SyncGroups, so gather all of them.
 			for member, info := range sg.Joiners {
 				if _, ok := newMembers[member]; !ok {
-					newMembers[member] = &memberInfo{
-						gid2info: make(map[interfaces.GroupId]wire.SyncGroupMemberInfo),
-					}
+					newMembers[member] = &memberInfo{db2sg: make(map[string]sgMemberInfo)}
 				}
-				newMembers[member].gid2info[sg.Id] = info
+				if _, ok := newMembers[member].db2sg[name]; !ok {
+					newMembers[member].db2sg[name] = make(sgMemberInfo)
+				}
+				newMembers[member].db2sg[name][sg.Id] = info
 			}
 		}
 		return false
@@ -246,7 +252,11 @@
 
 	members := make(map[string]uint32)
 	for member, info := range s.allMembers.members {
-		members[member] = uint32(len(info.gid2info))
+		count := 0
+		for _, sgmi := range info.db2sg {
+			count += len(sgmi)
+		}
+		members[member] = uint32(count)
 	}
 
 	return members
@@ -669,7 +679,7 @@
 	// feedback from app developers (see discussion in SyncGroup API
 	// doc). If we decide to keep the SG name as stand-alone, this scan can
 	// be optimized by a lazy cache of sgname to <app, db> info.
-	s.forEachDatabaseStore(ctx, func(st store.Store) bool {
+	s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
 		if gid, err = getSyncGroupId(ctx, st, sgName); err == nil {
 			// Found the SyncGroup being looked for.
 			dbSt = st
diff --git a/services/syncbase/vsync/syncgroup_test.go b/services/syncbase/vsync/syncgroup_test.go
index f41436c..31ad4c2 100644
--- a/services/syncbase/vsync/syncgroup_test.go
+++ b/services/syncbase/vsync/syncgroup_test.go
@@ -25,10 +25,12 @@
 		t.Errorf("num-members (%s): got %v instead of %v", which, num, numMembers)
 	}
 
-	sgids := make(map[interfaces.GroupId]struct{})
+	sgids := make(map[interfaces.GroupId]bool)
 	for _, info := range view.members {
-		for gid := range info.gid2info {
-			sgids[gid] = struct{}{}
+		for _, sgmi := range info.db2sg {
+			for gid := range sgmi {
+				sgids[gid] = true
+			}
 		}
 	}
 
@@ -116,11 +118,19 @@
 		if mi == nil {
 			t.Errorf("cannot get info for SyncGroup member %s", mm)
 		}
-		if len(mi.gid2info) != 1 {
+		if len(mi.db2sg) != 1 {
 			t.Errorf("invalid info for SyncGroup member %s: %v", mm, mi)
 		}
+		var sgmi sgMemberInfo
+		for _, v := range mi.db2sg {
+			sgmi = v
+			break
+		}
+		if len(sgmi) != 1 {
+			t.Errorf("invalid member info for SyncGroup member %s: %v", mm, sgmi)
+		}
 		expJoinerInfo := sg.Joiners[mm]
-		joinerInfo := mi.gid2info[sgId]
+		joinerInfo := sgmi[sgId]
 		if !reflect.DeepEqual(joinerInfo, expJoinerInfo) {
 			t.Errorf("invalid Info for SyncGroup member %s in group ID %d: got %v instead of %v",
 				mm, sgId, joinerInfo, expJoinerInfo)
@@ -363,29 +373,39 @@
 
 	expMemberInfo := map[string]*memberInfo{
 		"phone": &memberInfo{
-			gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
-				sgId1: sg1.Joiners["phone"],
+			db2sg: map[string]sgMemberInfo{
+				"mockapp:mockdb": sgMemberInfo{
+					sgId1: sg1.Joiners["phone"],
+				},
 			},
 		},
 		"tablet": &memberInfo{
-			gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
-				sgId1: sg1.Joiners["tablet"],
-				sgId2: sg2.Joiners["tablet"],
+			db2sg: map[string]sgMemberInfo{
+				"mockapp:mockdb": sgMemberInfo{
+					sgId1: sg1.Joiners["tablet"],
+					sgId2: sg2.Joiners["tablet"],
+				},
 			},
 		},
 		"cloud": &memberInfo{
-			gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
-				sgId1: sg1.Joiners["cloud"],
+			db2sg: map[string]sgMemberInfo{
+				"mockapp:mockdb": sgMemberInfo{
+					sgId1: sg1.Joiners["cloud"],
+				},
 			},
 		},
 		"door": &memberInfo{
-			gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
-				sgId2: sg2.Joiners["door"],
+			db2sg: map[string]sgMemberInfo{
+				"mockapp:mockdb": sgMemberInfo{
+					sgId2: sg2.Joiners["door"],
+				},
 			},
 		},
 		"lamp": &memberInfo{
-			gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
-				sgId2: sg2.Joiners["lamp"],
+			db2sg: map[string]sgMemberInfo{
+				"mockapp:mockdb": sgMemberInfo{
+					sgId2: sg2.Joiners["lamp"],
+				},
 			},
 		},
 	}
@@ -425,18 +445,24 @@
 
 	expMemberInfo = map[string]*memberInfo{
 		"tablet": &memberInfo{
-			gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
-				sgId2: sg2.Joiners["tablet"],
+			db2sg: map[string]sgMemberInfo{
+				"mockapp:mockdb": sgMemberInfo{
+					sgId2: sg2.Joiners["tablet"],
+				},
 			},
 		},
 		"door": &memberInfo{
-			gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
-				sgId2: sg2.Joiners["door"],
+			db2sg: map[string]sgMemberInfo{
+				"mockapp:mockdb": sgMemberInfo{
+					sgId2: sg2.Joiners["door"],
+				},
 			},
 		},
 		"lamp": &memberInfo{
-			gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
-				sgId2: sg2.Joiners["lamp"],
+			db2sg: map[string]sgMemberInfo{
+				"mockapp:mockdb": sgMemberInfo{
+					sgId2: sg2.Joiners["lamp"],
+				},
 			},
 		},
 	}
diff --git a/services/syncbase/vsync/util.go b/services/syncbase/vsync/util.go
index bb34c27..01bc3eb 100644
--- a/services/syncbase/vsync/util.go
+++ b/services/syncbase/vsync/util.go
@@ -16,7 +16,7 @@
 // service and invokes the callback function on each database. The callback
 // returns a "done" flag to make forEachDatabaseStore() stop the iteration
 // earlier; otherwise the function loops across all databases of all apps.
-func (s *syncService) forEachDatabaseStore(ctx *context.T, callback func(store.Store) bool) {
+func (s *syncService) forEachDatabaseStore(ctx *context.T, callback func(string, string, store.Store) bool) {
 	// Get the apps and iterate over them.
 	// TODO(rdaoud): use a "privileged call" parameter instead of nil (here and
 	// elsewhere).
@@ -47,7 +47,7 @@
 				continue
 			}
 
-			if callback(db.St()) {
+			if callback(a, d, db.St()) {
 				return // done, early exit
 			}
 		}