veyron2/services/watch: Split types into separate pkg
Split the watch interface into two separate packages: one for the interface
itself, and one for the types.
This will allow implementations to use the generated types without
depending on the generated stubs.
Change-Id: I8b52a865a3682dfb86aed7c1e56d29b2c9a27f44
diff --git a/examples/boxes/android/src/boxesp2p/main.go b/examples/boxes/android/src/boxesp2p/main.go
index 34c2ccd..ef39c88 100644
--- a/examples/boxes/android/src/boxesp2p/main.go
+++ b/examples/boxes/android/src/boxesp2p/main.go
@@ -77,7 +77,7 @@
"veyron2/naming"
"veyron2/rt"
"veyron2/security"
- iwatch "veyron2/services/watch"
+ "veyron2/services/watch/types"
"veyron2/storage"
"veyron2/storage/vstore"
"veyron2/vom"
@@ -199,7 +199,7 @@
// Watch for any box updates from the store
go func() {
- req := iwatch.GlobRequest{Pattern: "*"}
+ req := types.GlobRequest{Pattern: "*"}
stream, err := root.WatchGlob(ctx, req)
if err != nil {
panic(fmt.Errorf("Can't watch store: %s: %s", gs.storeEndpoint, err))
diff --git a/examples/stfortune/stfortune/main.go b/examples/stfortune/stfortune/main.go
index aff4ff4..b48462c 100644
--- a/examples/stfortune/stfortune/main.go
+++ b/examples/stfortune/stfortune/main.go
@@ -20,7 +20,7 @@
"veyron2/context"
"veyron2/naming"
"veyron2/rt"
- iwatch "veyron2/services/watch"
+ "veyron2/services/watch/types"
"veyron2/storage"
"veyron2/storage/vstore"
"veyron2/vom"
@@ -56,7 +56,7 @@
paths := []string{appPath, fortunePath(""), userPath("")}
for _, path := range paths {
abspath := naming.Join(storeAddress, path)
- req := iwatch.GlobRequest{Pattern: ""}
+ req := types.GlobRequest{Pattern: ""}
stream, err := vstore.New().Bind(abspath).WatchGlob(ctx, req)
if err != nil {
log.Fatalf("WatchGlob %s failed: %v", abspath, err)
@@ -89,7 +89,7 @@
fmt.Printf("Running as a Watcher monitoring new fortunes under %s...\n", path)
abspath := naming.Join(storeAddress, path)
- req := iwatch.GlobRequest{Pattern: "*"}
+ req := types.GlobRequest{Pattern: "*"}
stream, err := vstore.New().Bind(abspath).WatchGlob(ctx, req)
if err != nil {
log.Fatalf("watcher WatchGlob %s failed: %v", abspath, err)
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index 3fc5465..58fa467 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -15,6 +15,7 @@
"veyron2/context"
"veyron2/rt"
"veyron2/services/watch"
+ "veyron2/services/watch/types"
"veyron2/vlog"
)
@@ -156,7 +157,7 @@
// processChanges applies the batch of changes (object mutations) received from the Watch API.
// The function grabs the write-lock to access the Log and DAG DBs.
-func (w *syncWatcher) processChanges(changes watch.ChangeBatch, syncTime int64) error {
+func (w *syncWatcher) processChanges(changes types.ChangeBatch, syncTime int64) error {
w.syncd.lock.Lock()
defer w.syncd.lock.Unlock()
@@ -181,7 +182,7 @@
// All LogValues belonging to the same transaction get the same timestamp.
time = w.curTxSyncTime
}
- val := &LogValue{Mutation: *mu, SyncTime: time, Delete: ch.State == watch.DoesNotExist, Continued: ch.Continued}
+ val := &LogValue{Mutation: *mu, SyncTime: time, Delete: ch.State == types.DoesNotExist, Continued: ch.Continued}
vlog.VI(2).Infof("processChanges:: processing record %v, Tx %v", val, w.curTx)
if err := w.syncd.log.processWatchRecord(mu.ID, mu.Version, mu.PriorVersion, val, w.curTx); err != nil {
return fmt.Errorf("cannot process mutation: %#v: %s", ch, err)
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index 0725b84..f6848d7 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -15,7 +15,7 @@
"veyron2/context"
"veyron2/ipc"
"veyron2/rt"
- "veyron2/services/watch"
+ "veyron2/services/watch/types"
"veyron2/storage"
)
@@ -84,7 +84,7 @@
func (s *fakeStream) RecvStream() interface {
Advance() bool
- Value() watch.ChangeBatch
+ Value() types.ChangeBatch
Err() error
} {
return s
@@ -118,7 +118,7 @@
return true
}
-func (s *fakeStream) Value() watch.ChangeBatch {
+func (s *fakeStream) Value() types.ChangeBatch {
changes := getChangeBatch()
var lastCount byte
@@ -152,12 +152,12 @@
// getChangeBatch returns a batch of store mutations used to simulate the Watch API.
// The batch contains two transactions to verify both new-object creation and the
// mutation of an existing object.
-func getChangeBatch() watch.ChangeBatch {
- var batch watch.ChangeBatch
+func getChangeBatch() types.ChangeBatch {
+ var batch types.ChangeBatch
- batch.Changes = []watch.Change{
+ batch.Changes = []types.Change{
// 1st transaction: create "/" and "/a" and "/a/b" as 3 new objects (prior versions are 0).
- watch.Change{
+ types.Change{
Name: "",
State: 0,
Value: &raw.Mutation{
@@ -177,7 +177,7 @@
ResumeMarker: nil,
Continued: true,
},
- watch.Change{
+ types.Change{
Name: "",
State: 0,
Value: &raw.Mutation{
@@ -197,7 +197,7 @@
ResumeMarker: nil,
Continued: true,
},
- watch.Change{
+ types.Change{
Name: "",
State: 0,
Value: &raw.Mutation{
@@ -213,7 +213,7 @@
},
// 2nd transaction: create "/a/c" as a new object, which also updates "a" (its "Dir" field).
- watch.Change{
+ types.Change{
Name: "",
State: 0,
Value: &raw.Mutation{
@@ -238,7 +238,7 @@
ResumeMarker: nil,
Continued: true,
},
- watch.Change{
+ types.Change{
Name: "",
State: 0,
Value: &raw.Mutation{
@@ -254,7 +254,7 @@
},
// 3rd transaction: remove "/a/b" which updates "a" (its "Dir" field) and deletes "b".
- watch.Change{
+ types.Change{
Name: "",
State: 0,
Value: &raw.Mutation{
@@ -274,9 +274,9 @@
ResumeMarker: nil,
Continued: true,
},
- watch.Change{
+ types.Change{
Name: "",
- State: watch.DoesNotExist,
+ State: types.DoesNotExist,
Value: &raw.Mutation{
ID: storage.ID{0x6e, 0x4a, 0x32, 0x7c, 0x29, 0x7d, 0x76, 0xfb,
0x51, 0x42, 0xb1, 0xb1, 0xd9, 0x5b, 0x2d, 0x7},
diff --git a/services/store/memstore/blackbox/util.go b/services/store/memstore/blackbox/util.go
index 54b500e..c494310 100644
--- a/services/store/memstore/blackbox/util.go
+++ b/services/store/memstore/blackbox/util.go
@@ -12,7 +12,7 @@
"veyron/services/store/raw"
"veyron2/security"
- "veyron2/services/watch"
+ "veyron2/services/watch/types"
"veyron2/storage"
)
@@ -168,7 +168,7 @@
}
}
-func Mutations(changes []watch.Change) []raw.Mutation {
+func Mutations(changes []types.Change) []raw.Mutation {
mutations := make([]raw.Mutation, len(changes))
for i, change := range changes {
mutations[i] = *(change.Value.(*raw.Mutation))
diff --git a/services/store/memstore/testing/util.go b/services/store/memstore/testing/util.go
index 4fc237e..293bc75 100644
--- a/services/store/memstore/testing/util.go
+++ b/services/store/memstore/testing/util.go
@@ -14,6 +14,7 @@
"veyron2/rt"
"veyron2/security"
"veyron2/services/watch"
+ "veyron2/services/watch/types"
"veyron2/storage"
)
@@ -153,10 +154,10 @@
type watcherServiceWatchStreamSender struct {
mu *sync.Mutex
ctx ipc.ServerContext
- output chan<- watch.ChangeBatch
+ output chan<- types.ChangeBatch
}
-func (s *watcherServiceWatchStreamSender) Send(cb watch.ChangeBatch) error {
+func (s *watcherServiceWatchStreamSender) Send(cb types.ChangeBatch) error {
s.mu.Lock()
defer s.mu.Unlock()
select {
@@ -173,7 +174,7 @@
}
func (s *watcherServiceWatchStream) SendStream() interface {
- Send(cb watch.ChangeBatch) error
+ Send(cb types.ChangeBatch) error
} {
return s
}
@@ -182,8 +183,8 @@
// watcherWatchStream implements watch.WatcherWatchStream.
type watcherWatchStream struct {
ctx *FakeServerContext
- value watch.ChangeBatch
- input <-chan watch.ChangeBatch
+ value types.ChangeBatch
+ input <-chan types.ChangeBatch
err <-chan error
}
@@ -193,7 +194,7 @@
return ok
}
-func (s *watcherWatchStream) Value() watch.ChangeBatch {
+func (s *watcherWatchStream) Value() types.ChangeBatch {
return s.value
}
@@ -212,7 +213,7 @@
func (s *watcherWatchStream) RecvStream() interface {
Advance() bool
- Value() watch.ChangeBatch
+ Value() types.ChangeBatch
Err() error
} {
return s
@@ -221,7 +222,7 @@
func watchImpl(id security.PublicID, watchFn func(ipc.ServerContext, *watcherServiceWatchStream) error) *watcherWatchStream {
mu := &sync.Mutex{}
ctx := NewFakeServerContext(id)
- c := make(chan watch.ChangeBatch, 1)
+ c := make(chan types.ChangeBatch, 1)
errc := make(chan error, 1)
go func() {
stream := &watcherServiceWatchStream{
@@ -252,23 +253,23 @@
})
}
-func WatchGlob(id security.PublicID, watchFn func(ipc.ServerContext, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, req watch.GlobRequest) watch.GlobWatcherWatchGlobCall {
+func WatchGlob(id security.PublicID, watchFn func(ipc.ServerContext, types.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, req types.GlobRequest) watch.GlobWatcherWatchGlobCall {
return watchImpl(id, func(ctx ipc.ServerContext, iterator *watcherServiceWatchStream) error {
return watchFn(ctx, req, iterator)
})
}
-func WatchGlobOnPath(id security.PublicID, watchFn func(ipc.ServerContext, storage.PathName, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, path storage.PathName, req watch.GlobRequest) watch.GlobWatcherWatchGlobCall {
+func WatchGlobOnPath(id security.PublicID, watchFn func(ipc.ServerContext, storage.PathName, types.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, path storage.PathName, req types.GlobRequest) watch.GlobWatcherWatchGlobCall {
return watchImpl(id, func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
return watchFn(ctx, path, req, stream)
})
}
-func ExpectInitialStateSkipped(t *testing.T, change watch.Change) {
+func ExpectInitialStateSkipped(t *testing.T, change types.Change) {
if change.Name != "" {
t.Fatalf("Expect Name to be \"\" but was: %v", change.Name)
}
- if change.State != watch.InitialStateSkipped {
+ if change.State != types.InitialStateSkipped {
t.Fatalf("Expect State to be InitialStateSkipped but was: %v", change.State)
}
if len(change.ResumeMarker) != 0 {
@@ -276,9 +277,9 @@
}
}
-func ExpectEntryExists(t *testing.T, changes []watch.Change, name string, id storage.ID, value string) {
+func ExpectEntryExists(t *testing.T, changes []types.Change, name string, id storage.ID, value string) {
change := findEntry(t, changes, name)
- if change.State != watch.Exists {
+ if change.State != types.Exists {
t.Fatalf("Expected name to exist: %v", name)
}
cv, ok := change.Value.(*storage.Entry)
@@ -293,9 +294,9 @@
}
}
-func ExpectEntryDoesNotExist(t *testing.T, changes []watch.Change, name string) {
+func ExpectEntryDoesNotExist(t *testing.T, changes []types.Change, name string) {
change := findEntry(t, changes, name)
- if change.State != watch.DoesNotExist {
+ if change.State != types.DoesNotExist {
t.Fatalf("Expected name to not exist: %v", name)
}
if change.Value != nil {
@@ -303,9 +304,9 @@
}
}
-func ExpectServiceEntryExists(t *testing.T, changes []watch.Change, name string, id storage.ID, value string) {
+func ExpectServiceEntryExists(t *testing.T, changes []types.Change, name string, id storage.ID, value string) {
change := findEntry(t, changes, name)
- if change.State != watch.Exists {
+ if change.State != types.Exists {
t.Fatalf("Expected name to exist: %v", name)
}
cv, ok := change.Value.(*storage.Entry)
@@ -320,9 +321,9 @@
}
}
-func ExpectServiceEntryDoesNotExist(t *testing.T, changes []watch.Change, name string) {
+func ExpectServiceEntryDoesNotExist(t *testing.T, changes []types.Change, name string) {
change := findEntry(t, changes, name)
- if change.State != watch.DoesNotExist {
+ if change.State != types.DoesNotExist {
t.Fatalf("Expected name to not exist: %v", name)
}
if change.Value != nil {
@@ -330,7 +331,7 @@
}
}
-func findEntry(t *testing.T, changes []watch.Change, name string) watch.Change {
+func findEntry(t *testing.T, changes []types.Change, name string) types.Change {
for _, change := range changes {
if change.Name == name {
return change
@@ -351,9 +352,9 @@
}}
}
-func ExpectMutationExists(t *testing.T, changes []watch.Change, id storage.ID, pre, post storage.Version, isRoot bool, value string, dir []storage.DEntry) {
+func ExpectMutationExists(t *testing.T, changes []types.Change, id storage.ID, pre, post storage.Version, isRoot bool, value string, dir []storage.DEntry) {
change := findMutation(t, changes, id)
- if change.State != watch.Exists {
+ if change.State != types.Exists {
t.Fatalf("Expected id to exist: %v", id)
}
cv := change.Value.(*raw.Mutation)
@@ -372,9 +373,9 @@
expectDirEquals(t, cv.Dir, dir)
}
-func ExpectMutationDoesNotExist(t *testing.T, changes []watch.Change, id storage.ID, pre storage.Version, isRoot bool) {
+func ExpectMutationDoesNotExist(t *testing.T, changes []types.Change, id storage.ID, pre storage.Version, isRoot bool) {
change := findMutation(t, changes, id)
- if change.State != watch.DoesNotExist {
+ if change.State != types.DoesNotExist {
t.Fatalf("Expected id to not exist: %v", id)
}
cv := change.Value.(*raw.Mutation)
@@ -395,9 +396,9 @@
}
}
-func ExpectMutationExistsNoVersionCheck(t *testing.T, changes []watch.Change, id storage.ID, value string) {
+func ExpectMutationExistsNoVersionCheck(t *testing.T, changes []types.Change, id storage.ID, value string) {
change := findMutation(t, changes, id)
- if change.State != watch.Exists {
+ if change.State != types.Exists {
t.Fatalf("Expected id to exist: %v", id)
}
cv := change.Value.(*raw.Mutation)
@@ -406,14 +407,14 @@
}
}
-func ExpectMutationDoesNotExistNoVersionCheck(t *testing.T, changes []watch.Change, id storage.ID) {
+func ExpectMutationDoesNotExistNoVersionCheck(t *testing.T, changes []types.Change, id storage.ID) {
change := findMutation(t, changes, id)
- if change.State != watch.DoesNotExist {
+ if change.State != types.DoesNotExist {
t.Fatalf("Expected id to not exist: %v", id)
}
}
-func findMutation(t *testing.T, changes []watch.Change, id storage.ID) watch.Change {
+func findMutation(t *testing.T, changes []types.Change, id storage.ID) types.Change {
for _, change := range changes {
cv, ok := change.Value.(*raw.Mutation)
if !ok {
diff --git a/services/store/memstore/watch/glob_processor.go b/services/store/memstore/watch/glob_processor.go
index 6254a17..500e256 100644
--- a/services/store/memstore/watch/glob_processor.go
+++ b/services/store/memstore/watch/glob_processor.go
@@ -5,7 +5,7 @@
"veyron/services/store/memstore/state"
"veyron2/security"
- "veyron2/services/watch"
+ "veyron2/services/watch/types"
"veyron2/storage"
)
@@ -37,7 +37,7 @@
}, nil
}
-func (p *globProcessor) processState(st *state.State) ([]watch.Change, error) {
+func (p *globProcessor) processState(st *state.State) ([]types.Change, error) {
// Check that the initial state has not already been processed.
if p.hasProcessedState {
return nil, errInitialStateAlreadyProcessed
@@ -53,15 +53,15 @@
p.st = st
p.matches = matches
- var changes []watch.Change
+ var changes []types.Change
// Create a change for every matching name.
for name, id := range matches {
cell := sn.Find(id)
entry := cell.GetEntry()
- change := watch.Change{
+ change := types.Change{
Name: name,
- State: watch.Exists,
+ State: types.Exists,
Value: entry,
}
// TODO(tilaks): don't clone change.
@@ -71,7 +71,7 @@
return changes, nil
}
-func (p *globProcessor) processTransaction(mus *state.Mutations) ([]watch.Change, error) {
+func (p *globProcessor) processTransaction(mus *state.Mutations) ([]types.Change, error) {
// Ensure that the initial state has been processed.
if !p.hasProcessedState {
return nil, errInitialStateNotProcessed
@@ -90,15 +90,15 @@
}
p.matches = newMatches
- var changes []watch.Change
+ var changes []types.Change
removed, updated := diffMatches(previousMatches, newMatches, mus.Delta)
// Create a change for every matching name that was removed.
for name := range removed {
- change := watch.Change{
+ change := types.Change{
Name: name,
- State: watch.DoesNotExist,
+ State: types.DoesNotExist,
}
// TODO(tilaks): don't clone change
changes = append(changes, change)
@@ -109,9 +109,9 @@
id := newMatches[name]
cell := sn.Find(id)
entry := cell.GetEntry()
- change := watch.Change{
+ change := types.Change{
Name: name,
- State: watch.Exists,
+ State: types.Exists,
Value: entry,
}
// TODO(tilaks): don't clone change.
diff --git a/services/store/memstore/watch/processor.go b/services/store/memstore/watch/processor.go
index ce3e07e..a3779e6 100644
--- a/services/store/memstore/watch/processor.go
+++ b/services/store/memstore/watch/processor.go
@@ -2,7 +2,7 @@
import (
"veyron/services/store/memstore/state"
- "veyron2/services/watch"
+ "veyron2/services/watch/types"
"veyron2/verror"
)
@@ -21,10 +21,10 @@
// originally created the initial state (e.g. in the case of compress), but
// are sufficient to re-construct the state viewable within the request.
// processState may modify its input.
- processState(st *state.State) ([]watch.Change, error)
+ processState(st *state.State) ([]types.Change, error)
// processTransaction returns the set of changes made in some transaction.
// The changes are returned in no specific order.
// processTransaction may modify its input.
- processTransaction(mu *state.Mutations) ([]watch.Change, error)
+ processTransaction(mu *state.Mutations) ([]types.Change, error)
}
diff --git a/services/store/memstore/watch/raw_processor.go b/services/store/memstore/watch/raw_processor.go
index 962e0bd..3de157d 100644
--- a/services/store/memstore/watch/raw_processor.go
+++ b/services/store/memstore/watch/raw_processor.go
@@ -6,7 +6,7 @@
"veyron/services/store/raw"
"veyron2/security"
- "veyron2/services/watch"
+ "veyron2/services/watch/types"
"veyron2/storage"
"veyron2/verror"
)
@@ -45,7 +45,7 @@
}, nil
}
-func (p *rawProcessor) processState(st *state.State) ([]watch.Change, error) {
+func (p *rawProcessor) processState(st *state.State) ([]types.Change, error) {
// Check that the initial state has not already been processed.
if p.hasProcessedState {
return nil, errInitialStateAlreadyProcessed
@@ -60,7 +60,7 @@
}
p.rootID = rootID
- var changes []watch.Change
+ var changes []types.Change
// Create a change for each id in the state. In each change, the object
// exists, has no PriorVersion, has the Version of the new cell, and
@@ -85,8 +85,8 @@
Value: cell.Value,
Dir: flattenDir(refs.FlattenDir(cell.Dir)),
}
- change := watch.Change{
- State: watch.Exists,
+ change := types.Change{
+ State: types.Exists,
Value: value,
}
// TODO(tilaks): don't clone change
@@ -95,7 +95,7 @@
return changes, nil
}
-func (p *rawProcessor) processTransaction(mus *state.Mutations) ([]watch.Change, error) {
+func (p *rawProcessor) processTransaction(mus *state.Mutations) ([]types.Change, error) {
// Ensure that the initial state has been processed.
if !p.hasProcessedState {
return nil, errInitialStateNotProcessed
@@ -106,7 +106,7 @@
if mus.SetRootID && !mus.RootID.IsValid() {
extra = 1
}
- changes := make([]watch.Change, 0, len(mus.Delta)+len(mus.Deletions)+extra)
+ changes := make([]types.Change, 0, len(mus.Delta)+len(mus.Deletions)+extra)
if mus.SetRootID {
if mus.RootID.IsValid() {
@@ -120,8 +120,8 @@
IsRoot: true,
}
// TODO(tilaks): don't clone value.
- change := watch.Change{
- State: watch.DoesNotExist,
+ change := types.Change{
+ State: types.DoesNotExist,
Value: value,
}
changes = append(changes, change)
@@ -150,8 +150,8 @@
Dir: flattenDir(mu.Dir),
}
// TODO(tilaks): don't clone value.
- change := watch.Change{
- State: watch.Exists,
+ change := types.Change{
+ State: types.Exists,
Value: value,
}
// TODO(tilaks): don't clone change.
@@ -172,8 +172,8 @@
IsRoot: false,
}
// TODO(tilaks): don't clone value.
- change := watch.Change{
- State: watch.DoesNotExist,
+ change := types.Change{
+ State: types.DoesNotExist,
Value: value,
}
// TODO(tilaks): don't clone change.
diff --git a/services/store/memstore/watch/test_util.go b/services/store/memstore/watch/test_util.go
index 2a5216b..32735bc 100644
--- a/services/store/memstore/watch/test_util.go
+++ b/services/store/memstore/watch/test_util.go
@@ -10,7 +10,7 @@
"veyron/services/store/memstore/state"
"veyron2/security"
- "veyron2/services/watch"
+ "veyron2/services/watch/types"
"veyron2/storage"
)
@@ -129,7 +129,7 @@
}
}
-func expectState(t *testing.T, log *memstore.RLog, processor reqProcessor, numChanges int) []watch.Change {
+func expectState(t *testing.T, log *memstore.RLog, processor reqProcessor, numChanges int) []types.Change {
st := readState(t, log)
return processState(t, processor, st, numChanges)
}
@@ -142,7 +142,7 @@
return st.State
}
-func processState(t *testing.T, processor reqProcessor, st *state.State, numChanges int) []watch.Change {
+func processState(t *testing.T, processor reqProcessor, st *state.State, numChanges int) []types.Change {
changes, err := processor.processState(st)
if err != nil {
t.Fatalf("processState() failed: %v", err)
@@ -153,7 +153,7 @@
return changes
}
-func expectTransaction(t *testing.T, log *memstore.RLog, processor reqProcessor, numChanges int) []watch.Change {
+func expectTransaction(t *testing.T, log *memstore.RLog, processor reqProcessor, numChanges int) []types.Change {
mus := readTransaction(t, log)
return processTransaction(t, processor, mus, numChanges)
}
@@ -166,7 +166,7 @@
return mus
}
-func processTransaction(t *testing.T, processor reqProcessor, mus *state.Mutations, numChanges int) []watch.Change {
+func processTransaction(t *testing.T, processor reqProcessor, mus *state.Mutations, numChanges int) []types.Change {
changes, err := processor.processTransaction(mus)
if err != nil {
t.Fatalf("processTransaction() failed: %v", err)
diff --git a/services/store/memstore/watch/watcher.go b/services/store/memstore/watch/watcher.go
index c6e007e..437f1cd 100644
--- a/services/store/memstore/watch/watcher.go
+++ b/services/store/memstore/watch/watcher.go
@@ -13,6 +13,7 @@
"veyron2/ipc"
"veyron2/security"
"veyron2/services/watch"
+ "veyron2/services/watch/types"
"veyron2/storage"
"veyron2/verror"
)
@@ -21,9 +22,9 @@
errWatchClosed = io.EOF
errUnknownResumeMarker = verror.BadArgf("Unknown ResumeMarker")
nowResumeMarker = []byte("now") // UTF-8 conversion.
- initialStateSkippedChange = watch.Change{
+ initialStateSkippedChange = types.Change{
Name: "",
- State: watch.InitialStateSkipped,
+ State: types.InitialStateSkipped,
}
)
@@ -65,7 +66,7 @@
// WatchGlob returns a stream of changes that match a pattern.
func (w *Watcher) WatchGlob(ctx ipc.ServerContext, path storage.PathName,
- req watch.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
+ req types.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
processor, err := newGlobProcessor(ctx.RemoteID(), path, req.Pattern)
if err != nil {
@@ -76,7 +77,7 @@
// WatchQuery returns a stream of changes that satisfy a query.
func (w *Watcher) WatchQuery(ctx ipc.ServerContext, path storage.PathName,
- req watch.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
+ req types.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
return verror.Internalf("WatchQuery not yet implemented")
}
@@ -85,7 +86,7 @@
type WatchStream interface {
// Send places the item onto the output stream, blocking if there is no
// buffer space available.
- Send(item watch.ChangeBatch) error
+ Send(item types.ChangeBatch) error
}
// Watch handles the specified request, processing records in the store log and
@@ -93,7 +94,7 @@
// otherwise closed early, Watch will terminate and return an error.
// Watch implements the service.Watcher interface.
func (w *Watcher) Watch(ctx ipc.ServerContext, processor reqProcessor,
- resumeMarker watch.ResumeMarker, stream WatchStream) error {
+ resumeMarker types.ResumeMarker, stream WatchStream) error {
// Closing cancel terminates processRequest.
cancel := make(chan struct{})
@@ -124,7 +125,7 @@
}
func (w *Watcher) processRequest(cancel <-chan struct{}, processor reqProcessor,
- resumeMarker watch.ResumeMarker, stream WatchStream) error {
+ resumeMarker types.ResumeMarker, stream WatchStream) error {
log, err := memstore.OpenLog(w.dbName, true)
if err != nil {
@@ -150,7 +151,7 @@
}
if isNowResumeMarker(resumeMarker) {
- sendChanges(stream, []watch.Change{initialStateSkippedChange})
+ sendChanges(stream, []types.Change{initialStateSkippedChange})
}
// Process initial state.
@@ -283,21 +284,21 @@
return true, nil
}
-func processChanges(stream WatchStream, changes []watch.Change, timestamp uint64) error {
+func processChanges(stream WatchStream, changes []types.Change, timestamp uint64) error {
addContinued(changes)
addResumeMarkers(changes, timestampToResumeMarker(timestamp))
return sendChanges(stream, changes)
}
-func sendChanges(stream WatchStream, changes []watch.Change) error {
+func sendChanges(stream WatchStream, changes []types.Change) error {
if len(changes) == 0 {
return nil
}
// TODO(tilaks): batch more aggressively.
- return stream.Send(watch.ChangeBatch{Changes: changes})
+ return stream.Send(types.ChangeBatch{Changes: changes})
}
-func addContinued(changes []watch.Change) {
+func addContinued(changes []types.Change) {
// Last change marks the end of the processed atomic group.
for i, _ := range changes {
changes[i].Continued = true
@@ -307,7 +308,7 @@
}
}
-func addResumeMarkers(changes []watch.Change, resumeMarker []byte) {
+func addResumeMarkers(changes []types.Change, resumeMarker []byte) {
for i, _ := range changes {
changes[i].ResumeMarker = resumeMarker
}
diff --git a/services/store/memstore/watch/watcher_test.go b/services/store/memstore/watch/watcher_test.go
index 971210e..6e5172c 100644
--- a/services/store/memstore/watch/watcher_test.go
+++ b/services/store/memstore/watch/watcher_test.go
@@ -12,7 +12,7 @@
"veyron/services/store/raw"
"veyron2/rt"
- "veyron2/services/watch"
+ "veyron2/services/watch/types"
"veyron2/storage"
"veyron2/verror"
)
@@ -98,7 +98,7 @@
// Start a watch request.
path := storage.ParsePath("/")
- req := watch.GlobRequest{Pattern: "..."}
+ req := types.GlobRequest{Pattern: "..."}
ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
rStream := ws.RecvStream()
@@ -544,7 +544,7 @@
// Start a watch request.
path := storage.ParsePath("/")
- req := watch.GlobRequest{Pattern: "..."}
+ req := types.GlobRequest{Pattern: "..."}
ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
rStream := ws.RecvStream()
diff --git a/services/store/raw/service.vdl b/services/store/raw/service.vdl
index 4ec7e2c..20f0cad 100644
--- a/services/store/raw/service.vdl
+++ b/services/store/raw/service.vdl
@@ -7,7 +7,7 @@
import (
"veyron2/storage"
- "veyron2/services/watch"
+ "veyron2/services/watch/types"
)
const (
@@ -47,14 +47,14 @@
type Request struct {
// ResumeMarker specifies how to resume from a previous Watch call.
// See the ResumeMarker type for detailed comments.
- ResumeMarker watch.ResumeMarker
+ ResumeMarker types.ResumeMarker
}
// Store defines a raw interface for the Veyron store. Mutations can be received
// via the Watcher interface, and committed via PutMutation.
type Store interface {
// Watch returns a stream of all changes.
- Watch(Req Request) stream<_, watch.ChangeBatch> error
+ Watch(Req Request) stream<_, types.ChangeBatch> error
// PutMutations atomically commits a stream of Mutations when the stream is
// closed. Mutations are not committed if the request is cancelled before
diff --git a/services/store/raw/service.vdl.go b/services/store/raw/service.vdl.go
index 1eb0cfd..d0c755a 100644
--- a/services/store/raw/service.vdl.go
+++ b/services/store/raw/service.vdl.go
@@ -4,7 +4,7 @@
package raw
import (
- "veyron2/services/watch"
+ "veyron2/services/watch/types"
"veyron2/storage"
@@ -44,7 +44,7 @@
type Request struct {
// ResumeMarker specifies how to resume from a previous Watch call.
// See the ResumeMarker type for detailed comments.
- ResumeMarker watch.ResumeMarker
+ ResumeMarker types.ResumeMarker
}
const (
@@ -101,7 +101,7 @@
// Value returns the element that was staged by Advance.
// Value may panic if Advance returned false or was not
// called at all. Value does not block.
- Value() watch.ChangeBatch
+ Value() types.ChangeBatch
// Err returns a non-nil error iff the stream encountered
// any errors. Err does not block.
@@ -129,17 +129,17 @@
type implStoreWatchStreamIterator struct {
clientCall _gen_ipc.Call
- val watch.ChangeBatch
+ val types.ChangeBatch
err error
}
func (c *implStoreWatchStreamIterator) Advance() bool {
- c.val = watch.ChangeBatch{}
+ c.val = types.ChangeBatch{}
c.err = c.clientCall.Recv(&c.val)
return c.err == nil
}
-func (c *implStoreWatchStreamIterator) Value() watch.ChangeBatch {
+func (c *implStoreWatchStreamIterator) Value() types.ChangeBatch {
return c.val
}
@@ -158,7 +158,7 @@
func (c *implStoreWatchCall) RecvStream() interface {
Advance() bool
- Value() watch.ChangeBatch
+ Value() types.ChangeBatch
Err() error
} {
return &c.readStream
@@ -179,7 +179,7 @@
serverCall _gen_ipc.ServerCall
}
-func (s *implStoreServiceWatchStreamSender) Send(item watch.ChangeBatch) error {
+func (s *implStoreServiceWatchStreamSender) Send(item types.ChangeBatch) error {
return s.serverCall.Send(item)
}
@@ -190,7 +190,7 @@
SendStream() interface {
// Send places the item onto the output stream, blocking if there is no buffer
// space available. If the client has canceled, an error is returned.
- Send(item watch.ChangeBatch) error
+ Send(item types.ChangeBatch) error
}
}
@@ -202,7 +202,7 @@
func (s *implStoreServiceWatchStream) SendStream() interface {
// Send places the item onto the output stream, blocking if there is no buffer
// space available. If the client has canceled, an error is returned.
- Send(item watch.ChangeBatch) error
+ Send(item types.ChangeBatch) error
} {
return &s.writer
}
@@ -491,7 +491,7 @@
}
result.TypeDefs = []_gen_vdlutil.Any{
- _gen_wiretype.NamedPrimitiveType{Type: 0x32, Name: "byte", Tags: []string(nil)}, _gen_wiretype.SliceType{Elem: 0x41, Name: "veyron2/services/watch.ResumeMarker", Tags: []string(nil)}, _gen_wiretype.StructType{
+ _gen_wiretype.NamedPrimitiveType{Type: 0x32, Name: "byte", Tags: []string(nil)}, _gen_wiretype.SliceType{Elem: 0x41, Name: "veyron2/services/watch/types.ResumeMarker", Tags: []string(nil)}, _gen_wiretype.StructType{
[]_gen_wiretype.FieldType{
_gen_wiretype.FieldType{Type: 0x42, Name: "ResumeMarker"},
},
@@ -504,12 +504,12 @@
_gen_wiretype.FieldType{Type: 0x42, Name: "ResumeMarker"},
_gen_wiretype.FieldType{Type: 0x2, Name: "Continued"},
},
- "veyron2/services/watch.Change", []string(nil)},
+ "veyron2/services/watch/types.Change", []string(nil)},
_gen_wiretype.SliceType{Elem: 0x46, Name: "", Tags: []string(nil)}, _gen_wiretype.StructType{
[]_gen_wiretype.FieldType{
_gen_wiretype.FieldType{Type: 0x47, Name: "Changes"},
},
- "veyron2/services/watch.ChangeBatch", []string(nil)},
+ "veyron2/services/watch/types.ChangeBatch", []string(nil)},
_gen_wiretype.ArrayType{Elem: 0x41, Len: 0x10, Name: "veyron2/storage.ID", Tags: []string(nil)}, _gen_wiretype.NamedPrimitiveType{Type: 0x35, Name: "veyron2/storage.Version", Tags: []string(nil)}, _gen_wiretype.StructType{
[]_gen_wiretype.FieldType{
_gen_wiretype.FieldType{Type: 0x3, Name: "Name"},
diff --git a/services/store/server/object.go b/services/store/server/object.go
index f5bef59..cb78dc2 100644
--- a/services/store/server/object.go
+++ b/services/store/server/object.go
@@ -9,9 +9,10 @@
"veyron2/ipc"
"veyron2/query"
"veyron2/services/mounttable"
- "veyron2/services/mounttable/types"
+ mttypes "veyron2/services/mounttable/types"
"veyron2/services/store"
"veyron2/services/watch"
+ watchtypes "veyron2/services/watch/types"
"veyron2/storage"
"veyron2/vdl/vdlutil"
"veyron2/verror"
@@ -141,12 +142,12 @@
type globStreamSenderAdapter struct {
stream interface {
- Send(entry types.MountEntry) error
+ Send(entry mttypes.MountEntry) error
}
}
func (a *globStreamSenderAdapter) Send(item string) error {
- return a.stream.Send(types.MountEntry{Name: item})
+ return a.stream.Send(mttypes.MountEntry{Name: item})
}
type globStreamAdapter struct {
@@ -179,11 +180,11 @@
}
// WatchGlob returns a stream of changes that match a pattern.
-func (o *object) WatchGlob(ctx ipc.ServerContext, req watch.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
+func (o *object) WatchGlob(ctx ipc.ServerContext, req watchtypes.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
return o.server.watcher.WatchGlob(ctx, storage.ParsePath(o.name), req, stream)
}
// WatchQuery returns a stream of changes that satisfy a query.
-func (o *object) WatchQuery(ctx ipc.ServerContext, req watch.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
+func (o *object) WatchQuery(ctx ipc.ServerContext, req watchtypes.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
return o.server.watcher.WatchQuery(ctx, storage.ParsePath(o.name), req, stream)
}
diff --git a/services/store/server/server_test.go b/services/store/server/server_test.go
index 69d9304..f873c9f 100644
--- a/services/store/server/server_test.go
+++ b/services/store/server/server_test.go
@@ -18,7 +18,7 @@
"veyron2/rt"
"veyron2/security"
"veyron2/services/store"
- "veyron2/services/watch"
+ "veyron2/services/watch/types"
"veyron2/storage"
_ "veyron2/vlog"
"veyron2/vom"
@@ -374,7 +374,7 @@
}
// Start watch requests on / and /a.
- req := watch.GlobRequest{Pattern: "..."}
+ req := types.GlobRequest{Pattern: "..."}
ws1 := watchtesting.WatchGlob(rootPublicID, o1.WatchGlob, req)
ws2 := watchtesting.WatchGlob(rootPublicID, o2.WatchGlob, req)
diff --git a/services/wsprd/lib/remove_this.go b/services/wsprd/lib/remove_this.go
index 6368202..7625014 100644
--- a/services/wsprd/lib/remove_this.go
+++ b/services/wsprd/lib/remove_this.go
@@ -4,7 +4,7 @@
rps "veyron/examples/rockpaperscissors"
mttypes "veyron2/services/mounttable/types"
"veyron2/services/store"
- "veyron2/services/watch"
+ watchtypes "veyron2/services/watch/types"
"veyron2/storage"
"veyron2/vom"
)
@@ -15,10 +15,10 @@
vom.Register(storage.Stat{})
vom.Register(store.NestedResult(0))
vom.Register(store.QueryResult{})
- vom.Register(watch.GlobRequest{})
- vom.Register(watch.QueryRequest{})
- vom.Register(watch.ChangeBatch{})
- vom.Register(watch.Change{})
+ vom.Register(watchtypes.GlobRequest{})
+ vom.Register(watchtypes.QueryRequest{})
+ vom.Register(watchtypes.ChangeBatch{})
+ vom.Register(watchtypes.Change{})
vom.Register(rps.GameOptions{})
vom.Register(rps.GameID{})
vom.Register(rps.PlayResult{})