veyron2/services/watch: Split types into separate pkg

Split the watch interface into two separate packages: one for the interface
itself, and one for the types.

This will allow implementations to use the generated types without
depending on the generated stubs.

Change-Id: I8b52a865a3682dfb86aed7c1e56d29b2c9a27f44
diff --git a/examples/boxes/android/src/boxesp2p/main.go b/examples/boxes/android/src/boxesp2p/main.go
index 34c2ccd..ef39c88 100644
--- a/examples/boxes/android/src/boxesp2p/main.go
+++ b/examples/boxes/android/src/boxesp2p/main.go
@@ -77,7 +77,7 @@
 	"veyron2/naming"
 	"veyron2/rt"
 	"veyron2/security"
-	iwatch "veyron2/services/watch"
+	"veyron2/services/watch/types"
 	"veyron2/storage"
 	"veyron2/storage/vstore"
 	"veyron2/vom"
@@ -199,7 +199,7 @@
 
 	// Watch for any box updates from the store
 	go func() {
-		req := iwatch.GlobRequest{Pattern: "*"}
+		req := types.GlobRequest{Pattern: "*"}
 		stream, err := root.WatchGlob(ctx, req)
 		if err != nil {
 			panic(fmt.Errorf("Can't watch store: %s: %s", gs.storeEndpoint, err))
diff --git a/examples/stfortune/stfortune/main.go b/examples/stfortune/stfortune/main.go
index aff4ff4..b48462c 100644
--- a/examples/stfortune/stfortune/main.go
+++ b/examples/stfortune/stfortune/main.go
@@ -20,7 +20,7 @@
 	"veyron2/context"
 	"veyron2/naming"
 	"veyron2/rt"
-	iwatch "veyron2/services/watch"
+	"veyron2/services/watch/types"
 	"veyron2/storage"
 	"veyron2/storage/vstore"
 	"veyron2/vom"
@@ -56,7 +56,7 @@
 	paths := []string{appPath, fortunePath(""), userPath("")}
 	for _, path := range paths {
 		abspath := naming.Join(storeAddress, path)
-		req := iwatch.GlobRequest{Pattern: ""}
+		req := types.GlobRequest{Pattern: ""}
 		stream, err := vstore.New().Bind(abspath).WatchGlob(ctx, req)
 		if err != nil {
 			log.Fatalf("WatchGlob %s failed: %v", abspath, err)
@@ -89,7 +89,7 @@
 	fmt.Printf("Running as a Watcher monitoring new fortunes under %s...\n", path)
 
 	abspath := naming.Join(storeAddress, path)
-	req := iwatch.GlobRequest{Pattern: "*"}
+	req := types.GlobRequest{Pattern: "*"}
 	stream, err := vstore.New().Bind(abspath).WatchGlob(ctx, req)
 	if err != nil {
 		log.Fatalf("watcher WatchGlob %s failed: %v", abspath, err)
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index 3fc5465..58fa467 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -15,6 +15,7 @@
 	"veyron2/context"
 	"veyron2/rt"
 	"veyron2/services/watch"
+	"veyron2/services/watch/types"
 	"veyron2/vlog"
 )
 
@@ -156,7 +157,7 @@
 
 // processChanges applies the batch of changes (object mutations) received from the Watch API.
 // The function grabs the write-lock to access the Log and DAG DBs.
-func (w *syncWatcher) processChanges(changes watch.ChangeBatch, syncTime int64) error {
+func (w *syncWatcher) processChanges(changes types.ChangeBatch, syncTime int64) error {
 	w.syncd.lock.Lock()
 	defer w.syncd.lock.Unlock()
 
@@ -181,7 +182,7 @@
 			// All LogValues belonging to the same transaction get the same timestamp.
 			time = w.curTxSyncTime
 		}
-		val := &LogValue{Mutation: *mu, SyncTime: time, Delete: ch.State == watch.DoesNotExist, Continued: ch.Continued}
+		val := &LogValue{Mutation: *mu, SyncTime: time, Delete: ch.State == types.DoesNotExist, Continued: ch.Continued}
 		vlog.VI(2).Infof("processChanges:: processing record %v, Tx %v", val, w.curTx)
 		if err := w.syncd.log.processWatchRecord(mu.ID, mu.Version, mu.PriorVersion, val, w.curTx); err != nil {
 			return fmt.Errorf("cannot process mutation: %#v: %s", ch, err)
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index 0725b84..f6848d7 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -15,7 +15,7 @@
 	"veyron2/context"
 	"veyron2/ipc"
 	"veyron2/rt"
-	"veyron2/services/watch"
+	"veyron2/services/watch/types"
 	"veyron2/storage"
 )
 
@@ -84,7 +84,7 @@
 
 func (s *fakeStream) RecvStream() interface {
 	Advance() bool
-	Value() watch.ChangeBatch
+	Value() types.ChangeBatch
 	Err() error
 } {
 	return s
@@ -118,7 +118,7 @@
 	return true
 }
 
-func (s *fakeStream) Value() watch.ChangeBatch {
+func (s *fakeStream) Value() types.ChangeBatch {
 	changes := getChangeBatch()
 
 	var lastCount byte
@@ -152,12 +152,12 @@
 // getChangeBatch returns a batch of store mutations used to simulate the Watch API.
 // The batch contains two transactions to verify both new-object creation and the
 // mutation of an existing object.
-func getChangeBatch() watch.ChangeBatch {
-	var batch watch.ChangeBatch
+func getChangeBatch() types.ChangeBatch {
+	var batch types.ChangeBatch
 
-	batch.Changes = []watch.Change{
+	batch.Changes = []types.Change{
 		// 1st transaction: create "/" and "/a" and "/a/b" as 3 new objects (prior versions are 0).
-		watch.Change{
+		types.Change{
 			Name:  "",
 			State: 0,
 			Value: &raw.Mutation{
@@ -177,7 +177,7 @@
 			ResumeMarker: nil,
 			Continued:    true,
 		},
-		watch.Change{
+		types.Change{
 			Name:  "",
 			State: 0,
 			Value: &raw.Mutation{
@@ -197,7 +197,7 @@
 			ResumeMarker: nil,
 			Continued:    true,
 		},
-		watch.Change{
+		types.Change{
 			Name:  "",
 			State: 0,
 			Value: &raw.Mutation{
@@ -213,7 +213,7 @@
 		},
 
 		// 2nd transaction: create "/a/c" as a new object, which also updates "a" (its "Dir" field).
-		watch.Change{
+		types.Change{
 			Name:  "",
 			State: 0,
 			Value: &raw.Mutation{
@@ -238,7 +238,7 @@
 			ResumeMarker: nil,
 			Continued:    true,
 		},
-		watch.Change{
+		types.Change{
 			Name:  "",
 			State: 0,
 			Value: &raw.Mutation{
@@ -254,7 +254,7 @@
 		},
 
 		// 3rd transaction: remove "/a/b" which updates "a" (its "Dir" field) and deletes "b".
-		watch.Change{
+		types.Change{
 			Name:  "",
 			State: 0,
 			Value: &raw.Mutation{
@@ -274,9 +274,9 @@
 			ResumeMarker: nil,
 			Continued:    true,
 		},
-		watch.Change{
+		types.Change{
 			Name:  "",
-			State: watch.DoesNotExist,
+			State: types.DoesNotExist,
 			Value: &raw.Mutation{
 				ID: storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb,
 					0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7},
diff --git a/services/store/memstore/blackbox/util.go b/services/store/memstore/blackbox/util.go
index 54b500e..c494310 100644
--- a/services/store/memstore/blackbox/util.go
+++ b/services/store/memstore/blackbox/util.go
@@ -12,7 +12,7 @@
 	"veyron/services/store/raw"
 
 	"veyron2/security"
-	"veyron2/services/watch"
+	"veyron2/services/watch/types"
 	"veyron2/storage"
 )
 
@@ -168,7 +168,7 @@
 	}
 }
 
-func Mutations(changes []watch.Change) []raw.Mutation {
+func Mutations(changes []types.Change) []raw.Mutation {
 	mutations := make([]raw.Mutation, len(changes))
 	for i, change := range changes {
 		mutations[i] = *(change.Value.(*raw.Mutation))
diff --git a/services/store/memstore/testing/util.go b/services/store/memstore/testing/util.go
index 4fc237e..293bc75 100644
--- a/services/store/memstore/testing/util.go
+++ b/services/store/memstore/testing/util.go
@@ -14,6 +14,7 @@
 	"veyron2/rt"
 	"veyron2/security"
 	"veyron2/services/watch"
+	"veyron2/services/watch/types"
 	"veyron2/storage"
 )
 
@@ -153,10 +154,10 @@
 type watcherServiceWatchStreamSender struct {
 	mu     *sync.Mutex
 	ctx    ipc.ServerContext
-	output chan<- watch.ChangeBatch
+	output chan<- types.ChangeBatch
 }
 
-func (s *watcherServiceWatchStreamSender) Send(cb watch.ChangeBatch) error {
+func (s *watcherServiceWatchStreamSender) Send(cb types.ChangeBatch) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	select {
@@ -173,7 +174,7 @@
 }
 
 func (s *watcherServiceWatchStream) SendStream() interface {
-	Send(cb watch.ChangeBatch) error
+	Send(cb types.ChangeBatch) error
 } {
 	return s
 }
@@ -182,8 +183,8 @@
 // watcherWatchStream implements watch.WatcherWatchStream.
 type watcherWatchStream struct {
 	ctx   *FakeServerContext
-	value watch.ChangeBatch
-	input <-chan watch.ChangeBatch
+	value types.ChangeBatch
+	input <-chan types.ChangeBatch
 	err   <-chan error
 }
 
@@ -193,7 +194,7 @@
 	return ok
 }
 
-func (s *watcherWatchStream) Value() watch.ChangeBatch {
+func (s *watcherWatchStream) Value() types.ChangeBatch {
 	return s.value
 }
 
@@ -212,7 +213,7 @@
 
 func (s *watcherWatchStream) RecvStream() interface {
 	Advance() bool
-	Value() watch.ChangeBatch
+	Value() types.ChangeBatch
 	Err() error
 } {
 	return s
@@ -221,7 +222,7 @@
 func watchImpl(id security.PublicID, watchFn func(ipc.ServerContext, *watcherServiceWatchStream) error) *watcherWatchStream {
 	mu := &sync.Mutex{}
 	ctx := NewFakeServerContext(id)
-	c := make(chan watch.ChangeBatch, 1)
+	c := make(chan types.ChangeBatch, 1)
 	errc := make(chan error, 1)
 	go func() {
 		stream := &watcherServiceWatchStream{
@@ -252,23 +253,23 @@
 	})
 }
 
-func WatchGlob(id security.PublicID, watchFn func(ipc.ServerContext, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, req watch.GlobRequest) watch.GlobWatcherWatchGlobCall {
+func WatchGlob(id security.PublicID, watchFn func(ipc.ServerContext, types.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, req types.GlobRequest) watch.GlobWatcherWatchGlobCall {
 	return watchImpl(id, func(ctx ipc.ServerContext, iterator *watcherServiceWatchStream) error {
 		return watchFn(ctx, req, iterator)
 	})
 }
 
-func WatchGlobOnPath(id security.PublicID, watchFn func(ipc.ServerContext, storage.PathName, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, path storage.PathName, req watch.GlobRequest) watch.GlobWatcherWatchGlobCall {
+func WatchGlobOnPath(id security.PublicID, watchFn func(ipc.ServerContext, storage.PathName, types.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, path storage.PathName, req types.GlobRequest) watch.GlobWatcherWatchGlobCall {
 	return watchImpl(id, func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
 		return watchFn(ctx, path, req, stream)
 	})
 }
 
-func ExpectInitialStateSkipped(t *testing.T, change watch.Change) {
+func ExpectInitialStateSkipped(t *testing.T, change types.Change) {
 	if change.Name != "" {
 		t.Fatalf("Expect Name to be \"\" but was: %v", change.Name)
 	}
-	if change.State != watch.InitialStateSkipped {
+	if change.State != types.InitialStateSkipped {
 		t.Fatalf("Expect State to be InitialStateSkipped but was: %v", change.State)
 	}
 	if len(change.ResumeMarker) != 0 {
@@ -276,9 +277,9 @@
 	}
 }
 
-func ExpectEntryExists(t *testing.T, changes []watch.Change, name string, id storage.ID, value string) {
+func ExpectEntryExists(t *testing.T, changes []types.Change, name string, id storage.ID, value string) {
 	change := findEntry(t, changes, name)
-	if change.State != watch.Exists {
+	if change.State != types.Exists {
 		t.Fatalf("Expected name to exist: %v", name)
 	}
 	cv, ok := change.Value.(*storage.Entry)
@@ -293,9 +294,9 @@
 	}
 }
 
-func ExpectEntryDoesNotExist(t *testing.T, changes []watch.Change, name string) {
+func ExpectEntryDoesNotExist(t *testing.T, changes []types.Change, name string) {
 	change := findEntry(t, changes, name)
-	if change.State != watch.DoesNotExist {
+	if change.State != types.DoesNotExist {
 		t.Fatalf("Expected name to not exist: %v", name)
 	}
 	if change.Value != nil {
@@ -303,9 +304,9 @@
 	}
 }
 
-func ExpectServiceEntryExists(t *testing.T, changes []watch.Change, name string, id storage.ID, value string) {
+func ExpectServiceEntryExists(t *testing.T, changes []types.Change, name string, id storage.ID, value string) {
 	change := findEntry(t, changes, name)
-	if change.State != watch.Exists {
+	if change.State != types.Exists {
 		t.Fatalf("Expected name to exist: %v", name)
 	}
 	cv, ok := change.Value.(*storage.Entry)
@@ -320,9 +321,9 @@
 	}
 }
 
-func ExpectServiceEntryDoesNotExist(t *testing.T, changes []watch.Change, name string) {
+func ExpectServiceEntryDoesNotExist(t *testing.T, changes []types.Change, name string) {
 	change := findEntry(t, changes, name)
-	if change.State != watch.DoesNotExist {
+	if change.State != types.DoesNotExist {
 		t.Fatalf("Expected name to not exist: %v", name)
 	}
 	if change.Value != nil {
@@ -330,7 +331,7 @@
 	}
 }
 
-func findEntry(t *testing.T, changes []watch.Change, name string) watch.Change {
+func findEntry(t *testing.T, changes []types.Change, name string) types.Change {
 	for _, change := range changes {
 		if change.Name == name {
 			return change
@@ -351,9 +352,9 @@
 	}}
 }
 
-func ExpectMutationExists(t *testing.T, changes []watch.Change, id storage.ID, pre, post storage.Version, isRoot bool, value string, dir []storage.DEntry) {
+func ExpectMutationExists(t *testing.T, changes []types.Change, id storage.ID, pre, post storage.Version, isRoot bool, value string, dir []storage.DEntry) {
 	change := findMutation(t, changes, id)
-	if change.State != watch.Exists {
+	if change.State != types.Exists {
 		t.Fatalf("Expected id to exist: %v", id)
 	}
 	cv := change.Value.(*raw.Mutation)
@@ -372,9 +373,9 @@
 	expectDirEquals(t, cv.Dir, dir)
 }
 
-func ExpectMutationDoesNotExist(t *testing.T, changes []watch.Change, id storage.ID, pre storage.Version, isRoot bool) {
+func ExpectMutationDoesNotExist(t *testing.T, changes []types.Change, id storage.ID, pre storage.Version, isRoot bool) {
 	change := findMutation(t, changes, id)
-	if change.State != watch.DoesNotExist {
+	if change.State != types.DoesNotExist {
 		t.Fatalf("Expected id to not exist: %v", id)
 	}
 	cv := change.Value.(*raw.Mutation)
@@ -395,9 +396,9 @@
 	}
 }
 
-func ExpectMutationExistsNoVersionCheck(t *testing.T, changes []watch.Change, id storage.ID, value string) {
+func ExpectMutationExistsNoVersionCheck(t *testing.T, changes []types.Change, id storage.ID, value string) {
 	change := findMutation(t, changes, id)
-	if change.State != watch.Exists {
+	if change.State != types.Exists {
 		t.Fatalf("Expected id to exist: %v", id)
 	}
 	cv := change.Value.(*raw.Mutation)
@@ -406,14 +407,14 @@
 	}
 }
 
-func ExpectMutationDoesNotExistNoVersionCheck(t *testing.T, changes []watch.Change, id storage.ID) {
+func ExpectMutationDoesNotExistNoVersionCheck(t *testing.T, changes []types.Change, id storage.ID) {
 	change := findMutation(t, changes, id)
-	if change.State != watch.DoesNotExist {
+	if change.State != types.DoesNotExist {
 		t.Fatalf("Expected id to not exist: %v", id)
 	}
 }
 
-func findMutation(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
+func findMutation(t *testing.T, changes []types.Change, id storage.ID) types.Change {
 	for _, change := range changes {
 		cv, ok := change.Value.(*raw.Mutation)
 		if !ok {
diff --git a/services/store/memstore/watch/glob_processor.go b/services/store/memstore/watch/glob_processor.go
index 6254a17..500e256 100644
--- a/services/store/memstore/watch/glob_processor.go
+++ b/services/store/memstore/watch/glob_processor.go
@@ -5,7 +5,7 @@
 	"veyron/services/store/memstore/state"
 
 	"veyron2/security"
-	"veyron2/services/watch"
+	"veyron2/services/watch/types"
 	"veyron2/storage"
 )
 
@@ -37,7 +37,7 @@
 	}, nil
 }
 
-func (p *globProcessor) processState(st *state.State) ([]watch.Change, error) {
+func (p *globProcessor) processState(st *state.State) ([]types.Change, error) {
 	// Check that the initial state has not already been processed.
 	if p.hasProcessedState {
 		return nil, errInitialStateAlreadyProcessed
@@ -53,15 +53,15 @@
 	p.st = st
 	p.matches = matches
 
-	var changes []watch.Change
+	var changes []types.Change
 
 	// Create a change for every matching name.
 	for name, id := range matches {
 		cell := sn.Find(id)
 		entry := cell.GetEntry()
-		change := watch.Change{
+		change := types.Change{
 			Name:  name,
-			State: watch.Exists,
+			State: types.Exists,
 			Value: entry,
 		}
 		// TODO(tilaks): don't clone change.
@@ -71,7 +71,7 @@
 	return changes, nil
 }
 
-func (p *globProcessor) processTransaction(mus *state.Mutations) ([]watch.Change, error) {
+func (p *globProcessor) processTransaction(mus *state.Mutations) ([]types.Change, error) {
 	// Ensure that the initial state has been processed.
 	if !p.hasProcessedState {
 		return nil, errInitialStateNotProcessed
@@ -90,15 +90,15 @@
 	}
 	p.matches = newMatches
 
-	var changes []watch.Change
+	var changes []types.Change
 
 	removed, updated := diffMatches(previousMatches, newMatches, mus.Delta)
 
 	// Create a change for every matching name that was removed.
 	for name := range removed {
-		change := watch.Change{
+		change := types.Change{
 			Name:  name,
-			State: watch.DoesNotExist,
+			State: types.DoesNotExist,
 		}
 		// TODO(tilaks): don't clone change
 		changes = append(changes, change)
@@ -109,9 +109,9 @@
 		id := newMatches[name]
 		cell := sn.Find(id)
 		entry := cell.GetEntry()
-		change := watch.Change{
+		change := types.Change{
 			Name:  name,
-			State: watch.Exists,
+			State: types.Exists,
 			Value: entry,
 		}
 		// TODO(tilaks): don't clone change.
diff --git a/services/store/memstore/watch/processor.go b/services/store/memstore/watch/processor.go
index ce3e07e..a3779e6 100644
--- a/services/store/memstore/watch/processor.go
+++ b/services/store/memstore/watch/processor.go
@@ -2,7 +2,7 @@
 
 import (
 	"veyron/services/store/memstore/state"
-	"veyron2/services/watch"
+	"veyron2/services/watch/types"
 	"veyron2/verror"
 )
 
@@ -21,10 +21,10 @@
 	// originally created the initial state (e.g. in the case of compress), but
 	// are sufficient to re-construct the state viewable within the request.
 	// processState may modify its input.
-	processState(st *state.State) ([]watch.Change, error)
+	processState(st *state.State) ([]types.Change, error)
 
 	// processTransaction returns the set of changes made in some transaction.
 	// The changes are returned in no specific order.
 	// processTransaction may modify its input.
-	processTransaction(mu *state.Mutations) ([]watch.Change, error)
+	processTransaction(mu *state.Mutations) ([]types.Change, error)
 }
diff --git a/services/store/memstore/watch/raw_processor.go b/services/store/memstore/watch/raw_processor.go
index 962e0bd..3de157d 100644
--- a/services/store/memstore/watch/raw_processor.go
+++ b/services/store/memstore/watch/raw_processor.go
@@ -6,7 +6,7 @@
 	"veyron/services/store/raw"
 
 	"veyron2/security"
-	"veyron2/services/watch"
+	"veyron2/services/watch/types"
 	"veyron2/storage"
 	"veyron2/verror"
 )
@@ -45,7 +45,7 @@
 	}, nil
 }
 
-func (p *rawProcessor) processState(st *state.State) ([]watch.Change, error) {
+func (p *rawProcessor) processState(st *state.State) ([]types.Change, error) {
 	// Check that the initial state has not already been processed.
 	if p.hasProcessedState {
 		return nil, errInitialStateAlreadyProcessed
@@ -60,7 +60,7 @@
 	}
 	p.rootID = rootID
 
-	var changes []watch.Change
+	var changes []types.Change
 
 	// Create a change for each id in the state. In each change, the object
 	// exists, has no PriorVersion, has the Version of the new cell, and
@@ -85,8 +85,8 @@
 			Value:        cell.Value,
 			Dir:          flattenDir(refs.FlattenDir(cell.Dir)),
 		}
-		change := watch.Change{
-			State: watch.Exists,
+		change := types.Change{
+			State: types.Exists,
 			Value: value,
 		}
 		// TODO(tilaks): don't clone change
@@ -95,7 +95,7 @@
 	return changes, nil
 }
 
-func (p *rawProcessor) processTransaction(mus *state.Mutations) ([]watch.Change, error) {
+func (p *rawProcessor) processTransaction(mus *state.Mutations) ([]types.Change, error) {
 	// Ensure that the initial state has been processed.
 	if !p.hasProcessedState {
 		return nil, errInitialStateNotProcessed
@@ -106,7 +106,7 @@
 	if mus.SetRootID && !mus.RootID.IsValid() {
 		extra = 1
 	}
-	changes := make([]watch.Change, 0, len(mus.Delta)+len(mus.Deletions)+extra)
+	changes := make([]types.Change, 0, len(mus.Delta)+len(mus.Deletions)+extra)
 
 	if mus.SetRootID {
 		if mus.RootID.IsValid() {
@@ -120,8 +120,8 @@
 				IsRoot:       true,
 			}
 			// TODO(tilaks): don't clone value.
-			change := watch.Change{
-				State: watch.DoesNotExist,
+			change := types.Change{
+				State: types.DoesNotExist,
 				Value: value,
 			}
 			changes = append(changes, change)
@@ -150,8 +150,8 @@
 			Dir:          flattenDir(mu.Dir),
 		}
 		// TODO(tilaks): don't clone value.
-		change := watch.Change{
-			State: watch.Exists,
+		change := types.Change{
+			State: types.Exists,
 			Value: value,
 		}
 		// TODO(tilaks): don't clone change.
@@ -172,8 +172,8 @@
 			IsRoot:       false,
 		}
 		// TODO(tilaks): don't clone value.
-		change := watch.Change{
-			State: watch.DoesNotExist,
+		change := types.Change{
+			State: types.DoesNotExist,
 			Value: value,
 		}
 		// TODO(tilaks): don't clone change.
diff --git a/services/store/memstore/watch/test_util.go b/services/store/memstore/watch/test_util.go
index 2a5216b..32735bc 100644
--- a/services/store/memstore/watch/test_util.go
+++ b/services/store/memstore/watch/test_util.go
@@ -10,7 +10,7 @@
 	"veyron/services/store/memstore/state"
 
 	"veyron2/security"
-	"veyron2/services/watch"
+	"veyron2/services/watch/types"
 	"veyron2/storage"
 )
 
@@ -129,7 +129,7 @@
 	}
 }
 
-func expectState(t *testing.T, log *memstore.RLog, processor reqProcessor, numChanges int) []watch.Change {
+func expectState(t *testing.T, log *memstore.RLog, processor reqProcessor, numChanges int) []types.Change {
 	st := readState(t, log)
 	return processState(t, processor, st, numChanges)
 }
@@ -142,7 +142,7 @@
 	return st.State
 }
 
-func processState(t *testing.T, processor reqProcessor, st *state.State, numChanges int) []watch.Change {
+func processState(t *testing.T, processor reqProcessor, st *state.State, numChanges int) []types.Change {
 	changes, err := processor.processState(st)
 	if err != nil {
 		t.Fatalf("processState() failed: %v", err)
@@ -153,7 +153,7 @@
 	return changes
 }
 
-func expectTransaction(t *testing.T, log *memstore.RLog, processor reqProcessor, numChanges int) []watch.Change {
+func expectTransaction(t *testing.T, log *memstore.RLog, processor reqProcessor, numChanges int) []types.Change {
 	mus := readTransaction(t, log)
 	return processTransaction(t, processor, mus, numChanges)
 }
@@ -166,7 +166,7 @@
 	return mus
 }
 
-func processTransaction(t *testing.T, processor reqProcessor, mus *state.Mutations, numChanges int) []watch.Change {
+func processTransaction(t *testing.T, processor reqProcessor, mus *state.Mutations, numChanges int) []types.Change {
 	changes, err := processor.processTransaction(mus)
 	if err != nil {
 		t.Fatalf("processTransaction() failed: %v", err)
diff --git a/services/store/memstore/watch/watcher.go b/services/store/memstore/watch/watcher.go
index c6e007e..437f1cd 100644
--- a/services/store/memstore/watch/watcher.go
+++ b/services/store/memstore/watch/watcher.go
@@ -13,6 +13,7 @@
 	"veyron2/ipc"
 	"veyron2/security"
 	"veyron2/services/watch"
+	"veyron2/services/watch/types"
 	"veyron2/storage"
 	"veyron2/verror"
 )
@@ -21,9 +22,9 @@
 	errWatchClosed            = io.EOF
 	errUnknownResumeMarker    = verror.BadArgf("Unknown ResumeMarker")
 	nowResumeMarker           = []byte("now") // UTF-8 conversion.
-	initialStateSkippedChange = watch.Change{
+	initialStateSkippedChange = types.Change{
 		Name:  "",
-		State: watch.InitialStateSkipped,
+		State: types.InitialStateSkipped,
 	}
 )
 
@@ -65,7 +66,7 @@
 
 // WatchGlob returns a stream of changes that match a pattern.
 func (w *Watcher) WatchGlob(ctx ipc.ServerContext, path storage.PathName,
-	req watch.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
+	req types.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
 
 	processor, err := newGlobProcessor(ctx.RemoteID(), path, req.Pattern)
 	if err != nil {
@@ -76,7 +77,7 @@
 
 // WatchQuery returns a stream of changes that satisfy a query.
 func (w *Watcher) WatchQuery(ctx ipc.ServerContext, path storage.PathName,
-	req watch.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
+	req types.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
 
 	return verror.Internalf("WatchQuery not yet implemented")
 }
@@ -85,7 +86,7 @@
 type WatchStream interface {
 	// Send places the item onto the output stream, blocking if there is no
 	// buffer space available.
-	Send(item watch.ChangeBatch) error
+	Send(item types.ChangeBatch) error
 }
 
 // Watch handles the specified request, processing records in the store log and
@@ -93,7 +94,7 @@
 // otherwise closed early, Watch will terminate and return an error.
 // Watch implements the service.Watcher interface.
 func (w *Watcher) Watch(ctx ipc.ServerContext, processor reqProcessor,
-	resumeMarker watch.ResumeMarker, stream WatchStream) error {
+	resumeMarker types.ResumeMarker, stream WatchStream) error {
 
 	// Closing cancel terminates processRequest.
 	cancel := make(chan struct{})
@@ -124,7 +125,7 @@
 }
 
 func (w *Watcher) processRequest(cancel <-chan struct{}, processor reqProcessor,
-	resumeMarker watch.ResumeMarker, stream WatchStream) error {
+	resumeMarker types.ResumeMarker, stream WatchStream) error {
 
 	log, err := memstore.OpenLog(w.dbName, true)
 	if err != nil {
@@ -150,7 +151,7 @@
 	}
 
 	if isNowResumeMarker(resumeMarker) {
-		sendChanges(stream, []watch.Change{initialStateSkippedChange})
+		sendChanges(stream, []types.Change{initialStateSkippedChange})
 	}
 
 	// Process initial state.
@@ -283,21 +284,21 @@
 	return true, nil
 }
 
-func processChanges(stream WatchStream, changes []watch.Change, timestamp uint64) error {
+func processChanges(stream WatchStream, changes []types.Change, timestamp uint64) error {
 	addContinued(changes)
 	addResumeMarkers(changes, timestampToResumeMarker(timestamp))
 	return sendChanges(stream, changes)
 }
 
-func sendChanges(stream WatchStream, changes []watch.Change) error {
+func sendChanges(stream WatchStream, changes []types.Change) error {
 	if len(changes) == 0 {
 		return nil
 	}
 	// TODO(tilaks): batch more aggressively.
-	return stream.Send(watch.ChangeBatch{Changes: changes})
+	return stream.Send(types.ChangeBatch{Changes: changes})
 }
 
-func addContinued(changes []watch.Change) {
+func addContinued(changes []types.Change) {
 	// Last change marks the end of the processed atomic group.
 	for i, _ := range changes {
 		changes[i].Continued = true
@@ -307,7 +308,7 @@
 	}
 }
 
-func addResumeMarkers(changes []watch.Change, resumeMarker []byte) {
+func addResumeMarkers(changes []types.Change, resumeMarker []byte) {
 	for i, _ := range changes {
 		changes[i].ResumeMarker = resumeMarker
 	}
diff --git a/services/store/memstore/watch/watcher_test.go b/services/store/memstore/watch/watcher_test.go
index 971210e..6e5172c 100644
--- a/services/store/memstore/watch/watcher_test.go
+++ b/services/store/memstore/watch/watcher_test.go
@@ -12,7 +12,7 @@
 	"veyron/services/store/raw"
 
 	"veyron2/rt"
-	"veyron2/services/watch"
+	"veyron2/services/watch/types"
 	"veyron2/storage"
 	"veyron2/verror"
 )
@@ -98,7 +98,7 @@
 
 	// Start a watch request.
 	path := storage.ParsePath("/")
-	req := watch.GlobRequest{Pattern: "..."}
+	req := types.GlobRequest{Pattern: "..."}
 	ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
 
 	rStream := ws.RecvStream()
@@ -544,7 +544,7 @@
 
 	// Start a watch request.
 	path := storage.ParsePath("/")
-	req := watch.GlobRequest{Pattern: "..."}
+	req := types.GlobRequest{Pattern: "..."}
 	ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
 
 	rStream := ws.RecvStream()
diff --git a/services/store/raw/service.vdl b/services/store/raw/service.vdl
index 4ec7e2c..20f0cad 100644
--- a/services/store/raw/service.vdl
+++ b/services/store/raw/service.vdl
@@ -7,7 +7,7 @@
 
 import (
   "veyron2/storage"
-  "veyron2/services/watch"
+  "veyron2/services/watch/types"
 )
 
 const (
@@ -47,14 +47,14 @@
 type Request struct {
   // ResumeMarker specifies how to resume from a previous Watch call.
   // See the ResumeMarker type for detailed comments.
-  ResumeMarker watch.ResumeMarker
+  ResumeMarker types.ResumeMarker
 }
 
 // Store defines a raw interface for the Veyron store. Mutations can be received
 // via the Watcher interface, and committed via PutMutation.
 type Store interface {
   // Watch returns a stream of all changes.
-  Watch(Req Request) stream<_, watch.ChangeBatch> error
+  Watch(Req Request) stream<_, types.ChangeBatch> error
 
   // PutMutations atomically commits a stream of Mutations when the stream is
   // closed. Mutations are not committed if the request is cancelled before
diff --git a/services/store/raw/service.vdl.go b/services/store/raw/service.vdl.go
index 1eb0cfd..d0c755a 100644
--- a/services/store/raw/service.vdl.go
+++ b/services/store/raw/service.vdl.go
@@ -4,7 +4,7 @@
 package raw
 
 import (
-	"veyron2/services/watch"
+	"veyron2/services/watch/types"
 
 	"veyron2/storage"
 
@@ -44,7 +44,7 @@
 type Request struct {
 	// ResumeMarker specifies how to resume from a previous Watch call.
 	// See the ResumeMarker type for detailed comments.
-	ResumeMarker watch.ResumeMarker
+	ResumeMarker types.ResumeMarker
 }
 
 const (
@@ -101,7 +101,7 @@
 		// Value returns the element that was staged by Advance.
 		// Value may panic if Advance returned false or was not
 		// called at all.  Value does not block.
-		Value() watch.ChangeBatch
+		Value() types.ChangeBatch
 
 		// Err returns a non-nil error iff the stream encountered
 		// any errors.  Err does not block.
@@ -129,17 +129,17 @@
 
 type implStoreWatchStreamIterator struct {
 	clientCall _gen_ipc.Call
-	val        watch.ChangeBatch
+	val        types.ChangeBatch
 	err        error
 }
 
 func (c *implStoreWatchStreamIterator) Advance() bool {
-	c.val = watch.ChangeBatch{}
+	c.val = types.ChangeBatch{}
 	c.err = c.clientCall.Recv(&c.val)
 	return c.err == nil
 }
 
-func (c *implStoreWatchStreamIterator) Value() watch.ChangeBatch {
+func (c *implStoreWatchStreamIterator) Value() types.ChangeBatch {
 	return c.val
 }
 
@@ -158,7 +158,7 @@
 
 func (c *implStoreWatchCall) RecvStream() interface {
 	Advance() bool
-	Value() watch.ChangeBatch
+	Value() types.ChangeBatch
 	Err() error
 } {
 	return &c.readStream
@@ -179,7 +179,7 @@
 	serverCall _gen_ipc.ServerCall
 }
 
-func (s *implStoreServiceWatchStreamSender) Send(item watch.ChangeBatch) error {
+func (s *implStoreServiceWatchStreamSender) Send(item types.ChangeBatch) error {
 	return s.serverCall.Send(item)
 }
 
@@ -190,7 +190,7 @@
 	SendStream() interface {
 		// Send places the item onto the output stream, blocking if there is no buffer
 		// space available.  If the client has canceled, an error is returned.
-		Send(item watch.ChangeBatch) error
+		Send(item types.ChangeBatch) error
 	}
 }
 
@@ -202,7 +202,7 @@
 func (s *implStoreServiceWatchStream) SendStream() interface {
 	// Send places the item onto the output stream, blocking if there is no buffer
 	// space available.  If the client has canceled, an error is returned.
-	Send(item watch.ChangeBatch) error
+	Send(item types.ChangeBatch) error
 } {
 	return &s.writer
 }
@@ -491,7 +491,7 @@
 	}
 
 	result.TypeDefs = []_gen_vdlutil.Any{
-		_gen_wiretype.NamedPrimitiveType{Type: 0x32, Name: "byte", Tags: []string(nil)}, _gen_wiretype.SliceType{Elem: 0x41, Name: "veyron2/services/watch.ResumeMarker", Tags: []string(nil)}, _gen_wiretype.StructType{
+		_gen_wiretype.NamedPrimitiveType{Type: 0x32, Name: "byte", Tags: []string(nil)}, _gen_wiretype.SliceType{Elem: 0x41, Name: "veyron2/services/watch/types.ResumeMarker", Tags: []string(nil)}, _gen_wiretype.StructType{
 			[]_gen_wiretype.FieldType{
 				_gen_wiretype.FieldType{Type: 0x42, Name: "ResumeMarker"},
 			},
@@ -504,12 +504,12 @@
 				_gen_wiretype.FieldType{Type: 0x42, Name: "ResumeMarker"},
 				_gen_wiretype.FieldType{Type: 0x2, Name: "Continued"},
 			},
-			"veyron2/services/watch.Change", []string(nil)},
+			"veyron2/services/watch/types.Change", []string(nil)},
 		_gen_wiretype.SliceType{Elem: 0x46, Name: "", Tags: []string(nil)}, _gen_wiretype.StructType{
 			[]_gen_wiretype.FieldType{
 				_gen_wiretype.FieldType{Type: 0x47, Name: "Changes"},
 			},
-			"veyron2/services/watch.ChangeBatch", []string(nil)},
+			"veyron2/services/watch/types.ChangeBatch", []string(nil)},
 		_gen_wiretype.ArrayType{Elem: 0x41, Len: 0x10, Name: "veyron2/storage.ID", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x35, Name: "veyron2/storage.Version", Tags: []string(nil)}, _gen_wiretype.StructType{
 			[]_gen_wiretype.FieldType{
 				_gen_wiretype.FieldType{Type: 0x3, Name: "Name"},
diff --git a/services/store/server/object.go b/services/store/server/object.go
index f5bef59..cb78dc2 100644
--- a/services/store/server/object.go
+++ b/services/store/server/object.go
@@ -9,9 +9,10 @@
 	"veyron2/ipc"
 	"veyron2/query"
 	"veyron2/services/mounttable"
-	"veyron2/services/mounttable/types"
+	mttypes "veyron2/services/mounttable/types"
 	"veyron2/services/store"
 	"veyron2/services/watch"
+	watchtypes "veyron2/services/watch/types"
 	"veyron2/storage"
 	"veyron2/vdl/vdlutil"
 	"veyron2/verror"
@@ -141,12 +142,12 @@
 
 type globStreamSenderAdapter struct {
 	stream interface {
-		Send(entry types.MountEntry) error
+		Send(entry mttypes.MountEntry) error
 	}
 }
 
 func (a *globStreamSenderAdapter) Send(item string) error {
-	return a.stream.Send(types.MountEntry{Name: item})
+	return a.stream.Send(mttypes.MountEntry{Name: item})
 }
 
 type globStreamAdapter struct {
@@ -179,11 +180,11 @@
 }
 
 // WatchGlob returns a stream of changes that match a pattern.
-func (o *object) WatchGlob(ctx ipc.ServerContext, req watch.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
+func (o *object) WatchGlob(ctx ipc.ServerContext, req watchtypes.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
 	return o.server.watcher.WatchGlob(ctx, storage.ParsePath(o.name), req, stream)
 }
 
 // WatchQuery returns a stream of changes that satisfy a query.
-func (o *object) WatchQuery(ctx ipc.ServerContext, req watch.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
+func (o *object) WatchQuery(ctx ipc.ServerContext, req watchtypes.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
 	return o.server.watcher.WatchQuery(ctx, storage.ParsePath(o.name), req, stream)
 }
diff --git a/services/store/server/server_test.go b/services/store/server/server_test.go
index 69d9304..f873c9f 100644
--- a/services/store/server/server_test.go
+++ b/services/store/server/server_test.go
@@ -18,7 +18,7 @@
 	"veyron2/rt"
 	"veyron2/security"
 	"veyron2/services/store"
-	"veyron2/services/watch"
+	"veyron2/services/watch/types"
 	"veyron2/storage"
 	_ "veyron2/vlog"
 	"veyron2/vom"
@@ -374,7 +374,7 @@
 	}
 
 	// Start watch requests on / and /a.
-	req := watch.GlobRequest{Pattern: "..."}
+	req := types.GlobRequest{Pattern: "..."}
 	ws1 := watchtesting.WatchGlob(rootPublicID, o1.WatchGlob, req)
 	ws2 := watchtesting.WatchGlob(rootPublicID, o2.WatchGlob, req)
 
diff --git a/services/wsprd/lib/remove_this.go b/services/wsprd/lib/remove_this.go
index 6368202..7625014 100644
--- a/services/wsprd/lib/remove_this.go
+++ b/services/wsprd/lib/remove_this.go
@@ -4,7 +4,7 @@
 	rps "veyron/examples/rockpaperscissors"
 	mttypes "veyron2/services/mounttable/types"
 	"veyron2/services/store"
-	"veyron2/services/watch"
+	watchtypes "veyron2/services/watch/types"
 	"veyron2/storage"
 	"veyron2/vom"
 )
@@ -15,10 +15,10 @@
 	vom.Register(storage.Stat{})
 	vom.Register(store.NestedResult(0))
 	vom.Register(store.QueryResult{})
-	vom.Register(watch.GlobRequest{})
-	vom.Register(watch.QueryRequest{})
-	vom.Register(watch.ChangeBatch{})
-	vom.Register(watch.Change{})
+	vom.Register(watchtypes.GlobRequest{})
+	vom.Register(watchtypes.QueryRequest{})
+	vom.Register(watchtypes.ChangeBatch{})
+	vom.Register(watchtypes.Change{})
 	vom.Register(rps.GameOptions{})
 	vom.Register(rps.GameID{})
 	vom.Register(rps.PlayResult{})