syncbase: Expose Collection ACLs in Watch.

Implement API from v.io/c/23042 .
Collection creations and permissions changes are propagated to the
client with appropriate CollectionInfo, as well as collection destroys.

Collection updates were hooked up end-to-end in Go. Only row updates are
propagated in Java and Cgo for now.

Also factored out collection perms key parsing.

MultiPart: 1/2
Change-Id: Ibe05257f1b96e6eb08f7c3090924bb791bb8a919
diff --git a/syncbase/client_test.go b/syncbase/client_test.go
index ccea852..ec07050 100644
--- a/syncbase/client_test.go
+++ b/syncbase/client_test.go
@@ -675,20 +675,27 @@
 	// one batch.
 	tu.CheckWatch(t, wstreamAllAdmin, []tu.WatchChangeTest{
 		tu.WatchChangeTestRootPut(nil),
+		tu.WatchChangeTestCollectionPut(ch.Id(), wire.AllCollectionTags, adminAcl, nil),
 		tu.WatchChangeTestRowPut(ch.Id(), "b/1", "value", nil),
+		tu.WatchChangeTestCollectionPut(cp.Id(), wire.AllCollectionTags, openAcl, nil),
 		tu.WatchChangeTestRowPut(cp.Id(), "a/1", "value", nil),
 		tu.WatchChangeTestRowPut(cp.Id(), "c/1", "value", resumeMarkerInitial),
 	})
 	// Watch with empty prefix should have seen the initial state as one batch,
-	// omitting the row in the hidden collection.
+	// omitting the row in the hidden collection and seeing no perms on it.
 	tu.CheckWatch(t, wstreamAll, []tu.WatchChangeTest{
 		tu.WatchChangeTestRootPut(nil),
+		tu.WatchChangeTestCollectionPut(ch.Id(), nil, nil, nil),
+		// Skip row "b/1" in hidden collection.
+		tu.WatchChangeTestCollectionPut(cp.Id(), wire.AllCollectionTags, openAcl, nil),
 		tu.WatchChangeTestRowPut(cp.Id(), "a/1", "value", nil),
 		tu.WatchChangeTestRowPut(cp.Id(), "c/1", "value", resumeMarkerInitial),
 	})
-	// Watch with prefix "d" should have seen only the initial root update.
+	// Watch with prefix "d" should have seen only the initial root update and
+	// public collection change.
 	tu.CheckWatch(t, wstreamD, []tu.WatchChangeTest{
-		tu.WatchChangeTestRootPut(resumeMarkerInitial),
+		tu.WatchChangeTestRootPut(nil),
+		tu.WatchChangeTestCollectionPut(cp.Id(), wire.AllCollectionTags, openAcl, resumeMarkerInitial),
 	})
 
 	// === 2) More writes ===
@@ -768,17 +775,20 @@
 	}
 
 	// Check 3) Writes and permissions change
+	// Admin batch no longer has Read access on the hidden collection, so it sees
+	// only the ACL update. The ACL is checked when scanning the log after commit,
+	// so the new ACL applies to the entire batch.
+	tu.CheckWatch(t, wstreamAllAdmin, []tu.WatchChangeTest{
+		tu.WatchChangeTestCollectionPut(ch.Id(), []access.Tag{access.Write, access.Admin}, newPerms, resumeMarkerAfterB3BspB4),
+	})
 	// Watch with empty prefix should have seen the continued changes since it has
 	// Read access on the hidden collection now. The ACL is checked when scanning
 	// the log after commit, so the new ACL applies to the entire batch.
 	tu.CheckWatch(t, wstreamAll, []tu.WatchChangeTest{
 		tu.WatchChangeTestRowPut(ch.Id(), "b/3", "value", nil),
+		tu.WatchChangeTestCollectionPut(ch.Id(), []access.Tag{access.Read, access.Admin}, newPerms, nil),
 		tu.WatchChangeTestRowPut(ch.Id(), "b/4", "value", resumeMarkerAfterB3BspB4),
 	})
-	// Admin batch no longer has Read access on the hidden collection, so it sees
-	// nothing (this is checked later, when there are more rows to see). The ACL
-	// is checked when scanning the log after commit, so the new ACL applies to
-	// the entire batch.
 
 	// === 4) Writes and collection destroy ===
 	// -> Put ch:"b/5", destroy ch, put cp:"a/3" in a batch.
@@ -809,15 +819,16 @@
 	}
 
 	// Check 4) Writes and collection destroy
-	// Both watches do not see any of the hidden collection changes since the
+	// Both watches do not see any of the hidden collection rows since the
 	// collection has been destroyed and there is no ACL to check against. They
-	// see only the public collection changes.
+	// see only the public collection rows and the hidden collection destroy.
 	finalChangesBoth := []tu.WatchChangeTest{
+		tu.WatchChangeTestCollectionDelete(ch.Id(), nil),
 		tu.WatchChangeTestRowPut(cp.Id(), "a/3", "value", resumeMarkerAfterB5BdA3),
 		tu.WatchChangeTestRowDelete(cp.Id(), "a/3", resumeMarkerAfterA3d),
 	}
-	tu.CheckWatch(t, wstreamAll, finalChangesBoth)
 	tu.CheckWatch(t, wstreamAllAdmin, finalChangesBoth)
+	tu.CheckWatch(t, wstreamAll, finalChangesBoth)
 
 	wstreamAllAdmin.Cancel()
 	wstreamD.Cancel()
@@ -956,22 +967,28 @@
 
 	tu.CheckWatch(t, wstream1, []tu.WatchChangeTest{
 		tu.WatchChangeTestRootPut(nil),
+		tu.WatchChangeTestCollectionPut(cAFoobar.Id(), wire.AllCollectionTags, cxPerms, nil),
 		tu.WatchChangeTestRowPut(cAFoobar.Id(), "a", "value", nil),
 		tu.WatchChangeTestRowPut(cAFoobar.Id(), "abc", "value", nil),
+		tu.WatchChangeTestCollectionPut(cAFoo.Id(), wire.AllCollectionTags, cxPerms, nil),
 		tu.WatchChangeTestRowPut(cAFoo.Id(), "abcd", "value", nil),
 		tu.WatchChangeTestRowPut(cAFoo.Id(), "cd", "value", resumeMarkerInitial),
 	})
 	tu.CheckWatch(t, wstream2, []tu.WatchChangeTest{
 		tu.WatchChangeTestRootPut(nil),
+		tu.WatchChangeTestCollectionPut(cBFoo.Id(), wire.AllCollectionTags, cxPerms, nil),
 		tu.WatchChangeTestRowPut(cBFoo.Id(), "ef", "value", nil),
 		tu.WatchChangeTestRowPut(cBFoo.Id(), "efg", "value", resumeMarkerInitial),
 	})
 	tu.CheckWatch(t, wstream3, []tu.WatchChangeTest{
 		tu.WatchChangeTestRootPut(nil),
+		tu.WatchChangeTestCollectionPut(cBFoo.Id(), wire.AllCollectionTags, cxPerms, nil),
 		tu.WatchChangeTestRowPut(cBFoo.Id(), "ab", "value", nil),
 		tu.WatchChangeTestRowPut(cBFoo.Id(), "x\\yz", "value", nil),
+		tu.WatchChangeTestCollectionPut(cAFoobar.Id(), wire.AllCollectionTags, cxPerms, nil),
 		tu.WatchChangeTestRowPut(cAFoobar.Id(), "abc", "value", nil),
 		tu.WatchChangeTestRowPut(cAFoobar.Id(), "cd", "value", nil),
+		tu.WatchChangeTestCollectionPut(cAFoo.Id(), wire.AllCollectionTags, cxPerms, nil),
 		tu.WatchChangeTestRowPut(cAFoo.Id(), "abc", "value", nil),
 		tu.WatchChangeTestRowPut(cAFoo.Id(), "abcd", "value", nil),
 		tu.WatchChangeTestRowPut(cAFoo.Id(), "cd", "value", resumeMarkerInitial),
@@ -1024,6 +1041,7 @@
 	tu.CheckWatch(t, wstream1, []tu.WatchChangeTest{
 		tu.WatchChangeTestRowPut(cAFoo.Id(), "abcd", "value", nil),
 		tu.WatchChangeTestRowPut(cAFoobar.Id(), "abcd", "value", resumeMarkerAfterBatch1),
+		tu.WatchChangeTestCollectionPut(cNewId, wire.AllCollectionTags, cxPerms, nil),
 		tu.WatchChangeTestRowPut(cNewId, "acd", "value", nil),
 		tu.WatchChangeTestRowPut(cNewId, "abcd", "value", nil),
 		tu.WatchChangeTestRowPut(cAFoo.Id(), "bcd", "value", resumeMarkerAfterBatch2),
@@ -1034,6 +1052,7 @@
 	tu.CheckWatch(t, wstream3, []tu.WatchChangeTest{
 		tu.WatchChangeTestRowPut(cAFoo.Id(), "abcd", "value", nil),
 		tu.WatchChangeTestRowPut(cAFoobar.Id(), "abcd", "value", resumeMarkerAfterBatch1),
+		tu.WatchChangeTestCollectionPut(cNewId, wire.AllCollectionTags, cxPerms, nil),
 		tu.WatchChangeTestRowPut(cNewId, "abcd", "value", nil),
 		tu.WatchChangeTestRowPut(cAFoo.Id(), "bcd", "value", resumeMarkerAfterBatch2),
 	})
diff --git a/syncbase/featuretests/client_v23_test.go b/syncbase/featuretests/client_v23_test.go
index cf8d635..11eb963 100644
--- a/syncbase/featuretests/client_v23_test.go
+++ b/syncbase/featuretests/client_v23_test.go
@@ -65,6 +65,9 @@
 		t.Fatalf("watch stream unexpectedly reached the end: %v", stream.Err())
 	}
 	change := stream.Change()
+	if got, want := change.EntityType, syncbase.EntityRow; got != want {
+		t.Fatalf("unexpected watch entity type: got %q, want %q", got, want)
+	}
 	if got, want := change.Collection, testCx; got != want {
 		t.Fatalf("unexpected watch collection: got %q, want %q", got, want)
 	}
diff --git a/syncbase/featuretests/sync_v23_test.go b/syncbase/featuretests/sync_v23_test.go
index 9008697..c5b59dc 100644
--- a/syncbase/featuretests/sync_v23_test.go
+++ b/syncbase/featuretests/sync_v23_test.go
@@ -561,8 +561,10 @@
 	})
 
 	var changes []syncbase.WatchChange
-	for i := 0; i < count && stream.Advance(); i++ {
-		changes = append(changes, stream.Change())
+	for len(changes) < count && stream.Advance() {
+		if stream.Change().EntityType == syncbase.EntityRow {
+			changes = append(changes, stream.Change())
+		}
 	}
 	if err := stream.Err(); err != nil {
 		return fmt.Errorf("watch stream error: %v\n", err)
diff --git a/syncbase/model.go b/syncbase/model.go
index b42e02b..943dfe0 100644
--- a/syncbase/model.go
+++ b/syncbase/model.go
@@ -478,6 +478,8 @@
 	}
 	ci := &wire.StoreChangeCollectionInfo{}
 	if err := c.value.ToValue(ci); err != nil {
+		// This should never panic since we verify the collection info is decodable
+		// when constructing the WatchChange.
 		panic(fmt.Errorf("ToValue StoreChangeCollectionInfo failed: %v, RawBytes: %#v", err, c.value))
 	}
 	return ci
diff --git a/syncbase/watch_stream.go b/syncbase/watch_stream.go
index 82b7052..7939fbc 100644
--- a/syncbase/watch_stream.go
+++ b/syncbase/watch_stream.go
@@ -6,6 +6,7 @@
 
 import (
 	"fmt"
+	"strings"
 	"sync"
 
 	"v.io/v23/context"
@@ -125,7 +126,7 @@
 	if c.Name == "" {
 		res.EntityType = EntityRoot
 		// No other fields need to be set for EntityRoot.
-	} else {
+	} else if strings.ContainsRune(c.Name, '/') {
 		res.EntityType = EntityRow
 		// Parse the collection id and row key.
 		if res.Collection, res.Row, err = util.ParseCollectionRowPair(nil, c.Name); err != nil {
@@ -134,6 +135,18 @@
 		if res.Row == "" {
 			panic("empty row name")
 		}
+	} else {
+		res.EntityType = EntityCollection
+		// Parse the collection id.
+		if res.Collection, err = util.DecodeId(c.Name); err != nil {
+			panic(err)
+		}
+		if res.ChangeType == PutChange {
+			// Verify that the collection info is decodable.
+			if err := storeChange.Value.ToValue(&wire.StoreChangeCollectionInfo{}); err != nil {
+				panic(fmt.Errorf("ToValue StoreChangeCollectionInfo failed: %v, RawBytes: %#v", err, storeChange.Value))
+			}
+		}
 	}
 	return res
 }