syncbase/vsync: add sync watcher

Add the sync watcher thread to read batches of store log entries
and apply them to the sync metadata (DAG and local log records).
Also provide a hook for the SyncGroup create/join/leave methods to
inform the watcher of changes to the key prefixes being synced.
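
For illustration, a SyncGroup method could invoke the new hook roughly
as sketched below.  The wrapper is hypothetical (the actual
create/join/leave callers are not part of this change); it only shows
the intended pattern: record the prefix change in the same transaction
that commits the SyncGroup mutation, so that the watcher sees it at its
proper position in the log.

    // Sketch only: notifySyncWatcher and its call site are illustrative.
    func notifySyncWatcher(st store.Store, prefixes []string, remove bool) error {
            return store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
                    // ... persist the SyncGroup create/join/leave mutation here ...
                    // Queue the prefix change in the same commit so the watcher
                    // processes it at the right point in the watch log.
                    return watchable.AddSyncGroupOp(tx.(store.Transaction), prefixes, remove)
            })
    }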

Change-Id: Ic30f02c5506d01a09dbddde245b0b149a62c72e4
diff --git a/x/ref/services/syncbase/server/watchable/transaction.go b/x/ref/services/syncbase/server/watchable/transaction.go
index 3cbff62..ccb291d 100644
--- a/x/ref/services/syncbase/server/watchable/transaction.go
+++ b/x/ref/services/syncbase/server/watchable/transaction.go
@@ -72,14 +72,16 @@
 	if tx.err != nil {
 		return convertError(tx.err)
 	}
-	var err error
 	if !tx.st.managesKey(key) {
-		err = tx.itx.Put(key, value)
-	} else {
-		err = putVersioned(tx.itx, key, value)
-		tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
+		return tx.itx.Put(key, value)
 	}
-	return err
+
+	version, err := putVersioned(tx.itx, key, value)
+	if err != nil {
+		return err
+	}
+	tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Version: version}})
+	return nil
 }
 
 // Delete implements the store.StoreWriter interface.
@@ -118,15 +120,15 @@
 	}
 	// Write LogEntry records.
 	timestamp := tx.st.clock.Now().UnixNano()
+	// TODO(rdaoud): switch to using a single counter for log entries
+	// instead of a (sequence, index) combo.
 	keyPrefix := getLogEntryKeyPrefix(tx.st.seq)
 	for txSeq, op := range tx.ops {
 		key := join(keyPrefix, fmt.Sprintf("%04x", txSeq))
 		value := &LogEntry{
 			Op:              op,
 			CommitTimestamp: timestamp,
-			// TODO(sadovsky): This information is also captured in LogEntry keys.
-			// Optimize to avoid redundancy.
-			Continued: txSeq < len(tx.ops)-1,
+			Continued:       txSeq < len(tx.ops)-1,
 		}
 		if err := util.PutObject(tx.itx, key, value); err != nil {
 			return err
@@ -150,6 +152,22 @@
 	return tx.itx.Abort()
 }
 
+// AddSyncGroupOp injects a SyncGroup operation notification in the log entries
+// that the transaction writes when it is committed.  It allows the SyncGroup
+// operations (create, join, leave, destroy) to notify the sync watcher of the
+// change at its proper position in the timeline (the transaction commit).
+// Note: this is an internal function used by sync, not part of the interface.
+func AddSyncGroupOp(tx store.Transaction, prefixes []string, remove bool) error {
+	wtx := tx.(*transaction)
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	if wtx.err != nil {
+		return convertError(wtx.err)
+	}
+	wtx.ops = append(wtx.ops, &OpSyncGroup{SyncGroupOp{Prefixes: prefixes, Remove: remove}})
+	return nil
+}
+
 // Exported as a helper function for testing purposes
 func getLogEntryKeyPrefix(seq uint64) string {
 	// Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.
diff --git a/x/ref/services/syncbase/server/watchable/types.vdl b/x/ref/services/syncbase/server/watchable/types.vdl
index 07d1ce1..6f07a1c 100644
--- a/x/ref/services/syncbase/server/watchable/types.vdl
+++ b/x/ref/services/syncbase/server/watchable/types.vdl
@@ -15,10 +15,12 @@
 	Limit []byte
 }
 
-// PutOp represents a store put operation.
+// PutOp represents a store put operation.  The new version is written instead
+// of the value to avoid duplicating the user data in the store.  The version
+// is used to access the user data of that specific mutation.
 type PutOp struct {
-	Key   []byte
-	Value []byte
+	Key     []byte
+	Version []byte
 }
 
 // DeleteOp represents a store delete operation.
@@ -26,12 +28,21 @@
 	Key []byte
 }
 
+// SyncGroupOp represents a change in SyncGroup tracking, adding or removing
+// key prefixes to sync.  SyncGroup prefixes cannot be changed; this is used
+// to track changes due to SyncGroup create/join/leave/destroy.
+type SyncGroupOp struct {
+	Prefixes []string
+	Remove   bool
+}
+
 // Op represents a store operation.
 type Op union {
-	Get    GetOp
-	Scan   ScanOp
-	Put    PutOp
-	Delete DeleteOp
+	Get       GetOp
+	Scan      ScanOp
+	Put       PutOp
+	Delete    DeleteOp
+	SyncGroup SyncGroupOp
 }
 
 // LogEntry represents a single store operation. This operation may have been
diff --git a/x/ref/services/syncbase/server/watchable/types.vdl.go b/x/ref/services/syncbase/server/watchable/types.vdl.go
index e94587a..a4f0619 100644
--- a/x/ref/services/syncbase/server/watchable/types.vdl.go
+++ b/x/ref/services/syncbase/server/watchable/types.vdl.go
@@ -33,10 +33,12 @@
 }) {
 }
 
-// PutOp represents a store put operation.
+// PutOp represents a store put operation.  The new version is written instead
+// of the value to avoid duplicating the user data in the store.  The version
+// is used to access the user data of that specific mutation.
 type PutOp struct {
-	Key   []byte
-	Value []byte
+	Key     []byte
+	Version []byte
 }
 
 func (PutOp) __VDLReflect(struct {
@@ -54,6 +56,19 @@
 }) {
 }
 
+// SyncGroupOp represents a change in SyncGroup tracking, adding or removing
+// key prefixes to sync.  SyncGroup prefixes cannot be changed; this is used
+// to track changes due to SyncGroup create/join/leave/destroy.
+type SyncGroupOp struct {
+	Prefixes []string
+	Remove   bool
+}
+
+func (SyncGroupOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.SyncGroupOp"`
+}) {
+}
+
 type (
 	// Op represents any single field of the Op union type.
 	//
@@ -76,15 +91,18 @@
 	OpPut struct{ Value PutOp }
 	// OpDelete represents field Delete of the Op union type.
 	OpDelete struct{ Value DeleteOp }
+	// OpSyncGroup represents field SyncGroup of the Op union type.
+	OpSyncGroup struct{ Value SyncGroupOp }
 	// __OpReflect describes the Op union type.
 	__OpReflect struct {
 		Name  string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.Op"`
 		Type  Op
 		Union struct {
-			Get    OpGet
-			Scan   OpScan
-			Put    OpPut
-			Delete OpDelete
+			Get       OpGet
+			Scan      OpScan
+			Put       OpPut
+			Delete    OpDelete
+			SyncGroup OpSyncGroup
 		}
 	}
 )
@@ -109,6 +127,11 @@
 func (x OpDelete) Name() string             { return "Delete" }
 func (x OpDelete) __VDLReflect(__OpReflect) {}
 
+func (x OpSyncGroup) Index() int               { return 4 }
+func (x OpSyncGroup) Interface() interface{}   { return x.Value }
+func (x OpSyncGroup) Name() string             { return "SyncGroup" }
+func (x OpSyncGroup) __VDLReflect(__OpReflect) {}
+
 // LogEntry represents a single store operation. This operation may have been
 // part of a transaction, as signified by the Continued boolean. Read-only
 // operations (and read-only transactions) are not logged.
@@ -132,6 +155,7 @@
 	vdl.Register((*ScanOp)(nil))
 	vdl.Register((*PutOp)(nil))
 	vdl.Register((*DeleteOp)(nil))
+	vdl.Register((*SyncGroupOp)(nil))
 	vdl.Register((*Op)(nil))
 	vdl.Register((*LogEntry)(nil))
 }
diff --git a/x/ref/services/syncbase/server/watchable/util.go b/x/ref/services/syncbase/server/watchable/util.go
index 3dc3e78..036d326 100644
--- a/x/ref/services/syncbase/server/watchable/util.go
+++ b/x/ref/services/syncbase/server/watchable/util.go
@@ -7,6 +7,10 @@
 // TODO(sadovsky): Avoid copying back and forth between []byte's and strings.
 // We should probably convert incoming strings to []byte's as early as possible,
 // and deal exclusively in []byte's internally.
+// TODO(rdaoud): I propose we standardize on key and version being strings and
+// the value being []byte within Syncbase.  We define invalid characters in the
+// key space (and reserve "$" and ":").  The lower storage engine layers are
+// free to map that to what they need internally ([]byte or string).
 
 import (
 	"fmt"
@@ -25,6 +29,19 @@
 	rngLock sync.Mutex
 )
 
+// NewVersion returns a new version for a store entry mutation.
+func NewVersion() []byte {
+	// TODO(rdaoud): revisit the number of bits: should we use 128 bits?
+	// Note: the version only needs to be unique per object key, not globally.
+	// TODO(rdaoud): move sync's rand64() to a general Syncbase spot and
+	// reuse it here.
+	rngLock.Lock()
+	num := rng.Int63()
+	rngLock.Unlock()
+
+	return []byte(fmt.Sprintf("%x", num))
+}
+
 func makeVersionKey(key []byte) []byte {
 	return []byte(join(util.VersionPrefix, string(key)))
 }
@@ -109,20 +126,19 @@
 	if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
 		return err
 	}
-	wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Value: version}})
+	wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Version: version}})
 	return nil
 }
 
-func putVersioned(tx store.Transaction, key, value []byte) error {
-	rngLock.Lock()
-	num := rng.Int63()
-	rngLock.Unlock()
-
-	version := []byte(fmt.Sprintf("%x", num))
+func putVersioned(tx store.Transaction, key, value []byte) ([]byte, error) {
+	version := NewVersion()
 	if err := tx.Put(makeVersionKey(key), version); err != nil {
-		return err
+		return nil, err
 	}
-	return tx.Put(makeAtVersionKey(key, version), value)
+	if err := tx.Put(makeAtVersionKey(key, version), value); err != nil {
+		return nil, err
+	}
+	return version, nil
 }
 
 func deleteVersioned(tx store.Transaction, key []byte) error {
diff --git a/x/ref/services/syncbase/vsync/dag.go b/x/ref/services/syncbase/vsync/dag.go
index 5a432c7..72e8d91 100644
--- a/x/ref/services/syncbase/vsync/dag.go
+++ b/x/ref/services/syncbase/vsync/dag.go
@@ -726,8 +726,9 @@
 	}
 
 	var node dagNode
-	if err := util.GetObject(st, nodeKey(oid, version), &node); err != nil {
-		return nil, verror.New(verror.ErrInternal, ctx, err)
+	key := nodeKey(oid, version)
+	if err := util.GetObject(st, key, &node); err != nil {
+		return nil, translateError(ctx, err, key)
 	}
 	return &node, nil
 }
@@ -777,8 +778,9 @@
 // getHead retrieves the DAG object head.
 func getHead(ctx *context.T, st store.StoreReader, oid string) (string, error) {
 	var version string
-	if err := util.GetObject(st, headKey(oid), &version); err != nil {
-		return NoVersion, verror.New(verror.ErrInternal, ctx, err)
+	key := headKey(oid)
+	if err := util.GetObject(st, key, &version); err != nil {
+		return NoVersion, translateError(ctx, err, key)
 	}
 	return version, nil
 }
@@ -819,8 +821,9 @@
 	}
 
 	var info batchInfo
-	if err := util.GetObject(st, batchKey(btid), &info); err != nil {
-		return nil, verror.New(verror.ErrInternal, ctx, err)
+	key := batchKey(btid)
+	if err := util.GetObject(st, key, &info); err != nil {
+		return nil, translateError(ctx, err, key)
 	}
 	return &info, nil
 }
diff --git a/x/ref/services/syncbase/vsync/sync.go b/x/ref/services/syncbase/vsync/sync.go
index ca2e3fc..eac07f7 100644
--- a/x/ref/services/syncbase/vsync/sync.go
+++ b/x/ref/services/syncbase/vsync/sync.go
@@ -129,7 +129,7 @@
 	s.pending.Add(2)
 
 	// Start watcher thread to watch for updates to local store.
-	go s.watchStore()
+	go s.watchStore(ctx)
 
 	// Start initiator thread to periodically get deltas from peers.
 	go s.syncer(ctx)
diff --git a/x/ref/services/syncbase/vsync/sync_state.go b/x/ref/services/syncbase/vsync/sync_state.go
index f025327..101d08c 100644
--- a/x/ref/services/syncbase/vsync/sync_state.go
+++ b/x/ref/services/syncbase/vsync/sync_state.go
@@ -55,10 +55,10 @@
 }
 
 // initSync initializes the sync module during startup. It scans all the
-// databases across all apps to a) initialize the in-memory sync state of a
-// Database consisting of the current generation number, log position and
-// generation vector; b) initialize the prefixes that are currently being
-// synced.
+// databases across all apps to initialize the following:
+// a) in-memory sync state of a Database consisting of the current generation
+//    number, log position and generation vector.
+// b) watcher map of prefixes currently being synced.
 //
 // TODO(hpucha): This is incomplete. Flesh this out further.
 func (s *syncService) initSync(ctx *context.T) error {
@@ -68,40 +68,49 @@
 	var errFinal error
 	s.syncState = make(map[string]*dbSyncStateInMem)
 
-	// TODO(hpucha): Temporary hack.
-	return errFinal
-
-	/*s.forEachDatabaseStore(ctx, func(st store.Store) bool {
-		sn := st.NewSnapshot()
-		defer sn.Close()
-
-		ds, err := getDbSyncState(ctx, sn)
-		if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
-			errFinal = err
+	s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
+		// Scan the SyncGroups, skipping those not yet being watched.
+		forEachSyncGroup(st, func(sg *interfaces.SyncGroup) bool {
+			// TODO(rdaoud): only use SyncGroups that have been
+			// marked as "watchable" by the sync watcher thread.
+			// This is to handle the case of a SyncGroup being
+			// created but Syncbase restarting before the watcher
+			// processed the SyncGroupOp entry in the watch queue.
+			// It should not be syncing that SyncGroup's data after
+			// restart, but wait until the watcher processes the
+			// entry as would have happened without a restart.
+			for _, prefix := range sg.Spec.Prefixes {
+				incrWatchPrefix(appName, dbName, prefix)
+			}
 			return false
+		})
+
+		if false {
+			// Fetch the sync state.
+			ds, err := getDbSyncState(ctx, st)
+			if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
+				errFinal = err
+				return false
+			}
+			var scanStart, scanLimit []byte
+			// Figure out what to scan among local log records.
+			if verror.ErrorID(err) == verror.ErrNoExist.ID {
+				scanStart, scanLimit = util.ScanPrefixArgs(logRecsPerDeviceScanPrefix(s.id), "")
+			} else {
+				scanStart, scanLimit = util.ScanPrefixArgs(logRecKey(s.id, ds.Gen), "")
+			}
+			var maxpos uint64
+			var dbName string
+			// Scan local log records to find the most recent one.
+			st.Scan(scanStart, scanLimit)
+			// Scan remote log records using the persisted GenVector.
+			s.syncState[dbName] = &dbSyncStateInMem{pos: maxpos + 1}
 		}
 
-		var scanStart, scanLimit []byte
-		// Figure out what to scan among local log records.
-		if verror.ErrorID(err) == verror.ErrNoExist.ID {
-			scanStart, scanLimit = util.ScanPrefixArgs(logRecsPerDeviceScanPrefix(s.id), "")
-		} else {
-			scanStart, scanLimit = util.ScanPrefixArgs(logRecKey(s.id, ds.Gen), "")
-		}
-
-		var maxpos uint64
-		var dbName string
-		// Scan local log records to find the most recent one.
-		sn.Scan(scanStart, scanLimit)
-
-		// Scan remote log records using the persisted GenVector.
-
-		s.syncState[dbName] = &dbSyncStateInMem{pos: maxpos + 1}
-
 		return false
 	})
 
-	return errFinal*/
+	return errFinal
 }
 
 // reserveGenAndPosInDbLog reserves a chunk of generation numbers and log
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 823aeb1..79cc9bf 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -209,7 +209,6 @@
 
 	// Create a new aggregate view of SyncGroup members across all app databases.
 	newMembers := make(map[string]*memberInfo)
-	scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")
 
 	s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
 		// For each database, fetch its SyncGroup data entries by scanning their
@@ -218,14 +217,7 @@
 		defer sn.Close()
 		name := appDbName(appName, dbName)
 
-		stream := sn.Scan(scanStart, scanLimit)
-		for stream.Advance() {
-			var sg interfaces.SyncGroup
-			if vom.Decode(stream.Value(nil), &sg) != nil {
-				vlog.Errorf("invalid SyncGroup value for key %s", string(stream.Key(nil)))
-				continue
-			}
-
+		forEachSyncGroup(sn, func(sg *interfaces.SyncGroup) bool {
 			// Add all members of this SyncGroup to the membership view.
 			// A member's info is different across SyncGroups, so gather all of them.
 			for member, info := range sg.Joiners {
@@ -237,7 +229,8 @@
 				}
 				newMembers[member].db2sg[name][sg.Id] = info
 			}
-		}
+			return false
+		})
 		return false
 	})
 
@@ -245,6 +238,30 @@
 	view.expiration = time.Now().Add(memberViewTTL)
 }
 
+// forEachSyncGroup iterates over all SyncGroups in the Database and invokes
+// the callback function on each one.  The callback returns a "done" flag to
+// make forEachSyncGroup() stop the iteration early; otherwise the function
+// loops across all SyncGroups in the Database.
+func forEachSyncGroup(st store.StoreReader, callback func(*interfaces.SyncGroup) bool) {
+	scanStart, scanLimit := util.ScanPrefixArgs(sgDataKeyScanPrefix(), "")
+	stream := st.Scan(scanStart, scanLimit)
+	for stream.Advance() {
+		var sg interfaces.SyncGroup
+		if vom.Decode(stream.Value(nil), &sg) != nil {
+			vlog.Errorf("invalid SyncGroup value for key %s", string(stream.Key(nil)))
+			continue
+		}
+
+		if callback(&sg) {
+			break // done, early exit
+		}
+	}
+
+	if err := stream.Err(); err != nil {
+		vlog.Errorf("forEachSyncGroup: scan stream error: %v", err)
+	}
+}
+
 // getMembers returns all SyncGroup members and the count of SyncGroups each one
 // joined.
 func (s *syncService) getMembers(ctx *context.T) map[string]uint32 {
diff --git a/x/ref/services/syncbase/vsync/util.go b/x/ref/services/syncbase/vsync/util.go
index 62edcd2..65d47eb 100644
--- a/x/ref/services/syncbase/vsync/util.go
+++ b/x/ref/services/syncbase/vsync/util.go
@@ -7,6 +7,8 @@
 // Sync utility functions
 
 import (
+	"time"
+
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
@@ -14,6 +16,10 @@
 	"v.io/x/lib/vlog"
 )
 
+const (
+	nanoPerSec = int64(1000000000)
+)
+
 // forEachDatabaseStore iterates over all Databases in all Apps within the
 // service and invokes the callback function on each database. The callback
 // returns a "done" flag to make forEachDatabaseStore() stop the iteration
@@ -76,3 +82,11 @@
 	}
 	return verror.New(verror.ErrInternal, ctx, key, err)
 }
+
+// unixNanoToTime converts a Unix timestamp in nanoseconds to a Time object.
+func unixNanoToTime(timestamp int64) time.Time {
+	if timestamp < 0 {
+		vlog.Fatalf("unixNanoToTime: invalid timestamp %d", timestamp)
+	}
+	return time.Unix(timestamp/nanoPerSec, timestamp%nanoPerSec)
+}
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index 5ac1b52..bbd1ac8 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -4,30 +4,159 @@
 
 package vsync
 
-import (
-	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
-	"v.io/syncbase/x/ref/services/syncbase/store"
+// Syncbase Watcher is a goroutine that listens to local Database updates from
+// applications and modifies sync metadata (e.g. DAG and local log records).
+// The coupling between Syncbase storage and sync is loose: the Watcher listens
+// asynchronously so that application operations are unblocked as soon as
+// possible and the sync metadata updates are offloaded to the Watcher.  When the
+// application mutates objects in a Database, additional entries are written
+// to a log queue, persisted in the same Database.  This queue is read by the
+// sync Watcher to learn of the changes.
 
+import (
+	"strings"
+	"time"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
+	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/v23/context"
 	"v.io/v23/verror"
+	"v.io/v23/vom"
+	"v.io/x/lib/vlog"
 )
 
-// When applications update objects in the local Store, the sync
-// watcher thread learns about them asynchronously via the "watch"
-// stream of object mutations. In turn, this sync watcher thread
-// updates the DAG and log records to track the object change
-// histories.
+var (
+	// watchPollInterval is the duration between consecutive watch polling
+	// events across all app databases.  Every watch event loops across all
+	// app databases and fetches from each one at most one batch update
+	// (transaction) to process.
+	// TODO(rdaoud): add a channel between store and watch to get change
+	// notifications instead of using a polling solution.
+	watchPollInterval = 100 * time.Millisecond
 
-// watchStore processes updates obtained by watching the store.
-func (s *syncService) watchStore() {
+	// watchPrefixes is an in-memory cache of SyncGroup prefixes for each
+	// app database.  It is filled at startup from persisted SyncGroup data
+	// and updated at runtime when SyncGroups are joined or left.  It is
+	// not guarded by a mutex because only the watcher goroutine uses it
+	// beyond the startup phase (before any sync goroutines are started).
+	// The map keys are the appdb names (globally unique).
+	watchPrefixes = make(map[string]sgPrefixes)
+)
+
+// sgPrefixes tracks SyncGroup prefixes being synced in a database and their
+// counts.
+type sgPrefixes map[string]uint32
+
+// watchStore processes updates obtained by watching the store.  This is the
+// sync watcher goroutine that learns about store updates asynchronously by
+// reading log records that track object mutation histories in each database.
+// For each batch mutation, the watcher updates the sync DAG and log records.
+// When an application makes a single non-transactional put, it is represented
+// as a batch of one log record. Thus the watcher only deals with batches.
+func (s *syncService) watchStore(ctx *context.T) {
 	defer s.pending.Done()
+
+	ticker := time.NewTicker(watchPollInterval)
+	defer ticker.Stop()
+
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	for {
+		select {
+		case <-s.closed:
+			vlog.VI(1).Info("watchStore: sync channel closed, stop watching and exit")
+			return
+
+		case <-ticker.C:
+			s.forEachDatabaseStore(ctx, func(appName, dbName string, st store.Store) bool {
+				s.processDatabase(ctx, appName, dbName, st)
+				return false
+			})
+		}
+	}
 }
 
-// TODO(hpucha): This is a skeleton only to drive the change for log.
+// processDatabase fetches from the given database at most one new batch update
+// (transaction) and processes it.  The one-batch limit prevents one database
+// from starving others.  A batch is stored as a contiguous set of log records
+// ending with one record having the "continued" flag set to false.
+func (s *syncService) processDatabase(ctx *context.T, appName, dbName string, st store.Store) {
+	resMark, err := getResMark(ctx, st)
+	if err != nil {
+		if verror.ErrorID(err) != verror.ErrNoExist.ID {
+			vlog.Errorf("processDatabase: %s, %s: cannot get resMark: %v", appName, dbName, err)
+			return
+		}
+		resMark = ""
+	}
+
+	// Get a batch of watch log entries, if any, after this resume marker.
+	if logs, nextResmark := getWatchLogBatch(ctx, appName, dbName, st, resMark); logs != nil {
+		s.processWatchLogBatch(ctx, appName, dbName, st, logs, nextResmark)
+	}
+}
+
+// processWatchLogBatch parses the given batch of watch log records, updates the
+// watchable SyncGroup prefixes, uses the prefixes to filter the batch to the
+// subset of syncable records, and transactionally applies these records to the
+// sync metadata (DAG & log records) while advancing the watch resume marker.
+func (s *syncService) processWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, logs []*watchable.LogEntry, resMark string) {
+	if len(logs) == 0 {
+		return
+	}
+
+	// First handle SyncGroup prefix changes within the batch by updating
+	// the watch prefixes.  It is as if these log entries were at the start
+	// of the batch.  Exclude them from the actual data batch.
+	dataLogs := make([]*watchable.LogEntry, 0, len(logs))
+	for _, entry := range logs {
+		if !processSyncGroupLogRecord(appName, dbName, entry) {
+			dataLogs = append(dataLogs, entry)
+		}
+	}
+
+	// Filter out the log entries for keys not part of any SyncGroup.
+	// TODO(rdaoud): filter out entries made by sync (echo suppression).
+	totalCount := uint64(len(dataLogs))
+	appdb := appDbName(appName, dbName)
+	logs = make([]*watchable.LogEntry, 0, len(dataLogs))
+
+	for _, entry := range dataLogs {
+		if syncable(appdb, entry) {
+			logs = append(logs, entry)
+		}
+	}
+	dataLogs = nil
+
+	// Transactional processing of the batch: convert these syncable log
+	// records to a batch of sync log records, filling their parent versions
+	// from the DAG head nodes.
+	err := store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
+		batch := make([]*localLogRec, 0, len(logs))
+		for _, entry := range logs {
+			if rec := convertLogRecord(ctx, tx, entry); rec != nil {
+				batch = append(batch, rec)
+			}
+		}
+
+		if err := s.processBatch(ctx, appName, dbName, batch, totalCount, tx); err != nil {
+			return err
+		}
+		return setResMark(ctx, tx, resMark)
+	})
+
+	if err != nil {
+		// TODO(rdaoud): don't crash, quarantine this app database.
+		vlog.Fatalf("processDatabase: %s, %s: watcher cannot process batch: %v", appName, dbName, err)
+	}
+}
 
 // processBatch applies a single batch of changes (object mutations) received
 // from watching a particular Database.
-func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, tx store.StoreReadWriter) error {
+func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, totalCount uint64, tx store.StoreReadWriter) error {
 	_ = tx.(store.Transaction)
 
 	count := uint64(len(batch))
@@ -37,7 +166,7 @@
 
 	// If the batch has more than one mutation, start a batch for it.
 	batchId := NoBatchId
-	if count > 1 {
+	if totalCount > 1 {
 		batchId = s.startBatch(ctx, tx, batchId)
 		if batchId == NoBatchId {
 			return verror.New(verror.ErrInternal, ctx, "failed to generate batch ID")
@@ -47,7 +176,6 @@
 	gen, pos := s.reserveGenAndPosInDbLog(ctx, appName, dbName, count)
 
 	for _, rec := range batch {
-
 		// Update the log record. Portions of the record Metadata must
 		// already be filled.
 		rec.Metadata.Id = s.id
@@ -55,7 +183,7 @@
 		rec.Metadata.RecType = interfaces.NodeRec
 
 		rec.Metadata.BatchId = batchId
-		rec.Metadata.BatchCount = count
+		rec.Metadata.BatchCount = totalCount
 
 		rec.Pos = pos
 
@@ -77,7 +205,7 @@
 	return nil
 }
 
-// processLogRec processes a local log record by adding to the Database and
+// processLocalLogRec processes a local log record by adding to the Database and
 // suitably updating the DAG metadata.
 func (s *syncService) processLocalLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
 	// Insert the new log record into the log.
@@ -96,3 +224,216 @@
 	// Move the head.
 	return moveHead(ctx, tx, m.ObjId, m.CurVers)
 }
+
+// incrWatchPrefix increments (or sets) a SyncGroup prefix for an app database
+// in the watch prefix cache.
+func incrWatchPrefix(appName, dbName, prefix string) {
+	name := appDbName(appName, dbName)
+	if pfxs := watchPrefixes[name]; pfxs != nil {
+		pfxs[prefix]++ // it auto-initializes a non-existent prefix
+	} else {
+		watchPrefixes[name] = sgPrefixes{prefix: 1}
+	}
+}
+
+// decrWatchPrefix decrements (or unsets) a SyncGroup prefix for an app database
+// in the watch prefix cache.
+func decrWatchPrefix(appName, dbName, prefix string) {
+	name := appDbName(appName, dbName)
+	if pfxs := watchPrefixes[name]; pfxs != nil {
+		if pfxs[prefix] > 1 {
+			pfxs[prefix]--
+		} else if len(pfxs) > 1 {
+			delete(pfxs, prefix)
+		} else {
+			delete(watchPrefixes, name)
+		}
+	}
+}
+
+// dbLogScanArgs determines the arguments needed to start a new scan from a
+// given resume marker (last log entry read).  An empty resume marker is used
+// to begin the scan from the start of the log.
+func dbLogScanArgs(resMark string) ([]byte, []byte) {
+	start, limit := util.ScanPrefixArgs(util.LogPrefix, "")
+	if resMark != "" {
+		// To start just after the current resume marker, augment it by
+		// appending an extra byte at the end.  Use byte value zero to
+		// use the lowest value possible.  This works because resume
+		// markers have a fixed length and are sorted lexicographically.
+		// By creationg a fake longer resume marker that falls between
+		// real resume markers, the next scan will start right after
+		// where the previous one stopped without missing data.
+		start = append([]byte(resMark), 0)
+	}
+	return start, limit
+}
+
+// getWatchLogBatch returns a batch of watch log records (a transaction) from
+// the given database and the new resume marker at the end of the batch.
+func getWatchLogBatch(ctx *context.T, appName, dbName string, st store.Store, resMark string) ([]*watchable.LogEntry, string) {
+	scanStart, scanLimit := dbLogScanArgs(resMark)
+	endOfBatch := false
+	var newResmark string
+
+	// Use the store directly to scan these read-only log entries; there is
+	// no need to create a snapshot since they are never overwritten.  Read
+	// and buffer a batch before processing it.
+	var logs []*watchable.LogEntry
+	stream := st.Scan(scanStart, scanLimit)
+	for stream.Advance() {
+		logKey := string(stream.Key(nil))
+		var logEnt watchable.LogEntry
+		if vom.Decode(stream.Value(nil), &logEnt) != nil {
+			vlog.Fatalf("getWatchLogBatch: %s, %s: invalid watch LogEntry %s: %v",
+				appName, dbName, logKey, stream.Value(nil))
+		}
+
+		logs = append(logs, &logEnt)
+
+		// Stop if this is the end of the batch.
+		if !logEnt.Continued {
+			newResmark = logKey
+			endOfBatch = true
+			break
+		}
+	}
+
+	if err := stream.Err(); err != nil {
+		vlog.Errorf("getWatchLogBatch: %s, %s: scan stream error: %v", appName, dbName, err)
+		return nil, resMark
+	}
+	if !endOfBatch {
+		if len(logs) > 0 {
+			vlog.Fatalf("processDatabase: %s, %s: end of batch not found after %d entries",
+				appName, dbName, len(logs))
+		}
+		return nil, resMark
+	}
+	return logs, newResmark
+}
+
+// convertLogRecord converts a store log entry to a sync log record.  It fills
+// the previous object version (parent) by fetching its current DAG head if it
+// has one.  For a delete, it generates a new object version because the store
+// does not version a deletion.
+// TODO(rdaoud): change Syncbase to store and version a deleted object to
+// simplify the store-to-sync interaction.  A deleted key would still have a
+// version and its value entry would encode the "deleted" flag, either in the
+// key or probably in a value wrapper that would contain other metadata.
+func convertLogRecord(ctx *context.T, tx store.StoreReadWriter, logEnt *watchable.LogEntry) *localLogRec {
+	_ = tx.(store.Transaction)
+
+	switch op := logEnt.Op.(type) {
+	case *watchable.OpGet:
+		// TODO(rdaoud): save read-set in sync.
+		return nil
+
+	case *watchable.OpScan:
+		// TODO(rdaoud): save scan-set in sync.
+		return nil
+
+	case *watchable.OpPut:
+		rec := localLogRec{}
+		oid := string(op.Value.Key)
+		rec.Metadata.ObjId = oid
+		rec.Metadata.CurVers = string(op.Value.Version)
+		if head, err := getHead(ctx, tx, oid); err == nil {
+			rec.Metadata.Parents = []string{head}
+		} else if verror.ErrorID(err) != verror.ErrNoExist.ID {
+			vlog.Fatalf("cannot getHead to convert Put log record for %s: %v", oid, err)
+		}
+		rec.Metadata.UpdTime = unixNanoToTime(logEnt.CommitTimestamp)
+		return &rec
+
+	case *watchable.OpDelete:
+		rec := localLogRec{}
+		oid := string(op.Value.Key)
+		rec.Metadata.ObjId = oid
+		rec.Metadata.CurVers = string(watchable.NewVersion())
+		rec.Metadata.Delete = true
+		if head, err := getHead(ctx, tx, oid); err == nil {
+			rec.Metadata.Parents = []string{head}
+		} else {
+			vlog.Fatalf("cannot getHead to convert Delete log record for %s: %v", oid, err)
+		}
+		rec.Metadata.UpdTime = unixNanoToTime(logEnt.CommitTimestamp)
+		return &rec
+
+	case *watchable.OpSyncGroup:
+		vlog.Errorf("watch LogEntry for SyncGroup should not be converted: %v", logEnt)
+		return nil
+
+	default:
+		vlog.Errorf("invalid watch LogEntry: %v", logEnt)
+		return nil
+	}
+}
+
+// processSyncGroupLogRecord checks if the log entry is a SyncGroup update and,
+// if it is, updates the watch prefixes for the app database and returns true.
+// Otherwise it returns false with no other changes.
+func processSyncGroupLogRecord(appName, dbName string, logEnt *watchable.LogEntry) bool {
+	switch op := logEnt.Op.(type) {
+	case *watchable.OpSyncGroup:
+		remove := op.Value.Remove
+		for _, prefix := range op.Value.Prefixes {
+			if remove {
+				decrWatchPrefix(appName, dbName, prefix)
+			} else {
+				incrWatchPrefix(appName, dbName, prefix)
+			}
+		}
+		return true
+
+	default:
+		return false
+	}
+}
+
+// syncable returns true if the given log entry falls within the scope of a
+// SyncGroup prefix for the given app database, and thus should be synced.
+// It is used to pre-filter the batch of log entries before sync processing.
+func syncable(appdb string, logEnt *watchable.LogEntry) bool {
+	var key string
+	switch op := logEnt.Op.(type) {
+	case *watchable.OpPut:
+		key = string(op.Value.Key)
+	case *watchable.OpDelete:
+		key = string(op.Value.Key)
+	default:
+		return false
+	}
+
+	for prefix := range watchPrefixes[appdb] {
+		if strings.HasPrefix(key, prefix) {
+			return true
+		}
+	}
+	return false
+}
+
+// resMarkKey returns the key used to access the watcher resume marker.
+func resMarkKey() string {
+	return util.JoinKeyParts(util.SyncPrefix, "w", "rm")
+}
+
+// setResMark stores the watcher resume marker for a database.
+func setResMark(ctx *context.T, tx store.StoreReadWriter, resMark string) error {
+	_ = tx.(store.Transaction)
+
+	if err := util.PutObject(tx, resMarkKey(), resMark); err != nil {
+		return verror.New(verror.ErrInternal, ctx, err)
+	}
+	return nil
+}
+
+// getResMark retrieves the watcher resume marker for a database.
+func getResMark(ctx *context.T, st store.StoreReader) (string, error) {
+	var resMark string
+	key := resMarkKey()
+	if err := util.GetObject(st, key, &resMark); err != nil {
+		return NoVersion, translateError(ctx, err, key)
+	}
+	return resMark, nil
+}
diff --git a/x/ref/services/syncbase/vsync/watcher_test.go b/x/ref/services/syncbase/vsync/watcher_test.go
new file mode 100644
index 0000000..ee5e600
--- /dev/null
+++ b/x/ref/services/syncbase/vsync/watcher_test.go
@@ -0,0 +1,236 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vsync
+
+// Tests for the sync watcher in Syncbase.
+
+import (
+	"reflect"
+	"testing"
+
+	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
+)
+
+// TestSetResmark tests setting and getting a resume marker.
+func TestSetResmark(t *testing.T) {
+	svc := createService(t)
+	defer destroyService(t, svc)
+	st := svc.St()
+
+	resmark, err := getResMark(nil, st)
+	if err == nil || resmark != "" {
+		t.Errorf("found non-existent resume marker: %s, %v", resmark, err)
+	}
+
+	wantResmark := "1234567890"
+	tx := st.NewTransaction()
+	if err := setResMark(nil, tx, wantResmark); err != nil {
+		t.Errorf("cannot set resume marker: %v", err)
+	}
+	tx.Commit()
+
+	resmark, err = getResMark(nil, st)
+	if err != nil {
+		t.Errorf("cannot get new resume marker: %v", err)
+	}
+	if resmark != wantResmark {
+		t.Errorf("invalid new resume marker: got %s instead of %s", resmark, wantResmark)
+	}
+}
+
+// TestWatchPrefixes tests setting and updating the watch prefixes map.
+func TestWatchPrefixes(t *testing.T) {
+	svc := createService(t)
+	defer destroyService(t, svc)
+
+	if len(watchPrefixes) != 0 {
+		t.Errorf("watch prefixes not empty: %v", watchPrefixes)
+	}
+
+	watchPrefixOps := []struct {
+		appName, dbName, key string
+		incr                 bool
+	}{
+		{"app1", "db1", "foo", true},
+		{"app1", "db1", "bar", true},
+		{"app2", "db1", "xyz", true},
+		{"app3", "db1", "haha", true},
+		{"app1", "db1", "foo", true},
+		{"app1", "db1", "foo", true},
+		{"app1", "db1", "foo", false},
+		{"app2", "db1", "ttt", true},
+		{"app2", "db1", "ttt", true},
+		{"app2", "db1", "ttt", false},
+		{"app2", "db1", "ttt", false},
+		{"app2", "db2", "qwerty", true},
+		{"app3", "db1", "haha", true},
+		{"app2", "db2", "qwerty", false},
+		{"app3", "db1", "haha", false},
+	}
+
+	for _, op := range watchPrefixOps {
+		if op.incr {
+			incrWatchPrefix(op.appName, op.dbName, op.key)
+		} else {
+			decrWatchPrefix(op.appName, op.dbName, op.key)
+		}
+	}
+
+	expPrefixes := map[string]sgPrefixes{
+		"app1:db1": sgPrefixes{"foo": 2, "bar": 1},
+		"app2:db1": sgPrefixes{"xyz": 1},
+		"app3:db1": sgPrefixes{"haha": 1},
+	}
+	if !reflect.DeepEqual(watchPrefixes, expPrefixes) {
+		t.Errorf("invalid watch prefixes: got %v instead of %v", watchPrefixes, expPrefixes)
+	}
+
+	checkSyncableTests := []struct {
+		appName, dbName, key string
+		result               bool
+	}{
+		{"app1", "db1", "foo", true},
+		{"app1", "db1", "foobar", true},
+		{"app1", "db1", "bar", true},
+		{"app1", "db1", "bar123", true},
+		{"app1", "db1", "f", false},
+		{"app1", "db1", "ba", false},
+		{"app1", "db1", "xyz", false},
+		{"app1", "db555", "foo", false},
+		{"app555", "db1", "foo", false},
+		{"app2", "db1", "xyz123", true},
+		{"app2", "db1", "ttt123", false},
+		{"app2", "db2", "qwerty", false},
+		{"app3", "db1", "hahahoho", true},
+		{"app3", "db1", "hoho", false},
+		{"app3", "db1", "h", false},
+	}
+
+	for _, test := range checkSyncableTests {
+		log := &watchable.LogEntry{
+			Op: &watchable.OpPut{
+				watchable.PutOp{Key: []byte(test.key)},
+			},
+		}
+		res := syncable(appDbName(test.appName, test.dbName), log)
+		if res != test.result {
+			t.Errorf("checkSyncable: invalid output: %s, %s, %s: got %t instead of %t",
+				test.appName, test.dbName, test.key, res, test.result)
+		}
+	}
+}
+
+// newLog creates a Put or Delete watch log entry.
+func newLog(key, version string, delete bool) *watchable.LogEntry {
+	k, v := []byte(key), []byte(version)
+	log := &watchable.LogEntry{}
+	if delete {
+		log.Op = &watchable.OpDelete{watchable.DeleteOp{Key: k}}
+	} else {
+		log.Op = &watchable.OpPut{watchable.PutOp{Key: k, Version: v}}
+	}
+	return log
+}
+
+// newSGLog creates a SyncGroup watch log entry.
+func newSGLog(prefixes []string, remove bool) *watchable.LogEntry {
+	return &watchable.LogEntry{
+		Op: &watchable.OpSyncGroup{
+			watchable.SyncGroupOp{Prefixes: prefixes, Remove: remove},
+		},
+	}
+}
+
+// TestProcessWatchLogBatch tests the processing of a batch of log records.
+func TestProcessWatchLogBatch(t *testing.T) {
+	svc := createService(t)
+	defer destroyService(t, svc)
+	st := svc.St()
+	s := svc.sync
+
+	app, db := "mockapp", "mockdb"
+
+	// An empty batch of logs does not fail.
+	s.processWatchLogBatch(nil, app, db, st, nil, "")
+
+	// Non-syncable logs.
+	batch := []*watchable.LogEntry{
+		newLog("foo", "123", false),
+		newLog("bar", "555", false),
+	}
+
+	resmark := "abcd"
+	s.processWatchLogBatch(nil, app, db, st, batch, resmark)
+
+	if res, err := getResMark(nil, st); err != nil || res != resmark {
+		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
+	}
+	if hasNode(nil, st, "foo", "123") || hasNode(nil, st, "bar", "555") {
+		t.Error("hasNode() found DAG entries for non-syncable logs")
+	}
+
+	// Partially syncable logs.
+	batch = []*watchable.LogEntry{
+		newSGLog([]string{"f", "x"}, false),
+		newLog("foo", "333", false),
+		newLog("fooxyz", "444", false),
+		newLog("bar", "222", false),
+	}
+
+	resmark = "cdef"
+	s.processWatchLogBatch(nil, app, db, st, batch, resmark)
+
+	if res, err := getResMark(nil, st); err != nil || res != resmark {
+		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
+	}
+	if head, err := getHead(nil, st, "foo"); err != nil || head != "333" {
+		t.Errorf("getHead() did not find foo: %s, %v", head, err)
+	}
+	node, err := getNode(nil, st, "foo", "333")
+	if err != nil {
+		t.Errorf("getNode() did not find foo: %v", err)
+	}
+	if node.Level != 0 || node.Parents != nil || node.Logrec == "" || node.BatchId == NoBatchId {
+		t.Errorf("invalid DAG node for foo: %v", node)
+	}
+	node2, err := getNode(nil, st, "fooxyz", "444")
+	if err != nil {
+		t.Errorf("getNode() did not find fooxyz: %v", err)
+	}
+	if node2.Level != 0 || node2.Parents != nil || node2.Logrec == "" || node2.BatchId != node.BatchId {
+		t.Errorf("invalid DAG node for fooxyz: %v", node2)
+	}
+	if hasNode(nil, st, "bar", "222") {
+		t.Error("hasNode() found DAG entries for non-syncable logs")
+	}
+
+	// Back to non-syncable logs (remove "f" prefix).
+	batch = []*watchable.LogEntry{
+		newSGLog([]string{"f"}, true),
+		newLog("foo", "99", false),
+		newLog("fooxyz", "888", true),
+		newLog("bar", "007", false),
+	}
+
+	resmark = "tuvw"
+	s.processWatchLogBatch(nil, app, db, st, batch, resmark)
+
+	if res, err := getResMark(nil, st); err != nil || res != resmark {
+		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
+	}
+	// No changes to "foo".
+	if head, err := getHead(nil, st, "foo"); err != nil || head != "333" {
+		t.Errorf("getHead() did not find foo: %s, %v", head, err)
+	}
+	if node, err := getNode(nil, st, "foo", "99"); err == nil {
+		t.Errorf("getNode() should not have found foo @ 99: %v", node)
+	}
+	if node, err := getNode(nil, st, "fooxyz", "888"); err == nil {
+		t.Errorf("getNode() should not have found fooxyz @ 888: %v", node)
+	}
+	if hasNode(nil, st, "bar", "007") {
+		t.Error("hasNode() found DAG entries for non-syncable logs")
+	}
+}