Merge "mounttable: Make ACLs actually do something useful."
diff --git a/services/mounttable/lib/mounttable.go b/services/mounttable/lib/mounttable.go
index 1eff3b2..d1fc1bc 100644
--- a/services/mounttable/lib/mounttable.go
+++ b/services/mounttable/lib/mounttable.go
@@ -4,7 +4,7 @@
 	"encoding/json"
 	"fmt"
 	"os"
-	"path"
+	"reflect"
 	"strings"
 	"sync"
 	"time"
@@ -17,33 +17,39 @@
 	"v.io/core/veyron2/security"
 	"v.io/core/veyron2/services/mounttable"
 	"v.io/core/veyron2/services/security/access"
-	verror "v.io/core/veyron2/verror2"
+	"v.io/core/veyron2/verror2"
 	"v.io/core/veyron2/vlog"
 )
 
 var (
-	errNamingLoop = verror.Register("v.io/core/veyron/services/mountable/lib", verror.NoRetry, "Loop in namespace")
+	errNamingLoop = verror2.Register("v.io/core/veyron/services/mountable/lib", verror2.NoRetry, "Loop in namespace")
+	traverseTags  = []mounttable.Tag{mounttable.Read, mounttable.Resolve, mounttable.Create, mounttable.Admin}
+	createTags    = []mounttable.Tag{mounttable.Create, mounttable.Admin}
+	removeTags    = []mounttable.Tag{mounttable.Admin}
+	mountTags     = []mounttable.Tag{mounttable.Mount, mounttable.Admin}
+	globTags      = []mounttable.Tag{mounttable.Read, mounttable.Admin}
+	setTags       = []mounttable.Tag{mounttable.Admin}
+	getTags       = []mounttable.Tag{mounttable.Admin, mounttable.Read}
+	allTags       = []mounttable.Tag{mounttable.Read, mounttable.Resolve, mounttable.Admin, mounttable.Mount, mounttable.Create}
 )
 
 // mountTable represents a namespace.  One exists per server instance.
 type mountTable struct {
 	sync.RWMutex
-	root *node
-	acls map[string]security.Authorizer
+	root       *node
+	superUsers access.ACL
 }
 
 var _ ipc.Dispatcher = (*mountTable)(nil)
 
 // mountContext represents a client bind.  The name is the name that was bound to.
 type mountContext struct {
-	name         string
-	elems        []string // parsed elements of name
-	cleanedElems []string // parsed elements of cleaned name (with leading /
-	// and double / removed).
-	mt *mountTable
+	name  string
+	elems []string // parsed elements of name
+	mt    *mountTable
 }
 
-// mount represents a single mount point.  It contains OAs of all servers mounted
+// mount represents a single mount point.  It contains the rooted names of all servers mounted
 // here.  The servers are considered equivalent, i.e., RPCs to a name below this
 // point can be sent to any of these servers.
 type mount struct {
@@ -53,11 +59,15 @@
 
 // node is a single point in the tree representing the mount table.
 type node struct {
-	parent   *node
-	mount    *mount
-	children map[string]*node
+	parent     *node
+	mount      *mount
+	children   map[string]*node
+	acls       *TAMG
+	amTemplate access.TaggedACLMap
 }
 
+const templateVar = "%%"
+
 // NewMountTable creates a new server that uses the ACLs specified in
 // aclfile for authorization.
 //
@@ -65,40 +75,63 @@
 // access.TaggedACLMap for that path. The tags used in the map are the typical
 // access tags (the Tag type defined in veyron2/services/security/access).
 func NewMountTable(aclfile string) (*mountTable, error) {
-	acls, err := parseACLs(aclfile)
-	if err != nil {
+	mt := &mountTable{
+		root: new(node),
+	}
+	if err := mt.parseACLs(aclfile); err != nil {
 		return nil, err
 	}
-	return &mountTable{
-		root: new(node),
-		acls: acls,
-	}, nil
+	return mt, nil
 }
 
-func parseACLs(path string) (map[string]security.Authorizer, error) {
+func (mt *mountTable) parseACLs(path string) error {
+	vlog.VI(2).Infof("parseACLs(%s)", path)
 	if path == "" {
-		return nil, nil
+		return nil
 	}
-	var acls map[string]access.TaggedACLMap
+	var tams map[string]access.TaggedACLMap
 	f, err := os.Open(path)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	defer f.Close()
-	if err = json.NewDecoder(f).Decode(&acls); err != nil {
-		return nil, err
+	if err = json.NewDecoder(f).Decode(&tams); err != nil {
+		return err
 	}
-	result := make(map[string]security.Authorizer)
-	for name, acl := range acls {
-		result[name], err = access.TaggedACLAuthorizer(acl, access.TypicalTagType())
-		if err != nil {
-			return nil, fmt.Errorf("Unable to create ACL for %q: %v", name, err)
+	for name, tam := range tams {
+		var elems []string
+		isPattern := false
+		// Create name and add the ACL map to it.
+		if len(name) == 0 {
+			// If the config file has an Admin tag on the root ACL, the
+			// list of Admin users is the equivalent of a super user for
+			// the whole table.  This is not updated if the ACL is later
+			// modified.
+			if acl, exists := tam[string(mounttable.Admin)]; exists {
+				mt.superUsers = acl
+			}
+		} else {
+			// ACL templates terminate with a %% element.  These are very
+			// constrained matches, i.e., the trailing element of the name
+			// is copied into every %% in the ACL.
+			elems = strings.Split(name, "/")
+			if elems[len(elems)-1] == templateVar {
+				isPattern = true
+				elems = elems[:len(elems)-1]
+			}
+		}
+
+		n, err := mt.findNode(nil, elems, true, nil)
+		if n != nil && err == nil {
+			vlog.VI(2).Infof("added tam %v to %s", tam, name)
+			if isPattern {
+				n.amTemplate = tam
+			} else {
+				n.acls, _ = n.acls.Set("", tam)
+			}
 		}
 	}
-	if result["/"] == nil {
-		return nil, fmt.Errorf("No acl for / in %s", path)
-	}
-	return result, nil
+	return nil
 }
 
 // Lookup implements ipc.Dispatcher.Lookup.
@@ -112,42 +145,10 @@
 	}
 	if len(name) > 0 {
 		ms.elems = strings.Split(name, "/")
-		ms.cleanedElems = strings.Split(strings.TrimLeft(path.Clean(name), "/"), "/")
 	}
 	return mounttable.MountTableServer(ms), ms, nil
 }
 
-// findNode returns the node for the name path represented by elems.  If none exists and create is false, return nil.
-// Otherwise create the path and return a pointer to the terminal node.
-func (mt *mountTable) findNode(elems []string, create bool) *node {
-	cur := mt.root
-
-	// Iterate down the tree.
-	for _, e := range elems {
-		// if we hit another mount table, we're done
-		if cur.mount != nil {
-			return nil
-		}
-		// then walk the children
-		c, ok := cur.children[e]
-		if ok {
-			cur = c
-			continue
-		}
-		if !create {
-			return nil
-		}
-		next := new(node)
-		if cur.children == nil {
-			cur.children = make(map[string]*node)
-		}
-		cur.children[e] = next
-		next.parent = cur
-		cur = next
-	}
-	return cur
-}
-
 // isActive returns true if a mount has unexpired servers attached.
 func (m *mount) isActive() bool {
 	if m == nil {
@@ -156,78 +157,199 @@
 	return m.servers.removeExpired() > 0
 }
 
-// walk returns the first mount point node on the elems path and the suffix of elems below that mount point.
-// If no mount point is found, it returns nil,nil.
-func (mt *mountTable) walk(n *node, elems []string) (*node, []string) {
-	if n.mount.isActive() {
-		return n, elems
-	} else if n.mount != nil {
-		n.removeUseless()
-	}
-	if len(elems) > 0 {
-		if c, ok := n.children[elems[0]]; ok {
-			if nn, nelems := mt.walk(c, elems[1:]); nn != nil {
-				return nn, nelems
-			}
-		}
-	}
-	return nil, nil
-}
-
-func (mt *mountTable) authorizeStep(name string, c security.Context) error {
-	if mt.acls == nil {
+// satisfies returns no error if the ctx + n.acls satisfies the associated one of the required Tags.
+func (n *node) satisfies(mt *mountTable, ctx ipc.ServerContext, tags []mounttable.Tag) error {
+	// No ACLs means everything (for now).
+	if ctx == nil || tags == nil || n.acls == nil {
 		return nil
 	}
-	mt.Lock()
-	acl := mt.acls[name]
-	mt.Unlock()
-	vlog.VI(2).Infof("authorizeStep(%v) %v %v %v", name, c.RemoteBlessings(), c.MethodTags(), acl)
-	if acl != nil {
-		return acl.Authorize(c)
+	// "Self-RPCs" are always authorized.
+	if l, r := ctx.LocalBlessings(), ctx.RemoteBlessings(); l != nil && r != nil && reflect.DeepEqual(l.PublicKey(), r.PublicKey()) {
+		return nil
 	}
-	return nil
+	// Match client's blessings against the ACLs.
+	var blessings []string
+	if ctx.RemoteBlessings() != nil {
+		blessings = ctx.RemoteBlessings().ForContext(ctx)
+	}
+	for _, tag := range tags {
+		if acl, exists := n.acls.GetACLForTag(string(tag)); exists && acl.Includes(blessings...) {
+			return nil
+		}
+	}
+	if mt.superUsers.Includes(blessings...) {
+		return nil
+	}
+	return fmt.Errorf("%v does not match ACL", blessings)
+}
+
+func expand(acl *access.ACL, name string) *access.ACL {
+	newACL := new(access.ACL)
+	for _, bp := range acl.In {
+		newACL.In = append(newACL.In, security.BlessingPattern(strings.Replace(string(bp), templateVar, name, -1)))
+	}
+	for _, bp := range acl.NotIn {
+		newACL.NotIn = append(newACL.NotIn, strings.Replace(bp, templateVar, name, -1))
+	}
+	return newACL
+}
+
+// satisfiesTemplate returns nil if ctx is nil (an internal call), if n has no template, or if the
+// client's blessings match the template ACL, expanded with name, for one of the required tags.
+func (n *node) satisfiesTemplate(ctx ipc.ServerContext, tags []mounttable.Tag, name string) error {
+	if ctx == nil || n.amTemplate == nil {
+		return nil
+	}
+	// Match client's blessings against the ACLs.
+	var blessings []string
+	if ctx.RemoteBlessings() != nil {
+		blessings = ctx.RemoteBlessings().ForContext(ctx)
+	}
+	for _, tag := range tags {
+		if acl, exists := n.amTemplate[string(tag)]; exists && expand(&acl, name).Includes(blessings...) {
+			return nil
+		}
+	}
+	return fmt.Errorf("%v does not match ACL", blessings)
+}
+
+// copyACLs returns a copy of cur's ACLs with the client's blessings added as
+// patterns to the Admin tag.  It returns nil if ctx is nil or cur has no ACLs.
+func copyACLs(ctx ipc.ServerContext, cur *node) *TAMG {
+	if ctx == nil {
+		return nil
+	}
+	if cur.acls == nil {
+		return nil
+	}
+	acls := cur.acls.Copy()
+	var blessings []string
+	if ctx.RemoteBlessings() != nil {
+		blessings = ctx.RemoteBlessings().ForContext(ctx)
+	}
+	for _, b := range blessings {
+		acls.Add(security.BlessingPattern(b), string(mounttable.Admin))
+	}
+	return acls
+}
+
+// createTAMGFromTemplate creates a new TAMG from the template, substituting name for %% everywhere.
+func createTAMGFromTemplate(tam access.TaggedACLMap, name string) *TAMG {
+	tamg := NewTAMG()
+	for tag, acl := range tam {
+		tamg.tam[tag] = *expand(&acl, name)
+	}
+	return tamg
+}
+
+// traverse returns the node for the path represented by elems.  If none exists and create is false, return nil.
+// Otherwise create the path and return a pointer to the terminal node.  If a mount point is encountered
+// while following the path, return that node and any remaining elems.
+func (mt *mountTable) traverse(ctx ipc.ServerContext, cur *node, elems []string, create bool) (*node, []string, error) {
+	// Iterate down the tree.
+	for i, e := range elems {
+		vlog.VI(2).Infof("satisfying %v %v", elems[0:i], *cur)
+		if err := cur.satisfies(mt, ctx, traverseTags); err != nil {
+			return nil, nil, err
+		}
+		// If we hit another mount table, we're done.
+		if cur.mount.isActive() {
+			return cur, elems[i:], nil
+		}
+		// Walk the children looking for a match.
+		c, ok := cur.children[e]
+		if ok {
+			cur = c
+			continue
+		}
+		if !create {
+			return nil, nil, nil
+		}
+		// Create a new node and keep recursing.
+		if err := cur.satisfies(mt, ctx, createTags); err != nil {
+			return nil, nil, err
+		}
+		if err := cur.satisfiesTemplate(ctx, createTags, e); err != nil {
+			return nil, nil, err
+		}
+		next := new(node)
+		if cur.children == nil {
+			cur.children = make(map[string]*node)
+		}
+		cur.children[e] = next
+		next.parent = cur
+		if cur.amTemplate != nil {
+			next.acls = createTAMGFromTemplate(cur.amTemplate, e)
+		} else {
+			next.acls = copyACLs(ctx, cur)
+		}
+		cur = next
+	}
+	return cur, nil, nil
+}
+
+// findNode finds a node in the table and optionally creates a path to it.
+func (mt *mountTable) findNode(ctx ipc.ServerContext, elems []string, create bool, tags []mounttable.Tag) (*node, error) {
+	n, nelems, err := mt.traverse(ctx, mt.root, elems, create)
+	if err != nil {
+		return nil, err
+	}
+	if len(nelems) > 0 {
+		return nil, nil // Node not found.
+	}
+	if n == nil {
+		return nil, nil
+	}
+	if err := n.satisfies(mt, ctx, tags); err != nil {
+		return nil, err
+	}
+	return n, nil
+}
+
+// findMountPoint returns the first mount point encountered in the path and
+// any elements remaining of the path.
+func (mt *mountTable) findMountPoint(ctx ipc.ServerContext, n *node, elems []string) (*node, []string, error) {
+	var err error
+	n, elems, err = mt.traverse(ctx, n, elems, false)
+	if err != nil {
+		return nil, nil, err
+	}
+	if n == nil || !n.mount.isActive() {
+		return nil, nil, nil
+	}
+	return n, elems, nil
 }
 
 // Authorize verifies that the client has access to the requested node.
-// Checks the acls on all nodes in the path starting at the root.
-func (ms *mountContext) Authorize(context security.Context) error {
-	if err := ms.mt.authorizeStep("/", context); err != nil {
-		return err
-	}
-	key := ""
-	for _, step := range ms.cleanedElems {
-		key := key + "/" + step
-		if err := ms.mt.authorizeStep(key, context); err != nil {
-			return err
-		}
-	}
+// Since we do the check at the time of access, we always return OK here.
+func (ms *mountContext) Authorize(ctx security.Context) error {
 	return nil
 }
 
 // ResolveStep returns the next server in a resolution, the name remaining below that server,
 // and whether or not that server is another mount table.
-func (ms *mountContext) ResolveStep(context ipc.ServerContext) (servers []naming.VDLMountedServer, suffix string, err error) {
+func (ms *mountContext) ResolveStep(ctx ipc.ServerContext) (servers []naming.VDLMountedServer, suffix string, err error) {
 	vlog.VI(2).Infof("ResolveStep %q", ms.name)
 	mt := ms.mt
-	// TODO(caprita): we need to grab a write lock because walk may
-	// garbage-collect expired servers.  Rework this to avoid this potential
-	// bottleneck.
 	mt.Lock()
 	defer mt.Unlock()
 	// Find the next mount point for the name.
-	n, elems := mt.walk(mt.root, ms.elems)
+	n, elems, err := mt.findMountPoint(ctx, mt.root, ms.elems)
+	if err != nil {
+		return nil, "", err
+	}
 	if n == nil {
 		if len(ms.elems) == 0 {
-			return nil, ms.name, verror.Make(naming.ErrNoSuchNameRoot, context.Context(), ms.name)
+			return nil, ms.name, verror2.Make(naming.ErrNoSuchNameRoot, ctx.Context(), ms.name)
 		}
-		return nil, ms.name, verror.Make(naming.ErrNoSuchName, context.Context(), ms.name)
+		return nil, ms.name, verror2.Make(naming.ErrNoSuchName, ctx.Context(), ms.name)
 	}
 	return n.mount.servers.copyToSlice(), strings.Join(elems, "/"), nil
 }
 
 // ResolveStepX returns the next server in a resolution in the form of a MountEntry.  The name
 // in the mount entry is the name relative to the server's root.
-func (ms *mountContext) ResolveStepX(context ipc.ServerContext) (entry naming.VDLMountEntry, err error) {
+func (ms *mountContext) ResolveStepX(ctx ipc.ServerContext) (entry naming.VDLMountEntry, err error) {
 	vlog.VI(2).Infof("ResolveStep %q", ms.name)
 	mt := ms.mt
 	// TODO(caprita): we need to grab a write lock because walk may
@@ -236,13 +358,17 @@
 	mt.Lock()
 	defer mt.Unlock()
 	// Find the next mount point for the name.
-	n, elems := mt.walk(mt.root, ms.elems)
+	n, elems, werr := mt.findMountPoint(ctx, mt.root, ms.elems)
+	if werr != nil {
+		err = werr
+		return
+	}
 	if n == nil {
 		entry.Name = ms.name
 		if len(ms.elems) == 0 {
-			err = verror.Make(naming.ErrNoSuchNameRoot, context.Context(), ms.name)
+			err = verror2.Make(naming.ErrNoSuchNameRoot, ctx.Context(), ms.name)
 		} else {
-			err = verror.Make(naming.ErrNoSuchName, context.Context(), ms.name)
+			err = verror2.Make(naming.ErrNoSuchName, ctx.Context(), ms.name)
 		}
 		return
 	}
@@ -261,7 +387,7 @@
 }
 
 // Mount a server onto the name in the receiver.
-func (ms *mountContext) Mount(context ipc.ServerContext, server string, ttlsecs uint32, flags naming.MountFlag) error {
+func (ms *mountContext) Mount(ctx ipc.ServerContext, server string, ttlsecs uint32, flags naming.MountFlag) error {
 	mt := ms.mt
 	if ttlsecs == 0 {
 		ttlsecs = 10 * 365 * 24 * 60 * 60 // a really long time
@@ -270,7 +396,7 @@
 
 	// Make sure the server name is reasonable.
 	epString, _ := naming.SplitAddressName(server)
-	runtime := veyron2.RuntimeFromContext(context.Context())
+	runtime := veyron2.RuntimeFromContext(ctx.Context())
 	_, err := runtime.NewEndpoint(epString)
 	if err != nil {
 		return fmt.Errorf("malformed address %q for mounted server %q", epString, server)
@@ -279,15 +405,16 @@
 	// Find/create node in namespace and add the mount.
 	mt.Lock()
 	defer mt.Unlock()
-	n := mt.findNode(ms.cleanedElems, true)
+	n, werr := mt.findNode(ctx, ms.elems, true, mountTags)
+	if werr != nil {
+		return werr
+	}
 	if n == nil {
-		return verror.Make(naming.ErrNoSuchNameRoot, context.Context(), ms.name)
+		return verror2.Make(naming.ErrNoSuchNameRoot, ctx.Context(), ms.name)
 	}
 	if hasReplaceFlag(flags) {
 		n.mount = nil
 	}
-	// TODO(p): When the endpoint actually has the ServesMountTable bit,
-	// or this with ep.ServesMountTable().
 	wantMT := hasMTFlag(flags)
 	if n.mount == nil {
 		n.mount = &mount{servers: NewServerList(), mt: wantMT}
@@ -302,7 +429,7 @@
 
 // A useful node has children or an active mount.
 func (n *node) isUseful() bool {
-	return len(n.children) > 0 || n.mount.isActive()
+	return len(n.children) > 0 || n.mount.isActive() || n.acls != nil
 }
 
 // removeUseless removes a node and all of its ascendants that are not useful.
@@ -336,11 +463,14 @@
 
 // Unmount removes servers from the name in the receiver. If server is specified, only that
 // server is removed.
-func (ms *mountContext) Unmount(context ipc.ServerContext, server string) error {
+func (ms *mountContext) Unmount(ctx ipc.ServerContext, server string) error {
 	mt := ms.mt
 	mt.Lock()
 	defer mt.Unlock()
-	n := mt.findNode(ms.cleanedElems, false)
+	n, err := mt.findNode(ctx, ms.elems, false, mountTags)
+	if err != nil {
+		return err
+	}
 	if n == nil {
 		return nil
 	}
@@ -355,25 +485,39 @@
 	return nil
 }
 
+// Delete removes the receiver.  If deleteSubTree is true, any subtree is also removed.
+func (ms *mountContext) Delete(ctx ipc.ServerContext, deleteSubTree bool) error {
+	mt := ms.mt
+	mt.Lock()
+	defer mt.Unlock()
+	n, err := mt.findNode(ctx, ms.elems, false, removeTags)
+	if err != nil || n == nil {
+		return err // An access error, or nil when there is no such node.
+	}
+	if n.parent == nil {
+		return fmt.Errorf("cannot remove the root node")
+	}
+	if !deleteSubTree && len(n.children) > 0 {
+		return fmt.Errorf("cannot remove %s: has children", ms.name)
+	}
+	for k, c := range n.parent.children {
+		if c == n {
+			delete(n.parent.children, k)
+			break
+		}
+	}
+	return nil
+}
+
 // A struct holding a partial result of Glob.
 type globEntry struct {
 	n    *node
 	name string
 }
 
-func (mt *mountTable) globStep(n *node, name string, pattern *glob.Glob, context ipc.ServerContext, ch chan<- naming.VDLMountEntry) {
+func (mt *mountTable) globStep(n *node, name string, pattern *glob.Glob, ctx ipc.ServerContext, ch chan<- naming.VDLMountEntry) {
 	vlog.VI(2).Infof("globStep(%s, %s)", name, pattern)
 
-	if mt.acls != nil {
-		acl_name := "/" + strings.TrimLeft(naming.Join(context.Suffix(), name), "/")
-		// Skip this node if the user isn't authorized.
-		if acl := mt.acls[acl_name]; acl != nil {
-			if err := acl.Authorize(context); err != nil {
-				return
-			}
-		}
-	}
-
 	// If this is a mount point, we're done.
 	if m := n.mount; m != nil {
 		// Garbage-collect if expired.
@@ -395,6 +539,10 @@
 			n.removeUseless()
 			return
 		}
+		// To see anything, one has to have some access to the node.
+		if err := n.satisfies(mt, ctx, allTags); err != nil {
+			return
+		}
 		ch <- naming.VDLMountEntry{Name: name}
 	}
 
@@ -402,10 +550,24 @@
 		return
 	}
 
-	// Recurse through the children.
+	// Recurse through the children.  OK if client has read access to the
+	// directory or traverse access and any access to the child.
+	allAllowed := true
+	if err := n.satisfies(mt, ctx, globTags); err != nil {
+		allAllowed = false
+		if err := n.satisfies(mt, ctx, traverseTags); err != nil {
+			return
+		}
+	}
 	for k, c := range n.children {
 		if ok, _, suffix := pattern.MatchInitialSegment(k); ok {
-			mt.globStep(c, naming.Join(name, k), suffix, context, ch)
+			if !allAllowed {
+				// If child allows any access show it.  Otherwise, skip.
+				if err := c.satisfies(mt, ctx, allTags); err != nil {
+					continue
+				}
+			}
+			mt.globStep(c, naming.Join(name, k), suffix, ctx, ch)
 		}
 	}
 }
@@ -413,7 +575,7 @@
 // Glob finds matches in the namespace.  If we reach a mount point before matching the
 // whole pattern, return that mount point.
 // pattern is a glob pattern as defined by the veyron/lib/glob package.
-func (ms *mountContext) Glob__(context ipc.ServerContext, pattern string) (<-chan naming.VDLMountEntry, error) {
+func (ms *mountContext) Glob__(ctx ipc.ServerContext, pattern string) (<-chan naming.VDLMountEntry, error) {
 	vlog.VI(2).Infof("mt.Glob %v", ms.elems)
 
 	g, err := glob.Parse(pattern)
@@ -422,7 +584,6 @@
 	}
 
 	mt := ms.mt
-
 	ch := make(chan naming.VDLMountEntry)
 	go func() {
 		defer close(ch)
@@ -431,24 +592,26 @@
 		// bottleneck.
 		mt.Lock()
 		defer mt.Unlock()
-
+		// If there was an access error, just ignore the entry, i.e., make it invisible.
+		n, err := mt.findNode(ctx, ms.elems, false, nil)
+		if err != nil {
+			return
+		}
 		// If the current name is not fully resolvable on this nameserver we
 		// don't need to evaluate the glob expression. Send a partially resolved
 		// name back to the client.
-		n := mt.findNode(ms.cleanedElems, false)
 		if n == nil {
-			ms.linkToLeaf(ch)
+			ms.linkToLeaf(ctx, ch)
 			return
 		}
-
-		mt.globStep(n, "", g, context, ch)
+		mt.globStep(n, "", g, ctx, ch)
 	}()
 	return ch, nil
 }
 
-func (ms *mountContext) linkToLeaf(ch chan<- naming.VDLMountEntry) {
-	n, elems := ms.mt.walk(ms.mt.root, ms.cleanedElems)
-	if n == nil {
+func (ms *mountContext) linkToLeaf(ctx ipc.ServerContext, ch chan<- naming.VDLMountEntry) {
+	n, elems, err := ms.mt.findMountPoint(ctx, ms.mt.root, ms.elems)
+	if err != nil || n == nil {
 		return
 	}
 	servers := n.mount.servers.copyToSlice()
@@ -457,3 +620,40 @@
 	}
 	ch <- naming.VDLMountEntry{Name: "", Servers: servers}
 }
+
+func (ms *mountContext) SetACL(ctx ipc.ServerContext, tam access.TaggedACLMap, etag string) error {
+	vlog.VI(2).Infof("SetACL %q", ms.name)
+	mt := ms.mt
+
+	// Find/create node in namespace and set its ACLs.
+	mt.Lock()
+	defer mt.Unlock()
+	n, err := mt.findNode(ctx, ms.elems, true, setTags)
+	if err != nil {
+		return err
+	}
+	if n == nil {
+		// TODO(p): can this even happen?
+		return verror2.Make(naming.ErrNoSuchName, ctx.Context(), ms.name)
+	}
+	n.acls, err = n.acls.Set(etag, tam)
+	return err
+}
+
+func (ms *mountContext) GetACL(ctx ipc.ServerContext) (access.TaggedACLMap, string, error) {
+	vlog.VI(2).Infof("GetACL %q", ms.name)
+	mt := ms.mt
+
+	// Find node in namespace and return its ACLs.
+	mt.Lock()
+	defer mt.Unlock()
+	n, err := mt.findNode(ctx, ms.elems, false, getTags)
+	if err != nil {
+		return nil, "", err
+	}
+	if n == nil {
+		return nil, "", verror2.Make(naming.ErrNoSuchName, ctx.Context(), ms.name)
+	}
+	etag, tam := n.acls.Get()
+	return tam, etag, nil
+}
diff --git a/services/mounttable/lib/mounttable_test.go b/services/mounttable/lib/mounttable_test.go
index ad6477c..623df86 100644
--- a/services/mounttable/lib/mounttable_test.go
+++ b/services/mounttable/lib/mounttable_test.go
@@ -16,6 +16,7 @@
 	"v.io/core/veyron2/options"
 	"v.io/core/veyron2/rt"
 	"v.io/core/veyron2/security"
+	"v.io/core/veyron2/services/security/access"
 	"v.io/core/veyron2/vlog"
 
 	"v.io/core/veyron/lib/testutil"
@@ -83,6 +84,108 @@
 	}
 }
 
+func doGetACL(t *testing.T, ep, suffix string, shouldSucceed bool, as veyron2.Runtime) (acl access.TaggedACLMap, etag string) {
+	name := naming.JoinAddressName(ep, suffix)
+	ctx := as.NewContext()
+	client := as.Client()
+	call, err := client.StartCall(ctx, name, "GetACL", nil, options.NoResolve(true))
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to GetACL %s: %s", name, err)
+	}
+	if ierr := call.Finish(&acl, &etag, &err); ierr != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to GetACL %s: %s", name, ierr)
+	}
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to GetACL %s: %s", name, err)
+	}
+	return
+}
+
+func doSetACL(t *testing.T, ep, suffix string, acl access.TaggedACLMap, etag string, shouldSucceed bool, as veyron2.Runtime) {
+	name := naming.JoinAddressName(ep, suffix)
+	ctx := as.NewContext()
+	client := as.Client()
+	call, err := client.StartCall(ctx, name, "SetACL", []interface{}{acl, etag}, options.NoResolve(true))
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to SetACL %s: %s", name, err)
+	}
+	if ierr := call.Finish(&err); ierr != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to SetACL %s: %s", name, ierr)
+	}
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to SetACL %s: %s", name, err)
+	}
+	return
+}
+
+func doDeleteNode(t *testing.T, ep, suffix string, shouldSucceed bool, as veyron2.Runtime) {
+	name := naming.JoinAddressName(ep, suffix)
+	ctx := as.NewContext()
+	client := as.Client()
+	call, err := client.StartCall(ctx, name, "Delete", []interface{}{false}, options.NoResolve(true))
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Delete node %s: %s", name, err)
+	}
+	if ierr := call.Finish(&err); ierr != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Delete node %s: %s", name, ierr)
+	}
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Delete node %s: %s", name, err)
+	}
+}
+
+func doDeleteSubtree(t *testing.T, ep, suffix string, shouldSucceed bool, as veyron2.Runtime) {
+	name := naming.JoinAddressName(ep, suffix)
+	ctx := as.NewContext()
+	client := as.Client()
+	call, err := client.StartCall(ctx, name, "Delete", []interface{}{true}, options.NoResolve(true))
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Delete subtree %s: %s", name, err)
+	}
+	if ierr := call.Finish(&err); ierr != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Delete subtree %s: %s", name, ierr)
+	}
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Delete subtree %s: %s", name, err)
+	}
+}
+
 // resolve assumes that the mount contains 0 or 1 servers.
 func resolve(name string, as veyron2.Runtime) (string, error) {
 	// Resolve the name one level.
@@ -214,61 +317,75 @@
 
 	// Mount the collection server into the mount table.
 	vlog.Infof("Mount the collection server into the mount table.")
-	doMount(t, mtAddr, "mounttable/stuff", collectionName, true, rootRT)
+	doMount(t, mtAddr, "stuff", collectionName, true, rootRT)
 
 	// Create a few objects and make sure we can read them.
 	vlog.Infof("Create a few objects.")
-	export(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/the/rain"), "the rain", rootRT)
-	export(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/in/spain"), "in spain", rootRT)
-	export(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/falls"), "falls mainly on the plain", rootRT)
+	export(t, naming.JoinAddressName(mtAddr, "stuff/the/rain"), "the rain", rootRT)
+	export(t, naming.JoinAddressName(mtAddr, "stuff/in/spain"), "in spain", rootRT)
+	export(t, naming.JoinAddressName(mtAddr, "stuff/falls"), "falls mainly on the plain", rootRT)
 	vlog.Infof("Make sure we can read them.")
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/the/rain"), "the rain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/in/spain"), "in spain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/falls"), "falls mainly on the plain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable//stuff/falls"), "falls mainly on the plain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/nonexistant"), "falls mainly on the plain", false, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/the/rain"), "the rain", true, bobRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/the/rain"), "the rain", false, aliceRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuff/the/rain"), "the rain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuff/in/spain"), "in spain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuff/falls"), "falls mainly on the plain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "/stuff/falls"), "falls mainly on the plain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuff/nonexistant"), "falls mainly on the plain", false, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuff/the/rain"), "the rain", true, bobRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuff/the/rain"), "the rain", false, aliceRT)
 
 	// Test multiple mounts.
 	vlog.Infof("Multiple mounts.")
-	doMount(t, mtAddr, "mounttable/a/b", collectionName, true, rootRT)
-	doMount(t, mtAddr, "mounttable/x/y", collectionName, true, rootRT)
-	doMount(t, mtAddr, "mounttable/alpha//beta", collectionName, true, rootRT)
+	doMount(t, mtAddr, "a/b", collectionName, true, rootRT)
+	doMount(t, mtAddr, "x/y", collectionName, true, rootRT)
+	doMount(t, mtAddr, "alpha//beta", collectionName, true, rootRT)
 	vlog.Infof("Make sure we can read them.")
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuff/falls"), "falls mainly on the plain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/a/b/falls"), "falls mainly on the plain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/x/y/falls"), "falls mainly on the plain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/alpha/beta/falls"), "falls mainly on the plain", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/a/b/falls"), "falls mainly on the plain", true, aliceRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/a/b/falls"), "falls mainly on the plain", false, bobRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuff/falls"), "falls mainly on the plain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "x/y/falls"), "falls mainly on the plain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "alpha/beta/falls"), "falls mainly on the plain", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", true, aliceRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", false, bobRT)
+
+	// Test getting/setting ACLs.
+	acl, etag := doGetACL(t, mtAddr, "stuff", true, rootRT)
+	doSetACL(t, mtAddr, "stuff", acl, "xyzzy", false, rootRT) // bad etag
+	doSetACL(t, mtAddr, "stuff", acl, etag, true, rootRT)     // good etag
+	_, netag := doGetACL(t, mtAddr, "stuff", true, rootRT)
+	if netag == etag {
+		boom(t, "etag didn't change after SetACL: %s", netag)
+	}
+	doSetACL(t, mtAddr, "stuff", acl, "", true, rootRT) // no etag
+
+	// Bob should be able to create nodes under the mounttable root but not alice.
+	doSetACL(t, mtAddr, "onlybob", acl, "", false, aliceRT)
+	doSetACL(t, mtAddr, "onlybob", acl, "", true, bobRT)
 
 	// Test generic unmount.
 	vlog.Info("Test generic unmount.")
-	doUnmount(t, mtAddr, "mounttable/a/b", "", true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/a/b/falls"), "falls mainly on the plain", false, rootRT)
+	doUnmount(t, mtAddr, "a/b", "", true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", false, rootRT)
 
 	// Test specific unmount.
 	vlog.Info("Test specific unmount.")
-	doMount(t, mtAddr, "mounttable/a/b", collectionName, true, rootRT)
-	doUnmount(t, mtAddr, "mounttable/a/b", collectionName, true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/a/b/falls"), "falls mainly on the plain", false, rootRT)
+	doMount(t, mtAddr, "a/b", collectionName, true, rootRT)
+	doUnmount(t, mtAddr, "a/b", collectionName, true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", false, rootRT)
 
 	// Try timing out a mount.
 	vlog.Info("Try timing out a mount.")
 	ft := NewFakeTimeClock()
 	setServerListClock(ft)
-	doMount(t, mtAddr, "mounttable/stuffWithTTL", collectionName, true, rootRT)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuffWithTTL/the/rain"), "the rain", true, rootRT)
+	doMount(t, mtAddr, "stuffWithTTL", collectionName, true, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuffWithTTL/the/rain"), "the rain", true, rootRT)
 	ft.advance(time.Duration(ttlSecs+4) * time.Second)
-	checkContents(t, naming.JoinAddressName(mtAddr, "mounttable/stuffWithTTL/the/rain"), "the rain", false, rootRT)
+	checkContents(t, naming.JoinAddressName(mtAddr, "stuffWithTTL/the/rain"), "the rain", false, rootRT)
 
 	// Test unauthorized mount.
 	vlog.Info("Test unauthorized mount.")
-	doMount(t, mtAddr, "mounttable//a/b", collectionName, false, bobRT)
-	doMount(t, mtAddr, "mounttable//a/b", collectionName, false, aliceRT)
+	doMount(t, mtAddr, "/a/b", collectionName, false, bobRT)
+	doMount(t, mtAddr, "/a/b", collectionName, false, aliceRT)
 
-	doUnmount(t, mtAddr, "mounttable/x/y", collectionName, false, bobRT)
+	doUnmount(t, mtAddr, "x/y", collectionName, false, bobRT)
 }
 
 func doGlobX(t *testing.T, ep, suffix, pattern string, as veyron2.Runtime, joinServer bool) []string {
@@ -321,6 +438,20 @@
 	}
 }
 
+// checkExists makes sure a name exists (or not).
+func checkExists(t *testing.T, ep, suffix string, shouldSucceed bool, as veyron2.Runtime) {
+	x := doGlobX(t, ep, "", suffix, as, false)
+	if len(x) != 1 || x[0] != suffix {
+		if shouldSucceed {
+			boom(t, "Failed to find %s", suffix)
+		}
+		return
+	}
+	if !shouldSucceed {
+		boom(t, "%s exists but shouldn't", suffix)
+	}
+}
+
 func TestGlob(t *testing.T) {
 	server, estr := newMT(t, "")
 	defer server.Stop()
@@ -365,15 +496,30 @@
 	}
 }
 
-func TestGlobACLs(t *testing.T) {
-	t.Skip("Skipped until ACLs are correctly implemented for mounttable.Glob.")
+func TestACLTemplate(t *testing.T) {
+	server, estr := newMT(t, "testdata/test.acl")
+	defer server.Stop()
+	fakeServer := naming.JoinAddressName(estr, "quux")
 
+	// No one should be able to mount on someone else's names.
+	doMount(t, estr, "users/ted", fakeServer, false, aliceRT)
+	doMount(t, estr, "users/carol", fakeServer, false, bobRT)
+	doMount(t, estr, "users/george", fakeServer, false, rootRT)
+
+	// Anyone should be able to mount on their own names.
+	doMount(t, estr, "users/alice", fakeServer, true, aliceRT)
+	doMount(t, estr, "users/bob", fakeServer, true, bobRT)
+	doMount(t, estr, "users/root", fakeServer, true, rootRT)
+}
+
+func TestGlobACLs(t *testing.T) {
 	server, estr := newMT(t, "testdata/test.acl")
 	defer server.Stop()
 
 	// set up a mount space
 	fakeServer := naming.JoinAddressName(estr, "quux")
-	doMount(t, estr, "one/bright/day", fakeServer, true, rootRT)
+	doMount(t, estr, "one/bright/day", fakeServer, false, aliceRT) // Fails because alice can't mount there.
+	doMount(t, estr, "one/bright/day", fakeServer, true, bobRT)
 	doMount(t, estr, "a/b/c", fakeServer, true, rootRT)
 
 	// Try various globs.
@@ -382,12 +528,13 @@
 		in       string
 		expected []string
 	}{
-		{rootRT, "*", []string{"one", "a"}},
-		{aliceRT, "*", []string{"one", "a"}},
-		{bobRT, "*", []string{"one"}},
-		{rootRT, "*/...", []string{"one", "a", "one/bright", "a/b", "one/bright/day", "a/b/c"}},
-		{aliceRT, "*/...", []string{"one", "a", "one/bright", "a/b", "one/bright/day", "a/b/c"}},
-		{bobRT, "*/...", []string{"one", "one/bright", "one/bright/day"}},
+		{rootRT, "*", []string{"one", "a", "stuff", "users"}},
+		{aliceRT, "*", []string{"one", "a", "users"}},
+		{bobRT, "*", []string{"one", "stuff", "users"}},
+		// bob, alice, and root have different visibility to the space.
+		{rootRT, "*/...", []string{"one", "a", "one/bright", "a/b", "one/bright/day", "a/b/c", "stuff", "users"}},
+		{aliceRT, "*/...", []string{"one", "a", "one/bright", "a/b", "one/bright/day", "a/b/c", "users"}},
+		{bobRT, "*/...", []string{"one", "one/bright", "one/bright/day", "stuff", "users"}},
 	}
 	for _, test := range tests {
 		out := doGlob(t, estr, "", test.in, test.as)
@@ -395,16 +542,40 @@
 	}
 }
 
+func TestDelete(t *testing.T) {
+	server, estr := newMT(t, "testdata/test.acl")
+	defer server.Stop()
+
+	// set up a mount space
+	fakeServer := naming.JoinAddressName(estr, "quux")
+	doMount(t, estr, "one/bright/day", fakeServer, true, bobRT)
+	doMount(t, estr, "a/b/c", fakeServer, true, rootRT)
+
+	// It shouldn't be possible to delete anything with children unless explicitly requested.
+	doDeleteNode(t, estr, "a/b", false, rootRT)
+	checkExists(t, estr, "a/b", true, rootRT)
+	doDeleteSubtree(t, estr, "a/b", true, rootRT)
+	checkExists(t, estr, "a/b", false, rootRT)
+
+	// Alice shouldn't be able to delete what bob created but bob and root should.
+	doDeleteNode(t, estr, "one/bright/day", false, aliceRT)
+	checkExists(t, estr, "one/bright/day", true, rootRT)
+	doDeleteNode(t, estr, "one/bright/day", true, rootRT)
+	checkExists(t, estr, "one/bright/day", false, rootRT)
+	doDeleteNode(t, estr, "one/bright", true, bobRT)
+	checkExists(t, estr, "one/bright", false, rootRT)
+}
+
 func TestServerFormat(t *testing.T) {
 	server, estr := newMT(t, "")
 	defer server.Stop()
 
-	doMount(t, estr, "mounttable/endpoint", naming.JoinAddressName(estr, "life/on/the/mississippi"), true, rootRT)
-	doMount(t, estr, "mounttable/hostport", "/atrampabroad:8000", true, rootRT)
-	doMount(t, estr, "mounttable/hostport-endpoint-platypus", "/@atrampabroad:8000@@", true, rootRT)
-	doMount(t, estr, "mounttable/invalid/not/rooted", "atrampabroad:8000", false, rootRT)
-	doMount(t, estr, "mounttable/invalid/no/port", "/atrampabroad", false, rootRT)
-	doMount(t, estr, "mounttable/invalid/endpoint", "/@following the equator:8000@@@", false, rootRT)
+	doMount(t, estr, "endpoint", naming.JoinAddressName(estr, "life/on/the/mississippi"), true, rootRT)
+	doMount(t, estr, "hostport", "/atrampabroad:8000", true, rootRT)
+	doMount(t, estr, "hostport-endpoint-platypus", "/@atrampabroad:8000@@", true, rootRT)
+	doMount(t, estr, "invalid/not/rooted", "atrampabroad:8000", false, rootRT)
+	doMount(t, estr, "invalid/no/port", "/atrampabroad", false, rootRT)
+	doMount(t, estr, "invalid/endpoint", "/@following the equator:8000@@@", false, rootRT)
 }
 
 func TestExpiry(t *testing.T) {
@@ -417,21 +588,21 @@
 
 	ft := NewFakeTimeClock()
 	setServerListClock(ft)
-	doMount(t, estr, "mounttable/a1/b1", collectionName, true, rootRT)
-	doMount(t, estr, "mounttable/a1/b2", collectionName, true, rootRT)
-	doMount(t, estr, "mounttable/a2/b1", collectionName, true, rootRT)
-	doMount(t, estr, "mounttable/a2/b2/c", collectionName, true, rootRT)
+	doMount(t, estr, "a1/b1", collectionName, true, rootRT)
+	doMount(t, estr, "a1/b2", collectionName, true, rootRT)
+	doMount(t, estr, "a2/b1", collectionName, true, rootRT)
+	doMount(t, estr, "a2/b2/c", collectionName, true, rootRT)
 
-	checkMatch(t, []string{"a1/b1", "a2/b1"}, doGlob(t, estr, "mounttable", "*/b1/...", rootRT))
+	checkMatch(t, []string{"a1/b1", "a2/b1"}, doGlob(t, estr, "", "*/b1/...", rootRT))
 	ft.advance(time.Duration(ttlSecs/2) * time.Second)
-	checkMatch(t, []string{"a1/b1", "a2/b1"}, doGlob(t, estr, "mounttable", "*/b1/...", rootRT))
-	checkMatch(t, []string{"c"}, doGlob(t, estr, "mounttable/a2/b2", "*", rootRT))
+	checkMatch(t, []string{"a1/b1", "a2/b1"}, doGlob(t, estr, "", "*/b1/...", rootRT))
+	checkMatch(t, []string{"c"}, doGlob(t, estr, "a2/b2", "*", rootRT))
 	// Refresh only a1/b1.  All the other mounts will expire upon the next
 	// ft advance.
-	doMount(t, estr, "mounttable/a1/b1", collectionName, true, rootRT)
+	doMount(t, estr, "a1/b1", collectionName, true, rootRT)
 	ft.advance(time.Duration(ttlSecs/2+4) * time.Second)
-	checkMatch(t, []string{"a1"}, doGlob(t, estr, "mounttable", "*", rootRT))
-	checkMatch(t, []string{"a1/b1"}, doGlob(t, estr, "mounttable", "*/b1/...", rootRT))
+	checkMatch(t, []string{"a1"}, doGlob(t, estr, "", "*", rootRT))
+	checkMatch(t, []string{"a1/b1"}, doGlob(t, estr, "", "*/b1/...", rootRT))
 }
 
 func TestBadACLs(t *testing.T) {
@@ -443,10 +614,6 @@
 	if err == nil {
 		boom(t, "Expected error from missing acl file")
 	}
-	_, err = NewMountTable("testdata/noroot.acl")
-	if err == nil {
-		boom(t, "Expected error for missing '/' acl")
-	}
 }
 
 func init() {
diff --git a/services/mounttable/lib/neighborhood.go b/services/mounttable/lib/neighborhood.go
index 1392b79..3cc6952 100644
--- a/services/mounttable/lib/neighborhood.go
+++ b/services/mounttable/lib/neighborhood.go
@@ -14,6 +14,7 @@
 	"v.io/core/veyron2/naming"
 	"v.io/core/veyron2/security"
 	"v.io/core/veyron2/services/mounttable"
+	"v.io/core/veyron2/services/security/access"
 	verror "v.io/core/veyron2/verror2"
 	"v.io/core/veyron2/vlog"
 
@@ -255,6 +256,11 @@
 	return errors.New("this server does not implement Unmount")
 }
 
+// Delete not implemented.
+func (*neighborhoodService) Delete(_ ipc.ServerContext, _ bool) error {
+	return errors.New("this server does not implement Delete")
+}
+
 // Glob__ implements ipc.AllGlobber
 func (ns *neighborhoodService) Glob__(ctx ipc.ServerContext, pattern string) (<-chan naming.VDLMountEntry, error) {
 	g, err := glob.Parse(pattern)
@@ -291,3 +297,11 @@
 		return nil, verror.Make(naming.ErrNoSuchName, ctx.Context(), ns.elems)
 	}
 }
+
+func (*neighborhoodService) SetACL(ctx ipc.ServerContext, acl access.TaggedACLMap, etag string) error {
+	return errors.New("this server does not implement SetACL")
+}
+
+func (*neighborhoodService) GetACL(ctx ipc.ServerContext) (acl access.TaggedACLMap, etag string, err error) {
+	return nil, "", nil
+}
diff --git a/services/mounttable/lib/tamg.go b/services/mounttable/lib/tamg.go
new file mode 100644
index 0000000..d61bd3d
--- /dev/null
+++ b/services/mounttable/lib/tamg.go
@@ -0,0 +1,70 @@
+package mounttable
+
+import (
+	"strconv"
+
+	"v.io/core/veyron2/security"
+	"v.io/core/veyron2/services/security/access"
+	"v.io/core/veyron2/verror2"
+)
+
+// TAMG associates a generation with a TaggedACLMap
+type TAMG struct {
+	tam        access.TaggedACLMap
+	generation int32
+}
+
+func NewTAMG() *TAMG {
+	return &TAMG{tam: make(access.TaggedACLMap)}
+}
+
+// Set sets the ACLs iff genstr is empty, parses to a negative number, or matches the current generation.
+// If the set happens, the generation is advanced.  If b is nil, this creates a new TAMG.
+func (b *TAMG) Set(genstr string, tam access.TaggedACLMap) (*TAMG, error) {
+	if b == nil {
+		b = new(TAMG)
+	}
+	if len(genstr) > 0 {
+		gen, err := strconv.ParseInt(genstr, 10, 32)
+		if err != nil {
+			return b, verror2.Make(access.BadEtag, nil)
+		}
+		if gen >= 0 && int32(gen) != b.generation {
+			return b, verror2.Make(access.BadEtag, nil)
+		}
+	}
+	b.tam = tam
+	b.generation++
+	// Protect against wrap.
+	if b.generation < 0 {
+		b.generation = 0
+	}
+	return b, nil
+}
+
+// Get returns the current generation (as a decimal string) and acls; a nil receiver yields "" and nil.
+func (b *TAMG) Get() (string, access.TaggedACLMap) {
+	if b == nil {
+		return "", nil
+	}
+	return strconv.FormatInt(int64(b.generation), 10), b.tam
+}
+
+// GetACLForTag returns the current acls for the given tag.
+func (b *TAMG) GetACLForTag(tag string) (access.ACL, bool) {
+	acl, exists := b.tam[tag]
+	return acl, exists
+}
+
+// Copy copies the receiver.
+func (b *TAMG) Copy() *TAMG {
+	nt := new(TAMG)
+	nt.tam = b.tam.Copy()
+	nt.generation = b.generation
+	return nt
+}
+
+// Add adds the blessing pattern to the tag in the receiver.
+func (b *TAMG) Add(pattern security.BlessingPattern, tag string) {
+	b.tam.Add(pattern, tag)
+}
diff --git a/services/mounttable/lib/testdata/test.acl b/services/mounttable/lib/testdata/test.acl
index 7990532..edfb36f 100644
--- a/services/mounttable/lib/testdata/test.acl
+++ b/services/mounttable/lib/testdata/test.acl
@@ -1,14 +1,21 @@
 {
-"/": {
+"": {
 	"Read":  { "In": ["..."] },
-	"Write": { "In": ["root"] }
+	"Admin": { "In": ["root"] },
+	"Create": { "In": ["root", "bob"] }
 },
-"/stuff": {
+"stuff": {
 	"Read":  { "In": ["bob", "root"] },
-        "Write": { "In": ["root"] }
+        "Mount": { "In": ["root"] }
 },
-"/a": {
+"a": {
 	"Read":  { "In": ["alice", "root"] },
-        "Write": { "In": ["root"] }
+        "Create": { "In": ["root"] }
+},
+"users": {
+	"Create": { "In": ["..."] }
+},
+"users/%%": {
+	"Admin":  { "In": ["%%/..."] }
 }
 }
diff --git a/tools/mounttable/impl_test.go b/tools/mounttable/impl_test.go
index 4d216a0..6a091fe 100644
--- a/tools/mounttable/impl_test.go
+++ b/tools/mounttable/impl_test.go
@@ -11,6 +11,7 @@
 	"v.io/core/veyron2/rt"
 	"v.io/core/veyron2/security"
 	"v.io/core/veyron2/services/mounttable"
+	"v.io/core/veyron2/services/security/access"
 	"v.io/core/veyron2/vlog"
 
 	"v.io/core/veyron/profiles"
@@ -53,6 +54,20 @@
 	return
 }
 
+func (s *server) Delete(ipc.ServerContext, bool) error {
+	vlog.VI(2).Infof("Delete() was called. suffix=%v", s.suffix)
+	return nil
+}
+func (s *server) SetACL(ipc.ServerContext, access.TaggedACLMap, string) error {
+	vlog.VI(2).Infof("SetACL() was called. suffix=%v", s.suffix)
+	return nil
+}
+
+func (s *server) GetACL(ipc.ServerContext) (access.TaggedACLMap, string, error) {
+	vlog.VI(2).Infof("GetACL() was called. suffix=%v", s.suffix)
+	return nil, "", nil
+}
+
 type dispatcher struct {
 }