// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package mounttablelib implements utilities for mounttable implementations.
package mounttablelib

import (
	"encoding/json"
	"os"
	"reflect"
	"strings"
	"sync"
	"time"

	"v.io/v23"
	"v.io/v23/context"
	"v.io/v23/naming"
	"v.io/v23/rpc"
	"v.io/v23/security"
	"v.io/v23/security/access"
	"v.io/v23/services/mounttable"
	"v.io/v23/verror"

	"v.io/x/lib/vlog"

	"v.io/x/ref/lib/glob"
	"v.io/x/ref/lib/stats"
)

const pkgPath = "v.io/x/ref/services/mounttable/mounttablelib"

var (
	errMalformedAddress = verror.Register(pkgPath+".errMalformedAddress", verror.NoRetry, "{1:}{2:} malformed address {3} for mounted server {4}{:_}")
	errMTDoesntMatch    = verror.Register(pkgPath+".errMTDoesntMatch", verror.NoRetry, "{1:}{2:} MT doesn't match{:_}")
	errLeafDoesntMatch  = verror.Register(pkgPath+".errLeafDoesntMatch", verror.NoRetry, "{1:}{2:} Leaf doesn't match{:_}")
	errCantDeleteRoot   = verror.Register(pkgPath+".errCantDeleteRoot", verror.NoRetry, "{1:}{2:} cannot delete root node{:_}")
	errNotEmpty         = verror.Register(pkgPath+".errNotEmpty", verror.NoRetry, "{1:}{2:} cannot delete {3}: has children{:_}")
	errNamingLoop       = verror.Register(pkgPath+".errNamingLoop", verror.NoRetry, "{1:}{2:} Loop in namespace{:_}")
)

var (
	traverseTags = []mounttable.Tag{mounttable.Read, mounttable.Resolve, mounttable.Create, mounttable.Admin}
	createTags   = []mounttable.Tag{mounttable.Create, mounttable.Admin}
	removeTags   = []mounttable.Tag{mounttable.Admin}
	mountTags    = []mounttable.Tag{mounttable.Mount, mounttable.Admin}
	resolveTags  = []mounttable.Tag{mounttable.Read, mounttable.Resolve, mounttable.Admin}
	globTags     = []mounttable.Tag{mounttable.Read, mounttable.Admin}
	setTags      = []mounttable.Tag{mounttable.Admin}
	getTags      = []mounttable.Tag{mounttable.Admin, mounttable.Read}
	allTags      = []mounttable.Tag{mounttable.Read, mounttable.Resolve, mounttable.Admin, mounttable.Mount, mounttable.Create}
)

// mountTable represents a namespace.  One exists per server instance.
type mountTable struct {
	root          *node
	superUsers    access.AccessList
	nodeCounter   *stats.Integer
	serverCounter *stats.Integer
}

var _ rpc.Dispatcher = (*mountTable)(nil)

// mountContext represents a client bind.  The name is the name that was bound to.
type mountContext struct {
	name  string
	elems []string // parsed elements of name
	mt    *mountTable
}

// mount represents a single mount point.  It contains the rooted names of all servers mounted
// here.  The servers are considered equivalent, i.e., RPCs to a name below this
// point can be sent to any of these servers.
type mount struct {
	servers *serverList
	mt      bool
	leaf    bool
}

// node is a single point in the tree representing the mount table.
type node struct {
	sync.RWMutex
	parent              *node
	mount               *mount
	children            map[string]*node
	acls                *TAMG
	amTemplate          access.Permissions
	explicitAccessLists bool
}

const templateVar = "%%"

// NewMountTableDispatcher creates a new server that uses the AccessLists specified in
// aclfile for authorization.
//
// aclfile is a JSON-encoded mapping from paths in the mounttable to the
// access.Permissions for that path. The tags used in the map are the typical
// access tags (the Tag type defined in v.io/v23/security/access).
//
// statsPrefix is the prefix for for exported statistics objects.
func NewMountTableDispatcher(aclfile, statsPrefix string) (rpc.Dispatcher, error) {
	mt := &mountTable{
		root:          new(node),
		nodeCounter:   stats.NewInteger(naming.Join(statsPrefix, "num-nodes")),
		serverCounter: stats.NewInteger(naming.Join(statsPrefix, "num-mounted-servers")),
	}
	mt.root.parent = mt.newNode() // just for its lock
	if err := mt.parseAccessLists(aclfile); err != nil && !os.IsNotExist(err) {
		return nil, err
	}
	return mt, nil
}

// newNode creates a new node, and updates the number of nodes.
func (mt *mountTable) newNode() *node {
	mt.nodeCounter.Incr(1)
	return new(node)
}

// deleteNode deletes a node and all its children, and updates the number of
// nodes.
func (mt *mountTable) deleteNode(parent *node, child string) {
	// Assumes that parent and parent[child] are locked.

	// Walk the tree and count the number of nodes deleted.
	n := parent.children[child]
	if n == nil {
		return
	}
	nodeCount := int64(0)
	serverCount := int64(0)
	queue := []*node{n}
	for len(queue) > 0 {
		n := queue[0]
		queue = queue[1:]
		nodeCount++
		serverCount += numServers(n)
		for _, ch := range n.children {
			ch.Lock() // Keep locked until it is deleted.
			queue = append(queue, ch)
		}
	}

	mt.nodeCounter.Incr(-nodeCount)
	mt.serverCounter.Incr(-serverCount)
	delete(parent.children, child)
}

func (mt *mountTable) parseAccessLists(path string) error {
	vlog.VI(2).Infof("parseAccessLists(%s)", path)
	if path == "" {
		return nil
	}
	var tams map[string]access.Permissions
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	if err = json.NewDecoder(f).Decode(&tams); err != nil {
		return err
	}
	for name, tam := range tams {
		var elems []string
		isPattern := false
		// Create name and add the AccessList map to it.
		if len(name) == 0 {
			// If the config file has is an Admin tag on the root AccessList, the
			// list of Admin users is the equivalent of a super user for
			// the whole table.  This is not updated if the AccessList is later
			// modified.
			if acl, exists := tam[string(mounttable.Admin)]; exists {
				mt.superUsers = acl
			}
		} else {
			// AccessList templates terminate with a %% element.  These are very
			// constrained matches, i.e., the trailing element of the name
			// is copied into every %% in the AccessList.
			elems = strings.Split(name, "/")
			if elems[len(elems)-1] == templateVar {
				isPattern = true
				elems = elems[:len(elems)-1]
			}
		}

		n, err := mt.findNode(nil, nil, elems, true, nil)
		if n != nil || err == nil {
			vlog.VI(2).Infof("added tam %v to %s", tam, name)
			if isPattern {
				n.amTemplate = tam
			} else {
				n.acls, _ = n.acls.Set("", tam)
				n.explicitAccessLists = true
			}
		}
		n.parent.Unlock()
		n.Unlock()
	}
	return nil
}

// Lookup implements rpc.Dispatcher.Lookup.
func (mt *mountTable) Lookup(name string) (interface{}, security.Authorizer, error) {
	vlog.VI(2).Infof("*********************Lookup %s", name)
	ms := &mountContext{
		name: name,
		mt:   mt,
	}
	if len(name) > 0 {
		ms.elems = strings.Split(name, "/")
	}
	return mounttable.MountTableServer(ms), ms, nil
}

// isActive returns true if a mount has unexpired servers attached.
func (m *mount) isActive() bool {
	if m == nil {
		return false
	}
	return m.servers.removeExpired() > 0
}

// satisfies returns no error if the ctx + n.acls satisfies the associated one of the required Tags.
func (n *node) satisfies(mt *mountTable, ctx *context.T, call security.Call, tags []mounttable.Tag) error {
	// No AccessLists means everything (for now).
	if ctx == nil || call == nil || tags == nil || n.acls == nil {
		return nil
	}
	// "Self-RPCs" are always authorized.
	if l, r := call.LocalBlessings().PublicKey(), call.RemoteBlessings().PublicKey(); l != nil && reflect.DeepEqual(l, r) {
		return nil
	}
	// Match client's blessings against the AccessLists.
	blessings, invalidB := security.RemoteBlessingNames(ctx, call)
	for _, tag := range tags {
		if acl, exists := n.acls.GetPermissionsForTag(string(tag)); exists && acl.Includes(blessings...) {
			return nil
		}
	}
	if mt.superUsers.Includes(blessings...) {
		return nil
	}
	if len(invalidB) > 0 {
		return verror.New(verror.ErrNoAccess, ctx, blessings, invalidB)
	}
	return verror.New(verror.ErrNoAccess, ctx, blessings)
}

func expand(acl *access.AccessList, name string) *access.AccessList {
	newAccessList := new(access.AccessList)
	for _, bp := range acl.In {
		newAccessList.In = append(newAccessList.In, security.BlessingPattern(strings.Replace(string(bp), templateVar, name, -1)))
	}
	for _, bp := range acl.NotIn {
		newAccessList.NotIn = append(newAccessList.NotIn, strings.Replace(bp, templateVar, name, -1))
	}
	return newAccessList
}

// satisfiesTemplate returns no error if the ctx + n.amTemplate satisfies the associated one of
// the required Tags.
func (n *node) satisfiesTemplate(ctx *context.T, call security.Call, tags []mounttable.Tag, name string) error {
	if n.amTemplate == nil {
		return nil
	}
	// Match client's blessings against the AccessLists.
	blessings, invalidB := security.RemoteBlessingNames(ctx, call)
	for _, tag := range tags {
		if acl, exists := n.amTemplate[string(tag)]; exists && expand(&acl, name).Includes(blessings...) {
			return nil
		}
	}
	return verror.New(verror.ErrNoAccess, ctx, blessings, invalidB)
}

// copyAccessLists copies one nodes AccessLists to another and adds the clients blessings as
// patterns to the Admin tag.
func copyAccessLists(ctx *context.T, call security.Call, cur *node) *TAMG {
	if ctx == nil {
		return nil
	}
	if cur.acls == nil {
		return nil
	}
	acls := cur.acls.Copy()
	blessings, _ := security.RemoteBlessingNames(ctx, call)
	for _, b := range blessings {
		acls.Add(security.BlessingPattern(b), string(mounttable.Admin))
	}
	return acls
}

// createTAMGFromTemplate creates a new TAMG from the template subsituting name for %% everywhere.
func createTAMGFromTemplate(tam access.Permissions, name string) *TAMG {
	tamg := NewTAMG()
	for tag, acl := range tam {
		tamg.tam[tag] = *expand(&acl, name)
	}
	return tamg
}

// traverse returns the node for the path represented by elems.  If none exists and create is false, return nil.
// Otherwise create the path and return a pointer to the terminal node.  If a mount point is encountered
// while following the path, return that node and any remaining elems.
//
// If it returns a node, both the node and its parent are locked.
func (mt *mountTable) traverse(ctx *context.T, call security.Call, elems []string, create bool) (*node, []string, error) {
	// Invariant is that the current node and its parent are both locked.
	cur := mt.root
	cur.parent.Lock()
	cur.Lock()
	for i, e := range elems {
		vlog.VI(2).Infof("satisfying %v %v", elems[0:i], *cur)
		if call != nil {
			if err := cur.satisfies(mt, ctx, call, traverseTags); err != nil {
				cur.parent.Unlock()
				cur.Unlock()
				return nil, nil, err
			}
		}
		// If we hit another mount table, we're done.
		if cur.mount.isActive() {
			return cur, elems[i:], nil
		}
		// Walk the children looking for a match.
		c, ok := cur.children[e]
		if ok {
			cur.parent.Unlock()
			cur = c
			cur.Lock()
			continue
		}
		if !create {
			cur.parent.Unlock()
			cur.Unlock()
			return nil, nil, nil
		}
		// Create a new node and keep recursing.
		cur.parent.Unlock()
		if err := cur.satisfies(mt, ctx, call, createTags); err != nil {
			cur.Unlock()
			return nil, nil, err
		}
		if call != nil {
			if err := cur.satisfiesTemplate(ctx, call, createTags, e); err != nil {
				cur.Unlock()
				return nil, nil, err
			}
		}
		// At this point cur is still locked, OK to use and change it.
		next := mt.newNode()
		next.parent = cur
		if cur.amTemplate != nil {
			next.acls = createTAMGFromTemplate(cur.amTemplate, e)
		} else {
			next.acls = copyAccessLists(ctx, call, cur)
		}
		if cur.children == nil {
			cur.children = make(map[string]*node)
		}
		cur.children[e] = next
		cur = next
		cur.Lock()
	}
	// Only way out of the loop is via a return or exhausting all elements.  In
	// the latter case both cur and cur.parent are locked.
	return cur, nil, nil
}

// findNode finds a node in the table and optionally creates a path to it.
//
// If a node is found, on return it and its parent are locked.
func (mt *mountTable) findNode(ctx *context.T, call security.Call, elems []string, create bool, tags []mounttable.Tag) (*node, error) {
	n, nelems, err := mt.traverse(ctx, call, elems, create)
	if err != nil {
		return nil, err
	}
	if n == nil {
		return nil, nil
	}
	if len(nelems) > 0 {
		n.parent.Unlock()
		n.Unlock()
		return nil, nil
	}
	if err := n.satisfies(mt, ctx, call, tags); err != nil {
		n.parent.Unlock()
		n.Unlock()
		return nil, err
	}
	return n, nil
}

// findMountPoint returns the first mount point encountered in the path and
// any elements remaining of the path.
//
// If a mountpoint is found, on return it and its parent are locked.
func (mt *mountTable) findMountPoint(ctx *context.T, call security.Call, elems []string) (*node, []string, error) {
	n, nelems, err := mt.traverse(ctx, call, elems, false)
	if err != nil {
		return nil, nil, err
	}
	if n == nil {
		return nil, nil, nil
	}
	// If we can't resolve it, we can't use it.
	if err := n.satisfies(mt, ctx, call, resolveTags); err != nil {
		n.parent.Unlock()
		n.Unlock()
		return nil, nil, err
	}
	if !n.mount.isActive() {
		removed := n.removeUseless(mt)
		n.parent.Unlock()
		n.Unlock()
		// If we removed the node, see if we can remove any of its
		// ascendants.
		if removed {
			mt.removeUselessRecursive(elems[:len(elems)-1])
		}
		return nil, nil, nil
	}
	return n, nelems, nil
}

// Authorize verifies that the client has access to the requested node.
// Since we do the check at the time of access, we always return OK here.
func (ms *mountContext) Authorize(*context.T, security.Call) error {
	return nil
}

// ResolveStep returns the next server in a resolution in the form of a MountEntry.  The name
// in the mount entry is the name relative to the server's root.
func (ms *mountContext) ResolveStep(ctx *context.T, call rpc.ServerCall) (entry naming.MountEntry, err error) {
	vlog.VI(2).Infof("ResolveStep %q", ms.name)
	mt := ms.mt
	// Find the next mount point for the name.
	n, elems, werr := mt.findMountPoint(ctx, call.Security(), ms.elems)
	if werr != nil {
		err = werr
		return
	}
	if n == nil {
		entry.Name = ms.name
		if len(ms.elems) == 0 {
			err = verror.New(naming.ErrNoSuchNameRoot, ctx, ms.name)
		} else {
			err = verror.New(naming.ErrNoSuchName, ctx, ms.name)
		}
		return
	}
	n.parent.Unlock()
	defer n.Unlock()
	entry.Servers = n.mount.servers.copyToSlice()
	entry.Name = strings.Join(elems, "/")
	entry.ServesMountTable = n.mount.mt
	entry.IsLeaf = n.mount.leaf
	return
}

func hasMTFlag(flags naming.MountFlag) bool {
	return (flags & naming.MT) == naming.MT
}

func hasLeafFlag(flags naming.MountFlag) bool {
	return (flags & naming.Leaf) == naming.Leaf
}

func hasReplaceFlag(flags naming.MountFlag) bool {
	return (flags & naming.Replace) == naming.Replace
}

func numServers(n *node) int64 {
	if n == nil || n.mount == nil || n.mount.servers == nil {
		return 0
	}
	return int64(n.mount.servers.len())
}

// Mount a server onto the name in the receiver.
func (ms *mountContext) Mount(ctx *context.T, call rpc.ServerCall, server string, ttlsecs uint32, flags naming.MountFlag) error {
	mt := ms.mt
	if ttlsecs == 0 {
		ttlsecs = 10 * 365 * 24 * 60 * 60 // a really long time
	}
	vlog.VI(2).Infof("*********************Mount %q -> %s", ms.name, server)

	// Make sure the server address is reasonable.
	epString := server
	if naming.Rooted(server) {
		epString, _ = naming.SplitAddressName(server)
	}
	_, err := v23.NewEndpoint(epString)
	if err != nil {
		return verror.New(errMalformedAddress, ctx, epString, server)
	}

	// Find/create node in namespace and add the mount.
	n, werr := mt.findNode(ctx, call.Security(), ms.elems, true, mountTags)
	if werr != nil {
		return werr
	}
	if n == nil {
		return verror.New(naming.ErrNoSuchNameRoot, ctx, ms.name)
	}
	// We don't need the parent lock
	n.parent.Unlock()
	defer n.Unlock()

	wantMT := hasMTFlag(flags)
	wantLeaf := hasLeafFlag(flags)
	if n.mount != nil {
		if wantMT != n.mount.mt {
			return verror.New(errMTDoesntMatch, ctx)
		}
		if wantLeaf != n.mount.leaf {
			return verror.New(errLeafDoesntMatch, ctx)
		}
	}
	nServersBefore := numServers(n)
	if hasReplaceFlag(flags) {
		n.mount = nil
	}
	if n.mount == nil {
		n.mount = &mount{servers: newServerList(), mt: wantMT, leaf: wantLeaf}
	}
	n.mount.servers.add(server, time.Duration(ttlsecs)*time.Second)
	mt.serverCounter.Incr(numServers(n) - nServersBefore)
	return nil
}

// fullName is for debugging only and should not normally be called.
func (n *node) fullName() string {
	if n.parent == nil || n.parent.parent == nil {
		return ""
	}
	for k, c := range n.parent.children {
		if c == n {
			return n.parent.fullName() + "/" + k
		}
	}
	return n.parent.fullName() + "/" + "?"
}

// removeUseless removes a node and all of its ascendants that are not useful.
//
// We assume both n and n.parent are locked.
func (n *node) removeUseless(mt *mountTable) bool {
	if len(n.children) > 0 || n.mount.isActive() || n.explicitAccessLists {
		return false
	}
	for k, c := range n.parent.children {
		if c == n {
			mt.deleteNode(n.parent, k)
			break
		}
	}
	return true
}

// removeUselessRecursive removes any useless nodes on the tail of the path.
func (mt *mountTable) removeUselessRecursive(elems []string) {
	for i := len(elems); i > 0; i-- {
		n, nelems, _ := mt.traverse(nil, nil, elems[:i-1], false)
		if n == nil {
			break
		}
		if nelems != nil {
			n.parent.Unlock()
			n.Unlock()
			break
		}
		removed := n.removeUseless(mt)
		n.parent.Unlock()
		n.Unlock()
		if !removed {
			break
		}
	}
}

// Unmount removes servers from the name in the receiver. If server is specified, only that
// server is removed.
func (ms *mountContext) Unmount(ctx *context.T, call rpc.ServerCall, server string) error {
	vlog.VI(2).Infof("*********************Unmount %q, %s", ms.name, server)
	mt := ms.mt
	n, err := mt.findNode(ctx, call.Security(), ms.elems, false, mountTags)
	if err != nil {
		return err
	}
	if n == nil {
		return nil
	}
	nServersBefore := numServers(n)
	if server == "" {
		n.mount = nil
	} else if n.mount != nil && n.mount.servers.remove(server) == 0 {
		n.mount = nil
	}
	mt.serverCounter.Incr(numServers(n) - nServersBefore)
	removed := n.removeUseless(mt)
	n.parent.Unlock()
	n.Unlock()
	if removed {
		// If we removed the node, see if we can also remove
		// any of its ascendants.
		mt.removeUselessRecursive(ms.elems[:len(ms.elems)-1])
	}
	return nil
}

// Delete removes the receiver.  If all is true, any subtree is also removed.
func (ms *mountContext) Delete(ctx *context.T, call rpc.ServerCall, deleteSubTree bool) error {
	vlog.VI(2).Infof("*********************Delete %q, %v", ms.name, deleteSubTree)
	if len(ms.elems) == 0 {
		// We can't delete the root.
		return verror.New(errCantDeleteRoot, ctx)
	}
	mt := ms.mt
	// Find and lock the parent node.
	n, err := mt.findNode(ctx, call.Security(), ms.elems, false, removeTags)
	if err != nil {
		return err
	}
	if n == nil {
		return nil
	}
	defer n.parent.Unlock()
	defer n.Unlock()
	if !deleteSubTree && len(n.children) > 0 {
		return verror.New(errNotEmpty, ctx, ms.name)
	}
	mt.deleteNode(n.parent, ms.elems[len(ms.elems)-1])
	return nil
}

// A struct holding a partial result of Glob.
type globEntry struct {
	n    *node
	name string
}

// globStep is called with n and n.parent locked.  Returns with both unlocked.
func (mt *mountTable) globStep(ctx *context.T, call security.Call, n *node, name string, pattern *glob.Glob, ch chan<- naming.GlobReply) {
	vlog.VI(2).Infof("globStep(%s, %s)", name, pattern)

	// If this is a mount point, we're done.
	if m := n.mount; m != nil {
		removed := n.removeUseless(mt)
		if removed {
			n.parent.Unlock()
			n.Unlock()
			return
		}
		// Don't need the parent lock anymore.
		n.parent.Unlock()
		me := naming.MountEntry{
			Name: name,
		}
		// Only fill in the mount info if we can resolve this name.
		if err := n.satisfies(mt, ctx, call, resolveTags); err == nil {
			me.Servers = m.servers.copyToSlice()
			me.ServesMountTable = n.mount.mt
			me.IsLeaf = n.mount.leaf
		} else {
			me.Servers = []naming.MountedServer{}
		}
		// Unlock while we are sending on the channel to avoid livelock.
		n.Unlock()
		ch <- naming.GlobReplyEntry{me}
		return
	}

	if !pattern.Finished() {
		// We can only list children to whom we have some access AND either
		// - we have Read or Admin access to the directory or
		// - we have Resolve or Create access to the directory and the
		//    next element in the pattern is a fixed string.
		if err := n.satisfies(mt, ctx, call, globTags); err != nil {
			if err := n.satisfies(mt, ctx, call, traverseTags); err != nil {
				goto out
			}
			fixed, _ := pattern.SplitFixedPrefix()
			if len(fixed) == 0 {
				goto out
			}
		}

		// Since we will be unlocking the node,
		// we need to grab the list of children before any unlocking.
		children := make(map[string]*node, len(n.children))
		for k, c := range n.children {
			children[k] = c
		}
		n.parent.Unlock()

		// Recurse through the children.
		for k, c := range children {
			// At this point, n lock is held.
			if ok, _, suffix := pattern.MatchInitialSegment(k); ok {
				c.Lock()
				// If child allows any access show it.  Otherwise, skip.
				if err := c.satisfies(mt, ctx, call, allTags); err != nil {
					c.Unlock()
					continue
				}
				mt.globStep(ctx, call, c, naming.Join(name, k), suffix, ch)
				n.Lock()
			}
		}
		// Relock the node and its parent in the correct order.
		// Safe to access n.parent when its unlocked because it never changes.
		n.Unlock()
		n.parent.Lock()
		n.Lock()
	}

out:
	// Remove if no longer useful.
	if n.removeUseless(mt) || pattern.Len() != 0 {
		n.parent.Unlock()
		n.Unlock()
		return
	}

	// To see anything, one has to have some access to the node.  Don't need the parent lock anymore.
	n.parent.Unlock()
	if err := n.satisfies(mt, ctx, call, allTags); err != nil {
		n.Unlock()
		return
	}
	// Unlock while we are sending on the channel to avoid livelock.
	n.Unlock()
	// Intermediate nodes are marked as serving a mounttable since they answer the mounttable methods.
	ch <- naming.GlobReplyEntry{naming.MountEntry{Name: name, ServesMountTable: true}}
}

// Glob finds matches in the namespace.  If we reach a mount point before matching the
// whole pattern, return that mount point.
//
// pattern is a glob pattern as defined by the v.io/x/ref/lib/glob package.
//
// To avoid livelocking an application, Glob grabs and releases locks as it descends the tree
// and holds no locks while writing to the channel.  As such a glob can interleave with other
// operations that add or remove nodes.  The result returned by glob may, therefore, represent
// a state that never existed in the mounttable.  For example, if someone removes c/d and later
// adds a/b while a Glob is in progress, the Glob may return a set of nodes that includes both
// c/d and a/b.
func (ms *mountContext) Glob__(ctx *context.T, call rpc.ServerCall, pattern string) (<-chan naming.GlobReply, error) {
	vlog.VI(2).Infof("mt.Glob %v", ms.elems)
	scall := call.Security()

	g, err := glob.Parse(pattern)
	if err != nil {
		return nil, err
	}

	mt := ms.mt
	ch := make(chan naming.GlobReply)
	go func() {
		defer close(ch)
		// If there was an access error, just ignore the entry, i.e., make it invisible.
		n, err := mt.findNode(ctx, scall, ms.elems, false, nil)
		if err != nil {
			return
		}
		// If the current name is not fully resolvable on this nameserver we
		// don't need to evaluate the glob expression. Send a partially resolved
		// name back to the client.
		if n == nil {
			ms.linkToLeaf(ctx, scall, ch)
			return
		}
		mt.globStep(ctx, scall, n, "", g, ch)
	}()
	return ch, nil
}

func (ms *mountContext) linkToLeaf(ctx *context.T, call security.Call, ch chan<- naming.GlobReply) {
	n, elems, err := ms.mt.findMountPoint(ctx, call, ms.elems)
	if err != nil || n == nil {
		return
	}
	n.parent.Unlock()
	servers := n.mount.servers.copyToSlice()
	for i, s := range servers {
		servers[i].Server = naming.Join(s.Server, strings.Join(elems, "/"))
	}
	n.Unlock()
	ch <- naming.GlobReplyEntry{naming.MountEntry{Name: "", Servers: servers}}
}

func (ms *mountContext) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
	vlog.VI(2).Infof("SetPermissions %q", ms.name)
	mt := ms.mt

	// Find/create node in namespace and add the mount.
	n, err := mt.findNode(ctx, call.Security(), ms.elems, true, setTags)
	if err != nil {
		return err
	}
	if n == nil {
		// TODO(p): can this even happen?
		return verror.New(naming.ErrNoSuchName, ctx, ms.name)
	}
	n.parent.Unlock()
	defer n.Unlock()

	// If the caller is trying to add a Permission that they are no longer Admin in,
	// retain the caller's blessings that were in Admin to prevent them from locking themselves out.
	bnames, _ := security.RemoteBlessingNames(ctx, call.Security())
	if acl, ok := perms[string(mounttable.Admin)]; !ok || !acl.Includes(bnames...) {
		_, oldPerms := n.acls.Get()
		oldAcl := oldPerms[string(mounttable.Admin)]
		for _, bname := range bnames {
			if oldAcl.Includes(bname) {
				perms.Add(security.BlessingPattern(bname), string(mounttable.Admin))
			}
		}
	}
	perms.Normalize()

	n.acls, err = n.acls.Set(version, perms)
	if err == nil {
		n.explicitAccessLists = true
	}
	return err
}

func (ms *mountContext) GetPermissions(ctx *context.T, call rpc.ServerCall) (access.Permissions, string, error) {
	vlog.VI(2).Infof("GetPermissions %q", ms.name)
	mt := ms.mt

	// Find node in namespace and add the mount.
	n, err := mt.findNode(ctx, call.Security(), ms.elems, false, getTags)
	if err != nil {
		return nil, "", err
	}
	if n == nil {
		return nil, "", verror.New(naming.ErrNoSuchName, ctx, ms.name)
	}
	n.parent.Unlock()
	defer n.Unlock()
	version, tam := n.acls.Get()
	return tam, version, nil
}
