syncbase/server/watchable: watch enhancements
Various enhancements to the watchable store and its use by sync:
* Add the ability to mark whether a transaction is made by sync
vs. by an application. This allows sync to filter out its own
updates (echo suppression) from the watch log; see the usage
sketch after the commit message.
* Add the ability for sync to inject snapshot entries into the
watch log to initialize DAG metadata when a SyncGroup is created
or joined. These entries act as fake puts that sync consumes but
that the future app-watch API will filter out.
* Change the key of watch log entries to use a single sequence
number (instead of a <seq>:<txSeq> pair) and update the tests
accordingly.
* Make ACLs managed (versioned) keys.
* Improve testing of the sync watcher and refactor some code.
Change-Id: I02f6af04c995c5a5b17857c6213b51b6e38dcf13
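
Below is a minimal sketch (not part of this change) of how sync could use the
new marking and how its watcher would suppress the resulting echo. The package
name, the helpers applySyncUpdate/skipEcho, and the assumption that the
watchable Store exposes the embedded store.Store methods (NewTransaction, Put,
Abort, Commit) are illustrative, not taken from this patch.

    package vsync // illustrative placement in the sync implementation

    import "v.io/syncbase/x/ref/services/syncbase/server/watchable"

    // applySyncUpdate writes one key/value pair on behalf of sync. Marking the
    // transaction with SetTransactionFromSync makes every LogEntry written at
    // commit time carry FromSync == true.
    func applySyncUpdate(st watchable.Store, key, value []byte) error {
        tx := st.NewTransaction()
        watchable.SetTransactionFromSync(tx)
        if err := tx.Put(key, value); err != nil {
            tx.Abort()
            return err
        }
        return tx.Commit()
    }

    // skipEcho is the check the sync watcher applies while scanning the log:
    // entries written by sync itself carry no new information for it.
    func skipEcho(entry *watchable.LogEntry) bool {
        return entry.FromSync
    }
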
diff --git a/services/syncbase/server/watchable/store.go b/services/syncbase/server/watchable/store.go
index 2aaa997..0e3bb0f 100644
--- a/services/syncbase/server/watchable/store.go
+++ b/services/syncbase/server/watchable/store.go
@@ -9,7 +9,7 @@
// internal watching of store updates.
//
// LogEntry records are stored chronologically, using keys of the form
-// "$log:<seq>:<txSeq>". Sequence numbers are zero-padded to ensure that the
+// "$log:<seq>". Sequence numbers are zero-padded to ensure that the
// lexicographic order matches the numeric order.
//
// Version number records are stored using keys of the form "$version:<key>",
@@ -49,6 +49,7 @@
// Wrap returns a watchable.Store that wraps the given store.Store.
func Wrap(st store.Store, opts *Options) (Store, error) {
// Determine initial value for seq.
+ // TODO(sadovsky): Consider using a bigger seq.
var seq uint64 = 0
// TODO(sadovsky): Perform a binary search to determine seq, or persist the
// current sequence number on every nth write so that at startup time we can
@@ -67,7 +68,7 @@
if advanced {
key := string(keybuf)
parts := split(key)
- if len(parts) != 3 {
+ if len(parts) != 2 {
panic("wrong number of parts: " + key)
}
var err error
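
For illustration, a stand-alone sketch of the new single-sequence key layout.
The "$log:" prefix and the %016x zero-padding mirror the comment above and
getLogEntryKey in transaction.go (assuming util.LogPrefix is "$log" and join
uses ":" as the separator, as that comment indicates); logEntryKey is a local
copy for the example only.

    package main

    import "fmt"

    // logEntryKey mirrors getLogEntryKey: a 16-digit zero-padded hex sequence
    // number under the "$log" prefix, so that the lexicographic order of the
    // keys matches the numeric order of the sequence numbers.
    func logEntryKey(seq uint64) string {
        return fmt.Sprintf("$log:%016x", seq)
    }

    func main() {
        fmt.Println(logEntryKey(9))   // $log:0000000000000009
        fmt.Println(logEntryKey(10))  // $log:000000000000000a
        fmt.Println(logEntryKey(255)) // $log:00000000000000ff
    }
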
diff --git a/services/syncbase/server/watchable/test_util.go b/services/syncbase/server/watchable/test_util.go
index 5a5d6dc..d81d79e 100644
--- a/services/syncbase/server/watchable/test_util.go
+++ b/services/syncbase/server/watchable/test_util.go
@@ -7,6 +7,7 @@
import (
"fmt"
"io/ioutil"
+ "math"
"time"
"v.io/syncbase/x/ref/services/syncbase/clock"
@@ -72,26 +73,45 @@
wst.clock.SetSystemClock(mockClock)
}
+// LogEntryReader provides a stream-like interface to scan over the log entries
+// of a single batch, starting from a given sequence number. It opens a stream
+// that scans the log from that sequence number and stops after reading the
+// last entry in the batch (indicated by a false Continued flag).
type LogEntryReader struct {
- stream store.Stream
+ stream store.Stream // scan stream on the store Database
+ done bool // true after reading the last batch entry
+ key string // key of most recent log entry read
+ entry LogEntry // most recent log entry read
}
func NewLogEntryReader(st store.Store, seq uint64) *LogEntryReader {
- stream := st.Scan([]byte(getLogEntryKeyPrefix(seq)), []byte(getLogEntryKeyPrefix(seq+1)))
+ stream := st.Scan([]byte(getLogEntryKey(seq)), []byte(getLogEntryKey(math.MaxUint64)))
return &LogEntryReader{stream: stream}
}
func (ler *LogEntryReader) Advance() bool {
- return ler.stream.Advance()
+ if ler.done {
+ return false
+ }
+
+ if ler.stream.Advance() {
+ ler.key = string(ler.stream.Key(nil))
+ if err := vom.Decode(ler.stream.Value(nil), &ler.entry); err != nil {
+ panic(fmt.Errorf("Failed to decode LogEntry for key: %q", ler.key))
+ }
+ if !ler.entry.Continued {
+ ler.done = true
+ }
+ return true
+ }
+
+ ler.key = ""
+ ler.entry = LogEntry{}
+ return false
}
func (ler *LogEntryReader) GetEntry() (string, LogEntry) {
- key := string(ler.stream.Key(nil))
- var entry LogEntry = LogEntry{}
- if err := vom.Decode(ler.stream.Value(nil), &entry); err != nil {
- panic(fmt.Errorf("Failed to decode LogEntry for key: %q", key))
- }
- return key, entry
+ return ler.key, ler.entry
}
/////// Clock related utility code ///////
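
A minimal sketch of how a test might drive the reader to collect one batch;
readBatch is illustrative and assumes it sits next to the code above in
package watchable (so LogEntry, NewLogEntryReader, and the store import are
already available).

    // readBatch returns the keys and entries of the batch that starts at seq,
    // relying on LogEntryReader stopping at the first entry whose Continued
    // flag is false.
    func readBatch(st store.Store, seq uint64) (keys []string, entries []LogEntry) {
        ler := NewLogEntryReader(st, seq)
        for ler.Advance() {
            key, entry := ler.GetEntry()
            keys = append(keys, key)
            entries = append(entries, entry)
        }
        return keys, entries
    }
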
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index ccb291d..d5206b1 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -11,6 +11,7 @@
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/context"
"v.io/v23/verror"
)
@@ -20,6 +21,11 @@
mu sync.Mutex // protects the fields below
err error
ops []Op
+ // fromSync is true when a transaction is created by sync. This causes
+ // the log entries written at commit time to have their "FromSync" field
+ // set to true. That in turn causes the sync watcher to filter out such
+ // updates since sync already knows about them (echo suppression).
+ fromSync bool
}
var _ store.Transaction = (*transaction)(nil)
@@ -111,33 +117,30 @@
tx.err = verror.New(verror.ErrBadState, nil, store.ErrMsgCommittedTxn)
tx.st.mu.Lock()
defer tx.st.mu.Unlock()
- // Check sequence numbers.
- if uint64(len(tx.ops)) > math.MaxUint16 {
- return verror.New(verror.ErrInternal, nil, "too many ops")
- }
- if tx.st.seq == math.MaxUint64 {
+ // Check if there is enough space left in the sequence number.
+ if (math.MaxUint64 - tx.st.seq) < uint64(len(tx.ops)) {
return verror.New(verror.ErrInternal, nil, "seq maxed out")
}
// Write LogEntry records.
timestamp := tx.st.clock.Now().UnixNano()
- // TODO(rdaoud): switch to using a single counter for log entries
- // instead of a (sequence, index) combo.
- keyPrefix := getLogEntryKeyPrefix(tx.st.seq)
- for txSeq, op := range tx.ops {
- key := join(keyPrefix, fmt.Sprintf("%04x", txSeq))
+ seq := tx.st.seq
+ for i, op := range tx.ops {
+ key := getLogEntryKey(seq)
value := &LogEntry{
Op: op,
CommitTimestamp: timestamp,
- Continued: txSeq < len(tx.ops)-1,
+ FromSync: tx.fromSync,
+ Continued: i < len(tx.ops)-1,
}
if err := util.PutObject(tx.itx, key, value); err != nil {
return err
}
+ seq++
}
if err := tx.itx.Commit(); err != nil {
return err
}
- tx.st.seq++
+ tx.st.seq = seq
return nil
}
@@ -157,7 +160,7 @@
// operations (create, join, leave, destroy) to notify the sync watcher of the
// change at its proper position in the timeline (the transaction commit).
// Note: this is an internal function used by sync, not part of the interface.
-func AddSyncGroupOp(tx store.Transaction, prefixes []string, remove bool) error {
+func AddSyncGroupOp(ctx *context.T, tx store.Transaction, prefixes []string, remove bool) error {
wtx := tx.(*transaction)
wtx.mu.Lock()
defer wtx.mu.Unlock()
@@ -168,9 +171,45 @@
return nil
}
+// AddSyncSnapshotOp injects a sync snapshot operation notification in the log
+// entries that the transaction writes when it is committed. It allows the
+// SyncGroup create or join operations to notify the sync watcher of the
+// current keys and their versions to use when initializing the sync metadata
+// at the point in the timeline when these keys become syncable (at commit).
+// Note: this is an internal function used by sync, not part of the interface.
+func AddSyncSnapshotOp(ctx *context.T, tx store.Transaction, key, version []byte) error {
+ wtx := tx.(*transaction)
+ wtx.mu.Lock()
+ defer wtx.mu.Unlock()
+ if wtx.err != nil {
+ return convertError(wtx.err)
+ }
+ if !wtx.st.managesKey(key) {
+ return verror.New(verror.ErrInternal, ctx,
+ fmt.Sprintf("cannot create SyncSnapshotOp on unmanaged key: %s", string(key)))
+ }
+ wtx.ops = append(wtx.ops, &OpSyncSnapshot{SyncSnapshotOp{Key: key, Version: version}})
+ return nil
+}
+
+// SetTransactionFromSync marks this transaction as created by sync as opposed
+// to one created by an application. The net effect is that, at commit time,
+// the log entries written are marked as made by sync. This allows the sync
+// Watcher to ignore them (echo suppression) because it made these updates.
+// Note: this is an internal function used by sync, not part of the interface.
+// TODO(rdaoud): support a generic echo-suppression mechanism for apps as well,
+// maybe by having a creator ID in the transaction and log entries.
+// TODO(rdaoud): fold this flag (or creator ID) into Tx options when available.
+func SetTransactionFromSync(tx store.Transaction) {
+ wtx := tx.(*transaction)
+ wtx.mu.Lock()
+ defer wtx.mu.Unlock()
+ wtx.fromSync = true
+}
+
// Exported as a helper function for testing purposes
-func getLogEntryKeyPrefix(seq uint64) string {
- // Note, MaxUint16 is 0xffff and MaxUint64 is 0xffffffffffffffff.
+func getLogEntryKey(seq uint64) string {
+ // Note: MaxUint64 is 0xffffffffffffffff.
// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
return join(util.LogPrefix, fmt.Sprintf("%016x", seq))
}
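
A minimal sketch of how the SyncGroup create/join path could feed a snapshot
into the log, assuming sync has already gathered the current key/version pairs
under the SyncGroup prefixes; the package name, addSnapshotEntries, and the
keyVersions map are illustrative. Note that AddSyncSnapshotOp rejects keys the
watchable store does not manage.

    package vsync // illustrative

    import (
        "v.io/syncbase/x/ref/services/syncbase/server/watchable"
        "v.io/syncbase/x/ref/services/syncbase/store"
        "v.io/v23/context"
    )

    // addSnapshotEntries appends one SyncSnapshotOp per existing key so that
    // the sync watcher can initialize DAG metadata at the versions current
    // when these keys become syncable, i.e. when this transaction commits.
    func addSnapshotEntries(ctx *context.T, tx store.Transaction, keyVersions map[string][]byte) error {
        for key, version := range keyVersions {
            if err := watchable.AddSyncSnapshotOp(ctx, tx, []byte(key), version); err != nil {
                return err
            }
        }
        return nil
    }
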
diff --git a/services/syncbase/server/watchable/transaction_test.go b/services/syncbase/server/watchable/transaction_test.go
index 54405a2..3625896 100644
--- a/services/syncbase/server/watchable/transaction_test.go
+++ b/services/syncbase/server/watchable/transaction_test.go
@@ -80,7 +80,8 @@
t.Errorf("Failed to wrap store for update")
}
seqForUpdate := getSeq(wst2)
- if seqForUpdate != (seqForCreate + 1) {
+ // We expect the sequence number to have moved by +2 for the two puts.
+ if seqForUpdate != (seqForCreate + 2) {
t.Errorf("unexpected sequence number for update. seq for create: %d, seq for update: %d", seqForCreate, seqForUpdate)
}
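
A test-style sketch of the per-op sequence semantics, assuming it lives in
package watchable next to the helpers above and that the wrapped Store exposes
the embedded store.Store methods (NewTransaction, Put, Commit); startSeq is the
store's sequence number read before the commit. A transaction with two puts now
yields two consecutive log entries, with only the last one ending the batch.

    func checkTwoPuts(t *testing.T, st Store, startSeq uint64) {
        tx := st.NewTransaction()
        if err := tx.Put([]byte("foo"), []byte("1")); err != nil {
            t.Fatal(err)
        }
        if err := tx.Put([]byte("bar"), []byte("2")); err != nil {
            t.Fatal(err)
        }
        if err := tx.Commit(); err != nil {
            t.Fatal(err)
        }
        ler := NewLogEntryReader(st, startSeq)
        count := 0
        for ler.Advance() {
            _, entry := ler.GetEntry()
            // Only the final entry of the batch has Continued == false.
            if got, want := entry.Continued, count == 0; got != want {
                t.Errorf("entry %d: Continued = %v, want %v", count, got, want)
            }
            count++
        }
        if count != 2 {
            t.Errorf("got %d log entries, want 2", count)
        }
    }
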
diff --git a/services/syncbase/server/watchable/types.vdl b/services/syncbase/server/watchable/types.vdl
index 6f07a1c..3f5181b 100644
--- a/services/syncbase/server/watchable/types.vdl
+++ b/services/syncbase/server/watchable/types.vdl
@@ -36,13 +36,26 @@
Remove bool
}
+// SyncSnapshotOp represents a snapshot operation when creating and joining a
+// SyncGroup. The sync watcher needs to get a snapshot of the Database at the
+// point of creating/joining a SyncGroup. A SyncSnapshotOp entry is written to
+// the log for each Database key that falls within the SyncGroup prefixes. This
+// allows sync to initialize its metadata at the correct versions of the objects
+// when they become syncable. These log entries should be filtered by the
+// client-facing Watch interface because the user data did not actually change.
+type SyncSnapshotOp struct {
+ Key []byte
+ Version []byte
+}
+
// Op represents a store operation.
type Op union {
- Get GetOp
- Scan ScanOp
- Put PutOp
- Delete DeleteOp
- SyncGroup SyncGroupOp
+ Get GetOp
+ Scan ScanOp
+ Put PutOp
+ Delete DeleteOp
+ SyncGroup SyncGroupOp
+ SyncSnapshot SyncSnapshotOp
}
// LogEntry represents a single store operation. This operation may have been
@@ -55,6 +68,9 @@
// Time when the operation was committed.
CommitTimestamp int64
+ // Operation came from sync (used for echo suppression).
+ FromSync bool
+
// If true, this entry is followed by more entries that belong to the same
// commit as this entry.
Continued bool
diff --git a/services/syncbase/server/watchable/types.vdl.go b/services/syncbase/server/watchable/types.vdl.go
index a4f0619..5fd2e04 100644
--- a/services/syncbase/server/watchable/types.vdl.go
+++ b/services/syncbase/server/watchable/types.vdl.go
@@ -69,6 +69,23 @@
}) {
}
+// SyncSnapshotOp represents a snapshot operation when creating and joining a
+// SyncGroup. The sync watcher needs to get a snapshot of the Database at the
+// point of creating/joining a SyncGroup. A SyncSnapshotOp entry is written to
+// the log for each Database key that falls within the SyncGroup prefixes. This
+// allows sync to initialize its metadata at the correct versions of the objects
+// when they become syncable. These log entries should be filtered by the
+// client-facing Watch interface because the user data did not actually change.
+type SyncSnapshotOp struct {
+ Key []byte
+ Version []byte
+}
+
+func (SyncSnapshotOp) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.SyncSnapshotOp"`
+}) {
+}
+
type (
// Op represents any single field of the Op union type.
//
@@ -93,16 +110,19 @@
OpDelete struct{ Value DeleteOp }
// OpSyncGroup represents field SyncGroup of the Op union type.
OpSyncGroup struct{ Value SyncGroupOp }
+ // OpSyncSnapshot represents field SyncSnapshot of the Op union type.
+ OpSyncSnapshot struct{ Value SyncSnapshotOp }
// __OpReflect describes the Op union type.
__OpReflect struct {
Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/watchable.Op"`
Type Op
Union struct {
- Get OpGet
- Scan OpScan
- Put OpPut
- Delete OpDelete
- SyncGroup OpSyncGroup
+ Get OpGet
+ Scan OpScan
+ Put OpPut
+ Delete OpDelete
+ SyncGroup OpSyncGroup
+ SyncSnapshot OpSyncSnapshot
}
}
)
@@ -132,6 +152,11 @@
func (x OpSyncGroup) Name() string { return "SyncGroup" }
func (x OpSyncGroup) __VDLReflect(__OpReflect) {}
+func (x OpSyncSnapshot) Index() int { return 5 }
+func (x OpSyncSnapshot) Interface() interface{} { return x.Value }
+func (x OpSyncSnapshot) Name() string { return "SyncSnapshot" }
+func (x OpSyncSnapshot) __VDLReflect(__OpReflect) {}
+
// LogEntry represents a single store operation. This operation may have been
// part of a transaction, as signified by the Continued boolean. Read-only
// operations (and read-only transactions) are not logged.
@@ -140,6 +165,8 @@
Op Op
// Time when the operation was committed.
CommitTimestamp int64
+ // Operation came from sync (used for echo suppression).
+ FromSync bool
// If true, this entry is followed by more entries that belong to the same
// commit as this entry.
Continued bool
@@ -156,6 +183,7 @@
vdl.Register((*PutOp)(nil))
vdl.Register((*DeleteOp)(nil))
vdl.Register((*SyncGroupOp)(nil))
+ vdl.Register((*SyncSnapshotOp)(nil))
vdl.Register((*Op)(nil))
vdl.Register((*LogEntry)(nil))
}
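
Finally, a minimal sketch of how a consumer of these generated types might
dispatch on the extended Op union while honoring the new FromSync flag;
processLogEntry, the package name, and the handler comments are illustrative,
not part of this change.

    package vsync // illustrative

    import "v.io/syncbase/x/ref/services/syncbase/server/watchable"

    // processLogEntry drops entries written by sync itself (echo suppression),
    // treats SyncSnapshot entries as DAG-initialization hints rather than data
    // changes, and handles the remaining ops as regular updates.
    func processLogEntry(entry *watchable.LogEntry) {
        if entry.FromSync {
            return // sync already knows about its own updates
        }
        switch op := entry.Op.(type) {
        case watchable.OpPut:
            _ = op.Value.Key // apply a regular put
        case watchable.OpDelete:
            _ = op.Value.Key // apply a delete
        case watchable.OpSyncGroup:
            _ = op.Value // SyncGroup create/join/leave/destroy notification
        case watchable.OpSyncSnapshot:
            _ = op.Value.Key // seed DAG metadata at op.Value.Version; not a user data change
        default:
            // Get/Scan entries carry no state change for the watcher.
        }
    }
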