mounttable: Make ACLs actually so something useful.

1) The Tags are explained in service.vdl.  Very similar to Unix.  The
only non trivial one is for Glob.  There you can glob any entry in
a directory if the directory has Read access set for the client.
If the directory has Resolve access but not Read access the client
can still see entries if it has any access to them.

2) If the acl config file grants Admin access to the root to any user,
that user becomes a superuser for the whole mounttable.   Subsequent
changes to the root's ACL cannot change that.

3) The acl config file can also include acl templates.  This is to allow
us to say things like "any google user can create a name matching his
user id in directory google/users".  The patterns are extremely
constrained and can only come from the config file (i.e. not from
SetACL).  I may make that more flexible in the future.  A pattern
looks like:

"google/users/%%": {
	"Admin":  { "In": ["google/%%/..."] }
}

This says that a user has Admin access to any name "google/users/xxx"
if he has a blessing that matches "google/xxx/...".  The template
variable %% can only appear at the end of the name but can appear
anywhere in the blessing pattern.  I may also relax that in the
future but this already allows us to do some fairly useful
stuff if we offer a mounttable service to the world.

Change-Id: I363d723b8eb25dc2752079be68609e1d1314dfb9
diff --git a/services/mounttable/lib/mounttable.go b/services/mounttable/lib/mounttable.go
index 1eff3b2..d1fc1bc 100644
--- a/services/mounttable/lib/mounttable.go
+++ b/services/mounttable/lib/mounttable.go
@@ -4,7 +4,7 @@
 	"encoding/json"
 	"fmt"
 	"os"
-	"path"
+	"reflect"
 	"strings"
 	"sync"
 	"time"
@@ -17,33 +17,39 @@
 	"v.io/core/veyron2/security"
 	"v.io/core/veyron2/services/mounttable"
 	"v.io/core/veyron2/services/security/access"
-	verror "v.io/core/veyron2/verror2"
+	"v.io/core/veyron2/verror2"
 	"v.io/core/veyron2/vlog"
 )
 
 var (
-	errNamingLoop = verror.Register("v.io/core/veyron/services/mountable/lib", verror.NoRetry, "Loop in namespace")
+	errNamingLoop = verror2.Register("v.io/core/veyron/services/mountable/lib", verror2.NoRetry, "Loop in namespace")
+	traverseTags  = []mounttable.Tag{mounttable.Read, mounttable.Resolve, mounttable.Create, mounttable.Admin}
+	createTags    = []mounttable.Tag{mounttable.Create, mounttable.Admin}
+	removeTags    = []mounttable.Tag{mounttable.Admin}
+	mountTags     = []mounttable.Tag{mounttable.Mount, mounttable.Admin}
+	globTags      = []mounttable.Tag{mounttable.Read, mounttable.Admin}
+	setTags       = []mounttable.Tag{mounttable.Admin}
+	getTags       = []mounttable.Tag{mounttable.Admin, mounttable.Read}
+	allTags       = []mounttable.Tag{mounttable.Read, mounttable.Resolve, mounttable.Admin, mounttable.Mount, mounttable.Create}
 )
 
 // mountTable represents a namespace.  One exists per server instance.
 type mountTable struct {
 	sync.RWMutex
-	root *node
-	acls map[string]security.Authorizer
+	root       *node
+	superUsers access.ACL
 }
 
 var _ ipc.Dispatcher = (*mountTable)(nil)
 
 // mountContext represents a client bind.  The name is the name that was bound to.
 type mountContext struct {
-	name         string
-	elems        []string // parsed elements of name
-	cleanedElems []string // parsed elements of cleaned name (with leading /
-	// and double / removed).
-	mt *mountTable
+	name  string
+	elems []string // parsed elements of name
+	mt    *mountTable
 }
 
-// mount represents a single mount point.  It contains OAs of all servers mounted
+// mount represents a single mount point.  It contains the rooted names of all servers mounted
 // here.  The servers are considered equivalent, i.e., RPCs to a name below this
 // point can be sent to any of these servers.
 type mount struct {
@@ -53,11 +59,15 @@
 
 // node is a single point in the tree representing the mount table.
 type node struct {
-	parent   *node
-	mount    *mount
-	children map[string]*node
+	parent     *node
+	mount      *mount
+	children   map[string]*node
+	acls       *TAMG
+	amTemplate access.TaggedACLMap
 }
 
+const templateVar = "%%"
+
 // NewMountTable creates a new server that uses the ACLs specified in
 // aclfile for authorization.
 //
@@ -65,40 +75,63 @@
 // access.TaggedACLMap for that path. The tags used in the map are the typical
 // access tags (the Tag type defined in veyron2/services/security/access).
 func NewMountTable(aclfile string) (*mountTable, error) {
-	acls, err := parseACLs(aclfile)
-	if err != nil {
+	mt := &mountTable{
+		root: new(node),
+	}
+	if err := mt.parseACLs(aclfile); err != nil {
 		return nil, err
 	}
-	return &mountTable{
-		root: new(node),
-		acls: acls,
-	}, nil
+	return mt, nil
 }
 
-func parseACLs(path string) (map[string]security.Authorizer, error) {
+func (mt *mountTable) parseACLs(path string) error {
+	vlog.VI(2).Infof("parseACLs(%s)", path)
 	if path == "" {
-		return nil, nil
+		return nil
 	}
-	var acls map[string]access.TaggedACLMap
+	var tams map[string]access.TaggedACLMap
 	f, err := os.Open(path)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	defer f.Close()
-	if err = json.NewDecoder(f).Decode(&acls); err != nil {
-		return nil, err
+	if err = json.NewDecoder(f).Decode(&tams); err != nil {
+		return err
 	}
-	result := make(map[string]security.Authorizer)
-	for name, acl := range acls {
-		result[name], err = access.TaggedACLAuthorizer(acl, access.TypicalTagType())
-		if err != nil {
-			return nil, fmt.Errorf("Unable to create ACL for %q: %v", name, err)
+	for name, tam := range tams {
+		var elems []string
+		isPattern := false
+		// Create name and add the ACL map to it.
+		if len(name) == 0 {
+			// If the config file has is an Admin tag on the root ACL, the
+			// list of Admin users is the equivalent of a super user for
+			// the whole table.  This is not updated if the ACL is later
+			// modified.
+			if acl, exists := tam[string(mounttable.Admin)]; exists {
+				mt.superUsers = acl
+			}
+		} else {
+			// ACL templates terminate with a %% element.  These are very
+			// constrained matches, i.e., the trailing element of the name
+			// is copied into every %% in the ACL.
+			elems = strings.Split(name, "/")
+			if elems[len(elems)-1] == templateVar {
+				isPattern = true
+				elems = elems[:len(elems)-1]
+			}
+		}
+
+		n, err := mt.findNode(nil, elems, true, nil)
+		if n != nil || err == nil {
+			vlog.VI(2).Infof("added tam %v to %s", tam, name)
+			if isPattern {
+				n.amTemplate = tam
+			} else {
+				n.acls, _ = n.acls.Set("", tam)
+			}
 		}
 	}
-	if result["/"] == nil {
-		return nil, fmt.Errorf("No acl for / in %s", path)
-	}
-	return result, nil
+	return nil
 }
 
 // Lookup implements ipc.Dispatcher.Lookup.
@@ -112,42 +145,10 @@
 	}
 	if len(name) > 0 {
 		ms.elems = strings.Split(name, "/")
-		ms.cleanedElems = strings.Split(strings.TrimLeft(path.Clean(name), "/"), "/")
 	}
 	return mounttable.MountTableServer(ms), ms, nil
 }
 
-// findNode returns the node for the name path represented by elems.  If none exists and create is false, return nil.
-// Otherwise create the path and return a pointer to the terminal node.
-func (mt *mountTable) findNode(elems []string, create bool) *node {
-	cur := mt.root
-
-	// Iterate down the tree.
-	for _, e := range elems {
-		// if we hit another mount table, we're done
-		if cur.mount != nil {
-			return nil
-		}
-		// then walk the children
-		c, ok := cur.children[e]
-		if ok {
-			cur = c
-			continue
-		}
-		if !create {
-			return nil
-		}
-		next := new(node)
-		if cur.children == nil {
-			cur.children = make(map[string]*node)
-		}
-		cur.children[e] = next
-		next.parent = cur
-		cur = next
-	}
-	return cur
-}
-
 // isActive returns true if a mount has unexpired servers attached.
 func (m *mount) isActive() bool {
 	if m == nil {
@@ -156,78 +157,199 @@
 	return m.servers.removeExpired() > 0
 }
 
-// walk returns the first mount point node on the elems path and the suffix of elems below that mount point.
-// If no mount point is found, it returns nil,nil.
-func (mt *mountTable) walk(n *node, elems []string) (*node, []string) {
-	if n.mount.isActive() {
-		return n, elems
-	} else if n.mount != nil {
-		n.removeUseless()
-	}
-	if len(elems) > 0 {
-		if c, ok := n.children[elems[0]]; ok {
-			if nn, nelems := mt.walk(c, elems[1:]); nn != nil {
-				return nn, nelems
-			}
-		}
-	}
-	return nil, nil
-}
-
-func (mt *mountTable) authorizeStep(name string, c security.Context) error {
-	if mt.acls == nil {
+// satisfies returns no error if the ctx + n.acls satisfies the associated one of the required Tags.
+func (n *node) satisfies(mt *mountTable, ctx ipc.ServerContext, tags []mounttable.Tag) error {
+	// No ACLs means everything (for now).
+	if ctx == nil || tags == nil || n.acls == nil {
 		return nil
 	}
-	mt.Lock()
-	acl := mt.acls[name]
-	mt.Unlock()
-	vlog.VI(2).Infof("authorizeStep(%v) %v %v %v", name, c.RemoteBlessings(), c.MethodTags(), acl)
-	if acl != nil {
-		return acl.Authorize(c)
+	// "Self-RPCs" are always authorized.
+	if l, r := ctx.LocalBlessings(), ctx.RemoteBlessings(); l != nil && r != nil && reflect.DeepEqual(l.PublicKey(), r.PublicKey()) {
+		return nil
 	}
-	return nil
+	// Match client's blessings against the ACLs.
+	var blessings []string
+	if ctx.RemoteBlessings() != nil {
+		blessings = ctx.RemoteBlessings().ForContext(ctx)
+	}
+	for _, tag := range tags {
+		if acl, exists := n.acls.GetACLForTag(string(tag)); exists && acl.Includes(blessings...) {
+			return nil
+		}
+	}
+	if mt.superUsers.Includes(blessings...) {
+		return nil
+	}
+	return fmt.Errorf("%v does not match ACL", blessings)
+}
+
+func expand(acl *access.ACL, name string) *access.ACL {
+	newACL := new(access.ACL)
+	for _, bp := range acl.In {
+		newACL.In = append(newACL.In, security.BlessingPattern(strings.Replace(string(bp), templateVar, name, -1)))
+	}
+	for _, bp := range acl.NotIn {
+		newACL.NotIn = append(newACL.NotIn, strings.Replace(bp, templateVar, name, -1))
+	}
+	return newACL
+}
+
+// satisfiesTemplate returns no error if the ctx + n.amTemplate satisfies the associated one of
+// the required Tags.
+func (n *node) satisfiesTemplate(ctx ipc.ServerContext, tags []mounttable.Tag, name string) error {
+	if n.amTemplate == nil {
+		return nil
+	}
+	// Match client's blessings against the ACLs.
+	var blessings []string
+	if ctx.RemoteBlessings() != nil {
+		blessings = ctx.RemoteBlessings().ForContext(ctx)
+	}
+	for _, tag := range tags {
+		if acl, exists := n.amTemplate[string(tag)]; exists && expand(&acl, name).Includes(blessings...) {
+			return nil
+		}
+	}
+	return fmt.Errorf("%v does not match ACL", blessings)
+}
+
+// copyACLs copies one nodes ACLs to another and adds the clients blessings as
+// patterns to the Admin tag.
+func copyACLs(ctx ipc.ServerContext, cur *node) *TAMG {
+	if ctx == nil {
+		return nil
+	}
+	if cur.acls == nil {
+		return nil
+	}
+	acls := cur.acls.Copy()
+	var blessings []string
+	if ctx.RemoteBlessings() != nil {
+		blessings = ctx.RemoteBlessings().ForContext(ctx)
+	}
+	for _, b := range blessings {
+		acls.Add(security.BlessingPattern(b), string(mounttable.Admin))
+	}
+	return acls
+}
+
+// createTAMGFromTemplate creates a new TAMG from the template subsituting name for %% everywhere.
+func createTAMGFromTemplate(tam access.TaggedACLMap, name string) *TAMG {
+	tamg := NewTAMG()
+	for tag, acl := range tam {
+		tamg.tam[tag] = *expand(&acl, name)
+	}
+	return tamg
+}
+
+// traverse returns the node for the path represented by elems.  If none exists and create is false, return nil.
+// Otherwise create the path and return a pointer to the terminal node.  If a mount point is encountered
+// while following the path, return that node and any remaining elems.
+func (mt *mountTable) traverse(ctx ipc.ServerContext, cur *node, elems []string, create bool) (*node, []string, error) {
+	// Iterate down the tree.
+	for i, e := range elems {
+		vlog.VI(2).Infof("satisfying %v %v", elems[0:i], *cur)
+		if err := cur.satisfies(mt, ctx, traverseTags); err != nil {
+			return nil, nil, err
+		}
+		// If we hit another mount table, we're done.
+		if cur.mount.isActive() {
+			return cur, elems[i:], nil
+		}
+		// Walk the children looking for a match.
+		c, ok := cur.children[e]
+		if ok {
+			cur = c
+			continue
+		}
+		if !create {
+			return nil, nil, nil
+		}
+		// Create a new node and keep recursing.
+		if err := cur.satisfies(mt, ctx, createTags); err != nil {
+			return nil, nil, err
+		}
+		if err := cur.satisfiesTemplate(ctx, createTags, e); err != nil {
+			return nil, nil, err
+		}
+		next := new(node)
+		if cur.children == nil {
+			cur.children = make(map[string]*node)
+		}
+		cur.children[e] = next
+		next.parent = cur
+		if cur.amTemplate != nil {
+			next.acls = createTAMGFromTemplate(cur.amTemplate, e)
+		} else {
+			next.acls = copyACLs(ctx, cur)
+		}
+		cur = next
+	}
+	return cur, nil, nil
+}
+
+// findNode finds a node in the table and optionally creates a path to it.
+func (mt *mountTable) findNode(ctx ipc.ServerContext, elems []string, create bool, tags []mounttable.Tag) (*node, error) {
+	n, nelems, err := mt.traverse(ctx, mt.root, elems, create)
+	if err != nil {
+		return nil, err
+	}
+	if len(nelems) > 0 {
+		return nil, nil // Node not found.
+	}
+	if n == nil {
+		return nil, nil
+	}
+	if err := n.satisfies(mt, ctx, tags); err != nil {
+		return nil, err
+	}
+	return n, nil
+}
+
+// findMountPoint returns the first mount point encountered in the path and
+// any elements remaining of the path.
+func (mt *mountTable) findMountPoint(ctx ipc.ServerContext, n *node, elems []string) (*node, []string, error) {
+	var err error
+	n, elems, err = mt.traverse(ctx, n, elems, false)
+	if err != nil {
+		return nil, nil, err
+	}
+	if n == nil || !n.mount.isActive() {
+		return nil, nil, nil
+	}
+	return n, elems, nil
 }
 
 // Authorize verifies that the client has access to the requested node.
-// Checks the acls on all nodes in the path starting at the root.
-func (ms *mountContext) Authorize(context security.Context) error {
-	if err := ms.mt.authorizeStep("/", context); err != nil {
-		return err
-	}
-	key := ""
-	for _, step := range ms.cleanedElems {
-		key := key + "/" + step
-		if err := ms.mt.authorizeStep(key, context); err != nil {
-			return err
-		}
-	}
+// Since we do the check at the time of access, we always return OK here.
+func (ms *mountContext) Authorize(ctx security.Context) error {
 	return nil
 }
 
 // ResolveStep returns the next server in a resolution, the name remaining below that server,
 // and whether or not that server is another mount table.
-func (ms *mountContext) ResolveStep(context ipc.ServerContext) (servers []naming.VDLMountedServer, suffix string, err error) {
+func (ms *mountContext) ResolveStep(ctx ipc.ServerContext) (servers []naming.VDLMountedServer, suffix string, err error) {
 	vlog.VI(2).Infof("ResolveStep %q", ms.name)
 	mt := ms.mt
-	// TODO(caprita): we need to grab a write lock because walk may
-	// garbage-collect expired servers.  Rework this to avoid this potential
-	// bottleneck.
 	mt.Lock()
 	defer mt.Unlock()
 	// Find the next mount point for the name.
-	n, elems := mt.walk(mt.root, ms.elems)
+	n, elems, err := mt.findMountPoint(ctx, mt.root, ms.elems)
+	if err != nil {
+		return nil, "", err
+	}
 	if n == nil {
 		if len(ms.elems) == 0 {
-			return nil, ms.name, verror.Make(naming.ErrNoSuchNameRoot, context.Context(), ms.name)
+			return nil, ms.name, verror2.Make(naming.ErrNoSuchNameRoot, ctx.Context(), ms.name)
 		}
-		return nil, ms.name, verror.Make(naming.ErrNoSuchName, context.Context(), ms.name)
+		return nil, ms.name, verror2.Make(naming.ErrNoSuchName, ctx.Context(), ms.name)
 	}
 	return n.mount.servers.copyToSlice(), strings.Join(elems, "/"), nil
 }
 
 // ResolveStepX returns the next server in a resolution in the form of a MountEntry.  The name
 // in the mount entry is the name relative to the server's root.
-func (ms *mountContext) ResolveStepX(context ipc.ServerContext) (entry naming.VDLMountEntry, err error) {
+func (ms *mountContext) ResolveStepX(ctx ipc.ServerContext) (entry naming.VDLMountEntry, err error) {
 	vlog.VI(2).Infof("ResolveStep %q", ms.name)
 	mt := ms.mt
 	// TODO(caprita): we need to grab a write lock because walk may
@@ -236,13 +358,17 @@
 	mt.Lock()
 	defer mt.Unlock()
 	// Find the next mount point for the name.
-	n, elems := mt.walk(mt.root, ms.elems)
+	n, elems, werr := mt.findMountPoint(ctx, mt.root, ms.elems)
+	if werr != nil {
+		err = werr
+		return
+	}
 	if n == nil {
 		entry.Name = ms.name
 		if len(ms.elems) == 0 {
-			err = verror.Make(naming.ErrNoSuchNameRoot, context.Context(), ms.name)
+			err = verror2.Make(naming.ErrNoSuchNameRoot, ctx.Context(), ms.name)
 		} else {
-			err = verror.Make(naming.ErrNoSuchName, context.Context(), ms.name)
+			err = verror2.Make(naming.ErrNoSuchName, ctx.Context(), ms.name)
 		}
 		return
 	}
@@ -261,7 +387,7 @@
 }
 
 // Mount a server onto the name in the receiver.
-func (ms *mountContext) Mount(context ipc.ServerContext, server string, ttlsecs uint32, flags naming.MountFlag) error {
+func (ms *mountContext) Mount(ctx ipc.ServerContext, server string, ttlsecs uint32, flags naming.MountFlag) error {
 	mt := ms.mt
 	if ttlsecs == 0 {
 		ttlsecs = 10 * 365 * 24 * 60 * 60 // a really long time
@@ -270,7 +396,7 @@
 
 	// Make sure the server name is reasonable.
 	epString, _ := naming.SplitAddressName(server)
-	runtime := veyron2.RuntimeFromContext(context.Context())
+	runtime := veyron2.RuntimeFromContext(ctx.Context())
 	_, err := runtime.NewEndpoint(epString)
 	if err != nil {
 		return fmt.Errorf("malformed address %q for mounted server %q", epString, server)
@@ -279,15 +405,16 @@
 	// Find/create node in namespace and add the mount.
 	mt.Lock()
 	defer mt.Unlock()
-	n := mt.findNode(ms.cleanedElems, true)
+	n, werr := mt.findNode(ctx, ms.elems, true, mountTags)
+	if werr != nil {
+		return werr
+	}
 	if n == nil {
-		return verror.Make(naming.ErrNoSuchNameRoot, context.Context(), ms.name)
+		return verror2.Make(naming.ErrNoSuchNameRoot, ctx.Context(), ms.name)
 	}
 	if hasReplaceFlag(flags) {
 		n.mount = nil
 	}
-	// TODO(p): When the endpoint actually has the ServesMountTable bit,
-	// or this with ep.ServesMountTable().
 	wantMT := hasMTFlag(flags)
 	if n.mount == nil {
 		n.mount = &mount{servers: NewServerList(), mt: wantMT}
@@ -302,7 +429,7 @@
 
 // A useful node has children or an active mount.
 func (n *node) isUseful() bool {
-	return len(n.children) > 0 || n.mount.isActive()
+	return len(n.children) > 0 || n.mount.isActive() || n.acls != nil
 }
 
 // removeUseless removes a node and all of its ascendants that are not useful.
@@ -336,11 +463,14 @@
 
 // Unmount removes servers from the name in the receiver. If server is specified, only that
 // server is removed.
-func (ms *mountContext) Unmount(context ipc.ServerContext, server string) error {
+func (ms *mountContext) Unmount(ctx ipc.ServerContext, server string) error {
 	mt := ms.mt
 	mt.Lock()
 	defer mt.Unlock()
-	n := mt.findNode(ms.cleanedElems, false)
+	n, err := mt.findNode(ctx, ms.elems, false, mountTags)
+	if err != nil {
+		return err
+	}
 	if n == nil {
 		return nil
 	}
@@ -355,25 +485,39 @@
 	return nil
 }
 
+// Delete removes the receiver.  If all is true, any subtree is also removed.
+func (ms *mountContext) Delete(ctx ipc.ServerContext, deleteSubTree bool) error {
+	mt := ms.mt
+	mt.Lock()
+	defer mt.Unlock()
+	n, err := mt.findNode(ctx, ms.elems, false, removeTags)
+	if err != nil {
+		return err
+	}
+	if n == nil {
+		return nil
+	}
+	if !deleteSubTree && len(n.children) > 0 {
+		return fmt.Errorf("cannot remove %s: has children", ms.name)
+	}
+	for k, c := range n.parent.children {
+		if c == n {
+			delete(n.parent.children, k)
+			break
+		}
+	}
+	return nil
+}
+
 // A struct holding a partial result of Glob.
 type globEntry struct {
 	n    *node
 	name string
 }
 
-func (mt *mountTable) globStep(n *node, name string, pattern *glob.Glob, context ipc.ServerContext, ch chan<- naming.VDLMountEntry) {
+func (mt *mountTable) globStep(n *node, name string, pattern *glob.Glob, ctx ipc.ServerContext, ch chan<- naming.VDLMountEntry) {
 	vlog.VI(2).Infof("globStep(%s, %s)", name, pattern)
 
-	if mt.acls != nil {
-		acl_name := "/" + strings.TrimLeft(naming.Join(context.Suffix(), name), "/")
-		// Skip this node if the user isn't authorized.
-		if acl := mt.acls[acl_name]; acl != nil {
-			if err := acl.Authorize(context); err != nil {
-				return
-			}
-		}
-	}
-
 	// If this is a mount point, we're done.
 	if m := n.mount; m != nil {
 		// Garbage-collect if expired.
@@ -395,6 +539,10 @@
 			n.removeUseless()
 			return
 		}
+		// To see anything, one has to have some access to the node.
+		if err := n.satisfies(mt, ctx, allTags); err != nil {
+			return
+		}
 		ch <- naming.VDLMountEntry{Name: name}
 	}
 
@@ -402,10 +550,24 @@
 		return
 	}
 
-	// Recurse through the children.
+	// Recurse through the children.  OK if client has read access to the
+	// directory or traverse access and any access to the child.
+	allAllowed := true
+	if err := n.satisfies(mt, ctx, globTags); err != nil {
+		allAllowed = false
+		if err := n.satisfies(mt, ctx, traverseTags); err != nil {
+			return
+		}
+	}
 	for k, c := range n.children {
 		if ok, _, suffix := pattern.MatchInitialSegment(k); ok {
-			mt.globStep(c, naming.Join(name, k), suffix, context, ch)
+			if !allAllowed {
+				// If child allows any access show it.  Otherwise, skip.
+				if err := c.satisfies(mt, ctx, allTags); err != nil {
+					continue
+				}
+			}
+			mt.globStep(c, naming.Join(name, k), suffix, ctx, ch)
 		}
 	}
 }
@@ -413,7 +575,7 @@
 // Glob finds matches in the namespace.  If we reach a mount point before matching the
 // whole pattern, return that mount point.
 // pattern is a glob pattern as defined by the veyron/lib/glob package.
-func (ms *mountContext) Glob__(context ipc.ServerContext, pattern string) (<-chan naming.VDLMountEntry, error) {
+func (ms *mountContext) Glob__(ctx ipc.ServerContext, pattern string) (<-chan naming.VDLMountEntry, error) {
 	vlog.VI(2).Infof("mt.Glob %v", ms.elems)
 
 	g, err := glob.Parse(pattern)
@@ -422,7 +584,6 @@
 	}
 
 	mt := ms.mt
-
 	ch := make(chan naming.VDLMountEntry)
 	go func() {
 		defer close(ch)
@@ -431,24 +592,26 @@
 		// bottleneck.
 		mt.Lock()
 		defer mt.Unlock()
-
+		// If there was an access error, just ignore the entry, i.e., make it invisible.
+		n, err := mt.findNode(ctx, ms.elems, false, nil)
+		if err != nil {
+			return
+		}
 		// If the current name is not fully resolvable on this nameserver we
 		// don't need to evaluate the glob expression. Send a partially resolved
 		// name back to the client.
-		n := mt.findNode(ms.cleanedElems, false)
 		if n == nil {
-			ms.linkToLeaf(ch)
+			ms.linkToLeaf(ctx, ch)
 			return
 		}
-
-		mt.globStep(n, "", g, context, ch)
+		mt.globStep(n, "", g, ctx, ch)
 	}()
 	return ch, nil
 }
 
-func (ms *mountContext) linkToLeaf(ch chan<- naming.VDLMountEntry) {
-	n, elems := ms.mt.walk(ms.mt.root, ms.cleanedElems)
-	if n == nil {
+func (ms *mountContext) linkToLeaf(ctx ipc.ServerContext, ch chan<- naming.VDLMountEntry) {
+	n, elems, err := ms.mt.findMountPoint(ctx, ms.mt.root, ms.elems)
+	if err != nil || n == nil {
 		return
 	}
 	servers := n.mount.servers.copyToSlice()
@@ -457,3 +620,40 @@
 	}
 	ch <- naming.VDLMountEntry{Name: "", Servers: servers}
 }
+
+func (ms *mountContext) SetACL(ctx ipc.ServerContext, tam access.TaggedACLMap, etag string) error {
+	vlog.VI(2).Infof("SetACL %q", ms.name)
+	mt := ms.mt
+
+	// Find/create node in namespace and add the mount.
+	mt.Lock()
+	defer mt.Unlock()
+	n, err := mt.findNode(ctx, ms.elems, true, setTags)
+	if err != nil {
+		return err
+	}
+	if n == nil {
+		// TODO(p): can this even happen?
+		return verror2.Make(naming.ErrNoSuchName, ctx.Context(), ms.name)
+	}
+	n.acls, err = n.acls.Set(etag, tam)
+	return err
+}
+
+func (ms *mountContext) GetACL(ctx ipc.ServerContext) (access.TaggedACLMap, string, error) {
+	vlog.VI(2).Infof("GetACL %q", ms.name)
+	mt := ms.mt
+
+	// Find node in namespace and add the mount.
+	mt.Lock()
+	defer mt.Unlock()
+	n, err := mt.findNode(ctx, ms.elems, false, getTags)
+	if err != nil {
+		return nil, "", err
+	}
+	if n == nil {
+		return nil, "", verror2.Make(naming.ErrNoSuchName, ctx.Context(), ms.name)
+	}
+	etag, tam := n.acls.Get()
+	return tam, etag, nil
+}
diff --git a/services/mounttable/lib/mounttable_test.go b/services/mounttable/lib/mounttable_test.go
index ad6477c..623df86 100644
--- a/services/mounttable/lib/mounttable_test.go
+++ b/services/mounttable/lib/mounttable_test.go
@@ -16,6 +16,7 @@
 	"v.io/core/veyron2/options"
 	"v.io/core/veyron2/rt"
 	"v.io/core/veyron2/security"
+	"v.io/core/veyron2/services/security/access"
 	"v.io/core/veyron2/vlog"
 
 	"v.io/core/veyron/lib/testutil"
@@ -83,6 +84,108 @@
 	}
 }
 
+func doGetACL(t *testing.T, ep, suffix string, shouldSucceed bool, as veyron2.Runtime) (acl access.TaggedACLMap, etag string) {
+	name := naming.JoinAddressName(ep, suffix)
+	ctx := as.NewContext()
+	client := as.Client()
+	call, err := client.StartCall(ctx, name, "GetACL", nil, options.NoResolve(true))
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to GetACL %s: %s", name, err)
+	}
+	if ierr := call.Finish(&acl, &etag, &err); ierr != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to GetACL %s: %s", name, ierr)
+	}
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to GetACL %s: %s", name, err)
+	}
+	return
+}
+
+func doSetACL(t *testing.T, ep, suffix string, acl access.TaggedACLMap, etag string, shouldSucceed bool, as veyron2.Runtime) {
+	name := naming.JoinAddressName(ep, suffix)
+	ctx := as.NewContext()
+	client := as.Client()
+	call, err := client.StartCall(ctx, name, "SetACL", []interface{}{acl, etag}, options.NoResolve(true))
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to SetACL %s: %s", name, err)
+	}
+	if ierr := call.Finish(&err); ierr != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to SetACL %s: %s", name, ierr)
+	}
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to SetACL %s: %s", name, err)
+	}
+	return
+}
+
+func doDeleteNode(t *testing.T, ep, suffix string, shouldSucceed bool, as veyron2.Runtime) {
+	name := naming.JoinAddressName(ep, suffix)
+	ctx := as.NewContext()
+	client := as.Client()
+	call, err := client.StartCall(ctx, name, "Delete", []interface{}{false}, options.NoResolve(true))
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Delete node %s: %s", name, err)
+	}
+	if ierr := call.Finish(&err); ierr != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Delete node %s: %s", name, ierr)
+	}
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Delete node %s: %s", name, err)
+	}
+}
+
+func doDeleteSubtree(t *testing.T, ep, suffix string, shouldSucceed bool, as veyron2.Runtime) {
+	name := naming.JoinAddressName(ep, suffix)
+	ctx := as.NewContext()
+	client := as.Client()
+	call, err := client.StartCall(ctx, name, "Delete", []interface{}{true}, options.NoResolve(true))
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Delete subtree %s: %s", name, err)
+	}
+	if ierr := call.Finish(&err); ierr != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Delete subtree %s: %s", name, ierr)
+	}
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Delete subtree %s: %s", name, err)
+	}
+}
+
 // resolve assumes that the mount contains 0 or 1 servers.
 func resolve(name string, as veyron2.Runtime) (string, error) {
 	// Resolve the name one level.
@@ -214,61 +317,75 @@
 
 	// Mount the collection server into the mount table.
 	vlog.Infof("Mount the collection server into the mount table.")
-	doMount(t, mtAddr, "mounttable/stuff", collectionName, true, rootRT)
+	doMount(t, mtAddr, "stuff", collectionName, true, rootRT)
 
 	// Create a few objects and make sure we can read them.
 	vlog.Infof("Create a few objects.")
-	export(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/the/rain"), "the rain", rootRT)
-	export(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/in/spain"), "in spain", rootRT)
-	export(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/falls"), "falls mainly on the plain", rootRT)
+	export(t, naming.JoinAddressName(mtAddr, "stuff/the/rain"), "the rain", rootRT)
+	export(t, naming.JoinAddressName(mtAddr, "stuff/in/spain"), "in spain", rootRT)
+	export(t, naming.JoinAddressName(mtAddr, "stuff/falls"), "falls mainly on the plain", rootRT)
 	vlog.Infof("Make sure we can read them.")
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/the/rain"), "the rain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/in/spain"), "in spain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/falls"), "falls mainly on the plain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable//stuff/falls"), "falls mainly on the plain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/nonexistant"), "falls mainly on the plain", false, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/the/rain"), "the rain", true, bobRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/the/rain"), "the rain", false, aliceRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuff/the/rain"), "the rain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuff/in/spain"), "in spain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuff/falls"), "falls mainly on the plain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "/stuff/falls"), "falls mainly on the plain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuff/nonexistant"), "falls mainly on the plain", false, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuff/the/rain"), "the rain", true, bobRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuff/the/rain"), "the rain", false, aliceRT)
 
 	// Test multiple mounts.
 	vlog.Infof("Multiple mounts.")
-	doMount(t, mtAddr, "mounttable/a/b", collectionName, true, rootRT)
-	doMount(t, mtAddr, "mounttable/x/y", collectionName, true, rootRT)
-	doMount(t, mtAddr, "mounttable/alpha//beta", collectionName, true, rootRT)
+	doMount(t, mtAddr, "a/b", collectionName, true, rootRT)
+	doMount(t, mtAddr, "x/y", collectionName, true, rootRT)
+	doMount(t, mtAddr, "alpha//beta", collectionName, true, rootRT)
 	vlog.Infof("Make sure we can read them.")
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/falls"), "falls mainly on the plain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/a/b/falls"), "falls mainly on the plain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/x/y/falls"), "falls mainly on the plain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/alpha/beta/falls"), "falls mainly on the plain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/a/b/falls"), "falls mainly on the plain", true, aliceRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/a/b/falls"), "falls mainly on the plain", false, bobRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuff/falls"), "falls mainly on the plain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "x/y/falls"), "falls mainly on the plain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "alpha/beta/falls"), "falls mainly on the plain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", true, aliceRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", false, bobRT)
+
+	// Test getting/setting ACLs.
+	acl, etag := doGetACL(t, mtAddr, "stuff", true, rootRT)
+	doSetACL(t, mtAddr, "stuff", acl, "xyzzy", false, rootRT) // bad etag
+	doSetACL(t, mtAddr, "stuff", acl, etag, true, rootRT)     // good etag
+	_, netag := doGetACL(t, mtAddr, "stuff", true, rootRT)
+	if netag == etag {
+		boom(t, "etag didn't change after SetACL: %s", netag)
+	}
+	doSetACL(t, mtAddr, "stuff", acl, "", true, rootRT) // no etag
+
+	// Bob should be able to create nodes under the mounttable root but not alice.
+	doSetACL(t, mtAddr, "onlybob", acl, "", false, aliceRT)
+	doSetACL(t, mtAddr, "onlybob", acl, "", true, bobRT)
 
 	// Test generic unmount.
 	vlog.Info("Test generic unmount.")
-	doUnmount(t, mtAddr, "mounttable/a/b", "", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/a/b/falls"), "falls mainly on the plain", false, rootRT)
+	doUnmount(t, mtAddr, "a/b", "", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", false, rootRT)
 
 	// Test specific unmount.
 	vlog.Info("Test specific unmount.")
-	doMount(t, mtAddr, "mounttable/a/b", collectionName, true, rootRT)
-	doUnmount(t, mtAddr, "mounttable/a/b", collectionName, true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/a/b/falls"), "falls mainly on the plain", false, rootRT)
+	doMount(t, mtAddr, "a/b", collectionName, true, rootRT)
+	doUnmount(t, mtAddr, "a/b", collectionName, true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", false, rootRT)
 
 	// Try timing out a mount.
 	vlog.Info("Try timing out a mount.")
 	ft := NewFakeTimeClock()
 	setServerListClock(ft)
-	doMount(t, mtAddr, "mounttable/stuffWithTTL", collectionName, true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuffWithTTL/the/rain"), "the rain", true, rootRT)
+	doMount(t, mtAddr, "stuffWithTTL", collectionName, true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuffWithTTL/the/rain"), "the rain", true, rootRT)
 	ft.advance(time.Duration(ttlSecs+4) * time.Second)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuffWithTTL/the/rain"), "the rain", false, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuffWithTTL/the/rain"), "the rain", false, rootRT)
 
 	// Test unauthorized mount.
 	vlog.Info("Test unauthorized mount.")
-	doMount(t, mtAddr, "mounttable//a/b", collectionName, false, bobRT)
-	doMount(t, mtAddr, "mounttable//a/b", collectionName, false, aliceRT)
+	doMount(t, mtAddr, "/a/b", collectionName, false, bobRT)
+	doMount(t, mtAddr, "/a/b", collectionName, false, aliceRT)
 
-	doUnmount(t, mtAddr, "mounttable/x/y", collectionName, false, bobRT)
+	doUnmount(t, mtAddr, "x/y", collectionName, false, bobRT)
 }
 
 func doGlobX(t *testing.T, ep, suffix, pattern string, as veyron2.Runtime, joinServer bool) []string {
@@ -321,6 +438,20 @@
 	}
 }
 
+// checkExists makes sure a name exists (or not).
+func checkExists(t *testing.T, ep, suffix string, shouldSucceed bool, as veyron2.Runtime) {
+	x := doGlobX(t, ep, "", suffix, as, false)
+	if len(x) != 1 || x[0] != suffix {
+		if shouldSucceed {
+			boom(t, "Failed to find %s", suffix)
+		}
+		return
+	}
+	if !shouldSucceed {
+		boom(t, "%s exists but shouldn't", suffix)
+	}
+}
+
 func TestGlob(t *testing.T) {
 	server, estr := newMT(t, "")
 	defer server.Stop()
@@ -365,15 +496,30 @@
 	}
 }
 
-func TestGlobACLs(t *testing.T) {
-	t.Skip("Skipped until ACLs are correctly implemented for mounttable.Glob.")
+func TestACLTemplate(t *testing.T) {
+	server, estr := newMT(t, "testdata/test.acl")
+	defer server.Stop()
+	fakeServer := naming.JoinAddressName(estr, "quux")
 
+	// Noone should be able to mount on someone else's names.
+	doMount(t, estr, "users/ted", fakeServer, false, aliceRT)
+	doMount(t, estr, "users/carol", fakeServer, false, bobRT)
+	doMount(t, estr, "users/george", fakeServer, false, rootRT)
+
+	// Anyone should be able to mount on their own names.
+	doMount(t, estr, "users/alice", fakeServer, true, aliceRT)
+	doMount(t, estr, "users/bob", fakeServer, true, bobRT)
+	doMount(t, estr, "users/root", fakeServer, true, rootRT)
+}
+
+func TestGlobACLs(t *testing.T) {
 	server, estr := newMT(t, "testdata/test.acl")
 	defer server.Stop()
 
 	// set up a mount space
 	fakeServer := naming.JoinAddressName(estr, "quux")
-	doMount(t, estr, "one/bright/day", fakeServer, true, rootRT)
+	doMount(t, estr, "one/bright/day", fakeServer, false, aliceRT) // Fails because alice can't mount there.
+	doMount(t, estr, "one/bright/day", fakeServer, true, bobRT)
 	doMount(t, estr, "a/b/c", fakeServer, true, rootRT)
 
 	// Try various globs.
@@ -382,12 +528,13 @@
 		in       string
 		expected []string
 	}{
-		{rootRT, "*", []string{"one", "a"}},
-		{aliceRT, "*", []string{"one", "a"}},
-		{bobRT, "*", []string{"one"}},
-		{rootRT, "*/...", []string{"one", "a", "one/bright", "a/b", "one/bright/day", "a/b/c"}},
-		{aliceRT, "*/...", []string{"one", "a", "one/bright", "a/b", "one/bright/day", "a/b/c"}},
-		{bobRT, "*/...", []string{"one", "one/bright", "one/bright/day"}},
+		{rootRT, "*", []string{"one", "a", "stuff", "users"}},
+		{aliceRT, "*", []string{"one", "a", "users"}},
+		{bobRT, "*", []string{"one", "stuff", "users"}},
+		// bob, alice, and root have different visibility to the space.
+		{rootRT, "*/...", []string{"one", "a", "one/bright", "a/b", "one/bright/day", "a/b/c", "stuff", "users"}},
+		{aliceRT, "*/...", []string{"one", "a", "one/bright", "a/b", "one/bright/day", "a/b/c", "users"}},
+		{bobRT, "*/...", []string{"one", "one/bright", "one/bright/day", "stuff", "users"}},
 	}
 	for _, test := range tests {
 		out := doGlob(t, estr, "", test.in, test.as)
@@ -395,16 +542,40 @@
 	}
 }
 
+func TestDelete(t *testing.T) {
+	server, estr := newMT(t, "testdata/test.acl")
+	defer server.Stop()
+
+	// set up a mount space
+	fakeServer := naming.JoinAddressName(estr, "quux")
+	doMount(t, estr, "one/bright/day", fakeServer, true, bobRT)
+	doMount(t, estr, "a/b/c", fakeServer, true, rootRT)
+
+	// It shouldn't be possible to delete anything with children unless explicitly requested.
+	doDeleteNode(t, estr, "a/b", false, rootRT)
+	checkExists(t, estr, "a/b", true, rootRT)
+	doDeleteSubtree(t, estr, "a/b", true, rootRT)
+	checkExists(t, estr, "a/b", false, rootRT)
+
+	// Alice shouldn't be able to delete what bob created but bob and root should.
+	doDeleteNode(t, estr, "one/bright/day", false, aliceRT)
+	checkExists(t, estr, "one/bright/day", true, rootRT)
+	doDeleteNode(t, estr, "one/bright/day", true, rootRT)
+	checkExists(t, estr, "one/bright/day", false, rootRT)
+	doDeleteNode(t, estr, "one/bright", true, bobRT)
+	checkExists(t, estr, "one/bright", false, rootRT)
+}
+
 func TestServerFormat(t *testing.T) {
 	server, estr := newMT(t, "")
 	defer server.Stop()
 
-	doMount(t, estr, "mounttable/endpoint", naming.JoinAddressName(estr, "life/on/the/mississippi"), true, rootRT)
-	doMount(t, estr, "mounttable/hostport", "/atrampabroad:8000", true, rootRT)
-	doMount(t, estr, "mounttable/hostport-endpoint-platypus", "/@atrampabroad:8000@@", true, rootRT)
-	doMount(t, estr, "mounttable/invalid/not/rooted", "atrampabroad:8000", false, rootRT)
-	doMount(t, estr, "mounttable/invalid/no/port", "/atrampabroad", false, rootRT)
-	doMount(t, estr, "mounttable/invalid/endpoint", "/@following the equator:8000@@@", false, rootRT)
+	doMount(t, estr, "endpoint", naming.JoinAddressName(estr, "life/on/the/mississippi"), true, rootRT)
+	doMount(t, estr, "hostport", "/atrampabroad:8000", true, rootRT)
+	doMount(t, estr, "hostport-endpoint-platypus", "/@atrampabroad:8000@@", true, rootRT)
+	doMount(t, estr, "invalid/not/rooted", "atrampabroad:8000", false, rootRT)
+	doMount(t, estr, "invalid/no/port", "/atrampabroad", false, rootRT)
+	doMount(t, estr, "invalid/endpoint", "/@following the equator:8000@@@", false, rootRT)
 }
 
 func TestExpiry(t *testing.T) {
@@ -417,21 +588,21 @@
 
 	ft := NewFakeTimeClock()
 	setServerListClock(ft)
-	doMount(t, estr, "mounttable/a1/b1", collectionName, true, rootRT)
-	doMount(t, estr, "mounttable/a1/b2", collectionName, true, rootRT)
-	doMount(t, estr, "mounttable/a2/b1", collectionName, true, rootRT)
-	doMount(t, estr, "mounttable/a2/b2/c", collectionName, true, rootRT)
+	doMount(t, estr, "a1/b1", collectionName, true, rootRT)
+	doMount(t, estr, "a1/b2", collectionName, true, rootRT)
+	doMount(t, estr, "a2/b1", collectionName, true, rootRT)
+	doMount(t, estr, "a2/b2/c", collectionName, true, rootRT)
 
-	checkMatch(t, []string{"a1/b1", "a2/b1"}, doGlob(t, estr, "mounttable", "*/b1/...", rootRT))
+	checkMatch(t, []string{"a1/b1", "a2/b1"}, doGlob(t, estr, "", "*/b1/...", rootRT))
 	ft.advance(time.Duration(ttlSecs/2) * time.Second)
-	checkMatch(t, []string{"a1/b1", "a2/b1"}, doGlob(t, estr, "mounttable", "*/b1/...", rootRT))
-	checkMatch(t, []string{"c"}, doGlob(t, estr, "mounttable/a2/b2", "*", rootRT))
+	checkMatch(t, []string{"a1/b1", "a2/b1"}, doGlob(t, estr, "", "*/b1/...", rootRT))
+	checkMatch(t, []string{"c"}, doGlob(t, estr, "a2/b2", "*", rootRT))
 	// Refresh only a1/b1.  All the other mounts will expire upon the next
 	// ft advance.
-	doMount(t, estr, "mounttable/a1/b1", collectionName, true, rootRT)
+	doMount(t, estr, "a1/b1", collectionName, true, rootRT)
 	ft.advance(time.Duration(ttlSecs/2+4) * time.Second)
-	checkMatch(t, []string{"a1"}, doGlob(t, estr, "mounttable", "*", rootRT))
-	checkMatch(t, []string{"a1/b1"}, doGlob(t, estr, "mounttable", "*/b1/...", rootRT))
+	checkMatch(t, []string{"a1"}, doGlob(t, estr, "", "*", rootRT))
+	checkMatch(t, []string{"a1/b1"}, doGlob(t, estr, "", "*/b1/...", rootRT))
 }
 
 func TestBadACLs(t *testing.T) {
@@ -443,10 +614,6 @@
 	if err == nil {
 		boom(t, "Expected error from missing acl file")
 	}
-	_, err = NewMountTable("testdata/noroot.acl")
-	if err == nil {
-		boom(t, "Expected error for missing '/' acl")
-	}
 }
 
 func init() {
diff --git a/services/mounttable/lib/neighborhood.go b/services/mounttable/lib/neighborhood.go
index 1392b79..3cc6952 100644
--- a/services/mounttable/lib/neighborhood.go
+++ b/services/mounttable/lib/neighborhood.go
@@ -14,6 +14,7 @@
 	"v.io/core/veyron2/naming"
 	"v.io/core/veyron2/security"
 	"v.io/core/veyron2/services/mounttable"
+	"v.io/core/veyron2/services/security/access"
 	verror "v.io/core/veyron2/verror2"
 	"v.io/core/veyron2/vlog"
 
@@ -255,6 +256,11 @@
 	return errors.New("this server does not implement Unmount")
 }
 
+// Delete not implemented.
+func (*neighborhoodService) Delete(_ ipc.ServerContext, _ bool) error {
+	return errors.New("this server does not implement Delete")
+}
+
 // Glob__ implements ipc.AllGlobber
 func (ns *neighborhoodService) Glob__(ctx ipc.ServerContext, pattern string) (<-chan naming.VDLMountEntry, error) {
 	g, err := glob.Parse(pattern)
@@ -291,3 +297,11 @@
 		return nil, verror.Make(naming.ErrNoSuchName, ctx.Context(), ns.elems)
 	}
 }
+
+func (*neighborhoodService) SetACL(ctx ipc.ServerContext, acl access.TaggedACLMap, etag string) error {
+	return errors.New("this server does not implement SetACL")
+}
+
+func (*neighborhoodService) GetACL(ctx ipc.ServerContext) (acl access.TaggedACLMap, etag string, err error) {
+	return nil, "", nil
+}
diff --git a/services/mounttable/lib/tamg.go b/services/mounttable/lib/tamg.go
new file mode 100644
index 0000000..d61bd3d
--- /dev/null
+++ b/services/mounttable/lib/tamg.go
@@ -0,0 +1,70 @@
+package mounttable
+
+import (
+	"strconv"
+
+	"v.io/core/veyron2/security"
+	"v.io/core/veyron2/services/security/access"
+	"v.io/core/veyron2/verror2"
+)
+
+// TAMG associates a generation with a TaggedACLMap
+type TAMG struct {
+	tam        access.TaggedACLMap
+	generation int32
+}
+
+func NewTAMG() *TAMG {
+	return &TAMG{tam: make(access.TaggedACLMap)}
+}
+
+// Set sets the ACLs iff generation matches the current generation.  If the set happens, the generation is advanced.
+// If b is nil, this creates a new TAMG.
+func (b *TAMG) Set(genstr string, tam access.TaggedACLMap) (*TAMG, error) {
+	if b == nil {
+		b = new(TAMG)
+	}
+	if len(genstr) > 0 {
+		gen, err := strconv.ParseInt(genstr, 10, 32)
+		if err != nil {
+			return b, verror2.Make(access.BadEtag, nil)
+		}
+		if gen >= 0 && int32(gen) != b.generation {
+			return b, verror2.Make(access.BadEtag, nil)
+		}
+	}
+	b.tam = tam
+	b.generation++
+	// Protect against wrap.
+	if b.generation < 0 {
+		b.generation = 0
+	}
+	return b, nil
+}
+
+// Get returns the current generation and acls.
+func (b *TAMG) Get() (string, access.TaggedACLMap) {
+	if b == nil {
+		return "", nil
+	}
+	return strconv.FormatInt(int64(b.generation), 10), b.tam
+}
+
+// GetACLForTag returns the current acls for the given tag.
+func (b *TAMG) GetACLForTag(tag string) (access.ACL, bool) {
+	acl, exists := b.tam[tag]
+	return acl, exists
+}
+
+// Copy copies the receiver.
+func (b *TAMG) Copy() *TAMG {
+	nt := new(TAMG)
+	nt.tam = b.tam.Copy()
+	nt.generation = b.generation
+	return nt
+}
+
+// Add adds the blessing pattern to the tag in the reciever.
+func (b *TAMG) Add(pattern security.BlessingPattern, tag string) {
+	b.tam.Add(pattern, tag)
+}
diff --git a/services/mounttable/lib/testdata/test.acl b/services/mounttable/lib/testdata/test.acl
index 7990532..edfb36f 100644
--- a/services/mounttable/lib/testdata/test.acl
+++ b/services/mounttable/lib/testdata/test.acl
@@ -1,14 +1,21 @@
 {
-"/": {
+"": {
 	"Read":  { "In": ["..."] },
-	"Write": { "In": ["root"] }
+	"Admin": { "In": ["root"] },
+	"Create": { "In": ["root", "bob"] }
 },
-"/stuff": {
+"stuff": {
 	"Read":  { "In": ["bob", "root"] },
-        "Write": { "In": ["root"] }
+        "Mount": { "In": ["root"] }
 },
-"/a": {
+"a": {
 	"Read":  { "In": ["alice", "root"] },
-        "Write": { "In": ["root"] }
+        "Create": { "In": ["root"] }
+},
+"users": {
+	"Create": { "In": ["..."] }
+},
+"users/%%": {
+	"Admin":  { "In": ["%%/..."] }
 }
 }
diff --git a/tools/mounttable/impl_test.go b/tools/mounttable/impl_test.go
index 4d216a0..6a091fe 100644
--- a/tools/mounttable/impl_test.go
+++ b/tools/mounttable/impl_test.go
@@ -11,6 +11,7 @@
 	"v.io/core/veyron2/rt"
 	"v.io/core/veyron2/security"
 	"v.io/core/veyron2/services/mounttable"
+	"v.io/core/veyron2/services/security/access"
 	"v.io/core/veyron2/vlog"
 
 	"v.io/core/veyron/profiles"
@@ -53,6 +54,20 @@
 	return
 }
 
+func (s *server) Delete(ipc.ServerContext, bool) error {
+	vlog.VI(2).Infof("Delete() was called. suffix=%v", s.suffix)
+	return nil
+}
+func (s *server) SetACL(ipc.ServerContext, access.TaggedACLMap, string) error {
+	vlog.VI(2).Infof("SetACL() was called. suffix=%v", s.suffix)
+	return nil
+}
+
+func (s *server) GetACL(ipc.ServerContext) (access.TaggedACLMap, string, error) {
+	vlog.VI(2).Infof("GetACL() was called. suffix=%v", s.suffix)
+	return nil, "", nil
+}
+
 type dispatcher struct {
 }