syncbase/vsync: add sync watcher
Add the sync watcher thread to read batches of store log entries
and apply them to the sync metadata. Provide a hook for the
SyncGroup create/join/leave methods to inform the watcher of
changes to the key prefixes to sync.
Change-Id: Ic30f02c5506d01a09dbddde245b0b149a62c72e4
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 3cbff62..ccb291d 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -72,14 +72,16 @@
if tx.err != nil {
return convertError(tx.err)
}
- var err error
if !tx.st.managesKey(key) {
- err = tx.itx.Put(key, value)
- } else {
- err = putVersioned(tx.itx, key, value)
- tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
+ return tx.itx.Put(key, value)
}
- return err
+
+ version, err := putVersioned(tx.itx, key, value)
+ if err != nil {
+ return err
+ }
+ tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Version: version}})
+ return nil
}
// Delete implements the store.StoreWriter interface.
@@ -118,15 +120,15 @@
}
// Write LogEntry records.
timestamp := tx.st.clock.Now().UnixNano()
+ // TODO(rdaoud): switch to using a single counter for log entries
+ // instead of a (sequence, index) combo.
keyPrefix := getLogEntryKeyPrefix(tx.st.seq)
for txSeq, op := range tx.ops {
key := join(keyPrefix, fmt.Sprintf("%04x", txSeq))
value := &LogEntry{
Op: op,
CommitTimestamp: timestamp,
- // TODO(sadovsky): This information is also captured in LogEntry keys.
- // Optimize to avoid redundancy.
- Continued: txSeq < len(tx.ops)-1,
+ Continued: txSeq < len(tx.ops)-1,
}
if err := util.PutObject(tx.itx, key, value); err != nil {
return err
@@ -150,6 +152,22 @@
return tx.itx.Abort()
}
+// AddSyncGroupOp appends a SyncGroup operation (prefixes, remove flag) to the
+// ops that tx writes as log entries on commit, so SyncGroup create, join, leave
+// and destroy reach the sync watcher at the transaction's place in the timeline.
+// It returns the converted error if tx already carries one, and nil otherwise.
+// Note: this is an internal function used by sync, not part of the interface.
+func AddSyncGroupOp(tx store.Transaction, prefixes []string, remove bool) error {
+ wtx := tx.(*transaction) // single-value assertion: panics if tx is not a *transaction
+ wtx.mu.Lock()
+ defer wtx.mu.Unlock()
+ if wtx.err != nil {
+ return convertError(wtx.err)
+ }
+ wtx.ops = append(wtx.ops, &OpSyncGroup{SyncGroupOp{Prefixes: prefixes, Remove: remove}})
+ return nil
+}
+
// Exported as a helper function for testing purposes
func getLogEntryKeyPrefix(seq uint64) string {
// Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.