syncbase/vsync: bootstrap sync metadata for SyncGroup

Add the bootstrapping of sync metadata when a SyncGroup is created or
joined.  The bootstrap notifies sync, through the watcher, of
pre-existing database entries that match the SyncGroup prefixes.  Such
entries must be added to the sync metadata (DAG & logs) at their
current versions.
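
For illustration, a minimal sketch of the create-time flow (helper names
are those added in the diff below; the enclosing transaction and the
scan over pre-existing keys are assumed):

    // Inside the SyncGroup create/join transaction:
    // 1. Log a SyncGroup op so the watcher starts accepting the prefixes.
    if err := watchable.AddSyncGroupOp(ctx, tx, prefixes, false); err != nil {
        return err
    }
    // 2. For each pre-existing key matching a prefix, log a Snapshot op
    //    carrying the key and its current version.
    if err := watchable.AddSyncSnapshotOp(ctx, tx, key, version); err != nil {
        return err
    }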

Change-Id: I37b4277a370fb31c520bbb858664e0ad078cce74
diff --git a/v23/syncbase/nosql/syncgroup_test.go b/v23/syncbase/nosql/syncgroup_test.go
index 072502f..3b2da73 100644
--- a/v23/syncbase/nosql/syncgroup_test.go
+++ b/v23/syncbase/nosql/syncgroup_test.go
@@ -32,11 +32,21 @@
 
 	createSyncGroup(t, ctx, d, sg1, spec, verror.ErrBadArg.ID)
 
+	// Prefill entries before creating a SyncGroup to exercise the bootstrap
+	// path, which feeds Snapshot operations to the watcher.
+	t1 := tu.CreateTable(t, ctx, d, "t1")
+	for _, k := range []string{"foo123", "foobar123", "xyz"} {
+		if err := t1.Put(ctx, k, "value@"+k); err != nil {
+			t.Fatalf("t1.Put() of %s failed: %v", k, err)
+		}
+	}
+
 	// Create successfully.
+	// TODO(rdaoud): switch prefixes to (table, prefix) tuples.
 	spec = wire.SyncGroupSpec{
 		Description: "test syncgroup sg1",
 		Perms:       nil,
-		Prefixes:    []string{"t1/foo"},
+		Prefixes:    []string{"t1:foo"},
 	}
 	createSyncGroup(t, ctx, d, sg1, spec, verror.ID(""))
 
@@ -50,7 +60,7 @@
 
 	// Create a nested syncgroup.
 	spec.Description = "test syncgroup sg3"
-	spec.Prefixes = []string{"t1/foobar"}
+	spec.Prefixes = []string{"t1:foobar"}
 	sg3 := naming.Join(sName, util.SyncbaseSuffix, "sg3")
 	createSyncGroup(t, ctx, d, sg3, spec, verror.ID(""))
 
@@ -80,7 +90,7 @@
 	specA := wire.SyncGroupSpec{
 		Description: "test syncgroup sgA",
 		Perms:       perms("root/client1"),
-		Prefixes:    []string{"t1/foo"},
+		Prefixes:    []string{"t1:foo"},
 	}
 	sgNameA := naming.Join(sName, util.SyncbaseSuffix, "sgA")
 	createSyncGroup(t, ctx1, d1, sgNameA, specA, verror.ID(""))
@@ -113,7 +123,7 @@
 	specB := wire.SyncGroupSpec{
 		Description: "test syncgroup sgB",
 		Perms:       perms("root/client1", "root/client2"),
-		Prefixes:    []string{"t1/foo"},
+		Prefixes:    []string{"t1:foo"},
 	}
 	sgNameB := naming.Join(sName, util.SyncbaseSuffix, "sgB")
 	createSyncGroup(t, ctx1, d1, sgNameB, specB, verror.ID(""))
diff --git a/x/ref/services/syncbase/server/watchable/store.go b/x/ref/services/syncbase/server/watchable/store.go
index 0e3bb0f..82fc8d4 100644
--- a/x/ref/services/syncbase/server/watchable/store.go
+++ b/x/ref/services/syncbase/server/watchable/store.go
@@ -144,6 +144,14 @@
 	return newSnapshot(st)
 }
 
+// GetOptions returns the options configured on a watchable.Store.
+// TODO(rdaoud): expose watchable store through an interface and change this
+// function to be a method on the store.
+func GetOptions(st store.Store) (*Options, error) {
+	wst := st.(*wstore)
+	return wst.opts, nil
+}
+
 ////////////////////////////////////////
 // Internal helpers
 
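A hedged usage sketch of the new GetOptions accessor (st is assumed to be
a store previously wrapped by the watchable layer; GetOptions type-asserts
its argument and would panic on a plain store):

    opts, err := watchable.GetOptions(st)
    if err != nil {
        return err
    }
    for _, mp := range opts.ManagedPrefixes {
        // mp is a key prefix (e.g. data or permissions) whose mutations
        // the watchable store versions and logs for the sync watcher.
        _ = mp
    }
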
diff --git a/x/ref/services/syncbase/server/watchable/transaction.go b/x/ref/services/syncbase/server/watchable/transaction.go
index 9890b6a..3c4a054 100644
--- a/x/ref/services/syncbase/server/watchable/transaction.go
+++ b/x/ref/services/syncbase/server/watchable/transaction.go
@@ -161,7 +161,7 @@
 // operations (create, join, leave, destroy) to notify the sync watcher of the
 // change at its proper position in the timeline (the transaction commit).
 // Note: this is an internal function used by sync, not part of the interface.
-func AddSyncGroupOp(ctx *context.T, tx store.Transaction, prefixes []string, remove bool) error {
+func AddSyncGroupOp(ctx *context.T, tx store.StoreReadWriter, prefixes []string, remove bool) error {
 	wtx := tx.(*transaction)
 	wtx.mu.Lock()
 	defer wtx.mu.Unlock()
@@ -178,7 +178,7 @@
 // current keys and their versions to use when initializing the sync metadata
 // at the point in the timeline when these keys become syncable (at commit).
 // Note: this is an internal function used by sync, not part of the interface.
-func AddSyncSnapshotOp(ctx *context.T, tx store.Transaction, key, version []byte) error {
+func AddSyncSnapshotOp(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
 	wtx := tx.(*transaction)
 	wtx.mu.Lock()
 	defer wtx.mu.Unlock()
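
Note on the relaxed signatures above: transaction callbacks receive a
store.StoreReadWriter, so these helpers now accept it directly; the
watchable transaction is still enforced at runtime by the internal
tx.(*transaction) assertion.  A sketch of the resulting call pattern (the
RunInTransaction helper name is an assumption):

    err := store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
        // tx must originate from a watchable store, else the internal
        // type assertion in AddSyncGroupOp panics.
        return watchable.AddSyncGroupOp(ctx, tx, prefixes, false)
    })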
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 4b6512c..31b1a3d 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -404,12 +404,7 @@
 		// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
 
 		// Take a snapshot of the data to bootstrap the SyncGroup.
-		if err := bootstrapSyncGroup(tx, spec.Prefixes); err != nil {
-			return err
-		}
-
-		tx1 := tx.(store.Transaction)
-		return watchable.AddSyncGroupOp(ctx, tx1, spec.Prefixes, false)
+		return sd.bootstrapSyncGroup(ctx, tx, spec.Prefixes)
 	})
 
 	if err != nil {
@@ -499,12 +494,7 @@
 		}
 
 		// Take a snapshot of the data to bootstrap the SyncGroup.
-		if err := bootstrapSyncGroup(tx, sg.Spec.Prefixes); err != nil {
-			return err
-		}
-
-		tx1 := tx.(store.Transaction)
-		return watchable.AddSyncGroupOp(ctx, tx1, sg.Spec.Prefixes, false)
+		return sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes)
 	})
 
 	if err != nil {
@@ -571,23 +561,63 @@
 	return parts[0], ""
 }
 
-func bootstrapSyncGroup(tx store.StoreReadWriter, prefixes []string) error {
-	_ = tx.(store.Transaction)
+// bootstrapSyncGroup inserts into the transaction log a SyncGroup operation
+// and a set of Snapshot operations.  These notify the sync watcher of the
+// SyncGroup prefixes to start accepting, and of the initial state of the
+// existing store keys matching these prefixes (data and permission keys).
+// TODO(rdaoud): this operation scans the managed keys of the database and
+// can be time consuming.  Consider doing it asynchronously and letting the
+// server reply to the client earlier.  However, it must happen within the
+// scope of this transaction (and its snapshot view).
+func (sd *syncDatabase) bootstrapSyncGroup(ctx *context.T, tx store.StoreReadWriter, prefixes []string) error {
+	if len(prefixes) == 0 {
+		return verror.New(verror.ErrInternal, ctx, "no prefixes specified")
+	}
 
-	for _, p := range prefixes {
-		table, row := splitPrefix(p)
-		it := tx.Scan(util.ScanRangeArgs(table, row, ""))
-		key, value := []byte{}, []byte{}
-		for it.Advance() {
-			key, value = it.Key(key), it.Value(value)
+	// Get the store options to retrieve the list of managed key prefixes.
+	opts, err := watchable.GetOptions(sd.db.St())
+	if err != nil {
+		return err
+	}
+	if len(opts.ManagedPrefixes) == 0 {
+		return verror.New(verror.ErrInternal, ctx, "store has no managed prefixes")
+	}
 
-			// TODO(hpucha): Ensure prefix ACLs show up in the scan
-			// stream.
+	// Notify the watcher of the SyncGroup prefixes to start accepting.
+	if err := watchable.AddSyncGroupOp(ctx, tx, prefixes, false); err != nil {
+		return err
+	}
 
-			// TODO(hpucha): Process this object.
-		}
-		if err := it.Err(); err != nil {
-			return err
+	// Loop over the store's managed key prefixes (e.g. data and permissions).
+	// For each one, scan the ranges of the given SyncGroup prefixes.  For
+	// each matching key, insert a snapshot operation in the log.  Scanning
+	// is done over the version entries to retrieve the matching keys and
+	// their version numbers (the values of the version entries).  The
+	// version prefix is stripped from the key recorded in the snapshot op.
+	// TODO(rdaoud): separate the SyncGroup prefixes' client representation
+	// (a list of (table, prefix) tuples) from their internal representation
+	// as strings that match the store's key format.
+	for _, mp := range opts.ManagedPrefixes {
+		for _, p := range prefixes {
+			var k, v []byte
+			start, limit := util.ScanPrefixArgs(util.JoinKeyParts(util.VersionPrefix, mp), p)
+			stream := tx.Scan(start, limit)
+			for stream.Advance() {
+				k, v = stream.Key(k), stream.Value(v)
+				parts := util.SplitKeyParts(string(k))
+				if len(parts) < 2 {
+					vlog.Fatalf("bootstrapSyncGroup: invalid version key %s", string(k))
+				}
+				key := []byte(util.JoinKeyParts(parts[1:]...))
+				if err := watchable.AddSyncSnapshotOp(ctx, tx, key, v); err != nil {
+					return err
+				}
+			}
+			if err := stream.Err(); err != nil {
+				return err
+			}
 		}
 	}
 	return nil
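
For illustration, the key shapes the bootstrap scan walks (the "$version"
and "$row" prefix literals and the ":" separator are assumptions standing
in for util.VersionPrefix and the store's actual separator; only the
structure matters):

    // Version entries map a managed key to its current version:
    //   $version:$row:foo123   -> "v5"   (a data key)
    //   $version:$perms:foo    -> "v2"   (a permissions key)
    // The scan strips the leading version part, so the snapshot op
    // records the pair ($row:foo123, "v5").
    parts := util.SplitKeyParts("$version:$row:foo123")
    key := util.JoinKeyParts(parts[1:]...) // "$row:foo123"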
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index f21dfa6..5e14292 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -111,28 +111,30 @@
 		return
 	}
 
-	// First handle SyncGroup prefix changes within the batch by updating
-	// the watch prefixes.  It is as if these log entries were at the start
-	// of the batch.  Exclude them from the actual data batch.
-	dataLogs := make([]*watchable.LogEntry, 0, len(logs))
-	for _, entry := range logs {
-		if processSyncGroupLogRecord(appName, dbName, entry) == false {
-			dataLogs = append(dataLogs, entry)
-		}
+	// If the first log entry is a SyncGroup prefix operation, then this is
+	// a SyncGroup snapshot and not an application batch.  In this case,
+	// handle the SyncGroup prefix changes by updating the watch prefixes,
+	// and exclude the first entry from the batch.  Also tell the batch
+	// processing below not to assign it a batch ID in the DAG.
+	appBatch := true
+	if processSyncGroupLogRecord(appName, dbName, logs[0]) {
+		appBatch = false
+		logs = logs[1:]
 	}
 
 	// Filter out the log entries for keys not part of any SyncGroup.
 	// Ignore as well log entries made by sync (echo suppression).
-	totalCount := uint64(len(dataLogs))
+	totalCount := uint64(len(logs))
 	appdb := appDbName(appName, dbName)
-	logs = make([]*watchable.LogEntry, 0, len(dataLogs))
 
-	for _, entry := range dataLogs {
+	i := 0
+	for _, entry := range logs {
 		if !entry.FromSync && syncable(appdb, entry) {
-			logs = append(logs, entry)
+			logs[i] = entry
+			i++
 		}
 	}
-	dataLogs = nil
+	logs = logs[:i]
 
 	// Transactional processing of the batch: convert these syncable log
 	// records to a batch of sync log records, filling their parent versions
@@ -145,7 +147,7 @@
 			}
 		}
 
-		if err := s.processBatch(ctx, appName, dbName, batch, totalCount, tx); err != nil {
+		if err := s.processBatch(ctx, appName, dbName, batch, appBatch, totalCount, tx); err != nil {
 			return err
 		}
 		return setResMark(ctx, tx, resMark)
@@ -159,7 +161,7 @@
 
 // processBatch applies a single batch of changes (object mutations) received
 // from watching a particular Database.
-func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, totalCount uint64, tx store.StoreReadWriter) error {
+func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, appBatch bool, totalCount uint64, tx store.StoreReadWriter) error {
 	_ = tx.(store.Transaction)
 
 	count := uint64(len(batch))
@@ -167,9 +169,9 @@
 		return nil
 	}
 
-	// If the batch has more than one mutation, start a batch for it.
+	// If an application batch has more than one mutation, start a batch for it.
 	batchId := NoBatchId
-	if totalCount > 1 {
+	if appBatch && totalCount > 1 {
 		batchId = s.startBatch(ctx, tx, batchId)
 		if batchId == NoBatchId {
 			return verror.New(verror.ErrInternal, ctx, "failed to generate batch ID")
@@ -200,7 +202,7 @@
 
 	// End the batch if any.
 	if batchId != NoBatchId {
-		if err := s.endBatch(ctx, tx, batchId, count); err != nil {
+		if err := s.endBatch(ctx, tx, batchId, totalCount); err != nil {
 			return err
 		}
 	}
@@ -340,7 +342,12 @@
 		rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
 
 	case watchable.OpSyncSnapshot:
-		rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
+		// Create records for object versions not already in the DAG.
+		// Duplicates can appear here in cases of nested SyncGroups or
+		// peer SyncGroups.
+		if !hasNode(ctx, tx, string(op.Value.Key), string(op.Value.Version)) {
+			rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
+		}
 
 	case watchable.OpDelete:
 		rec = newLocalLogRec(ctx, tx, op.Value.Key, watchable.NewVersion(), true, timestamp)
@@ -363,6 +370,7 @@
 
 	rec := localLogRec{}
 	oid := string(key)
+
 	rec.Metadata.ObjId = oid
 	rec.Metadata.CurVers = string(version)
 	rec.Metadata.Delete = deleted
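
The log filtering earlier in this file uses Go's in-place filter idiom to
avoid allocating a second slice.  A generic standalone sketch of the
pattern (filterInPlace is a hypothetical helper, not part of this change):

    // filterInPlace keeps the entries for which keep returns true,
    // compacting them at the front of the slice, then truncates it.
    func filterInPlace(entries []*watchable.LogEntry, keep func(*watchable.LogEntry) bool) []*watchable.LogEntry {
        i := 0
        for _, e := range entries {
            if keep(e) {
                entries[i] = e
                i++
            }
        }
        return entries[:i]
    }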
diff --git a/x/ref/services/syncbase/vsync/watcher_test.go b/x/ref/services/syncbase/vsync/watcher_test.go
index bf4fb95..a2107d5 100644
--- a/x/ref/services/syncbase/vsync/watcher_test.go
+++ b/x/ref/services/syncbase/vsync/watcher_test.go
@@ -15,6 +15,7 @@
 
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
+	"v.io/v23/vom"
 	_ "v.io/x/ref/runtime/factories/generic"
 )
 
@@ -201,14 +202,14 @@
 	if err != nil {
 		t.Errorf("getNode() did not find foo: %v", err)
 	}
-	if node.Level != 0 || node.Parents != nil || node.Logrec == "" || node.BatchId == NoBatchId {
+	if node.Level != 0 || node.Parents != nil || node.Logrec == "" || node.BatchId != NoBatchId {
 		t.Errorf("invalid DAG node for foo: %v", node)
 	}
 	node2, err := getNode(nil, st, fooxyzKey, "444")
 	if err != nil {
 		t.Errorf("getNode() did not find fooxyz: %v", err)
 	}
-	if node2.Level != 0 || node2.Parents != nil || node2.Logrec == "" || node2.BatchId != node.BatchId {
+	if node2.Level != 0 || node2.Parents != nil || node2.Logrec == "" || node2.BatchId != NoBatchId {
 		t.Errorf("invalid DAG node for fooxyz: %v", node2)
 	}
 	if hasNode(nil, st, barKey, "222") {
@@ -284,6 +285,33 @@
 	if hasNode(nil, st, barKey, "007") {
 		t.Error("hasNode() found DAG entries for non-syncable logs")
 	}
+
+	// Scan the batch records and verify that there is only 1 DAG batch
+	// stored, with a total count of 3 and a map of 2 syncable entries.
+	// This is because the 1st batch, while containing syncable keys, is a
+	// SyncGroup snapshot that does not get assigned a batch ID.  The 2nd
+	// batch is an application batch with 3 keys of which 2 are syncable.
+	// The 3rd batch is also a SyncGroup snapshot.
+	count := 0
+	start, limit := util.ScanPrefixArgs(util.JoinKeyParts(util.SyncPrefix, "dag", "b"), "")
+	stream := st.Scan(start, limit)
+	for stream.Advance() {
+		count++
+		key := string(stream.Key(nil))
+		var info batchInfo
+		if err := vom.Decode(stream.Value(nil), &info); err != nil {
+			t.Errorf("cannot decode batch %s: %v", key, err)
+		}
+		if info.Count != 3 {
+			t.Errorf("wrong total count in batch %s: got %d instead of 3", key, info.Count)
+		}
+		if n := len(info.Objects); n != 2 {
+			t.Errorf("wrong object count in batch %s: got %d instead of 2", key, n)
+		}
+	}
+	if count != 1 {
+		t.Errorf("wrong count of batches: got %d instead of 1", count)
+	}
 }
 
 // TestGetWatchLogBatch tests fetching a batch of log records.