syncbase: Refactor and expand Watch tests.

Refactored Watch tests to reduce inline boilerplate and expanded
tests to cover trickier cases.
Also fixed Watch error when checking ACLs of a Collection that no
longer exists. Expanded test to cover this case.

MultiPart: 2/2
Change-Id: Ia6f70403380e9ca234ffcd7118c4bde7709e9d0a
diff --git a/services/syncbase/server/database_watch.go b/services/syncbase/server/database_watch.go
index 2065208..fd5870d 100644
--- a/services/syncbase/server/database_watch.go
+++ b/services/syncbase/server/database_watch.go
@@ -129,12 +129,12 @@
 		// Check permissions.
 		// TODO(ivanpi): Collection scan already gets perms, optimize?
 		if _, err := c.checkAccess(ctx, call, sntx); err != nil {
-			if verror.ErrorID(err) != verror.ErrNoAccess.ID {
-				return err
+			if verror.ErrorID(err) == verror.ErrNoAccess.ID {
+				// TODO(ivanpi): Inaccessible rows are skipped. Figure out how to signal
+				// this to caller.
+				continue
 			}
-			// TODO(ivanpi): Inaccessible collections are skipped. Figure out how to
-			// signal this to caller.
-			continue
+			return err
 		}
 		// TODO(ivanpi): Send collection info.
 		// Send matching rows.
@@ -174,11 +174,11 @@
 		if err := sender.addChange(
 			naming.Join(pubutil.EncodeId(c.id), externalKey),
 			watch.Exists,
-			vom.RawBytesOf(wire.StoreChange{
+			&wire.StoreChange{
 				Value: valueAsRawBytes,
 				// Note: FromSync cannot be reconstructed from scan.
 				FromSync: false,
-			})); err != nil {
+			}); err != nil {
 			return err
 		}
 	}
@@ -272,12 +272,15 @@
 		// Filter out rows that we can't access.
 		// TODO(ivanpi): Check only once per collection per batch.
 		if _, err := c.checkAccess(ctx, call, sn); err != nil {
-			if verror.ErrorID(err) != verror.ErrNoAccess.ID {
-				return err
+			if verror.ErrorID(err) == verror.ErrNoAccess.ID || verror.ErrorID(err) == verror.ErrNoExist.ID {
+				// Note, the collection may not exist anymore, in which case permissions
+				// cannot be retrieved. This case is treated the same as ErrNoAccess, by
+				// skipping the row.
+				// TODO(ivanpi): Inaccessible rows are skipped. Figure out how to signal
+				// this to caller.
+				continue
 			}
-			// TODO(ivanpi): Inaccessible rows are skipped. Figure out how to signal
-			// this to caller.
-			continue
+			return err
 		}
 		switch op := op.(type) {
 		case *watchable.PutOp:
@@ -291,18 +294,18 @@
 			}
 			if err := sender.addChange(naming.Join(pubutil.EncodeId(cxId), row),
 				watch.Exists,
-				vom.RawBytesOf(wire.StoreChange{
+				&wire.StoreChange{
 					Value:    rowValueAsRawBytes,
 					FromSync: logEntry.FromSync,
-				})); err != nil {
+				}); err != nil {
 				return err
 			}
 		case *watchable.DeleteOp:
 			if err := sender.addChange(naming.Join(pubutil.EncodeId(cxId), row),
 				watch.DoesNotExist,
-				vom.RawBytesOf(wire.StoreChange{
+				&wire.StoreChange{
 					FromSync: logEntry.FromSync,
-				})); err != nil {
+				}); err != nil {
 				return err
 			}
 		}
@@ -322,7 +325,12 @@
 
 // addChange sends the previously added change (if any) with Continued set to
 // true and stages the new one to be sent by the next addChange or finishBatch.
-func (w *watchBatchSender) addChange(name string, state int32, value *vom.RawBytes) error {
+func (w *watchBatchSender) addChange(name string, state int32, storeChange *wire.StoreChange) error {
+	// Encode the StoreChange for sending.
+	storeChangeAsRawBytes, err := vom.RawBytesFromValue(*storeChange)
+	if err != nil {
+		return verror.New(verror.ErrInternal, nil, err)
+	}
 	// Send previously staged change now that we know the batch is continuing.
 	if w.staged != nil {
 		w.staged.Continued = true
@@ -334,7 +342,7 @@
 	w.staged = &watch.Change{
 		Name:  name,
 		State: state,
-		Value: value,
+		Value: storeChangeAsRawBytes,
 	}
 	return nil
 }
diff --git a/services/syncbase/testutil/util.go b/services/syncbase/testutil/util.go
index 9c848ee..06b52fd 100644
--- a/services/syncbase/testutil/util.go
+++ b/services/syncbase/testutil/util.go
@@ -19,6 +19,7 @@
 	"v.io/v23/security"
 	"v.io/v23/security/access"
 	wire "v.io/v23/services/syncbase"
+	"v.io/v23/services/watch"
 	"v.io/v23/syncbase"
 	"v.io/v23/syncbase/util"
 	"v.io/v23/vdl"
@@ -202,6 +203,31 @@
 	ValueBytes *vom.RawBytes
 }
 
+func WatchChangeTestRowPut(cxId wire.Id, rowKey string, value interface{}, resumeMarker watch.ResumeMarker) WatchChangeTest {
+	return WatchChangeTest{
+		WatchChange: syncbase.WatchChange{
+			Collection:   cxId,
+			Row:          rowKey,
+			ChangeType:   syncbase.PutChange,
+			ResumeMarker: resumeMarker,
+			Continued:    (resumeMarker == nil),
+		},
+		ValueBytes: vom.RawBytesOf(value),
+	}
+}
+
+func WatchChangeTestRowDelete(cxId wire.Id, rowKey string, resumeMarker watch.ResumeMarker) WatchChangeTest {
+	return WatchChangeTest{
+		WatchChange: syncbase.WatchChange{
+			Collection:   cxId,
+			Row:          rowKey,
+			ChangeType:   syncbase.DeleteChange,
+			ResumeMarker: resumeMarker,
+			Continued:    (resumeMarker == nil),
+		},
+	}
+}
+
 // WatchChangeEq returns whether *want and *got represent the same value.
 func WatchChangeEq(got *syncbase.WatchChange, want *WatchChangeTest) (eq bool) {
 	if want.Collection == got.Collection &&
@@ -233,7 +259,7 @@
 			Fatalf(t, "wstream.Advance() reached the end: %v", wstream.Err())
 		}
 		if got := wstream.Change(); !WatchChangeEq(&got, &want) {
-			Fatalf(t, "unexpected watch change: got %v, want %v", got, want)
+			Fatalf(t, "unexpected watch change: got %+v, want %+v", got, want)
 		}
 	}
 }