syncbase/vsync: add sync watcher

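Add the sync watcher thread, which reads batches of store log entries
and applies them to the sync metadata.  Provide a hook (AddSyncGroupOp)
for the SyncGroup create/join/leave/destroy methods to inform the
watcher of changes to the set of key prefixes to sync.  Put log entries
now record the new version of the key instead of a copy of its value.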

Change-Id: Ic30f02c5506d01a09dbddde245b0b149a62c72e4
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 3cbff62..ccb291d 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -72,14 +72,16 @@
 	if tx.err != nil {
 		return convertError(tx.err)
 	}
-	var err error
 	if !tx.st.managesKey(key) {
-		err = tx.itx.Put(key, value)
-	} else {
-		err = putVersioned(tx.itx, key, value)
-		tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
+		return tx.itx.Put(key, value)
 	}
-	return err
+
+	version, err := putVersioned(tx.itx, key, value)
+	if err != nil {
+		return err
+	}
+	tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Version: version}})
+	return nil
 }
 
 // Delete implements the store.StoreWriter interface.
@@ -118,15 +120,15 @@
 	}
 	// Write LogEntry records.
 	timestamp := tx.st.clock.Now().UnixNano()
+	// TODO(rdaoud): switch to using a single counter for log entries
+	// instead of a (sequence, index) combo.
 	keyPrefix := getLogEntryKeyPrefix(tx.st.seq)
 	for txSeq, op := range tx.ops {
 		key := join(keyPrefix, fmt.Sprintf("%04x", txSeq))
 		value := &LogEntry{
 			Op:              op,
 			CommitTimestamp: timestamp,
-			// TODO(sadovsky): This information is also captured in LogEntry keys.
-			// Optimize to avoid redundancy.
-			Continued: txSeq < len(tx.ops)-1,
+			Continued:       txSeq < len(tx.ops)-1,
 		}
 		if err := util.PutObject(tx.itx, key, value); err != nil {
 			return err
@@ -150,6 +152,31 @@
 	return tx.itx.Abort()
 }
 
+// AddSyncGroupOp injects a SyncGroup operation notification into the log
+// entries that the transaction writes when it is committed.  It allows the
+// SyncGroup operations (create, join, leave, destroy) to notify the sync
+// watcher of the change at its proper position in the timeline (the
+// transaction commit).  The given prefixes and remove flag are recorded
+// as-is in a SyncGroupOp appended to the transaction's pending operations.
+// It returns an error if tx is not a transaction created by this package, or
+// if the transaction already carries an error.
+// Note: this is an internal function used by sync, not part of the interface.
+func AddSyncGroupOp(tx store.Transaction, prefixes []string, remove bool) error {
+	// Use the checked form of the type assertion: being handed a foreign
+	// store.Transaction must produce an error, not a panic.
+	wtx, ok := tx.(*transaction)
+	if !ok {
+		return fmt.Errorf("watchable: AddSyncGroupOp: unsupported transaction type %T", tx)
+	}
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	if wtx.err != nil {
+		return convertError(wtx.err)
+	}
+	wtx.ops = append(wtx.ops, &OpSyncGroup{SyncGroupOp{Prefixes: prefixes, Remove: remove}})
+	return nil
+}
+
 // Exported as a helper function for testing purposes
 func getLogEntryKeyPrefix(seq uint64) string {
 	// Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.
diff --git a/services/syncbase/server/watchable/types.vdl b/services/syncbase/server/watchable/types.vdl
index 07d1ce1..6f07a1c 100644
--- a/services/syncbase/server/watchable/types.vdl
+++ b/services/syncbase/server/watchable/types.vdl
@@ -15,10 +15,12 @@
 	Limit []byte
 }
 
-// PutOp represents a store put operation.
+// PutOp represents a store put operation.  The new version is written instead
+// of the value to avoid duplicating the user data in the store.  The version
+// is used to access the user data of that specific mutation.
 type PutOp struct {
-	Key   []byte
-	Value []byte
+	Key     []byte
+	Version []byte
 }
 
 // DeleteOp represents a store delete operation.
@@ -26,12 +28,21 @@
 	Key []byte
 }
 
+// SyncGroupOp represents a change in SyncGroup tracking, adding or removing
+// key prefixes to sync.  SyncGroup prefixes cannot be changed; this is used
+// to track changes due to SyncGroup create/join/leave/destroy.
+type SyncGroupOp struct {
+	Prefixes []string
+	Remove   bool
+}
+
 // Op represents a store operation.
 type Op union {
-	Get    GetOp
-	Scan   ScanOp
-	Put    PutOp
-	Delete DeleteOp
+	Get       GetOp
+	Scan      ScanOp
+	Put       PutOp
+	Delete    DeleteOp
+	SyncGroup SyncGroupOp
 }
 
 // LogEntry represents a single store operation. This operation may have been
diff --git a/services/syncbase/server/watchable/types.vdl.go b/services/syncbase/server/watchable/types.vdl.go
index e94587a..a4f0619 100644
--- a/services/syncbase/server/watchable/types.vdl.go
+++ b/services/syncbase/server/watchable/types.vdl.go
@@ -33,10 +33,12 @@
 }) {
 }
 
-// PutOp represents a store put operation.
+// PutOp represents a store put operation.  The new version is written instead
+// of the value to avoid duplicating the user data in the store.  The version
+// is used to access the user data of that specific mutation.
 type PutOp struct {
-	Key   []byte
-	Value []byte
+	Key     []byte
+	Version []byte
 }
 
 func (PutOp) __VDLReflect(struct {
@@ -54,6 +56,19 @@
 }) {
 }
 
+// SyncGroupOp represents a change in SyncGroup tracking, adding or removing
+// key prefixes to sync.  SyncGroup prefixes cannot be changed, this is used
+// to track changes due to SyncGroup create/join/leave/destroy.
+type SyncGroupOp struct {
+	Prefixes []string
+	Remove   bool
+}
+
+func (SyncGroupOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.SyncGroupOp"`
+}) {
+}
+
 type (
 	// Op represents any single field of the Op union type.
 	//
@@ -76,15 +91,18 @@
 	OpPut struct{ Value PutOp }
 	// OpDelete represents field Delete of the Op union type.
 	OpDelete struct{ Value DeleteOp }
+	// OpSyncGroup represents field SyncGroup of the Op union type.
+	OpSyncGroup struct{ Value SyncGroupOp }
 	// __OpReflect describes the Op union type.
 	__OpReflect struct {
 		Name  string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.Op"`
 		Type  Op
 		Union struct {
-			Get    OpGet
-			Scan   OpScan
-			Put    OpPut
-			Delete OpDelete
+			Get       OpGet
+			Scan      OpScan
+			Put       OpPut
+			Delete    OpDelete
+			SyncGroup OpSyncGroup
 		}
 	}
 )
@@ -109,6 +127,11 @@
 func (x OpDelete) Name() string             { return "Delete" }
 func (x OpDelete) __VDLReflect(__OpReflect) {}
 
+func (x OpSyncGroup) Index() int               { return 4 }
+func (x OpSyncGroup) Interface() interface{}   { return x.Value }
+func (x OpSyncGroup) Name() string             { return "SyncGroup" }
+func (x OpSyncGroup) __VDLReflect(__OpReflect) {}
+
 // LogEntry represents a single store operation. This operation may have been
 // part of a transaction, as signified by the Continued boolean. Read-only
 // operations (and read-only transactions) are not logged.
@@ -132,6 +155,7 @@
 	vdl.Register((*ScanOp)(nil))
 	vdl.Register((*PutOp)(nil))
 	vdl.Register((*DeleteOp)(nil))
+	vdl.Register((*SyncGroupOp)(nil))
 	vdl.Register((*Op)(nil))
 	vdl.Register((*LogEntry)(nil))
 }
diff --git a/services/syncbase/server/watchable/util.go b/services/syncbase/server/watchable/util.go
index 3dc3e78..036d326 100644
--- a/services/syncbase/server/watchable/util.go
+++ b/services/syncbase/server/watchable/util.go
@@ -7,6 +7,10 @@
 // TODO(sadovsky): Avoid copying back and forth between []byte's and strings.
 // We should probably convert incoming strings to []byte's as early as possible,
 // and deal exclusively in []byte's internally.
+// TODO(rdaoud): I propose we standardize on key and version being strings and
+// the value being []byte within Syncbase.  We define invalid characters in the
+// key space (and reserve "$" and ":").  The lower storage engine layers are
+// free to map that to what they need internally ([]byte or string).
 
 import (
 	"fmt"
@@ -25,6 +29,19 @@
 	rngLock sync.Mutex
 )
 
+// NewVersion returns a new version (hex-formatted rng.Int63()) for a store entry mutation.
+func NewVersion() []byte {
+	// TODO(rdaoud): revisit the number of bits: should we use 128 bits?
+	// Note: the version only has to be unique per object key, not globally.
+	// TODO(rdaoud): move sync's rand64() to a general Syncbase spot and
+	// reuse it here.
+	rngLock.Lock()
+	num := rng.Int63()
+	rngLock.Unlock()
+
+	return []byte(fmt.Sprintf("%x", num))
+}
+
 func makeVersionKey(key []byte) []byte {
 	return []byte(join(util.VersionPrefix, string(key)))
 }
@@ -109,20 +126,19 @@
 	if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
 		return err
 	}
-	wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Value: version}})
+	wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Version: version}})
 	return nil
 }
 
-func putVersioned(tx store.Transaction, key, value []byte) error {
-	rngLock.Lock()
-	num := rng.Int63()
-	rngLock.Unlock()
-
-	version := []byte(fmt.Sprintf("%x", num))
+func putVersioned(tx store.Transaction, key, value []byte) ([]byte, error) {
+	version := NewVersion()
 	if err := tx.Put(makeVersionKey(key), version); err != nil {
-		return err
+		return nil, err
 	}
-	return tx.Put(makeAtVersionKey(key, version), value)
+	if err := tx.Put(makeAtVersionKey(key, version), value); err != nil {
+		return nil, err
+	}
+	return version, nil
 }
 
 func deleteVersioned(tx store.Transaction, key []byte) error {