syncbase/vsync: track app/db in SG membership view.
* Modify the SyncGroup membership view to track the nesting of SyncGroups
within their app/db shards.
* The DB iterator now passes the app/db names to the callback function.
Change-Id: Ie1a1bb0af92307dd7731787681333b5943f6b550
diff --git a/services/syncbase/vsync/sync_state.go b/services/syncbase/vsync/sync_state.go
index 4d4e271..dce51f2 100644
--- a/services/syncbase/vsync/sync_state.go
+++ b/services/syncbase/vsync/sync_state.go
@@ -116,7 +116,7 @@
s.syncStateLock.Lock()
defer s.syncStateLock.Unlock()
- name := globalDbName(appName, dbName)
+ name := appDbName(appName, dbName)
ds, ok := s.syncState[name]
if !ok {
ds = &dbSyncStateInMem{gen: 1}
@@ -132,8 +132,10 @@
return gen, pos
}
-// globalDbName returns the global name of a Database by combining the app and db names.
-func globalDbName(appName, dbName string) string {
+// appDbName combines the app and db names to return a globally unique name for
+// a Database. This relies on the fact that the app name is globally unique and
+// the db name is unique within the scope of the app.
+func appDbName(appName, dbName string) string {
return util.JoinKeyParts(appName, dbName)
}
diff --git a/services/syncbase/vsync/sync_state_test.go b/services/syncbase/vsync/sync_state_test.go
index 03cb268..8606472 100644
--- a/services/syncbase/vsync/sync_state_test.go
+++ b/services/syncbase/vsync/sync_state_test.go
@@ -30,7 +30,7 @@
wantGen += 5
wantPos += 10
- name := globalDbName("mockapp", "mockdb")
+ name := appDbName("mockapp", "mockdb")
if s.syncState[name].gen != wantGen || s.syncState[name].pos != wantPos {
t.Fatalf("reserveGenAndPosInternal failed, gotGen %v wantGen %v, gotPos %v wantPos %v", s.syncState[name].gen, wantGen, s.syncState[name].pos, wantPos)
}
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 9e6efd4..da98994 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -54,11 +54,15 @@
}
// memberInfo holds the member metadata for each SyncGroup this member belongs
-// to.
+// to within each App/Database (i.e. global database name). It's a mapping of
+// global DB names to sets of SyncGroup member information.
type memberInfo struct {
- gid2info map[interfaces.GroupId]wire.SyncGroupMemberInfo
+ db2sg map[string]sgMemberInfo
}
+// sgMemberInfo maps SyncGroups to their member metadata.
+type sgMemberInfo map[interfaces.GroupId]wire.SyncGroupMemberInfo
+
// newSyncGroupVersion generates a random SyncGroup version ("etag").
func newSyncGroupVersion() string {
return fmt.Sprintf("%x", rand64())
@@ -195,7 +199,7 @@
if view == nil {
// The empty expiration time in Go is before "now" and treated as expired
// below.
- view = &memberView{expiration: time.Time{}, members: make(map[string]*memberInfo)}
+ view = &memberView{expiration: time.Time{}, members: nil}
s.allMembers = view
}
@@ -207,11 +211,12 @@
newMembers := make(map[string]*memberInfo)
scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")
- s.forEachDatabaseStore(ctx, func(st store.Store) bool {
+ s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
// For each database, fetch its SyncGroup data entries by scanning their
// prefix range. Use a database snapshot for the scan.
sn := st.NewSnapshot()
defer sn.Close()
+ name := appDbName(appName, dbName)
stream := sn.Scan(scanStart, scanLimit)
for stream.Advance() {
@@ -225,11 +230,12 @@
// A member's info is different across SyncGroups, so gather all of them.
for member, info := range sg.Joiners {
if _, ok := newMembers[member]; !ok {
- newMembers[member] = &memberInfo{
- gid2info: make(map[interfaces.GroupId]wire.SyncGroupMemberInfo),
- }
+ newMembers[member] = &memberInfo{db2sg: make(map[string]sgMemberInfo)}
}
- newMembers[member].gid2info[sg.Id] = info
+ if _, ok := newMembers[member].db2sg[name]; !ok {
+ newMembers[member].db2sg[name] = make(sgMemberInfo)
+ }
+ newMembers[member].db2sg[name][sg.Id] = info
}
}
return false
@@ -246,7 +252,11 @@
members := make(map[string]uint32)
for member, info := range s.allMembers.members {
- members[member] = uint32(len(info.gid2info))
+ count := 0
+ for _, sgmi := range info.db2sg {
+ count += len(sgmi)
+ }
+ members[member] = uint32(count)
}
return members
@@ -669,7 +679,7 @@
// feedback from app developers (see discussion in SyncGroup API
// doc). If we decide to keep the SG name as stand-alone, this scan can
// be optimized by a lazy cache of sgname to <app, db> info.
- s.forEachDatabaseStore(ctx, func(st store.Store) bool {
+ s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
if gid, err = getSyncGroupId(ctx, st, sgName); err == nil {
// Found the SyncGroup being looked for.
dbSt = st
diff --git a/services/syncbase/vsync/syncgroup_test.go b/services/syncbase/vsync/syncgroup_test.go
index f41436c..31ad4c2 100644
--- a/services/syncbase/vsync/syncgroup_test.go
+++ b/services/syncbase/vsync/syncgroup_test.go
@@ -25,10 +25,12 @@
t.Errorf("num-members (%s): got %v instead of %v", which, num, numMembers)
}
- sgids := make(map[interfaces.GroupId]struct{})
+ sgids := make(map[interfaces.GroupId]bool)
for _, info := range view.members {
- for gid := range info.gid2info {
- sgids[gid] = struct{}{}
+ for _, sgmi := range info.db2sg {
+ for gid := range sgmi {
+ sgids[gid] = true
+ }
}
}
@@ -116,11 +118,19 @@
if mi == nil {
t.Errorf("cannot get info for SyncGroup member %s", mm)
}
- if len(mi.gid2info) != 1 {
+ if len(mi.db2sg) != 1 {
t.Errorf("invalid info for SyncGroup member %s: %v", mm, mi)
}
+ var sgmi sgMemberInfo
+ for _, v := range mi.db2sg {
+ sgmi = v
+ break
+ }
+ if len(sgmi) != 1 {
+ t.Errorf("invalid member info for SyncGroup member %s: %v", mm, sgmi)
+ }
expJoinerInfo := sg.Joiners[mm]
- joinerInfo := mi.gid2info[sgId]
+ joinerInfo := sgmi[sgId]
if !reflect.DeepEqual(joinerInfo, expJoinerInfo) {
t.Errorf("invalid Info for SyncGroup member %s in group ID %d: got %v instead of %v",
mm, sgId, joinerInfo, expJoinerInfo)
@@ -363,29 +373,39 @@
expMemberInfo := map[string]*memberInfo{
"phone": &memberInfo{
- gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
- sgId1: sg1.Joiners["phone"],
+ db2sg: map[string]sgMemberInfo{
+ "mockapp:mockdb": sgMemberInfo{
+ sgId1: sg1.Joiners["phone"],
+ },
},
},
"tablet": &memberInfo{
- gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
- sgId1: sg1.Joiners["tablet"],
- sgId2: sg2.Joiners["tablet"],
+ db2sg: map[string]sgMemberInfo{
+ "mockapp:mockdb": sgMemberInfo{
+ sgId1: sg1.Joiners["tablet"],
+ sgId2: sg2.Joiners["tablet"],
+ },
},
},
"cloud": &memberInfo{
- gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
- sgId1: sg1.Joiners["cloud"],
+ db2sg: map[string]sgMemberInfo{
+ "mockapp:mockdb": sgMemberInfo{
+ sgId1: sg1.Joiners["cloud"],
+ },
},
},
"door": &memberInfo{
- gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
- sgId2: sg2.Joiners["door"],
+ db2sg: map[string]sgMemberInfo{
+ "mockapp:mockdb": sgMemberInfo{
+ sgId2: sg2.Joiners["door"],
+ },
},
},
"lamp": &memberInfo{
- gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
- sgId2: sg2.Joiners["lamp"],
+ db2sg: map[string]sgMemberInfo{
+ "mockapp:mockdb": sgMemberInfo{
+ sgId2: sg2.Joiners["lamp"],
+ },
},
},
}
@@ -425,18 +445,24 @@
expMemberInfo = map[string]*memberInfo{
"tablet": &memberInfo{
- gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
- sgId2: sg2.Joiners["tablet"],
+ db2sg: map[string]sgMemberInfo{
+ "mockapp:mockdb": sgMemberInfo{
+ sgId2: sg2.Joiners["tablet"],
+ },
},
},
"door": &memberInfo{
- gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
- sgId2: sg2.Joiners["door"],
+ db2sg: map[string]sgMemberInfo{
+ "mockapp:mockdb": sgMemberInfo{
+ sgId2: sg2.Joiners["door"],
+ },
},
},
"lamp": &memberInfo{
- gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
- sgId2: sg2.Joiners["lamp"],
+ db2sg: map[string]sgMemberInfo{
+ "mockapp:mockdb": sgMemberInfo{
+ sgId2: sg2.Joiners["lamp"],
+ },
},
},
}
diff --git a/services/syncbase/vsync/util.go b/services/syncbase/vsync/util.go
index bb34c27..01bc3eb 100644
--- a/services/syncbase/vsync/util.go
+++ b/services/syncbase/vsync/util.go
@@ -16,7 +16,7 @@
// service and invokes the callback function on each database. The callback
// returns a "done" flag to make forEachDatabaseStore() stop the iteration
// earlier; otherwise the function loops across all databases of all apps.
-func (s *syncService) forEachDatabaseStore(ctx *context.T, callback func(store.Store) bool) {
+func (s *syncService) forEachDatabaseStore(ctx *context.T, callback func(string, string, store.Store) bool) {
// Get the apps and iterate over them.
// TODO(rdaoud): use a "privileged call" parameter instead of nil (here and
// elsewhere).
@@ -47,7 +47,7 @@
continue
}
- if callback(db.St()) {
+ if callback(a, d, db.St()) {
return // done, early exit
}
}