syncbase/vsync: add sync watcher

Add the sync watcher thread to read batches of store log entries
and apply them to the sync metadata.  Provide a hook for the
SyncGroup create/join/leave methods to inform the watcher of
changes to the key prefixes to sync.

Change-Id: Ic30f02c5506d01a09dbddde245b0b149a62c72e4
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 3cbff62..ccb291d 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -72,14 +72,16 @@
 	if tx.err != nil {
 		return convertError(tx.err)
 	}
-	var err error
 	if !tx.st.managesKey(key) {
-		err = tx.itx.Put(key, value)
-	} else {
-		err = putVersioned(tx.itx, key, value)
-		tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
+		return tx.itx.Put(key, value)
 	}
-	return err
+
+	version, err := putVersioned(tx.itx, key, value)
+	if err != nil {
+		return err
+	}
+	tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Version: version}})
+	return nil
 }
 
 // Delete implements the store.StoreWriter interface.
@@ -118,15 +120,15 @@
 	}
 	// Write LogEntry records.
 	timestamp := tx.st.clock.Now().UnixNano()
+	// TODO(rdaoud): switch to using a single counter for log entries
+	// instead of a (sequence, index) combo.
 	keyPrefix := getLogEntryKeyPrefix(tx.st.seq)
 	for txSeq, op := range tx.ops {
 		key := join(keyPrefix, fmt.Sprintf("%04x", txSeq))
 		value := &LogEntry{
 			Op:              op,
 			CommitTimestamp: timestamp,
-			// TODO(sadovsky): This information is also captured in LogEntry keys.
-			// Optimize to avoid redundancy.
-			Continued: txSeq < len(tx.ops)-1,
+			Continued:       txSeq < len(tx.ops)-1,
 		}
 		if err := util.PutObject(tx.itx, key, value); err != nil {
 			return err
@@ -150,6 +152,22 @@
 	return tx.itx.Abort()
 }
 
+// AddSyncGroupOp injects a SyncGroup operation notification in the log entries
+// that the transaction writes when it is committed.  It allows the SyncGroup
+// operations (create, join, leave, destroy) to notify the sync watcher of the
+// change at its proper position in the timeline (the transaction commit).
+// Note: this is an internal function used by sync, not part of the interface.
+func AddSyncGroupOp(tx store.Transaction, prefixes []string, remove bool) error {
+	wtx := tx.(*transaction)
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	if wtx.err != nil {
+		return convertError(wtx.err)
+	}
+	wtx.ops = append(wtx.ops, &OpSyncGroup{SyncGroupOp{Prefixes: prefixes, Remove: remove}})
+	return nil
+}
+
 // Exported as a helper function for testing purposes
 func getLogEntryKeyPrefix(seq uint64) string {
 	// Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.