syncbase/server/watchable: watch enhancements

Various enhancements to the watchable store and its use by sync:
* Add the ability to mark whether a transaction is make by sync
  vs applications.  This allows sync to filter out its own updates
  (echo suppression) from the watch log.
* Add the ability for sync to inject into the watch log snapshot
  entries to initialize DAG metadata when a SyncGroup is created
  or joined.  These entries act as fake-puts that sync uses but
  the future app-watch API will filter out.
* Change the key of watch log entries to use one sequence number
  and update the tests accordingly.
* Make ACLs managed (versioned) keys.
* Improve testing of sync watcher and refactor some code.

Change-Id: I02f6af04c995c5a5b17857c6213b51b6e38dcf13
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index ccb291d..d5206b1 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -11,6 +11,7 @@
 
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/context"
 	"v.io/v23/verror"
 )
 
@@ -20,6 +21,11 @@
 	mu  sync.Mutex // protects the fields below
 	err error
 	ops []Op
+	// fromSync is true when a transaction is created by sync.  This causes
+	// the log entries written at commit time to have their "FromSync" field
+	// set to true.  That in turn causes the sync watcher to filter out such
+	// updates since sync already knows about them (echo suppression).
+	fromSync bool
 }
 
 var _ store.Transaction = (*transaction)(nil)
@@ -111,33 +117,30 @@
 	tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
 	tx.st.mu.Lock()
 	defer tx.st.mu.Unlock()
-	// Check sequence numbers.
-	if uint64(len(tx.ops)) > math.MaxUint16 {
-		return verror.New(verror.ErrInternal, nil, "too many ops")
-	}
-	if tx.st.seq == math.MaxUint64 {
+	// Check if there is enough space left in the sequence number.
+	if (math.MaxUint64 - tx.st.seq) < uint64(len(tx.ops)) {
 		return verror.New(verror.ErrInternal, nil, "seq maxed out")
 	}
 	// Write LogEntry records.
 	timestamp := tx.st.clock.Now().UnixNano()
-	// TODO(rdaoud): switch to using a single counter for log entries
-	// instead of a (sequence, index) combo.
-	keyPrefix := getLogEntryKeyPrefix(tx.st.seq)
-	for txSeq, op := range tx.ops {
-		key := join(keyPrefix, fmt.Sprintf("%04x", txSeq))
+	seq := tx.st.seq
+	for i, op := range tx.ops {
+		key := getLogEntryKey(seq)
 		value := &LogEntry{
 			Op:              op,
 			CommitTimestamp: timestamp,
-			Continued:       txSeq < len(tx.ops)-1,
+			FromSync:        tx.fromSync,
+			Continued:       i < len(tx.ops)-1,
 		}
 		if err := util.PutObject(tx.itx, key, value); err != nil {
 			return err
 		}
+		seq++
 	}
 	if err := tx.itx.Commit(); err != nil {
 		return err
 	}
-	tx.st.seq++
+	tx.st.seq = seq
 	return nil
 }
 
@@ -157,7 +160,7 @@
 // operations (create, join, leave, destroy) to notify the sync watcher of the
 // change at its proper position in the timeline (the transaction commit).
 // Note: this is an internal function used by sync, not part of the interface.
-func AddSyncGroupOp(tx store.Transaction, prefixes []string, remove bool) error {
+func AddSyncGroupOp(ctx *context.T, tx store.Transaction, prefixes []string, remove bool) error {
 	wtx := tx.(*transaction)
 	wtx.mu.Lock()
 	defer wtx.mu.Unlock()
@@ -168,9 +171,45 @@
 	return nil
 }
 
+// AddSyncSnapshotOp injects a sync snapshot operation notification in the log
+// entries that the transaction writes when it is committed.  It allows the
+// SyncGroup create or join operations to notify the sync watcher of the
+// current keys and their versions to use when initializing the sync metadata
+// at the point in the timeline when these keys become syncable (at commit).
+// Note: this is an internal function used by sync, not part of the interface.
+func AddSyncSnapshotOp(ctx *context.T, tx store.Transaction, key, version []byte) error {
+	wtx := tx.(*transaction)
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	if wtx.err != nil {
+		return convertError(wtx.err)
+	}
+	if !wtx.st.managesKey(key) {
+		return verror.New(verror.ErrInternal, ctx,
+			fmt.Sprintf("cannot create SyncSnapshotOp on unmanaged key: %s", string(key)))
+	}
+	wtx.ops = append(wtx.ops, &OpSyncSnapshot{SyncSnapshotOp{Key: key, Version: version}})
+	return nil
+}
+
+// SetTransactionFromSync marks this transaction as created by sync as opposed
+// to one created by an application.  The net effect is that, at commit time,
+// the log entries written are marked as made by sync.  This allows the sync
+// Watcher to ignore them (echo suppression) because it made these updates.
+// Note: this is an internal function used by sync, not part of the interface.
+// TODO(rdaoud): support a generic echo-suppression mechanism for apps as well
+// maybe by having a creator ID in the transaction and log entries.
+// TODO(rdaoud): fold this flag (or creator ID) into Tx options when available.
+func SetTransactionFromSync(tx store.Transaction) {
+	wtx := tx.(*transaction)
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	wtx.fromSync = true
+}
+
 // Exported as a helper function for testing purposes
-func getLogEntryKeyPrefix(seq uint64) string {
-	// Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.
+func getLogEntryKey(seq uint64) string {
+	// Note: MaxUint64 is 0xffffffffffffffff.
 	// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
 	return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
 }