syncbase/vsync: bootstrap sync metadata for SyncGroup
Add the bootstrapping of sync metadata when a SyncGroup is created or
joined. The bootstrap notifies sync, through the watcher, of
pre-existing database entries that match the SyncGroup prefixes. Such
entries must be added to the sync metadata (DAG & logs) at their
current versions.
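
A rough sketch of the new bootstrap path, condensed from
bootstrapSyncGroup in the diff below (error handling elided;
stripVersionPrefix is an illustrative stand-in for the
SplitKeyParts/JoinKeyParts steps):

    // Tell the watcher to start accepting the SyncGroup prefixes.
    watchable.AddSyncGroupOp(ctx, tx, prefixes, false)
    // Snapshot every pre-existing key matching a SyncGroup prefix.
    for _, mp := range opts.ManagedPrefixes {
        for _, p := range prefixes {
            stream := tx.Scan(util.ScanPrefixArgs(
                util.JoinKeyParts(util.VersionPrefix, mp), p))
            for stream.Advance() {
                key := stripVersionPrefix(stream.Key(nil))
                watchable.AddSyncSnapshotOp(ctx, tx, key, stream.Value(nil))
            }
        }
    }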
Change-Id: I37b4277a370fb31c520bbb858664e0ad078cce74
diff --git a/services/syncbase/server/watchable/store.go b/services/syncbase/server/watchable/store.go
index 0e3bb0f..82fc8d4 100644
--- a/services/syncbase/server/watchable/store.go
+++ b/services/syncbase/server/watchable/store.go
@@ -144,6 +144,14 @@
return newSnapshot(st)
}
+// GetOptions returns the options configured on a watchable.Store.
+// TODO(rdaoud): expose watchable store through an interface and change this
+// function to be a method on the store.
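+// Typical use (from vsync/syncgroup.go in this change):
+//   opts, err := watchable.GetOptions(sd.db.St())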
+func GetOptions(st store.Store) (*Options, error) {
+ wst := st.(*wstore)
+ return wst.opts, nil
+}
+
////////////////////////////////////////
// Internal helpers
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 9890b6a..3c4a054 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -161,7 +161,7 @@
// operations (create, join, leave, destroy) to notify the sync watcher of the
// change at its proper position in the timeline (the transaction commit).
// Note: this is an internal function used by sync, not part of the interface.
-func AddSyncGroupOp(ctx *context.T, tx store.Transaction, prefixes []string, remove bool) error {
+func AddSyncGroupOp(ctx *context.T, tx store.StoreReadWriter, prefixes []string, remove bool) error {
wtx := tx.(*transaction)
wtx.mu.Lock()
defer wtx.mu.Unlock()
@@ -178,7 +178,7 @@
// current keys and their versions to use when initializing the sync metadata
// at the point in the timeline when these keys become syncable (at commit).
// Note: this is an internal function used by sync, not part of the interface.
-func AddSyncSnapshotOp(ctx *context.T, tx store.Transaction, key, version []byte) error {
+func AddSyncSnapshotOp(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
wtx := tx.(*transaction)
wtx.mu.Lock()
defer wtx.mu.Unlock()
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 4b6512c..31b1a3d 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -404,12 +404,7 @@
// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
// Take a snapshot of the data to bootstrap the SyncGroup.
- if err := bootstrapSyncGroup(tx, spec.Prefixes); err != nil {
- return err
- }
-
- tx1 := tx.(store.Transaction)
- return watchable.AddSyncGroupOp(ctx, tx1, spec.Prefixes, false)
+ return sd.bootstrapSyncGroup(ctx, tx, spec.Prefixes)
})
if err != nil {
@@ -499,12 +494,7 @@
}
// Take a snapshot of the data to bootstrap the SyncGroup.
- if err := bootstrapSyncGroup(tx, sg.Spec.Prefixes); err != nil {
- return err
- }
-
- tx1 := tx.(store.Transaction)
- return watchable.AddSyncGroupOp(ctx, tx1, sg.Spec.Prefixes, false)
+ return sd.bootstrapSyncGroup(ctx, tx, sg.Spec.Prefixes)
})
if err != nil {
@@ -571,23 +561,63 @@
return parts[0], ""
}
-func bootstrapSyncGroup(tx store.StoreReadWriter, prefixes []string) error {
- _ = tx.(store.Transaction)
+// bootstrapSyncGroup inserts into the transaction log a SyncGroup operation
+// and a set of Snapshot operations. These notify the sync watcher of the
+// SyncGroup prefixes it should start accepting, and of the initial state of
+// existing store keys that match these prefixes (both data and permission
+// keys).
+// TODO(rdaoud): this operation scans the managed keys of the database and can
+// be time-consuming. Consider doing it asynchronously and letting the server
+// reply to the client earlier. However, it must happen within the scope of
+// this transaction (and its snapshot view).
+func (sd *syncDatabase) bootstrapSyncGroup(ctx *context.T, tx store.StoreReadWriter, prefixes []string) error {
+ if len(prefixes) == 0 {
+ return verror.New(verror.ErrInternal, ctx, "no prefixes specified")
+ }
- for _, p := range prefixes {
- table, row := splitPrefix(p)
- it := tx.Scan(util.ScanRangeArgs(table, row, ""))
- key, value := []byte{}, []byte{}
- for it.Advance() {
- key, value = it.Key(key), it.Value(value)
+ // Get the store options to retrieve the list of managed key prefixes.
+ opts, err := watchable.GetOptions(sd.db.St())
+ if err != nil {
+ return err
+ }
+ if len(opts.ManagedPrefixes) == 0 {
+ return verror.New(verror.ErrInternal, ctx, "store has no managed prefixes")
+ }
- // TODO(hpucha): Ensure prefix ACLs show up in the scan
- // stream.
+ // Notify the watcher of the SyncGroup prefixes to start accepting.
+ if err := watchable.AddSyncGroupOp(ctx, tx, prefixes, false); err != nil {
+ return err
+ }
- // TODO(hpucha): Process this object.
- }
- if err := it.Err(); err != nil {
- return err
+ // Loop over the store managed key prefixes (e.g. data and permissions).
+ // For each one, scan the ranges of the given SyncGroup prefixes. For
+ // each matching key, insert a snapshot operation in the log. Scanning
+ // is done over the version entries to retrieve the matching keys and
+ // their version numbers, stored as the entry values. The version
+ // prefix is removed from each key before it is used in the snapshot
+ // operation.
+ // TODO(rdaoud): for SyncGroup prefixes, there should be a separation
+ // between their representation at the client (a list of (db, prefix)
+ // tuples) and their internal representation (strings that match the
+ // store's key format).
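+ // For example, a version entry whose key is
+ // JoinKeyParts(VersionPrefix, mp, <row key>) and whose value is the
+ // row's version produces a snapshot op on JoinKeyParts(mp, <row key>)
+ // at that version.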
+ for _, mp := range opts.ManagedPrefixes {
+ for _, p := range prefixes {
+ var k, v []byte
+ start, limit := util.ScanPrefixArgs(util.JoinKeyParts(util.VersionPrefix, mp), p)
+ stream := tx.Scan(start, limit)
+ for stream.Advance() {
+ k, v = stream.Key(k), stream.Value(v)
+ parts := util.SplitKeyParts(string(k))
+ if len(parts) < 2 {
+ vlog.Fatalf("bootstrapSyncGroup: invalid version key %s", string(k))
+ }
+ key := []byte(util.JoinKeyParts(parts[1:]...))
+ if err := watchable.AddSyncSnapshotOp(ctx, tx, key, v); err != nil {
+ return err
+ }
+ }
+ if err := stream.Err(); err != nil {
+ return err
+ }
}
}
return nil
diff --git a/services/syncbase/vsync/watcher.go b/services/syncbase/vsync/watcher.go
index f21dfa6..5e14292 100644
--- a/services/syncbase/vsync/watcher.go
+++ b/services/syncbase/vsync/watcher.go
@@ -111,28 +111,30 @@
return
}
- // First handle SyncGroup prefix changes within the batch by updating
- // the watch prefixes. It is as if these log entries were at the start
- // of the batch. Exclude them from the actual data batch.
- dataLogs := make([]*watchable.LogEntry, 0, len(logs))
- for _, entry := range logs {
- if processSyncGroupLogRecord(appName, dbName, entry) == false {
- dataLogs = append(dataLogs, entry)
- }
+ // If the first log entry is a SyncGroup prefix operation, then this is
+ // a SyncGroup snapshot and not an application batch. In this case,
+ // handle the SyncGroup prefix changes by updating the watch prefixes,
+ // and exclude the first entry from the batch. Also tell the batch
+ // processing below not to assign it a batch ID in the DAG.
+ appBatch := true
+ if processSyncGroupLogRecord(appName, dbName, logs[0]) {
+ appBatch = false
+ logs = logs[1:]
}
// Filter out the log entries for keys not part of any SyncGroup.
// Ignore as well log entries made by sync (echo suppression).
- totalCount := uint64(len(dataLogs))
+ totalCount := uint64(len(logs))
appdb := appDbName(appName, dbName)
- logs = make([]*watchable.LogEntry, 0, len(dataLogs))
- for _, entry := range dataLogs {
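+ // Filter in place: keep syncable entries at the front of the slice and
+ // truncate the tail, reusing the backing array instead of reallocating.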
+ i := 0
+ for _, entry := range logs {
if !entry.FromSync && syncable(appdb, entry) {
- logs = append(logs, entry)
+ logs[i] = entry
+ i++
}
}
- dataLogs = nil
+ logs = logs[:i]
// Transactional processing of the batch: convert these syncable log
// records to a batch of sync log records, filling their parent versions
@@ -145,7 +147,7 @@
}
}
- if err := s.processBatch(ctx, appName, dbName, batch, totalCount, tx); err != nil {
+ if err := s.processBatch(ctx, appName, dbName, batch, appBatch, totalCount, tx); err != nil {
return err
}
return setResMark(ctx, tx, resMark)
@@ -159,7 +161,7 @@
// processBatch applies a single batch of changes (object mutations) received
// from watching a particular Database.
-func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, totalCount uint64, tx store.StoreReadWriter) error {
+func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, appBatch bool, totalCount uint64, tx store.StoreReadWriter) error {
_ = tx.(store.Transaction)
count := uint64(len(batch))
@@ -167,9 +169,9 @@
return nil
}
- // If the batch has more than one mutation, start a batch for it.
+ // If an application batch has more than one mutation, start a batch for it.
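+ // SyncGroup snapshot batches (appBatch == false) are not assigned a
+ // batch ID: their entries are added to the DAG individually.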
batchId := NoBatchId
- if totalCount > 1 {
+ if appBatch && totalCount > 1 {
batchId = s.startBatch(ctx, tx, batchId)
if batchId == NoBatchId {
return verror.New(verror.ErrInternal, ctx, "failed to generate batch ID")
@@ -200,7 +202,7 @@
// End the batch if any.
if batchId != NoBatchId {
- if err := s.endBatch(ctx, tx, batchId, count); err != nil {
+ if err := s.endBatch(ctx, tx, batchId, totalCount); err != nil {
return err
}
}
@@ -340,7 +342,12 @@
rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
case watchable.OpSyncSnapshot:
- rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
+ // Create records for object versions not already in the DAG.
+ // Duplicates can appear here in cases of nested SyncGroups or
+ // peer SyncGroups.
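+ // For example, SyncGroups over prefixes "foo" and "foobar" both
+ // snapshot the key "foobarbaz" at its current version.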
+ if !hasNode(ctx, tx, string(op.Value.Key), string(op.Value.Version)) {
+ rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
+ }
case watchable.OpDelete:
rec = newLocalLogRec(ctx, tx, op.Value.Key, watchable.NewVersion(), true, timestamp)
@@ -363,6 +370,7 @@
rec := localLogRec{}
oid := string(key)
+
rec.Metadata.ObjId = oid
rec.Metadata.CurVers = string(version)
rec.Metadata.Delete = deleted
diff --git a/services/syncbase/vsync/watcher_test.go b/services/syncbase/vsync/watcher_test.go
index bf4fb95..a2107d5 100644
--- a/services/syncbase/vsync/watcher_test.go
+++ b/services/syncbase/vsync/watcher_test.go
@@ -15,6 +15,7 @@
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
+ "v.io/v23/vom"
_ "v.io/x/ref/runtime/factories/generic"
)
@@ -201,14 +202,14 @@
if err != nil {
t.Errorf("getNode() did not find foo: %v", err)
}
- if node.Level != 0 || node.Parents != nil || node.Logrec == "" || node.BatchId == NoBatchId {
+ if node.Level != 0 || node.Parents != nil || node.Logrec == "" || node.BatchId != NoBatchId {
t.Errorf("invalid DAG node for foo: %v", node)
}
node2, err := getNode(nil, st, fooxyzKey, "444")
if err != nil {
t.Errorf("getNode() did not find fooxyz: %v", err)
}
- if node2.Level != 0 || node2.Parents != nil || node2.Logrec == "" || node2.BatchId != node.BatchId {
+ if node2.Level != 0 || node2.Parents != nil || node2.Logrec == "" || node2.BatchId != NoBatchId {
t.Errorf("invalid DAG node for fooxyz: %v", node2)
}
if hasNode(nil, st, barKey, "222") {
@@ -284,6 +285,33 @@
if hasNode(nil, st, barKey, "007") {
t.Error("hasNode() found DAG entries for non-syncable logs")
}
+
+ // Scan the batch records and verify that there is only 1 DAG batch
+ // stored, with a total count of 3 and a map of 2 syncable entries.
+ // This is because the 1st batch, while containing syncable keys, is a
+ // SyncGroup snapshot that does not get assigned a batch ID. The 2nd
+ // batch is an application batch with 3 keys of which 2 are syncable.
+ // The 3rd batch is also a SyncGroup snapshot.
+ count := 0
+ start, limit := util.ScanPrefixArgs(util.JoinKeyParts(util.SyncPrefix, "dag", "b"), "")
+ stream := st.Scan(start, limit)
+ for stream.Advance() {
+ count++
+ key := string(stream.Key(nil))
+ var info batchInfo
+ if err := vom.Decode(stream.Value(nil), &info); err != nil {
+ t.Errorf("cannot decode batch %s: %v", key, err)
+ }
+ if info.Count != 3 {
+ t.Errorf("wrong total count in batch %s: got %d instead of 3", key, info.Count)
+ }
+ if n := len(info.Objects); n != 2 {
+ t.Errorf("wrong object count in batch %s: got %d instead of 2", key, n)
+ }
+ }
+ if count != 1 {
+ t.Errorf("wrong count of batches: got %d instead of 2", count)
+ }
}
// TestGetWatchLogBatch tests fetching a batch of log records.