syncbase: Enforce ACL spec on sync RPCs.

Syncgroup Manager RPCs now check permissions according to the Syncbase
ACL specification document, including recursively checking for Resolve
access where appropriate.
Improved documentation of the permission requirements for Syncbase-to-Syncbase RPCs.
Added other sanity checks: Collection existence and Read permission at
syncgroup create/join, and prevention of removing self from the syncgroup ACL.
Added permission spec tests for client-to-Syncbase Syncgroup Manager and
Blob RPCs.

MultiPart: 2/2
Change-Id: Iac805e6801e0adfbfa6d6c206e7366d2b470ade4
diff --git a/services/syncbase/common/access_util.go b/services/syncbase/common/access_util.go
index 2bc7f1c..8674303 100644
--- a/services/syncbase/common/access_util.go
+++ b/services/syncbase/common/access_util.go
@@ -60,6 +60,15 @@
 	}
 }
 
+// TagAuthorizer provides an authorizer that allows blessings matching any
+// pattern in perms corresponding to the provided tag.
+func TagAuthorizer(tag access.Tag, perms access.Permissions) *anyOfTagsAuthorizer {
+	return &anyOfTagsAuthorizer{
+		tags:  []access.Tag{tag},
+		perms: perms,
+	}
+}
+
 type anyOfTagsAuthorizer struct {
 	tags  []access.Tag
 	perms access.Permissions
@@ -86,7 +95,7 @@
 func CheckImplicitPerms(ctx *context.T, call rpc.ServerCall, id wire.Id, allowedTags []access.Tag) (access.Permissions, error) {
 	implicitPerms := access.Permissions{}.Add(security.BlessingPattern(id.Blessing), access.TagStrings(allowedTags...)...)
 	// Note, allowedTags is expected to contain access.Admin.
-	if err := AnyOfTagsAuthorizer([]access.Tag{access.Admin}, implicitPerms).Authorize(ctx, call.Security()); err != nil {
+	if err := TagAuthorizer(access.Admin, implicitPerms).Authorize(ctx, call.Security()); err != nil {
 		return nil, verror.New(wire.ErrUnauthorizedCreateId, ctx, id.Blessing, id.Name, err)
 	}
 	return implicitPerms, nil
@@ -130,7 +139,7 @@
 	if existErr != nil {
 		return nil, existErr
 	}
-	return perms, AnyOfTagsAuthorizer([]access.Tag{access.Resolve}, parentPerms).Authorize(ctx, call.Security())
+	return perms, TagAuthorizer(access.Resolve, parentPerms).Authorize(ctx, call.Security())
 }
 
 // GetPermsWithExistAndParentResolveAuth returns a nil error only if the object
@@ -168,7 +177,18 @@
 	if st == nil {
 		return fuzzifyErrorForExists(ctx, call, name, parentPerms, nil, verror.New(verror.ErrNoExist, ctx, name))
 	}
-	if getErr := store.Get(ctx, st, k, v); getErr != nil {
+	getErr := store.Get(ctx, st, k, v)
+	return ExistAuthStep(ctx, call, name, parentPerms, v, getErr)
+}
+
+// ExistAuthStep is a helper intended for use in GetDataWithExistAuth
+// implementations. It assumes Resolve access up to and including the object's
+// grandparent. Taking into account the error from retrieving the object's
+// metadata, it returns ErrNoExistOrNoAccess, ErrNoExist or other errors when
+// appropriate; if the caller is not authorized for exist access,
+// ErrNoExistOrNoAccess is always returned.
+func ExistAuthStep(ctx *context.T, call rpc.ServerCall, name string, parentPerms access.Permissions, v PermserData, getErr error) error {
+	if getErr != nil {
 		if verror.ErrorID(getErr) == verror.ErrNoExist.ID {
 			getErr = verror.New(verror.ErrNoExist, ctx, name)
 		}
@@ -193,7 +213,7 @@
 	}
 	// No Read or Write on parent, caller must have both Resolve on parent and at
 	// least one tag on the object itself to get the original error.
-	if parentXErr := AnyOfTagsAuthorizer([]access.Tag{access.Resolve}, parentPerms).Authorize(ctx, call.Security()); parentXErr != nil {
+	if parentXErr := TagAuthorizer(access.Resolve, parentPerms).Authorize(ctx, call.Security()); parentXErr != nil {
 		return fuzzyErr
 	}
 	if selfAnyErr := AnyOfTagsAuthorizer(access.AllTypicalTags(), perms).Authorize(ctx, call.Security()); selfAnyErr != nil {
diff --git a/services/syncbase/server/database.go b/services/syncbase/server/database.go
index 6ab1d86..b4974dd 100644
--- a/services/syncbase/server/database.go
+++ b/services/syncbase/server/database.go
@@ -168,7 +168,7 @@
 // passed in perms.
 func hasPermission(ctx *context.T, securityCall security.Call, perms access.Permissions, tag access.Tag) bool {
 	// Authorize returns either an error or nil, so nil means the caller is authorized.
-	return common.AnyOfTagsAuthorizer([]access.Tag{tag}, perms).Authorize(ctx, securityCall) == nil
+	return common.TagAuthorizer(tag, perms).Authorize(ctx, securityCall) == nil
 }
 
 // openDatabase opens a database and returns a *database for it. Designed for
@@ -551,11 +551,17 @@
 	return d.s
 }
 
-func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
+func (d *database) GetCollectionPerms(ctx *context.T, cxId wire.Id, st store.StoreReader) (access.Permissions, error) {
 	if !d.exists {
 		vlog.Fatalf("database %v does not exist", d.id)
 	}
-	return util.GetWithAuth(ctx, call, st, d.stKey(), &DatabaseData{})
+	c := &collectionReq{
+		id: cxId,
+		d:  d,
+	}
+	var cp interfaces.CollectionPerms
+	err := store.Get(ctx, st, c.permsKey(), &cp)
+	return cp.GetPerms(), err
 }
 
 func (d *database) Id() wire.Id {
diff --git a/services/syncbase/server/database_bm.go b/services/syncbase/server/database_bm.go
index a25a5a2..97338fb 100644
--- a/services/syncbase/server/database_bm.go
+++ b/services/syncbase/server/database_bm.go
@@ -16,6 +16,7 @@
 // RPCs for managing blobs between Syncbase and its clients.
 
 // Note, access authorization is checked in SyncDatabase methods.
+// TODO(ivanpi): Move d.exists checks into SyncDatabase to prevent existence leaks.
 
 func (d *database) CreateBlob(ctx *context.T, call rpc.ServerCall) (wire.BlobRef, error) {
 	if !d.exists {
diff --git a/services/syncbase/server/database_sgm.go b/services/syncbase/server/database_sgm.go
index 73605c2..07a5ca0 100644
--- a/services/syncbase/server/database_sgm.go
+++ b/services/syncbase/server/database_sgm.go
@@ -16,6 +16,7 @@
 // Syncgroup RPC methods
 
 // Note, access authorization is checked in SyncDatabase methods.
+// TODO(ivanpi): Move d.exists checks into SyncDatabase to prevent existence leaks.
 
 func (d *database) ListSyncgroups(ctx *context.T, call rpc.ServerCall) ([]wire.Id, error) {
 	if !d.exists {
@@ -45,21 +46,24 @@
 	if !d.exists {
 		return verror.New(verror.ErrNoExist, ctx, d.id)
 	}
-	return verror.NewErrNotImplemented(ctx)
+	sd := vsync.NewSyncDatabase(d)
+	return sd.LeaveSyncgroup(ctx, call, sgId)
 }
 
 func (d *database) DestroySyncgroup(ctx *context.T, call rpc.ServerCall, sgId wire.Id) error {
 	if !d.exists {
 		return verror.New(verror.ErrNoExist, ctx, d.id)
 	}
-	return verror.NewErrNotImplemented(ctx)
+	sd := vsync.NewSyncDatabase(d)
+	return sd.DestroySyncgroup(ctx, call, sgId)
 }
 
 func (d *database) EjectFromSyncgroup(ctx *context.T, call rpc.ServerCall, sgId wire.Id, member string) error {
 	if !d.exists {
 		return verror.New(verror.ErrNoExist, ctx, d.id)
 	}
-	return verror.NewErrNotImplemented(ctx)
+	sd := vsync.NewSyncDatabase(d)
+	return sd.EjectFromSyncgroup(ctx, call, sgId, member)
 }
 
 func (d *database) GetSyncgroupSpec(ctx *context.T, call rpc.ServerCall, sgId wire.Id) (wire.SyncgroupSpec, string, error) {
diff --git a/services/syncbase/server/interfaces/database.go b/services/syncbase/server/interfaces/database.go
index 07089f3..69326a4 100644
--- a/services/syncbase/server/interfaces/database.go
+++ b/services/syncbase/server/interfaces/database.go
@@ -6,7 +6,7 @@
 
 import (
 	"v.io/v23/context"
-	"v.io/v23/rpc"
+	"v.io/v23/security/access"
 	wire "v.io/v23/services/syncbase"
 	"v.io/x/ref/services/syncbase/common"
 	"v.io/x/ref/services/syncbase/store"
@@ -24,10 +24,9 @@
 	// Service returns the service handle for this database.
 	Service() Service
 
-	// CheckPermsInternal checks whether the given RPC (ctx, call) is allowed per
-	// the database perms.
-	// TODO(ivanpi): Remove once all callers are ported to explicit auth.
-	CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error
+	// GetCollectionPerms retrieves the permissions for the Collection with the
+	// given id. No authorization check is performed.
+	GetCollectionPerms(ctx *context.T, cxId wire.Id, st store.StoreReader) (access.Permissions, error)
 
 	// GetSchemaMetadataInternal returns SchemaMetadata stored for this db
 	// without checking any credentials.
diff --git a/services/syncbase/server/interfaces/interfaces.vdl.go b/services/syncbase/server/interfaces/interfaces.vdl.go
index 359db0f..ded5dab 100644
--- a/services/syncbase/server/interfaces/interfaces.vdl.go
+++ b/services/syncbase/server/interfaces/interfaces.vdl.go
@@ -2501,12 +2501,21 @@
 type SyncClientMethods interface {
 	// GetTime returns metadata related to the Syncbase virtual clock, including
 	// system clock values, last NTP timestamp, num reboots, etc.
+	//
+	// Requires: Always allowed.
+	// TODO(ivanpi): Consider adding an ACL or checking syncgroup ACLs.
 	GetTime(_ *context.T, req TimeReq, initiator string, _ ...rpc.CallOpt) (TimeResp, error)
 	// GetDeltas returns the responder's current generation vectors and all
 	// the missing log records when compared to the initiator's generation
 	// vectors for one Database for either syncgroup metadata or data.
 	// The final result (in DeltaFinalResp) currently includes the
 	// syncgroup priorities for blob ownership for the server.
+	//
+	// Requires: Read on syncgroup.
+	// TODO(ivanpi): Consider rechecking Collection Read access.
+	// The caller should verify that all received changes (data, ACLs, spec) are
+	// signed by a blessing that had the appropriate permission (Write or Admin).
+	// TODO(ivanpi): Implement signatures and signature verification.
 	GetDeltas(_ *context.T, req DeltaReq, initiator string, _ ...rpc.CallOpt) (SyncGetDeltasClientCall, error)
 	// PublishSyncgroup is invoked on the syncgroup name (typically served
 	// by a "central" peer) to publish the syncgroup.  It takes the name of
@@ -2524,6 +2533,9 @@
 	// locally deems the syncgroup to be in a pending state and does not
 	// mutate it.  Thus it locally rejects syncgroup joins or updates to
 	// its spec until it is caught up on the syncgroup history.
+	//
+	// Requires: Always allowed.
+	// TODO(ivanpi): Add separate ACL for PublishSyncgroup and check it.
 	PublishSyncgroup(_ *context.T, publisher string, sg Syncgroup, version string, genvec GenVector, _ ...rpc.CallOpt) (string, error)
 	// JoinSyncgroupAtAdmin is invoked by a prospective syncgroup member's
 	// Syncbase on a syncgroup admin. It checks whether the requestor is
@@ -2536,6 +2548,8 @@
 	// local updates to the syncgroup spec or, if it were also an admin on
 	// the syncgroup, it would reject syncgroup joins until it is caught up
 	// on the syncgroup history through p2p sync.
+	//
+	// Requires: Read on syncgroup and on all Collections in the syncgroup spec.
 	JoinSyncgroupAtAdmin(_ *context.T, dbId syncbase.Id, sgId syncbase.Id, joinerName string, myInfo syncbase.SyncgroupMemberInfo, _ ...rpc.CallOpt) (sg Syncgroup, version string, genvec GenVector, _ error)
 	// HaveBlob verifies that the peer has the requested blob, and if
 	// present, returns its size.  Otherwise, it returns -1, and the location
@@ -2991,12 +3005,21 @@
 type SyncServerMethods interface {
 	// GetTime returns metadata related to the Syncbase virtual clock, including
 	// system clock values, last NTP timestamp, num reboots, etc.
+	//
+	// Requires: Always allowed.
+	// TODO(ivanpi): Consider adding an ACL or checking syncgroup ACLs.
 	GetTime(_ *context.T, _ rpc.ServerCall, req TimeReq, initiator string) (TimeResp, error)
 	// GetDeltas returns the responder's current generation vectors and all
 	// the missing log records when compared to the initiator's generation
 	// vectors for one Database for either syncgroup metadata or data.
 	// The final result (in DeltaFinalResp) currently includes the
 	// syncgroup priorities for blob ownership for the server.
+	//
+	// Requires: Read on syncgroup.
+	// TODO(ivanpi): Consider rechecking Collection Read access.
+	// The caller should verify that all received changes (data, ACLs, spec) are
+	// signed by a blessing that had the appropriate permission (Write or Admin).
+	// TODO(ivanpi): Implement signatures and signature verification.
 	GetDeltas(_ *context.T, _ SyncGetDeltasServerCall, req DeltaReq, initiator string) (DeltaFinalResp, error)
 	// PublishSyncgroup is invoked on the syncgroup name (typically served
 	// by a "central" peer) to publish the syncgroup.  It takes the name of
@@ -3014,6 +3037,9 @@
 	// locally deems the syncgroup to be in a pending state and does not
 	// mutate it.  Thus it locally rejects syncgroup joins or updates to
 	// its spec until it is caught up on the syncgroup history.
+	//
+	// Requires: Always allowed.
+	// TODO(ivanpi): Add separate ACL for PublishSyncgroup and check it.
 	PublishSyncgroup(_ *context.T, _ rpc.ServerCall, publisher string, sg Syncgroup, version string, genvec GenVector) (string, error)
 	// JoinSyncgroupAtAdmin is invoked by a prospective syncgroup member's
 	// Syncbase on a syncgroup admin. It checks whether the requestor is
@@ -3026,6 +3052,8 @@
 	// local updates to the syncgroup spec or, if it were also an admin on
 	// the syncgroup, it would reject syncgroup joins until it is caught up
 	// on the syncgroup history through p2p sync.
+	//
+	// Requires: Read on syncgroup and on all Collections in the syncgroup spec.
 	JoinSyncgroupAtAdmin(_ *context.T, _ rpc.ServerCall, dbId syncbase.Id, sgId syncbase.Id, joinerName string, myInfo syncbase.SyncgroupMemberInfo) (sg Syncgroup, version string, genvec GenVector, _ error)
 	// HaveBlob verifies that the peer has the requested blob, and if
 	// present, returns its size.  Otherwise, it returns -1, and the location
@@ -3091,12 +3119,21 @@
 type SyncServerStubMethods interface {
 	// GetTime returns metadata related to the Syncbase virtual clock, including
 	// system clock values, last NTP timestamp, num reboots, etc.
+	//
+	// Requires: Always allowed.
+	// TODO(ivanpi): Consider adding an ACL or checking syncgroup ACLs.
 	GetTime(_ *context.T, _ rpc.ServerCall, req TimeReq, initiator string) (TimeResp, error)
 	// GetDeltas returns the responder's current generation vectors and all
 	// the missing log records when compared to the initiator's generation
 	// vectors for one Database for either syncgroup metadata or data.
 	// The final result (in DeltaFinalResp) currently includes the
 	// syncgroup priorities for blob ownership for the server.
+	//
+	// Requires: Read on syncgroup.
+	// TODO(ivanpi): Consider rechecking Collection Read access.
+	// The caller should verify that all received changes (data, ACLs, spec) are
+	// signed by a blessing that had the appropriate permission (Write or Admin).
+	// TODO(ivanpi): Implement signatures and signature verification.
 	GetDeltas(_ *context.T, _ *SyncGetDeltasServerCallStub, req DeltaReq, initiator string) (DeltaFinalResp, error)
 	// PublishSyncgroup is invoked on the syncgroup name (typically served
 	// by a "central" peer) to publish the syncgroup.  It takes the name of
@@ -3114,6 +3151,9 @@
 	// locally deems the syncgroup to be in a pending state and does not
 	// mutate it.  Thus it locally rejects syncgroup joins or updates to
 	// its spec until it is caught up on the syncgroup history.
+	//
+	// Requires: Always allowed.
+	// TODO(ivanpi): Add separate ACL for PublishSyncgroup and check it.
 	PublishSyncgroup(_ *context.T, _ rpc.ServerCall, publisher string, sg Syncgroup, version string, genvec GenVector) (string, error)
 	// JoinSyncgroupAtAdmin is invoked by a prospective syncgroup member's
 	// Syncbase on a syncgroup admin. It checks whether the requestor is
@@ -3126,6 +3166,8 @@
 	// local updates to the syncgroup spec or, if it were also an admin on
 	// the syncgroup, it would reject syncgroup joins until it is caught up
 	// on the syncgroup history through p2p sync.
+	//
+	// Requires: Read on syncgroup and on all Collections in the syncgroup spec.
 	JoinSyncgroupAtAdmin(_ *context.T, _ rpc.ServerCall, dbId syncbase.Id, sgId syncbase.Id, joinerName string, myInfo syncbase.SyncgroupMemberInfo) (sg Syncgroup, version string, genvec GenVector, _ error)
 	// HaveBlob verifies that the peer has the requested blob, and if
 	// present, returns its size.  Otherwise, it returns -1, and the location
@@ -3272,7 +3314,7 @@
 	Methods: []rpc.MethodDesc{
 		{
 			Name: "GetTime",
-			Doc:  "// GetTime returns metadata related to the Syncbase virtual clock, including\n// system clock values, last NTP timestamp, num reboots, etc.",
+			Doc:  "// GetTime returns metadata related to the Syncbase virtual clock, including\n// system clock values, last NTP timestamp, num reboots, etc.\n//\n// Requires: Always allowed.\n// TODO(ivanpi): Consider adding an ACL or checking syncgroup ACLs.",
 			InArgs: []rpc.ArgDesc{
 				{"req", ``},       // TimeReq
 				{"initiator", ``}, // string
@@ -3283,7 +3325,7 @@
 		},
 		{
 			Name: "GetDeltas",
-			Doc:  "// GetDeltas returns the responder's current generation vectors and all\n// the missing log records when compared to the initiator's generation\n// vectors for one Database for either syncgroup metadata or data.\n// The final result (in DeltaFinalResp) currently includes the\n// syncgroup priorities for blob ownership for the server.",
+			Doc:  "// GetDeltas returns the responder's current generation vectors and all\n// the missing log records when compared to the initiator's generation\n// vectors for one Database for either syncgroup metadata or data.\n// The final result (in DeltaFinalResp) currently includes the\n// syncgroup priorities for blob ownership for the server.\n//\n// Requires: Read on syncgroup.\n// TODO(ivanpi): Consider rechecking Collection Read access.\n// The caller should verify that all received changes (data, ACLs, spec) are\n// signed by a blessing that had the appropriate permission (Write or Admin).\n// TODO(ivanpi): Implement signatures and signature verification.",
 			InArgs: []rpc.ArgDesc{
 				{"req", ``},       // DeltaReq
 				{"initiator", ``}, // string
@@ -3291,11 +3333,10 @@
 			OutArgs: []rpc.ArgDesc{
 				{"", ``}, // DeltaFinalResp
 			},
-			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
 		{
 			Name: "PublishSyncgroup",
-			Doc:  "// PublishSyncgroup is invoked on the syncgroup name (typically served\n// by a \"central\" peer) to publish the syncgroup.  It takes the name of\n// Syncbase doing the publishing (the publisher) and returns the name\n// of the Syncbase where the syncgroup is published (the publishee).\n// This allows the publisher and the publishee to learn of each other.\n// When a syncgroup is published, the publishee is given the syncgroup\n// metadata, its current version at the publisher, and the current\n// syncgroup generation vector.  The generation vector serves as a\n// checkpoint at the time of publishing.  The publishing proceeds\n// asynchronously, and the publishee learns the syncgroup history\n// through the routine p2p sync process and determines when it has\n// caught up to the level of knowledge at the time of publishing using\n// the checkpointed generation vector.  Until that point, the publishee\n// locally deems the syncgroup to be in a pending state and does not\n// mutate it.  Thus it locally rejects syncgroup joins or updates to\n// its spec until it is caught up on the syncgroup history.",
+			Doc:  "// PublishSyncgroup is invoked on the syncgroup name (typically served\n// by a \"central\" peer) to publish the syncgroup.  It takes the name of\n// Syncbase doing the publishing (the publisher) and returns the name\n// of the Syncbase where the syncgroup is published (the publishee).\n// This allows the publisher and the publishee to learn of each other.\n// When a syncgroup is published, the publishee is given the syncgroup\n// metadata, its current version at the publisher, and the current\n// syncgroup generation vector.  The generation vector serves as a\n// checkpoint at the time of publishing.  The publishing proceeds\n// asynchronously, and the publishee learns the syncgroup history\n// through the routine p2p sync process and determines when it has\n// caught up to the level of knowledge at the time of publishing using\n// the checkpointed generation vector.  Until that point, the publishee\n// locally deems the syncgroup to be in a pending state and does not\n// mutate it.  Thus it locally rejects syncgroup joins or updates to\n// its spec until it is caught up on the syncgroup history.\n//\n// Requires: Always allowed.\n// TODO(ivanpi): Add separate ACL for PublishSyncgroup and check it.",
 			InArgs: []rpc.ArgDesc{
 				{"publisher", ``}, // string
 				{"sg", ``},        // Syncgroup
@@ -3305,11 +3346,10 @@
 			OutArgs: []rpc.ArgDesc{
 				{"", ``}, // string
 			},
-			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
 		},
 		{
 			Name: "JoinSyncgroupAtAdmin",
-			Doc:  "// JoinSyncgroupAtAdmin is invoked by a prospective syncgroup member's\n// Syncbase on a syncgroup admin. It checks whether the requestor is\n// allowed to join the named syncgroup, and if so, adds the requestor to\n// the syncgroup.  It returns a copy of the updated syncgroup metadata,\n// its version, and the syncgroup generation vector at the time of the\n// join.  Similar to the PublishSyncgroup scenario, the joiner at that\n// point does not have the syncgroup history and locally deems it to be\n// in a pending state and does not mutate it.  This means it rejects\n// local updates to the syncgroup spec or, if it were also an admin on\n// the syncgroup, it would reject syncgroup joins until it is caught up\n// on the syncgroup history through p2p sync.",
+			Doc:  "// JoinSyncgroupAtAdmin is invoked by a prospective syncgroup member's\n// Syncbase on a syncgroup admin. It checks whether the requestor is\n// allowed to join the named syncgroup, and if so, adds the requestor to\n// the syncgroup.  It returns a copy of the updated syncgroup metadata,\n// its version, and the syncgroup generation vector at the time of the\n// join.  Similar to the PublishSyncgroup scenario, the joiner at that\n// point does not have the syncgroup history and locally deems it to be\n// in a pending state and does not mutate it.  This means it rejects\n// local updates to the syncgroup spec or, if it were also an admin on\n// the syncgroup, it would reject syncgroup joins until it is caught up\n// on the syncgroup history through p2p sync.\n//\n// Requires: Read on syncgroup and on all Collections in the syncgroup spec.",
 			InArgs: []rpc.ArgDesc{
 				{"dbId", ``},       // syncbase.Id
 				{"sgId", ``},       // syncbase.Id
@@ -3321,7 +3361,6 @@
 				{"version", ``}, // string
 				{"genvec", ``},  // GenVector
 			},
-			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
 		},
 		{
 			Name: "HaveBlob",
diff --git a/services/syncbase/server/interfaces/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
index f49b297..830f7c3 100644
--- a/services/syncbase/server/interfaces/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -6,7 +6,6 @@
 
 import (
 	wire "v.io/v23/services/syncbase"
-	"v.io/v23/security/access"
 )
 
 // Sync defines methods for data exchange between Syncbases.
@@ -14,6 +13,9 @@
 type Sync interface {
 	// GetTime returns metadata related to the Syncbase virtual clock, including
 	// system clock values, last NTP timestamp, num reboots, etc.
+	//
+	// Requires: Always allowed.
+	// TODO(ivanpi): Consider adding an ACL or checking syncgroup ACLs.
 	GetTime(req TimeReq, initiator string) (TimeResp | error)
 
 	// GetDeltas returns the responder's current generation vectors and all
@@ -21,7 +23,13 @@
 	// vectors for one Database for either syncgroup metadata or data.
 	// The final result (in DeltaFinalResp) currently includes the
 	// syncgroup priorities for blob ownership for the server.
-	GetDeltas(req DeltaReq, initiator string) stream<_, DeltaResp> (DeltaFinalResp | error) {access.Read}
+	//
+	// Requires: Read on syncgroup.
+	// TODO(ivanpi): Consider rechecking Collection Read access.
+	// The caller should verify that all received changes (data, ACLs, spec) are
+	// signed by a blessing that had the appropriate permission (Write or Admin).
+	// TODO(ivanpi): Implement signatures and signature verification.
+	GetDeltas(req DeltaReq, initiator string) stream<_, DeltaResp> (DeltaFinalResp | error)
 
 	// Syncgroup-related methods.
 
@@ -41,7 +49,10 @@
 	// locally deems the syncgroup to be in a pending state and does not
 	// mutate it.  Thus it locally rejects syncgroup joins or updates to
 	// its spec until it is caught up on the syncgroup history.
-	PublishSyncgroup(publisher string, sg Syncgroup, version string, genvec GenVector) (string | error) {access.Write}
+	//
+	// Requires: Always allowed.
+	// TODO(ivanpi): Add separate ACL for PublishSyncgroup and check it.
+	PublishSyncgroup(publisher string, sg Syncgroup, version string, genvec GenVector) (string | error)
 
 	// JoinSyncgroupAtAdmin is invoked by a prospective syncgroup member's
 	// Syncbase on a syncgroup admin. It checks whether the requestor is
@@ -54,9 +65,12 @@
 	// local updates to the syncgroup spec or, if it were also an admin on
 	// the syncgroup, it would reject syncgroup joins until it is caught up
 	// on the syncgroup history through p2p sync.
-	JoinSyncgroupAtAdmin(dbId wire.Id, sgId wire.Id, joinerName string, myInfo wire.SyncgroupMemberInfo) (sg Syncgroup, version string, genvec GenVector | error) {access.Read}
+	//
+	// Requires: Read on syncgroup and on all Collections in the syncgroup spec.
+	JoinSyncgroupAtAdmin(dbId wire.Id, sgId wire.Id, joinerName string, myInfo wire.SyncgroupMemberInfo) (sg Syncgroup, version string, genvec GenVector | error)
 
 	// BlobSync methods.
+	// TODO(ivanpi): Document authorization policies.
 
 	// HaveBlob verifies that the peer has the requested blob, and if
 	// present, returns its size.  Otherwise, it returns -1, and the location
diff --git a/services/syncbase/server/interfaces/sync_types.go b/services/syncbase/server/interfaces/sync_types.go
index b948b83..e6a6340 100644
--- a/services/syncbase/server/interfaces/sync_types.go
+++ b/services/syncbase/server/interfaces/sync_types.go
@@ -71,8 +71,13 @@
 
 var (
 	_ common.PermserData = (*CollectionPerms)(nil)
+	_ common.PermserData = (*Syncgroup)(nil)
 )
 
 func (perms *CollectionPerms) GetPerms() access.Permissions {
 	return access.Permissions(*perms)
 }
+
+func (sg *Syncgroup) GetPerms() access.Permissions {
+	return sg.Spec.Perms
+}
diff --git a/services/syncbase/server/util/store.go b/services/syncbase/server/util/store.go
index ce3163e..c819bae 100644
--- a/services/syncbase/server/util/store.go
+++ b/services/syncbase/server/util/store.go
@@ -8,11 +8,7 @@
 	"strconv"
 
 	"v.io/v23/context"
-	"v.io/v23/rpc"
-	"v.io/v23/security/access"
 	"v.io/v23/verror"
-	"v.io/x/ref/services/syncbase/common"
-	"v.io/x/ref/services/syncbase/store"
 )
 
 func FormatVersion(version uint64) string {
@@ -25,19 +21,3 @@
 	}
 	return nil
 }
-
-// TODO(sadovsky): Perhaps these functions should strip key prefixes such as
-// "c:" from the error messages they return.
-
-// GetWithAuth does Get followed by an auth check.
-// TODO(ivanpi): Remove when all callers are gone.
-func GetWithAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, k string, v common.PermserData) error {
-	if err := store.Get(ctx, st, k, v); err != nil {
-		return err
-	}
-	auth, _ := access.PermissionsAuthorizer(v.GetPerms(), access.TypicalTagType())
-	if err := auth.Authorize(ctx, call.Security()); err != nil {
-		return verror.New(verror.ErrNoAccess, ctx, err)
-	}
-	return nil
-}
diff --git a/services/syncbase/vsync/responder.go b/services/syncbase/vsync/responder.go
index 49c16f3..8d6bab7 100644
--- a/services/syncbase/vsync/responder.go
+++ b/services/syncbase/vsync/responder.go
@@ -10,6 +10,7 @@
 	"strings"
 
 	"v.io/v23/context"
+	"v.io/v23/security/access"
 	wire "v.io/v23/services/syncbase"
 	"v.io/v23/verror"
 	"v.io/v23/vom"
@@ -143,6 +144,8 @@
 		return interfaces.NewErrDbOffline(ctx, rSt.dbId)
 	}
 
+	// TODO(ivanpi): Ensure that Database and syncgroup existence is not leaked.
+
 	// Phase 1 of sendDeltas: Authorize the initiator and respond to the
 	// caller only for the syncgroups that allow access.
 	err := rSt.authorizeAndFilterSyncgroups(ctx)
@@ -184,7 +187,7 @@
 		var sg *interfaces.Syncgroup
 		sg, err = getSyncgroupByGid(ctx, rSt.st, sgid)
 		if err == nil {
-			err = authorize(ctx, rSt.call.Security(), sg)
+			err = common.TagAuthorizer(access.Read, sg.Spec.Perms).Authorize(ctx, rSt.call.Security())
 		}
 		if err == nil {
 			if !rSt.sg {
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index df78503..6bbce3a 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -273,7 +273,7 @@
 	return getHead(ctx, st, sgOID(gid))
 }
 
-// getSyncgroupById retrieves the syncgroup given its ID.
+// getSyncgroupByGid retrieves the syncgroup given its gid.
 func getSyncgroupByGid(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.Syncgroup, error) {
 	version, err := getSyncgroupVersion(ctx, st, gid)
 	if err != nil {
@@ -282,7 +282,7 @@
 	return getSGDataEntry(ctx, st, gid, version)
 }
 
-// delSyncgroupById deletes the syncgroup given its ID.
+// delSyncgroupByGid deletes the syncgroup given its gid.
 // bst may be nil.
 func delSyncgroupByGid(ctx *context.T, bst blob.BlobStore, tx *watchable.Transaction, gid interfaces.GroupId) error {
 	sg, err := getSyncgroupByGid(ctx, tx, gid)
@@ -292,7 +292,7 @@
 	return delSyncgroupBySgId(ctx, bst, tx, sg.DbId, sg.Id)
 }
 
-// delSyncgroupByName deletes the syncgroup given its name.
+// delSyncgroupBySgId deletes the syncgroup given its id.
 // bst may be nil.
 func delSyncgroupBySgId(ctx *context.T, bst blob.BlobStore, tx *watchable.Transaction, dbId, sgId wire.Id) error {
 	// Get the syncgroup ID and current version.
@@ -660,6 +660,8 @@
 
 // TODO(hpucha): Pass blessings along.
 func (sd *syncDatabase) CreateSyncgroup(ctx *context.T, call rpc.ServerCall, sgId wire.Id, spec wire.SyncgroupSpec, myInfo wire.SyncgroupMemberInfo) error {
+	allowCreateSyncgroupDb := []access.Tag{access.Write}
+
 	vlog.VI(2).Infof("sync: CreateSyncgroup: begin: %s, spec %+v", sgId, spec)
 	defer vlog.VI(2).Infof("sync: CreateSyncgroup: end: %s", sgId)
 
@@ -667,6 +669,11 @@
 		return verror.New(wire.ErrInvalidName, ctx, pubutil.EncodeId(sgId), err)
 	}
 
+	// Client must add themselves to Read ACL.
+	if err := common.TagAuthorizer(access.Read, spec.Perms).Authorize(ctx, call.Security()); err != nil {
+		return verror.New(verror.ErrBadArg, ctx, "must include self in syncgroup Read ACL")
+	}
+
 	ss := sd.sync.(*syncService)
 	dbId := sd.db.Id()
 
@@ -691,7 +698,7 @@
 
 	err = watchable.RunInTransaction(sd.db.St(), func(tx *watchable.Transaction) error {
 		// Check permissions on Database.
-		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
+		if _, err := common.GetPermsWithAuth(ctx, call, sd.db, allowCreateSyncgroupDb, tx); err != nil {
 			return err
 		}
 
@@ -702,20 +709,11 @@
 			return err
 		}
 
-		// Check that all the collections on which the syncgroup is
-		// being created exist.
-		for _, c := range spec.Collections {
-			collKey := common.JoinKeyParts(common.CollectionPermsPrefix, pubutil.EncodeId(c), "")
-			if err := store.Get(ctx, tx, collKey, &interfaces.CollectionPerms{}); err != nil {
-				// TODO(hpucha): Is this error ok to return to
-				// the end user in terms of error visibility.
-				return verror.New(verror.ErrNoExist, ctx, "collection missing", c)
-			}
+		// Check that all the collections in spec exist and are syncable.
+		if err := verifyCollectionsForSync(ctx, call, sd.db, spec.Collections, true, tx); err != nil {
+			return err
 		}
 
-		// TODO(hpucha): Check ACLs on all SG collections.
-		// This may need another method on util.Database interface.
-
 		// Reserve a log generation and position counts for the new syncgroup.
 		gen, pos := ss.reserveGenAndPosInDbLog(ctx, dbId, sgOID(gid), 1)
 
@@ -770,6 +768,8 @@
 }
 
 func (sd *syncDatabase) JoinSyncgroup(ctx *context.T, call rpc.ServerCall, remoteSyncbaseName string, expectedSyncbaseBlessings []string, sgId wire.Id, myInfo wire.SyncgroupMemberInfo) (wire.SyncgroupSpec, error) {
+	allowJoinSyncgroupDb := []access.Tag{access.Write}
+
 	vlog.VI(2).Infof("sync: JoinSyncgroup: begin: %v at %s, call is %v", sgId, remoteSyncbaseName, call)
 	defer vlog.VI(2).Infof("sync: JoinSyncgroup: end: %v at %s", sgId, remoteSyncbaseName)
 
@@ -779,7 +779,7 @@
 	gid := SgIdToGid(sd.db.Id(), sgId)
 	err := watchable.RunInTransaction(sd.db.St(), func(tx *watchable.Transaction) error {
 		// Check permissions on Database.
-		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
+		if _, err := common.GetPermsWithAuth(ctx, call, sd.db, allowJoinSyncgroupDb, tx); err != nil {
 			return err
 		}
 
@@ -790,8 +790,16 @@
 		}
 
 		// Check SG ACL. Caller must have Read access on the syncgroup
-		// acl to join a syncgroup.
-		if err := authorize(ctx, call.Security(), sg); err != nil {
+		// ACL to join a syncgroup. Database Resolve is not necessary.
+		// Note, since joiner has Write access on Database, they are
+		// allowed to know that the syncgroup exists.
+		if err := common.TagAuthorizer(access.Read, sg.Spec.Perms).Authorize(ctx, call.Security()); err != nil {
+			return err
+		}
+
+		// Syncgroup already exists, so all collections in spec must exist.
+		// Check that they are all syncable by the joiner.
+		if err := verifyCollectionsForSync(ctx, call, sd.db, sg.Spec.Collections, true, tx); err != nil {
 			return err
 		}
 
@@ -859,6 +867,12 @@
 	}
 
 	err = watchable.RunInTransaction(sd.db.St(), func(tx *watchable.Transaction) error {
+		// Collections aren't required to exist when joining a syncgroup. However,
+		// any collections that do exist must be syncable by the joiner.
+		if err := verifyCollectionsForSync(ctx, call, sd.db, sg2.Spec.Collections, false, tx); err != nil {
+			return err
+		}
+
 		if err := ss.addSyncgroup(ctx, tx, version, false, "", genvec, 0, 0, 0, &sg2); err != nil {
 			return err
 		}
@@ -884,6 +898,77 @@
 	return sg2.Spec, nil
 }
 
+// LeaveSyncgroup is not yet implemented. It enforces the Database Write ACL
+// check up front so that permission errors are reported consistently with the
+// other Syncgroup Manager RPCs, then fails with NotImplemented.
+func (sd *syncDatabase) LeaveSyncgroup(ctx *context.T, call rpc.ServerCall, sgId wire.Id) error {
+	allowLeaveSyncgroupDb := []access.Tag{access.Write}
+
+	err := watchable.RunInTransaction(sd.db.St(), func(tx *watchable.Transaction) error {
+		// Check permissions on Database.
+		if _, err := common.GetPermsWithAuth(ctx, call, sd.db, allowLeaveSyncgroupDb, tx); err != nil {
+			return err
+		}
+
+		return verror.NewErrNotImplemented(ctx)
+	})
+
+	return err
+}
+
+// DestroySyncgroup is not yet implemented. It performs the full authorization
+// sequence (Database Write, then Admin on the syncgroup ACL) before failing
+// with NotImplemented, so access errors surface the same way they will once
+// the destroy logic lands.
+func (sd *syncDatabase) DestroySyncgroup(ctx *context.T, call rpc.ServerCall, sgId wire.Id) error {
+	allowDestroySyncgroupDb := []access.Tag{access.Write}
+
+	var sg *interfaces.Syncgroup
+	gid := SgIdToGid(sd.db.Id(), sgId)
+	err := watchable.RunInTransaction(sd.db.St(), func(tx *watchable.Transaction) error {
+		// Check permissions on Database.
+		if _, err := common.GetPermsWithAuth(ctx, call, sd.db, allowDestroySyncgroupDb, tx); err != nil {
+			return err
+		}
+
+		// Check if syncgroup already exists and get its info.
+		var sgErr error
+		sg, sgErr = getSyncgroupByGid(ctx, tx, gid)
+		if sgErr != nil {
+			return sgErr
+		}
+
+		// Check SG ACL. Caller must have Admin access on the syncgroup
+		// ACL to destroy a syncgroup. Database Resolve is not necessary.
+		// Note, since destroyer has Write access on Database, they are
+		// allowed to know that the syncgroup exists.
+		if err := common.TagAuthorizer(access.Admin, sg.GetPerms()).Authorize(ctx, call.Security()); err != nil {
+			return err
+		}
+
+		return verror.NewErrNotImplemented(ctx)
+	})
+
+	return err
+}
+
+// EjectFromSyncgroup is not yet implemented. It performs the combined
+// existence-and-authorization check via syncgroupAuth (requiring Admin on the
+// syncgroup ACL) before failing with NotImplemented.
+func (sd *syncDatabase) EjectFromSyncgroup(ctx *context.T, call rpc.ServerCall, sgId wire.Id, member string) error {
+	allowEjectFromSyncgroup := []access.Tag{access.Admin}
+
+	var sg interfaces.Syncgroup
+
+	err := watchable.RunInTransaction(sd.db.St(), func(tx *watchable.Transaction) error {
+		// Get the syncgroup information with auth check.
+		sgAuth := &syncgroupAuth{
+			db: sd.db,
+			id: sgId,
+		}
+		if _, err := common.GetDataWithAuth(ctx, call, sgAuth, allowEjectFromSyncgroup, tx, &sg); err != nil {
+			return err
+		}
+
+		// TODO(ivanpi): Check that client is not ejecting themselves. See comment
+		// in SetSyncgroupSpec().
+
+		return verror.NewErrNotImplemented(ctx)
+	})
+
+	return err
+}
+
 type syncgroups []wire.Id
 
 func (s syncgroups) Len() int {
@@ -903,6 +988,8 @@
 }
 
 func (sd *syncDatabase) ListSyncgroups(ctx *context.T, call rpc.ServerCall) ([]wire.Id, error) {
+	allowListSyncgroupsDb := []access.Tag{access.Read}
+
 	vlog.VI(2).Infof("sync: ListSyncgroups: begin")
 	defer vlog.VI(2).Infof("sync: ListSyncgroups: end")
 
@@ -910,7 +997,7 @@
 	defer sn.Abort()
 
 	// Check permissions on Database.
-	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+	if _, err := common.GetPermsWithAuth(ctx, call, sd.db, allowListSyncgroupsDb, sn); err != nil {
 		return nil, err
 	}
 
@@ -928,6 +1015,8 @@
 }
 
 func (sd *syncDatabase) GetSyncgroupSpec(ctx *context.T, call rpc.ServerCall, sgId wire.Id) (wire.SyncgroupSpec, string, error) {
+	allowGetSyncgroupSpec := []access.Tag{access.Read}
+
 	vlog.VI(2).Infof("sync: GetSyncgroupSpec: begin %v", sgId)
 	defer vlog.VI(2).Infof("sync: GetSyncgroupSpec: end: %v", sgId)
 
@@ -936,42 +1025,39 @@
 
 	var spec wire.SyncgroupSpec
 
-	// Check permissions on Database.
-	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+	// Get the syncgroup information with auth check.
+	var sg interfaces.Syncgroup
+	sgAuth := &syncgroupAuth{
+		db: sd.db,
+		id: sgId,
+	}
+	if _, err := common.GetDataWithAuth(ctx, call, sgAuth, allowGetSyncgroupSpec, sn, &sg); err != nil {
 		return spec, "", err
 	}
 
-	// Get the syncgroup information.
-	sg, err := getSyncgroupByGid(ctx, sn, SgIdToGid(sd.db.Id(), sgId))
-	if err != nil {
-		return spec, "", err
-	}
-	// TODO(hpucha): Check syncgroup ACL.
-
 	vlog.VI(2).Infof("sync: GetSyncgroupSpec: %v spec %v", sgId, sg.Spec)
 	return sg.Spec, sg.SpecVersion, nil
 }
 
 func (sd *syncDatabase) GetSyncgroupMembers(ctx *context.T, call rpc.ServerCall, sgId wire.Id) (map[string]wire.SyncgroupMemberInfo, error) {
+	allowGetSyncgroupMembers := []access.Tag{access.Read}
+
 	vlog.VI(2).Infof("sync: GetSyncgroupMembers: begin %v", sgId)
 	defer vlog.VI(2).Infof("sync: GetSyncgroupMembers: end: %v", sgId)
 
 	sn := sd.db.St().NewSnapshot()
 	defer sn.Abort()
 
-	// Check permissions on Database.
-	if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
+	// Get the syncgroup information with auth check.
+	var sg interfaces.Syncgroup
+	sgAuth := &syncgroupAuth{
+		db: sd.db,
+		id: sgId,
+	}
+	if _, err := common.GetDataWithAuth(ctx, call, sgAuth, allowGetSyncgroupMembers, sn, &sg); err != nil {
 		return nil, err
 	}
 
-	// Get the syncgroup information.
-	sg, err := getSyncgroupByGid(ctx, sn, SgIdToGid(sd.db.Id(), sgId))
-	if err != nil {
-		return nil, err
-	}
-
-	// TODO(hpucha): Check syncgroup ACL.
-
 	vlog.VI(2).Infof("sync: GetSyncgroupMembers: %v members %v, len %v", sgId, sg.Joiners, len(sg.Joiners))
 	joiners := make(map[string]wire.SyncgroupMemberInfo)
 	for key, value := range sg.Joiners {
@@ -981,6 +1067,8 @@
 }
 
 func (sd *syncDatabase) SetSyncgroupSpec(ctx *context.T, call rpc.ServerCall, sgId wire.Id, spec wire.SyncgroupSpec, version string) error {
+	allowSetSyncgroupSpec := []access.Tag{access.Admin}
+
 	vlog.VI(2).Infof("sync: SetSyncgroupSpec: begin %v %v %s", sgId, spec, version)
 	defer vlog.VI(2).Infof("sync: SetSyncgroupSpec: end: %v", sgId)
 
@@ -989,19 +1077,16 @@
 	}
 
 	ss := sd.sync.(*syncService)
-	dbId := sd.db.Id()
-	gid := SgIdToGid(dbId, sgId)
-	var sg *interfaces.Syncgroup
+	gid := SgIdToGid(sd.db.Id(), sgId)
+	var sg interfaces.Syncgroup
 
 	err := watchable.RunInTransaction(sd.db.St(), func(tx *watchable.Transaction) error {
-		// Check permissions on Database.
-		if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
-			return err
+		// Get the syncgroup information with auth check.
+		sgAuth := &syncgroupAuth{
+			db: sd.db,
+			id: sgId,
 		}
-
-		var err error
-		sg, err = getSyncgroupByGid(ctx, tx, gid)
-		if err != nil {
+		if _, err := common.GetDataWithAuth(ctx, call, sgAuth, allowSetSyncgroupSpec, tx, &sg); err != nil {
 			return err
 		}
 
@@ -1014,6 +1099,17 @@
 			return verror.New(verror.ErrBadArg, ctx, "cannot modify collections")
 		}
 
+		// Client must not remove themselves from Read ACL. LeaveSyncgroup should be
+		// used instead. (Assumes client is already on Read ACL.)
+		// Note, this check prevents only the common case of removing the client
+		// from the ACL. It is possible to revoke Syncbase access while passing this
+		// check (e.g. blacklisting the blessing issued to Syncbase while continuing
+		// to allow own blessing). As with many other Syncbase sanity checks, it is
+		// best-effort only; failures still need to be handled gracefully.
+		if err := common.TagAuthorizer(access.Read, spec.Perms).Authorize(ctx, call.Security()); err != nil {
+			return verror.New(verror.ErrBadArg, ctx, "cannot remove self from syncgroup Read ACL, use LeaveSyncgroup instead")
+		}
+
 		sgState, err := getSGIdEntry(ctx, tx, gid)
 		if err != nil {
 			return err
@@ -1022,34 +1118,101 @@
 			return verror.NewErrBadState(ctx)
 		}
 
-		// Check if this peer is allowed to change the spec.
-		blessingNames, _ := security.RemoteBlessingNames(ctx, call.Security())
-		vlog.VI(4).Infof("sync: SetSyncgroupSpec: authorizing blessings %v against permissions %v", blessingNames, sg.Spec.Perms)
-		if !isAuthorizedForTag(sg.Spec.Perms, access.Admin, blessingNames) {
-			return verror.New(verror.ErrNoAccess, ctx)
-		}
-
 		// Reserve a log generation and position counts for the new syncgroup.
-		gen, pos := ss.reserveGenAndPosInDbLog(ctx, dbId, sgOID(gid), 1)
+		gen, pos := ss.reserveGenAndPosInDbLog(ctx, sd.db.Id(), sgOID(gid), 1)
 
 		newVersion := ss.newSyncgroupVersion()
 		sg.Spec = spec
 		sg.SpecVersion = newVersion
-		return ss.updateSyncgroupVersioning(ctx, tx, gid, newVersion, true, ss.id, gen, pos, sg)
+		return ss.updateSyncgroupVersioning(ctx, tx, gid, newVersion, true, ss.id, gen, pos, &sg)
 	})
 
 	if err != nil {
 		return err
 	}
-	if err = ss.advertiseSyncgroupInNeighborhood(sg); err != nil {
+	if err = ss.advertiseSyncgroupInNeighborhood(&sg); err != nil {
 		return err
 	}
-	return ss.checkptSgLocalGen(ctx, dbId, gid)
+	return ss.checkptSgLocalGen(ctx, sd.db.Id(), gid)
+}
+
+////////////////////////////////////////
+// Authorization hooks
+
+// syncgroupAuth adapts a syncgroup to the common.Permser interface so that
+// syncgroup RPCs can share the generic existence/authorization machinery
+// (common.GetDataWithAuth and friends).
+type syncgroupAuth struct {
+	db interfaces.Database // database containing the syncgroup
+	id wire.Id             // syncgroup id within db
+}
+
+var _ common.Permser = (*syncgroupAuth)(nil)
+
+// GetDataWithExistAuth first checks exist/Resolve access on the parent
+// Database, then fetches the syncgroup entry into v and runs the standard
+// exist-auth step, which decides whether the caller may learn that the
+// syncgroup exists. Returns the parent (Database) perms and the syncgroup's
+// own perms; on error the returned perms may be nil.
+func (sa *syncgroupAuth) GetDataWithExistAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, v common.PermserData) (parentPerms, perms access.Permissions, _ error) {
+	sg := v.(*interfaces.Syncgroup)
+	parentPerms, err := common.GetPermsWithExistAndParentResolveAuth(ctx, call, sa.db, st)
+	if err != nil {
+		return nil, nil, err
+	}
+	gotSg, getErr := getSyncgroupByGid(ctx, st, SgIdToGid(sa.db.Id(), sa.id))
+	if gotSg != nil {
+		*sg = *gotSg
+	}
+	// Note, getErr is passed through so ExistAuthStep can distinguish a
+	// missing syncgroup from other failures.
+	err = common.ExistAuthStep(ctx, call, sa.id.String(), parentPerms, sg, getErr)
+	return parentPerms, sg.GetPerms(), err
+}
+
+// PermserData returns a fresh, empty Syncgroup for GetDataWithExistAuth to
+// populate.
+func (sa *syncgroupAuth) PermserData() common.PermserData {
+	return &interfaces.Syncgroup{}
+}
 
 //////////////////////////////
 // Helper functions
 
+// verifyCollectionsForSync verifies, for all existing collections in the list,
+// that they are syncable to the caller:
+// - The caller must have Read access.
+// - The collection must not be frozen.
+// If mustExist is true, all the collections in the list must exist.
+// Returns nil if every checked collection passes; otherwise the first error
+// encountered.
+// NOTE(review): the "not frozen" condition above is not yet enforced — the
+// frozen check is still a TODO in the body below.
+// TODO(ivanpi): Also verify that data and ACL signature chain of trust can be
+// followed to the Collection creator (allowed by the id-based implicit perms)
+// based on available ACL history.
+func verifyCollectionsForSync(ctx *context.T, call rpc.ServerCall, db interfaces.Database, collections []wire.Id, mustExist bool, sntx store.SnapshotOrTransaction) error {
+	for _, cxId := range collections {
+		// Retrieve Collection perms, checking that the Collection exists.
+		// Note, only Collection ACLs are synced, so we do not need to check
+		// Database Resolve. We assume that the client is allowed to know that
+		// the Collections exist based on a previous Database permissions check.
+		cxPerms, err := db.GetCollectionPerms(ctx, cxId, sntx)
+		if err != nil {
+			if !mustExist && verror.ErrorID(err) == verror.ErrNoExist.ID {
+				// We can skip the missing Collection since mustExist is not set.
+				continue
+			}
+			return err
+		}
+
+		// Check Read access.
+		if err := common.TagAuthorizer(access.Read, cxPerms).Authorize(ctx, call.Security()); err != nil {
+			return err
+		}
+
+		// TODO(ivanpi): Check if Collection is frozen because the last syncgroup
+		// on it was left/destroyed.
+
+		// TODO(hpucha,ivanpi): Check signature provenance on Collection data and
+		// permissions.
+
+		// TODO(ivanpi): Since signatures are not implemented yet, we should
+		// sanity check that at least one writer exists on the Collection.
+		// However, this should be done only on Collections with no existing
+		// Syncgroups.
+		//if writeAcl, ok := cxPerms[access.Write]; !ok || len(writeAcl.In) == 0 {
+		//	return verror.New(verror.ErrBadState, ctx, fmt.Sprintf("collection %s has no writers", c.String()))
+		//}
+	}
+
+	return nil
+}
+
 // checkptSgLocalGen cuts a local generation for the specified syncgroup to
 // capture its updates.
 func (s *syncService) checkptSgLocalGen(ctx *context.T, dbId wire.Id, sgid interfaces.GroupId) error {
@@ -1334,14 +1497,6 @@
 	return interfaces.Syncgroup{}, "", interfaces.GenVector{}, verror.New(wire.ErrSyncgroupJoinFailed, ctxIn)
 }
 
-func authorize(ctx *context.T, call security.Call, sg *interfaces.Syncgroup) error {
-	auth := access.TypicalTagTypePermissionsAuthorizer(sg.Spec.Perms)
-	if err := auth.Authorize(ctx, call); err != nil {
-		return verror.New(verror.ErrNoAccess, ctx, err)
-	}
-	return nil
-}
-
 // isAuthorizedForTag returns whether at least one of the blessingNames is
 // authorized via the specified tag in perms.
 func isAuthorizedForTag(perms access.Permissions, tag access.Tag, blessingNames []string) bool {
@@ -1370,6 +1525,10 @@
 	vlog.VI(2).Infof("sync: PublishSyncgroup: begin: %s from peer %s", sg.Id, publisher)
 	defer vlog.VI(2).Infof("sync: PublishSyncgroup: end: %s from peer %s", sg.Id, publisher)
 
+	// TODO(ivanpi): Add separate ACL for PublishSyncgroup and check it.
+	// TODO(ivanpi): Ensure that Database existence is not leaked.
+	// TODO(hpucha): Ensure node is on Admin ACL.
+
 	st, err := s.getDbStore(ctx, call, sg.DbId)
 	if err != nil {
 		return s.name, err
@@ -1404,9 +1563,6 @@
 
 		// Publish the syncgroup.
 
-		// TODO(hpucha): Use some ACL check to allow/deny publishing.
-		// TODO(hpucha): Ensure node is on Admin ACL.
-
 		return s.addSyncgroup(ctx, tx, version, false, publisher, genvec, 0, 0, 0, &sg)
 	})
 
@@ -1432,6 +1588,8 @@
 
 	nullSG, nullGV := interfaces.Syncgroup{}, interfaces.GenVector{}
 
+	// TODO(ivanpi): Ensure that Database and syncgroup existence is not leaked.
+
 	// If this admin is offline, it shouldn't accept the join request since it
 	// would be unable to send out the new syncgroup updates. However, it is still
 	// possible that the admin goes offline right after processing the request.
@@ -1439,14 +1597,14 @@
 		return nullSG, "", nullGV, interfaces.NewErrDbOffline(ctx, dbId)
 	}
 
-	// Find the database store for this syncgroup.
-	dbSt, err := s.getDbStore(ctx, call, dbId)
+	// Find the database for this syncgroup.
+	db, err := s.sv.Database(ctx, call, dbId)
 	if err != nil {
 		return nullSG, "", nullGV, verror.New(verror.ErrNoExist, ctx, "Database not found", dbId)
 	}
 
 	gid := SgIdToGid(dbId, sgId)
-	if _, err = getSyncgroupVersion(ctx, dbSt, gid); err != nil {
+	if _, err = getSyncgroupVersion(ctx, db.St(), gid); err != nil {
 		vlog.VI(4).Infof("sync: JoinSyncgroupAtAdmin: end: %v from peer %s, err in sg search %v", sgId, joinerName, err)
 		return nullSG, "", nullGV, verror.New(verror.ErrNoExist, ctx, "Syncgroup not found", sgId)
 	}
@@ -1456,7 +1614,7 @@
 	var sg *interfaces.Syncgroup
 	var gen, pos uint64
 
-	err = watchable.RunInTransaction(dbSt, func(tx *watchable.Transaction) error {
+	err = watchable.RunInTransaction(db.St(), func(tx *watchable.Transaction) error {
 		var err error
 		sg, err = getSyncgroupByGid(ctx, tx, gid)
 		if err != nil {
@@ -1470,7 +1628,7 @@
 
 		// Check SG ACL. Caller must have Read access on the syncgroup
 		// ACL to join a syncgroup.
-		if err := authorize(ctx, call.Security(), sg); err != nil {
+		if err := common.TagAuthorizer(access.Read, sg.Spec.Perms).Authorize(ctx, call.Security()); err != nil {
 			return err
 		}
 
@@ -1483,6 +1641,11 @@
 			return verror.NewErrBadState(ctx)
 		}
 
+		// Check that all the collections in spec are syncable to the joiner.
+		if err := verifyCollectionsForSync(ctx, call, db, sg.Spec.Collections, true, tx); err != nil {
+			return err
+		}
+
 		// Reserve a log generation and position counts for the new syncgroup.
 		gen, pos = s.reserveGenAndPosInDbLog(ctx, dbId, sgoid, 1)
 
diff --git a/services/syncbase/vsync/testutil_test.go b/services/syncbase/vsync/testutil_test.go
index 7b8e2e9..442811f 100644
--- a/services/syncbase/vsync/testutil_test.go
+++ b/services/syncbase/vsync/testutil_test.go
@@ -68,7 +68,7 @@
 }
 
 func (s *mockService) GetDataWithExistAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, v common.PermserData) (parentPerms, perms access.Permissions, existErr error) {
-	return nil, nil, nil
+	return nil, nil, verror.NewErrNotImplemented(ctx)
 }
 
 func (s *mockService) PermserData() common.PermserData {
@@ -93,8 +93,8 @@
 	return d.st
 }
 
-func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
-	return verror.NewErrNotImplemented(ctx)
+func (d *mockDatabase) GetCollectionPerms(ctx *context.T, cxId wire.Id, st store.StoreReader) (access.Permissions, error) {
+	return nil, verror.NewErrNotImplemented(ctx)
 }
 
 func (d *mockDatabase) GetSchemaMetadataInternal(ctx *context.T) (*wire.SchemaMetadata, error) {
@@ -110,7 +110,7 @@
 }
 
 func (d *mockDatabase) GetDataWithExistAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, v common.PermserData) (parentPerms, perms access.Permissions, existErr error) {
-	return nil, nil, nil
+	return nil, nil, verror.NewErrNotImplemented(ctx)
 }
 
 func (d *mockDatabase) PermserData() common.PermserData {