syncbase/vsync: add sync watcher
Add the sync watcher thread, which reads batches of store log entries
and applies them to the sync metadata. Provide a hook for the
SyncGroup create/join/leave/destroy methods to inform the watcher of
changes to the key prefixes to sync. Put log entries now record the
new version instead of the value, to avoid duplicating user data.
Change-Id: Ic30f02c5506d01a09dbddde245b0b149a62c72e4
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 3cbff62..ccb291d 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -72,14 +72,16 @@
if tx.err != nil {
return convertError(tx.err)
}
- var err error
if !tx.st.managesKey(key) {
- err = tx.itx.Put(key, value)
- } else {
- err = putVersioned(tx.itx, key, value)
- tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
+ return tx.itx.Put(key, value)
}
- return err
+
+ version, err := putVersioned(tx.itx, key, value)
+ if err != nil {
+ return err
+ }
+ tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Version: version}})
+ return nil
}
// Delete implements the store.StoreWriter interface.
@@ -118,15 +120,15 @@
}
// Write LogEntry records.
timestamp := tx.st.clock.Now().UnixNano()
+ // TODO(rdaoud): switch to using a single counter for log entries
+ // instead of a (sequence, index) combo.
keyPrefix := getLogEntryKeyPrefix(tx.st.seq)
for txSeq, op := range tx.ops {
key := join(keyPrefix, fmt.Sprintf("%04x", txSeq))
value := &LogEntry{
Op: op,
CommitTimestamp: timestamp,
- // TODO(sadovsky): This information is also captured in LogEntry keys.
- // Optimize to avoid redundancy.
- Continued: txSeq < len(tx.ops)-1,
+ Continued: txSeq < len(tx.ops)-1,
}
if err := util.PutObject(tx.itx, key, value); err != nil {
return err
@@ -150,6 +152,22 @@
return tx.itx.Abort()
}
+// AddSyncGroupOp injects a SyncGroup operation notification into the log
+// entries that the transaction writes when it is committed. This lets the
+// SyncGroup operations (create, join, leave, destroy) notify the sync watcher
+// of the change at its proper position in the timeline (the transaction
+// commit).
+//
+// If remove is true the prefixes are being removed from the set of prefixes
+// to sync; otherwise they are being added. The prefixes slice is copied, so
+// later changes to the caller's slice do not alter the recorded operation.
+// An error is returned if tx is not a watchable transaction or if the
+// transaction has already failed.
+//
+// Note: this is an internal function used by sync, not part of the interface.
+func AddSyncGroupOp(tx store.Transaction, prefixes []string, remove bool) error {
+	wtx, ok := tx.(*transaction)
+	if !ok {
+		// Report misuse as an error instead of panicking on the assertion.
+		return fmt.Errorf("watchable: AddSyncGroupOp: unsupported transaction type %T", tx)
+	}
+	wtx.mu.Lock()
+	defer wtx.mu.Unlock()
+	if wtx.err != nil {
+		return convertError(wtx.err)
+	}
+	// The op is only serialized when the transaction commits, so keep a
+	// private copy of the prefixes rather than aliasing the caller's slice.
+	prefixesCopy := append([]string(nil), prefixes...)
+	wtx.ops = append(wtx.ops, &OpSyncGroup{SyncGroupOp{Prefixes: prefixesCopy, Remove: remove}})
+	return nil
+}
+
// Exported as a helper function for testing purposes
func getLogEntryKeyPrefix(seq uint64) string {
// Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.
diff --git a/services/syncbase/server/watchable/types.vdl b/services/syncbase/server/watchable/types.vdl
index 07d1ce1..6f07a1c 100644
--- a/services/syncbase/server/watchable/types.vdl
+++ b/services/syncbase/server/watchable/types.vdl
@@ -15,10 +15,12 @@
Limit []byte
}
-// PutOp represents a store put operation.
+// PutOp represents a store put operation. The new version is logged instead
+// of the value to avoid duplicating user data in the store; the version
+// identifies the user data written by that specific mutation.
type PutOp struct {
- Key []byte
- Value []byte
+ Key []byte
+ Version []byte
}
// DeleteOp represents a store delete operation.
@@ -26,12 +28,21 @@
Key []byte
}
+// SyncGroupOp represents a change in SyncGroup tracking: adding or removing
+// key prefixes to sync. SyncGroup prefixes cannot be changed; this op tracks
+// the changes caused by SyncGroup create/join/leave/destroy. Remove is true
+// when the prefixes are removed and false when they are added.
+type SyncGroupOp struct {
+ Prefixes []string
+ Remove bool
+}
+
// Op represents a store operation.
type Op union {
- Get GetOp
- Scan ScanOp
- Put PutOp
- Delete DeleteOp
+ Get GetOp
+ Scan ScanOp
+ Put PutOp
+ Delete DeleteOp
+ // SyncGroup is not a store operation: it records a change in the set of
+ // SyncGroup prefixes to sync, positioned at its transaction's commit.
+ SyncGroup SyncGroupOp
}
// LogEntry represents a single store operation. This operation may have been
diff --git a/services/syncbase/server/watchable/types.vdl.go b/services/syncbase/server/watchable/types.vdl.go
index e94587a..a4f0619 100644
--- a/services/syncbase/server/watchable/types.vdl.go
+++ b/services/syncbase/server/watchable/types.vdl.go
@@ -33,10 +33,12 @@
}) {
}
-// PutOp represents a store put operation.
+// PutOp represents a store put operation. The new version is logged instead
+// of the value to avoid duplicating user data in the store; the version
+// identifies the user data written by that specific mutation.
type PutOp struct {
- Key []byte
- Value []byte
+ Key []byte
+ Version []byte
}
func (PutOp) __VDLReflect(struct {
@@ -54,6 +56,19 @@
}) {
}
+// SyncGroupOp represents a change in SyncGroup tracking: adding or removing
+// key prefixes to sync. SyncGroup prefixes cannot be changed; this op tracks
+// the changes caused by SyncGroup create/join/leave/destroy. Remove is true
+// when the prefixes are removed and false when they are added.
+type SyncGroupOp struct {
+ Prefixes []string
+ Remove bool
+}
+
+func (SyncGroupOp) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.SyncGroupOp"`
+}) {
+}
+
type (
// Op represents any single field of the Op union type.
//
@@ -76,15 +91,18 @@
OpPut struct{ Value PutOp }
// OpDelete represents field Delete of the Op union type.
OpDelete struct{ Value DeleteOp }
+ // OpSyncGroup represents field SyncGroup of the Op union type.
+ OpSyncGroup struct{ Value SyncGroupOp }
// __OpReflect describes the Op union type.
__OpReflect struct {
Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.Op"`
Type Op
Union struct {
- Get OpGet
- Scan OpScan
- Put OpPut
- Delete OpDelete
+ Get OpGet
+ Scan OpScan
+ Put OpPut
+ Delete OpDelete
+ SyncGroup OpSyncGroup
}
}
)
@@ -109,6 +127,11 @@
func (x OpDelete) Name() string { return "Delete" }
func (x OpDelete) __VDLReflect(__OpReflect) {}
+// The methods below make OpSyncGroup the SyncGroup field (index 4) of the Op
+// union type.
+func (x OpSyncGroup) Index() int { return 4 }
+func (x OpSyncGroup) Interface() interface{} { return x.Value }
+func (x OpSyncGroup) Name() string { return "SyncGroup" }
+func (x OpSyncGroup) __VDLReflect(__OpReflect) {}
+
// LogEntry represents a single store operation. This operation may have been
// part of a transaction, as signified by the Continued boolean. Read-only
// operations (and read-only transactions) are not logged.
@@ -132,6 +155,7 @@
vdl.Register((*ScanOp)(nil))
vdl.Register((*PutOp)(nil))
vdl.Register((*DeleteOp)(nil))
+ vdl.Register((*SyncGroupOp)(nil))
vdl.Register((*Op)(nil))
vdl.Register((*LogEntry)(nil))
}
diff --git a/services/syncbase/server/watchable/util.go b/services/syncbase/server/watchable/util.go
index 3dc3e78..036d326 100644
--- a/services/syncbase/server/watchable/util.go
+++ b/services/syncbase/server/watchable/util.go
@@ -7,6 +7,10 @@
// TODO(sadovsky): Avoid copying back and forth between []byte's and strings.
// We should probably convert incoming strings to []byte's as early as possible,
// and deal exclusively in []byte's internally.
+// TODO(rdaoud): I propose we standardize on key and version being strings and
+// the value being []byte within Syncbase. We define invalid characters in the
+// key space (and reserve "$" and ":"). The lower storage engine layers are
+// free to map that to what they need internally ([]byte or string).
import (
"fmt"
@@ -25,6 +29,19 @@
rngLock sync.Mutex
)
+// NewVersion returns a freshly generated random version for a store entry
+// mutation, formatted as a lowercase hexadecimal string.
+//
+// A version only needs to be unique per object key, not globally.
+// TODO(rdaoud): revisit the number of bits: should we use 128 bits?
+// TODO(rdaoud): move sync's rand64() to a general Syncbase spot and
+// reuse it here.
+func NewVersion() []byte {
+	var r int64
+	// Hold rngLock only while drawing from the shared generator.
+	func() {
+		rngLock.Lock()
+		defer rngLock.Unlock()
+		r = rng.Int63()
+	}()
+	hexStr := fmt.Sprintf("%x", r)
+	return []byte(hexStr)
+}
+
+// makeVersionKey returns the store key under which the latest version of key
+// is recorded (putVersioned overwrites it with each new version).
func makeVersionKey(key []byte) []byte {
return []byte(join(util.VersionPrefix, string(key)))
}
@@ -109,20 +126,19 @@
if err := wtx.itx.Put(makeVersionKey(key), version); err != nil {
return err
}
- wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Value: version}})
+ wtx.ops = append(wtx.ops, &OpPut{PutOp{Key: key, Version: version}})
return nil
}
-func putVersioned(tx store.Transaction, key, value []byte) error {
- rngLock.Lock()
- num := rng.Int63()
- rngLock.Unlock()
-
- version := []byte(fmt.Sprintf("%x", num))
+// putVersioned writes value for key under a newly generated version. It
+// records the new version at the key's version key and stores value at the
+// key's at-version key. It returns the new version (nil on error) so the
+// caller can log it, e.g. transaction.Put records it in an OpPut entry in
+// place of the value.
+func putVersioned(tx store.Transaction, key, value []byte) ([]byte, error) {
+ version := NewVersion()
if err := tx.Put(makeVersionKey(key), version); err != nil {
- return err
+ return nil, err
}
- return tx.Put(makeAtVersionKey(key, version), value)
+ if err := tx.Put(makeAtVersionKey(key, version), value); err != nil {
+ return nil, err
+ }
+ return version, nil
}
func deleteVersioned(tx store.Transaction, key []byte) error {