syncbase: Refactor and expand Watch tests.
Refactored Watch tests to reduce inline boilerplate and expanded
tests to cover trickier cases.
Also fixed Watch error when checking ACLs of a Collection that no
longer exists. Expanded test to cover this case.
MultiPart: 2/2
Change-Id: Ia6f70403380e9ca234ffcd7118c4bde7709e9d0a
diff --git a/services/syncbase/server/database_watch.go b/services/syncbase/server/database_watch.go
index 2065208..fd5870d 100644
--- a/services/syncbase/server/database_watch.go
+++ b/services/syncbase/server/database_watch.go
@@ -129,12 +129,12 @@
// Check permissions.
// TODO(ivanpi): Collection scan already gets perms, optimize?
if _, err := c.checkAccess(ctx, call, sntx); err != nil {
- if verror.ErrorID(err) != verror.ErrNoAccess.ID {
- return err
+ if verror.ErrorID(err) == verror.ErrNoAccess.ID {
+ // TODO(ivanpi): Inaccessible rows are skipped. Figure out how to signal
+ // this to caller.
+ continue
}
- // TODO(ivanpi): Inaccessible collections are skipped. Figure out how to
- // signal this to caller.
- continue
+ return err
}
// TODO(ivanpi): Send collection info.
// Send matching rows.
@@ -174,11 +174,11 @@
if err := sender.addChange(
naming.Join(pubutil.EncodeId(c.id), externalKey),
watch.Exists,
- vom.RawBytesOf(wire.StoreChange{
+ &wire.StoreChange{
Value: valueAsRawBytes,
// Note: FromSync cannot be reconstructed from scan.
FromSync: false,
- })); err != nil {
+ }); err != nil {
return err
}
}
@@ -272,12 +272,15 @@
// Filter out rows that we can't access.
// TODO(ivanpi): Check only once per collection per batch.
if _, err := c.checkAccess(ctx, call, sn); err != nil {
- if verror.ErrorID(err) != verror.ErrNoAccess.ID {
- return err
+ if verror.ErrorID(err) == verror.ErrNoAccess.ID || verror.ErrorID(err) == verror.ErrNoExist.ID {
+ // Note, the collection may not exist anymore, in which case permissions
+ // cannot be retrieved. This case is treated the same as ErrNoAccess, by
+ // skipping the row.
+ // TODO(ivanpi): Inaccessible rows are skipped. Figure out how to signal
+ // this to caller.
+ continue
}
- // TODO(ivanpi): Inaccessible rows are skipped. Figure out how to signal
- // this to caller.
- continue
+ return err
}
switch op := op.(type) {
case *watchable.PutOp:
@@ -291,18 +294,18 @@
}
if err := sender.addChange(naming.Join(pubutil.EncodeId(cxId), row),
watch.Exists,
- vom.RawBytesOf(wire.StoreChange{
+ &wire.StoreChange{
Value: rowValueAsRawBytes,
FromSync: logEntry.FromSync,
- })); err != nil {
+ }); err != nil {
return err
}
case *watchable.DeleteOp:
if err := sender.addChange(naming.Join(pubutil.EncodeId(cxId), row),
watch.DoesNotExist,
- vom.RawBytesOf(wire.StoreChange{
+ &wire.StoreChange{
FromSync: logEntry.FromSync,
- })); err != nil {
+ }); err != nil {
return err
}
}
@@ -322,7 +325,12 @@
// addChange sends the previously added change (if any) with Continued set to
// true and stages the new one to be sent by the next addChange or finishBatch.
-func (w *watchBatchSender) addChange(name string, state int32, value *vom.RawBytes) error {
+func (w *watchBatchSender) addChange(name string, state int32, storeChange *wire.StoreChange) error {
+ // Encode the StoreChange for sending.
+ storeChangeAsRawBytes, err := vom.RawBytesFromValue(*storeChange)
+ if err != nil {
+ return verror.New(verror.ErrInternal, nil, err)
+ }
// Send previously staged change now that we know the batch is continuing.
if w.staged != nil {
w.staged.Continued = true
@@ -334,7 +342,7 @@
w.staged = &watch.Change{
Name: name,
State: state,
- Value: value,
+ Value: storeChangeAsRawBytes,
}
return nil
}
diff --git a/services/syncbase/testutil/util.go b/services/syncbase/testutil/util.go
index 9c848ee..06b52fd 100644
--- a/services/syncbase/testutil/util.go
+++ b/services/syncbase/testutil/util.go
@@ -19,6 +19,7 @@
"v.io/v23/security"
"v.io/v23/security/access"
wire "v.io/v23/services/syncbase"
+ "v.io/v23/services/watch"
"v.io/v23/syncbase"
"v.io/v23/syncbase/util"
"v.io/v23/vdl"
@@ -202,6 +203,31 @@
ValueBytes *vom.RawBytes
}
+func WatchChangeTestRowPut(cxId wire.Id, rowKey string, value interface{}, resumeMarker watch.ResumeMarker) WatchChangeTest {
+ return WatchChangeTest{
+ WatchChange: syncbase.WatchChange{
+ Collection: cxId,
+ Row: rowKey,
+ ChangeType: syncbase.PutChange,
+ ResumeMarker: resumeMarker,
+ Continued: (resumeMarker == nil),
+ },
+ ValueBytes: vom.RawBytesOf(value),
+ }
+}
+
+func WatchChangeTestRowDelete(cxId wire.Id, rowKey string, resumeMarker watch.ResumeMarker) WatchChangeTest {
+ return WatchChangeTest{
+ WatchChange: syncbase.WatchChange{
+ Collection: cxId,
+ Row: rowKey,
+ ChangeType: syncbase.DeleteChange,
+ ResumeMarker: resumeMarker,
+ Continued: (resumeMarker == nil),
+ },
+ }
+}
+
// WatchChangeEq returns whether *want and *got represent the same value.
func WatchChangeEq(got *syncbase.WatchChange, want *WatchChangeTest) (eq bool) {
if want.Collection == got.Collection &&
@@ -233,7 +259,7 @@
Fatalf(t, "wstream.Advance() reached the end: %v", wstream.Err())
}
if got := wstream.Change(); !WatchChangeEq(&got, &want) {
- Fatalf(t, "unexpected watch change: got %v, want %v", got, want)
+ Fatalf(t, "unexpected watch change: got %+v, want %+v", got, want)
}
}
}