syncbase/server/watchable: watch enhancements

Various enhancements to the watchable store and its use by sync:
* Add the ability to mark whether a transaction is make by sync
  vs applications.  This allows sync to filter out its own updates
  (echo suppression) from the watch log.
* Add the ability for sync to inject into the watch log snapshot
  entries to initialize DAG metadata when a SyncGroup is created
  or joined.  These entries act as fake-puts that sync uses but
  the future app-watch API will filter out.
* Change the key of watch log entries to use one sequence number
  and update the tests accordingly.
* Make ACLs managed (versioned) keys.
* Improve testing of sync watcher and refactor some code.

Change-Id: I02f6af04c995c5a5b17857c6213b51b6e38dcf13
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 9ad3e56..dd661ef 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -85,7 +85,7 @@
 		return nil, err
 	}
 	st, err = watchable.Wrap(st, &watchable.Options{
-		ManagedPrefixes: []string{util.RowPrefix},
+		ManagedPrefixes: []string{util.RowPrefix, util.PermsPrefix},
 	})
 	if err != nil {
 		return nil, err
diff --git a/x/ref/services/syncbase/server/watchable/store.go b/x/ref/services/syncbase/server/watchable/store.go
index 2aaa997..0e3bb0f 100644
--- a/x/ref/services/syncbase/server/watchable/store.go
+++ b/x/ref/services/syncbase/server/watchable/store.go
@@ -9,7 +9,7 @@
 // internal watching of store updates.
 //
 // LogEntry records are stored chronologically, using keys of the form
-// "$log:<seq>:<txSeq>". Sequence numbers are zero-padded to ensure that the
+// "$log:<seq>". Sequence numbers are zero-padded to ensure that the
 // lexicographic order matches the numeric order.
 //
 // Version number records are stored using keys of the form "$version:<key>",
@@ -49,6 +49,7 @@
 // Wrap returns a watchable.Store that wraps the given store.Store.
 func Wrap(st store.Store, opts *Options) (Store, error) {
 	// Determine initial value for seq.
+	// TODO(sadovsky): Consider using a bigger seq.
 	var seq uint64 = 0
 	// TODO(sadovsky): Perform a binary search to determine seq, or persist the
 	// current sequence number on every nth write so that at startup time we can
@@ -67,7 +68,7 @@
 	if advanced {
 		key := string(keybuf)
 		parts := split(key)
-		if len(parts) != 3 {
+		if len(parts) != 2 {
 			panic("wrong number of parts: " + key)
 		}
 		var err error
diff --git a/x/ref/services/syncbase/server/watchable/test_util.go b/x/ref/services/syncbase/server/watchable/test_util.go
index 5a5d6dc..d81d79e 100644
--- a/x/ref/services/syncbase/server/watchable/test_util.go
+++ b/x/ref/services/syncbase/server/watchable/test_util.go
@@ -7,6 +7,7 @@
 import (
 	"fmt"
 	"io/ioutil"
+	"math"
 	"time"
 
 	"v.io/syncbase/x/ref/services/syncbase/clock"
@@ -72,26 +73,45 @@
 	wst.clock.SetSystemClock(mockClock)
 }
 
+// LogEntryReader provides a stream-like interface to scan over the log entries
+// of a single batch, starting for a given sequence number.  It opens a stream
+// that scans the log from the sequence number given.  It stops after reading
+// the last entry in that batch (indicated by a false Continued flag).
 type LogEntryReader struct {
-	stream store.Stream
+	stream store.Stream // scan stream on the store Database
+	done   bool         // true after reading the last batch entry
+	key    string       // key of most recent log entry read
+	entry  LogEntry     // most recent log entry read
 }
 
 func NewLogEntryReader(st store.Store, seq uint64) *LogEntryReader {
-	stream := st.Scan([]byte(getLogEntryKeyPrefix(seq)), []byte(getLogEntryKeyPrefix(seq+1)))
+	stream := st.Scan([]byte(getLogEntryKey(seq)), []byte(getLogEntryKey(math.MaxUint64)))
 	return &LogEntryReader{stream: stream}
 }
 
 func (ler *LogEntryReader) Advance() bool {
-	return ler.stream.Advance()
+	if ler.done {
+		return false
+	}
+
+	if ler.stream.Advance() {
+		ler.key = string(ler.stream.Key(nil))
+		if err := vom.Decode(ler.stream.Value(nil), &ler.entry); err != nil {
+			panic(fmt.Errorf("Failed to decode LogEntry for key: %q", ler.key))
+		}
+		if ler.entry.Continued == false {
+			ler.done = true
+		}
+		return true
+	}
+
+	ler.key = ""
+	ler.entry = LogEntry{}
+	return false
 }
 
 func (ler *LogEntryReader) GetEntry() (string, LogEntry) {
-	key := string(ler.stream.Key(nil))
-	var entry LogEntry = LogEntry{}
-	if err := vom.Decode(ler.stream.Value(nil), &entry); err != nil {
-		panic(fmt.Errorf("Failed to decode LogEntry for key: %q", key))
-	}
-	return key, entry
+	return ler.key, ler.entry
 }
 
 ///////  Clock related utility code  ///////
diff --git a/x/ref/services/syncbase/server/watchable/transaction.go b/x/ref/services/syncbase/server/watchable/transaction.go
index ccb291d..d5206b1 100644
--- a/x/ref/services/syncbase/server/watchable/transaction.go
+++ b/x/ref/services/syncbase/server/watchable/transaction.go
@@ -11,6 +11,7 @@
 
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/context"
 	"v.io/v23/verror"
 )
 
@@ -20,6 +21,11 @@
 	mu  sync.Mutex // protects the fields below
 	err error
 	ops []Op
+	// fromSync is true when a transaction is created by sync.  This causes
+	// the log entries written at commit time to have their "FromSync" field
+	// set to true.  That in turn causes the sync watcher to filter out such
+	// updates since sync already knows about them (echo suppression).
+	fromSync bool
 }
 
 var _ store.Transaction = (*transaction)(nil)
@@ -111,33 +117,30 @@
 	tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
 	tx.st.mu.Lock()
 	defer tx.st.mu.Unlock()
-	// Check sequence numbers.
-	if uint64(len(tx.ops)) > math.MaxUint16 {
-		return verror.New(verror.ErrInternal, nil, "too many ops")
-	}
-	if tx.st.seq == math.MaxUint64 {
+	// Check if there is enough space left in the sequence number.
+	if (math.MaxUint64 - tx.st.seq) < uint64(len(tx.ops)) {
 		return verror.New(verror.ErrInternal, nil, "seq maxed out")
 	}
 	// Write LogEntry records.
 	timestamp := tx.st.clock.Now().UnixNano()
-	// TODO(rdaoud): switch to using a single counter for log entries
-	// instead of a (sequence, index) combo.
-	keyPrefix := getLogEntryKeyPrefix(tx.st.seq)
-	for txSeq, op := range tx.ops {
-		key := join(keyPrefix, fmt.Sprintf("%04x", txSeq))
+	seq := tx.st.seq
+	for i, op := range tx.ops {
+		key := getLogEntryKey(seq)
 		value := &LogEntry{
 			Op:              op,
 			CommitTimestamp: timestamp,
-			Continued:       txSeq < len(tx.ops)-1,
+			FromSync:        tx.fromSync,
+			Continued:       i < len(tx.ops)-1,
 		}
 		if err := util.PutObject(tx.itx, key, value); err != nil {
 			return err
 		}
+		seq++
 	}
 	if err := tx.itx.Commit(); err != nil {
 		return err
 	}
-	tx.st.seq++
+	tx.st.seq = seq
 	return nil
 }
 
@@ -157,7 +160,7 @@
 // operations (create, join, leave, destroy) to notify the sync watcher of the
 // change at its proper position in the timeline (the transaction commit).
 // Note: this is an internal function used by sync, not part of the interface.
-func AddSyncGroupOp(tx store.Transaction, prefixes []string, remove bool) error {
+func AddSyncGroupOp(ctx *context.T, tx store.Transaction, prefixes []string, remove bool) error {
 	wtx := tx.(*transaction)
 	wtx.mu.Lock()
 	defer wtx.mu.Unlock()
@@ -168,9 +171,45 @@
 	return nil
 }
 
+// AddSyncSnapshotOp injects a sync snapshot operation notification in the log
+// entries that the transaction writes when it is committed.  It allows the
+// SyncGroup create or join operations to notify the sync watcher of the
+// current keys and their versions to use when initializing the sync metadata
+// at the point in the timeline when these keys become syncable (at commit).
+// Note: this is an internal function used by sync, not part of the interface.
+func AddSyncSnapshotOp(ctx *context.T, tx store.Transaction, key, version []byte) error {
+	wtx := tx.(*transaction)
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	if wtx.err != nil {
+		return convertError(wtx.err)
+	}
+	if !wtx.st.managesKey(key) {
+		return verror.New(verror.ErrInternal, ctx,
+			fmt.Sprintf("cannot create SyncSnapshotOp on unmanaged key: %s", string(key)))
+	}
+	wtx.ops = append(wtx.ops, &OpSyncSnapshot{SyncSnapshotOp{Key: key, Version: version}})
+	return nil
+}
+
+// SetTransactionFromSync marks this transaction as created by sync as opposed
+// to one created by an application.  The net effect is that, at commit time,
+// the log entries written are marked as made by sync.  This allows the sync
+// Watcher to ignore them (echo suppression) because it made these updates.
+// Note: this is an internal function used by sync, not part of the interface.
+// TODO(rdaoud): support a generic echo-suppression mechanism for apps as well
+// maybe by having a creator ID in the transaction and log entries.
+// TODO(rdaoud): fold this flag (or creator ID) into Tx options when available.
+func SetTransactionFromSync(tx store.Transaction) {
+	wtx := tx.(*transaction)
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	wtx.fromSync = true
+}
+
 // Exported as a helper function for testing purposes
-func getLogEntryKeyPrefix(seq uint64) string {
-	// Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.
+func getLogEntryKey(seq uint64) string {
+	// Note: MaxUint64 is 0xffffffffffffffff.
 	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
 	return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
 }
diff --git a/x/ref/services/syncbase/server/watchable/transaction_test.go b/x/ref/services/syncbase/server/watchable/transaction_test.go
index 54405a2..3625896 100644
--- a/x/ref/services/syncbase/server/watchable/transaction_test.go
+++ b/x/ref/services/syncbase/server/watchable/transaction_test.go
@@ -80,7 +80,8 @@
 		t.Errorf("Failed to wrap store for update")
 	}
 	seqForUpdate := getSeq(wst2)
-	if seqForUpdate != (seqForCreate + 1) {
+	// We expect the sequence number to have moved by +2 for the two puts.
+	if seqForUpdate != (seqForCreate + 2) {
 		t.Errorf("unexpected sequence number for update. seq for create: %d, seq for update: %d", seqForCreate, seqForUpdate)
 	}
 
diff --git a/x/ref/services/syncbase/server/watchable/types.vdl b/x/ref/services/syncbase/server/watchable/types.vdl
index 6f07a1c..3f5181b 100644
--- a/x/ref/services/syncbase/server/watchable/types.vdl
+++ b/x/ref/services/syncbase/server/watchable/types.vdl
@@ -36,13 +36,26 @@
 	Remove   bool
 }
 
+// SyncSnapshotOp represents a snapshot operation when creating and joining a
+// SyncGroup.  The sync watcher needs to get a snapshot of the Database at the
+// point of creating/joining a SyncGroup.  A SyncSnapshotOp entry is written to
+// the log for each Database key that falls within the SyncGroup prefixes.  This
+// allows sync to initialize its metadata at the correct versions of the objects
+// when they become syncable.  These log entries should be filtered by the
+// client-facing Watch interface because the user data did not actually change.
+type SyncSnapshotOp struct {
+	Key     []byte
+	Version []byte
+}
+
 // Op represents a store operation.
 type Op union {
-	Get       GetOp
-	Scan      ScanOp
-	Put       PutOp
-	Delete    DeleteOp
-	SyncGroup SyncGroupOp
+	Get          GetOp
+	Scan         ScanOp
+	Put          PutOp
+	Delete       DeleteOp
+	SyncGroup    SyncGroupOp
+	SyncSnapshot SyncSnapshotOp
 }
 
 // LogEntry represents a single store operation. This operation may have been
@@ -55,6 +68,9 @@
 	// Time when the operation was committed.
 	CommitTimestamp int64
 
+	// Operation came from sync (used for echo suppression).
+	FromSync bool
+
 	// If true, this entry is followed by more entries that belong to the same
 	// commit as this entry.
 	Continued bool
diff --git a/x/ref/services/syncbase/server/watchable/types.vdl.go b/x/ref/services/syncbase/server/watchable/types.vdl.go
index a4f0619..5fd2e04 100644
--- a/x/ref/services/syncbase/server/watchable/types.vdl.go
+++ b/x/ref/services/syncbase/server/watchable/types.vdl.go
@@ -69,6 +69,23 @@
 }) {
 }
 
+// SyncSnapshotOp represents a snapshot operation when creating and joining a
+// SyncGroup.  The sync watcher needs to get a snapshot of the Database at the
+// point of creating/joining a SyncGroup.  A SyncSnapshotOp entry is written to
+// the log for each Database key that falls within the SyncGroup prefixes.  This
+// allows sync to initialize its metadata at the correct versions of the objects
+// when they become syncable.  These log entries should be filtered by the
+// client-facing Watch interface because the user data did not actually change.
+type SyncSnapshotOp struct {
+	Key     []byte
+	Version []byte
+}
+
+func (SyncSnapshotOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.SyncSnapshotOp"`
+}) {
+}
+
 type (
 	// Op represents any single field of the Op union type.
 	//
@@ -93,16 +110,19 @@
 	OpDelete struct{ Value DeleteOp }
 	// OpSyncGroup represents field SyncGroup of the Op union type.
 	OpSyncGroup struct{ Value SyncGroupOp }
+	// OpSyncSnapshot represents field SyncSnapshot of the Op union type.
+	OpSyncSnapshot struct{ Value SyncSnapshotOp }
 	// __OpReflect describes the Op union type.
 	__OpReflect struct {
 		Name  string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.Op"`
 		Type  Op
 		Union struct {
-			Get       OpGet
-			Scan      OpScan
-			Put       OpPut
-			Delete    OpDelete
-			SyncGroup OpSyncGroup
+			Get          OpGet
+			Scan         OpScan
+			Put          OpPut
+			Delete       OpDelete
+			SyncGroup    OpSyncGroup
+			SyncSnapshot OpSyncSnapshot
 		}
 	}
 )
@@ -132,6 +152,11 @@
 func (x OpSyncGroup) Name() string             { return "SyncGroup" }
 func (x OpSyncGroup) __VDLReflect(__OpReflect) {}
 
+func (x OpSyncSnapshot) Index() int               { return 5 }
+func (x OpSyncSnapshot) Interface() interface{}   { return x.Value }
+func (x OpSyncSnapshot) Name() string             { return "SyncSnapshot" }
+func (x OpSyncSnapshot) __VDLReflect(__OpReflect) {}
+
 // LogEntry represents a single store operation. This operation may have been
 // part of a transaction, as signified by the Continued boolean. Read-only
 // operations (and read-only transactions) are not logged.
@@ -140,6 +165,8 @@
 	Op Op
 	// Time when the operation was committed.
 	CommitTimestamp int64
+	// Operation came from sync (used for echo suppression).
+	FromSync bool
 	// If true, this entry is followed by more entries that belong to the same
 	// commit as this entry.
 	Continued bool
@@ -156,6 +183,7 @@
 	vdl.Register((*PutOp)(nil))
 	vdl.Register((*DeleteOp)(nil))
 	vdl.Register((*SyncGroupOp)(nil))
+	vdl.Register((*SyncSnapshotOp)(nil))
 	vdl.Register((*Op)(nil))
 	vdl.Register((*LogEntry)(nil))
 }
diff --git a/x/ref/services/syncbase/vsync/dag_test.go b/x/ref/services/syncbase/vsync/dag_test.go
index 6177079..7dd7175 100644
--- a/x/ref/services/syncbase/vsync/dag_test.go
+++ b/x/ref/services/syncbase/vsync/dag_test.go
@@ -14,8 +14,8 @@
 	"testing"
 
 	"v.io/syncbase/x/ref/services/syncbase/store"
-
 	"v.io/v23/context"
+	_ "v.io/x/ref/runtime/factories/generic"
 )
 
 // TestSetNode tests setting and getting a DAG node.
diff --git a/x/ref/services/syncbase/vsync/initiator_test.go b/x/ref/services/syncbase/vsync/initiator_test.go
index 83fc8f4..bfcb250 100644
--- a/x/ref/services/syncbase/vsync/initiator_test.go
+++ b/x/ref/services/syncbase/vsync/initiator_test.go
@@ -14,6 +14,7 @@
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
+	_ "v.io/x/ref/runtime/factories/generic"
 )
 
 // TestLogStreamRemoteOnly tests processing of a remote log stream. Commands are
diff --git a/x/ref/services/syncbase/vsync/responder_test.go b/x/ref/services/syncbase/vsync/responder_test.go
index e2158b2..82c0d94 100644
--- a/x/ref/services/syncbase/vsync/responder_test.go
+++ b/x/ref/services/syncbase/vsync/responder_test.go
@@ -15,6 +15,7 @@
 	"v.io/v23/naming"
 	"v.io/v23/rpc"
 	"v.io/v23/security"
+	_ "v.io/x/ref/runtime/factories/generic"
 )
 
 // TestDiffPrefixGenVectors tests diffing prefix gen vectors.
diff --git a/x/ref/services/syncbase/vsync/sync_state_test.go b/x/ref/services/syncbase/vsync/sync_state_test.go
index aa232e4..437a373 100644
--- a/x/ref/services/syncbase/vsync/sync_state_test.go
+++ b/x/ref/services/syncbase/vsync/sync_state_test.go
@@ -11,6 +11,7 @@
 
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/store"
+	_ "v.io/x/ref/runtime/factories/generic"
 )
 
 // Tests for sync state management and storage in Syncbase.
diff --git a/x/ref/services/syncbase/vsync/syncgroup_test.go b/x/ref/services/syncbase/vsync/syncgroup_test.go
index c37dba7..396a6bd 100644
--- a/x/ref/services/syncbase/vsync/syncgroup_test.go
+++ b/x/ref/services/syncbase/vsync/syncgroup_test.go
@@ -14,6 +14,7 @@
 	"v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/store"
+	_ "v.io/x/ref/runtime/factories/generic"
 )
 
 // checkSGStats verifies SyncGroup stats.
diff --git a/x/ref/services/syncbase/vsync/util_test.go b/x/ref/services/syncbase/vsync/test_util.go
similarity index 90%
rename from x/ref/services/syncbase/vsync/util_test.go
rename to x/ref/services/syncbase/vsync/test_util.go
index 3e6b629..92540f1 100644
--- a/x/ref/services/syncbase/vsync/util_test.go
+++ b/x/ref/services/syncbase/vsync/test_util.go
@@ -20,7 +20,6 @@
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
 	"v.io/v23/verror"
-	_ "v.io/x/ref/runtime/factories/generic"
 	"v.io/x/ref/test"
 )
 
@@ -127,7 +126,7 @@
 		t.Fatalf("cannot create store %s (%s): %v", engine, path, err)
 	}
 	st, err = watchable.Wrap(st, &watchable.Options{
-		ManagedPrefixes: []string{util.RowPrefix},
+		ManagedPrefixes: []string{util.RowPrefix, util.PermsPrefix},
 	})
 
 	s := &mockService{
@@ -151,3 +150,13 @@
 		t.Fatalf("cannot destroy store %s (%s): %v", s.engine, s.path, err)
 	}
 }
+
+// makeResMark returns the resume marker for a given log entry position.
+func makeResMark(pos int) string {
+	return util.JoinKeyParts(util.LogPrefix, fmt.Sprintf("%016x", pos))
+}
+
+// makeRowKey returns the database row key for a given application key.
+func makeRowKey(key string) string {
+	return util.JoinKeyParts(util.RowPrefix, key)
+}
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index bbd1ac8..804907c 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -119,13 +119,13 @@
 	}
 
 	// Filter out the log entries for keys not part of any SyncGroup.
-	// TODO(rdaoud): filter out entries made by sync (echo suppression).
+	// Ignore as well log entries made by sync (echo suppression).
 	totalCount := uint64(len(dataLogs))
 	appdb := appDbName(appName, dbName)
 	logs = make([]*watchable.LogEntry, 0, len(dataLogs))
 
 	for _, entry := range dataLogs {
-		if syncable(appdb, entry) {
+		if !entry.FromSync && syncable(appdb, entry) {
 			logs = append(logs, entry)
 		}
 	}
@@ -323,51 +323,53 @@
 // key or probably in a value wrapper that would contain other metadata.
 func convertLogRecord(ctx *context.T, tx store.StoreReadWriter, logEnt *watchable.LogEntry) *localLogRec {
 	_ = tx.(store.Transaction)
+	var rec *localLogRec
+	timestamp := logEnt.CommitTimestamp
 
 	switch op := logEnt.Op.(type) {
 	case *watchable.OpGet:
 		// TODO(rdaoud): save read-set in sync.
-		return nil
 
 	case *watchable.OpScan:
 		// TODO(rdaoud): save scan-set in sync.
-		return nil
 
 	case *watchable.OpPut:
-		rec := localLogRec{}
-		oid := string(op.Value.Key)
-		rec.Metadata.ObjId = oid
-		rec.Metadata.CurVers = string(op.Value.Version)
-		if head, err := getHead(ctx, tx, oid); err == nil {
-			rec.Metadata.Parents = []string{head}
-		} else if verror.ErrorID(err) != verror.ErrNoExist.ID {
-			vlog.Fatalf("cannot getHead to convert Put log record for %s: %v", oid, err)
-		}
-		rec.Metadata.UpdTime = unixNanoToTime(logEnt.CommitTimestamp)
-		return &rec
+		rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
+
+	case *watchable.OpSyncSnapshot:
+		rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
 
 	case *watchable.OpDelete:
-		rec := localLogRec{}
-		oid := string(op.Value.Key)
-		rec.Metadata.ObjId = oid
-		rec.Metadata.CurVers = string(watchable.NewVersion())
-		rec.Metadata.Delete = true
-		if head, err := getHead(ctx, tx, oid); err == nil {
-			rec.Metadata.Parents = []string{head}
-		} else {
-			vlog.Fatalf("cannot getHead to convert Delete log record for %s: %v", oid, err)
-		}
-		rec.Metadata.UpdTime = unixNanoToTime(logEnt.CommitTimestamp)
-		return &rec
+		rec = newLocalLogRec(ctx, tx, op.Value.Key, watchable.NewVersion(), true, timestamp)
 
 	case *watchable.OpSyncGroup:
 		vlog.Errorf("watch LogEntry for SyncGroup should not be converted: %v", logEnt)
-		return nil
 
 	default:
 		vlog.Errorf("invalid watch LogEntry: %v", logEnt)
-		return nil
 	}
+
+	return rec
+}
+
+// newLocalLogRec creates a local sync log record given its information: key,
+// version, deletion flag, and timestamp.  It retrieves the current DAG head
+// for the key (if one exists) to use as its parent (previous) version.
+func newLocalLogRec(ctx *context.T, tx store.StoreReadWriter, key, version []byte, deleted bool, timestamp int64) *localLogRec {
+	_ = tx.(store.Transaction)
+
+	rec := localLogRec{}
+	oid := string(key)
+	rec.Metadata.ObjId = oid
+	rec.Metadata.CurVers = string(version)
+	rec.Metadata.Delete = deleted
+	if head, err := getHead(ctx, tx, oid); err == nil {
+		rec.Metadata.Parents = []string{head}
+	} else if deleted || (verror.ErrorID(err) != verror.ErrNoExist.ID) {
+		vlog.Fatalf("cannot getHead to convert log record for %s: %v", oid, err)
+	}
+	rec.Metadata.UpdTime = unixNanoToTime(timestamp)
+	return &rec
 }
 
 // processSyncGroupLogRecord checks if the log entry is a SyncGroup update and,
@@ -401,10 +403,21 @@
 		key = string(op.Value.Key)
 	case *watchable.OpDelete:
 		key = string(op.Value.Key)
+	case *watchable.OpSyncSnapshot:
+		key = string(op.Value.Key)
 	default:
 		return false
 	}
 
+	// The key starts with one of the store's reserved prefixes for managed
+	// namespaced (e.g. $row or $perm).  Remove that prefix before comparing
+	// it with the SyncGroup prefixes which are defined by the application.
+	parts := util.SplitKeyParts(key)
+	if len(parts) < 2 {
+		vlog.Fatalf("syncable: %s: invalid entry key %s: %v", appdb, key, logEnt)
+	}
+	key = util.JoinKeyParts(parts[1:]...)
+
 	for prefix := range watchPrefixes[appdb] {
 		if strings.HasPrefix(key, prefix) {
 			return true
diff --git a/x/ref/services/syncbase/vsync/watcher_test.go b/x/ref/services/syncbase/vsync/watcher_test.go
index ee5e600..aa7a72f 100644
--- a/x/ref/services/syncbase/vsync/watcher_test.go
+++ b/x/ref/services/syncbase/vsync/watcher_test.go
@@ -7,10 +7,15 @@
 // Tests for the sync watcher in Syncbase.
 
 import (
+	"bytes"
+	"fmt"
 	"reflect"
 	"testing"
+	"time"
 
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
+	_ "v.io/x/ref/runtime/factories/generic"
 )
 
 // TestSetResmark tests setting and getting a resume marker.
@@ -42,6 +47,7 @@
 
 // TestWatchPrefixes tests setting and updating the watch prefixes map.
 func TestWatchPrefixes(t *testing.T) {
+	watchPollInterval = time.Millisecond
 	svc := createService(t)
 	defer destroyService(t, svc)
 
@@ -111,7 +117,7 @@
 	for _, test := range checkSyncableTests {
 		log := &watchable.LogEntry{
 			Op: &watchable.OpPut{
-				watchable.PutOp{Key: []byte(test.key)},
+				watchable.PutOp{Key: []byte(makeRowKey(test.key))},
 			},
 		}
 		res := syncable(appDbName(test.appName, test.dbName), log)
@@ -151,14 +157,17 @@
 	s := svc.sync
 
 	app, db := "mockapp", "mockdb"
+	fooKey := makeRowKey("foo")
+	barKey := makeRowKey("bar")
+	fooxyzKey := makeRowKey("fooxyz")
 
 	// Empty logs does not fail.
 	s.processWatchLogBatch(nil, app, db, st, nil, "")
 
 	// Non-syncable logs.
 	batch := []*watchable.LogEntry{
-		newLog("foo", "123", false),
-		newLog("bar", "555", false),
+		newLog(fooKey, "123", false),
+		newLog(barKey, "555", false),
 	}
 
 	resmark := "abcd"
@@ -167,16 +176,16 @@
 	if res, err := getResMark(nil, st); err != nil && res != resmark {
 		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
 	}
-	if hasNode(nil, st, "foo", "123") || hasNode(nil, st, "bar", "555") {
+	if hasNode(nil, st, fooKey, "123") || hasNode(nil, st, barKey, "555") {
 		t.Error("hasNode() found DAG entries for non-syncable logs")
 	}
 
 	// Partially syncable logs.
 	batch = []*watchable.LogEntry{
 		newSGLog([]string{"f", "x"}, false),
-		newLog("foo", "333", false),
-		newLog("fooxyz", "444", false),
-		newLog("bar", "222", false),
+		newLog(fooKey, "333", false),
+		newLog(fooxyzKey, "444", false),
+		newLog(barKey, "222", false),
 	}
 
 	resmark = "cdef"
@@ -185,33 +194,75 @@
 	if res, err := getResMark(nil, st); err != nil && res != resmark {
 		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
 	}
-	if head, err := getHead(nil, st, "foo"); err != nil && head != "333" {
+	if head, err := getHead(nil, st, fooKey); err != nil && head != "333" {
 		t.Errorf("getHead() did not find foo: %s, %v", head, err)
 	}
-	node, err := getNode(nil, st, "foo", "333")
+	node, err := getNode(nil, st, fooKey, "333")
 	if err != nil {
 		t.Errorf("getNode() did not find foo: %v", err)
 	}
 	if node.Level != 0 || node.Parents != nil || node.Logrec == "" || node.BatchId == NoBatchId {
 		t.Errorf("invalid DAG node for foo: %v", node)
 	}
-	node2, err := getNode(nil, st, "fooxyz", "444")
+	node2, err := getNode(nil, st, fooxyzKey, "444")
 	if err != nil {
 		t.Errorf("getNode() did not find fooxyz: %v", err)
 	}
 	if node2.Level != 0 || node2.Parents != nil || node2.Logrec == "" || node2.BatchId != node.BatchId {
 		t.Errorf("invalid DAG node for fooxyz: %v", node2)
 	}
-	if hasNode(nil, st, "bar", "222") {
+	if hasNode(nil, st, barKey, "222") {
+		t.Error("hasNode() found DAG entries for non-syncable logs")
+	}
+
+	// More partially syncable logs updating existing ones.
+	batch = []*watchable.LogEntry{
+		newLog(fooKey, "1", false),
+		newLog(fooxyzKey, "", true),
+		newLog(barKey, "7", false),
+	}
+
+	resmark = "ghij"
+	s.processWatchLogBatch(nil, app, db, st, batch, resmark)
+
+	if res, err := getResMark(nil, st); err != nil && res != resmark {
+		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
+	}
+	if head, err := getHead(nil, st, fooKey); err != nil && head != "1" {
+		t.Errorf("getHead() did not find foo: %s, %v", head, err)
+	}
+	node, err = getNode(nil, st, fooKey, "1")
+	if err != nil {
+		t.Errorf("getNode() did not find foo: %v", err)
+	}
+	expParents := []string{"333"}
+	if node.Level != 1 || !reflect.DeepEqual(node.Parents, expParents) ||
+		node.Logrec == "" || node.BatchId == NoBatchId {
+		t.Errorf("invalid DAG node for foo: %v", node)
+	}
+	head2, err := getHead(nil, st, fooxyzKey)
+	if err != nil {
+		t.Errorf("getHead() did not find fooxyz: %v", err)
+	}
+	node2, err = getNode(nil, st, fooxyzKey, head2)
+	if err != nil {
+		t.Errorf("getNode() did not find fooxyz: %v", err)
+	}
+	expParents = []string{"444"}
+	if node2.Level != 1 || !reflect.DeepEqual(node2.Parents, expParents) ||
+		node2.Logrec == "" || node2.BatchId == NoBatchId {
+		t.Errorf("invalid DAG node for fooxyz: %v", node2)
+	}
+	if hasNode(nil, st, barKey, "7") {
 		t.Error("hasNode() found DAG entries for non-syncable logs")
 	}
 
 	// Back to non-syncable logs (remove "f" prefix).
 	batch = []*watchable.LogEntry{
 		newSGLog([]string{"f"}, true),
-		newLog("foo", "99", false),
-		newLog("fooxyz", "888", true),
-		newLog("bar", "007", false),
+		newLog(fooKey, "99", false),
+		newLog(fooxyzKey, "888", true),
+		newLog(barKey, "007", false),
 	}
 
 	resmark = "tuvw"
@@ -221,16 +272,92 @@
 		t.Errorf("invalid resmark batch processing: got %s instead of %s", res, resmark)
 	}
 	// No changes to "foo".
-	if head, err := getHead(nil, st, "foo"); err != nil && head != "333" {
+	if head, err := getHead(nil, st, fooKey); err != nil && head != "333" {
 		t.Errorf("getHead() did not find foo: %s, %v", head, err)
 	}
-	if node, err := getNode(nil, st, "foo", "99"); err == nil {
+	if node, err := getNode(nil, st, fooKey, "99"); err == nil {
 		t.Errorf("getNode() should not have found foo @ 99: %v", node)
 	}
-	if node, err := getNode(nil, st, "fooxyz", "888"); err == nil {
+	if node, err := getNode(nil, st, fooxyzKey, "888"); err == nil {
 		t.Errorf("getNode() should not have found fooxyz @ 888: %v", node)
 	}
-	if hasNode(nil, st, "bar", "007") {
+	if hasNode(nil, st, barKey, "007") {
 		t.Error("hasNode() found DAG entries for non-syncable logs")
 	}
 }
+
+// TestGetWatchLogBatch tests fetching a batch of log records.
+func TestGetWatchLogBatch(t *testing.T) {
+	svc := createService(t)
+	defer destroyService(t, svc)
+	st := svc.St()
+
+	// Create a set of batches to fill the log queue.
+	numTx, numPut := 3, 4
+
+	makeKeyVal := func(batchNum, recNum int) ([]byte, []byte) {
+		key := util.JoinKeyParts(util.RowPrefix, fmt.Sprintf("foo-%d-%d", batchNum, recNum))
+		val := fmt.Sprintf("val-%d-%d", batchNum, recNum)
+		return []byte(key), []byte(val)
+	}
+
+	for i := 0; i < numTx; i++ {
+		tx := st.NewTransaction()
+		for j := 0; j < numPut; j++ {
+			key, val := makeKeyVal(i, j)
+			if err := tx.Put(key, val); err != nil {
+				t.Errorf("cannot put %s (%s): %v", key, val, err)
+			}
+		}
+		tx.Commit()
+	}
+
+	// Fetch the batches and a few more empty fetches and verify them.
+	app, db := "mockapp", "mockdb"
+	resmark := ""
+	count := 0
+
+	for i := 0; i < (numTx + 3); i++ {
+		logs, newResmark := getWatchLogBatch(nil, app, db, st, resmark)
+		if i < numTx {
+			if len(logs) != numPut {
+				t.Errorf("log fetch (i=%d) wrong log count: %d instead of %d",
+					i, len(logs), numPut)
+			}
+
+			count += len(logs)
+			expResmark := makeResMark(count - 1)
+			if newResmark != expResmark {
+				t.Errorf("log fetch (i=%d) wrong resmark: %s instead of %s",
+					i, newResmark, expResmark)
+			}
+
+			for j, log := range logs {
+				op := log.Op.(watchable.OpPut)
+				expKey, expVal := makeKeyVal(i, j)
+				key := op.Value.Key
+				if !bytes.Equal(key, expKey) {
+					t.Errorf("log fetch (i=%d, j=%d) bad key: %s instead of %s",
+						i, j, key, expKey)
+				}
+				tx := st.NewTransaction()
+				var val []byte
+				val, err := watchable.GetAtVersion(nil, tx, key, val, op.Value.Version)
+				if err != nil {
+					t.Errorf("log fetch (i=%d, j=%d) cannot GetAtVersion(): %v", i, j, err)
+				}
+				if !bytes.Equal(val, expVal) {
+					t.Errorf("log fetch (i=%d, j=%d) bad value: %s instead of %s",
+						i, j, val, expVal)
+				}
+				tx.Abort()
+			}
+		} else {
+			if logs != nil || newResmark != resmark {
+				t.Errorf("NOP log fetch (i=%d) had changes: %d logs, resmask %s",
+					i, len(logs), newResmark)
+			}
+		}
+		resmark = newResmark
+	}
+}