Merge "veyron/services/store: Implement Object.WatchGlob()."
diff --git a/services/store/memstore/query/glob.go b/services/store/memstore/query/glob.go
index 98ce585..3614af3 100644
--- a/services/store/memstore/query/glob.go
+++ b/services/store/memstore/query/glob.go
@@ -18,6 +18,11 @@
// Glob returns an iterator that emits all values that match the given pattern.
func Glob(sn state.Snapshot, clientID security.PublicID, path storage.PathName, pattern string) (service.GlobStream, error) {
+ return GlobIterator(sn, clientID, path, pattern)
+}
+
+// GlobIterator returns an iterator that emits all values that match the given pattern.
+func GlobIterator(sn state.Snapshot, clientID security.PublicID, path storage.PathName, pattern string) (state.Iterator, error) {
parsed, err := glob.Parse(pattern)
if err != nil {
return nil, err
diff --git a/services/store/memstore/state/cell.go b/services/store/memstore/state/cell.go
index 8671f7b..2913146 100644
--- a/services/store/memstore/state/cell.go
+++ b/services/store/memstore/state/cell.go
@@ -74,7 +74,7 @@
}
// get the *storage.Entry for a cell.
-func (c *Cell) getEntry() *storage.Entry {
+func (c *Cell) GetEntry() *storage.Entry {
entry := newSubfieldEntry(c.Value)
c.fillStat(&entry.Stat)
return entry
diff --git a/services/store/memstore/state/iterator.go b/services/store/memstore/state/iterator.go
index 791b20e..92614be 100644
--- a/services/store/memstore/state/iterator.go
+++ b/services/store/memstore/state/iterator.go
@@ -118,7 +118,7 @@
r.AddValue(v)
set = r.Get()
} else {
- it.entry = cell.getEntry()
+ it.entry = cell.GetEntry()
it.visited[cell.ID] = struct{}{}
set = cell.refs
}
@@ -203,7 +203,7 @@
// Mark as visited.
it.visited[n.id] = struct{}{}
- it.entry, it.path = c.getEntry(), fullPath
+ it.entry, it.path = c.GetEntry(), fullPath
}
func (it *iterator) Snapshot() Snapshot {
diff --git a/services/store/memstore/state/mutable_snapshot.go b/services/store/memstore/state/mutable_snapshot.go
index 7b7c675..f6677d1 100644
--- a/services/store/memstore/state/mutable_snapshot.go
+++ b/services/store/memstore/state/mutable_snapshot.go
@@ -355,7 +355,7 @@
}
var e *storage.Entry
if len(suffix) == 0 {
- e = cell.getEntry()
+ e = cell.GetEntry()
} else {
e = newSubfieldEntry(v)
}
diff --git a/services/store/memstore/state/snapshot.go b/services/store/memstore/state/snapshot.go
index e6b84b7..6c4b613 100644
--- a/services/store/memstore/state/snapshot.go
+++ b/services/store/memstore/state/snapshot.go
@@ -98,7 +98,7 @@
}
var e *storage.Entry
if len(suffix) == 0 {
- e = cell.getEntry()
+ e = cell.GetEntry()
} else {
e = newSubfieldEntry(v)
}
diff --git a/services/store/memstore/state/state.go b/services/store/memstore/state/state.go
index d9170d9..a1cd56c 100644
--- a/services/store/memstore/state/state.go
+++ b/services/store/memstore/state/state.go
@@ -35,6 +35,12 @@
return st.timestamp
}
+// DeepCopy returns a new State built from st.MutableSnapshot() and carrying
+// the same timestamp as st. The intent (per MutableSnapshot's contract, which
+// is not visible here) is that mutations to the copy do not affect the
+// original, and vice versa.
+func (st *State) DeepCopy() *State {
+ return &State{
+ snapshot: st.MutableSnapshot(),
+ timestamp: st.timestamp,
+ }
+}
+
// GC performs a manual garbage collection.
func (st *State) GC() {
st.snapshot.gc()
diff --git a/services/store/memstore/watch/glob_processor.go b/services/store/memstore/watch/glob_processor.go
new file mode 100644
index 0000000..38ea6c1
--- /dev/null
+++ b/services/store/memstore/watch/glob_processor.go
@@ -0,0 +1,191 @@
+package watch
+
+import (
+ "errors"
+
+ iquery "veyron/services/store/memstore/query"
+ "veyron/services/store/memstore/state"
+
+ "veyron2/security"
+ "veyron2/services/watch"
+ "veyron2/storage"
+)
+
+// globProcessor processes log entries into storage entries that match a pattern.
+// It retains the state handed to processState and the set of names that
+// matched after the last processed event, and reports the difference after
+// each transaction.
+type globProcessor struct {
+ // hasProcessedState is true iff the initial state has been processed. It is
+ // set by processState; processTransaction refuses to run until it is true.
+ hasProcessedState bool
+ // pid is the identity of the client watching for changes. It is passed to
+ // every glob evaluation.
+ pid security.PublicID
+ // path on which the watch is placed. Returned names are rooted at this path.
+ path storage.PathName
+ // pattern that the returned names match.
+ pattern string
+ // st is the store state as of the last processed event. It is the state
+ // passed to processState and is mutated in place by processTransaction.
+ st *state.State
+ // matches is a map of each matching name to the id of the object at that
+ // name, as of the last processed event.
+ matches map[string]storage.ID
+}
+
+// newGlobProcessor returns a reqProcessor that watches, on behalf of pid, the
+// names under path that match pattern. The pattern is only stored here; it is
+// first evaluated when processState runs. The returned error is always nil.
+func newGlobProcessor(pid security.PublicID, path storage.PathName,
+ pattern string) (reqProcessor, error) {
+
+ // hasProcessedState, st and matches start at their zero values and are
+ // filled in by processState.
+ p := &globProcessor{
+ pid: pid,
+ path: path,
+ pattern: pattern,
+ }
+ return p, nil
+}
+
+// processState records st as the processor's baseline and returns one Exists
+// change, carrying the current entry, for every name that matches the
+// processor's pattern. It may be called only once; any later call returns an
+// error. The processor keeps st and mutates it in processTransaction.
+func (p *globProcessor) processState(st *state.State) ([]watch.Change, error) {
+ if p.hasProcessedState {
+ return nil, errors.New("cannot process state after processing the initial state")
+ }
+ // The flag is raised before globbing, so it stays set even if glob fails.
+ p.hasProcessedState = true
+
+ // Evaluate the pattern against a snapshot of the initial state.
+ snapshot := st.MutableSnapshot()
+ found, err := glob(snapshot, p.pid, p.path, p.pattern)
+ if err != nil {
+ return nil, err
+ }
+ p.st, p.matches = st, found
+
+ // Report every matching name together with its entry in the snapshot.
+ var changes []watch.Change
+ for name, id := range found {
+ // TODO(tilaks): don't clone change.
+ changes = append(changes, watch.Change{
+ Name: name,
+ State: watch.Exists,
+ Value: snapshot.Find(id).GetEntry(),
+ })
+ }
+
+ return changes, nil
+}
+
+// processTransaction applies mus to the processor's state, re-evaluates the
+// pattern, and returns the difference from the previous event: a DoesNotExist
+// change for every name that stopped matching and an Exists change (with the
+// current entry) for every name that was added or updated. It returns an error
+// if processState has not run yet, or if applying the mutations or globbing
+// fails.
+func (p *globProcessor) processTransaction(mus *state.Mutations) ([]watch.Change, error) {
+ if !p.hasProcessedState {
+ return nil, errors.New("cannot process a transaction before processing the initial state")
+ }
+
+ // Keep the matches from before this transaction so they can be diffed.
+ before := p.matches
+ if err := p.st.ApplyMutations(mus); err != nil {
+ return nil, err
+ }
+
+ // Glob again over the state as it stands after the transaction.
+ snapshot := p.st.MutableSnapshot()
+ after, err := glob(snapshot, p.pid, p.path, p.pattern)
+ if err != nil {
+ return nil, err
+ }
+ p.matches = after
+
+ removed, updated := diffMatches(before, after, mus.Delta)
+
+ var changes []watch.Change
+
+ // Names that no longer match are reported without a value.
+ for name := range removed {
+ // TODO(tilaks): don't clone change
+ changes = append(changes, watch.Change{
+ Name: name,
+ State: watch.DoesNotExist,
+ })
+ }
+
+ // Names that are new or changed are reported with their current entry.
+ for name := range updated {
+ // TODO(tilaks): don't clone change.
+ changes = append(changes, watch.Change{
+ Name: name,
+ State: watch.Exists,
+ Value: snapshot.Find(after[name]).GetEntry(),
+ })
+ }
+
+ return changes, nil
+}
+
+// diffMatches compares the matches before and after a transaction and returns
+// the sets of names that have been removed or updated.
+//
+// A name is removed if it is present in previousMatches but absent from
+// newMatches (per the original author: it can no longer be resolved, or the
+// object at that name is no longer accessible).
+//
+// A name is updated if
+// 1) it is present in newMatches but absent from previousMatches (newly added
+// or newly accessible).
+// 2) it maps to a different id than before (the object was replaced).
+// 3) the id it maps to is a key of delta (the object has a new value or new
+// references).
+func diffMatches(previousMatches, newMatches map[string]storage.ID,
+ delta map[storage.ID]*state.Mutation) (removed, updated map[string]struct{}) {
+
+ removed = map[string]struct{}{}
+ updated = map[string]struct{}{}
+
+ // First pass: names that disappeared or now resolve to another object.
+ for name, oldID := range previousMatches {
+ newID, stillMatches := newMatches[name]
+ switch {
+ case !stillMatches:
+ // There is no longer an object at this name.
+ removed[name] = struct{}{}
+ case newID != oldID:
+ // The object at this name was replaced.
+ updated[name] = struct{}{}
+ }
+ }
+
+ // Second pass: names that are new, or whose object was touched by the
+ // transaction (its value or implicit directory changed).
+ for name, id := range newMatches {
+ _, existed := previousMatches[name]
+ _, mutated := delta[id]
+ if !existed || mutated {
+ updated[name] = struct{}{}
+ }
+ }
+
+ return removed, updated
+}
+
+// glob evaluates pattern under path in sn on behalf of pid and returns all
+// names that match. Each key is path joined with the name reported by the
+// iterator, and maps to the id (Stat.ID) of the entry at that name. An error
+// from iquery.GlobIterator is returned unchanged.
+func glob(sn state.Snapshot, pid security.PublicID, path storage.PathName,
+ pattern string) (map[string]storage.ID, error) {
+
+ it, err := iquery.GlobIterator(sn, pid, path, pattern)
+ if err != nil {
+ return nil, err
+ }
+
+ found := make(map[string]storage.ID)
+ for ; it.IsValid(); it.Next() {
+ // The iterator's name is prefixed with path to form the key. The joined
+ // path is turned into a string right away, so it does not matter if
+ // append reuses path's backing array on the next iteration.
+ full := append(path, storage.ParsePath(it.Name())...)
+ found[full.String()] = it.Get().Stat.ID
+ }
+
+ return found, nil
+}
diff --git a/services/store/memstore/watch/glob_processor_test.go b/services/store/memstore/watch/glob_processor_test.go
new file mode 100644
index 0000000..fc2c310
--- /dev/null
+++ b/services/store/memstore/watch/glob_processor_test.go
@@ -0,0 +1,387 @@
+package watch
+
+import (
+ "testing"
+
+ "veyron/services/store/memstore"
+ watchtesting "veyron/services/store/memstore/watch/testing"
+
+ "veyron2/storage"
+)
+
+// TestGlobProcessState checks the changes processState reports for an initial
+// state read back from a re-opened store, using glob processors rooted at /
+// and /a with both the recursive ("...") and single-level ("*") patterns.
+func TestGlobProcessState(t *testing.T) {
+ // Create a new store.
+ dbName, st, cleanup := createStore(t)
+ defer cleanup()
+
+ // Put /, /a, /a/b.
+ tr := memstore.NewTransaction()
+ id1 := put(t, st, tr, "/", "val1")
+ id2 := put(t, st, tr, "/a", "val2")
+ put(t, st, tr, "/a/b", "val3")
+ commit(t, tr)
+
+ // Remove /a/b.
+ tr = memstore.NewTransaction()
+ remove(t, st, tr, "/a/b")
+ commit(t, tr)
+ gc(t, st)
+
+ if err := st.Close(); err != nil {
+ t.Fatalf("Close() failed: %v", err)
+ }
+
+ // Re-create a new store. This should compress the log, creating an initial
+ // state containing / and /a.
+ st, cleanup = openStore(t, dbName)
+ defer cleanup()
+
+ log, cleanup := openLog(t, dbName)
+ defer cleanup()
+
+ rootRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/"), "...")
+ rootListProcessor := createGlobProcessor(t, storage.ParsePath("/"), "*")
+ aRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "...")
+ aListProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "*")
+
+ // Expect initial state that contains / and /a.
+ // The processors share logst without DeepCopy; that is fine here because no
+ // transaction is applied to any of them afterwards.
+ logst := readState(t, log)
+
+ changes := processState(t, rootRecursiveProcessor, logst, 2)
+ watchtesting.ExpectEntryExists(t, changes, "", id1, "val1")
+ watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+ // Listing / yields only its child /a, not / itself.
+ changes = processState(t, rootListProcessor, logst, 1)
+ watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+ // A recursive watch on /a reports /a itself, named by its full path.
+ changes = processState(t, aRecursiveProcessor, logst, 1)
+ watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+ // /a has no children in the initial state, so listing it yields nothing.
+ processState(t, aListProcessor, logst, 0)
+}
+
+// TestGlobProcessTransactionAdd checks that a transaction adding /, /a and
+// /a/b to an empty store is reported as Exists changes by every processor
+// whose pattern covers the new names.
+func TestGlobProcessTransactionAdd(t *testing.T) {
+ dbName, st, cleanup := createStore(t)
+ defer cleanup()
+
+ log, cleanup := openLog(t, dbName)
+ defer cleanup()
+
+ rootRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/"), "...")
+ rootListProcessor := createGlobProcessor(t, storage.ParsePath("/"), "*")
+ aRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "...")
+ aListProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "*")
+ bRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a/b"), "...")
+
+ // The initial state is empty. Each processor gets its own copy because it
+ // keeps the state and applies later transactions to it.
+ logst := readState(t, log)
+ processState(t, rootRecursiveProcessor, logst.DeepCopy(), 0)
+ processState(t, rootListProcessor, logst.DeepCopy(), 0)
+ processState(t, aRecursiveProcessor, logst.DeepCopy(), 0)
+ processState(t, aListProcessor, logst.DeepCopy(), 0)
+ processState(t, bRecursiveProcessor, logst.DeepCopy(), 0)
+
+ // First transaction, put /, /a, /a/b.
+ tr := memstore.NewTransaction()
+ id1 := put(t, st, tr, "/", "val1")
+ id2 := put(t, st, tr, "/a", "val2")
+ id3 := put(t, st, tr, "/a/b", "val3")
+ commit(t, tr)
+
+ // Expect transaction that adds /, /a and /a/b.
+ mus := readTransaction(t, log)
+
+ changes := processTransaction(t, rootRecursiveProcessor, mus, 3)
+ watchtesting.ExpectEntryExists(t, changes, "", id1, "val1")
+ watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+ watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val3")
+
+ // Listing / sees only the direct child /a.
+ changes = processTransaction(t, rootListProcessor, mus, 1)
+ watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+ changes = processTransaction(t, aRecursiveProcessor, mus, 2)
+ watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+ watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val3")
+
+ // Listing /a sees only the direct child /a/b.
+ changes = processTransaction(t, aListProcessor, mus, 1)
+ watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val3")
+
+ changes = processTransaction(t, bRecursiveProcessor, mus, 1)
+ watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val3")
+}
+
+// TestGlobProcessTransactionEmptyPath checks a watch placed on a path (/a/b)
+// that does not exist yet: transactions that create only its ancestors report
+// nothing, and the transaction that creates /a/b reports it.
+func TestGlobProcessTransactionEmptyPath(t *testing.T) {
+ dbName, st, cleanup := createStore(t)
+ defer cleanup()
+
+ log, cleanup := openLog(t, dbName)
+ defer cleanup()
+
+ bRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a/b"), "...")
+
+ // The store is empty, so the initial state yields no changes.
+ expectState(t, log, bRecursiveProcessor, 0)
+
+ // First transaction, put /, /a.
+ tr := memstore.NewTransaction()
+ put(t, st, tr, "/", "val1")
+ put(t, st, tr, "/a", "val2")
+ commit(t, tr)
+
+ // Expect no change.
+ expectTransaction(t, log, bRecursiveProcessor, 0)
+
+ // Next transaction, put /a/b.
+ tr = memstore.NewTransaction()
+ id3 := put(t, st, tr, "/a/b", "val3")
+ commit(t, tr)
+
+ // Expect transaction that adds /a/b.
+ changes := expectTransaction(t, log, bRecursiveProcessor, 1)
+ watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val3")
+}
+
+// TestGlobProcessTransactionUpdate checks that overwriting the value at /a/b
+// is reported as an Exists change, with the new value, by every processor
+// whose pattern covers /a/b, and by no other processor.
+func TestGlobProcessTransactionUpdate(t *testing.T) {
+ dbName, st, cleanup := createStore(t)
+ defer cleanup()
+
+ log, cleanup := openLog(t, dbName)
+ defer cleanup()
+
+ rootRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/"), "...")
+ rootListProcessor := createGlobProcessor(t, storage.ParsePath("/"), "*")
+ aRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "...")
+ aListProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "*")
+ bRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a/b"), "...")
+
+ // Each processor gets its own copy of the (empty) initial state because it
+ // keeps the state and applies later transactions to it.
+ logst := readState(t, log)
+ processState(t, rootRecursiveProcessor, logst.DeepCopy(), 0)
+ processState(t, rootListProcessor, logst.DeepCopy(), 0)
+ processState(t, aRecursiveProcessor, logst.DeepCopy(), 0)
+ processState(t, aListProcessor, logst.DeepCopy(), 0)
+ processState(t, bRecursiveProcessor, logst.DeepCopy(), 0)
+
+ // First transaction, put /, /a, /a/b.
+ tr := memstore.NewTransaction()
+ put(t, st, tr, "/", "val1")
+ put(t, st, tr, "/a", "val2")
+ id3 := put(t, st, tr, "/a/b", "val3")
+ commit(t, tr)
+
+ // Feed the first transaction to every processor; only the change counts
+ // are checked here, the contents are covered by the Add test.
+ mus := readTransaction(t, log)
+ processTransaction(t, rootRecursiveProcessor, mus, 3)
+ processTransaction(t, rootListProcessor, mus, 1)
+ processTransaction(t, aRecursiveProcessor, mus, 2)
+ processTransaction(t, aListProcessor, mus, 1)
+ processTransaction(t, bRecursiveProcessor, mus, 1)
+
+ // Next transaction, update /a/b.
+ tr = memstore.NewTransaction()
+ put(t, st, tr, "/a/b", "val4")
+ commit(t, tr)
+
+ // Expect transaction that updates /a/b.
+ mus = readTransaction(t, log)
+
+ changes := processTransaction(t, rootRecursiveProcessor, mus, 1)
+ watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val4")
+
+ // Listing / only covers /a, which did not change.
+ processTransaction(t, rootListProcessor, mus, 0)
+
+ changes = processTransaction(t, aRecursiveProcessor, mus, 1)
+ watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val4")
+
+ // Capture each processor's own result so the assertion checks that
+ // processor's changes rather than the previous processor's.
+ changes = processTransaction(t, aListProcessor, mus, 1)
+ watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val4")
+
+ changes = processTransaction(t, bRecursiveProcessor, mus, 1)
+ watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val4")
+}
+
+// TestGlobProcessTransactionRemove checks that removing /a/b is reported as a
+// DoesNotExist change for a/b (plus an Exists change for its parent /a where
+// the pattern covers /a), and that the later garbage collection of the removed
+// node produces no further changes.
+func TestGlobProcessTransactionRemove(t *testing.T) {
+ dbName, st, cleanup := createStore(t)
+ defer cleanup()
+
+ log, cleanup := openLog(t, dbName)
+ defer cleanup()
+
+ rootRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/"), "...")
+ rootListProcessor := createGlobProcessor(t, storage.ParsePath("/"), "*")
+ aRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "...")
+ aListProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "*")
+ bRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a/b"), "...")
+
+ // Each processor gets its own copy of the (empty) initial state because it
+ // keeps the state and applies later transactions to it.
+ logst := readState(t, log)
+ processState(t, rootRecursiveProcessor, logst.DeepCopy(), 0)
+ processState(t, rootListProcessor, logst.DeepCopy(), 0)
+ processState(t, aRecursiveProcessor, logst.DeepCopy(), 0)
+ processState(t, aListProcessor, logst.DeepCopy(), 0)
+ processState(t, bRecursiveProcessor, logst.DeepCopy(), 0)
+
+ // First transaction, put /, /a, /a/b.
+ tr := memstore.NewTransaction()
+ put(t, st, tr, "/", "val1")
+ id2 := put(t, st, tr, "/a", "val2")
+ put(t, st, tr, "/a/b", "val3")
+ commit(t, tr)
+
+ // Feed the first transaction to every processor; only the change counts
+ // are checked here.
+ mus := readTransaction(t, log)
+ processTransaction(t, rootRecursiveProcessor, mus, 3)
+ processTransaction(t, rootListProcessor, mus, 1)
+ processTransaction(t, aRecursiveProcessor, mus, 2)
+ processTransaction(t, aListProcessor, mus, 1)
+ processTransaction(t, bRecursiveProcessor, mus, 1)
+
+ // Next transaction, remove /a/b.
+ tr = memstore.NewTransaction()
+ remove(t, st, tr, "/a/b")
+ commit(t, tr)
+
+ // Expect transaction that updates /a and removes /a/b.
+ mus = readTransaction(t, log)
+
+ changes := processTransaction(t, rootRecursiveProcessor, mus, 2)
+ // TODO(tilaks): Should we report implicit directory changes?
+ watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+ watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+
+ // Listing / covers /a only, so just the update to /a is reported.
+ changes = processTransaction(t, rootListProcessor, mus, 1)
+ watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+ changes = processTransaction(t, aRecursiveProcessor, mus, 2)
+ watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+ watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+
+ // Listing /a covers /a/b only, so just the removal is reported.
+ changes = processTransaction(t, aListProcessor, mus, 1)
+ watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+
+ changes = processTransaction(t, bRecursiveProcessor, mus, 1)
+ watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+
+ // Garbage-collect the node at /a/b.
+ gc(t, st)
+
+ // Expect no change.
+ mus = readTransaction(t, log)
+ processTransaction(t, rootRecursiveProcessor, mus, 0)
+ processTransaction(t, rootListProcessor, mus, 0)
+ processTransaction(t, aRecursiveProcessor, mus, 0)
+ processTransaction(t, aListProcessor, mus, 0)
+ processTransaction(t, bRecursiveProcessor, mus, 0)
+}
+
+// TestGlobProcessTransactionRemoveRecursive checks that removing /a is
+// reported as DoesNotExist changes for both /a and its descendant /a/b (plus
+// an Exists change for / where the pattern covers it).
+func TestGlobProcessTransactionRemoveRecursive(t *testing.T) {
+ dbName, st, cleanup := createStore(t)
+ defer cleanup()
+
+ log, cleanup := openLog(t, dbName)
+ defer cleanup()
+
+ rootRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/"), "...")
+ rootListProcessor := createGlobProcessor(t, storage.ParsePath("/"), "*")
+ aRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "...")
+ aListProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "*")
+ bRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a/b"), "...")
+
+ // Each processor gets its own copy of the (empty) initial state because it
+ // keeps the state and applies later transactions to it.
+ logst := readState(t, log)
+ processState(t, rootRecursiveProcessor, logst.DeepCopy(), 0)
+ processState(t, rootListProcessor, logst.DeepCopy(), 0)
+ processState(t, aRecursiveProcessor, logst.DeepCopy(), 0)
+ processState(t, aListProcessor, logst.DeepCopy(), 0)
+ processState(t, bRecursiveProcessor, logst.DeepCopy(), 0)
+
+ // First transaction, put /, /a, /a/b.
+ tr := memstore.NewTransaction()
+ id1 := put(t, st, tr, "/", "val1")
+ put(t, st, tr, "/a", "val2")
+ put(t, st, tr, "/a/b", "val3")
+ commit(t, tr)
+
+ // Feed the first transaction to every processor; only the change counts
+ // are checked here.
+ mus := readTransaction(t, log)
+ processTransaction(t, rootRecursiveProcessor, mus, 3)
+ processTransaction(t, rootListProcessor, mus, 1)
+ processTransaction(t, aRecursiveProcessor, mus, 2)
+ processTransaction(t, aListProcessor, mus, 1)
+ processTransaction(t, bRecursiveProcessor, mus, 1)
+
+ // Next transaction, remove /a.
+ tr = memstore.NewTransaction()
+ remove(t, st, tr, "/a")
+ commit(t, tr)
+
+ // Expect transaction that updates / and removes /a and /a/b.
+ mus = readTransaction(t, log)
+
+ changes := processTransaction(t, rootRecursiveProcessor, mus, 3)
+ // TODO(tilaks): Should we report implicit directory changes?
+ watchtesting.ExpectEntryExists(t, changes, "", id1, "val1")
+ watchtesting.ExpectEntryDoesNotExist(t, changes, "a")
+ watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+
+ // Listing / covers /a only, so just its removal is reported.
+ changes = processTransaction(t, rootListProcessor, mus, 1)
+ watchtesting.ExpectEntryDoesNotExist(t, changes, "a")
+
+ changes = processTransaction(t, aRecursiveProcessor, mus, 2)
+ watchtesting.ExpectEntryDoesNotExist(t, changes, "a")
+ watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+
+ // /a/b was never removed explicitly, but it is no longer reachable once /a
+ // is gone, so watches at or below /a still report it as removed.
+ changes = processTransaction(t, aListProcessor, mus, 1)
+ watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+
+ changes = processTransaction(t, bRecursiveProcessor, mus, 1)
+ watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+}
+
+// TestGlobProcessTransactionReplace checks that removing /a and putting a new
+// object at /a within one transaction is reported as an Exists change for "a"
+// carrying the new object's id, not as a removal.
+func TestGlobProcessTransactionReplace(t *testing.T) {
+ dbName, st, cleanup := createStore(t)
+ defer cleanup()
+
+ log, cleanup := openLog(t, dbName)
+ defer cleanup()
+
+ rootRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/"), "...")
+ rootListProcessor := createGlobProcessor(t, storage.ParsePath("/"), "*")
+ aRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "...")
+ aListProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "*")
+
+ logst := readState(t, log)
+
+ // Each processor gets its own copy of the (empty) initial state because it
+ // keeps the state and applies later transactions to it.
+ processState(t, rootRecursiveProcessor, logst.DeepCopy(), 0)
+ processState(t, rootListProcessor, logst.DeepCopy(), 0)
+ processState(t, aRecursiveProcessor, logst.DeepCopy(), 0)
+ processState(t, aListProcessor, logst.DeepCopy(), 0)
+
+ // First transaction, put /, /a.
+ tr := memstore.NewTransaction()
+ id1 := put(t, st, tr, "/", "val1")
+ put(t, st, tr, "/a", "val2")
+ commit(t, tr)
+
+ // Feed the first transaction to every processor; only the change counts
+ // are checked here. /a has no children, so listing it yields nothing.
+ mus := readTransaction(t, log)
+ processTransaction(t, rootRecursiveProcessor, mus, 2)
+ processTransaction(t, rootListProcessor, mus, 1)
+ processTransaction(t, aRecursiveProcessor, mus, 1)
+ processTransaction(t, aListProcessor, mus, 0)
+
+ // Next transaction, replace /a.
+ tr = memstore.NewTransaction()
+ remove(t, st, tr, "/a")
+ id2 := put(t, st, tr, "/a", "val2")
+ commit(t, tr)
+
+ // Expect transaction that updates / and replaces the object at /a (same
+ // value, new id).
+ mus = readTransaction(t, log)
+
+ changes := processTransaction(t, rootRecursiveProcessor, mus, 2)
+ // TODO(tilaks): Should we report implicit directory changes?
+ watchtesting.ExpectEntryExists(t, changes, "", id1, "val1")
+ watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+ changes = processTransaction(t, rootListProcessor, mus, 1)
+ watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+ changes = processTransaction(t, aRecursiveProcessor, mus, 1)
+ watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+ // The replacement /a still has no children.
+ processTransaction(t, aListProcessor, mus, 0)
+}
+
+// TODO(tilaks): test ACL update.
diff --git a/services/store/memstore/watch/processor.go b/services/store/memstore/watch/processor.go
index 9fcc394..ab9bb69 100644
--- a/services/store/memstore/watch/processor.go
+++ b/services/store/memstore/watch/processor.go
@@ -14,9 +14,11 @@
// the store. The returned changes need not be the sequence of changes that
// originally created the initial state (e.g. in the case of compress), but
// are sufficient to re-construct the state viewable within the request.
+ // processState may modify its input.
processState(st *state.State) ([]watch.Change, error)
// processTransaction returns the set of changes made in some transaction.
// The changes are returned in no specific order.
+ // processTransaction may modify its input.
processTransaction(mu *state.Mutations) ([]watch.Change, error)
}
diff --git a/services/store/memstore/watch/raw_processor_test.go b/services/store/memstore/watch/raw_processor_test.go
index 34b0c25..8599ef0 100644
--- a/services/store/memstore/watch/raw_processor_test.go
+++ b/services/store/memstore/watch/raw_processor_test.go
@@ -4,6 +4,8 @@
"testing"
"veyron/services/store/memstore"
+ watchtesting "veyron/services/store/memstore/watch/testing"
+
"veyron2/storage"
)
@@ -34,8 +36,9 @@
st, cleanup = openStore(t, dbName)
defer cleanup()
- log, cleanup, processor := openLog(t, dbName)
+ log, cleanup := openLog(t, dbName)
defer cleanup()
+ processor := createRawProcessor(t)
post1 := st.Snapshot().Find(id1).Version
post2 := st.Snapshot().Find(id2).Version
@@ -43,38 +46,20 @@
// Expect initial state that
// 1) Contains / with value val1 and implicit directory entry /a
// 2) Contains /a with value val2
- logstore, err := log.ReadState(rootPublicID)
- if err != nil {
- t.Fatalf("ReadState() failed: %v", err)
- }
- logst := logstore.State
- changes, err := processor.processState(logst)
- if len(changes) != 2 {
- t.Fatalf("Expected changes to have 2 entries, got: %v", changes)
- }
- expectExists(t, changes, id1, storage.NoVersion, post1, true, "val1", dir("a", id2))
- expectExists(t, changes, id2, storage.NoVersion, post2, false, "val2", empty)
+ changes := expectState(t, log, processor, 2)
+ watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post1, true, "val1", watchtesting.DirOf("a", id2))
+ watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post2, false, "val2", watchtesting.EmptyDir)
}
func TestRawProcessTransactionAddRemove(t *testing.T) {
dbName, st, cleanup := createStore(t)
defer cleanup()
- log, cleanup, processor := openLog(t, dbName)
+ log, cleanup := openLog(t, dbName)
defer cleanup()
+ processor := createRawProcessor(t)
- logstore, err := log.ReadState(rootPublicID)
- if err != nil {
- t.Fatalf("ReadState() failed: %v", err)
- }
- logst := logstore.State
- changes, err := processor.processState(logst)
- if err != nil {
- t.Fatalf("processState() failed: %v", err)
- }
- if len(changes) != 0 {
- t.Fatal("Expected changes to have 0 entries")
- }
+ expectState(t, log, processor, 0)
// First transaction, put /, /a, /a/b
tr := memstore.NewTransaction()
@@ -91,20 +76,10 @@
// 1) Adds / with value val1 and implicit directory entry /a
// 2) Adds /a with value val2 and implicit directory entry /a/b
// 3) Adds /a/b with value val3
- logmu, err := log.ReadTransaction()
- if err != nil {
- t.Fatalf("ReadTransaction() failed: %v", err)
- }
- changes, err = processor.processTransaction(logmu)
- if err != nil {
- t.Fatalf("processTransaction() failed: %v", err)
- }
- if len(changes) != 3 {
- t.Fatal("Expected changes to have 3 entries")
- }
- expectExists(t, changes, id1, storage.NoVersion, post1, true, "val1", dir("a", id2))
- expectExists(t, changes, id2, storage.NoVersion, post2, false, "val2", dir("b", id3))
- expectExists(t, changes, id3, storage.NoVersion, post3, false, "val3", empty)
+ changes := expectTransaction(t, log, processor, 3)
+ watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post1, true, "val1", watchtesting.DirOf("a", id2))
+ watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post2, false, "val2", watchtesting.DirOf("b", id3))
+ watchtesting.ExpectMutationExists(t, changes, id3, storage.NoVersion, post3, false, "val3", watchtesting.EmptyDir)
// Next transaction, remove /a/b
tr = memstore.NewTransaction()
@@ -116,61 +91,31 @@
post2 = st.Snapshot().Find(id2).Version
// Expect transaction that removes implicit dir entry /a/b from /a
- logmu, err = log.ReadTransaction()
- if err != nil {
- t.Fatalf("ReadTransaction() failed: %v", err)
- }
- changes, err = processor.processTransaction(logmu)
- if err != nil {
- t.Fatalf("processTransaction() failed: %v", err)
- }
- if len(changes) != 1 {
- t.Fatal("Expected changes to have 1 entry")
- }
- expectExists(t, changes, id2, pre2, post2, false, "val2", empty)
+ changes = expectTransaction(t, log, processor, 1)
+ watchtesting.ExpectMutationExists(t, changes, id2, pre2, post2, false, "val2", watchtesting.EmptyDir)
// Garbage-collect the node at /a/b
gc(t, st)
// Expect transaction that deletes the node at /a/b
- logmu, err = log.ReadTransaction()
- if err != nil {
- t.Fatalf("ReadTransaction() failed: %v", err)
- }
- changes, err = processor.processTransaction(logmu)
- if err != nil {
- t.Fatalf("processTransaction() failed: %v", err)
- }
- if len(changes) != 1 {
- t.Fatal("Expected changes to have 1 entry")
- }
- expectDoesNotExist(t, changes, id3, pre3, false)
+ changes = expectTransaction(t, log, processor, 1)
+ watchtesting.ExpectMutationDoesNotExist(t, changes, id3, pre3, false)
}
func TestRawProcessTransactionRemoveRecursive(t *testing.T) {
dbName, st, cleanup := createStore(t)
defer cleanup()
- log, cleanup, processor := openLog(t, dbName)
+ log, cleanup := openLog(t, dbName)
defer cleanup()
+ processor := createRawProcessor(t)
processor, err := newRawProcessor(rootPublicID)
if err != nil {
t.Fatalf("newRawProcessor() failed: %v", err)
}
- logstore, err := log.ReadState(rootPublicID)
- if err != nil {
- t.Fatalf("ReadState() failed: %v", err)
- }
- logst := logstore.State
- changes, err := processor.processState(logst)
- if err != nil {
- t.Fatalf("processState() failed: %v", err)
- }
- if len(changes) != 0 {
- t.Fatal("Expected changes to have 0 entries")
- }
+ expectState(t, log, processor, 0)
// First transaction, put /, /a, /a/b
tr := memstore.NewTransaction()
@@ -187,13 +132,7 @@
// 1) Adds / with value val1 and implicit directory entry /a
// 2) Adds /a with value val2 and implicit directory entry /a/b
// 3) Adds /a/b with value val3
- logmu, err := log.ReadTransaction()
- if err != nil {
- t.Fatalf("ReadTransaction() failed: %v", err)
- }
- if _, err := processor.processTransaction(logmu); err != nil {
- t.Fatalf("processTransaction() failed: %v", err)
- }
+ expectTransaction(t, log, processor, 3)
// Next transaction, remove /a
tr = memstore.NewTransaction()
@@ -206,62 +145,32 @@
post1 = st.Snapshot().Find(id1).Version
// Expect transaction that removes implicit dir entry /a from /
- logmu, err = log.ReadTransaction()
- if err != nil {
- t.Fatalf("ReadTransaction() failed: %v", err)
- }
- changes, err = processor.processTransaction(logmu)
- if err != nil {
- t.Fatalf("processTransaction() failed: %v", err)
- }
- if len(changes) != 1 {
- t.Fatal("Expected changes to have 1 entry")
- }
- expectExists(t, changes, id1, pre1, post1, true, "val1", empty)
+ changes := expectTransaction(t, log, processor, 1)
+ watchtesting.ExpectMutationExists(t, changes, id1, pre1, post1, true, "val1", watchtesting.EmptyDir)
// Garbage-collect the nodes at /a and /a/b
gc(t, st)
// Expect transaction that deletes the nodes at /a and /a/b
- logmu, err = log.ReadTransaction()
- if err != nil {
- t.Fatalf("ReadTransaction() failed: %v", err)
- }
- changes, err = processor.processTransaction(logmu)
- if err != nil {
- t.Fatalf("processTransaction() failed: %v", err)
- }
- if len(changes) != 2 {
- t.Fatal("Expected changes to have 2 entries")
- }
- expectDoesNotExist(t, changes, id2, pre2, false)
- expectDoesNotExist(t, changes, id3, pre3, false)
+ changes = expectTransaction(t, log, processor, 2)
+ watchtesting.ExpectMutationDoesNotExist(t, changes, id2, pre2, false)
+ watchtesting.ExpectMutationDoesNotExist(t, changes, id3, pre3, false)
}
func TestRawProcessTransactionUpdateRemoveRoot(t *testing.T) {
dbName, st, cleanup := createStore(t)
defer cleanup()
- log, cleanup, processor := openLog(t, dbName)
+ log, cleanup := openLog(t, dbName)
defer cleanup()
+ processor := createRawProcessor(t)
processor, err := newRawProcessor(rootPublicID)
if err != nil {
t.Fatalf("newRawProcessor() failed: %v", err)
}
- logstore, err := log.ReadState(rootPublicID)
- if err != nil {
- t.Fatalf("ReadState() failed: %v", err)
- }
- logst := logstore.State
- changes, err := processor.processState(logst)
- if err != nil {
- t.Fatalf("processState() failed: %v", err)
- }
- if len(changes) != 0 {
- t.Fatal("Expected changes to have 0 entries")
- }
+ expectState(t, log, processor, 0)
// First transaction, put /, /a
tr := memstore.NewTransaction()
@@ -275,13 +184,7 @@
// Assume the first transaction
// 1) Adds / with value val1 and implicit directory entry /a
// 2) Adds /a with value val2
- logmu, err := log.ReadTransaction()
- if err != nil {
- t.Fatalf("ReadTransaction() failed: %v", err)
- }
- if _, err := processor.processTransaction(logmu); err != nil {
- t.Fatalf("processTransaction() failed: %v", err)
- }
+ expectTransaction(t, log, processor, 2)
// Next transaction, update /
tr = memstore.NewTransaction()
@@ -292,18 +195,8 @@
post1 = st.Snapshot().Find(id1).Version
// Expect transaction that updates / with value val3
- logmu, err = log.ReadTransaction()
- if err != nil {
- t.Fatalf("ReadTransaction() failed: %v", err)
- }
- changes, err = processor.processTransaction(logmu)
- if err != nil {
- t.Fatalf("processTransaction() failed: %v", err)
- }
- if len(changes) != 1 {
- t.Fatal("Expected changes to have 1 entry")
- }
- expectExists(t, changes, id1, pre1, post1, true, "val3", dir("a", id2))
+ changes := expectTransaction(t, log, processor, 1)
+ watchtesting.ExpectMutationExists(t, changes, id1, pre1, post1, true, "val3", watchtesting.DirOf("a", id2))
// Next transaction, remove /
tr = memstore.NewTransaction()
@@ -314,33 +207,13 @@
pre2 := post2
// Expect a transaction that deletes /
- logmu, err = log.ReadTransaction()
- if err != nil {
- t.Fatalf("ReadTransaction() failed: %v", err)
- }
- changes, err = processor.processTransaction(logmu)
- if err != nil {
- t.Fatalf("processTransaction() failed: %v", err)
- }
- if len(changes) != 1 {
- t.Fatal("Expected changes to have 1 entry")
- }
- expectDoesNotExist(t, changes, id1, pre1, true)
+ changes = expectTransaction(t, log, processor, 1)
+ watchtesting.ExpectMutationDoesNotExist(t, changes, id1, pre1, true)
// Garbage-collect the nodes at / and /a
gc(t, st)
// Expect transaction that deletes the nodes at / and /a
- logmu, err = log.ReadTransaction()
- if err != nil {
- t.Fatalf("ReadTransaction() failed: %v", err)
- }
- changes, err = processor.processTransaction(logmu)
- if err != nil {
- t.Fatalf("processTransaction() failed: %v", err)
- }
- if len(changes) != 1 {
- t.Fatal("Expected changes to have 1 entry")
- }
- expectDoesNotExist(t, changes, id2, pre2, false)
+ changes = expectTransaction(t, log, processor, 1)
+ watchtesting.ExpectMutationDoesNotExist(t, changes, id2, pre2, false)
}
diff --git a/services/store/memstore/watch/test_util.go b/services/store/memstore/watch/test_util.go
index 0ac3293..882e03e 100644
--- a/services/store/memstore/watch/test_util.go
+++ b/services/store/memstore/watch/test_util.go
@@ -7,7 +7,7 @@
"testing"
"veyron/services/store/memstore"
- "veyron/services/store/raw"
+ "veyron/services/store/memstore/state"
"veyron/services/store/service"
"veyron2/security"
@@ -93,22 +93,31 @@
}
}
-func openLog(t *testing.T, dbName string) (*memstore.RLog, func(), reqProcessor) {
+func openLog(t *testing.T, dbName string) (*memstore.RLog, func()) {
log, err := memstore.OpenLog(dbName, true)
if err != nil {
t.Fatalf("openLog() failed: %v", err)
}
- cleanup := func() {
+
+ return log, func() {
log.Close()
}
+}
+func createRawProcessor(t *testing.T) reqProcessor {
processor, err := newRawProcessor(rootPublicID)
if err != nil {
- cleanup()
t.Fatalf("newRawProcessor() failed: %v", err)
}
+ return processor
+}
- return log, cleanup, processor
+func createGlobProcessor(t *testing.T, path storage.PathName, pattern string) reqProcessor {
+ processor, err := newGlobProcessor(rootPublicID, path, pattern)
+ if err != nil {
+ t.Fatalf("newGlobProcessor() failed: %v", err)
+ }
+ return processor
}
func createWatcher(t *testing.T, dbName string) (service.Watcher, func()) {
@@ -121,95 +130,50 @@
}
}
-var (
- empty = []storage.DEntry{}
-)
-
-func dir(name string, id storage.ID) []storage.DEntry {
- return []storage.DEntry{storage.DEntry{
- Name: name,
- ID: id,
- }}
+func expectState(t *testing.T, log *memstore.RLog, processor reqProcessor, numChanges int) []watch.Change {
+ st := readState(t, log)
+ return processState(t, processor, st, numChanges)
}
-func expectInitialStateSkipped(t *testing.T, change watch.Change) {
- if change.Name != "" {
- t.Fatalf("Expect Name to be \"\" but was: %v", change.Name)
+func readState(t *testing.T, log *memstore.RLog) *state.State {
+ st, err := log.ReadState(rootPublicID)
+ if err != nil {
+ t.Fatalf("ReadState() failed: %v", err)
}
- if change.State != watch.InitialStateSkipped {
- t.Fatalf("Expect State to be InitialStateSkipped but was: %v", change.State)
- }
- if len(change.ResumeMarker) != 0 {
- t.Fatalf("Expect no ResumeMarker but was: %v", change.ResumeMarker)
- }
+ return st.State
}
-func expectExists(t *testing.T, changes []watch.Change, id storage.ID, pre, post storage.Version, isRoot bool, value string, dir []storage.DEntry) {
- change := findChange(t, changes, id)
- if change.State != watch.Exists {
- t.Fatalf("Expected id to exist: %v", id)
+func processState(t *testing.T, processor reqProcessor, st *state.State, numChanges int) []watch.Change {
+ changes, err := processor.processState(st)
+ if err != nil {
+ t.Fatalf("processState() failed: %v", err)
}
- cv := change.Value.(*raw.Mutation)
- if cv.PriorVersion != pre {
- t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
+ if len(changes) != numChanges {
+ t.Fatalf("Expected state to have %d changes, got %d", numChanges, len(changes))
}
- if cv.Version != post {
- t.Fatalf("Expected Version to be %v, but was: %v", post, cv.Version)
- }
- if cv.IsRoot != isRoot {
- t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, cv.IsRoot)
- }
- if cv.Value != value {
- t.Fatalf("Expected Value to be: %v, but was: %v", value, cv.Value)
- }
- expectDirEquals(t, cv.Dir, dir)
+ return changes
}
-func expectDoesNotExist(t *testing.T, changes []watch.Change, id storage.ID, pre storage.Version, isRoot bool) {
- change := findChange(t, changes, id)
- if change.State != watch.DoesNotExist {
- t.Fatalf("Expected id to not exist: %v", id)
- }
- cv := change.Value.(*raw.Mutation)
- if cv.PriorVersion != pre {
- t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
- }
- if cv.Version != storage.NoVersion {
- t.Fatalf("Expected Version to be NoVersion, but was: %v", cv.Version)
- }
- if cv.IsRoot != isRoot {
- t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, cv.IsRoot)
- }
- if cv.Value != nil {
- t.Fatal("Expected Value to be nil")
- }
- if cv.Dir != nil {
- t.Fatal("Expected Dir to be nil")
- }
+func expectTransaction(t *testing.T, log *memstore.RLog, processor reqProcessor, numChanges int) []watch.Change {
+ mus := readTransaction(t, log)
+ return processTransaction(t, processor, mus, numChanges)
}
-func findChange(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
- for _, change := range changes {
- cv, ok := change.Value.(*raw.Mutation)
- if !ok {
- t.Fatal("Expected a Mutation")
- }
- if cv.ID == id {
- return change
- }
+func readTransaction(t *testing.T, log *memstore.RLog) *state.Mutations {
+ mus, err := log.ReadTransaction()
+ if err != nil {
+ t.Fatalf("ReadTransaction() failed: %v", err)
}
- t.Fatalf("Expected a change for id: %v", id)
- panic("should not reach here")
+ return mus
}
-func expectDirEquals(t *testing.T, actual, expected []storage.DEntry) {
- if len(actual) != len(expected) {
- t.Fatalf("Expected Dir to have %v refs, but had %v", len(expected), len(actual))
+func processTransaction(t *testing.T, processor reqProcessor, mus *state.Mutations, numChanges int) []watch.Change {
+ changes, err := processor.processTransaction(mus)
+ if err != nil {
+ t.Fatalf("processTransaction() failed: %v", err)
}
- for i, e := range expected {
- a := actual[i]
- if a != e {
- t.Fatalf("Expected Dir entry %v to be %v, but was %v", i, e, a)
- }
+ if len(changes) != numChanges {
+ t.Fatalf("Expected transaction to have %d changes, got %d", numChanges, len(changes))
}
+ return changes
}
diff --git a/services/store/memstore/watch/testing/util.go b/services/store/memstore/watch/testing/util.go
index b3e8037..1e34a32 100644
--- a/services/store/memstore/watch/testing/util.go
+++ b/services/store/memstore/watch/testing/util.go
@@ -3,6 +3,7 @@
import (
"errors"
"sync"
+ "testing"
"time"
"veyron/services/store/raw"
@@ -10,7 +11,9 @@
"veyron2/ipc"
"veyron2/naming"
"veyron2/security"
+ "veyron2/services/store"
"veyron2/services/watch"
+ "veyron2/storage"
)
var (
@@ -144,17 +147,18 @@
s.ctx.Cancel()
}
-func WatchRaw(id security.PublicID, watchFn func(ipc.ServerContext, raw.Request, raw.StoreServiceWatchStream) error, req raw.Request) raw.StoreWatchStream {
+func watchImpl(id security.PublicID, watchFn func(ipc.ServerContext, *watcherServiceWatchStream) error) *watcherWatchStream {
mu := &sync.Mutex{}
ctx := NewCancellableContext(id)
c := make(chan watch.ChangeBatch, 1)
errc := make(chan error, 1)
go func() {
- err := watchFn(ctx, req, &watcherServiceWatchStream{
+ stream := &watcherServiceWatchStream{
mu: mu,
ctx: ctx,
output: c,
- })
+ }
+ err := watchFn(ctx, stream)
mu.Lock()
defer mu.Unlock()
ctx.Cancel()
@@ -168,3 +172,196 @@
err: errc,
}
}
+
+// WatchRaw runs watchFn with req through watchImpl, using a context created
+// for id, and returns the stream produced by watchImpl.
+func WatchRaw(id security.PublicID, watchFn func(ipc.ServerContext, raw.Request, raw.StoreServiceWatchStream) error, req raw.Request) raw.StoreWatchStream {
+	// Bind req so that watchFn fits the two-argument shape watchImpl expects.
+	bound := func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
+		return watchFn(ctx, req, stream)
+	}
+	return watchImpl(id, bound)
+}
+
+// WatchGlob runs watchFn with req through watchImpl, using a context created
+// for id, and returns the stream produced by watchImpl.
+func WatchGlob(id security.PublicID, watchFn func(ipc.ServerContext, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, req watch.GlobRequest) watch.GlobWatcherWatchGlobStream {
+	// Bind req so that watchFn fits the two-argument shape watchImpl expects.
+	bound := func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
+		return watchFn(ctx, req, stream)
+	}
+	return watchImpl(id, bound)
+}
+
+// WatchGlobOnPath is the variant of WatchGlob for watch functions that also
+// take the path to watch: it runs watchFn with path and req through watchImpl
+// and returns the stream produced by watchImpl.
+func WatchGlobOnPath(id security.PublicID, watchFn func(ipc.ServerContext, storage.PathName, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, path storage.PathName, req watch.GlobRequest) watch.GlobWatcherWatchGlobStream {
+	// Bind path and req so that watchFn fits the shape watchImpl expects.
+	bound := func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
+		return watchFn(ctx, path, req, stream)
+	}
+	return watchImpl(id, bound)
+}
+
+// ExpectInitialStateSkipped fails the test unless change has an empty Name,
+// the InitialStateSkipped state and no ResumeMarker. Only the first failed
+// expectation is reported.
+func ExpectInitialStateSkipped(t *testing.T, change watch.Change) {
+	switch {
+	case change.Name != "":
+		t.Fatalf("Expect Name to be \"\" but was: %v", change.Name)
+	case change.State != watch.InitialStateSkipped:
+		t.Fatalf("Expect State to be InitialStateSkipped but was: %v", change.State)
+	case len(change.ResumeMarker) != 0:
+		t.Fatalf("Expect no ResumeMarker but was: %v", change.ResumeMarker)
+	}
+}
+
+// ExpectEntryExists fails the test unless changes contains a change named
+// name whose state is Exists and whose value is a *storage.Entry carrying the
+// given id and value.
+func ExpectEntryExists(t *testing.T, changes []watch.Change, name string, id storage.ID, value string) {
+	found := findEntry(t, changes, name)
+	if found.State != watch.Exists {
+		t.Fatalf("Expected name to exist: %v", name)
+	}
+	entry, isEntry := found.Value.(*storage.Entry)
+	if !isEntry {
+		t.Fatal("Expected an Entry")
+	}
+	if got := entry.Stat.ID; got != id {
+		t.Fatalf("Expected ID to be %v, but was: %v", id, got)
+	}
+	if got := entry.Value; got != value {
+		t.Fatalf("Expected Value to be %v, but was: %v", value, got)
+	}
+}
+
+// ExpectEntryDoesNotExist fails the test unless changes contains a change
+// named name whose state is DoesNotExist and whose value is nil.
+func ExpectEntryDoesNotExist(t *testing.T, changes []watch.Change, name string) {
+	found := findEntry(t, changes, name)
+	switch {
+	case found.State != watch.DoesNotExist:
+		t.Fatalf("Expected name to not exist: %v", name)
+	case found.Value != nil:
+		t.Fatal("Expected entry to be nil")
+	}
+}
+
+// ExpectServiceEntryExists fails the test unless changes contains a change
+// named name whose state is Exists and whose value is a *store.Entry carrying
+// the given id and value.
+func ExpectServiceEntryExists(t *testing.T, changes []watch.Change, name string, id storage.ID, value string) {
+	found := findEntry(t, changes, name)
+	if found.State != watch.Exists {
+		t.Fatalf("Expected name to exist: %v", name)
+	}
+	entry, isServiceEntry := found.Value.(*store.Entry)
+	if !isServiceEntry {
+		t.Fatal("Expected a service Entry")
+	}
+	if got := entry.Stat.ID; got != id {
+		t.Fatalf("Expected ID to be %v, but was: %v", id, got)
+	}
+	if got := entry.Value; got != value {
+		t.Fatalf("Expected Value to be %v, but was: %v", value, got)
+	}
+}
+
+// ExpectServiceEntryDoesNotExist fails the test unless changes contains a
+// change named name whose state is DoesNotExist and whose value is nil.
+func ExpectServiceEntryDoesNotExist(t *testing.T, changes []watch.Change, name string) {
+	found := findEntry(t, changes, name)
+	switch {
+	case found.State != watch.DoesNotExist:
+		t.Fatalf("Expected name to not exist: %v", name)
+	case found.Value != nil:
+		t.Fatal("Expected entry to be nil")
+	}
+}
+
+// findEntry returns the first change in changes whose Name equals name and
+// fails the test if there is none.
+func findEntry(t *testing.T, changes []watch.Change, name string) watch.Change {
+	for i := range changes {
+		if changes[i].Name == name {
+			return changes[i]
+		}
+	}
+	t.Fatalf("Expected a change for name: %v", name)
+	// Fatalf does not return; the panic only satisfies the compiler's
+	// requirement for a terminating statement.
+	panic("Should not reach here")
+}
+
+var (
+ // EmptyDir is an empty, non-nil list of directory entries, for use as the
+ // expected dir argument when a mutation should carry no entries.
+ EmptyDir = []storage.DEntry{}
+)
+
+// DirOf returns a directory listing holding exactly one entry that maps name
+// to id.
+func DirOf(name string, id storage.ID) []storage.DEntry {
+	entry := storage.DEntry{Name: name, ID: id}
+	return []storage.DEntry{entry}
+}
+
+// ExpectMutationExists fails the test unless changes contains a mutation for
+// id whose state is Exists and whose prior version, version, root flag, value
+// and directory entries match pre, post, isRoot, value and dir.
+func ExpectMutationExists(t *testing.T, changes []watch.Change, id storage.ID, pre, post storage.Version, isRoot bool, value string, dir []storage.DEntry) {
+	found := findMutation(t, changes, id)
+	if found.State != watch.Exists {
+		t.Fatalf("Expected id to exist: %v", id)
+	}
+	mu := found.Value.(*raw.Mutation)
+	if got := mu.PriorVersion; got != pre {
+		t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, got)
+	}
+	if got := mu.Version; got != post {
+		t.Fatalf("Expected Version to be %v, but was: %v", post, got)
+	}
+	if got := mu.IsRoot; got != isRoot {
+		t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, got)
+	}
+	if got := mu.Value; got != value {
+		t.Fatalf("Expected Value to be: %v, but was: %v", value, got)
+	}
+	expectDirEquals(t, mu.Dir, dir)
+}
+
+// ExpectMutationDoesNotExist fails the test unless changes contains a
+// mutation for id whose state is DoesNotExist, whose prior version and root
+// flag match pre and isRoot, whose version is NoVersion, and whose value and
+// directory are nil. Only the first failed expectation is reported.
+func ExpectMutationDoesNotExist(t *testing.T, changes []watch.Change, id storage.ID, pre storage.Version, isRoot bool) {
+	found := findMutation(t, changes, id)
+	if found.State != watch.DoesNotExist {
+		t.Fatalf("Expected id to not exist: %v", id)
+	}
+	mu := found.Value.(*raw.Mutation)
+	switch {
+	case mu.PriorVersion != pre:
+		t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, mu.PriorVersion)
+	case mu.Version != storage.NoVersion:
+		t.Fatalf("Expected Version to be NoVersion, but was: %v", mu.Version)
+	case mu.IsRoot != isRoot:
+		t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, mu.IsRoot)
+	case mu.Value != nil:
+		t.Fatal("Expected Value to be nil")
+	case mu.Dir != nil:
+		t.Fatal("Expected Dir to be nil")
+	}
+}
+
+// ExpectMutationExistsNoVersionCheck fails the test unless changes contains a
+// mutation for id whose state is Exists and whose value equals value. Versions,
+// the root flag and directory entries are not inspected.
+func ExpectMutationExistsNoVersionCheck(t *testing.T, changes []watch.Change, id storage.ID, value string) {
+	found := findMutation(t, changes, id)
+	if found.State != watch.Exists {
+		t.Fatalf("Expected id to exist: %v", id)
+	}
+	if got := found.Value.(*raw.Mutation).Value; got != value {
+		t.Fatalf("Expected Value to be: %v, but was: %v", value, got)
+	}
+}
+
+// ExpectMutationDoesNotExistNoVersionCheck fails the test unless changes
+// contains a mutation for id whose state is DoesNotExist. Nothing else about
+// the mutation is inspected.
+func ExpectMutationDoesNotExistNoVersionCheck(t *testing.T, changes []watch.Change, id storage.ID) {
+	if found := findMutation(t, changes, id); found.State != watch.DoesNotExist {
+		t.Fatalf("Expected id to not exist: %v", id)
+	}
+}
+
+// findMutation returns the first change in changes whose value is a
+// *raw.Mutation with the given id. It fails the test if a change examined
+// before a match does not hold a *raw.Mutation, or if no change matches.
+func findMutation(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
+	for _, candidate := range changes {
+		if mu, isMutation := candidate.Value.(*raw.Mutation); !isMutation {
+			t.Fatal("Expected a Mutation")
+		} else if mu.ID == id {
+			return candidate
+		}
+	}
+	t.Fatalf("Expected a change for id: %v", id)
+	// Fatalf does not return; the panic only satisfies the compiler's
+	// requirement for a terminating statement.
+	panic("Should not reach here")
+}
+
+// expectDirEquals fails the test unless actual and expected have the same
+// length and hold equal entries at every index.
+func expectDirEquals(t *testing.T, actual, expected []storage.DEntry) {
+	if want, got := len(expected), len(actual); want != got {
+		t.Fatalf("Expected Dir to have %v refs, but had %v", want, got)
+	}
+	for i := range expected {
+		if actual[i] != expected[i] {
+			t.Fatalf("Expected Dir entry %v to be %v, but was %v", i, expected[i], actual[i])
+		}
+	}
+}
diff --git a/services/store/memstore/watch/watcher.go b/services/store/memstore/watch/watcher.go
index ac5b10e..5956110 100644
--- a/services/store/memstore/watch/watcher.go
+++ b/services/store/memstore/watch/watcher.go
@@ -55,7 +55,9 @@
}
// WatchRaw returns a stream of all changes.
-func (w *watcher) WatchRaw(ctx ipc.ServerContext, req raw.Request, stream raw.StoreServiceWatchStream) error {
+func (w *watcher) WatchRaw(ctx ipc.ServerContext, req raw.Request,
+ stream raw.StoreServiceWatchStream) error {
+
processor, err := newRawProcessor(ctx.RemoteID())
if err != nil {
return err
@@ -66,12 +68,18 @@
// WatchGlob returns a stream of changes that match a pattern.
func (w *watcher) WatchGlob(ctx ipc.ServerContext, path storage.PathName,
req watch.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
- return verror.Internalf("WatchGlob not yet implemented")
+
+ processor, err := newGlobProcessor(ctx.RemoteID(), path, req.Pattern)
+ if err != nil {
+ return err
+ }
+ return w.Watch(ctx, processor, req.ResumeMarker, stream)
}
// WatchQuery returns a stream of changes that satisfy a query.
func (w *watcher) WatchQuery(ctx ipc.ServerContext, path storage.PathName,
req watch.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
+
return verror.Internalf("WatchQuery not yet implemented")
}
@@ -86,7 +94,9 @@
// sending changes to the specified watch stream. If the call is cancelled or
// otherwise closed early, Watch will terminate and return an error.
// Watch implements the service.Watcher interface.
-func (w *watcher) Watch(ctx ipc.ServerContext, processor reqProcessor, resumeMarker watch.ResumeMarker, stream WatchStream) error {
+func (w *watcher) Watch(ctx ipc.ServerContext, processor reqProcessor,
+ resumeMarker watch.ResumeMarker, stream WatchStream) error {
+
// Closing cancel terminates processRequest.
cancel := make(chan struct{})
defer close(cancel)
@@ -113,7 +123,9 @@
return ErrWatchClosed
}
-func (w *watcher) processRequest(cancel <-chan struct{}, processor reqProcessor, resumeMarker watch.ResumeMarker, stream WatchStream) error {
+func (w *watcher) processRequest(cancel <-chan struct{}, processor reqProcessor,
+ resumeMarker watch.ResumeMarker, stream WatchStream) error {
+
log, err := memstore.OpenLog(w.dbName, true)
if err != nil {
return err
@@ -147,11 +159,12 @@
return err
}
st := store.State
+ // Save timestamp as processState may modify st.
+ timestamp := st.Timestamp()
changes, err := processor.processState(st)
if err != nil {
return err
}
- timestamp := st.Timestamp()
if send, err := filter.shouldProcessChanges(timestamp); err != nil {
return err
} else if send {
@@ -166,11 +179,12 @@
if err != nil {
return err
}
+ // Save timestamp as processTransaction may modify mu.
+ timestamp := mu.Timestamp
changes, err = processor.processTransaction(mu)
if err != nil {
return err
}
- timestamp := mu.Timestamp
if send, err := filter.shouldProcessChanges(timestamp); err != nil {
return err
} else if send {
diff --git a/services/store/memstore/watch/watcher_test.go b/services/store/memstore/watch/watcher_test.go
index 5679339..1ce5af6 100644
--- a/services/store/memstore/watch/watcher_test.go
+++ b/services/store/memstore/watch/watcher_test.go
@@ -1,6 +1,7 @@
package watch
import (
+ "bytes"
"io"
"sync"
"testing"
@@ -10,10 +11,11 @@
watchtesting "veyron/services/store/memstore/watch/testing"
"veyron/services/store/raw"
+ "veyron2/services/watch"
"veyron2/storage"
)
-func TestWatch(t *testing.T) {
+func TestWatchRaw(t *testing.T) {
// Create a new store.
dbName, st, cleanup := createStore(t)
defer cleanup()
@@ -43,7 +45,7 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, changes, id1, storage.NoVersion, post1, true, "val1", empty)
+ watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post1, true, "val1", watchtesting.EmptyDir)
// Put /a
tr = memstore.NewTransaction()
@@ -69,8 +71,63 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, changes, id1, pre1, post1, true, "val1", dir("a", id2))
- expectExists(t, changes, id2, storage.NoVersion, post2, false, "val2", empty)
+ watchtesting.ExpectMutationExists(t, changes, id1, pre1, post1, true, "val1", watchtesting.DirOf("a", id2))
+ watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post2, false, "val2", watchtesting.EmptyDir)
+}
+
+// TestWatchGlob checks that a glob watch on / with pattern "..." reports the
+// put of / and then the put of /a as entry changes.
+func TestWatchGlob(t *testing.T) {
+	// Create a new store.
+	dbName, st, cleanup := createStore(t)
+	defer cleanup()
+
+	// Create a new watcher.
+	w, cleanup := createWatcher(t, dbName)
+	defer cleanup()
+
+	// Put /
+	tr := memstore.NewTransaction()
+	id1 := put(t, st, tr, "/", "val1")
+	commit(t, tr)
+
+	// Start a watch request.
+	path := storage.ParsePath("/")
+	req := watch.GlobRequest{Pattern: "..."}
+	ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
+
+	// Check that watch detects the changes in the first transaction.
+	cb, err := ws.Recv()
+	if err != nil {
+		// Stop here: the batch is unusable and indexing it below would panic.
+		t.Fatalf("Recv() failed: %v", err)
+	}
+	changes := cb.Changes
+	if len(changes) < 1 {
+		t.Fatalf("Expected at least 1 change, got %d", len(changes))
+	}
+	change := changes[0]
+	if change.Continued {
+		t.Error("Expected change to be the last in this transaction")
+	}
+	watchtesting.ExpectEntryExists(t, changes, "", id1, "val1")
+
+	// Put /a
+	tr = memstore.NewTransaction()
+	id2 := put(t, st, tr, "/a", "val2")
+	commit(t, tr)
+
+	// Check that watch detects the changes in the second transaction.
+	cb, err = ws.Recv()
+	if err != nil {
+		t.Fatalf("Recv() failed: %v", err)
+	}
+	changes = cb.Changes
+	if len(changes) < 2 {
+		t.Fatalf("Expected at least 2 changes, got %d", len(changes))
+	}
+	change = changes[0]
+	if !change.Continued {
+		t.Error("Expected change to continue the transaction")
+	}
+	change = changes[1]
+	if change.Continued {
+		t.Error("Expected change to be the last in this transaction")
+	}
+	watchtesting.ExpectEntryExists(t, changes, "", id1, "val1")
+	watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
}
func TestWatchCancellation(t *testing.T) {
@@ -213,7 +270,7 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, changes, id1, storage.NoVersion, post11, true, "val1", empty)
+ watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post11, true, "val1", watchtesting.EmptyDir)
cb, err = ws.Recv()
if err != nil {
@@ -228,8 +285,8 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, changes, id1, pre21, post21, true, "val1", dir("a", id2))
- expectExists(t, changes, id2, storage.NoVersion, post22, false, "val2", empty)
+ watchtesting.ExpectMutationExists(t, changes, id1, pre21, post21, true, "val1", watchtesting.DirOf("a", id2))
+ watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post22, false, "val2", watchtesting.EmptyDir)
}
func TestTransactionResumeMarker(t *testing.T) {
@@ -288,7 +345,7 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, changes, id1, storage.NoVersion, post11, true, "val1", empty)
+ watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post11, true, "val1", watchtesting.EmptyDir)
cb, err = ws.Recv()
if err != nil {
@@ -304,8 +361,8 @@
t.Error("Expected change to be the last in this transaction")
}
resumeMarker2 := change.ResumeMarker
- expectExists(t, changes, id1, pre21, post21, true, "val1", dir("a", id2))
- expectExists(t, changes, id2, storage.NoVersion, post22, false, "val2", empty)
+ watchtesting.ExpectMutationExists(t, changes, id1, pre21, post21, true, "val1", watchtesting.DirOf("a", id2))
+ watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post22, false, "val2", watchtesting.EmptyDir)
// Cancel the watch request.
ws.Cancel()
@@ -329,8 +386,8 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, changes, id1, pre21, post21, true, "val1", dir("a", id2))
- expectExists(t, changes, id2, storage.NoVersion, post22, false, "val2", empty)
+ watchtesting.ExpectMutationExists(t, changes, id1, pre21, post21, true, "val1", watchtesting.DirOf("a", id2))
+ watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post22, false, "val2", watchtesting.EmptyDir)
}
func TestNowResumeMarker(t *testing.T) {
@@ -377,7 +434,7 @@
}
changes := cb.Changes
change := changes[0]
- expectInitialStateSkipped(t, change)
+ watchtesting.ExpectInitialStateSkipped(t, change)
// Check that watch detects the changes in the third transaction.
cb, err = ws.Recv()
@@ -393,8 +450,8 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, changes, id2, pre32, post32, false, "val2", dir("b", id3))
- expectExists(t, changes, id3, storage.NoVersion, post33, false, "val3", empty)
+ watchtesting.ExpectMutationExists(t, changes, id2, pre32, post32, false, "val2", watchtesting.DirOf("b", id3))
+ watchtesting.ExpectMutationExists(t, changes, id3, storage.NoVersion, post33, false, "val3", watchtesting.EmptyDir)
}
func TestUnknownResumeMarkers(t *testing.T) {
@@ -431,3 +488,46 @@
t.Errorf("Unexpected error: %v", err)
}
}
+
+// TestConsistentResumeMarkers checks that two identical glob watch requests
+// on the same store report the same ResumeMarker on their first change.
+func TestConsistentResumeMarkers(t *testing.T) {
+	// Create a new store.
+	dbName, st, cleanup := createStore(t)
+	defer cleanup()
+
+	// Create a new watcher.
+	w, cleanup := createWatcher(t, dbName)
+	defer cleanup()
+
+	// Put /
+	tr := memstore.NewTransaction()
+	put(t, st, tr, "/", "val1")
+	commit(t, tr)
+
+	// Start a watch request.
+	path := storage.ParsePath("/")
+	req := watch.GlobRequest{Pattern: "..."}
+	ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
+
+	cb, err := ws.Recv()
+	if err != nil {
+		// Stop here: the batch is unusable and indexing it below would panic.
+		t.Fatalf("Recv() failed: %v", err)
+	}
+	changes := cb.Changes
+	if len(changes) < 1 {
+		t.Fatalf("Expected at least 1 change, got %d", len(changes))
+	}
+	change := changes[0]
+	// Save the ResumeMarker of the change.
+	r := change.ResumeMarker
+
+	// Start another watch request.
+	ws = watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
+
+	cb, err = ws.Recv()
+	if err != nil {
+		t.Fatalf("Recv() failed: %v", err)
+	}
+	changes = cb.Changes
+	if len(changes) < 1 {
+		t.Fatalf("Expected at least 1 change, got %d", len(changes))
+	}
+	change = changes[0]
+	// Expect the same ResumeMarker.
+	if !bytes.Equal(r, change.ResumeMarker) {
+		t.Error("Inconsistent ResumeMarker.")
+	}
+}
diff --git a/services/store/server/object.go b/services/store/server/object.go
index 5964669..f29112e 100644
--- a/services/store/server/object.go
+++ b/services/store/server/object.go
@@ -214,14 +214,46 @@
return nil
}
+// ChangeBatchStream is an interface for streaming responses of type ChangeBatch.
+// It is the type of the delegate that entryTransformStream forwards to.
+type ChangeBatchStream interface {
+ // Send places the item onto the output stream, blocking if there is no buffer
+ // space available.
+ Send(watch.ChangeBatch) error
+}
+
+// entryTransformStream implements GlobWatcherServiceWatchGlobStream and
+// QueryWatcherServiceWatchQueryStream. It wraps a ChangeBatchStream,
+// transforming the value in each change from *storage.Entry to *store.Entry.
+type entryTransformStream struct {
+ // delegate receives the transformed batches.
+ delegate ChangeBatchStream
+}
+
+// Send forwards a transformed copy of cb to the delegate stream and returns
+// the delegate's error. In the copy, every non-nil change value has been
+// converted from *storage.Entry to a pointer to the service entry built by
+// makeServiceEntry. The changes in cb itself are not modified.
+func (s *entryTransformStream) Send(cb watch.ChangeBatch) error {
+	transformed := make([]watch.Change, len(cb.Changes))
+	for i, change := range cb.Changes {
+		// change is a copy of cb.Changes[i], so replacing its Value leaves
+		// the caller's batch untouched.
+		if change.Value != nil {
+			// NOTE(review): this assertion panics if a processor ever emits
+			// a value that is not a *storage.Entry - confirm that cannot
+			// happen for the streams wrapped here.
+			serviceEntry := makeServiceEntry(change.Value.(*storage.Entry))
+			change.Value = &serviceEntry
+		}
+		transformed[i] = change
+	}
+	// Keyed literal: unkeyed fields of an imported struct are flagged by vet
+	// and break if the struct gains a field.
+	return s.delegate.Send(watch.ChangeBatch{Changes: transformed})
+}
+
// WatchGlob returns a stream of changes that match a pattern.
func (o *object) WatchGlob(ctx ipc.ServerContext, req watch.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
path := storage.ParsePath(o.name)
+ // Wrap the stream so that *storage.Entry values produced by the watcher
+ // are sent to the client as *store.Entry values.
+ stream = &entryTransformStream{stream}
return o.server.watcher.WatchGlob(ctx, path, req, stream)
}
// WatchQuery returns a stream of changes that satisfy a query.
func (o *object) WatchQuery(ctx ipc.ServerContext, req watch.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
path := storage.ParsePath(o.name)
+ // Wrap the stream so that *storage.Entry values produced by the watcher
+ // are sent to the client as *store.Entry values.
+ stream = &entryTransformStream{stream}
return o.server.watcher.WatchQuery(ctx, path, req, stream)
}
diff --git a/services/store/server/server_test.go b/services/store/server/server_test.go
index 157fe1b..8a0ca7f 100644
--- a/services/store/server/server_test.go
+++ b/services/store/server/server_test.go
@@ -132,38 +132,6 @@
return s, closer
}
-func expectExists(t *testing.T, changes []watch.Change, id storage.ID, value string) {
- change := findChange(t, changes, id)
- if change.State != watch.Exists {
- t.Fatalf("Expected id to exist: %v", id)
- }
- cv := change.Value.(*raw.Mutation)
- if cv.Value != value {
- t.Fatalf("Expected Value to be: %v, but was: %v", value, cv.Value)
- }
-}
-
-func expectDoesNotExist(t *testing.T, changes []watch.Change, id storage.ID) {
- change := findChange(t, changes, id)
- if change.State != watch.DoesNotExist {
- t.Fatalf("Expected id to not exist: %v", id)
- }
-}
-
-func findChange(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
- for _, change := range changes {
- cv, ok := change.Value.(*raw.Mutation)
- if !ok {
- t.Fatal("Expected a Mutation")
- }
- if cv.ID == id {
- return change
- }
- }
- t.Fatalf("Expected a change for id: %v", id)
- panic("should not reach here")
-}
-
func TestPutGetRemoveRoot(t *testing.T) {
s, c := newServer()
defer c()
@@ -390,7 +358,7 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, []watch.Change{change}, id1, value1)
+ watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
}
path2 := "/a"
@@ -429,8 +397,107 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, changes, id1, value1)
- expectExists(t, changes, id2, value2)
+ watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
+ watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id2, value2)
+ }
+}
+
+// TestWatchGlob checks glob watches made through server objects: a watch on /
+// reports the puts of / and /a, and a watch on /a reports only the put of /a.
+// Change values are expected to be service entries.
+func TestWatchGlob(t *testing.T) {
+	s, c := newServer()
+	defer c()
+
+	value1 := "v1"
+	var id1 storage.ID
+
+	o1 := s.lookupObject("/")
+	o2 := s.lookupObject("/a")
+
+	// Before the watch request has been made, commit a transaction that puts /.
+	{
+		tr := newTransaction()
+		if err := s.CreateTransaction(rootCtx, tr, nil); err != nil {
+			t.Errorf("Unexpected error: %s", err)
+		}
+		st, err := o1.Put(rootCtx, tr, value1)
+		if err != nil {
+			t.Errorf("Unexpected error: %s", err)
+		}
+		id1 = st.ID
+		if err := s.Commit(rootCtx, tr); err != nil {
+			t.Errorf("Unexpected error: %s", err)
+		}
+	}
+
+	// Start watch requests on / and /a.
+	req := watch.GlobRequest{Pattern: "..."}
+	ws1 := watchtesting.WatchGlob(rootPublicID, o1.WatchGlob, req)
+	ws2 := watchtesting.WatchGlob(rootPublicID, o2.WatchGlob, req)
+
+	// The watch on / should send a change on /.
+	{
+		cb, err := ws1.Recv()
+		if err != nil {
+			// Stop here: the batch is unusable and indexing it below would panic.
+			t.Fatalf("Recv() failed: %v", err)
+		}
+		changes := cb.Changes
+		if len(changes) < 1 {
+			t.Fatalf("Expected at least 1 change, got %d", len(changes))
+		}
+		change := changes[0]
+		if change.Continued {
+			t.Error("Expected change to be the last in this transaction")
+		}
+		watchtesting.ExpectServiceEntryExists(t, changes, "", id1, value1)
+	}
+	// The watch on /a should send no change. The first change it sends is
+	// verified below.
+
+	value2 := "v2"
+	var id2 storage.ID
+
+	// Commit a second transaction that puts /a.
+	{
+		tr := newTransaction()
+		if err := s.CreateTransaction(rootCtx, tr, nil); err != nil {
+			t.Errorf("Unexpected error: %s", err)
+		}
+		st, err := o2.Put(rootCtx, tr, value2)
+		if err != nil {
+			t.Errorf("Unexpected error: %s", err)
+		}
+		id2 = st.ID
+		if err := s.Commit(rootCtx, tr); err != nil {
+			t.Errorf("Unexpected error: %s", err)
+		}
+	}
+
+	// The watch on / should send changes on / and /a.
+	{
+		cb, err := ws1.Recv()
+		if err != nil {
+			t.Fatalf("Recv() failed: %v", err)
+		}
+		changes := cb.Changes
+		if len(changes) < 2 {
+			t.Fatalf("Expected at least 2 changes, got %d", len(changes))
+		}
+		change := changes[0]
+		if !change.Continued {
+			t.Error("Expected change to continue the transaction")
+		}
+		change = changes[1]
+		if change.Continued {
+			t.Error("Expected change to be the last in this transaction")
+		}
+		watchtesting.ExpectServiceEntryExists(t, changes, "", id1, value1)
+		watchtesting.ExpectServiceEntryExists(t, changes, "a", id2, value2)
+	}
+	// The watch on /a should send a change on /a.
+	{
+		cb, err := ws2.Recv()
+		if err != nil {
+			t.Fatalf("Recv() failed: %v", err)
+		}
+		changes := cb.Changes
+		if len(changes) < 1 {
+			t.Fatalf("Expected at least 1 change, got %d", len(changes))
+		}
+		change := changes[0]
+		if change.Continued {
+			t.Error("Expected change to be the last in this transaction")
+		}
+		watchtesting.ExpectServiceEntryExists(t, changes, "a", id2, value2)
}
}
@@ -474,7 +541,7 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, []watch.Change{change}, id1, value1)
+ watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
}
path2 := "/a"
@@ -513,8 +580,8 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, changes, id1, value1)
- expectExists(t, changes, id2, value2)
+ watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
+ watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id2, value2)
}
// Commit a third transaction that removes /a.
@@ -543,7 +610,7 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectExists(t, changes, id1, value1)
+ watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
}
// Check that watch detects the garbage collection of /a.
@@ -557,7 +624,7 @@
if change.Continued {
t.Error("Expected change to be the last in this transaction")
}
- expectDoesNotExist(t, changes, id2)
+ watchtesting.ExpectMutationDoesNotExistNoVersionCheck(t, changes, id2)
}
}