veyron2/services/store: API for Object-level watches.
Specifics:
- Object now has a Watch method consistent with Query, Glob, etc.
- Store no longer has a Watch method. For full-store watches, clients
can watch the root object.
- Vsync now calls raw.Store.Watch. The vstore handle has been deleted.
Change-Id: Id01b791564d0b684fe9118ef399d129669693267
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index a7c6ab9..7380819 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -63,7 +63,6 @@
// Next steps are to tie this up into mount table and auto-discover neighbors.
neighbors []string
neighborIDs []string
- store raw.Store
updObjects map[storage.ID]*objConflictState
}
@@ -93,16 +92,6 @@
vlog.Fatalf("newInitiator: Mismatch between number of endpoints and IDs")
}
- // TODO(hpucha): Merge this with vstore handler in syncd.
- if syncd.vstoreEndpoint != "" {
- var err error
- i.store, err = raw.BindStore(naming.JoinAddressName(syncd.vstoreEndpoint, raw.RawStoreSuffix))
- if err != nil {
- vlog.Fatalf("newInitiator: cannot connect to Veyron store endpoint (%s): %s",
- syncd.vstoreEndpoint, err)
- }
- }
-
// Override the default peerSyncInterval value if syncTick is specified.
if syncTick > 0 {
peerSyncInterval = syncTick
@@ -514,8 +503,8 @@
// TODO(hpucha): We will hold the lock across PutMutations rpc
// to prevent a race with watcher. The next iteration will
// clean up this coordination.
- if i.store != nil && len(m) > 0 {
- stream, err := i.store.PutMutations()
+ if store := i.syncd.store; store != nil && len(m) > 0 {
+ stream, err := store.PutMutations()
if err != nil {
vlog.Errorf("updateStoreAndSync:: putmutations err %v", err)
return err
diff --git a/runtimes/google/vsync/vsyncd.go b/runtimes/google/vsync/vsyncd.go
index 44b4028..2e6d639 100644
--- a/runtimes/google/vsync/vsyncd.go
+++ b/runtimes/google/vsync/vsyncd.go
@@ -14,8 +14,8 @@
"veyron/services/store/raw"
"veyron2/ipc"
+ "veyron2/naming"
"veyron2/storage"
- "veyron2/storage/vstore"
"veyron2/vlog"
"veyron2/vom"
@@ -39,8 +39,8 @@
closed chan struct{}
// Local Veyron store.
- vstoreEndpoint string
- vstore storage.Store
+ storeEndpoint string
+ store raw.Store
// Handlers for goroutine procedures.
hdlGC *syncGC
@@ -66,28 +66,28 @@
// sync.RWMutex. The spec says that the writers cannot be starved by
// the readers but it does not guarantee FIFO. We may have to revisit
// this in the future.
-func NewSyncd(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint string, syncTick time.Duration) *syncd {
+func NewSyncd(peerEndpoints, peerDeviceIDs, devid, storePath, storeEndpoint string, syncTick time.Duration) *syncd {
// Connect to the local Veyron store.
// At present this is optional to allow testing (from the command-line) w/o Veyron store running.
// TODO: connecting to Veyron store should be mandatory.
- var st storage.Store
- if vstoreEndpoint != "" {
- vs, err := vstore.New(vstoreEndpoint)
+ var store raw.Store
+ if storeEndpoint != "" {
+ var err error
+ store, err = raw.BindStore(naming.JoinAddressName(storeEndpoint, raw.RawStoreSuffix))
if err != nil {
- vlog.Fatalf("newSyncd: cannot connect to Veyron store endpoint (%s): %s", vstoreEndpoint, err)
+ vlog.Fatalf("NewSyncd: cannot connect to Veyron store endpoint (%s): %s", storeEndpoint, err)
}
- st = vs
}
- return newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint, st, syncTick)
+ return newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, storeEndpoint, store, syncTick)
}
// newSyncdCore is the internal function that creates the Syncd
// structure and initilizes its thread (goroutines). It takes a
// Veyron Store parameter to separate the core of Syncd setup from the
// external dependency on Veyron Store.
-func newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, vstoreEndpoint string,
- store storage.Store, syncTick time.Duration) *syncd {
+func newSyncdCore(peerEndpoints, peerDeviceIDs, devid, storePath, storeEndpoint string,
+ store raw.Store, syncTick time.Duration) *syncd {
s := &syncd{}
// Bootstrap my own DeviceID.
@@ -110,9 +110,9 @@
}
// Veyron Store.
- s.vstoreEndpoint = vstoreEndpoint
- s.vstore = store
- vlog.VI(1).Infof("newSyncd: Local Veyron store: %s", s.vstoreEndpoint)
+ s.storeEndpoint = storeEndpoint
+ s.store = store
+ vlog.VI(1).Infof("newSyncd: Local Veyron store: %s", s.storeEndpoint)
// Register these Watch data types with VOM.
// TODO(tilaks): why aren't they auto-retrieved from the IDL?
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index 5cbf0bf..e081d7f 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -11,6 +11,8 @@
"veyron/services/store/raw"
+ "veyron2"
+ "veyron2/ipc"
"veyron2/query"
"veyron2/services/watch"
"veyron2/storage"
@@ -57,7 +59,7 @@
defer w.syncd.pending.Done()
// If no Veyron store is configured, there is nothing to watch.
- if w.syncd.vstore == nil {
+ if w.syncd.store == nil {
vlog.VI(1).Info("watchStore: Veyron store not configured; skipping the watcher")
return
}
@@ -99,7 +101,7 @@
req.ResumeMarker = resmark
}
- stream, err := w.syncd.vstore.Watch(req)
+ stream, err := w.syncd.store.Watch(req, veyron2.CallTimeout(ipc.NoTimeout))
if err == nil {
return stream
}
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index 45be727..30ea172 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -4,7 +4,6 @@
import (
"bytes"
- "errors"
"fmt"
"io"
"os"
@@ -13,7 +12,7 @@
"veyron/services/store/raw"
- "veyron2/query"
+ "veyron2/ipc"
"veyron2/services/watch"
"veyron2/storage"
)
@@ -39,7 +38,19 @@
type fakeVStore struct {
}
-func (v *fakeVStore) Watch(req watch.Request) (watch.WatcherWatchStream, error) {
+func (*fakeVStore) GetMethodTags(_ string, _ ...ipc.ClientCallOpt) ([]interface{}, error) {
+ panic("not implemented")
+}
+
+func (*fakeVStore) UnresolveStep(_ ...ipc.ClientCallOpt) ([]string, error) {
+ panic("not implemented")
+}
+
+func (*fakeVStore) Signature(_ ...ipc.ClientCallOpt) (ipc.ServiceSignature, error) {
+ panic("not implemented")
+}
+
+func (v *fakeVStore) Watch(req watch.Request, _ ...ipc.ClientCallOpt) (watch.WatcherWatchStream, error) {
// If "failWatch" is set, simulate a failed RPC call.
if info.failWatch {
info.failWatchCount++
@@ -53,24 +64,8 @@
return newFakeStream(), nil
}
-func (v *fakeVStore) Bind(path string) storage.Object {
- return nil
-}
-
-func (v *fakeVStore) Glob(t storage.Transaction, pattern string) (storage.GlobStream, error) {
- return nil, errors.New("not implemented")
-}
-
-func (v *fakeVStore) Search(t storage.Transaction, q query.Query) storage.Iterator {
- return nil
-}
-
-func (v *fakeVStore) SetConflictResolver(ty string, r storage.ConflictResolver) {
- return
-}
-
-func (v *fakeVStore) Close() error {
- return nil
+func (*fakeVStore) PutMutations(_ ...ipc.ClientCallOpt) (raw.StorePutMutationsStream, error) {
+ panic("not implemented")
}
// fakeStream is used to simulate the reply stream of the Watch() API.
diff --git a/services/store/raw/service.vdl b/services/store/raw/service.vdl
index 9d02376..03e469f 100644
--- a/services/store/raw/service.vdl
+++ b/services/store/raw/service.vdl
@@ -48,6 +48,7 @@
// Store defines a raw interface for the Veyron store. Mutations can be received
// via the Watcher interface, and committed via PutMutation.
type Store interface {
+ // Watcher is an API for watching updates that satisfy a query.
watch.Watcher
// PutMutations atomically commits a stream of Mutations when the stream is
diff --git a/services/store/server/object.go b/services/store/server/object.go
index 5ec1fa2..741954b 100644
--- a/services/store/server/object.go
+++ b/services/store/server/object.go
@@ -9,6 +9,7 @@
"veyron2/query"
"veyron2/services/mounttable"
"veyron2/services/store"
+ "veyron2/services/watch"
"veyron2/storage"
"veyron2/vdl"
)
@@ -212,3 +213,8 @@
}
return nil
}
+
+// Watch returns a stream of changes.
+func (o *object) Watch(ctx ipc.Context, req watch.Request, stream watch.WatcherServiceWatchStream) error {
+ panic("not implemented")
+}