Add basic per-node acl support to mounttabled

Change-Id: I56c9c8a6c29b2c9e9c4761b8606ebf143b3cc2a7
diff --git a/examples/unresolve/test_util.go b/examples/unresolve/test_util.go
index 9ea4cd5..0b6f6c2 100644
--- a/examples/unresolve/test_util.go
+++ b/examples/unresolve/test_util.go
@@ -57,7 +57,11 @@
 }
 
 func createMT(server ipc.Server) string {
-	return createServer(server, "mt", mounttable.NewMountTable())
+	mt, err := mounttable.NewMountTable("")
+	if err != nil {
+		panic(fmt.Sprintf("NewMountTable failed with %v", err))
+	}
+	return createServer(server, "mt", mt)
 }
 
 func childMT(args []string) {
diff --git a/runtimes/google/rt/ipc.go b/runtimes/google/rt/ipc.go
index 6fc59b5..54f86cf 100644
--- a/runtimes/google/rt/ipc.go
+++ b/runtimes/google/rt/ipc.go
@@ -27,7 +27,7 @@
 		}
 	}
 	if id.PrivateID != nil {
-		otherOpts = append(otherOpts, rt.id)
+		otherOpts = append(otherOpts, id)
 	}
 	return iipc.InternalNewClient(sm, mt, otherOpts...)
 }
diff --git a/services/mgmt/node/impl/impl_test.go b/services/mgmt/node/impl/impl_test.go
index c8ebeb8..435051e 100644
--- a/services/mgmt/node/impl/impl_test.go
+++ b/services/mgmt/node/impl/impl_test.go
@@ -176,7 +176,11 @@
 	if err != nil {
 		t.Fatalf("NewServer() failed: %v", err)
 	}
-	suffix, dispatcher := "mt", mtlib.NewMountTable()
+	dispatcher, err := mtlib.NewMountTable("")
+	if err != nil {
+		t.Fatalf("NewMountTable() failed: %v", err)
+	}
+	suffix := "mt"
 	if err := server.Register(suffix, dispatcher); err != nil {
 		t.Fatalf("Register(%v, %v) failed: %v", suffix, dispatcher, err)
 	}
diff --git a/services/mounttable/lib/collectionserver_test.go b/services/mounttable/lib/collectionserver_test.go
index ef39c9e..c393c08 100644
--- a/services/mounttable/lib/collectionserver_test.go
+++ b/services/mounttable/lib/collectionserver_test.go
@@ -31,7 +31,11 @@
 // Lookup implements ipc.Dispatcher.Lookup.
 func (d *collectionDispatcher) Lookup(name string) (ipc.Invoker, security.Authorizer, error) {
 	rpcc := &rpcContext{name: name, collectionServer: d.collectionServer}
-	return ipc.ReflectInvoker(rpcc), nil, nil
+	return ipc.ReflectInvoker(rpcc), d, nil
+}
+
+func (collectionDispatcher) Authorize(security.Context) error {
+	return nil
 }
 
 // Export implements CollectionService.Export.
diff --git a/services/mounttable/lib/mounttable.go b/services/mounttable/lib/mounttable.go
index a1ced6e..9503d7a 100644
--- a/services/mounttable/lib/mounttable.go
+++ b/services/mounttable/lib/mounttable.go
@@ -1,7 +1,10 @@
 package mounttable
 
 import (
+	"encoding/json"
+	"errors"
 	"fmt"
+	"os"
 	"path"
 	"strings"
 	"sync"
@@ -26,6 +29,7 @@
 type mountTable struct {
 	sync.RWMutex
 	root *node
+	acls map[string]security.Authorizer
 }
 
 // mountContext represents a client bind.  The name is the name that was bound to.
@@ -51,18 +55,39 @@
 	children map[string]*node
 }
 
-// dummyAuth allows all RPCs.
-type dummyAuth struct{}
-
-func (dummyAuth) Authorize(security.Context) error {
-	return nil
-}
-
 // NewMountTable creates a new server that uses the default authorization policy.
-func NewMountTable() *mountTable {
+func NewMountTable(aclfile string) (*mountTable, error) {
+	acls, err := parseACLs(aclfile)
+	if err != nil {
+		return nil, err
+	}
 	return &mountTable{
 		root: new(node),
+		acls: acls,
+	}, nil
+}
+
+func parseACLs(path string) (map[string]security.Authorizer, error) {
+	if path == "" {
+		return nil, nil
 	}
+	var acls map[string]security.ACL
+	f, err := os.Open(path)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+	if err = json.NewDecoder(f).Decode(&acls); err != nil {
+		return nil, err
+	}
+	result := make(map[string]security.Authorizer)
+	for name, acl := range(acls) {
+		result[name] = security.NewACLAuthorizer(acl)
+	}
+	if result["/"] == nil {
+		return nil, fmt.Errorf("No acl for / in %s", path)
+	}
+	return result, nil
 }
 
 // LookupServer implements ipc.Dispatcher.Lookup.
@@ -78,7 +103,7 @@
 		ms.elems = strings.Split(name, "/")
 		ms.cleanedElems = strings.Split(strings.TrimLeft(path.Clean(name), "/"), "/")
 	}
-	return ipc.ReflectInvoker(mounttable.NewServerMountTable(ms)), new(dummyAuth), nil
+	return ipc.ReflectInvoker(mounttable.NewServerMountTable(ms)), ms, nil
 }
 
 // findNode returns the node for the name path represented by elems.  If none exists and create is false, return nil.
@@ -138,6 +163,20 @@
 	return nil, nil
 }
 
+func (mt *mountTable) authorizeStep(name string, c security.Context) error {
+	if mt.acls == nil {
+		return nil
+	}
+	mt.Lock()
+	acl := (*mt.acls)[name]
+	mt.Unlock()
+	vlog.VI(2).Infof("authorizeStep(%s) %s %s %s", name, c.RemoteID(), c.Label(), acl)
+	if acl != nil {
+		return acl.Authorize(c)
+	}
+	return nil
+}
+
 func slashSlashJoin(elems []string) string {
 	if len(elems) == 2 && len(elems[0]) == 0 && len(elems[1]) == 0 {
 		return "//"
@@ -148,6 +187,22 @@
 	return strings.Join(elems, "/")
 }
 
+// Authorize verifies that the client has access to the requested node.
+// Checks the acls on all nodes in the path starting at the root.
+func (ms *mountContext) Authorize(context security.Context) error {
+	key := "/"
+	if err := ms.mt.authorizeStep(key, context); err != nil {
+		return err
+	}
+	for _, step := range(ms.cleanedElems) {
+		key := naming.Join(key, step)
+		if err := ms.mt.authorizeStep(key, context); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 // ResolveStep returns the next server in a resolution, the name remaining below that server,
 // and whether or not that server is another mount table.
 func (ms *mountContext) ResolveStep(context ipc.Context) (servers []mounttable.MountedServer, suffix string, err error) {
@@ -261,9 +316,19 @@
 	name string
 }
 
-func (mt *mountTable) globStep(n *node, name string, pattern *glob.Glob, reply mounttable.GlobableServiceGlobStream) {
+func (mt *mountTable) globStep(n *node, name string, pattern *glob.Glob, context ipc.Context, reply mounttable.GlobableServiceGlobStream) {
 	vlog.VI(2).Infof("globStep(%s, %s)", name, pattern)
 
+	if mt.acls != nil {
+		acl_name := naming.Join("/", context.Suffix(), name)
+		// Skip this node if the user isn't authorized.
+		if acl := (*mt.acls)[acl_name]; acl != nil {
+			if err := acl.Authorize(context); err != nil {
+				return
+			}
+		}
+	}
+
 	// If this is a mount point, we're done.
 	if m := n.mount; m != nil {
 		// Garbage-collect if expired.
@@ -291,7 +356,7 @@
 	// Recurse through the children.
 	for k, c := range n.children {
 		if ok, suffix := pattern.MatchInitialSegment(k); ok {
-			mt.globStep(c, path.Join(name, k), suffix, reply)
+			mt.globStep(c, path.Join(name, k), suffix, context, reply)
 		}
 	}
 }
@@ -321,6 +386,6 @@
 		return nil
 	}
 
-	mt.globStep(n, "", g, reply)
+	mt.globStep(n, "", g, context, reply)
 	return nil
 }
diff --git a/services/mounttable/lib/mounttable_test.go b/services/mounttable/lib/mounttable_test.go
index 9e78b69..a2bcccf 100644
--- a/services/mounttable/lib/mounttable_test.go
+++ b/services/mounttable/lib/mounttable_test.go
@@ -16,15 +16,22 @@
 	"veyron2/ipc"
 	"veyron2/naming"
 	"veyron2/rt"
+	"veyron2/security"
 	"veyron2/services/mounttable"
 	"veyron2/vlog"
 )
 
 // stupidMT is a version of naming.MountTable that we can control.  This exists so that we have some
 // firm ground to stand on vis a vis the stub interface.
-type stupidMT bool
+type stupidMT struct {
+	id ipc.ClientOpt
+}
 
-var quux stupidMT
+var (
+	rootID = veyron2.LocalID(security.FakePrivateID("root"))
+	bobID = veyron2.LocalID(security.FakePrivateID("bob"))
+	aliceID = veyron2.LocalID(security.FakePrivateID("alice"))
+)
 
 const ttlSecs = 60 * 60
 
@@ -35,8 +42,9 @@
 
 // quuxClient returns an ipc.Client that uses the quux mounttable for name
 // resolution.
-func quuxClient() ipc.Client {
-	c, err := rt.R().NewClient(veyron2.MountTable(quux))
+func quuxClient(id ipc.ClientOpt) ipc.Client {
+	mt := stupidMT{id}
+	c, err := rt.R().NewClient(id, veyron2.MountTable(mt))
 	if err != nil {
 		panic(err)
 	}
@@ -52,7 +60,7 @@
 }
 
 // Resolve will only go one level deep, i.e., it doesn't walk past the first mount point.
-func (stupidMT) Resolve(name string) ([]string, error) {
+func (mt stupidMT) Resolve(name string) ([]string, error) {
 	vlog.VI(1).Infof("MyResolve %q", name)
 	address, suffix := naming.SplitAddressName(name)
 	if len(address) == 0 {
@@ -64,7 +72,7 @@
 	}
 
 	// Resolve via another
-	objectPtr, err := mounttable.BindMountTable("/"+address+"//"+suffix, quuxClient())
+	objectPtr, err := mounttable.BindMountTable("/"+address+"//"+suffix, quuxClient(mt.id))
 	if err != nil {
 		return nil, err
 	}
@@ -93,8 +101,8 @@
 	return nil, errors.New("Glob is not implemented in this MountTable")
 }
 
-func doMount(t *testing.T, name, service string, shouldSucceed bool) {
-	mtpt, err := mounttable.BindMountTable(name, quuxClient())
+func doMount(t *testing.T, name, service string, shouldSucceed bool, id ipc.ClientOpt) {
+	mtpt, err := mounttable.BindMountTable(name, quuxClient(id))
 	if err != nil {
 		boom(t, "Failed to BindMountTable: %s", err)
 	}
@@ -107,18 +115,22 @@
 	}
 }
 
-func doUnmount(t *testing.T, name, service string) {
-	mtpt, err := mounttable.BindMountTable(name, quuxClient())
+func doUnmount(t *testing.T, name, service string, shouldSucceed bool, id ipc.ClientOpt) {
+	mtpt, err := mounttable.BindMountTable(name, quuxClient(id))
 	if err != nil {
 		boom(t, "Failed to BindMountTable: %s", err)
 	}
 	if err := mtpt.Unmount(service); err != nil {
-		boom(t, "Failed to Unmount %s onto %s: %s", service, name, err)
+		if (shouldSucceed) {
+			boom(t, "Failed to Unmount %s onto %s: %s", service, name, err)
+		}
+	} else if (!shouldSucceed) {
+		boom(t, "doUnmount %s onto %s, expected failure but succeeded", service, name)
 	}
 }
 
 func create(t *testing.T, name, contents string) {
-	objectPtr, err := BindCollection(name, quuxClient())
+	objectPtr, err := BindCollection(name, quuxClient(rootID))
 	if err != nil {
 		boom(t, "Failed to BindCollection: %s", err)
 	}
@@ -127,8 +139,8 @@
 	}
 }
 
-func checkContents(t *testing.T, name, expected string, shouldSucceed bool) {
-	objectPtr, err := BindCollection(name, quuxClient())
+func checkContents(t *testing.T, name, expected string, shouldSucceed bool, id ipc.ClientOpt) {
+	objectPtr, err := BindCollection(name, quuxClient(id))
 	if err != nil {
 		boom(t, "Failed to BindCollection: %s", err)
 	}
@@ -147,7 +159,7 @@
 	}
 }
 
-func newServer(t *testing.T) (ipc.Server, string) {
+func newServer(acl string, t *testing.T) (ipc.Server, string) {
 	r := rt.Init()
 	server, err := r.NewServer()
 	if err != nil {
@@ -155,7 +167,11 @@
 	}
 
 	// Add mount table service.
-	if err := server.Register("mounttable", NewMountTable()); err != nil {
+	mt, err := NewMountTable(acl)
+	if err != nil {
+		boom(t, "NewMountTable: %v", err)
+	}
+	if err := server.Register("mounttable", mt); err != nil {
 		boom(t, "Failed to register mount table: %s", err)
 	}
 
@@ -170,7 +186,7 @@
 }
 
 func TestMountTable(t *testing.T) {
-	server, estr := newServer(t)
+	server, estr := newServer("testdata/test.acl", t)
 	defer server.Stop()
 	// Add a collection service.  This is just a service we can mount
 	// and test against.
@@ -180,47 +196,57 @@
 	}
 
 	// Mount the collection server into the mount table.
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/stuff"), naming.JoinAddressName(estr, "collection"), true)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/stuff"), naming.JoinAddressName(estr, "collection"), true, rootID)
 
 	// Create a few objects and make sure we can read them.
 	create(t, naming.JoinAddressName(estr, "mounttable/stuff/the/rain"), "the rain")
 	create(t, naming.JoinAddressName(estr, "mounttable/stuff/in/spain"), "in spain")
 	create(t, naming.JoinAddressName(estr, "mounttable/stuff/falls"), "falls mainly on the plain")
-	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuff/the/rain"), "the rain", true)
-	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuff/in/spain"), "in spain", true)
-	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuff/falls"), "falls mainly on the plain", true)
-	checkContents(t, naming.JoinAddressName(estr, "mounttable//stuff/falls"), "falls mainly on the plain", false)
-	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuff/nonexistant"), "falls mainly on the plain", false)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuff/the/rain"), "the rain", true, rootID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuff/in/spain"), "in spain", true, rootID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuff/falls"), "falls mainly on the plain", true, rootID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable//stuff/falls"), "falls mainly on the plain", false, rootID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuff/nonexistant"), "falls mainly on the plain", false, rootID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuff/the/rain"), "the rain", true, bobID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuff/the/rain"), "the rain", false, aliceID)
 
 	// Test multiple mounts.
-	doMount(t, naming.JoinAddressName(estr, "//mounttable//a/b"), naming.JoinAddressName(estr, "collection"), true)
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/x/y"), naming.JoinAddressName(estr, "collection"), true)
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/alpha//beta"), naming.JoinAddressName(estr, "collection"), true)
-	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuff/falls"), "falls mainly on the plain", true)
-	checkContents(t, naming.JoinAddressName(estr, "mounttable/a/b/falls"), "falls mainly on the plain", true)
-	checkContents(t, naming.JoinAddressName(estr, "mounttable/x/y/falls"), "falls mainly on the plain", true)
-	checkContents(t, naming.JoinAddressName(estr, "mounttable/alpha/beta/falls"), "falls mainly on the plain", true)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable//a/b"), naming.JoinAddressName(estr, "collection"), true, rootID)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/x/y"), naming.JoinAddressName(estr, "collection"), true, rootID)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/alpha//beta"), naming.JoinAddressName(estr, "collection"), true, rootID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuff/falls"), "falls mainly on the plain", true, rootID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/a/b/falls"), "falls mainly on the plain", true, rootID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/x/y/falls"), "falls mainly on the plain", true, rootID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/alpha/beta/falls"), "falls mainly on the plain", true, rootID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/a/b/falls"), "falls mainly on the plain", true, aliceID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/a/b/falls"), "falls mainly on the plain", false, bobID)
 
 	// Test generic unmount.
-	doUnmount(t, naming.JoinAddressName(estr, "//mounttable/a/b"), "")
-	checkContents(t, naming.JoinAddressName(estr, "mounttable/a/b/falls"), "falls mainly on the plain", false)
+	doUnmount(t, naming.JoinAddressName(estr, "//mounttable/a/b"), "", true, rootID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/a/b/falls"), "falls mainly on the plain", false, rootID)
 
 	// Test specific unmount.
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/a/b"), naming.JoinAddressName(estr, "collection"), true)
-	doUnmount(t, naming.JoinAddressName(estr, "//mounttable/a/b"), naming.JoinAddressName(estr, "collection"))
-	checkContents(t, naming.JoinAddressName(estr, "mounttable/a/b/falls"), "falls mainly on the plain", false)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/a/b"), naming.JoinAddressName(estr, "collection"), true, rootID)
+	doUnmount(t, naming.JoinAddressName(estr, "//mounttable/a/b"), naming.JoinAddressName(estr, "collection"), true, rootID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/a/b/falls"), "falls mainly on the plain", false, rootID)
 
 	// Try timing out a mount.
 	ft := NewFakeTimeClock()
 	setServerListClock(ft)
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/stuffWithTTL"), naming.JoinAddressName(estr, "collection"), true)
-	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuffWithTTL/the/rain"), "the rain", true)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/stuffWithTTL"), naming.JoinAddressName(estr, "collection"), true, rootID)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuffWithTTL/the/rain"), "the rain", true, rootID)
 	ft.advance(time.Duration(ttlSecs+4) * time.Second)
-	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuffWithTTL/the/rain"), "the rain", false)
+	checkContents(t, naming.JoinAddressName(estr, "mounttable/stuffWithTTL/the/rain"), "the rain", false, rootID)
+
+	// test unauthorized mount
+	doMount(t, naming.JoinAddressName(estr, "//mounttable//a/b"), naming.JoinAddressName(estr, "collection"), false, bobID)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable//a/b"), naming.JoinAddressName(estr, "collection"), false, aliceID)
+
+	doUnmount(t, naming.JoinAddressName(estr, "//mounttable/x/y"), naming.JoinAddressName(estr, "collection"), false, bobID)
 }
 
-func doGlob(t *testing.T, name, pattern string) []string {
-	mtpt, err := mounttable.BindMountTable(name, quuxClient())
+func doGlob(t *testing.T, name, pattern string, id ipc.ClientOpt) []string {
+	mtpt, err := mounttable.BindMountTable(name, quuxClient(id))
 	if err != nil {
 		boom(t, "Failed to BindMountTable: %s", err)
 	}
@@ -235,7 +261,7 @@
 			break
 		}
 		if err != nil {
-			t.Fatalf("Glob %s: %s", name, err)
+			boom(t, "Glob %s: %s", name, err)
 		}
 		reply = append(reply, e.Name)
 	}
@@ -256,14 +282,14 @@
 }
 
 func TestGlob(t *testing.T) {
-	server, estr := newServer(t)
+	server, estr := newServer("", t)
 	defer server.Stop()
 
 	// set up a mount space
 	fakeServer := naming.JoinAddressName(estr, "quux")
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/one/bright/day"), fakeServer, true)
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/in/the/middle"), fakeServer, true)
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/of/the/night"), fakeServer, true)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/one/bright/day"), fakeServer, true, rootID)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/in/the/middle"), fakeServer, true, rootID)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/of/the/night"), fakeServer, true, rootID)
 
 	// Try various globs.
 	tests := []struct {
@@ -278,42 +304,85 @@
 		{"", []string{""}},
 	}
 	for _, test := range tests {
-		out := doGlob(t, naming.JoinAddressName(estr, "//mounttable"), test.in)
+		out := doGlob(t, naming.JoinAddressName(estr, "//mounttable"), test.in, rootID)
+		checkMatch(t, test.expected, out)
+	}
+}
+
+func TestGlobACLs(t *testing.T) {
+	server, estr := newServer("testdata/test.acl", t)
+	defer server.Stop()
+
+	// set up a mount space
+	fakeServer := naming.JoinAddressName(estr, "quux")
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/one/bright/day"), fakeServer, true, rootID)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/a/b/c"), fakeServer, true, rootID)
+
+	// Try various globs.
+	tests := []struct {
+		id       ipc.ClientOpt
+		in       string
+		expected []string
+	}{
+		{rootID, "*", []string{"one", "a"}},
+		{aliceID, "*", []string{"one", "a"}},
+		{bobID, "*", []string{"one"}},
+		{rootID, "*/...", []string{"one", "a", "one/bright", "a/b", "one/bright/day", "a/b/c"}},
+		{aliceID, "*/...", []string{"one", "a", "one/bright", "a/b", "one/bright/day", "a/b/c"}},
+		{bobID, "*/...", []string{"one", "one/bright", "one/bright/day"}},
+	}
+	for _, test := range tests {
+		out := doGlob(t, naming.JoinAddressName(estr, "//mounttable"), test.in, test.id)
 		checkMatch(t, test.expected, out)
 	}
 }
 
 func TestServerFormat(t *testing.T) {
-	server, estr := newServer(t)
+	server, estr := newServer("", t)
 	defer server.Stop()
 
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/endpoint"), naming.JoinAddressName(estr, "life/on/the/mississippi"), true)
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/hostport"), "/atrampabroad:8000", true)
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/hostport-endpoint-platypus"), "/@atrampabroad:8000@@", true)
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/invalid/not/rooted"), "atrampabroad:8000", false)
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/invalid/no/port"), "/atrampabroad", false)
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/invalid/endpoint"), "/@following the equator:8000@@@", false)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/endpoint"), naming.JoinAddressName(estr, "life/on/the/mississippi"), true, rootID)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/hostport"), "/atrampabroad:8000", true, rootID)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/hostport-endpoint-platypus"), "/@atrampabroad:8000@@", true, rootID)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/invalid/not/rooted"), "atrampabroad:8000", false, rootID)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/invalid/no/port"), "/atrampabroad", false, rootID)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/invalid/endpoint"), "/@following the equator:8000@@@", false, rootID)
 }
 
 func TestExpiry(t *testing.T) {
-	server, estr := newServer(t)
+	server, estr := newServer("", t)
 	defer server.Stop()
 
 	ft := NewFakeTimeClock()
 	setServerListClock(ft)
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/a1/b1"), naming.JoinAddressName(estr, "collection"), true)
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/a1/b2"), naming.JoinAddressName(estr, "collection"), true)
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/a2/b1"), naming.JoinAddressName(estr, "collection"), true)
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/a2/b2/c"), naming.JoinAddressName(estr, "collection"), true)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/a1/b1"), naming.JoinAddressName(estr, "collection"), true, rootID)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/a1/b2"), naming.JoinAddressName(estr, "collection"), true, rootID)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/a2/b1"), naming.JoinAddressName(estr, "collection"), true, rootID)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/a2/b2/c"), naming.JoinAddressName(estr, "collection"), true, rootID)
 
-	checkMatch(t, []string{"a1/b1", "a2/b1"}, doGlob(t, naming.JoinAddressName(estr, "//mounttable"), "*/b1/..."))
+	checkMatch(t, []string{"a1/b1", "a2/b1"}, doGlob(t, naming.JoinAddressName(estr, "//mounttable"), "*/b1/...", rootID))
 	ft.advance(time.Duration(ttlSecs/2) * time.Second)
-	checkMatch(t, []string{"a1/b1", "a2/b1"}, doGlob(t, naming.JoinAddressName(estr, "//mounttable"), "*/b1/..."))
-	checkMatch(t, []string{"c"}, doGlob(t, naming.JoinAddressName(estr, "//mounttable/a2/b2"), "*"))
+	checkMatch(t, []string{"a1/b1", "a2/b1"}, doGlob(t, naming.JoinAddressName(estr, "//mounttable"), "*/b1/...", rootID))
+	checkMatch(t, []string{"c"}, doGlob(t, naming.JoinAddressName(estr, "//mounttable/a2/b2"), "*", rootID))
 	// Refresh only a1/b1.  All the other mounts will expire upon the next
 	// ft advance.
-	doMount(t, naming.JoinAddressName(estr, "//mounttable/a1/b1"), naming.JoinAddressName(estr, "collection"), true)
+	doMount(t, naming.JoinAddressName(estr, "//mounttable/a1/b1"), naming.JoinAddressName(estr, "collection"), true, rootID)
 	ft.advance(time.Duration(ttlSecs/2+4) * time.Second)
-	checkMatch(t, []string{"a1"}, doGlob(t, naming.JoinAddressName(estr, "//mounttable"), "*"))
-	checkMatch(t, []string{"a1/b1"}, doGlob(t, naming.JoinAddressName(estr, "//mounttable"), "*/b1/..."))
+	checkMatch(t, []string{"a1"}, doGlob(t, naming.JoinAddressName(estr, "//mounttable"), "*", rootID))
+	checkMatch(t, []string{"a1/b1"}, doGlob(t, naming.JoinAddressName(estr, "//mounttable"), "*/b1/...", rootID))
+}
+
+func TestBadACLs(t *testing.T) {
+	_, err := NewMountTable("testdata/invalid.acl")
+	if err == nil {
+		boom(t, "Expected json parse error in acl file")
+	}
+	_, err = NewMountTable("testdata/doesntexist.acl")
+	if err == nil {
+		boom(t, "Expected error from missing acl file")
+	}
+	_, err = NewMountTable("testdata/noroot.acl")
+	if err == nil {
+		boom(t, "Expected error for missing '/' acl")
+	}
 }
diff --git a/services/mounttable/lib/neighborhood_test.go b/services/mounttable/lib/neighborhood_test.go
index b4c6901..732fe3b 100644
--- a/services/mounttable/lib/neighborhood_test.go
+++ b/services/mounttable/lib/neighborhood_test.go
@@ -8,6 +8,7 @@
 
 	_ "veyron/lib/testutil"
 
+	"veyron2"
 	"veyron2/naming"
 	"veyron2/rt"
 	"veyron2/services/mounttable"
@@ -24,6 +25,7 @@
 
 func TestNeighborhood(t *testing.T) {
 	r := rt.Init()
+	id := veyron2.LocalID(rt.R().Identity())
 	vlog.Infof("TestNeighborhood")
 	server, err := r.NewServer()
 	if err != nil {
@@ -51,7 +53,7 @@
 	// Wait for the mounttable to appear in mdns
 L:
 	for tries := 1; tries < 2; tries++ {
-		names := doGlob(t, naming.JoinAddressName(estr, "//"+nhPrefix), "*")
+		names := doGlob(t, naming.JoinAddressName(estr, "//"+nhPrefix), "*", id)
 		t.Logf("names %v", names)
 		for _, n := range names {
 			if n == "joeblow" {
@@ -61,18 +63,18 @@
 		time.Sleep(1 * time.Second)
 	}
 
-	want, got := []string{"joeblow"}, doGlob(t, naming.JoinAddressName(estr, "//neighborhood"), "*")
+	want, got := []string{"joeblow"}, doGlob(t, naming.JoinAddressName(estr, "//neighborhood"), "*", id)
 	if !reflect.DeepEqual(want, got) {
 		t.Errorf("Unexpected Glob result want: %q, got: %q", want, got)
 	}
-	want, got = []string{""}, doGlob(t, naming.JoinAddressName(estr, "//neighborhood/joeblow"), "")
+	want, got = []string{""}, doGlob(t, naming.JoinAddressName(estr, "//neighborhood/joeblow"), "", id)
 	if !reflect.DeepEqual(want, got) {
 		t.Errorf("Unexpected Glob result want: %q, got: %q", want, got)
 	}
 
 	// Make sure we can resolve through the neighborhood.
 	expectedSuffix := "a/b"
-	objectPtr, err := mounttable.BindMountTable(naming.JoinAddressName(estr, "//neighborhood/joeblow"+"/"+expectedSuffix), quuxClient())
+	objectPtr, err := mounttable.BindMountTable(naming.JoinAddressName(estr, "//neighborhood/joeblow"+"/"+expectedSuffix), quuxClient(id))
 	if err != nil {
 		boom(t, "BindMountTable: %s", err)
 	}
diff --git a/services/mounttable/lib/testdata/invalid.acl b/services/mounttable/lib/testdata/invalid.acl
new file mode 100644
index 0000000..9d68933
--- /dev/null
+++ b/services/mounttable/lib/testdata/invalid.acl
@@ -0,0 +1 @@
+"
\ No newline at end of file
diff --git a/services/mounttable/lib/testdata/noRoot.acl b/services/mounttable/lib/testdata/noRoot.acl
new file mode 100644
index 0000000..0ff2383
--- /dev/null
+++ b/services/mounttable/lib/testdata/noRoot.acl
@@ -0,0 +1,3 @@
+{
+"/foo/bar": {"fake/root": "RW"},
+}
\ No newline at end of file
diff --git a/services/mounttable/lib/testdata/test.acl b/services/mounttable/lib/testdata/test.acl
new file mode 100644
index 0000000..1a3c98c
--- /dev/null
+++ b/services/mounttable/lib/testdata/test.acl
@@ -0,0 +1,5 @@
+{
+"/": {"fake/root": "RW", "*": "R"},
+"/stuff": {"fake/root": "RW", "fake/bob": "R"},
+"/a": {"fake/root": "RW", "fake/alice": "R"}
+}
\ No newline at end of file
diff --git a/services/mounttable/mounttabled/mounttable.go b/services/mounttable/mounttabled/mounttable.go
index d1ee01e..c5f8527 100644
--- a/services/mounttable/mounttabled/mounttable.go
+++ b/services/mounttable/mounttabled/mounttable.go
@@ -20,6 +20,7 @@
 	// TODO(rthellend): Remove the address flag when the config manager is working.
 	address   = flag.String("address", ":0", "Address to listen on.  Default is to use a randomly assigned port")
 	prefix    = flag.String("prefix", "mt", "The prefix to register the server at.")
+	aclFile = flag.String("acls", "", "ACL file. Default is to allow all access.")
 )
 
 const usage = `%s is a simple mount table daemon.
@@ -51,7 +52,12 @@
 		return
 	}
 	defer server.Stop()
-	if err := server.Register(*prefix, mounttable.NewMountTable()); err != nil {
+	mt, err := mounttable.NewMountTable(*aclFile)
+	if err != nil {
+		vlog.Errorf("r.NewMountTable failed: %v", err)
+		return
+	}
+	if err := server.Register(*prefix, mt); err != nil {
 		vlog.Errorf("server.Register failed to register mount table: %v", err)
 		return
 	}