syncbase: Send initial root update in Watch.

An empty update for the root entity is sent as the first update on the
watch stream.
* For watches from an empty resume marker, this update is propagated to
the client as part of the initial state batch. It is necessary because
the initial state batch may not contain other updates (if the watch
filter matches no collections) and the client needs to be notified of
this. It is intended to be filtered out by the high-level API when
the initial state batch is being separated out.
* For watches from 'now' or a specific resume marker, the initial update
is sent with initial_state_skipped for compliance with
v.io/v23/services/watch. It is filtered out by the client library
instead of propagating to the user.

Initial update was hooked up end-to-end in Go. Only row updates are
propagated in Java and Cgo for now.

Errors in watch code have also been scrubbed to prevent leaking info
to the client before the authorization step.

MultiPart: 2/4
Change-Id: Iddd43cf73742e14ca8f66818016d6e3ef890b28c
diff --git a/services/syncbase/bridge/cgo/impl.go b/services/syncbase/bridge/cgo/impl.go
index b7c232c..aa78bed 100644
--- a/services/syncbase/bridge/cgo/impl.go
+++ b/services/syncbase/bridge/cgo/impl.go
@@ -550,10 +550,18 @@
 	if !ok {
 		return verror.NewErrInternal(s.ctx)
 	}
+	if wireWC.State == watch.InitialStateSkipped {
+		return nil
+	}
+	wc := syncbase.ToWatchChange(wireWC)
+	if wc.EntityType != syncbase.EntityRow {
+		// TODO(ivanpi): Pipe collection and initial changes through.
+		return nil
+	}
 	// C.CallDbWatchPatternsCallbacksOnChange() blocks until the client acks the
 	// previous invocation, thus providing flow control.
 	cWatchChange := C.v23_syncbase_WatchChange{}
-	cWatchChange.init(syncbase.ToWatchChange(wireWC))
+	cWatchChange.init(*wc)
 	C.CallDbWatchPatternsCallbacksOnChange(s.cbs, cWatchChange)
 	return nil
 }
diff --git a/services/syncbase/longevity_tests/client/watcher_test.go b/services/syncbase/longevity_tests/client/watcher_test.go
index c6bbb96..bc73216 100644
--- a/services/syncbase/longevity_tests/client/watcher_test.go
+++ b/services/syncbase/longevity_tests/client/watcher_test.go
@@ -33,8 +33,10 @@
 		OnChange: func(watchChange syncbase.WatchChange) {
 			mu.Lock()
 			defer mu.Unlock()
-			gotRows = append(gotRows, watchChange.Row)
-			wg.Done()
+			if watchChange.EntityType == syncbase.EntityRow {
+				gotRows = append(gotRows, watchChange.Row)
+				wg.Done()
+			}
 		},
 	}
 
diff --git a/services/syncbase/longevity_tests/control/control_test.go b/services/syncbase/longevity_tests/control/control_test.go
index 9a10020..f5d2523 100644
--- a/services/syncbase/longevity_tests/control/control_test.go
+++ b/services/syncbase/longevity_tests/control/control_test.go
@@ -328,9 +328,11 @@
 	control.RegisterClient("test-watcher", func() client.Client {
 		return &client.Watcher{
 			OnChange: func(wc syncbase.WatchChange) {
-				changesReceived++
-				if changesReceived == 5 {
-					mu.Unlock()
+				if wc.EntityType == syncbase.EntityRow {
+					changesReceived++
+					if changesReceived == 5 {
+						mu.Unlock()
+					}
 				}
 			},
 		}
@@ -381,9 +383,11 @@
 	control.RegisterClient("test-watcher", func() client.Client {
 		return &client.Watcher{
 			OnChange: func(wc syncbase.WatchChange) {
-				changesReceived++
-				if changesReceived == 5 {
-					mu.Unlock()
+				if wc.EntityType == syncbase.EntityRow {
+					changesReceived++
+					if changesReceived == 5 {
+						mu.Unlock()
+					}
 				}
 			},
 		}
diff --git a/services/syncbase/server/database_watch.go b/services/syncbase/server/database_watch.go
index fd5870d..991bfd1 100644
--- a/services/syncbase/server/database_watch.go
+++ b/services/syncbase/server/database_watch.go
@@ -70,24 +70,35 @@
 	}
 	initImpl := func(sntx store.SnapshotOrTransaction) error {
 		// TODO(ivanpi): Check permissions here.
-		// Get the resume marker and fetch the initial state if necessary.
-		if len(resumeMarker) == 0 {
+		needInitialState := len(resumeMarker) == 0
+		needResumeMarker := needInitialState || bytes.Equal(resumeMarker, []byte("now"))
+		// Get the resume marker if necessary.
+		if needResumeMarker {
 			var err error
 			if resumeMarker, err = watchable.GetResumeMarker(sntx); err != nil {
 				return err
 			}
-			// Send initial state.
-			if err = d.scanInitialState(ctx, call, sender, sntx, watchFilter); err != nil {
-				return err
-			}
-		} else if bytes.Equal(resumeMarker, []byte("now")) {
-			var err error
-			// TODO(ivanpi): Add initial_state_skipped change.
-			if resumeMarker, err = watchable.GetResumeMarker(sntx); err != nil {
-				return err
-			}
 		}
-		// Finalize initial state batch if necessary.
+		// Send the root update to notify the client that watch has started.
+		rootChangeState := watch.InitialStateSkipped
+		if needInitialState {
+			rootChangeState = watch.Exists
+		}
+		if err := sender.addChange(
+			"",
+			rootChangeState,
+			&wire.StoreChange{
+				FromSync: false,
+			}); err != nil {
+			return err
+		}
+		// Send initial state if necessary.
+		if needInitialState {
+			if err := d.scanInitialState(ctx, call, sender, sntx, watchFilter); err != nil {
+				return err
+			}
+		}
+		// Finalize initial state or root update batch.
 		return sender.finishBatch(resumeMarker)
 	}
 	if err := store.RunWithSnapshot(d.st, initImpl); err != nil {
@@ -99,8 +110,6 @@
 // scanInitialState sends the initial state of all matching and accessible
 // collections and rows in the database. Checks access on collections, but
 // not on database.
-// TODO(ivanpi): Send dummy update for empty prefix to be compatible with
-// v.io/v23/services/watch.
 // TODO(ivanpi): Abstract out multi-scan for scan and possibly query support.
 // TODO(ivanpi): Use watch pattern prefixes to optimize scan ranges.
 func (d *database) scanInitialState(ctx *context.T, call rpc.ServerCall, sender *watchBatchSender, sntx store.SnapshotOrTransaction, watchFilter filter.CollectionRowFilter) error {
@@ -126,7 +135,7 @@
 			id: cxId,
 			d:  d,
 		}
-		// Check permissions.
+		// Check permissions for row access.
 		// TODO(ivanpi): Collection scan already gets perms, optimize?
 		if _, err := c.checkAccess(ctx, call, sntx); err != nil {
 			if verror.ErrorID(err) == verror.ErrNoAccess.ID {
@@ -169,7 +178,7 @@
 		// Send row.
 		var valueAsRawBytes *vom.RawBytes
 		if err := vom.Decode(value, &valueAsRawBytes); err != nil {
-			return err
+			return verror.New(verror.ErrInternal, ctx, err)
 		}
 		if err := sender.addChange(
 			naming.Join(pubutil.EncodeId(c.id), externalKey),
@@ -205,7 +214,8 @@
 			// a time, and would need to be updated as well.
 			logs, nextResumeMarker, err := watchable.ReadBatchFromLog(d.st, resumeMarker)
 			if err != nil {
-				return err
+				// TODO(ivanpi): Log all internal errors, especially ones not returned.
+				return verror.NewErrInternal(ctx) // no detailed error before access check
 			}
 			if logs == nil {
 				// No new log records available at this time.
@@ -241,11 +251,12 @@
 	defer sn.Abort()
 	// TODO(ivanpi): Recheck database perms here and fail, or cache for collection
 	// access checks.
+	valueBytes := []byte{}
 	for _, logEntry := range logs {
 		var opKey string
 		var op interface{}
 		if err := logEntry.Op.ToValue(&op); err != nil {
-			return err
+			return verror.NewErrInternal(ctx) // no detailed error before access check
 		}
 		switch op := op.(type) {
 		case *watchable.PutOp:
@@ -260,7 +271,10 @@
 		if !common.IsRowKey(opKey) {
 			continue
 		}
-		cxId, row := common.ParseRowKeyOrDie(opKey)
+		cxId, row, err := common.ParseRowKey(opKey)
+		if err != nil {
+			return verror.NewErrInternal(ctx) // no detailed error before access check
+		}
 		// Filter out unnecessary rows.
 		if !watchFilter.RowMatches(cxId, row) {
 			continue
@@ -276,6 +290,8 @@
 				// Note, the collection may not exist anymore, in which case permissions
 				// cannot be retrieved. This case is treated the same as ErrNoAccess, by
 				// skipping the row.
+				// TODO(ivanpi): Consider using the implicit ACL instead for nonexistent
+				// collections.
 				// TODO(ivanpi): Inaccessible rows are skipped. Figure out how to signal
 				// this to caller.
 				continue
@@ -284,15 +300,18 @@
 		}
 		switch op := op.(type) {
 		case *watchable.PutOp:
-			rowValue, err := watchable.GetAtVersion(ctx, sn, op.Key, nil, op.Version)
-			if err != nil {
-				return err
+			// Note, valueBytes is reused on each iteration, so the reference must not
+			// be used beyond this case block. The code below is safe since only the
+			// VOM-decoded copy is used after the call to vom.Decode.
+			if valueBytes, err = watchable.GetAtVersion(ctx, sn, op.Key, valueBytes, op.Version); err != nil {
+				return verror.New(verror.ErrInternal, ctx, err)
 			}
 			var rowValueAsRawBytes *vom.RawBytes
-			if err := vom.Decode(rowValue, &rowValueAsRawBytes); err != nil {
-				return err
+			if err := vom.Decode(valueBytes, &rowValueAsRawBytes); err != nil {
+				return verror.New(verror.ErrInternal, ctx, err)
 			}
-			if err := sender.addChange(naming.Join(pubutil.EncodeId(cxId), row),
+			if err := sender.addChange(
+				naming.Join(pubutil.EncodeId(cxId), row),
 				watch.Exists,
 				&wire.StoreChange{
 					Value:    rowValueAsRawBytes,
@@ -301,7 +320,8 @@
 				return err
 			}
 		case *watchable.DeleteOp:
-			if err := sender.addChange(naming.Join(pubutil.EncodeId(cxId), row),
+			if err := sender.addChange(
+				naming.Join(pubutil.EncodeId(cxId), row),
 				watch.DoesNotExist,
 				&wire.StoreChange{
 					FromSync: logEntry.FromSync,
diff --git a/services/syncbase/testutil/util.go b/services/syncbase/testutil/util.go
index 06b52fd..875f97b 100644
--- a/services/syncbase/testutil/util.go
+++ b/services/syncbase/testutil/util.go
@@ -203,9 +203,21 @@
 	ValueBytes *vom.RawBytes
 }
 
+func WatchChangeTestRootPut(resumeMarker watch.ResumeMarker) WatchChangeTest {
+	return WatchChangeTest{
+		WatchChange: syncbase.WatchChange{
+			EntityType:   syncbase.EntityRoot,
+			ChangeType:   syncbase.PutChange,
+			ResumeMarker: resumeMarker,
+			Continued:    (resumeMarker == nil),
+		},
+	}
+}
+
 func WatchChangeTestRowPut(cxId wire.Id, rowKey string, value interface{}, resumeMarker watch.ResumeMarker) WatchChangeTest {
 	return WatchChangeTest{
 		WatchChange: syncbase.WatchChange{
+			EntityType:   syncbase.EntityRow,
 			Collection:   cxId,
 			Row:          rowKey,
 			ChangeType:   syncbase.PutChange,
@@ -219,6 +231,7 @@
 func WatchChangeTestRowDelete(cxId wire.Id, rowKey string, resumeMarker watch.ResumeMarker) WatchChangeTest {
 	return WatchChangeTest{
 		WatchChange: syncbase.WatchChange{
+			EntityType:   syncbase.EntityRow,
 			Collection:   cxId,
 			Row:          rowKey,
 			ChangeType:   syncbase.DeleteChange,
@@ -230,7 +243,8 @@
 
 // WatchChangeEq returns whether *want and *got represent the same value.
 func WatchChangeEq(got *syncbase.WatchChange, want *WatchChangeTest) (eq bool) {
-	if want.Collection == got.Collection &&
+	if want.EntityType == got.EntityType &&
+		want.Collection == got.Collection &&
 		want.Row == got.Row &&
 		want.ChangeType == got.ChangeType &&
 		bytes.Equal(want.ResumeMarker, got.ResumeMarker) &&
@@ -240,12 +254,17 @@
 		if want.ChangeType == syncbase.DeleteChange {
 			eq = true
 		} else {
-			var wantValue interface{}
-			var gotValue interface{}
-			gotErr := got.Value(&gotValue)
-			wantErr := want.ValueBytes.ToValue(&wantValue)
-			eq = ((gotErr == nil) == (wantErr == nil)) &&
-				reflect.DeepEqual(gotValue, wantValue)
+			switch want.EntityType {
+			case syncbase.EntityRow:
+				var wantValue interface{}
+				var gotValue interface{}
+				gotErr := got.Value(&gotValue)
+				wantErr := want.ValueBytes.ToValue(&wantValue)
+				eq = ((gotErr == nil) == (wantErr == nil)) &&
+					reflect.DeepEqual(gotValue, wantValue)
+			default:
+				eq = true
+			}
 		}
 	}
 	return eq