Merge "veyron/services/store: Implement Object.WatchGlob()."
diff --git a/services/store/memstore/query/glob.go b/services/store/memstore/query/glob.go
index 98ce585..3614af3 100644
--- a/services/store/memstore/query/glob.go
+++ b/services/store/memstore/query/glob.go
@@ -18,6 +18,11 @@
 
 // Glob returns an iterator that emits all values that match the given pattern.
 func Glob(sn state.Snapshot, clientID security.PublicID, path storage.PathName, pattern string) (service.GlobStream, error) {
+	return GlobIterator(sn, clientID, path, pattern)
+}
+
+// GlobIterator is like Glob, but returns the result as a state.Iterator.
+func GlobIterator(sn state.Snapshot, clientID security.PublicID, path storage.PathName, pattern string) (state.Iterator, error) {
 	parsed, err := glob.Parse(pattern)
 	if err != nil {
 		return nil, err
diff --git a/services/store/memstore/state/cell.go b/services/store/memstore/state/cell.go
index 8671f7b..2913146 100644
--- a/services/store/memstore/state/cell.go
+++ b/services/store/memstore/state/cell.go
@@ -74,7 +74,7 @@
 }
 
 // get the *storage.Entry for a cell.
-func (c *Cell) getEntry() *storage.Entry {
+func (c *Cell) GetEntry() *storage.Entry {
 	entry := newSubfieldEntry(c.Value)
 	c.fillStat(&entry.Stat)
 	return entry
diff --git a/services/store/memstore/state/iterator.go b/services/store/memstore/state/iterator.go
index 791b20e..92614be 100644
--- a/services/store/memstore/state/iterator.go
+++ b/services/store/memstore/state/iterator.go
@@ -118,7 +118,7 @@
 		r.AddValue(v)
 		set = r.Get()
 	} else {
-		it.entry = cell.getEntry()
+		it.entry = cell.GetEntry()
 		it.visited[cell.ID] = struct{}{}
 		set = cell.refs
 	}
@@ -203,7 +203,7 @@
 
 	// Mark as visited.
 	it.visited[n.id] = struct{}{}
-	it.entry, it.path = c.getEntry(), fullPath
+	it.entry, it.path = c.GetEntry(), fullPath
 }
 
 func (it *iterator) Snapshot() Snapshot {
diff --git a/services/store/memstore/state/mutable_snapshot.go b/services/store/memstore/state/mutable_snapshot.go
index 7b7c675..f6677d1 100644
--- a/services/store/memstore/state/mutable_snapshot.go
+++ b/services/store/memstore/state/mutable_snapshot.go
@@ -355,7 +355,7 @@
 	}
 	var e *storage.Entry
 	if len(suffix) == 0 {
-		e = cell.getEntry()
+		e = cell.GetEntry()
 	} else {
 		e = newSubfieldEntry(v)
 	}
diff --git a/services/store/memstore/state/snapshot.go b/services/store/memstore/state/snapshot.go
index e6b84b7..6c4b613 100644
--- a/services/store/memstore/state/snapshot.go
+++ b/services/store/memstore/state/snapshot.go
@@ -98,7 +98,7 @@
 	}
 	var e *storage.Entry
 	if len(suffix) == 0 {
-		e = cell.getEntry()
+		e = cell.GetEntry()
 	} else {
 		e = newSubfieldEntry(v)
 	}
diff --git a/services/store/memstore/state/state.go b/services/store/memstore/state/state.go
index d9170d9..a1cd56c 100644
--- a/services/store/memstore/state/state.go
+++ b/services/store/memstore/state/state.go
@@ -35,6 +35,12 @@
 	return st.timestamp
 }
 
+// DeepCopy returns a new State backed by a mutable snapshot of st and carrying
+// the same timestamp. Mutations to either state do not affect the other.
+func (st *State) DeepCopy() *State {
+	return &State{snapshot: st.MutableSnapshot(), timestamp: st.timestamp}
+}
+
 // GC performs a manual garbage collection.
 func (st *State) GC() {
 	st.snapshot.gc()
diff --git a/services/store/memstore/watch/glob_processor.go b/services/store/memstore/watch/glob_processor.go
new file mode 100644
index 0000000..38ea6c1
--- /dev/null
+++ b/services/store/memstore/watch/glob_processor.go
@@ -0,0 +1,191 @@
+package watch
+
+import (
+	"errors"
+
+	iquery "veyron/services/store/memstore/query"
+	"veyron/services/store/memstore/state"
+
+	"veyron2/security"
+	"veyron2/services/watch"
+	"veyron2/storage"
+)
+
+// globProcessor processes log entries into storage entries that match a pattern.
+type globProcessor struct {
+	// hasProcessedState is true iff the initial state has been processed.
+	hasProcessedState bool
+	// pid is the identity of the client watching for changes.
+	pid security.PublicID
+	// path on which the watch is placed. Returned names are rooted at this path.
+	path storage.PathName
+	// pattern that the returned names match.
+	pattern string
+	// st is the store state as of the last processed event. It is set by processState, and processTransaction applies each transaction's mutations to it.
+	st *state.State
+	// matches is a map of each matching name to the id of the object at that
+	// name, as of the last processed event.
+	matches map[string]storage.ID
+}
+
+// newGlobProcessor returns a reqProcessor that watches, on behalf of pid, the
+// names rooted at path that match pattern.
+func newGlobProcessor(pid security.PublicID, path storage.PathName,
+	pattern string) (reqProcessor, error) {
+
+	// hasProcessedState, st and matches keep their zero values until
+	// processState is called.
+	p := &globProcessor{pid: pid, path: path, pattern: pattern}
+	return p, nil
+}
+
+// processState records st as the processor's state and reports every name
+// that currently matches the pattern. It may be called successfully only once.
+func (p *globProcessor) processState(st *state.State) ([]watch.Change, error) {
+	// Check that the initial state has not already been processed.
+	if p.hasProcessedState {
+		return nil, errors.New("cannot process state after processing the initial state")
+	}
+
+	// Find all names that match the pattern.
+	sn := st.MutableSnapshot()
+	matches, err := glob(sn, p.pid, p.path, p.pattern)
+	if err != nil {
+		return nil, err
+	}
+	// Mark the state as processed only once the glob has succeeded, so that a
+	// failure never leaves the processor flagged as initialized with a nil st.
+	p.hasProcessedState = true
+	p.st = st
+	p.matches = matches
+
+	// Create a change for every matching name.
+	var changes []watch.Change
+	for name, id := range matches {
+		// TODO(tilaks): don't clone change.
+		changes = append(changes, watch.Change{
+			Name:  name,
+			State: watch.Exists,
+			Value: sn.Find(id).GetEntry(),
+		})
+	}
+
+	return changes, nil
+}
+
+// processTransaction applies mus to the processor's state and reports the
+// matching names that were removed or updated by the transaction.
+func (p *globProcessor) processTransaction(mus *state.Mutations) ([]watch.Change, error) {
+	// Ensure that the initial state has been processed.
+	if !p.hasProcessedState {
+		return nil, errors.New("cannot process a transaction before processing the initial state")
+	}
+
+	// Remember the matches from before the transaction for the diff below.
+	oldMatches := p.matches
+	// Bring the state up to date with the transaction.
+	err := p.st.ApplyMutations(mus)
+	if err != nil {
+		return nil, err
+	}
+
+	// Recompute the matching names against the updated state.
+	sn := p.st.MutableSnapshot()
+	curMatches, err := glob(sn, p.pid, p.path, p.pattern)
+	if err != nil {
+		return nil, err
+	}
+	p.matches = curMatches
+
+	removed, updated := diffMatches(oldMatches, curMatches, mus.Delta)
+
+	var changes []watch.Change
+
+	// Names that no longer match are reported as non-existent.
+	for name := range removed {
+		// TODO(tilaks): don't clone change
+		changes = append(changes, watch.Change{
+			Name:  name,
+			State: watch.DoesNotExist,
+		})
+	}
+
+	// Names that were added or changed are reported with their current entry.
+	for name := range updated {
+		// TODO(tilaks): don't clone change.
+		changes = append(changes, watch.Change{
+			Name:  name,
+			State: watch.Exists,
+			Value: sn.Find(curMatches[name]).GetEntry(),
+		})
+	}
+
+	return changes, nil
+}
+
+// diffMatches returns the names that have been removed or updated.
+//
+// A name is removed if it can no longer be resolved, or if the object at that
+// name is no longer accessible.
+//
+// A name is updated if
+// 1) it is newly added.
+// 2) the object at that name is now accessible.
+// 3) the object at the name has a new value or new references.
+// 4) the object at that name replaced a previous object.
+func diffMatches(previousMatches, newMatches map[string]storage.ID,
+	delta map[storage.ID]*state.Mutation) (removed, updated map[string]struct{}) {
+
+	removed = map[string]struct{}{}
+	updated = map[string]struct{}{}
+
+	// Pass 1: classify names that matched before the transaction.
+	for name, oldID := range previousMatches {
+		curID, stillMatches := newMatches[name]
+		switch {
+		case !stillMatches:
+			// The name is absent from the new matches.
+			removed[name] = struct{}{}
+		case curID != oldID:
+			// A different object now lives at this name.
+			updated[name] = struct{}{}
+		}
+	}
+
+	// Pass 2: classify names that match after the transaction.
+	for name, curID := range newMatches {
+		_, matchedBefore := previousMatches[name]
+		_, mutated := delta[curID]
+		// A name is updated if it is new, or if the object at that name
+		// appears in the transaction's delta.
+		if !matchedBefore || mutated {
+			updated[name] = struct{}{}
+		}
+	}
+
+	return removed, updated
+}
+
+// glob returns all names in a snapshot that match a pattern. Each name maps to
+// the id of the object in the snapshot at that name.
+func glob(sn state.Snapshot, pid security.PublicID, path storage.PathName,
+	pattern string) (map[string]storage.ID, error) {
+
+	it, err := iquery.GlobIterator(sn, pid, path, pattern)
+	if err != nil {
+		return nil, err
+	}
+
+	// Cap the capacity of path so that the appends below always copy into a
+	// fresh array instead of writing into the caller's backing array.
+	root := path[:len(path):len(path)]
+
+	matches := make(map[string]storage.ID)
+	for ; it.IsValid(); it.Next() {
+		// Prefix each iterator name with path to form the reported name.
+		matchName := append(root, storage.ParsePath(it.Name())...).String()
+		matches[matchName] = it.Get().Stat.ID
+	}
+
+	return matches, nil
+}
diff --git a/services/store/memstore/watch/glob_processor_test.go b/services/store/memstore/watch/glob_processor_test.go
new file mode 100644
index 0000000..fc2c310
--- /dev/null
+++ b/services/store/memstore/watch/glob_processor_test.go
@@ -0,0 +1,387 @@
+package watch
+
+import (
+	"testing"
+
+	"veyron/services/store/memstore"
+	watchtesting "veyron/services/store/memstore/watch/testing"
+
+	"veyron2/storage"
+)
+
+func TestGlobProcessState(t *testing.T) {
+	// Create a new store.
+	dbName, st, cleanup := createStore(t)
+	defer cleanup()
+
+	// Put /, /a, /a/b.
+	tr := memstore.NewTransaction()
+	id1 := put(t, st, tr, "/", "val1")
+	id2 := put(t, st, tr, "/a", "val2")
+	put(t, st, tr, "/a/b", "val3")
+	commit(t, tr)
+
+	// Remove /a/b.
+	tr = memstore.NewTransaction()
+	remove(t, st, tr, "/a/b")
+	commit(t, tr)
+	gc(t, st)
+
+	if err := st.Close(); err != nil {
+		t.Fatalf("Close() failed: %v", err)
+	}
+
+	// Re-create a new store. This should compress the log, creating an initial
+	// state containing / and /a.
+	st, cleanup = openStore(t, dbName)
+	defer cleanup()
+
+	log, cleanup := openLog(t, dbName)
+	defer cleanup()
+
+	rootRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/"), "...")
+	rootListProcessor := createGlobProcessor(t, storage.ParsePath("/"), "*")
+	aRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "...")
+	aListProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "*")
+
+	// Expect initial state that contains / and /a.
+	logst := readState(t, log)
+
+	changes := processState(t, rootRecursiveProcessor, logst, 2)
+	watchtesting.ExpectEntryExists(t, changes, "", id1, "val1")
+	watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+	changes = processState(t, rootListProcessor, logst, 1)
+	watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+	changes = processState(t, aRecursiveProcessor, logst, 1)
+	watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+	processState(t, aListProcessor, logst, 0)
+}
+
+func TestGlobProcessTransactionAdd(t *testing.T) {
+	dbName, st, cleanup := createStore(t)
+	defer cleanup()
+
+	log, cleanup := openLog(t, dbName)
+	defer cleanup()
+
+	rootRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/"), "...")
+	rootListProcessor := createGlobProcessor(t, storage.ParsePath("/"), "*")
+	aRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "...")
+	aListProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "*")
+	bRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a/b"), "...")
+
+	logst := readState(t, log)
+	processState(t, rootRecursiveProcessor, logst.DeepCopy(), 0)
+	processState(t, rootListProcessor, logst.DeepCopy(), 0)
+	processState(t, aRecursiveProcessor, logst.DeepCopy(), 0)
+	processState(t, aListProcessor, logst.DeepCopy(), 0)
+	processState(t, bRecursiveProcessor, logst.DeepCopy(), 0)
+
+	// First transaction, put /, /a, /a/b.
+	tr := memstore.NewTransaction()
+	id1 := put(t, st, tr, "/", "val1")
+	id2 := put(t, st, tr, "/a", "val2")
+	id3 := put(t, st, tr, "/a/b", "val3")
+	commit(t, tr)
+
+	// Expect transaction that adds /, /a and /a/b.
+	mus := readTransaction(t, log)
+
+	changes := processTransaction(t, rootRecursiveProcessor, mus, 3)
+	watchtesting.ExpectEntryExists(t, changes, "", id1, "val1")
+	watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+	watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val3")
+
+	changes = processTransaction(t, rootListProcessor, mus, 1)
+	watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+	changes = processTransaction(t, aRecursiveProcessor, mus, 2)
+	watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+	watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val3")
+
+	changes = processTransaction(t, aListProcessor, mus, 1)
+	watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val3")
+
+	changes = processTransaction(t, bRecursiveProcessor, mus, 1)
+	watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val3")
+}
+
+func TestGlobProcessTransactionEmptyPath(t *testing.T) {
+	dbName, st, cleanup := createStore(t)
+	defer cleanup()
+
+	log, cleanup := openLog(t, dbName)
+	defer cleanup()
+
+	bRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a/b"), "...")
+
+	expectState(t, log, bRecursiveProcessor, 0)
+
+	// First transaction, put /, /a.
+	tr := memstore.NewTransaction()
+	put(t, st, tr, "/", "val1")
+	put(t, st, tr, "/a", "val2")
+	commit(t, tr)
+
+	// Expect no change.
+	expectTransaction(t, log, bRecursiveProcessor, 0)
+
+	// Next transaction, put /a/b.
+	tr = memstore.NewTransaction()
+	id3 := put(t, st, tr, "/a/b", "val3")
+	commit(t, tr)
+
+	// Expect transaction that adds /a/b.
+	changes := expectTransaction(t, log, bRecursiveProcessor, 1)
+	watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val3")
+}
+
+// TestGlobProcessTransactionUpdate verifies the changes reported when /a/b is updated.
+func TestGlobProcessTransactionUpdate(t *testing.T) {
+	dbName, st, cleanup := createStore(t)
+	defer cleanup()
+
+	log, cleanup := openLog(t, dbName)
+	defer cleanup()
+
+	rootRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/"), "...")
+	rootListProcessor := createGlobProcessor(t, storage.ParsePath("/"), "*")
+	aRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "...")
+	aListProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "*")
+	bRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a/b"), "...")
+
+	logst := readState(t, log)
+	processState(t, rootRecursiveProcessor, logst.DeepCopy(), 0)
+	processState(t, rootListProcessor, logst.DeepCopy(), 0)
+	processState(t, aRecursiveProcessor, logst.DeepCopy(), 0)
+	processState(t, aListProcessor, logst.DeepCopy(), 0)
+	processState(t, bRecursiveProcessor, logst.DeepCopy(), 0)
+
+	// First transaction, put /, /a, /a/b.
+	tr := memstore.NewTransaction()
+	put(t, st, tr, "/", "val1")
+	put(t, st, tr, "/a", "val2")
+	id3 := put(t, st, tr, "/a/b", "val3")
+	commit(t, tr)
+
+	mus := readTransaction(t, log)
+	processTransaction(t, rootRecursiveProcessor, mus, 3)
+	processTransaction(t, rootListProcessor, mus, 1)
+	processTransaction(t, aRecursiveProcessor, mus, 2)
+	processTransaction(t, aListProcessor, mus, 1)
+	processTransaction(t, bRecursiveProcessor, mus, 1)
+
+	// Next transaction, update /a/b.
+	tr = memstore.NewTransaction()
+	put(t, st, tr, "/a/b", "val4")
+	commit(t, tr)
+
+	// Expect transaction that updates /a/b.
+	mus = readTransaction(t, log)
+	changes := processTransaction(t, rootRecursiveProcessor, mus, 1)
+	watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val4")
+
+	processTransaction(t, rootListProcessor, mus, 0)
+
+	changes = processTransaction(t, aRecursiveProcessor, mus, 1)
+	watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val4")
+
+	changes = processTransaction(t, aListProcessor, mus, 1)
+	watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val4")
+
+	changes = processTransaction(t, bRecursiveProcessor, mus, 1)
+	watchtesting.ExpectEntryExists(t, changes, "a/b", id3, "val4")
+}
+
+func TestGlobProcessTransactionRemove(t *testing.T) {
+	dbName, st, cleanup := createStore(t)
+	defer cleanup()
+
+	log, cleanup := openLog(t, dbName)
+	defer cleanup()
+
+	rootRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/"), "...")
+	rootListProcessor := createGlobProcessor(t, storage.ParsePath("/"), "*")
+	aRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "...")
+	aListProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "*")
+	bRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a/b"), "...")
+
+	logst := readState(t, log)
+	processState(t, rootRecursiveProcessor, logst.DeepCopy(), 0)
+	processState(t, rootListProcessor, logst.DeepCopy(), 0)
+	processState(t, aRecursiveProcessor, logst.DeepCopy(), 0)
+	processState(t, aListProcessor, logst.DeepCopy(), 0)
+	processState(t, bRecursiveProcessor, logst.DeepCopy(), 0)
+
+	// First transaction, put /, /a, /a/b.
+	tr := memstore.NewTransaction()
+	put(t, st, tr, "/", "val1")
+	id2 := put(t, st, tr, "/a", "val2")
+	put(t, st, tr, "/a/b", "val3")
+	commit(t, tr)
+
+	mus := readTransaction(t, log)
+	processTransaction(t, rootRecursiveProcessor, mus, 3)
+	processTransaction(t, rootListProcessor, mus, 1)
+	processTransaction(t, aRecursiveProcessor, mus, 2)
+	processTransaction(t, aListProcessor, mus, 1)
+	processTransaction(t, bRecursiveProcessor, mus, 1)
+
+	// Next transaction, remove /a/b.
+	tr = memstore.NewTransaction()
+	remove(t, st, tr, "/a/b")
+	commit(t, tr)
+
+	// Expect transaction that updates /a and removes /a/b.
+	mus = readTransaction(t, log)
+
+	changes := processTransaction(t, rootRecursiveProcessor, mus, 2)
+	// TODO(tilaks): Should we report implicit directory changes?
+	watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+	watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+
+	changes = processTransaction(t, rootListProcessor, mus, 1)
+	watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+	changes = processTransaction(t, aRecursiveProcessor, mus, 2)
+	watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+	watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+
+	changes = processTransaction(t, aListProcessor, mus, 1)
+	watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+
+	changes = processTransaction(t, bRecursiveProcessor, mus, 1)
+	watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+
+	// Garbage-collect the node at /a/b.
+	gc(t, st)
+
+	// Expect no change.
+	mus = readTransaction(t, log)
+	processTransaction(t, rootRecursiveProcessor, mus, 0)
+	processTransaction(t, rootListProcessor, mus, 0)
+	processTransaction(t, aRecursiveProcessor, mus, 0)
+	processTransaction(t, aListProcessor, mus, 0)
+	processTransaction(t, bRecursiveProcessor, mus, 0)
+}
+
+func TestGlobProcessTransactionRemoveRecursive(t *testing.T) {
+	dbName, st, cleanup := createStore(t)
+	defer cleanup()
+
+	log, cleanup := openLog(t, dbName)
+	defer cleanup()
+
+	rootRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/"), "...")
+	rootListProcessor := createGlobProcessor(t, storage.ParsePath("/"), "*")
+	aRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "...")
+	aListProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "*")
+	bRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a/b"), "...")
+
+	logst := readState(t, log)
+	processState(t, rootRecursiveProcessor, logst.DeepCopy(), 0)
+	processState(t, rootListProcessor, logst.DeepCopy(), 0)
+	processState(t, aRecursiveProcessor, logst.DeepCopy(), 0)
+	processState(t, aListProcessor, logst.DeepCopy(), 0)
+	processState(t, bRecursiveProcessor, logst.DeepCopy(), 0)
+
+	// First transaction, put /, /a, /a/b.
+	tr := memstore.NewTransaction()
+	id1 := put(t, st, tr, "/", "val1")
+	put(t, st, tr, "/a", "val2")
+	put(t, st, tr, "/a/b", "val3")
+	commit(t, tr)
+
+	mus := readTransaction(t, log)
+	processTransaction(t, rootRecursiveProcessor, mus, 3)
+	processTransaction(t, rootListProcessor, mus, 1)
+	processTransaction(t, aRecursiveProcessor, mus, 2)
+	processTransaction(t, aListProcessor, mus, 1)
+	processTransaction(t, bRecursiveProcessor, mus, 1)
+
+	// Next transaction, remove /a.
+	tr = memstore.NewTransaction()
+	remove(t, st, tr, "/a")
+	commit(t, tr)
+
+	// Expect transaction that updates / and removes /a and /a/b.
+	mus = readTransaction(t, log)
+
+	changes := processTransaction(t, rootRecursiveProcessor, mus, 3)
+	// TODO(tilaks): Should we report implicit directory changes?
+	watchtesting.ExpectEntryExists(t, changes, "", id1, "val1")
+	watchtesting.ExpectEntryDoesNotExist(t, changes, "a")
+	watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+
+	changes = processTransaction(t, rootListProcessor, mus, 1)
+	watchtesting.ExpectEntryDoesNotExist(t, changes, "a")
+
+	changes = processTransaction(t, aRecursiveProcessor, mus, 2)
+	watchtesting.ExpectEntryDoesNotExist(t, changes, "a")
+	watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+
+	changes = processTransaction(t, aListProcessor, mus, 1)
+	watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+
+	changes = processTransaction(t, bRecursiveProcessor, mus, 1)
+	watchtesting.ExpectEntryDoesNotExist(t, changes, "a/b")
+}
+
+func TestGlobProcessTransactionReplace(t *testing.T) {
+	dbName, st, cleanup := createStore(t)
+	defer cleanup()
+
+	log, cleanup := openLog(t, dbName)
+	defer cleanup()
+
+	rootRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/"), "...")
+	rootListProcessor := createGlobProcessor(t, storage.ParsePath("/"), "*")
+	aRecursiveProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "...")
+	aListProcessor := createGlobProcessor(t, storage.ParsePath("/a"), "*")
+
+	logst := readState(t, log)
+
+	processState(t, rootRecursiveProcessor, logst.DeepCopy(), 0)
+	processState(t, rootListProcessor, logst.DeepCopy(), 0)
+	processState(t, aRecursiveProcessor, logst.DeepCopy(), 0)
+	processState(t, aListProcessor, logst.DeepCopy(), 0)
+
+	// First transaction, put /, /a.
+	tr := memstore.NewTransaction()
+	id1 := put(t, st, tr, "/", "val1")
+	put(t, st, tr, "/a", "val2")
+	commit(t, tr)
+
+	mus := readTransaction(t, log)
+	processTransaction(t, rootRecursiveProcessor, mus, 2)
+	processTransaction(t, rootListProcessor, mus, 1)
+	processTransaction(t, aRecursiveProcessor, mus, 1)
+	processTransaction(t, aListProcessor, mus, 0)
+
+	// Next transaction, replace /a.
+	tr = memstore.NewTransaction()
+	remove(t, st, tr, "/a")
+	id2 := put(t, st, tr, "/a", "val2")
+	commit(t, tr)
+
+	// Expect transaction that updates / and replaces /a.
+	mus = readTransaction(t, log)
+
+	changes := processTransaction(t, rootRecursiveProcessor, mus, 2)
+	// TODO(tilaks): Should we report implicit directory changes?
+	watchtesting.ExpectEntryExists(t, changes, "", id1, "val1")
+	watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+	changes = processTransaction(t, rootListProcessor, mus, 1)
+	watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+	changes = processTransaction(t, aRecursiveProcessor, mus, 1)
+	watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
+
+	processTransaction(t, aListProcessor, mus, 0)
+}
+
+// TODO(tilaks): test ACL update.
diff --git a/services/store/memstore/watch/processor.go b/services/store/memstore/watch/processor.go
index 9fcc394..ab9bb69 100644
--- a/services/store/memstore/watch/processor.go
+++ b/services/store/memstore/watch/processor.go
@@ -14,9 +14,11 @@
 	// the store. The returned changes need not be the sequence of changes that
 	// originally created the initial state (e.g. in the case of compress), but
 	// are sufficient to re-construct the state viewable within the request.
+	// processState may modify its input.
 	processState(st *state.State) ([]watch.Change, error)
 
 	// processTransaction returns the set of changes made in some transaction.
 	// The changes are returned in no specific order.
+	// processTransaction may modify its input.
 	processTransaction(mu *state.Mutations) ([]watch.Change, error)
 }
diff --git a/services/store/memstore/watch/raw_processor_test.go b/services/store/memstore/watch/raw_processor_test.go
index 34b0c25..8599ef0 100644
--- a/services/store/memstore/watch/raw_processor_test.go
+++ b/services/store/memstore/watch/raw_processor_test.go
@@ -4,6 +4,8 @@
 	"testing"
 
 	"veyron/services/store/memstore"
+	watchtesting "veyron/services/store/memstore/watch/testing"
+
 	"veyron2/storage"
 )
 
@@ -34,8 +36,9 @@
 	st, cleanup = openStore(t, dbName)
 	defer cleanup()
 
-	log, cleanup, processor := openLog(t, dbName)
+	log, cleanup := openLog(t, dbName)
 	defer cleanup()
+	processor := createRawProcessor(t)
 
 	post1 := st.Snapshot().Find(id1).Version
 	post2 := st.Snapshot().Find(id2).Version
@@ -43,38 +46,20 @@
 	// Expect initial state that
 	// 1) Contains / with value val1 and implicit directory entry /a
 	// 2) Contains /a with value val2
-	logstore, err := log.ReadState(rootPublicID)
-	if err != nil {
-		t.Fatalf("ReadState() failed: %v", err)
-	}
-	logst := logstore.State
-	changes, err := processor.processState(logst)
-	if len(changes) != 2 {
-		t.Fatalf("Expected changes to have 2 entries, got: %v", changes)
-	}
-	expectExists(t, changes, id1, storage.NoVersion, post1, true, "val1", dir("a", id2))
-	expectExists(t, changes, id2, storage.NoVersion, post2, false, "val2", empty)
+	changes := expectState(t, log, processor, 2)
+	watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post1, true, "val1", watchtesting.DirOf("a", id2))
+	watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post2, false, "val2", watchtesting.EmptyDir)
 }
 
 func TestRawProcessTransactionAddRemove(t *testing.T) {
 	dbName, st, cleanup := createStore(t)
 	defer cleanup()
 
-	log, cleanup, processor := openLog(t, dbName)
+	log, cleanup := openLog(t, dbName)
 	defer cleanup()
+	processor := createRawProcessor(t)
 
-	logstore, err := log.ReadState(rootPublicID)
-	if err != nil {
-		t.Fatalf("ReadState() failed: %v", err)
-	}
-	logst := logstore.State
-	changes, err := processor.processState(logst)
-	if err != nil {
-		t.Fatalf("processState() failed: %v", err)
-	}
-	if len(changes) != 0 {
-		t.Fatal("Expected changes to have 0 entries")
-	}
+	expectState(t, log, processor, 0)
 
 	// First transaction, put /, /a, /a/b
 	tr := memstore.NewTransaction()
@@ -91,20 +76,10 @@
 	// 1) Adds / with value val1 and implicit directory entry /a
 	// 2) Adds /a with value val2 and implicit directory entry /a/b
 	// 3) Adds /a/b with value val3
-	logmu, err := log.ReadTransaction()
-	if err != nil {
-		t.Fatalf("ReadTransaction() failed: %v", err)
-	}
-	changes, err = processor.processTransaction(logmu)
-	if err != nil {
-		t.Fatalf("processTransaction() failed: %v", err)
-	}
-	if len(changes) != 3 {
-		t.Fatal("Expected changes to have 3 entries")
-	}
-	expectExists(t, changes, id1, storage.NoVersion, post1, true, "val1", dir("a", id2))
-	expectExists(t, changes, id2, storage.NoVersion, post2, false, "val2", dir("b", id3))
-	expectExists(t, changes, id3, storage.NoVersion, post3, false, "val3", empty)
+	changes := expectTransaction(t, log, processor, 3)
+	watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post1, true, "val1", watchtesting.DirOf("a", id2))
+	watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post2, false, "val2", watchtesting.DirOf("b", id3))
+	watchtesting.ExpectMutationExists(t, changes, id3, storage.NoVersion, post3, false, "val3", watchtesting.EmptyDir)
 
 	// Next transaction, remove /a/b
 	tr = memstore.NewTransaction()
@@ -116,61 +91,31 @@
 	post2 = st.Snapshot().Find(id2).Version
 
 	// Expect transaction that removes implicit dir entry /a/b from /a
-	logmu, err = log.ReadTransaction()
-	if err != nil {
-		t.Fatalf("ReadTransaction() failed: %v", err)
-	}
-	changes, err = processor.processTransaction(logmu)
-	if err != nil {
-		t.Fatalf("processTransaction() failed: %v", err)
-	}
-	if len(changes) != 1 {
-		t.Fatal("Expected changes to have 1 entry")
-	}
-	expectExists(t, changes, id2, pre2, post2, false, "val2", empty)
+	changes = expectTransaction(t, log, processor, 1)
+	watchtesting.ExpectMutationExists(t, changes, id2, pre2, post2, false, "val2", watchtesting.EmptyDir)
 
 	// Garbage-collect the node at /a/b
 	gc(t, st)
 
 	// Expect transaction that deletes the node at /a/b
-	logmu, err = log.ReadTransaction()
-	if err != nil {
-		t.Fatalf("ReadTransaction() failed: %v", err)
-	}
-	changes, err = processor.processTransaction(logmu)
-	if err != nil {
-		t.Fatalf("processTransaction() failed: %v", err)
-	}
-	if len(changes) != 1 {
-		t.Fatal("Expected changes to have 1 entry")
-	}
-	expectDoesNotExist(t, changes, id3, pre3, false)
+	changes = expectTransaction(t, log, processor, 1)
+	watchtesting.ExpectMutationDoesNotExist(t, changes, id3, pre3, false)
 }
 
 func TestRawProcessTransactionRemoveRecursive(t *testing.T) {
 	dbName, st, cleanup := createStore(t)
 	defer cleanup()
 
-	log, cleanup, processor := openLog(t, dbName)
+	log, cleanup := openLog(t, dbName)
 	defer cleanup()
+	processor := createRawProcessor(t)
 
 	processor, err := newRawProcessor(rootPublicID)
 	if err != nil {
 		t.Fatalf("newRawProcessor() failed: %v", err)
 	}
 
-	logstore, err := log.ReadState(rootPublicID)
-	if err != nil {
-		t.Fatalf("ReadState() failed: %v", err)
-	}
-	logst := logstore.State
-	changes, err := processor.processState(logst)
-	if err != nil {
-		t.Fatalf("processState() failed: %v", err)
-	}
-	if len(changes) != 0 {
-		t.Fatal("Expected changes to have 0 entries")
-	}
+	expectState(t, log, processor, 0)
 
 	// First transaction, put /, /a, /a/b
 	tr := memstore.NewTransaction()
@@ -187,13 +132,7 @@
 	// 1) Adds / with value val1 and implicit directory entry /a
 	// 2) Adds /a with value val2 and implicit directory entry /a/b
 	// 3) Adds /a/b with value val3
-	logmu, err := log.ReadTransaction()
-	if err != nil {
-		t.Fatalf("ReadTransaction() failed: %v", err)
-	}
-	if _, err := processor.processTransaction(logmu); err != nil {
-		t.Fatalf("processTransaction() failed: %v", err)
-	}
+	expectTransaction(t, log, processor, 3)
 
 	// Next transaction, remove /a
 	tr = memstore.NewTransaction()
@@ -206,62 +145,32 @@
 	post1 = st.Snapshot().Find(id1).Version
 
 	// Expect transaction that removes implicit dir entry /a from /
-	logmu, err = log.ReadTransaction()
-	if err != nil {
-		t.Fatalf("ReadTransaction() failed: %v", err)
-	}
-	changes, err = processor.processTransaction(logmu)
-	if err != nil {
-		t.Fatalf("processTransaction() failed: %v", err)
-	}
-	if len(changes) != 1 {
-		t.Fatal("Expected changes to have 1 entry")
-	}
-	expectExists(t, changes, id1, pre1, post1, true, "val1", empty)
+	changes := expectTransaction(t, log, processor, 1)
+	watchtesting.ExpectMutationExists(t, changes, id1, pre1, post1, true, "val1", watchtesting.EmptyDir)
 
 	// Garbage-collect the nodes at /a and /a/b
 	gc(t, st)
 
 	// Expect transaction that deletes the nodes at /a and /a/b
-	logmu, err = log.ReadTransaction()
-	if err != nil {
-		t.Fatalf("ReadTransaction() failed: %v", err)
-	}
-	changes, err = processor.processTransaction(logmu)
-	if err != nil {
-		t.Fatalf("processTransaction() failed: %v", err)
-	}
-	if len(changes) != 2 {
-		t.Fatal("Expected changes to have 2 entries")
-	}
-	expectDoesNotExist(t, changes, id2, pre2, false)
-	expectDoesNotExist(t, changes, id3, pre3, false)
+	changes = expectTransaction(t, log, processor, 2)
+	watchtesting.ExpectMutationDoesNotExist(t, changes, id2, pre2, false)
+	watchtesting.ExpectMutationDoesNotExist(t, changes, id3, pre3, false)
 }
 
 func TestRawProcessTransactionUpdateRemoveRoot(t *testing.T) {
 	dbName, st, cleanup := createStore(t)
 	defer cleanup()
 
-	log, cleanup, processor := openLog(t, dbName)
+	log, cleanup := openLog(t, dbName)
 	defer cleanup()
+	processor := createRawProcessor(t)
 
 	processor, err := newRawProcessor(rootPublicID)
 	if err != nil {
 		t.Fatalf("newRawProcessor() failed: %v", err)
 	}
 
-	logstore, err := log.ReadState(rootPublicID)
-	if err != nil {
-		t.Fatalf("ReadState() failed: %v", err)
-	}
-	logst := logstore.State
-	changes, err := processor.processState(logst)
-	if err != nil {
-		t.Fatalf("processState() failed: %v", err)
-	}
-	if len(changes) != 0 {
-		t.Fatal("Expected changes to have 0 entries")
-	}
+	expectState(t, log, processor, 0)
 
 	// First transaction, put /, /a
 	tr := memstore.NewTransaction()
@@ -275,13 +184,7 @@
 	// Assume the first transaction
 	// 1) Adds / with value val1 and implicit directory entry /a
 	// 2) Adds /a with value val2
-	logmu, err := log.ReadTransaction()
-	if err != nil {
-		t.Fatalf("ReadTransaction() failed: %v", err)
-	}
-	if _, err := processor.processTransaction(logmu); err != nil {
-		t.Fatalf("processTransaction() failed: %v", err)
-	}
+	expectTransaction(t, log, processor, 2)
 
 	// Next transaction, update /
 	tr = memstore.NewTransaction()
@@ -292,18 +195,8 @@
 	post1 = st.Snapshot().Find(id1).Version
 
 	// Expect transaction that updates / with value val3
-	logmu, err = log.ReadTransaction()
-	if err != nil {
-		t.Fatalf("ReadTransaction() failed: %v", err)
-	}
-	changes, err = processor.processTransaction(logmu)
-	if err != nil {
-		t.Fatalf("processTransaction() failed: %v", err)
-	}
-	if len(changes) != 1 {
-		t.Fatal("Expected changes to have 1 entry")
-	}
-	expectExists(t, changes, id1, pre1, post1, true, "val3", dir("a", id2))
+	changes := expectTransaction(t, log, processor, 1)
+	watchtesting.ExpectMutationExists(t, changes, id1, pre1, post1, true, "val3", watchtesting.DirOf("a", id2))
 
 	// Next transaction, remove /
 	tr = memstore.NewTransaction()
@@ -314,33 +207,13 @@
 	pre2 := post2
 
 	// Expect a transaction that deletes /
-	logmu, err = log.ReadTransaction()
-	if err != nil {
-		t.Fatalf("ReadTransaction() failed: %v", err)
-	}
-	changes, err = processor.processTransaction(logmu)
-	if err != nil {
-		t.Fatalf("processTransaction() failed: %v", err)
-	}
-	if len(changes) != 1 {
-		t.Fatal("Expected changes to have 1 entry")
-	}
-	expectDoesNotExist(t, changes, id1, pre1, true)
+	changes = expectTransaction(t, log, processor, 1)
+	watchtesting.ExpectMutationDoesNotExist(t, changes, id1, pre1, true)
 
 	// Garbage-collect the nodes at / and /a
 	gc(t, st)
 
 	// Expect transaction that deletes the nodes at / and /a
-	logmu, err = log.ReadTransaction()
-	if err != nil {
-		t.Fatalf("ReadTransaction() failed: %v", err)
-	}
-	changes, err = processor.processTransaction(logmu)
-	if err != nil {
-		t.Fatalf("processTransaction() failed: %v", err)
-	}
-	if len(changes) != 1 {
-		t.Fatal("Expected changes to have 1 entry")
-	}
-	expectDoesNotExist(t, changes, id2, pre2, false)
+	changes = expectTransaction(t, log, processor, 1)
+	watchtesting.ExpectMutationDoesNotExist(t, changes, id2, pre2, false)
 }
diff --git a/services/store/memstore/watch/test_util.go b/services/store/memstore/watch/test_util.go
index 0ac3293..882e03e 100644
--- a/services/store/memstore/watch/test_util.go
+++ b/services/store/memstore/watch/test_util.go
@@ -7,7 +7,7 @@
 	"testing"
 
 	"veyron/services/store/memstore"
-	"veyron/services/store/raw"
+	"veyron/services/store/memstore/state"
 	"veyron/services/store/service"
 
 	"veyron2/security"
@@ -93,22 +93,31 @@
 	}
 }
 
-func openLog(t *testing.T, dbName string) (*memstore.RLog, func(), reqProcessor) {
+func openLog(t *testing.T, dbName string) (*memstore.RLog, func()) {
 	log, err := memstore.OpenLog(dbName, true)
 	if err != nil {
 		t.Fatalf("openLog() failed: %v", err)
 	}
-	cleanup := func() {
+
+	return log, func() {
 		log.Close()
 	}
+}
 
+func createRawProcessor(t *testing.T) reqProcessor {
 	processor, err := newRawProcessor(rootPublicID)
 	if err != nil {
-		cleanup()
 		t.Fatalf("newRawProcessor() failed: %v", err)
 	}
+	return processor
+}
 
-	return log, cleanup, processor
+func createGlobProcessor(t *testing.T, path storage.PathName, pattern string) reqProcessor {
+	processor, err := newGlobProcessor(rootPublicID, path, pattern)
+	if err != nil {
+		t.Fatalf("newGlobProcessor() failed: %v", err)
+	}
+	return processor
 }
 
 func createWatcher(t *testing.T, dbName string) (service.Watcher, func()) {
@@ -121,95 +130,50 @@
 	}
 }
 
-var (
-	empty = []storage.DEntry{}
-)
-
-func dir(name string, id storage.ID) []storage.DEntry {
-	return []storage.DEntry{storage.DEntry{
-		Name: name,
-		ID:   id,
-	}}
+func expectState(t *testing.T, log *memstore.RLog, processor reqProcessor, numChanges int) []watch.Change {
+	st := readState(t, log)
+	return processState(t, processor, st, numChanges)
 }
 
-func expectInitialStateSkipped(t *testing.T, change watch.Change) {
-	if change.Name != "" {
-		t.Fatalf("Expect Name to be \"\" but was: %v", change.Name)
+func readState(t *testing.T, log *memstore.RLog) *state.State {
+	st, err := log.ReadState(rootPublicID)
+	if err != nil {
+		t.Fatalf("ReadState() failed: %v", err)
 	}
-	if change.State != watch.InitialStateSkipped {
-		t.Fatalf("Expect State to be InitialStateSkipped but was: %v", change.State)
-	}
-	if len(change.ResumeMarker) != 0 {
-		t.Fatalf("Expect no ResumeMarker but was: %v", change.ResumeMarker)
-	}
+	return st.State
 }
 
-func expectExists(t *testing.T, changes []watch.Change, id storage.ID, pre, post storage.Version, isRoot bool, value string, dir []storage.DEntry) {
-	change := findChange(t, changes, id)
-	if change.State != watch.Exists {
-		t.Fatalf("Expected id to exist: %v", id)
+func processState(t *testing.T, processor reqProcessor, st *state.State, numChanges int) []watch.Change {
+	changes, err := processor.processState(st)
+	if err != nil {
+		t.Fatalf("processState() failed: %v", err)
 	}
-	cv := change.Value.(*raw.Mutation)
-	if cv.PriorVersion != pre {
-		t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
+	if len(changes) != numChanges {
+		t.Fatalf("Expected state to have %d changes, got %d", numChanges, len(changes))
 	}
-	if cv.Version != post {
-		t.Fatalf("Expected Version to be %v, but was: %v", post, cv.Version)
-	}
-	if cv.IsRoot != isRoot {
-		t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, cv.IsRoot)
-	}
-	if cv.Value != value {
-		t.Fatalf("Expected Value to be: %v, but was: %v", value, cv.Value)
-	}
-	expectDirEquals(t, cv.Dir, dir)
+	return changes
 }
 
-func expectDoesNotExist(t *testing.T, changes []watch.Change, id storage.ID, pre storage.Version, isRoot bool) {
-	change := findChange(t, changes, id)
-	if change.State != watch.DoesNotExist {
-		t.Fatalf("Expected id to not exist: %v", id)
-	}
-	cv := change.Value.(*raw.Mutation)
-	if cv.PriorVersion != pre {
-		t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
-	}
-	if cv.Version != storage.NoVersion {
-		t.Fatalf("Expected Version to be NoVersion, but was: %v", cv.Version)
-	}
-	if cv.IsRoot != isRoot {
-		t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, cv.IsRoot)
-	}
-	if cv.Value != nil {
-		t.Fatal("Expected Value to be nil")
-	}
-	if cv.Dir != nil {
-		t.Fatal("Expected Dir to be nil")
-	}
+func expectTransaction(t *testing.T, log *memstore.RLog, processor reqProcessor, numChanges int) []watch.Change {
+	mus := readTransaction(t, log)
+	return processTransaction(t, processor, mus, numChanges)
 }
 
-func findChange(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
-	for _, change := range changes {
-		cv, ok := change.Value.(*raw.Mutation)
-		if !ok {
-			t.Fatal("Expected a Mutation")
-		}
-		if cv.ID == id {
-			return change
-		}
+func readTransaction(t *testing.T, log *memstore.RLog) *state.Mutations {
+	mus, err := log.ReadTransaction()
+	if err != nil {
+		t.Fatalf("ReadTransaction() failed: %v", err)
 	}
-	t.Fatalf("Expected a change for id: %v", id)
-	panic("should not reach here")
+	return mus
 }
 
-func expectDirEquals(t *testing.T, actual, expected []storage.DEntry) {
-	if len(actual) != len(expected) {
-		t.Fatalf("Expected Dir to have %v refs, but had %v", len(expected), len(actual))
+func processTransaction(t *testing.T, processor reqProcessor, mus *state.Mutations, numChanges int) []watch.Change {
+	changes, err := processor.processTransaction(mus)
+	if err != nil {
+		t.Fatalf("processTransaction() failed: %v", err)
 	}
-	for i, e := range expected {
-		a := actual[i]
-		if a != e {
-			t.Fatalf("Expected Dir entry %v to be %v, but was %v", i, e, a)
-		}
+	if len(changes) != numChanges {
+		t.Fatalf("Expected transaction to have %d changes, got %d", numChanges, len(changes))
 	}
+	return changes
 }
diff --git a/services/store/memstore/watch/testing/util.go b/services/store/memstore/watch/testing/util.go
index b3e8037..1e34a32 100644
--- a/services/store/memstore/watch/testing/util.go
+++ b/services/store/memstore/watch/testing/util.go
@@ -3,6 +3,7 @@
 import (
 	"errors"
 	"sync"
+	"testing"
 	"time"
 
 	"veyron/services/store/raw"
@@ -10,7 +11,9 @@
 	"veyron2/ipc"
 	"veyron2/naming"
 	"veyron2/security"
+	"veyron2/services/store"
 	"veyron2/services/watch"
+	"veyron2/storage"
 )
 
 var (
@@ -144,17 +147,18 @@
 	s.ctx.Cancel()
 }
 
-func WatchRaw(id security.PublicID, watchFn func(ipc.ServerContext, raw.Request, raw.StoreServiceWatchStream) error, req raw.Request) raw.StoreWatchStream {
+func watchImpl(id security.PublicID, watchFn func(ipc.ServerContext, *watcherServiceWatchStream) error) *watcherWatchStream {
 	mu := &sync.Mutex{}
 	ctx := NewCancellableContext(id)
 	c := make(chan watch.ChangeBatch, 1)
 	errc := make(chan error, 1)
 	go func() {
-		err := watchFn(ctx, req, &watcherServiceWatchStream{
+		stream := &watcherServiceWatchStream{
 			mu:     mu,
 			ctx:    ctx,
 			output: c,
-		})
+		}
+		err := watchFn(ctx, stream)
 		mu.Lock()
 		defer mu.Unlock()
 		ctx.Cancel()
@@ -168,3 +172,196 @@
 		err:   errc,
 	}
 }
+
+func WatchRaw(id security.PublicID, watchFn func(ipc.ServerContext, raw.Request, raw.StoreServiceWatchStream) error, req raw.Request) raw.StoreWatchStream {
+	return watchImpl(id, func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
+		return watchFn(ctx, req, stream)
+	})
+}
+
+func WatchGlob(id security.PublicID, watchFn func(ipc.ServerContext, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, req watch.GlobRequest) watch.GlobWatcherWatchGlobStream {
+	return watchImpl(id, func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
+		return watchFn(ctx, req, stream)
+	})
+}
+
+func WatchGlobOnPath(id security.PublicID, watchFn func(ipc.ServerContext, storage.PathName, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, path storage.PathName, req watch.GlobRequest) watch.GlobWatcherWatchGlobStream {
+	return watchImpl(id, func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
+		return watchFn(ctx, path, req, stream)
+	})
+}
+
+func ExpectInitialStateSkipped(t *testing.T, change watch.Change) {
+	if change.Name != "" {
+		t.Fatalf("Expect Name to be \"\" but was: %v", change.Name)
+	}
+	if change.State != watch.InitialStateSkipped {
+		t.Fatalf("Expect State to be InitialStateSkipped but was: %v", change.State)
+	}
+	if len(change.ResumeMarker) != 0 {
+		t.Fatalf("Expect no ResumeMarker but was: %v", change.ResumeMarker)
+	}
+}
+
+func ExpectEntryExists(t *testing.T, changes []watch.Change, name string, id storage.ID, value string) {
+	change := findEntry(t, changes, name)
+	if change.State != watch.Exists {
+		t.Fatalf("Expected name to exist: %v", name)
+	}
+	cv, ok := change.Value.(*storage.Entry)
+	if !ok {
+		t.Fatal("Expected an Entry")
+	}
+	if cv.Stat.ID != id {
+		t.Fatalf("Expected ID to be %v, but was: %v", id, cv.Stat.ID)
+	}
+	if cv.Value != value {
+		t.Fatalf("Expected Value to be %v, but was: %v", value, cv.Value)
+	}
+}
+
+func ExpectEntryDoesNotExist(t *testing.T, changes []watch.Change, name string) {
+	change := findEntry(t, changes, name)
+	if change.State != watch.DoesNotExist {
+		t.Fatalf("Expected name to not exist: %v", name)
+	}
+	if change.Value != nil {
+		t.Fatal("Expected entry to be nil")
+	}
+}
+
+func ExpectServiceEntryExists(t *testing.T, changes []watch.Change, name string, id storage.ID, value string) {
+	change := findEntry(t, changes, name)
+	if change.State != watch.Exists {
+		t.Fatalf("Expected name to exist: %v", name)
+	}
+	cv, ok := change.Value.(*store.Entry)
+	if !ok {
+		t.Fatal("Expected a service Entry")
+	}
+	if cv.Stat.ID != id {
+		t.Fatalf("Expected ID to be %v, but was: %v", id, cv.Stat.ID)
+	}
+	if cv.Value != value {
+		t.Fatalf("Expected Value to be %v, but was: %v", value, cv.Value)
+	}
+}
+
+func ExpectServiceEntryDoesNotExist(t *testing.T, changes []watch.Change, name string) {
+	change := findEntry(t, changes, name)
+	if change.State != watch.DoesNotExist {
+		t.Fatalf("Expected name to not exist: %v", name)
+	}
+	if change.Value != nil {
+		t.Fatal("Expected entry to be nil")
+	}
+}
+
+func findEntry(t *testing.T, changes []watch.Change, name string) watch.Change {
+	for i := range changes {
+		if changes[i].Name == name {
+			return changes[i]
+		}
+	}
+	t.Fatalf("Expected a change for name: %v", name)
+	panic("Should not reach here") // Fatalf never returns; this only terminates the function for the compiler.
+}
+
+var (
+	EmptyDir = []storage.DEntry{}
+)
+
+// DirOf returns a one-element directory listing that maps name to id. It is
+// the non-empty counterpart of EmptyDir for building the expected Dir
+// argument of ExpectMutationExists.
+func DirOf(name string, id storage.ID) []storage.DEntry {
+	return []storage.DEntry{{Name: name, ID: id}}
+}
+
+func ExpectMutationExists(t *testing.T, changes []watch.Change, id storage.ID, pre, post storage.Version, isRoot bool, value string, dir []storage.DEntry) {
+	change := findMutation(t, changes, id)
+	if change.State != watch.Exists {
+		t.Fatalf("Expected id to exist: %v", id)
+	}
+	cv := change.Value.(*raw.Mutation)
+	if cv.PriorVersion != pre {
+		t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
+	}
+	if cv.Version != post {
+		t.Fatalf("Expected Version to be %v, but was: %v", post, cv.Version)
+	}
+	if cv.IsRoot != isRoot {
+		t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, cv.IsRoot)
+	}
+	if cv.Value != value {
+		t.Fatalf("Expected Value to be: %v, but was: %v", value, cv.Value)
+	}
+	expectDirEquals(t, cv.Dir, dir)
+}
+
+func ExpectMutationDoesNotExist(t *testing.T, changes []watch.Change, id storage.ID, pre storage.Version, isRoot bool) {
+	change := findMutation(t, changes, id)
+	if change.State != watch.DoesNotExist {
+		t.Fatalf("Expected id to not exist: %v", id)
+	}
+	cv := change.Value.(*raw.Mutation)
+	if cv.PriorVersion != pre {
+		t.Fatalf("Expected PriorVersion to be %v, but was: %v", pre, cv.PriorVersion)
+	}
+	if cv.Version != storage.NoVersion {
+		t.Fatalf("Expected Version to be NoVersion, but was: %v", cv.Version)
+	}
+	if cv.IsRoot != isRoot {
+		t.Fatalf("Expected IsRoot to be: %v, but was: %v", isRoot, cv.IsRoot)
+	}
+	if cv.Value != nil {
+		t.Fatal("Expected Value to be nil")
+	}
+	if cv.Dir != nil {
+		t.Fatal("Expected Dir to be nil")
+	}
+}
+
+func ExpectMutationExistsNoVersionCheck(t *testing.T, changes []watch.Change, id storage.ID, value string) {
+	change := findMutation(t, changes, id)
+	if change.State != watch.Exists {
+		t.Fatalf("Expected id to exist: %v", id)
+	}
+	cv := change.Value.(*raw.Mutation)
+	if cv.Value != value {
+		t.Fatalf("Expected Value to be: %v, but was: %v", value, cv.Value)
+	}
+}
+
+func ExpectMutationDoesNotExistNoVersionCheck(t *testing.T, changes []watch.Change, id storage.ID) {
+	change := findMutation(t, changes, id)
+	if change.State != watch.DoesNotExist {
+		t.Fatalf("Expected id to not exist: %v", id)
+	}
+}
+
+// findMutation returns the first change whose *raw.Mutation has the given ID, failing the test otherwise.
+func findMutation(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
+	for i := range changes {
+		switch mu, ok := changes[i].Value.(*raw.Mutation); {
+		case !ok:
+			t.Fatal("Expected a Mutation")
+		case mu.ID == id:
+			return changes[i]
+		}
+	}
+	t.Fatalf("Expected a change for id: %v", id)
+	panic("Should not reach here")
+}
+
+// expectDirEquals fails the test unless actual holds the same entries as expected, in the same order.
+func expectDirEquals(t *testing.T, actual, expected []storage.DEntry) {
+	if want, got := len(expected), len(actual); want != got {
+		t.Fatalf("Expected Dir to have %v refs, but had %v", want, got)
+	}
+	for i, want := range expected {
+		if got := actual[i]; got != want {
+			t.Fatalf("Expected Dir entry %v to be %v, but was %v", i, want, got)
+		}
+	}
+}
diff --git a/services/store/memstore/watch/watcher.go b/services/store/memstore/watch/watcher.go
index ac5b10e..5956110 100644
--- a/services/store/memstore/watch/watcher.go
+++ b/services/store/memstore/watch/watcher.go
@@ -55,7 +55,9 @@
 }
 
 // WatchRaw returns a stream of all changes.
-func (w *watcher) WatchRaw(ctx ipc.ServerContext, req raw.Request, stream raw.StoreServiceWatchStream) error {
+func (w *watcher) WatchRaw(ctx ipc.ServerContext, req raw.Request,
+	stream raw.StoreServiceWatchStream) error {
+
 	processor, err := newRawProcessor(ctx.RemoteID())
 	if err != nil {
 		return err
@@ -66,12 +68,18 @@
 // WatchGlob returns a stream of changes that match a pattern.
 func (w *watcher) WatchGlob(ctx ipc.ServerContext, path storage.PathName,
 	req watch.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
-	return verror.Internalf("WatchGlob not yet implemented")
+
+	processor, err := newGlobProcessor(ctx.RemoteID(), path, req.Pattern)
+	if err != nil {
+		return err
+	}
+	return w.Watch(ctx, processor, req.ResumeMarker, stream)
 }
 
 // WatchQuery returns a stream of changes that satisfy a query.
 func (w *watcher) WatchQuery(ctx ipc.ServerContext, path storage.PathName,
 	req watch.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
+
 	return verror.Internalf("WatchQuery not yet implemented")
 }
 
@@ -86,7 +94,9 @@
 // sending changes to the specified watch stream. If the call is cancelled or
 // otherwise closed early, Watch will terminate and return an error.
 // Watch implements the service.Watcher interface.
-func (w *watcher) Watch(ctx ipc.ServerContext, processor reqProcessor, resumeMarker watch.ResumeMarker, stream WatchStream) error {
+func (w *watcher) Watch(ctx ipc.ServerContext, processor reqProcessor,
+	resumeMarker watch.ResumeMarker, stream WatchStream) error {
+
 	// Closing cancel terminates processRequest.
 	cancel := make(chan struct{})
 	defer close(cancel)
@@ -113,7 +123,9 @@
 	return ErrWatchClosed
 }
 
-func (w *watcher) processRequest(cancel <-chan struct{}, processor reqProcessor, resumeMarker watch.ResumeMarker, stream WatchStream) error {
+func (w *watcher) processRequest(cancel <-chan struct{}, processor reqProcessor,
+	resumeMarker watch.ResumeMarker, stream WatchStream) error {
+
 	log, err := memstore.OpenLog(w.dbName, true)
 	if err != nil {
 		return err
@@ -147,11 +159,12 @@
 		return err
 	}
 	st := store.State
+	// Save timestamp as processState may modify st.
+	timestamp := st.Timestamp()
 	changes, err := processor.processState(st)
 	if err != nil {
 		return err
 	}
-	timestamp := st.Timestamp()
 	if send, err := filter.shouldProcessChanges(timestamp); err != nil {
 		return err
 	} else if send {
@@ -166,11 +179,12 @@
 		if err != nil {
 			return err
 		}
+		// Save timestamp as processTransaction may modify mu.
+		timestamp := mu.Timestamp
 		changes, err = processor.processTransaction(mu)
 		if err != nil {
 			return err
 		}
-		timestamp := mu.Timestamp
 		if send, err := filter.shouldProcessChanges(timestamp); err != nil {
 			return err
 		} else if send {
diff --git a/services/store/memstore/watch/watcher_test.go b/services/store/memstore/watch/watcher_test.go
index 5679339..1ce5af6 100644
--- a/services/store/memstore/watch/watcher_test.go
+++ b/services/store/memstore/watch/watcher_test.go
@@ -1,6 +1,7 @@
 package watch
 
 import (
+	"bytes"
 	"io"
 	"sync"
 	"testing"
@@ -10,10 +11,11 @@
 	watchtesting "veyron/services/store/memstore/watch/testing"
 	"veyron/services/store/raw"
 
+	"veyron2/services/watch"
 	"veyron2/storage"
 )
 
-func TestWatch(t *testing.T) {
+func TestWatchRaw(t *testing.T) {
 	// Create a new store.
 	dbName, st, cleanup := createStore(t)
 	defer cleanup()
@@ -43,7 +45,7 @@
 	if change.Continued {
 		t.Error("Expected change to be the last in this transaction")
 	}
-	expectExists(t, changes, id1, storage.NoVersion, post1, true, "val1", empty)
+	watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post1, true, "val1", watchtesting.EmptyDir)
 
 	// Put /a
 	tr = memstore.NewTransaction()
@@ -69,8 +71,63 @@
 	if change.Continued {
 		t.Error("Expected change to be the last in this transaction")
 	}
-	expectExists(t, changes, id1, pre1, post1, true, "val1", dir("a", id2))
-	expectExists(t, changes, id2, storage.NoVersion, post2, false, "val2", empty)
+	watchtesting.ExpectMutationExists(t, changes, id1, pre1, post1, true, "val1", watchtesting.DirOf("a", id2))
+	watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post2, false, "val2", watchtesting.EmptyDir)
+}
+
+func TestWatchGlob(t *testing.T) {
+	// Create a new store.
+	dbName, st, cleanup := createStore(t)
+	defer cleanup()
+
+	// Create a new watcher.
+	w, cleanup := createWatcher(t, dbName)
+	defer cleanup()
+
+	// Put /
+	tr := memstore.NewTransaction()
+	id1 := put(t, st, tr, "/", "val1")
+	commit(t, tr)
+
+	// Start a watch request.
+	path := storage.ParsePath("/")
+	req := watch.GlobRequest{Pattern: "..."}
+	ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
+
+	// Check that watch detects the changes in the first transaction.
+	cb, err := ws.Recv()
+	if err != nil {
+		t.Fatalf("Recv() failed: %v", err)
+	}
+	changes := cb.Changes
+	change := changes[0]
+	if change.Continued {
+		t.Error("Expected change to be the last in this transaction")
+	}
+	watchtesting.ExpectEntryExists(t, changes, "", id1, "val1")
+
+	// Put /a
+	tr = memstore.NewTransaction()
+	id2 := put(t, st, tr, "/a", "val2")
+	commit(t, tr)
+
+	// Check that watch detects the changes in the second transaction.
+	// The batch is expected to hold two changes: one for / and one for /a.
+	cb, err = ws.Recv()
+	if err != nil {
+		t.Fatalf("Recv() failed: %v", err)
+	}
+	changes = cb.Changes
+	change = changes[0]
+	if !change.Continued {
+		t.Error("Expected change to continue the transaction")
+	}
+	change = changes[1]
+	if change.Continued {
+		t.Error("Expected change to be the last in this transaction")
+	}
+	watchtesting.ExpectEntryExists(t, changes, "", id1, "val1")
+	watchtesting.ExpectEntryExists(t, changes, "a", id2, "val2")
 }
 
 func TestWatchCancellation(t *testing.T) {
@@ -213,7 +270,7 @@
 	if change.Continued {
 		t.Error("Expected change to be the last in this transaction")
 	}
-	expectExists(t, changes, id1, storage.NoVersion, post11, true, "val1", empty)
+	watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post11, true, "val1", watchtesting.EmptyDir)
 
 	cb, err = ws.Recv()
 	if err != nil {
@@ -228,8 +285,8 @@
 	if change.Continued {
 		t.Error("Expected change to be the last in this transaction")
 	}
-	expectExists(t, changes, id1, pre21, post21, true, "val1", dir("a", id2))
-	expectExists(t, changes, id2, storage.NoVersion, post22, false, "val2", empty)
+	watchtesting.ExpectMutationExists(t, changes, id1, pre21, post21, true, "val1", watchtesting.DirOf("a", id2))
+	watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post22, false, "val2", watchtesting.EmptyDir)
 }
 
 func TestTransactionResumeMarker(t *testing.T) {
@@ -288,7 +345,7 @@
 	if change.Continued {
 		t.Error("Expected change to be the last in this transaction")
 	}
-	expectExists(t, changes, id1, storage.NoVersion, post11, true, "val1", empty)
+	watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post11, true, "val1", watchtesting.EmptyDir)
 
 	cb, err = ws.Recv()
 	if err != nil {
@@ -304,8 +361,8 @@
 		t.Error("Expected change to be the last in this transaction")
 	}
 	resumeMarker2 := change.ResumeMarker
-	expectExists(t, changes, id1, pre21, post21, true, "val1", dir("a", id2))
-	expectExists(t, changes, id2, storage.NoVersion, post22, false, "val2", empty)
+	watchtesting.ExpectMutationExists(t, changes, id1, pre21, post21, true, "val1", watchtesting.DirOf("a", id2))
+	watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post22, false, "val2", watchtesting.EmptyDir)
 
 	// Cancel the watch request.
 	ws.Cancel()
@@ -329,8 +386,8 @@
 	if change.Continued {
 		t.Error("Expected change to be the last in this transaction")
 	}
-	expectExists(t, changes, id1, pre21, post21, true, "val1", dir("a", id2))
-	expectExists(t, changes, id2, storage.NoVersion, post22, false, "val2", empty)
+	watchtesting.ExpectMutationExists(t, changes, id1, pre21, post21, true, "val1", watchtesting.DirOf("a", id2))
+	watchtesting.ExpectMutationExists(t, changes, id2, storage.NoVersion, post22, false, "val2", watchtesting.EmptyDir)
 }
 
 func TestNowResumeMarker(t *testing.T) {
@@ -377,7 +434,7 @@
 	}
 	changes := cb.Changes
 	change := changes[0]
-	expectInitialStateSkipped(t, change)
+	watchtesting.ExpectInitialStateSkipped(t, change)
 
 	// Check that watch detects the changes in the third transaction.
 	cb, err = ws.Recv()
@@ -393,8 +450,8 @@
 	if change.Continued {
 		t.Error("Expected change to be the last in this transaction")
 	}
-	expectExists(t, changes, id2, pre32, post32, false, "val2", dir("b", id3))
-	expectExists(t, changes, id3, storage.NoVersion, post33, false, "val3", empty)
+	watchtesting.ExpectMutationExists(t, changes, id2, pre32, post32, false, "val2", watchtesting.DirOf("b", id3))
+	watchtesting.ExpectMutationExists(t, changes, id3, storage.NoVersion, post33, false, "val3", watchtesting.EmptyDir)
 }
 
 func TestUnknownResumeMarkers(t *testing.T) {
@@ -431,3 +488,46 @@
 		t.Errorf("Unexpected error: %v", err)
 	}
 }
+
+func TestConsistentResumeMarkers(t *testing.T) {
+	// Create a new store.
+	dbName, st, cleanup := createStore(t)
+	defer cleanup()
+
+	// Create a new watcher.
+	w, cleanup := createWatcher(t, dbName)
+	defer cleanup()
+
+	// Put /
+	tr := memstore.NewTransaction()
+	put(t, st, tr, "/", "val1")
+	commit(t, tr)
+
+	// Start a watch request.
+	path := storage.ParsePath("/")
+	req := watch.GlobRequest{Pattern: "..."}
+	ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
+
+	cb, err := ws.Recv()
+	if err != nil {
+		t.Fatalf("Recv() failed: %v", err)
+	}
+	changes := cb.Changes
+	change := changes[0]
+	// Save the ResumeMarker of the change.
+	r := change.ResumeMarker
+
+	// Start another watch request.
+	ws = watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
+
+	cb, err = ws.Recv()
+	if err != nil {
+		t.Fatalf("Recv() failed: %v", err)
+	}
+	changes = cb.Changes
+	change = changes[0]
+	// Expect the same ResumeMarker.
+	if !bytes.Equal(r, change.ResumeMarker) {
+		t.Error("Inconsistent ResumeMarker.")
+	}
+}
diff --git a/services/store/server/object.go b/services/store/server/object.go
index 5964669..f29112e 100644
--- a/services/store/server/object.go
+++ b/services/store/server/object.go
@@ -214,14 +214,46 @@
 	return nil
 }
 
+// ChangeBatchStream is an interface for streaming responses of type ChangeBatch.
+type ChangeBatchStream interface {
+	// Send places the item onto the output stream, blocking if there is no buffer
+	// space available.
+	Send(watch.ChangeBatch) error
+}
+
+// entryTransformStream implements GlobWatcherServiceWatchGlobStream and
+// QueryWatcherServiceWatchQueryStream. It wraps a ChangeBatchStream,
+// transforming the value in each change from *storage.Entry to *store.Entry.
+type entryTransformStream struct {
+	delegate ChangeBatchStream
+}
+
+func (s *entryTransformStream) Send(cb watch.ChangeBatch) error {
+	// Transform into a copy so the caller's batch is never mutated; the range
+	// variable is already a per-element copy of each watch.Change.
+	changesCp := make([]watch.Change, len(cb.Changes))
+	for i, change := range cb.Changes {
+		// A nil Value (e.g. a DoesNotExist change) is forwarded unchanged.
+		if change.Value != nil {
+			entry := change.Value.(*storage.Entry)
+			serviceEntry := makeServiceEntry(entry)
+			change.Value = &serviceEntry
+		}
+		changesCp[i] = change
+	}
+	return s.delegate.Send(watch.ChangeBatch{Changes: changesCp})
+}
+
 // WatchGlob returns a stream of changes that match a pattern.
 func (o *object) WatchGlob(ctx ipc.ServerContext, req watch.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
 	path := storage.ParsePath(o.name)
+	stream = &entryTransformStream{stream}
 	return o.server.watcher.WatchGlob(ctx, path, req, stream)
 }
 
 // WatchQuery returns a stream of changes that satisfy a query.
 func (o *object) WatchQuery(ctx ipc.ServerContext, req watch.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
 	path := storage.ParsePath(o.name)
+	stream = &entryTransformStream{stream}
 	return o.server.watcher.WatchQuery(ctx, path, req, stream)
 }
diff --git a/services/store/server/server_test.go b/services/store/server/server_test.go
index 157fe1b..8a0ca7f 100644
--- a/services/store/server/server_test.go
+++ b/services/store/server/server_test.go
@@ -132,38 +132,6 @@
 	return s, closer
 }
 
-func expectExists(t *testing.T, changes []watch.Change, id storage.ID, value string) {
-	change := findChange(t, changes, id)
-	if change.State != watch.Exists {
-		t.Fatalf("Expected id to exist: %v", id)
-	}
-	cv := change.Value.(*raw.Mutation)
-	if cv.Value != value {
-		t.Fatalf("Expected Value to be: %v, but was: %v", value, cv.Value)
-	}
-}
-
-func expectDoesNotExist(t *testing.T, changes []watch.Change, id storage.ID) {
-	change := findChange(t, changes, id)
-	if change.State != watch.DoesNotExist {
-		t.Fatalf("Expected id to not exist: %v", id)
-	}
-}
-
-func findChange(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
-	for _, change := range changes {
-		cv, ok := change.Value.(*raw.Mutation)
-		if !ok {
-			t.Fatal("Expected a Mutation")
-		}
-		if cv.ID == id {
-			return change
-		}
-	}
-	t.Fatalf("Expected a change for id: %v", id)
-	panic("should not reach here")
-}
-
 func TestPutGetRemoveRoot(t *testing.T) {
 	s, c := newServer()
 	defer c()
@@ -390,7 +358,7 @@
 		if change.Continued {
 			t.Error("Expected change to be the last in this transaction")
 		}
-		expectExists(t, []watch.Change{change}, id1, value1)
+		watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
 	}
 
 	path2 := "/a"
@@ -429,8 +397,107 @@
 		if change.Continued {
 			t.Error("Expected change to be the last in this transaction")
 		}
-		expectExists(t, changes, id1, value1)
-		expectExists(t, changes, id2, value2)
+		watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
+		watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id2, value2)
+	}
+}
+
+func TestWatchGlob(t *testing.T) {
+	s, c := newServer()
+	defer c()
+
+	value1 := "v1"
+	var id1 storage.ID
+
+	o1 := s.lookupObject("/")
+	o2 := s.lookupObject("/a")
+
+	// Before the watch request has been made, commit a transaction that puts /.
+	{
+		tr := newTransaction()
+		if err := s.CreateTransaction(rootCtx, tr, nil); err != nil {
+			t.Errorf("Unexpected error: %s", err)
+		}
+		st, err := o1.Put(rootCtx, tr, value1)
+		if err != nil {
+			t.Errorf("Unexpected error: %s", err)
+		}
+		id1 = st.ID
+		if err := s.Commit(rootCtx, tr); err != nil {
+			t.Errorf("Unexpected error: %s", err)
+		}
+	}
+
+	// Start watch requests on / and /a.
+	req := watch.GlobRequest{Pattern: "..."}
+	ws1 := watchtesting.WatchGlob(rootPublicID, o1.WatchGlob, req)
+	ws2 := watchtesting.WatchGlob(rootPublicID, o2.WatchGlob, req)
+
+	// The watch on / should send a change on /.
+	{
+		cb, err := ws1.Recv()
+		if err != nil {
+			t.Fatalf("Recv() failed: %v", err)
+		}
+		changes := cb.Changes
+		change := changes[0]
+		if change.Continued {
+			t.Error("Expected change to be the last in this transaction")
+		}
+		watchtesting.ExpectServiceEntryExists(t, changes, "", id1, value1)
+	}
+	// The watch on /a should send no change. The first change it sends is
+	// verified below.
+
+	value2 := "v2"
+	var id2 storage.ID
+
+	// Commit a second transaction that puts /a.
+	{
+		tr := newTransaction()
+		if err := s.CreateTransaction(rootCtx, tr, nil); err != nil {
+			t.Errorf("Unexpected error: %s", err)
+		}
+		st, err := o2.Put(rootCtx, tr, value2)
+		if err != nil {
+			t.Errorf("Unexpected error: %s", err)
+		}
+		id2 = st.ID
+		if err := s.Commit(rootCtx, tr); err != nil {
+			t.Errorf("Unexpected error: %s", err)
+		}
+	}
+
+	// The watch on / should send changes on / and /a.
+	{
+		cb, err := ws1.Recv()
+		if err != nil {
+			t.Fatalf("Recv() failed: %v", err)
+		}
+		changes := cb.Changes
+		change := changes[0]
+		if !change.Continued {
+			t.Error("Expected change to continue the transaction")
+		}
+		change = changes[1]
+		if change.Continued {
+			t.Error("Expected change to be the last in this transaction")
+		}
+		watchtesting.ExpectServiceEntryExists(t, changes, "", id1, value1)
+		watchtesting.ExpectServiceEntryExists(t, changes, "a", id2, value2)
+	}
+	// The watch on /a should send a change on /a.
+	{
+		cb, err := ws2.Recv()
+		if err != nil {
+			t.Fatalf("Recv() failed: %v", err)
+		}
+		changes := cb.Changes
+		change := changes[0]
+		if change.Continued {
+			t.Error("Expected change to be the last in this transaction")
+		}
+		watchtesting.ExpectServiceEntryExists(t, changes, "a", id2, value2)
 	}
 }
 
@@ -474,7 +541,7 @@
 		if change.Continued {
 			t.Error("Expected change to be the last in this transaction")
 		}
-		expectExists(t, []watch.Change{change}, id1, value1)
+		watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
 	}
 
 	path2 := "/a"
@@ -513,8 +580,8 @@
 		if change.Continued {
 			t.Error("Expected change to be the last in this transaction")
 		}
-		expectExists(t, changes, id1, value1)
-		expectExists(t, changes, id2, value2)
+		watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
+		watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id2, value2)
 	}
 
 	// Commit a third transaction that removes /a.
@@ -543,7 +610,7 @@
 		if change.Continued {
 			t.Error("Expected change to be the last in this transaction")
 		}
-		expectExists(t, changes, id1, value1)
+		watchtesting.ExpectMutationExistsNoVersionCheck(t, changes, id1, value1)
 	}
 
 	// Check that watch detects the garbage collection of /a.
@@ -557,7 +624,7 @@
 		if change.Continued {
 			t.Error("Expected change to be the last in this transaction")
 		}
-		expectDoesNotExist(t, changes, id2)
+		watchtesting.ExpectMutationDoesNotExistNoVersionCheck(t, changes, id2)
 	}
 }