syncbase: Expose Collection ACLs in Watch.

Implement API from v.io/c/23042 .
Collection creations and permissions changes are propagated to the
client with appropriate CollectionInfo, as well as collection destroys.

Collection updates were hooked up end-to-end in Go. Only row updates are
propagated in Java and Cgo for now.

Also factored out collection perms key parsing.

MultiPart: 2/2
Change-Id: I83499fa5b9e24f6a2665db5a18ffe676830620f1
diff --git a/services/syncbase/common/key_util.go b/services/syncbase/common/key_util.go
index 45038f3..976052e 100644
--- a/services/syncbase/common/key_util.go
+++ b/services/syncbase/common/key_util.go
@@ -73,11 +73,29 @@
 func ParseRowKeyOrDie(key string) (collection wire.Id, row string) {
 	collection, row, err := ParseRowKey(key)
 	if err != nil {
+		// TODO(ivanpi): Get rid of *OrDie() functions. Always log internal errors.
 		vlog.Fatal(err)
 	}
 	return collection, row
 }
 
+// ParseCollectionPermsKey extracts the collection id from the given storage
+// engine key for a collection perms entry. Returns an error if the given key
+// is not a storage engine key for a collection perms entry.
+func ParseCollectionPermsKey(key string) (collection wire.Id, err error) {
+	// TODO(rdaoud,ivanpi): See hack in collection.go.
+	parts := SplitNKeyParts(key, 3)
+	pfx := parts[0]
+	if len(parts) < 3 || pfx != CollectionPermsPrefix || parts[2] != "" {
+		return wire.Id{}, fmt.Errorf("ParseCollectionPermsKey: invalid key %q", key)
+	}
+	cxId, err := util.DecodeId(parts[1])
+	if err != nil {
+		return wire.Id{}, fmt.Errorf("ParseCollectionPermsKey: invalid collection %q: %v", parts[1], err)
+	}
+	return cxId, nil
+}
+
 // ScanPrefixArgs returns args for sn.Scan() for the specified prefix.
 func ScanPrefixArgs(stKeyPrefix, prefix string) ([]byte, []byte) {
 	return ScanRangeArgs(stKeyPrefix, util.PrefixRangeStart(prefix), util.PrefixRangeLimit(prefix))
diff --git a/services/syncbase/common/key_util_test.go b/services/syncbase/common/key_util_test.go
index 3b2e45c..4c61d7e 100644
--- a/services/syncbase/common/key_util_test.go
+++ b/services/syncbase/common/key_util_test.go
@@ -162,6 +162,36 @@
 	}
 }
 
+func TestParseCollectionPermsKey(t *testing.T) {
+	tests := []struct {
+		key        string
+		collection wire.Id
+		err        bool
+	}{
+		{common.CollectionPermsPrefix + "\xfeu,c\xfe", wire.Id{"u", "c"}, false},
+		{common.CollectionPermsPrefix + "\xfeu,\xfe", wire.Id{"u", ""}, false},
+		{common.CollectionPermsPrefix + "\xfe,c\xfe", wire.Id{"", "c"}, false},
+		{common.CollectionPermsPrefix + "\xfe,\xfe", wire.Id{"", ""}, false},
+		{"\xfeu,c\xfe", wire.Id{}, true},
+		{"pfx\xfeu,c\xfe", wire.Id{}, true},
+		{common.RowPrefix + "\xfeu,c\xfe", wire.Id{}, true},
+		{common.CollectionPermsPrefix + "\xfe", wire.Id{}, true},
+		{common.CollectionPermsPrefix + "\xfe\xfe", wire.Id{}, true},
+		{common.CollectionPermsPrefix + "\xfeu,c\xferow", wire.Id{}, true},
+		{common.CollectionPermsPrefix + "\xfeu,c", wire.Id{}, true},
+		{common.CollectionPermsPrefix + "\xfe", wire.Id{}, true},
+	}
+	for _, test := range tests {
+		collection, err := common.ParseCollectionPermsKey(test.key)
+		if !reflect.DeepEqual(collection, test.collection) {
+			t.Errorf("%q: got %v, want %v", test.key, collection, test.collection)
+		}
+		if !reflect.DeepEqual(err != nil, test.err) {
+			t.Errorf("%q: got %v, want %v", test.key, err != nil, test.err)
+		}
+	}
+}
+
 func TestScanPrefixArgs(t *testing.T) {
 	tests := []struct {
 		stKeyPrefix, prefix, wantStart, wantLimit string
diff --git a/services/syncbase/server/database.go b/services/syncbase/server/database.go
index 8211194..eb7de69 100644
--- a/services/syncbase/server/database.go
+++ b/services/syncbase/server/database.go
@@ -460,8 +460,7 @@
 		keyBytes := []byte{}
 		for it.Advance() {
 			keyBytes = it.Key(keyBytes)
-			parts := common.SplitNKeyParts(string(keyBytes), 3)
-			id, err := pubutil.DecodeId(parts[1])
+			id, err := common.ParseCollectionPermsKey(string(keyBytes))
 			if err != nil {
 				it.Cancel()
 				return verror.New(verror.ErrInternal, ctx, err)
diff --git a/services/syncbase/server/database_watch.go b/services/syncbase/server/database_watch.go
index 991bfd1..0d8a6cd 100644
--- a/services/syncbase/server/database_watch.go
+++ b/services/syncbase/server/database_watch.go
@@ -10,6 +10,7 @@
 	"v.io/v23/context"
 	"v.io/v23/naming"
 	"v.io/v23/rpc"
+	"v.io/v23/security/access"
 	wire "v.io/v23/services/syncbase"
 	"v.io/v23/services/watch"
 	pubutil "v.io/v23/syncbase/util"
@@ -17,6 +18,7 @@
 	"v.io/v23/vom"
 	"v.io/x/ref/services/syncbase/common"
 	"v.io/x/ref/services/syncbase/server/filter"
+	"v.io/x/ref/services/syncbase/server/interfaces"
 	"v.io/x/ref/services/syncbase/store"
 	"v.io/x/ref/services/syncbase/store/watchable"
 )
@@ -110,6 +112,8 @@
 // scanInitialState sends the initial state of all matching and accessible
 // collections and rows in the database. Checks access on collections, but
 // not on database.
+// TODO(ivanpi): Assumes Read perms on database. Careful if supporting RPCs
+// requiring only Resolve (e.g. WatchGlob).
 // TODO(ivanpi): Abstract out multi-scan for scan and possibly query support.
 // TODO(ivanpi): Use watch pattern prefixes to optimize scan ranges.
 func (d *database) scanInitialState(ctx *context.T, call rpc.ServerCall, sender *watchBatchSender, sntx store.SnapshotOrTransaction, watchFilter filter.CollectionRowFilter) error {
@@ -117,13 +121,12 @@
 	// TODO(ivanpi): Collection scan order not alphabetical.
 	cxIt := sntx.Scan(common.ScanPrefixArgs(common.CollectionPermsPrefix, ""))
 	defer cxIt.Cancel()
-	cxKey := []byte{}
+	cxKey, cxPermsValue := []byte{}, []byte{}
 	for cxIt.Advance() {
-		cxKey = cxIt.Key(cxKey)
-		// See comment in util/constants.go for why we use SplitNKeyParts.
-		// TODO(rdaoud,ivanpi): See hack in collection.go.
-		cxParts := common.SplitNKeyParts(string(cxKey), 3)
-		cxId, err := pubutil.DecodeId(cxParts[1])
+		cxKey, cxPermsValue = cxIt.Key(cxKey), cxIt.Value(cxPermsValue)
+		// Database permissions for Watch ensure that the user is always allowed
+		// to know that a collection exists.
+		cxId, err := common.ParseCollectionPermsKey(string(cxKey))
 		if err != nil {
 			return verror.New(verror.ErrInternal, ctx, err)
 		}
@@ -131,21 +134,43 @@
 		if !watchFilter.CollectionMatches(cxId) {
 			continue
 		}
+		// Send collection info.
+		var cxPerms interfaces.CollectionPerms
+		if err := vom.Decode(cxPermsValue, &cxPerms); err != nil {
+			return verror.NewErrInternal(ctx) // no detailed error for cxPerms before filtering cxInfo
+		}
+		cxInfo := collectionInfoFromPerms(ctx, call, cxPerms.GetPerms())
+		cxInfoAsRawBytes, err := vom.RawBytesFromValue(cxInfo)
+		if err != nil {
+			return verror.New(verror.ErrInternal, ctx, err)
+		}
+		if err := sender.addChange(
+			pubutil.EncodeId(cxId),
+			watch.Exists,
+			&wire.StoreChange{
+				Value: cxInfoAsRawBytes,
+				// Note: FromSync cannot be reconstructed from scan.
+				FromSync: false,
+			}); err != nil {
+			return err
+		}
+		// Check permissions for row access.
+		// TODO(ivanpi): Collection scan already gets perms, optimize?
 		c := &collectionReq{
 			id: cxId,
 			d:  d,
 		}
-		// Check permissions for row access.
-		// TODO(ivanpi): Collection scan already gets perms, optimize?
 		if _, err := c.checkAccess(ctx, call, sntx); err != nil {
 			if verror.ErrorID(err) == verror.ErrNoAccess.ID {
-				// TODO(ivanpi): Inaccessible rows are skipped. Figure out how to signal
-				// this to caller.
+				// Skip sending rows if the collection is inaccessible. Caller can see
+				// from collection info that they have no read access and may therefore
+				// have missing rows.
+				// TODO(ivanpi): If read access is regained, should skipped rows be sent
+				// retroactively?
 				continue
 			}
 			return err
 		}
-		// TODO(ivanpi): Send collection info.
 		// Send matching rows.
 		if err := c.scanInitialState(ctx, call, sender, sntx, watchFilter); err != nil {
 			return err
@@ -246,6 +271,8 @@
 // Note: Since the governing ACL for each change is no longer tracked, the
 // permissions check uses the ACLs in effect at the time processLogBatch is
 // called.
+// TODO(ivanpi): Assumes Read perms on database. Careful if supporting RPCs
+// requiring only Resolve (e.g. WatchGlob).
 func (d *database) processLogBatch(ctx *context.T, call rpc.ServerCall, sender *watchBatchSender, watchFilter filter.CollectionRowFilter, logs []*watchable.LogEntry) error {
 	sn := d.st.NewSnapshot()
 	defer sn.Abort()
@@ -266,73 +293,145 @@
 		default:
 			continue
 		}
-		// TODO(rogulenko): Currently we only process rows, i.e. keys of the form
-		// <RowPrefix>:xxx:yyy. Consider processing other keys.
-		if !common.IsRowKey(opKey) {
-			continue
-		}
-		cxId, row, err := common.ParseRowKey(opKey)
-		if err != nil {
-			return verror.NewErrInternal(ctx) // no detailed error before access check
-		}
-		// Filter out unnecessary rows.
-		if !watchFilter.RowMatches(cxId, row) {
-			continue
-		}
-		c := &collectionReq{
-			id: cxId,
-			d:  d,
-		}
-		// Filter out rows that we can't access.
-		// TODO(ivanpi): Check only once per collection per batch.
-		if _, err := c.checkAccess(ctx, call, sn); err != nil {
-			if verror.ErrorID(err) == verror.ErrNoAccess.ID || verror.ErrorID(err) == verror.ErrNoExist.ID {
-				// Note, the collection may not exist anymore, in which case permissions
-				// cannot be retrieved. This case is treated the same as ErrNoAccess, by
-				// skipping the row.
-				// TODO(ivanpi): Consider using the implicit ACL instead for nonexistent
-				// collections.
-				// TODO(ivanpi): Inaccessible rows are skipped. Figure out how to signal
-				// this to caller.
+		// TODO(rogulenko,ivanpi): Currently we only process rows and collection
+		// perms. Consider making watchable and processing other keys.
+		switch common.FirstKeyPart(opKey) {
+		case common.RowPrefix:
+			cxId, row, err := common.ParseRowKey(opKey)
+			if err != nil {
+				return verror.NewErrInternal(ctx) // no detailed error before access check
+			}
+			// Filter out unnecessary rows.
+			if !watchFilter.RowMatches(cxId, row) {
 				continue
 			}
-			return err
-		}
-		switch op := op.(type) {
-		case *watchable.PutOp:
-			// Note, valueBytes is reused on each iteration, so the reference must not
-			// be used beyond this case block. The code below is safe since only the
-			// VOM-decoded copy is used after the call to vom.Decode.
-			if valueBytes, err = watchable.GetAtVersion(ctx, sn, op.Key, valueBytes, op.Version); err != nil {
-				return verror.New(verror.ErrInternal, ctx, err)
+			// Filter out rows that we can't access.
+			// TODO(ivanpi): Check only once per collection per batch.
+			c := &collectionReq{
+				id: cxId,
+				d:  d,
 			}
-			var rowValueAsRawBytes *vom.RawBytes
-			if err := vom.Decode(valueBytes, &rowValueAsRawBytes); err != nil {
-				return verror.New(verror.ErrInternal, ctx, err)
-			}
-			if err := sender.addChange(
-				naming.Join(pubutil.EncodeId(cxId), row),
-				watch.Exists,
-				&wire.StoreChange{
-					Value:    rowValueAsRawBytes,
-					FromSync: logEntry.FromSync,
-				}); err != nil {
+			if _, err := c.checkAccess(ctx, call, sn); err != nil {
+				if verror.ErrorID(err) == verror.ErrNoAccess.ID || verror.ErrorID(err) == verror.ErrNoExist.ID {
+					// Skip sending rows if the collection is inaccessible. Caller can see
+					// from collection info that they have no read access and may therefore
+					// have missing rows.
+					// Note, the collection may not exist anymore, in which case permissions
+					// cannot be retrieved. This case is treated the same as ErrNoAccess, by
+					// skipping the row.
+					// TODO(ivanpi): Consider using the implicit ACL instead for nonexistent
+					// collections.
+					// TODO(ivanpi): If read access is regained, should skipped rows be sent
+					// retroactively?
+					continue
+				}
 				return err
 			}
-		case *watchable.DeleteOp:
-			if err := sender.addChange(
-				naming.Join(pubutil.EncodeId(cxId), row),
-				watch.DoesNotExist,
-				&wire.StoreChange{
-					FromSync: logEntry.FromSync,
-				}); err != nil {
-				return err
+			switch op := op.(type) {
+			case *watchable.PutOp:
+				// Note, valueBytes is reused on each iteration, so the reference must not
+				// be used beyond this case block. The code below is safe since only the
+				// VOM-decoded copy is used after the call to vom.Decode.
+				if valueBytes, err = watchable.GetAtVersion(ctx, sn, op.Key, valueBytes, op.Version); err != nil {
+					return verror.New(verror.ErrInternal, ctx, err)
+				}
+				var rowValueAsRawBytes *vom.RawBytes
+				if err := vom.Decode(valueBytes, &rowValueAsRawBytes); err != nil {
+					return verror.New(verror.ErrInternal, ctx, err)
+				}
+				if err := sender.addChange(
+					naming.Join(pubutil.EncodeId(cxId), row),
+					watch.Exists,
+					&wire.StoreChange{
+						Value:    rowValueAsRawBytes,
+						FromSync: logEntry.FromSync,
+					}); err != nil {
+					return err
+				}
+			case *watchable.DeleteOp:
+				if err := sender.addChange(
+					naming.Join(pubutil.EncodeId(cxId), row),
+					watch.DoesNotExist,
+					&wire.StoreChange{
+						FromSync: logEntry.FromSync,
+					}); err != nil {
+					return err
+				}
 			}
+
+		case common.CollectionPermsPrefix:
+			// Database permissions for Watch ensure that the user is always allowed
+			// to know that a collection exists.
+			cxId, err := common.ParseCollectionPermsKey(opKey)
+			if err != nil {
+				return verror.New(verror.ErrInternal, ctx, err)
+			}
+			// Filter out unnecessary collections.
+			if !watchFilter.CollectionMatches(cxId) {
+				continue
+			}
+			switch op := op.(type) {
+			case *watchable.PutOp:
+				if valueBytes, err = watchable.GetAtVersion(ctx, sn, op.Key, valueBytes, op.Version); err != nil {
+					return verror.NewErrInternal(ctx) // no detailed error for cxPerms before filtering cxInfo
+				}
+				var cxPerms interfaces.CollectionPerms
+				if err := vom.Decode(valueBytes, &cxPerms); err != nil {
+					return verror.NewErrInternal(ctx) // no detailed error for cxPerms before filtering cxInfo
+				}
+				cxInfo := collectionInfoFromPerms(ctx, call, cxPerms.GetPerms())
+				cxInfoAsRawBytes, err := vom.RawBytesFromValue(cxInfo)
+				if err != nil {
+					return verror.New(verror.ErrInternal, ctx, err)
+				}
+				if err := sender.addChange(
+					pubutil.EncodeId(cxId),
+					watch.Exists,
+					&wire.StoreChange{
+						Value:    cxInfoAsRawBytes,
+						FromSync: logEntry.FromSync,
+					}); err != nil {
+					return err
+				}
+			case *watchable.DeleteOp:
+				if err := sender.addChange(
+					pubutil.EncodeId(cxId),
+					watch.DoesNotExist,
+					&wire.StoreChange{
+						FromSync: logEntry.FromSync,
+					}); err != nil {
+					return err
+				}
+			}
+
+		default:
+			continue
 		}
 	}
 	return nil
 }
 
+// collectionInfoFromPerms converts a collection permissions object into a
+// StoreChangeCollectionInfo tailored to the caller. The returned collection
+// info is safe to send to the caller, assuming the caller is allowed to know
+// the collection exists. It includes a set listing all access tags that the
+// caller has on the collection. The collection permissions object itself is
+// included only if the caller is allowed to see it (has Admin permissions).
+func collectionInfoFromPerms(ctx *context.T, call rpc.ServerCall, cxPerms access.Permissions) *wire.StoreChangeCollectionInfo {
+	ci := &wire.StoreChangeCollectionInfo{
+		Allowed: make(map[access.Tag]struct{}),
+	}
+	for tag, acl := range cxPerms {
+		if acl.Authorize(ctx, call.Security()) == nil {
+			ci.Allowed[access.Tag(tag)] = struct{}{}
+		}
+	}
+	if _, isAdmin := ci.Allowed[access.Admin]; isAdmin {
+		ci.Perms = cxPerms
+	}
+	return ci
+}
+
 // watchBatchSender sends a sequence of watch changes forming a batch, delaying
 // sends to allow setting the Continued flag on the last change.
 type watchBatchSender struct {
diff --git a/services/syncbase/testutil/util.go b/services/syncbase/testutil/util.go
index 875f97b..22f9a6f 100644
--- a/services/syncbase/testutil/util.go
+++ b/services/syncbase/testutil/util.go
@@ -197,10 +197,11 @@
 }
 
 // A WatchChangeTest is a syncbase.WatchChange that has a public ValueBytes
-// field, to allow tests to set it.
+// and CollectionInfo field, to allow tests to set them.
 type WatchChangeTest struct {
 	syncbase.WatchChange
-	ValueBytes *vom.RawBytes
+	ValueBytes     *vom.RawBytes
+	CollectionInfo *wire.StoreChangeCollectionInfo
 }
 
 func WatchChangeTestRootPut(resumeMarker watch.ResumeMarker) WatchChangeTest {
@@ -214,6 +215,42 @@
 	}
 }
 
+func WatchChangeTestCollectionPut(cxId wire.Id, allowedTags []access.Tag, perms access.Permissions, resumeMarker watch.ResumeMarker) WatchChangeTest {
+	allowedTagsSet := make(map[access.Tag]struct{})
+	if len(allowedTags) == 0 {
+		allowedTagsSet = nil
+	} else {
+		for _, tag := range allowedTags {
+			allowedTagsSet[tag] = struct{}{}
+		}
+	}
+	return WatchChangeTest{
+		WatchChange: syncbase.WatchChange{
+			EntityType:   syncbase.EntityCollection,
+			Collection:   cxId,
+			ChangeType:   syncbase.PutChange,
+			ResumeMarker: resumeMarker,
+			Continued:    (resumeMarker == nil),
+		},
+		CollectionInfo: &wire.StoreChangeCollectionInfo{
+			Allowed: allowedTagsSet,
+			Perms:   perms,
+		},
+	}
+}
+
+func WatchChangeTestCollectionDelete(cxId wire.Id, resumeMarker watch.ResumeMarker) WatchChangeTest {
+	return WatchChangeTest{
+		WatchChange: syncbase.WatchChange{
+			EntityType:   syncbase.EntityCollection,
+			Collection:   cxId,
+			ChangeType:   syncbase.DeleteChange,
+			ResumeMarker: resumeMarker,
+			Continued:    (resumeMarker == nil),
+		},
+	}
+}
+
 func WatchChangeTestRowPut(cxId wire.Id, rowKey string, value interface{}, resumeMarker watch.ResumeMarker) WatchChangeTest {
 	return WatchChangeTest{
 		WatchChange: syncbase.WatchChange{
@@ -262,6 +299,8 @@
 				wantErr := want.ValueBytes.ToValue(&wantValue)
 				eq = ((gotErr == nil) == (wantErr == nil)) &&
 					reflect.DeepEqual(gotValue, wantValue)
+			case syncbase.EntityCollection:
+				eq = reflect.DeepEqual(got.CollectionInfo(), want.CollectionInfo)
 			default:
 				eq = true
 			}
@@ -278,7 +317,11 @@
 			Fatalf(t, "wstream.Advance() reached the end: %v", wstream.Err())
 		}
 		if got := wstream.Change(); !WatchChangeEq(&got, &want) {
-			Fatalf(t, "unexpected watch change: got %+v, want %+v", got, want)
+			if got.ChangeType == syncbase.PutChange && got.EntityType == syncbase.EntityCollection {
+				Fatalf(t, "unexpected watch change: got %+v, want %+v (cx info: got %+v, want %+v)", got, want, got.CollectionInfo(), want.CollectionInfo)
+			} else {
+				Fatalf(t, "unexpected watch change: got %+v, want %+v", got, want)
+			}
 		}
 	}
 }