syncbase/vsync: store iterator function

Refactor the app/dbs store scanning into a separate utility function.
Change the refresh of the membership view to use that iterator.

Change-Id: I787c3855245c7c956cbc027da3b0fb00badf0384
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index e56940d..4e95c12 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -21,6 +21,7 @@
 	"v.io/v23/rpc"
 	"v.io/v23/verror"
 	"v.io/v23/vom"
+	"v.io/x/lib/vlog"
 )
 
 var (
@@ -164,56 +165,36 @@
 	}
 
 	// Create a new aggregate view of SyncGroup members across all app databases.
-	// Get the apps and iterate over them.
-	appNames, err := s.sv.AppNames(ctx, nil)
-	if err != nil {
-		return
-	}
-
 	newMembers := make(map[string]*memberInfo)
 	scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")
 
-	// For each app, get its databases and iterate over them.
-	for _, a := range appNames {
-		app, err := s.sv.App(ctx, nil, a)
-		if err != nil {
-			continue
-		}
-		dbNames, err := app.NoSQLDatabaseNames(ctx, nil)
-		if err != nil {
-			continue
-		}
-
+	s.forEachDatabaseStore(ctx, func(st store.Store) bool {
 		// For each database, fetch its SyncGroup data entries by scanning their
 		// prefix range.  Use a database snapshot for the scan.
-		for _, d := range dbNames {
-			db, err := app.NoSQLDatabase(ctx, nil, d)
-			if err != nil {
+		sn := st.NewSnapshot()
+		defer sn.Close()
+
+		stream := sn.Scan(scanStart, scanLimit)
+		for stream.Advance() {
+			var sg SyncGroup
+			if vom.Decode(stream.Value(nil), &sg) != nil {
+				vlog.Errorf("invalid SyncGroup value for key %s", string(stream.Key(nil)))
 				continue
 			}
 
-			sn := db.St().NewSnapshot()
-			stream := sn.Scan(scanStart, scanLimit)
-			for stream.Advance() {
-				var sg SyncGroup
-				if vom.Decode(stream.Value(nil), &sg) != nil {
-					continue
-				}
-
-				// Add all members of this SyncGroup to the membership view.
-				for member, info := range sg.Joiners {
-					if _, ok := newMembers[member]; !ok {
-						newMembers[member] = &memberInfo{
-							gid2info: make(map[GroupId]nosql.SyncGroupMemberInfo),
-						}
+			// Add all members of this SyncGroup to the membership view.
+			// A member's info is different across SyncGroups, so gather all of them.
+			for member, info := range sg.Joiners {
+				if _, ok := newMembers[member]; !ok {
+					newMembers[member] = &memberInfo{
+						gid2info: make(map[GroupId]nosql.SyncGroupMemberInfo),
 					}
-					newMembers[member].gid2info[sg.Id] = info
 				}
+				newMembers[member].gid2info[sg.Id] = info
 			}
-
-			sn.Close()
 		}
-	}
+		return false
+	})
 
 	view.members = newMembers
 	view.expiration = time.Now().Add(memberViewTTL)
diff --git a/services/syncbase/vsync/util.go b/services/syncbase/vsync/util.go
new file mode 100644
index 0000000..4f6d285
--- /dev/null
+++ b/services/syncbase/vsync/util.go
@@ -0,0 +1,52 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// Sync utility functions
+
+import (
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/context"
+)
+
+// forEachDatabaseStore iterates over all Databases in all Apps within the
+// service and invokes the callback function provided on each database.
+// The callback returns a "done" flag to make storeIter() stop the iteration
+// earlier, otherwise the function loops across all databases of all apps.
+func (s *syncService) forEachDatabaseStore(ctx *context.T, callback func(store.Store) bool) {
+	// Get the apps and iterate over them.
+	// TODO(rdaoud): use a "privileged call" parameter instead of nil.
+	appNames, err := s.sv.AppNames(ctx, nil)
+	if err != nil {
+		return
+	}
+
+	for _, a := range appNames {
+		// For each app, get its databases and iterate over them.
+		// TODO(rdaoud): use a "privileged call" parameter instead of nil.
+		app, err := s.sv.App(ctx, nil, a)
+		if err != nil {
+			continue
+		}
+		// TODO(rdaoud): use a "privileged call" parameter instead of nil.
+		dbNames, err := app.NoSQLDatabaseNames(ctx, nil)
+		if err != nil {
+			continue
+		}
+
+		for _, d := range dbNames {
+			// For each database, get its Store and invoke the callback.
+			// TODO(rdaoud): use a "privileged call" parameter instead of nil.
+			db, err := app.NoSQLDatabase(ctx, nil, d)
+			if err != nil {
+				continue
+			}
+
+			if callback(db.St()) {
+				return // done, early exit
+			}
+		}
+	}
+}