veyron2/services/store: API for Object-level watches.

Specifics:
 - Object now has a Watch method consistent with Query, Glob, etc.
 - Store no longer has a Watch method. For full-store watches, clients
   can watch the root object.
 - Vsync now calls raw.Store.Watch. The vstore handle has been deleted.

Change-Id: Id01b791564d0b684fe9118ef399d129669693267
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index a7c6ab9..7380819 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -63,7 +63,6 @@
 	// Next steps are to tie this up into mount table and auto-discover neighbors.
 	neighbors   []string
 	neighborIDs []string
-	store       raw.Store
 
 	updObjects map[storage.ID]*objConflictState
 }
@@ -93,16 +92,6 @@
 		vlog.Fatalf("newInitiator: Mismatch between number of endpoints and IDs")
 	}
 
-	// TODO(hpucha): Merge this with vstore handler in syncd.
-	if syncd.vstoreEndpoint != "" {
-		var err error
-		i.store, err = raw.BindStore(naming.JoinAddressName(syncd.vstoreEndpoint, raw.RawStoreSuffix))
-		if err != nil {
-			vlog.Fatalf("newInitiator: cannot connect to Veyron store endpoint (%s): %s",
-				syncd.vstoreEndpoint, err)
-		}
-	}
-
 	// Override the default peerSyncInterval value if syncTick is specified.
 	if syncTick > 0 {
 		peerSyncInterval = syncTick
@@ -514,8 +503,8 @@
 	// TODO(hpucha): We will hold the lock across PutMutations rpc
 	// to prevent a race with watcher. The next iteration will
 	// clean up this coordination.
-	if i.store != nil && len(m) > 0 {
-		stream, err := i.store.PutMutations()
+	if store := i.syncd.store; store != nil && len(m) > 0 {
+		stream, err := store.PutMutations()
 		if err != nil {
 			vlog.Errorf("updateStoreAndSync:: putmutations err %v", err)
 			return err
diff --git a/runtimes/google/vsync/vsyncd.go b/runtimes/google/vsync/vsyncd.go
index 44b4028..2e6d639 100644
--- a/runtimes/google/vsync/vsyncd.go
+++ b/runtimes/google/vsync/vsyncd.go
@@ -14,8 +14,8 @@
 	"veyron/services/store/raw"
 
 	"veyron2/ipc"
+	"veyron2/naming"
 	"veyron2/storage"
-	"veyron2/storage/vstore"
 	"veyron2/vlog"
 	"veyron2/vom"
 
@@ -39,8 +39,8 @@
 	closed  chan struct{}
 
 	// Local Veyron store.
-	vstoreEndpoint string
-	vstore         storage.Store
+	storeEndpoint string
+	store         raw.Store
 
 	// Handlers for goroutine procedures.
 	hdlGC        *syncGC
@@ -66,28 +66,28 @@
 // sync.RWMutex. The spec says that the writers cannot be starved by
 // the readers but it does not guarantee FIFO. We may have to revisit
 // this in the future.
-func NewSyncd(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint string, syncTick time.Duration) *syncd {
+func NewSyncd(peerEndpoints, peerDeviceIDs, devid, storePath, storeEndpoint string, syncTick time.Duration) *syncd {
 	// Connect to the local Veyron store.
 	// At present this is optional to allow testing (from the command-line) w/o Veyron store running.
 	// TODO: connecting to Veyron store should be mandatory.
-	var st storage.Store
-	if vstoreEndpoint != "" {
-		vs, err := vstore.New(vstoreEndpoint)
+	var store raw.Store
+	if storeEndpoint != "" {
+		var err error
+		store, err = raw.BindStore(naming.JoinAddressName(storeEndpoint, raw.RawStoreSuffix))
 		if err != nil {
-			vlog.Fatalf("newSyncd: cannot connect to Veyron store endpoint (%s): %s", vstoreEndpoint, err)
+			vlog.Fatalf("NewSyncd: cannot connect to Veyron store endpoint (%s): %s", storeEndpoint, err)
 		}
-		st = vs
 	}
 
-	return newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint, st, syncTick)
+	return newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, storeEndpoint, store, syncTick)
 }
 
 // newSyncdCore is the internal function that creates the Syncd
 // structure and initilizes its thread (goroutines).  It takes a
 // Veyron Store parameter to separate the core of Syncd setup from the
 // external dependency on Veyron Store.
-func newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint string,
-	store storage.Store, syncTick time.Duration) *syncd {
+func newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, storeEndpoint string,
+	store raw.Store, syncTick time.Duration) *syncd {
 	s := &syncd{}
 
 	// Bootstrap my own DeviceID.
@@ -110,9 +110,9 @@
 	}
 
 	// Veyron Store.
-	s.vstoreEndpoint = vstoreEndpoint
-	s.vstore = store
-	vlog.VI(1).Infof("newSyncd: Local Veyron store: %s", s.vstoreEndpoint)
+	s.storeEndpoint = storeEndpoint
+	s.store = store
+	vlog.VI(1).Infof("newSyncd: Local Veyron store: %s", s.storeEndpoint)
 
 	// Register these Watch data types with VOM.
 	// TODO(tilaks): why aren't they auto-retrieved from the IDL?
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index 5cbf0bf..e081d7f 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -11,6 +11,8 @@
 
 	"veyron/services/store/raw"
 
+	"veyron2"
+	"veyron2/ipc"
 	"veyron2/query"
 	"veyron2/services/watch"
 	"veyron2/storage"
@@ -57,7 +59,7 @@
 	defer w.syncd.pending.Done()
 
 	// If no Veyron store is configured, there is nothing to watch.
-	if w.syncd.vstore == nil {
+	if w.syncd.store == nil {
 		vlog.VI(1).Info("watchStore: Veyron store not configured; skipping the watcher")
 		return
 	}
@@ -99,7 +101,7 @@
 			req.ResumeMarker = resmark
 		}
 
-		stream, err := w.syncd.vstore.Watch(req)
+		stream, err := w.syncd.store.Watch(req, veyron2.CallTimeout(ipc.NoTimeout))
 		if err == nil {
 			return stream
 		}
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index 45be727..30ea172 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -4,7 +4,6 @@
 
 import (
 	"bytes"
-	"errors"
 	"fmt"
 	"io"
 	"os"
@@ -13,7 +12,7 @@
 
 	"veyron/services/store/raw"
 
-	"veyron2/query"
+	"veyron2/ipc"
 	"veyron2/services/watch"
 	"veyron2/storage"
 )
@@ -39,7 +38,19 @@
 type fakeVStore struct {
 }
 
-func (v *fakeVStore) Watch(req watch.Request) (watch.WatcherWatchStream, error) {
+func (*fakeVStore) GetMethodTags(_ string, _ ...ipc.ClientCallOpt) ([]interface{}, error) {
+	panic("not implemented")
+}
+
+func (*fakeVStore) UnresolveStep(_ ...ipc.ClientCallOpt) ([]string, error) {
+	panic("not implemented")
+}
+
+func (*fakeVStore) Signature(_ ...ipc.ClientCallOpt) (ipc.ServiceSignature, error) {
+	panic("not implemented")
+}
+
+func (v *fakeVStore) Watch(req watch.Request, _ ...ipc.ClientCallOpt) (watch.WatcherWatchStream, error) {
 	// If "failWatch" is set, simulate a failed RPC call.
 	if info.failWatch {
 		info.failWatchCount++
@@ -53,24 +64,8 @@
 	return newFakeStream(), nil
 }
 
-func (v *fakeVStore) Bind(path string) storage.Object {
-	return nil
-}
-
-func (v *fakeVStore) Glob(t storage.Transaction, pattern string) (storage.GlobStream, error) {
-	return nil, errors.New("not implemented")
-}
-
-func (v *fakeVStore) Search(t storage.Transaction, q query.Query) storage.Iterator {
-	return nil
-}
-
-func (v *fakeVStore) SetConflictResolver(ty string, r storage.ConflictResolver) {
-	return
-}
-
-func (v *fakeVStore) Close() error {
-	return nil
+func (*fakeVStore) PutMutations(_ ...ipc.ClientCallOpt) (raw.StorePutMutationsStream, error) {
+	panic("not implemented")
 }
 
 // fakeStream is used to simulate the reply stream of the Watch() API.
diff --git a/services/store/raw/service.vdl b/services/store/raw/service.vdl
index 9d02376..03e469f 100644
--- a/services/store/raw/service.vdl
+++ b/services/store/raw/service.vdl
@@ -48,6 +48,7 @@
 // Store defines a raw interface for the Veyron store. Mutations can be received
 // via the Watcher interface, and committed via PutMutation.
 type Store interface {
+  // Watcher is an API for watching updates that satisfy a query.
   watch.Watcher
 
   // PutMutations atomically commits a stream of Mutations when the stream is
diff --git a/services/store/server/object.go b/services/store/server/object.go
index 5ec1fa2..741954b 100644
--- a/services/store/server/object.go
+++ b/services/store/server/object.go
@@ -9,6 +9,7 @@
 	"veyron2/query"
 	"veyron2/services/mounttable"
 	"veyron2/services/store"
+	"veyron2/services/watch"
 	"veyron2/storage"
 	"veyron2/vdl"
 )
@@ -212,3 +213,8 @@
 	}
 	return nil
 }
+
+// Watch returns a stream of changes.
+func (o *object) Watch(ctx ipc.Context, req watch.Request, stream watch.WatcherServiceWatchStream) error {
+	panic("not implemented")
+}