syncbase/vsync: add sync watcher
Add the sync watcher thread to read batches of store log entries
and apply them to the sync metadata. Provide a hook for the
SyncGroup create/join/leave methods to inform the watcher of
changes to the key prefixes to sync.
Change-Id: Ic30f02c5506d01a09dbddde245b0b149a62c72e4
diff --git a/x/ref/services/syncbase/server/watchable/transaction.go b/x/ref/services/syncbase/server/watchable/transaction.go
index 3cbff62..ccb291d 100644
--- a/x/ref/services/syncbase/server/watchable/transaction.go
+++ b/x/ref/services/syncbase/server/watchable/transaction.go
@@ -72,14 +72,16 @@
if tx.err != nil {
return convertError(tx.err)
}
- var err error
if !tx.st.managesKey(key) {
- err = tx.itx.Put(key, value)
- } else {
- err = putVersioned(tx.itx, key, value)
- tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
+ return tx.itx.Put(key, value)
}
- return err
+
+ version, err := putVersioned(tx.itx, key, value)
+ if err != nil {
+ return err
+ }
+ tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Version: version}})
+ return nil
}
// Delete implements the store.StoreWriter interface.
@@ -118,15 +120,15 @@
}
// Write LogEntry records.
timestamp := tx.st.clock.Now().UnixNano()
+ // TODO(rdaoud): switch to using a single counter for log entries
+ // instead of a (sequence, index) combo.
keyPrefix := getLogEntryKeyPrefix(tx.st.seq)
for txSeq, op := range tx.ops {
key := join(keyPrefix, fmt.Sprintf("%04x", txSeq))
value := &LogEntry{
Op: op,
CommitTimestamp: timestamp,
- // TODO(sadovsky): This information is also captured in LogEntry keys.
- // Optimize to avoid redundancy.
- Continued: txSeq < len(tx.ops)-1,
+ Continued: txSeq < len(tx.ops)-1,
}
if err := util.PutObject(tx.itx, key, value); err != nil {
return err
@@ -150,6 +152,22 @@
return tx.itx.Abort()
}
+// AddSyncGroupOp injects a SyncGroup operation notification into the log
+// entries that the transaction writes when it is committed. It allows the
+// SyncGroup operations (create, join, leave, destroy) to notify the sync
+// watcher of the change at its proper position in the timeline (the
+// transaction commit).
+// Note: this is an internal function used by sync, not part of the interface.
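+//
+// As an illustrative sketch only (the real callers live in the vsync package
+// and may differ), a SyncGroup create could record its prefix change inside
+// its commit transaction like this:
+//
+//	err := store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
+//		// ... write the SyncGroup metadata in the same transaction ...
+//		return watchable.AddSyncGroupOp(tx.(store.Transaction), sg.Spec.Prefixes, false)
+//	})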
+func AddSyncGroupOp(tx store.Transaction, prefixes []string, remove bool) error {
+ wtx := tx.(*transaction)
+ wtx.mu.Lock()
+ defer wtx.mu.Unlock()
+ if wtx.err != nil {
+ return convertError(wtx.err)
+ }
+ wtx.ops = append(wtx.ops, &OpSyncGroup{SyncGroupOp{Prefixes: prefixes, Remove: remove}})
+ return nil
+}
+
// Exported as a helper function for testing purposes
func getLogEntryKeyPrefix(seq uint64) string {
// Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.
diff --git a/x/ref/services/syncbase/server/watchable/types.vdl b/x/ref/services/syncbase/server/watchable/types.vdl
index 07d1ce1..6f07a1c 100644
--- a/x/ref/services/syncbase/server/watchable/types.vdl
+++ b/x/ref/services/syncbase/server/watchable/types.vdl
@@ -15,10 +15,12 @@
Limit []byte
}
-// PutOp represents a store put operation.
+// PutOp represents a store put operation. The new version is written instead
+// of the value to avoid duplicating the user data in the store. The version
+// is used to access the user data of that specific mutation.
type PutOp struct {
- Key []byte
- Value []byte
+ Key []byte
+ Version []byte
}
// DeleteOp represents a store delete operation.
@@ -26,12 +28,21 @@
Key []byte
}
+// SyncGroupOp represents a change in SyncGroup tracking: adding or removing
+// key prefixes to sync. SyncGroup prefixes cannot be changed; this op is used
+// to track changes due to SyncGroup create/join/leave/destroy.
+type SyncGroupOp struct {
+ Prefixes []string
+ Remove bool
+}
+
// Op represents a store operation.
type Op union {
- Get GetOp
- Scan ScanOp
- Put PutOp
- Delete DeleteOp
+ Get GetOp
+ Scan ScanOp
+ Put PutOp
+ Delete DeleteOp
+ SyncGroup SyncGroupOp
}
// LogEntry represents a single store operation. This operation may have been
diff --git a/x/ref/services/syncbase/server/watchable/types.vdl.go b/x/ref/services/syncbase/server/watchable/types.vdl.go
index e94587a..a4f0619 100644
--- a/x/ref/services/syncbase/server/watchable/types.vdl.go
+++ b/x/ref/services/syncbase/server/watchable/types.vdl.go
@@ -33,10 +33,12 @@
}) {
}
-// PutOp represents a store put operation.
+// PutOp represents a store put operation. The new version is written instead
+// of the value to avoid duplicating the user data in the store. The version
+// is used to access the user data of that specific mutation.
type PutOp struct {
- Key []byte
- Value []byte
+ Key []byte
+ Version []byte
}
func (PutOp) __VDLReflect(struct {
@@ -54,6 +56,19 @@
}) {
}
+// SyncGroupOp represents a change in SyncGroup tracking: adding or removing
+// key prefixes to sync. SyncGroup prefixes cannot be changed; this op is used
+// to track changes due to SyncGroup create/join/leave/destroy.
+type SyncGroupOp struct {
+ Prefixes []string
+ Remove bool
+}
+
+func (SyncGroupOp) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.SyncGroupOp"`
+}) {
+}
+
type (
// Op represents any single field of the Op union type.
//
@@ -76,15 +91,18 @@
OpPut struct{ Value PutOp }
// OpDelete represents field Delete of the Op union type.
OpDelete struct{ Value DeleteOp }
+ // OpSyncGroup represents field SyncGroup of the Op union type.
+ OpSyncGroup struct{ Value SyncGroupOp }
// __OpReflect describes the Op union type.
__OpReflect struct {
Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.Op"`
Type Op
Union struct {
- Get OpGet
- Scan OpScan
- Put OpPut
- Delete OpDelete
+ Get OpGet
+ Scan OpScan
+ Put OpPut
+ Delete OpDelete
+ SyncGroup OpSyncGroup
}
}
)
@@ -109,6 +127,11 @@
func (x OpDelete) Name() string { return "Delete" }
func (x OpDelete) __VDLReflect(__OpReflect) {}
+func (x OpSyncGroup) Index() int { return 4 }
+func (x OpSyncGroup) Interface() interface{} { return x.Value }
+func (x OpSyncGroup) Name() string { return "SyncGroup" }
+func (x OpSyncGroup) __VDLReflect(__OpReflect) {}
+
// LogEntry represents a single store operation. This operation may have been
// part of a transaction, as signified by the Continued boolean. Read-only
// operations (and read-only transactions) are not logged.
@@ -132,6 +155,7 @@
vdl.Register((*ScanOp)(nil))
vdl.Register((*PutOp)(nil))
vdl.Register((*DeleteOp)(nil))
+ vdl.Register((*SyncGroupOp)(nil))
vdl.Register((*Op)(nil))
vdl.Register((*LogEntry)(nil))
}
diff --git a/x/ref/services/syncbase/server/watchable/util.go b/x/ref/services/syncbase/server/watchable/util.go
index 3dc3e78..036d326 100644
--- a/x/ref/services/syncbase/server/watchable/util.go
+++ b/x/ref/services/syncbase/server/watchable/util.go
@@ -7,6 +7,10 @@
// TODO(sadovsky): Avoid copying back and forth between []byte's and strings.
// We should probably convert incoming strings to []byte's as early as possible,
// and deal exclusively in []byte's internally.
+// TODO(rdaoud): I propose we standardize on key and version being strings and
+// the value being []byte within Syncbase. We would define invalid characters
+// in the key space (reserving "$" and ":"). The lower storage engine layers
+// are free to map that to what they need internally ([]byte or string).
import (
"fmt"
@@ -25,6 +29,19 @@
rngLock sync.Mutex
)
+// NewVersion returns a new version for a store entry mutation.
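+// For example (illustrative, not a fixed format), it may return the hex
+// encoding of a random 63-bit integer such as []byte("4d65822107fcfd52").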
+func NewVersion() []byte {
+ // TODO(rdaoud): revisit the number of bits: should we use 128 bits?
+ // Note: the version has to be unique per object key, not on its own.
+ // TODO(rdaoud): move sync's rand64() to a general Syncbase spot and
+ // reuse it here.
+ rngLock.Lock()
+ num := rng.Int63()
+ rngLock.Unlock()
+
+ return []byte(fmt.Sprintf("%x", num))
+}
+
func makeVersionKey(key []byte) []byte {
return []byte(join(util.VersionPrefix, string(key)))
}
@@ -109,20 +126,19 @@
if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
return err
}
- wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Value: version}})
+ wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Version: version}})
return nil
}
-func putVersioned(tx store.Transaction, key, value []byte) error {
- rngLock.Lock()
- num := rng.Int63()
- rngLock.Unlock()
-
- version := []byte(fmt.Sprintf("%x", num))
+func putVersioned(tx store.Transaction, key, value []byte) ([]byte, error) {
+ version := NewVersion()
if err := tx.Put(makeVersionKey(key), version); err != nil {
- return err
+ return nil, err
}
- return tx.Put(makeAtVersionKey(key, version), value)
+ if err := tx.Put(makeAtVersionKey(key, version), value); err != nil {
+ return nil, err
+ }
+ return version, nil
}
func deleteVersioned(tx store.Transaction, key []byte) error {
diff --git a/x/ref/services/syncbase/vsync/dag.go b/x/ref/services/syncbase/vsync/dag.go
index 5a432c7..72e8d91 100644
--- a/x/ref/services/syncbase/vsync/dag.go
+++ b/x/ref/services/syncbase/vsync/dag.go
@@ -726,8 +726,9 @@
}
var node dagNode
- if err := util.GetObject(st, nodeKey(oid, version), &node); err != nil {
- return nil, verror.New(verror.ErrInternal, ctx, err)
+ key := nodeKey(oid, version)
+ if err := util.GetObject(st, key, &node); err != nil {
+ return nil, translateError(ctx, err, key)
}
return &node, nil
}
@@ -777,8 +778,9 @@
// getHead retrieves the DAG object head.
func getHead(ctx *context.T, st store.StoreReader, oid string) (string, error) {
var version string
- if err := util.GetObject(st, headKey(oid), &version); err != nil {
- return NoVersion, verror.New(verror.ErrInternal, ctx, err)
+ key := headKey(oid)
+ if err := util.GetObject(st, key, &version); err != nil {
+ return NoVersion, translateError(ctx, err, key)
}
return version, nil
}
@@ -819,8 +821,9 @@
}
var info batchInfo
- if err := util.GetObject(st, batchKey(btid), &info); err != nil {
- return nil, verror.New(verror.ErrInternal, ctx, err)
+ key := batchKey(btid)
+ if err := util.GetObject(st, key, &info); err != nil {
+ return nil, translateError(ctx, err, key)
}
return &info, nil
}
diff --git a/x/ref/services/syncbase/vsync/sync.go b/x/ref/services/syncbase/vsync/sync.go
index ca2e3fc..eac07f7 100644
--- a/x/ref/services/syncbase/vsync/sync.go
+++ b/x/ref/services/syncbase/vsync/sync.go
@@ -129,7 +129,7 @@
s.pending.Add(2)
// Start watcher thread to watch for updates to local store.
- go s.watchStore()
+ go s.watchStore(ctx)
// Start initiator thread to periodically get deltas from peers.
go s.syncer(ctx)
diff --git a/x/ref/services/syncbase/vsync/sync_state.go b/x/ref/services/syncbase/vsync/sync_state.go
index f025327..101d08c 100644
--- a/x/ref/services/syncbase/vsync/sync_state.go
+++ b/x/ref/services/syncbase/vsync/sync_state.go
@@ -55,10 +55,10 @@
}
// initSync initializes the sync module during startup. It scans all the
-// databases across all apps to a) initialize the in-memory sync state of a
-// Database consisting of the current generation number, log position and
-// generation vector; b) initialize the prefixes that are currently being
-// synced.
+// databases across all apps to initialize the following:
+// a) in-memory sync state of a Database consisting of the current generation
+// number, log position and generation vector.
+// b) watcher map of prefixes currently being synced.
//
// TODO(hpucha): This is incomplete. Flesh this out further.
func (s *syncService) initSync(ctx *context.T) error {
@@ -68,40 +68,49 @@
var errFinal error
s.syncState = make(map[string]*dbSyncStateInMem)
- // TODO(hpucha): Temporary hack.
- return errFinal
-
- /*s.forEachDatabaseStore(ctx, func(st store.Store) bool {
- sn := st.NewSnapshot()
- defer sn.Close()
-
- ds, err := getDbSyncState(ctx, sn)
- if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
- errFinal = err
+ s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
+ // Scan the SyncGroups to initialize the watched prefixes.
+ forEachSyncGroup(st, func(sg *interfaces.SyncGroup) bool {
+ // TODO(rdaoud): only use SyncGroups that have been
+ // marked as "watchable" by the sync watcher thread.
+ // This is to handle the case of a SyncGroup being
+ // created but Syncbase restarting before the watcher
+ // processed the SyncGroupOp entry in the watch queue.
+ // It should not be syncing that SyncGroup's data after
+ // restart, but wait until the watcher processes the
+ // entry as would have happened without a restart.
+ for _, prefix := range sg.Spec.Prefixes {
+ incrWatchPrefix(appName, dbName, prefix)
+ }
return false
+ })
+
+ if false {
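+ // This code is disabled until the sync state initialization is
+ // fleshed out (see the TODO above initSync).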
+ // Fetch the sync state.
+ ds, err := getDbSyncState(ctx, st)
+ if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
+ errFinal = err
+ return false
+ }
+ var scanStart, scanLimit []byte
+ // Figure out what to scan among local log records.
+ if verror.ErrorID(err) == verror.ErrNoExist.ID {
+ scanStart, scanLimit = util.ScanPrefixArgs(logRecsPerDeviceScanPrefix(s.id), "")
+ } else {
+ scanStart, scanLimit = util.ScanPrefixArgs(logRecKey(s.id, ds.Gen), "")
+ }
+ var maxpos uint64
+ var dbName string
+ // Scan local log records to find the most recent one.
+ st.Scan(scanStart, scanLimit)
+ // Scan remote log records using the persisted GenVector.
+ s.syncState[dbName] = &dbSyncStateInMem{pos: maxpos + 1}
}
- var scanStart, scanLimit []byte
- // Figure out what to scan among local log records.
- if verror.ErrorID(err) == verror.ErrNoExist.ID {
- scanStart, scanLimit = util.ScanPrefixArgs(logRecsPerDeviceScanPrefix(s.id), "")
- } else {
- scanStart, scanLimit = util.ScanPrefixArgs(logRecKey(s.id, ds.Gen), "")
- }
-
- var maxpos uint64
- var dbName string
- // Scan local log records to find the most recent one.
- sn.Scan(scanStart, scanLimit)
-
- // Scan remote log records using the persisted GenVector.
-
- s.syncState[dbName] = &dbSyncStateInMem{pos: maxpos + 1}
-
return false
})
- return errFinal*/
+ return errFinal
}
// reserveGenAndPosInDbLog reserves a chunk of generation numbers and log
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 823aeb1..79cc9bf 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -209,7 +209,6 @@
// Create a new aggregate view of SyncGroup members across all app databases.
newMembers := make(map[string]*memberInfo)
- scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")
s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
// For each database, fetch its SyncGroup data entries by scanning their
@@ -218,14 +217,7 @@
defer sn.Close()
name := appDbName(appName, dbName)
- stream := sn.Scan(scanStart, scanLimit)
- for stream.Advance() {
- var sg interfaces.SyncGroup
- if vom.Decode(stream.Value(nil), &sg) != nil {
- vlog.Errorf("invalid SyncGroup value for key %s", string(stream.Key(nil)))
- continue
- }
-
+ forEachSyncGroup(sn, func(sg *interfaces.SyncGroup) bool {
// Add all members of this SyncGroup to the membership view.
// A member's info is different across SyncGroups, so gather all of them.
for member, info := range sg.Joiners {
@@ -237,7 +229,8 @@
}
newMembers[member].db2sg[name][sg.Id] = info
}
- }
+ return false
+ })
return false
})
@@ -245,6 +238,30 @@
view.expiration = time.Now().Add(memberViewTTL)
}
+// forEachSyncGroup iterates over all SyncGroups in the Database and invokes
+// the callback function on each one. The callback returns a "done" flag to
+// make forEachSyncGroup() stop the iteration early; otherwise the function
+// loops across all SyncGroups in the Database.
+func forEachSyncGroup(st store.StoreReader, callback func(*interfaces.SyncGroup) bool) {
+ scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")
+ stream := st.Scan(scanStart, scanLimit)
+ for stream.Advance() {
+ var sg interfaces.SyncGroup
+ if vom.Decode(stream.Value(nil), &sg) != nil {
+ vlog.Errorf("invalid SyncGroup value for key %s", string(stream.Key(nil)))
+ continue
+ }
+
+ if callback(&sg) {
+ break // done, early exit
+ }
+ }
+
+ if err := stream.Err(); err != nil {
+ vlog.Errorf("forEachSyncGroup: scan stream error: %v", err)
+ }
+}
+
// getMembers returns all SyncGroup members and the count of SyncGroups each one
// joined.
func (s *syncService) getMembers(ctx *context.T) map[string]uint32 {
diff --git a/x/ref/services/syncbase/vsync/util.go b/x/ref/services/syncbase/vsync/util.go
index 62edcd2..65d47eb 100644
--- a/x/ref/services/syncbase/vsync/util.go
+++ b/x/ref/services/syncbase/vsync/util.go
@@ -7,6 +7,8 @@
// Sync utility functions
import (
+ "time"
+
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/rpc"
@@ -14,6 +16,10 @@
"v.io/x/lib/vlog"
)
+const (
+ nanoPerSec = int64(1000000000)
+)
+
// forEachDatabaseStore iterates over all Databases in all Apps within the
// service and invokes the callback function on each database. The callback
// returns a "done" flag to make forEachDatabaseStore() stop the iteration
@@ -76,3 +82,11 @@
}
return verror.New(verror.ErrInternal, ctx, key, err)
}
+
+// unixNanoToTime converts a Unix timestamp in nanoseconds to a Time object.
+func unixNanoToTime(timestamp int64) time.Time {
+ if timestamp < 0 {
+ vlog.Fatalf("unixNanoToTime: invalid timestamp %d", timestamp)
+ }
+ return time.Unix(timestamp/nanoPerSec, timestamp%nanoPerSec)
+}
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index 5ac1b52..bbd1ac8 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -4,30 +4,159 @@
package vsync
-import (
- "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
- "v.io/syncbase/x/ref/services/syncbase/store"
+// Syncbase Watcher is a goroutine that listens to local Database updates from
+// applications and modifies sync metadata (e.g. DAG and local log records).
+// The coupling between Syncbase storage and sync is loose and asynchronous:
+// the Watcher listens in the background so that application operations are
+// unblocked as soon as possible, and the sync metadata updates are offloaded
+// to the Watcher. When the application mutates objects in a Database,
+// additional entries are written to a log queue, persisted in the same
+// Database. This queue is read by the sync Watcher to learn of the changes.
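+//
+// For example (the key layout shown is illustrative; see server/watchable
+// for the actual encoding): a transactional put of keys "foo" and "bar"
+// appends two entries to the log queue, roughly:
+//
+//	$log:<seq>:0000 -> LogEntry{Op: OpPut{foo, <version>}, Continued: true}
+//	$log:<seq>:0001 -> LogEntry{Op: OpPut{bar, <version>}, Continued: false}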
+import (
+ "strings"
+ "time"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
+ "v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/verror"
+ "v.io/v23/vom"
+ "v.io/x/lib/vlog"
)
-// When applications update objects in the local Store, the sync
-// watcher thread learns about them asynchronously via the "watch"
-// stream of object mutations. In turn, this sync watcher thread
-// updates the DAG and log records to track the object change
-// histories.
+var (
+ // watchPollInterval is the duration between consecutive watch polling
+ // events across all app databases. Every watch event loops across all
+ // app databases and fetches from each one at most one batch update
+ // (transaction) to process.
+ // TODO(rdaoud): add a channel between store and watch to get change
+ // notifications instead of using a polling solution.
+ watchPollInterval = 100 * time.Millisecond
-// watchStore processes updates obtained by watching the store.
-func (s *syncService) watchStore() {
+ // watchPrefixes is an in-memory cache of SyncGroup prefixes for each
+ // app database. It is filled at startup from persisted SyncGroup data
+ // and updated at runtime when SyncGroups are joined or left. It is
+ // not guarded by a mutex because, after the startup phase (which runs
+ // before any sync goroutines are started), only the watcher goroutine
+ // uses it.
+ // The map keys are the appdb names (globally unique).
+ watchPrefixes = make(map[string]sgPrefixes)
+)
+
+// sgPrefixes tracks SyncGroup prefixes being synced in a database and their
+// counts.
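+// For example, if two SyncGroups in database "app1:db1" cover the prefix
+// "foo" and one covers "bar", the cache holds:
+//	watchPrefixes["app1:db1"] = sgPrefixes{"foo": 2, "bar": 1}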
+type sgPrefixes map[string]uint32
+
+// watchStore processes updates obtained by watching the store. This is the
+// sync watcher goroutine that learns about store updates asynchronously by
+// reading log records that track object mutation histories in each database.
+// For each batch mutation, the watcher updates the sync DAG and log records.
+// When an application makes a single non-transactional put, it is represented
+// as a batch of one log record. Thus the watcher only deals with batches.
+func (s *syncService) watchStore(ctx *context.T) {
defer s.pending.Done()
+
+ ticker := time.NewTicker(watchPollInterval)
+ defer ticker.Stop()
+
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ for {
+ select {
+ case <-s.closed:
+ vlog.VI(1).Info("watchStore: sync channel closed, stop watching and exit")
+ return
+
+ case <-ticker.C:
+ s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
+ s.processDatabase(ctx, appName, dbName, st)
+ return false
+ })
+ }
+ }
}
-// TODO(hpucha): This is a skeleton only to drive the change for log.
+// processDatabase fetches from the given database at most one new batch update
+// (transaction) and processes it. The one-batch limit prevents one database
+// from starving others. A batch is stored as a contiguous set of log records
+// ending with one record having the "continued" flag set to false.
+func (s *syncService) processDatabase(ctx *context.T, appName, dbName string, st store.Store) {
+ resMark, err := getResMark(ctx, st)
+ if err != nil {
+ if verror.ErrorID(err) != verror.ErrNoExist.ID {
+ vlog.Errorf("processDatabase: %s, %s: cannot get resMark: %v", appName, dbName, err)
+ return
+ }
+ resMark = ""
+ }
+
+ // Get a batch of watch log entries, if any, after this resume marker.
+ if logs, nextResmark := getWatchLogBatch(ctx, appName, dbName, st, resMark); logs != nil {
+ s.processWatchLogBatch(ctx, appName, dbName, st, logs, nextResmark)
+ }
+}
+
+// processWatchLogBatch parses the given batch of watch log records, updates
+// the watched SyncGroup prefixes, and uses these prefixes to filter the batch
+// down to the subset of syncable records. It then transactionally applies
+// these records to the sync metadata (DAG & log records) and updates the
+// watch resume marker.
+func (s *syncService) processWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, logs []*watchable.LogEntry, resMark string) {
+ if len(logs) == 0 {
+ return
+ }
+
+ // First handle SyncGroup prefix changes within the batch by updating
+ // the watch prefixes. It is as if these log entries were at the start
+ // of the batch. Exclude them from the actual data batch.
+ dataLogs := make([]*watchable.LogEntry, 0, len(logs))
+ for _, entry := range logs {
+ if !processSyncGroupLogRecord(appName, dbName, entry) {
+ dataLogs = append(dataLogs, entry)
+ }
+ }
+
+ // Filter out the log entries for keys not part of any SyncGroup.
+ // TODO(rdaoud): filter out entries made by sync (echo suppression).
+ totalCount := uint64(len(dataLogs))
+ appdb := appDbName(appName, dbName)
+ logs = make([]*watchable.LogEntry, 0, len(dataLogs))
+
+ for _, entry := range dataLogs {
+ if syncable(appdb, entry) {
+ logs = append(logs, entry)
+ }
+ }
+ dataLogs = nil
+
+ // Transactional processing of the batch: convert these syncable log
+ // records to a batch of sync log records, filling their parent versions
+ // from the DAG head nodes.
+ err := store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
+ batch := make([]*localLogRec, 0, len(logs))
+ for _, entry := range logs {
+ if rec := convertLogRecord(ctx, tx, entry); rec != nil {
+ batch = append(batch, rec)
+ }
+ }
+
+ if err := s.processBatch(ctx, appName, dbName, batch, totalCount, tx); err != nil {
+ return err
+ }
+ return setResMark(ctx, tx, resMark)
+ })
+
+ if err != nil {
+ // TODO(rdaoud): don't crash, quarantine this app database.
+ vlog.Fatalf("processDatabase: %s, %s: watcher cannot process batch: %v", appName, dbName, err)
+ }
+}
// processBatch applies a single batch of changes (object mutations) received
// from watching a particular Database.
-func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, tx store.StoreReadWriter) error {
+func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, totalCount uint64, tx store.StoreReadWriter) error {
_ = tx.(store.Transaction)
count := uint64(len(batch))
@@ -37,7 +166,7 @@
// If the batch has more than one mutation, start a batch for it.
batchId := NoBatchId
- if count > 1 {
+ if totalCount > 1 {
batchId = s.startBatch(ctx, tx, batchId)
if batchId == NoBatchId {
return verror.New(verror.ErrInternal, ctx, "failed to generate batch ID")
@@ -47,7 +176,6 @@
gen, pos := s.reserveGenAndPosInDbLog(ctx, appName, dbName, count)
for _, rec := range batch {
-
// Update the log record. Portions of the record Metadata must
// already be filled.
rec.Metadata.Id = s.id
@@ -55,7 +183,7 @@
rec.Metadata.RecType = interfaces.NodeRec
rec.Metadata.BatchId = batchId
- rec.Metadata.BatchCount = count
+ rec.Metadata.BatchCount = totalCount
rec.Pos = pos
@@ -77,7 +205,7 @@
return nil
}
-// processLogRec processes a local log record by adding to the Database and
+// processLocalLogRec processes a local log record by adding to the Database and
// suitably updating the DAG metadata.
func (s *syncService) processLocalLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
// Insert the new log record into the log.
@@ -96,3 +224,216 @@
// Move the head.
return moveHead(ctx, tx, m.ObjId, m.CurVers)
}
+
+// incrWatchPrefix increments (or sets) a SyncGroup prefix for an app database
+// in the watch prefix cache.
+func incrWatchPrefix(appName, dbName, prefix string) {
+ name := appDbName(appName, dbName)
+ if pfxs := watchPrefixes[name]; pfxs != nil {
+ pfxs[prefix]++ // it auto-initializes a non-existent prefix
+ } else {
+ watchPrefixes[name] = sgPrefixes{prefix: 1}
+ }
+}
+
+// decrWatchPrefix decrements (or unsets) a SyncGroup prefix for an app database
+// in the watch prefix cache.
+func decrWatchPrefix(appName, dbName, prefix string) {
+ name := appDbName(appName, dbName)
+ if pfxs := watchPrefixes[name]; pfxs != nil {
+ if pfxs[prefix] > 1 {
+ pfxs[prefix]--
+ } else if len(pfxs) > 1 {
+ delete(pfxs, prefix)
+ } else {
+ delete(watchPrefixes, name)
+ }
+ }
+}
+
+// dbLogScanArgs determines the arguments needed to start a new scan from a
+// given resume marker (last log entry read). An empty resume marker is used
+// to begin the scan from the start of the log.
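+// For example (the key format shown is illustrative): if the last log entry
+// read was "$log:0000000000000003:0002", the next scan starts at
+// "$log:0000000000000003:0002\x00", immediately after it in sort order.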
+func dbLogScanArgs(resMark string) ([]byte, []byte) {
+ start, limit := util.ScanPrefixArgs(util.LogPrefix, "")
+ if resMark != "" {
+ // To start just after the current resume marker, augment it by
+ // appending an extra byte at the end. Use byte value zero to
+ // use the lowest value possible. This works because resume
+ // markers have a fixed length and are sorted lexicographically.
+ // By creating a fake longer resume marker that falls between
+ // real resume markers, the next scan will start right after
+ // where the previous one stopped without missing data.
+ start = append([]byte(resMark), 0)
+ }
+ return start, limit
+}
+
+// getWatchLogBatch returns a batch of watch log records (a transaction) from
+// the given database and the new resume marker at the end of the batch.
+func getWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, resMark string) ([]*watchable.LogEntry, string) {
+ scanStart, scanLimit := dbLogScanArgs(resMark)
+ endOfBatch := false
+ var newResmark string
+
+ // Use the store directly to scan these read-only log entries; there is
+ // no need to create a snapshot since they are never overwritten. Read
+ // and buffer a batch before processing it.
+ var logs []*watchable.LogEntry
+ stream := st.Scan(scanStart, scanLimit)
+ for stream.Advance() {
+ logKey := string(stream.Key(nil))
+ var logEnt watchable.LogEntry
+ if vom.Decode(stream.Value(nil), &logEnt) != nil {
+ vlog.Fatalf("getWatchLogBatch: %s, %s: invalid watch LogEntry %s: %v",
+ appName, dbName, logKey, stream.Value(nil))
+ }
+
+ logs = append(logs, &logEnt)
+
+ // Stop if this is the end of the batch.
+ if !logEnt.Continued {
+ newResmark = logKey
+ endOfBatch = true
+ break
+ }
+ }
+
+ if err := stream.Err(); err != nil {
+ vlog.Errorf("getWatchLogBatch: %s, %s: scan stream error: %v", appName, dbName, err)
+ return nil, resMark
+ }
+ if !endOfBatch {
+ if len(logs) > 0 {
+ vlog.Fatalf("processDatabase: %s, %s: end of batch not found after %d entries",
+ appName, dbName, len(logs))
+ }
+ return nil, resMark
+ }
+ return logs, newResmark
+}
+
+// convertLogRecord converts a store log entry to a sync log record. It fills
+// the previous object version (parent) by fetching its current DAG head if it
+// has one. For a delete, it generates a new object version because the store
+// does not version a deletion.
+// TODO(rdaoud): change Syncbase to store and version a deleted object to
+// simplify the store-to-sync interaction. A deleted key would still have a
+// version and its value entry would encode the "deleted" flag, either in the
+// key or probably in a value wrapper that would contain other metadata.
+func convertLogRecord(ctx *context.T, tx store.StoreReadWriter, logEnt *watchable.LogEntry) *localLogRec {
+ _ = tx.(store.Transaction)
+
+ switch op := logEnt.Op.(type) {
+ case *watchable.OpGet:
+ // TODO(rdaoud): save read-set in sync.
+ return nil
+
+ case *watchable.OpScan:
+ // TODO(rdaoud): save scan-set in sync.
+ return nil
+
+ case *watchable.OpPut:
+ rec := localLogRec{}
+ oid := string(op.Value.Key)
+ rec.Metadata.ObjId = oid
+ rec.Metadata.CurVers = string(op.Value.Version)
+ if head, err := getHead(ctx, tx, oid); err == nil {
+ rec.Metadata.Parents = []string{head}
+ } else if verror.ErrorID(err) != verror.ErrNoExist.ID {
+ vlog.Fatalf("cannot getHead to convert Put log record for %s: %v", oid, err)
+ }
+ rec.Metadata.UpdTime = unixNanoToTime(logEnt.CommitTimestamp)
+ return &rec
+
+ case *watchable.OpDelete:
+ rec := localLogRec{}
+ oid := string(op.Value.Key)
+ rec.Metadata.ObjId = oid
+ rec.Metadata.CurVers = string(watchable.NewVersion())
+ rec.Metadata.Delete = true
+ if head, err := getHead(ctx, tx, oid); err == nil {
+ rec.Metadata.Parents = []string{head}
+ } else {
+ vlog.Fatalf("cannot getHead to convert Delete log record for %s: %v", oid, err)
+ }
+ rec.Metadata.UpdTime = unixNanoToTime(logEnt.CommitTimestamp)
+ return &rec
+
+ case *watchable.OpSyncGroup:
+ vlog.Errorf("watch LogEntry for SyncGroup should not be converted: %v", logEnt)
+ return nil
+
+ default:
+ vlog.Errorf("invalid watch LogEntry: %v", logEnt)
+ return nil
+ }
+}
+
+// processSyncGroupLogRecord checks if the log entry is a SyncGroup update and,
+// if it is, updates the watch prefixes for the app database and returns true.
+// Otherwise it returns false with no other changes.
+func processSyncGroupLogRecord(appName, dbName string, logEnt *watchable.LogEntry) bool {
+ switch op := logEnt.Op.(type) {
+ case *watchable.OpSyncGroup:
+ remove := op.Value.Remove
+ for _, prefix := range op.Value.Prefixes {
+ if remove {
+ decrWatchPrefix(appName, dbName, prefix)
+ } else {
+ incrWatchPrefix(appName, dbName, prefix)
+ }
+ }
+ return true
+
+ default:
+ return false
+ }
+}
+
+// syncable returns true if the given log entry falls within the scope of a
+// SyncGroup prefix for the given app database, and thus should be synced.
+// It is used to pre-filter the batch of log entries before sync processing.
+func syncable(appdb string, logEnt *watchable.LogEntry) bool {
+ var key string
+ switch op := logEnt.Op.(type) {
+ case *watchable.OpPut:
+ key = string(op.Value.Key)
+ case *watchable.OpDelete:
+ key = string(op.Value.Key)
+ default:
+ return false
+ }
+
+ for prefix := range watchPrefixes[appdb] {
+ if strings.HasPrefix(key, prefix) {
+ return true
+ }
+ }
+ return false
+}
+
+// resMarkKey returns the key used to access the watcher resume marker.
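+// With the current key conventions this is a key such as "$sync:w:rm"
+// (assuming util.SyncPrefix is "$sync" and ":" is the part separator).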
+func resMarkKey() string {
+ return util.JoinKeyParts(util.SyncPrefix, "w", "rm")
+}
+
+// setResMark stores the watcher resume marker for a database.
+func setResMark(ctx *context.T, tx store.StoreReadWriter, resMark string) error {
+ _ = tx.(store.Transaction)
+
+ if err := util.PutObject(tx, resMarkKey(), resMark); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
+}
+
+// getResMark retrieves the watcher resume marker for a database.
+func getResMark(ctx *context.T, st store.StoreReader) (string, error) {
+ var resMark string
+ key := resMarkKey()
+ if err := util.GetObject(st, key, &resMark); err != nil {
+ return "", translateError(ctx, err, key)
+ }
+ return resMark, nil
+}
diff --git a/x/ref/services/syncbase/vsync/watcher_test.go b/x/ref/services/syncbase/vsync/watcher_test.go
new file mode 100644
index 0000000..ee5e600
--- /dev/null
+++ b/x/ref/services/syncbase/vsync/watcher_test.go
@@ -0,0 +1,236 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// Tests for the sync watcher in Syncbase.
+
+import (
+ "reflect"
+ "testing"
+
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
+)
+
+// TestSetResmark tests setting and getting a resume marker.
+func TestSetResmark(t *testing.T) {
+ svc := createService(t)
+ defer destroyService(t, svc)
+ st := svc.St()
+
+ resmark, err := getResMark(nil, st)
+ if err == nil || resmark != "" {
+ t.Errorf("found non-existent resume marker: %s, %v", resmark, err)
+ }
+
+ wantResmark := "1234567890"
+ tx := st.NewTransaction()
+ if err := setResMark(nil, tx, wantResmark); err != nil {
+ t.Errorf("cannot set resume marker: %v", err)
+ }
+ if err := tx.Commit(); err != nil {
+ t.Fatalf("cannot commit resume marker: %v", err)
+ }
+
+ resmark, err = getResMark(nil, st)
+ if err != nil {
+ t.Errorf("cannot get new resume marker: %v", err)
+ }
+ if resmark != wantResmark {
+ t.Errorf("invalid new resume: got %s instead of %s", resmark, wantResmark)
+ }
+}
+
+// TestWatchPrefixes tests setting and updating the watch prefixes map.
+func TestWatchPrefixes(t *testing.T) {
+ svc := createService(t)
+ defer destroyService(t, svc)
+
+ if len(watchPrefixes) != 0 {
+ t.Errorf("watch prefixes not empty: %v", watchPrefixes)
+ }
+
+ watchPrefixOps := []struct {
+ appName, dbName, key string
+ incr bool
+ }{
+ {"app1", "db1", "foo", true},
+ {"app1", "db1", "bar", true},
+ {"app2", "db1", "xyz", true},
+ {"app3", "db1", "haha", true},
+ {"app1", "db1", "foo", true},
+ {"app1", "db1", "foo", true},
+ {"app1", "db1", "foo", false},
+ {"app2", "db1", "ttt", true},
+ {"app2", "db1", "ttt", true},
+ {"app2", "db1", "ttt", false},
+ {"app2", "db1", "ttt", false},
+ {"app2", "db2", "qwerty", true},
+ {"app3", "db1", "haha", true},
+ {"app2", "db2", "qwerty", false},
+ {"app3", "db1", "haha", false},
+ }
+
+ for _, op := range watchPrefixOps {
+ if op.incr {
+ incrWatchPrefix(op.appName, op.dbName, op.key)
+ } else {
+ decrWatchPrefix(op.appName, op.dbName, op.key)
+ }
+ }
+
+ expPrefixes := map[string]sgPrefixes{
+ "app1:db1": sgPrefixes{"foo": 2, "bar": 1},
+ "app2:db1": sgPrefixes{"xyz": 1},
+ "app3:db1": sgPrefixes{"haha": 1},
+ }
+ if !reflect.DeepEqual(watchPrefixes, expPrefixes) {
+ t.Errorf("invalid watch prefixes: got %v instead of %v", watchPrefixes, expPrefixes)
+ }
+
+ checkSyncableTests := []struct {
+ appName, dbName, key string
+ result bool
+ }{
+ {"app1", "db1", "foo", true},
+ {"app1", "db1", "foobar", true},
+ {"app1", "db1", "bar", true},
+ {"app1", "db1", "bar123", true},
+ {"app1", "db1", "f", false},
+ {"app1", "db1", "ba", false},
+ {"app1", "db1", "xyz", false},
+ {"app1", "db555", "foo", false},
+ {"app555", "db1", "foo", false},
+ {"app2", "db1", "xyz123", true},
+ {"app2", "db1", "ttt123", false},
+ {"app2", "db2", "qwerty", false},
+ {"app3", "db1", "hahahoho", true},
+ {"app3", "db1", "hoho", false},
+ {"app3", "db1", "h", false},
+ }
+
+ for _, test := range checkSyncableTests {
+ log := &watchable.LogEntry{
+ Op: &watchable.OpPut{
+ watchable.PutOp{Key: []byte(test.key)},
+ },
+ }
+ res := syncable(appDbName(test.appName, test.dbName), log)
+ if res != test.result {
+ t.Errorf("checkSyncable: invalid output: %s, %s, %s: got %t instead of %t",
+ test.appName, test.dbName, test.key, res, test.result)
+ }
+ }
+}
+
+// newLog creates a Put or Delete watch log entry.
+func newLog(key, version string, isDelete bool) *watchable.LogEntry {
+ k, v := []byte(key), []byte(version)
+ log := &watchable.LogEntry{}
+ if isDelete {
+ log.Op = &watchable.OpDelete{watchable.DeleteOp{Key: k}}
+ } else {
+ log.Op = &watchable.OpPut{watchable.PutOp{Key: k, Version: v}}
+ }
+ return log
+}
+
+// newSGLog creates a SyncGroup watch log entry.
+func newSGLog(prefixes []string, remove bool) *watchable.LogEntry {
+ return &watchable.LogEntry{
+ Op: &watchable.OpSyncGroup{
+ watchable.SyncGroupOp{Prefixes: prefixes, Remove: remove},
+ },
+ }
+}
+
+// TestProcessWatchLogBatch tests the processing of a batch of log records.
+func TestProcessWatchLogBatch(t *testing.T) {
+ svc := createService(t)
+ defer destroyService(t, svc)
+ st := svc.St()
+ s := svc.sync
+
+ app, db := "mockapp", "mockdb"
+
+ // Processing an empty batch of logs should not fail.
+ s.processWatchLogBatch(nil, app, db, st, nil, "")
+
+ // Non-syncable logs.
+ batch := []*watchable.LogEntry{
+ newLog("foo", "123", false),
+ newLog("bar", "555", false),
+ }
+
+ resmark := "abcd"
+ s.processWatchLogBatch(nil, app, db, st, batch, resmark)
+
+ if res, err := getResMark(nil, st); err != nil || res != resmark {
+ t.Errorf("invalid resmark after batch processing: got %s instead of %s", res, resmark)
+ }
+ if hasNode(nil, st, "foo", "123") || hasNode(nil, st, "bar", "555") {
+ t.Error("hasNode() found DAG entries for non-syncable logs")
+ }
+
+ // Partially syncable logs.
+ batch = []*watchable.LogEntry{
+ newSGLog([]string{"f", "x"}, false),
+ newLog("foo", "333", false),
+ newLog("fooxyz", "444", false),
+ newLog("bar", "222", false),
+ }
+
+ resmark = "cdef"
+ s.processWatchLogBatch(nil, app, db, st, batch, resmark)
+
+ if res, err := getResMark(nil, st); err != nil || res != resmark {
+ t.Errorf("invalid resmark after batch processing: got %s instead of %s", res, resmark)
+ }
+ if head, err := getHead(nil, st, "foo"); err != nil || head != "333" {
+ t.Errorf("getHead() did not find foo: %s, %v", head, err)
+ }
+ node, err := getNode(nil, st, "foo", "333")
+ if err != nil {
+ t.Errorf("getNode() did not find foo: %v", err)
+ }
+ if node.Level != 0 || node.Parents != nil || node.Logrec == "" || node.BatchId == NoBatchId {
+ t.Errorf("invalid DAG node for foo: %v", node)
+ }
+ node2, err := getNode(nil, st, "fooxyz", "444")
+ if err != nil {
+ t.Errorf("getNode() did not find fooxyz: %v", err)
+ }
+ if node2.Level != 0 || node2.Parents != nil || node2.Logrec == "" || node2.BatchId != node.BatchId {
+ t.Errorf("invalid DAG node for fooxyz: %v", node2)
+ }
+ if hasNode(nil, st, "bar", "222") {
+ t.Error("hasNode() found DAG entries for non-syncable logs")
+ }
+
+ // Back to non-syncable logs (remove "f" prefix).
+ batch = []*watchable.LogEntry{
+ newSGLog([]string{"f"}, true),
+ newLog("foo", "99", false),
+ newLog("fooxyz", "888", true),
+ newLog("bar", "007", false),
+ }
+
+ resmark = "tuvw"
+ s.processWatchLogBatch(nil, app, db, st, batch, resmark)
+
+ if res, err := getResMark(nil, st); err != nil || res != resmark {
+ t.Errorf("invalid resmark after batch processing: got %s instead of %s", res, resmark)
+ }
+ // No changes to "foo".
+ if head, err := getHead(nil, st, "foo"); err != nil || head != "333" {
+ t.Errorf("getHead() did not find foo: %s, %v", head, err)
+ }
+ if node, err := getNode(nil, st, "foo", "99"); err == nil {
+ t.Errorf("getNode() should not have found foo @ 99: %v", node)
+ }
+ if node, err := getNode(nil, st, "fooxyz", "888"); err == nil {
+ t.Errorf("getNode() should not have found fooxyz @ 888: %v", node)
+ }
+ if hasNode(nil, st, "bar", "007") {
+ t.Error("hasNode() found DAG entries for non-syncable logs")
+ }
+}