syncbase: Expose Collection ACLs in Watch.
Implement API from v.io/c/23042 .
Collection creations and permissions changes are propagated to the
client with appropriate CollectionInfo, as well as collection destroys.
Collection updates were hooked up end-to-end in Go. Only row updates are
propagated in Java and Cgo for now.
Also factored out collection perms key parsing.
MultiPart: 1/2
Change-Id: Ibe05257f1b96e6eb08f7c3090924bb791bb8a919
diff --git a/syncbase/client_test.go b/syncbase/client_test.go
index ccea852..ec07050 100644
--- a/syncbase/client_test.go
+++ b/syncbase/client_test.go
@@ -675,20 +675,27 @@
// one batch.
tu.CheckWatch(t, wstreamAllAdmin, []tu.WatchChangeTest{
tu.WatchChangeTestRootPut(nil),
+ tu.WatchChangeTestCollectionPut(ch.Id(), wire.AllCollectionTags, adminAcl, nil),
tu.WatchChangeTestRowPut(ch.Id(), "b/1", "value", nil),
+ tu.WatchChangeTestCollectionPut(cp.Id(), wire.AllCollectionTags, openAcl, nil),
tu.WatchChangeTestRowPut(cp.Id(), "a/1", "value", nil),
tu.WatchChangeTestRowPut(cp.Id(), "c/1", "value", resumeMarkerInitial),
})
// Watch with empty prefix should have seen the initial state as one batch,
- // omitting the row in the hidden collection.
+ // omitting the row in the hidden collection and seeing no perms on it.
tu.CheckWatch(t, wstreamAll, []tu.WatchChangeTest{
tu.WatchChangeTestRootPut(nil),
+ tu.WatchChangeTestCollectionPut(ch.Id(), nil, nil, nil),
+ // Skip row "b/1" in hidden collection.
+ tu.WatchChangeTestCollectionPut(cp.Id(), wire.AllCollectionTags, openAcl, nil),
tu.WatchChangeTestRowPut(cp.Id(), "a/1", "value", nil),
tu.WatchChangeTestRowPut(cp.Id(), "c/1", "value", resumeMarkerInitial),
})
- // Watch with prefix "d" should have seen only the initial root update.
+ // Watch with prefix "d" should have seen only the initial root update and
+ // public collection change.
tu.CheckWatch(t, wstreamD, []tu.WatchChangeTest{
- tu.WatchChangeTestRootPut(resumeMarkerInitial),
+ tu.WatchChangeTestRootPut(nil),
+ tu.WatchChangeTestCollectionPut(cp.Id(), wire.AllCollectionTags, openAcl, resumeMarkerInitial),
})
// === 2) More writes ===
@@ -768,17 +775,20 @@
}
// Check 3) Writes and permissions change
+ // Admin batch no longer has Read access on the hidden collection, so it sees
+ // only the ACL update. The ACL is checked when scanning the log after commit,
+ // so the new ACL applies to the entire batch.
+ tu.CheckWatch(t, wstreamAllAdmin, []tu.WatchChangeTest{
+ tu.WatchChangeTestCollectionPut(ch.Id(), []access.Tag{access.Write, access.Admin}, newPerms, resumeMarkerAfterB3BspB4),
+ })
// Watch with empty prefix should have seen the continued changes since it has
// Read access on the hidden collection now. The ACL is checked when scanning
// the log after commit, so the new ACL applies to the entire batch.
tu.CheckWatch(t, wstreamAll, []tu.WatchChangeTest{
tu.WatchChangeTestRowPut(ch.Id(), "b/3", "value", nil),
+ tu.WatchChangeTestCollectionPut(ch.Id(), []access.Tag{access.Read, access.Admin}, newPerms, nil),
tu.WatchChangeTestRowPut(ch.Id(), "b/4", "value", resumeMarkerAfterB3BspB4),
})
- // Admin batch no longer has Read access on the hidden collection, so it sees
- // nothing (this is checked later, when there are more rows to see). The ACL
- // is checked when scanning the log after commit, so the new ACL applies to
- // the entire batch.
// === 4) Writes and collection destroy ===
// -> Put ch:"b/5", destroy ch, put cp:"a/3" in a batch.
@@ -809,15 +819,16 @@
}
// Check 4) Writes and collection destroy
- // Both watches do not see any of the hidden collection changes since the
+ // Both watches do not see any of the hidden collection rows since the
// collection has been destroyed and there is no ACL to check against. They
- // see only the public collection changes.
+ // see only the public collection rows and the hidden collection destroy.
finalChangesBoth := []tu.WatchChangeTest{
+ tu.WatchChangeTestCollectionDelete(ch.Id(), nil),
tu.WatchChangeTestRowPut(cp.Id(), "a/3", "value", resumeMarkerAfterB5BdA3),
tu.WatchChangeTestRowDelete(cp.Id(), "a/3", resumeMarkerAfterA3d),
}
- tu.CheckWatch(t, wstreamAll, finalChangesBoth)
tu.CheckWatch(t, wstreamAllAdmin, finalChangesBoth)
+ tu.CheckWatch(t, wstreamAll, finalChangesBoth)
wstreamAllAdmin.Cancel()
wstreamD.Cancel()
@@ -956,22 +967,28 @@
tu.CheckWatch(t, wstream1, []tu.WatchChangeTest{
tu.WatchChangeTestRootPut(nil),
+ tu.WatchChangeTestCollectionPut(cAFoobar.Id(), wire.AllCollectionTags, cxPerms, nil),
tu.WatchChangeTestRowPut(cAFoobar.Id(), "a", "value", nil),
tu.WatchChangeTestRowPut(cAFoobar.Id(), "abc", "value", nil),
+ tu.WatchChangeTestCollectionPut(cAFoo.Id(), wire.AllCollectionTags, cxPerms, nil),
tu.WatchChangeTestRowPut(cAFoo.Id(), "abcd", "value", nil),
tu.WatchChangeTestRowPut(cAFoo.Id(), "cd", "value", resumeMarkerInitial),
})
tu.CheckWatch(t, wstream2, []tu.WatchChangeTest{
tu.WatchChangeTestRootPut(nil),
+ tu.WatchChangeTestCollectionPut(cBFoo.Id(), wire.AllCollectionTags, cxPerms, nil),
tu.WatchChangeTestRowPut(cBFoo.Id(), "ef", "value", nil),
tu.WatchChangeTestRowPut(cBFoo.Id(), "efg", "value", resumeMarkerInitial),
})
tu.CheckWatch(t, wstream3, []tu.WatchChangeTest{
tu.WatchChangeTestRootPut(nil),
+ tu.WatchChangeTestCollectionPut(cBFoo.Id(), wire.AllCollectionTags, cxPerms, nil),
tu.WatchChangeTestRowPut(cBFoo.Id(), "ab", "value", nil),
tu.WatchChangeTestRowPut(cBFoo.Id(), "x\\yz", "value", nil),
+ tu.WatchChangeTestCollectionPut(cAFoobar.Id(), wire.AllCollectionTags, cxPerms, nil),
tu.WatchChangeTestRowPut(cAFoobar.Id(), "abc", "value", nil),
tu.WatchChangeTestRowPut(cAFoobar.Id(), "cd", "value", nil),
+ tu.WatchChangeTestCollectionPut(cAFoo.Id(), wire.AllCollectionTags, cxPerms, nil),
tu.WatchChangeTestRowPut(cAFoo.Id(), "abc", "value", nil),
tu.WatchChangeTestRowPut(cAFoo.Id(), "abcd", "value", nil),
tu.WatchChangeTestRowPut(cAFoo.Id(), "cd", "value", resumeMarkerInitial),
@@ -1024,6 +1041,7 @@
tu.CheckWatch(t, wstream1, []tu.WatchChangeTest{
tu.WatchChangeTestRowPut(cAFoo.Id(), "abcd", "value", nil),
tu.WatchChangeTestRowPut(cAFoobar.Id(), "abcd", "value", resumeMarkerAfterBatch1),
+ tu.WatchChangeTestCollectionPut(cNewId, wire.AllCollectionTags, cxPerms, nil),
tu.WatchChangeTestRowPut(cNewId, "acd", "value", nil),
tu.WatchChangeTestRowPut(cNewId, "abcd", "value", nil),
tu.WatchChangeTestRowPut(cAFoo.Id(), "bcd", "value", resumeMarkerAfterBatch2),
@@ -1034,6 +1052,7 @@
tu.CheckWatch(t, wstream3, []tu.WatchChangeTest{
tu.WatchChangeTestRowPut(cAFoo.Id(), "abcd", "value", nil),
tu.WatchChangeTestRowPut(cAFoobar.Id(), "abcd", "value", resumeMarkerAfterBatch1),
+ tu.WatchChangeTestCollectionPut(cNewId, wire.AllCollectionTags, cxPerms, nil),
tu.WatchChangeTestRowPut(cNewId, "abcd", "value", nil),
tu.WatchChangeTestRowPut(cAFoo.Id(), "bcd", "value", resumeMarkerAfterBatch2),
})
diff --git a/syncbase/featuretests/client_v23_test.go b/syncbase/featuretests/client_v23_test.go
index cf8d635..11eb963 100644
--- a/syncbase/featuretests/client_v23_test.go
+++ b/syncbase/featuretests/client_v23_test.go
@@ -65,6 +65,9 @@
t.Fatalf("watch stream unexpectedly reached the end: %v", stream.Err())
}
change := stream.Change()
+ if got, want := change.EntityType, syncbase.EntityRow; got != want {
+ t.Fatalf("unexpected watch entity type: got %q, want %q", got, want)
+ }
if got, want := change.Collection, testCx; got != want {
t.Fatalf("unexpected watch collection: got %q, want %q", got, want)
}
diff --git a/syncbase/featuretests/sync_v23_test.go b/syncbase/featuretests/sync_v23_test.go
index 9008697..c5b59dc 100644
--- a/syncbase/featuretests/sync_v23_test.go
+++ b/syncbase/featuretests/sync_v23_test.go
@@ -561,8 +561,10 @@
})
var changes []syncbase.WatchChange
- for i := 0; i < count && stream.Advance(); i++ {
- changes = append(changes, stream.Change())
+ for len(changes) < count && stream.Advance() {
+ if stream.Change().EntityType == syncbase.EntityRow {
+ changes = append(changes, stream.Change())
+ }
}
if err := stream.Err(); err != nil {
return fmt.Errorf("watch stream error: %v\n", err)
diff --git a/syncbase/model.go b/syncbase/model.go
index b42e02b..943dfe0 100644
--- a/syncbase/model.go
+++ b/syncbase/model.go
@@ -478,6 +478,8 @@
}
ci := &wire.StoreChangeCollectionInfo{}
if err := c.value.ToValue(ci); err != nil {
+ // This should never panic since we verify the collection info is decodable
+ // when constructing the WatchChange.
panic(fmt.Errorf("ToValue StoreChangeCollectionInfo failed: %v, RawBytes: %#v", err, c.value))
}
return ci
diff --git a/syncbase/watch_stream.go b/syncbase/watch_stream.go
index 82b7052..7939fbc 100644
--- a/syncbase/watch_stream.go
+++ b/syncbase/watch_stream.go
@@ -6,6 +6,7 @@
import (
"fmt"
+ "strings"
"sync"
"v.io/v23/context"
@@ -125,7 +126,7 @@
if c.Name == "" {
res.EntityType = EntityRoot
// No other fields need to be set for EntityRoot.
- } else {
+ } else if strings.ContainsRune(c.Name, '/') {
res.EntityType = EntityRow
// Parse the collection id and row key.
if res.Collection, res.Row, err = util.ParseCollectionRowPair(nil, c.Name); err != nil {
@@ -134,6 +135,18 @@
if res.Row == "" {
panic("empty row name")
}
+ } else {
+ res.EntityType = EntityCollection
+ // Parse the collection id.
+ if res.Collection, err = util.DecodeId(c.Name); err != nil {
+ panic(err)
+ }
+ if res.ChangeType == PutChange {
+ // Verify that the collection info is decodable.
+ if err := storeChange.Value.ToValue(&wire.StoreChangeCollectionInfo{}); err != nil {
+ panic(fmt.Errorf("ToValue StoreChangeCollectionInfo failed: %v, RawBytes: %#v", err, storeChange.Value))
+ }
+ }
}
return res
}