syncbase: Send initial root update in Watch.

An empty update for the root entity is sent as the first update on the
watch stream.
* For watches from an empty resume marker, this update is propagated to
the client as part of the initial state batch. It is necessary because
the initial state batch may not contain other updates (if the watch
filter matches no collections) and the client needs to be notified of
this. It is intended to be filtered out by the high-level API when
the initial state batch is being separated out.
* For watches from 'now' or a specific resume marker, the initial update
is sent with initial_state_skipped for compliance with
v.io/v23/services/watch. It is filtered out by the client library
instead of propagating to the user.

Initial update was hooked up end-to-end in Go. Only row updates are
propagated in Java and Cgo for now.

Errors in watch code have also been scrubbed to prevent leaking info
to the client before the authorization step.

MultiPart: 1/4
Change-Id: I720dee79c8d4a69ffb4e99576c0fd67af7c030ca
diff --git a/syncbase/.api b/syncbase/.api
index e436b24..29d1b43 100644
--- a/syncbase/.api
+++ b/syncbase/.api
@@ -9,7 +9,7 @@
 pkg syncbase, func Range(string, string) RowRange
 pkg syncbase, func RunInBatch(*context.T, Database, wire.BatchOptions, func(BatchDatabase) error) error
 pkg syncbase, func SingleRow(string) RowRange
-pkg syncbase, func ToWatchChange(watch.Change) WatchChange
+pkg syncbase, func ToWatchChange(watch.Change) *WatchChange
 pkg syncbase, func UUID() string
 pkg syncbase, method (*Conflict) VDLRead(vdl.Decoder) error
 pkg syncbase, method (*ConflictRow) VDLRead(vdl.Decoder) error
diff --git a/syncbase/client_test.go b/syncbase/client_test.go
index 9b8dbd4..ccea852 100644
--- a/syncbase/client_test.go
+++ b/syncbase/client_test.go
@@ -674,6 +674,7 @@
 	// Admin watch with empty prefix should have seen the full initial state as
 	// one batch.
 	tu.CheckWatch(t, wstreamAllAdmin, []tu.WatchChangeTest{
+		tu.WatchChangeTestRootPut(nil),
 		tu.WatchChangeTestRowPut(ch.Id(), "b/1", "value", nil),
 		tu.WatchChangeTestRowPut(cp.Id(), "a/1", "value", nil),
 		tu.WatchChangeTestRowPut(cp.Id(), "c/1", "value", resumeMarkerInitial),
@@ -681,9 +682,14 @@
 	// Watch with empty prefix should have seen the initial state as one batch,
 	// omitting the row in the hidden collection.
 	tu.CheckWatch(t, wstreamAll, []tu.WatchChangeTest{
+		tu.WatchChangeTestRootPut(nil),
 		tu.WatchChangeTestRowPut(cp.Id(), "a/1", "value", nil),
 		tu.WatchChangeTestRowPut(cp.Id(), "c/1", "value", resumeMarkerInitial),
 	})
+	// Watch with prefix "d" should have seen only the initial root update.
+	tu.CheckWatch(t, wstreamD, []tu.WatchChangeTest{
+		tu.WatchChangeTestRootPut(resumeMarkerInitial),
+	})
 
 	// === 2) More writes ===
 	// -> Put cp:"a/2" and ch:"b/2" in a batch.
@@ -949,16 +955,19 @@
 	}
 
 	tu.CheckWatch(t, wstream1, []tu.WatchChangeTest{
+		tu.WatchChangeTestRootPut(nil),
 		tu.WatchChangeTestRowPut(cAFoobar.Id(), "a", "value", nil),
 		tu.WatchChangeTestRowPut(cAFoobar.Id(), "abc", "value", nil),
 		tu.WatchChangeTestRowPut(cAFoo.Id(), "abcd", "value", nil),
 		tu.WatchChangeTestRowPut(cAFoo.Id(), "cd", "value", resumeMarkerInitial),
 	})
 	tu.CheckWatch(t, wstream2, []tu.WatchChangeTest{
+		tu.WatchChangeTestRootPut(nil),
 		tu.WatchChangeTestRowPut(cBFoo.Id(), "ef", "value", nil),
 		tu.WatchChangeTestRowPut(cBFoo.Id(), "efg", "value", resumeMarkerInitial),
 	})
 	tu.CheckWatch(t, wstream3, []tu.WatchChangeTest{
+		tu.WatchChangeTestRootPut(nil),
 		tu.WatchChangeTestRowPut(cBFoo.Id(), "ab", "value", nil),
 		tu.WatchChangeTestRowPut(cBFoo.Id(), "x\\yz", "value", nil),
 		tu.WatchChangeTestRowPut(cAFoobar.Id(), "abc", "value", nil),
diff --git a/syncbase/watch_stream.go b/syncbase/watch_stream.go
index 5059b9b..82b7052 100644
--- a/syncbase/watch_stream.go
+++ b/syncbase/watch_stream.go
@@ -47,16 +47,21 @@
 	if s.finished {
 		return false
 	}
-	// Advance never blocks if the context has been cancelled.
-	if !s.call.RecvStream().Advance() {
-		s.err = s.call.Finish()
-		s.cancel()
-		s.finished = true
-		return false
+	for {
+		// Advance never blocks if the context has been cancelled.
+		if !s.call.RecvStream().Advance() {
+			s.err = s.call.Finish()
+			s.cancel()
+			s.finished = true
+			return false
+		}
+		// Skip initial-state-skipped changes.
+		if s.call.RecvStream().Value().State == watch.InitialStateSkipped {
+			continue
+		}
+		s.curr = ToWatchChange(s.call.RecvStream().Value())
+		return true
 	}
-	watchChange := ToWatchChange(s.call.RecvStream().Value())
-	s.curr = &watchChange
-	return true
 }
 
 // Change implements WatchStream interface.
@@ -93,38 +98,42 @@
 // ToWatchChange converts a generic Change struct as defined in
 // v.io/v23/services/watch to a Syncbase-specific WatchChange struct as defined
 // in v.io/v23/syncbase.
-func ToWatchChange(c watch.Change) WatchChange {
-	// Parse the collection and the row.
-	collection, row, err := util.ParseCollectionRowPair(nil, c.Name)
-	if err != nil {
-		panic(err)
-	}
-	if row == "" {
-		panic("empty row name")
-	}
+func ToWatchChange(c watch.Change) *WatchChange {
+	// TODO(ivanpi): Store the errors in err instead of panicking.
 	// Parse the store change.
 	var storeChange wire.StoreChange
 	if err := c.Value.ToValue(&storeChange); err != nil {
-		panic(fmt.Errorf("ToValue failed: %v, RawBytes: %#v", err, c.Value))
+		panic(fmt.Errorf("ToValue StoreChange failed: %v, RawBytes: %#v", err, c.Value))
 	}
-	// Parse the state.
-	var changeType ChangeType
+	res := &WatchChange{
+		value:        storeChange.Value,
+		FromSync:     storeChange.FromSync,
+		Continued:    c.Continued,
+		ResumeMarker: c.ResumeMarker,
+	}
+	// Convert the state.
 	switch c.State {
 	case watch.Exists:
-		changeType = PutChange
+		res.ChangeType = PutChange
 	case watch.DoesNotExist:
-		changeType = DeleteChange
+		res.ChangeType = DeleteChange
 	default:
 		panic(fmt.Sprintf("unsupported watch change state: %v", c.State))
 	}
-	return WatchChange{
-		EntityType:   EntityRow,
-		Collection:   collection,
-		Row:          row,
-		ChangeType:   changeType,
-		value:        storeChange.Value,
-		ResumeMarker: c.ResumeMarker,
-		FromSync:     storeChange.FromSync,
-		Continued:    c.Continued,
+	var err error
+	// Parse the name and determine the entity type.
+	if c.Name == "" {
+		res.EntityType = EntityRoot
+		// No other fields need to be set for EntityRoot.
+	} else {
+		res.EntityType = EntityRow
+		// Parse the collection id and row key.
+		if res.Collection, res.Row, err = util.ParseCollectionRowPair(nil, c.Name); err != nil {
+			panic(err)
+		}
+		if res.Row == "" {
+			panic("empty row name")
+		}
 	}
+	return res
 }