syncbase/vsync: store iterator function
Refactor the app/dbs store scanning into a separate utility function.
Change the refresh of the membership view to use that iterator.
Change-Id: I787c3855245c7c956cbc027da3b0fb00badf0384
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index e56940d..4e95c12 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -21,6 +21,7 @@
"v.io/v23/rpc"
"v.io/v23/verror"
"v.io/v23/vom"
+ "v.io/x/lib/vlog"
)
var (
@@ -164,56 +165,36 @@
}
// Create a new aggregate view of SyncGroup members across all app databases.
- // Get the apps and iterate over them.
- appNames, err := s.sv.AppNames(ctx, nil)
- if err != nil {
- return
- }
-
newMembers := make(map[string]*memberInfo)
scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")
- // For each app, get its databases and iterate over them.
- for _, a := range appNames {
- app, err := s.sv.App(ctx, nil, a)
- if err != nil {
- continue
- }
- dbNames, err := app.NoSQLDatabaseNames(ctx, nil)
- if err != nil {
- continue
- }
-
+ s.forEachDatabaseStore(ctx, func(st store.Store) bool {
// For each database, fetch its SyncGroup data entries by scanning their
// prefix range. Use a database snapshot for the scan.
- for _, d := range dbNames {
- db, err := app.NoSQLDatabase(ctx, nil, d)
- if err != nil {
+ sn := st.NewSnapshot()
+ defer sn.Close()
+
+ stream := sn.Scan(scanStart, scanLimit)
+ for stream.Advance() {
+ var sg SyncGroup
+ if vom.Decode(stream.Value(nil), &sg) != nil {
+ vlog.Errorf("invalid SyncGroup value for key %s", string(stream.Key(nil)))
continue
}
- sn := db.St().NewSnapshot()
- stream := sn.Scan(scanStart, scanLimit)
- for stream.Advance() {
- var sg SyncGroup
- if vom.Decode(stream.Value(nil), &sg) != nil {
- continue
- }
-
- // Add all members of this SyncGroup to the membership view.
- for member, info := range sg.Joiners {
- if _, ok := newMembers[member]; !ok {
- newMembers[member] = &memberInfo{
- gid2info: make(map[GroupId]nosql.SyncGroupMemberInfo),
- }
+ // Add all members of this SyncGroup to the membership view.
+ // A member's info is different across SyncGroups, so gather all of them.
+ for member, info := range sg.Joiners {
+ if _, ok := newMembers[member]; !ok {
+ newMembers[member] = &memberInfo{
+ gid2info: make(map[GroupId]nosql.SyncGroupMemberInfo),
}
- newMembers[member].gid2info[sg.Id] = info
}
+ newMembers[member].gid2info[sg.Id] = info
}
-
- sn.Close()
}
- }
+ return false
+ })
view.members = newMembers
view.expiration = time.Now().Add(memberViewTTL)