services/mounttable/btmtd: Add node&server limits

Add limits on the number of nodes each user can create and on the number
of servers they can mount.

Change-Id: Iedd242beac5d58eeb498892e73bf5724387130e3
diff --git a/services/mounttable/btmtd/doc.go b/services/mounttable/btmtd/doc.go
index a746e59..47eb159 100644
--- a/services/mounttable/btmtd/doc.go
+++ b/services/mounttable/btmtd/doc.go
@@ -25,6 +25,10 @@
    If true, use an in-memory bigtable server (for testing only)
  -key-file=
    The file that contains the Google Cloud JSON credentials to use
+ -max-nodes-per-user=10000
+   The maximum number of nodes that a single user can create.
+ -max-servers-per-user=10000
+   The maximum number of servers that a single user can mount.
  -name=
    If provided, causes the mount table to mount itself under this name.
  -permissions-file=
diff --git a/services/mounttable/btmtd/internal/bt.go b/services/mounttable/btmtd/internal/bt.go
index 86cb9ce..3769523 100644
--- a/services/mounttable/btmtd/internal/bt.go
+++ b/services/mounttable/btmtd/internal/bt.go
@@ -162,7 +162,7 @@
 	}
 	perms := make(access.Permissions)
 	perms.Add(security.AllPrincipals, string(v23mt.Admin))
-	return b.createRow(ctx, "", perms, "", b.now())
+	return b.createRow(ctx, "", perms, "", b.now(), 0)
 }
 
 func (b *BigTable) timeFloor(t bigtable.Timestamp) bigtable.Timestamp {
@@ -342,7 +342,7 @@
 	return row, err
 }
 
-func (b *BigTable) createRow(ctx *context.T, name string, perms access.Permissions, creator string, ts bigtable.Timestamp) error {
+func (b *BigTable) createRow(ctx *context.T, name string, perms access.Permissions, creator string, ts bigtable.Timestamp, limit int64) error {
 	jsonPerms, err := json.Marshal(perms)
 	if err != nil {
 		return err
@@ -350,6 +350,15 @@
 	if creator == "" {
 		creator = conventions.ServerUser
 	}
+	delta := int64(1)
+	if err := incrementCreatorNodeCount(ctx, b, creator, delta, limit); err != nil {
+		return err
+	}
+	defer func() {
+		if err := incrementCreatorNodeCount(ctx, b, creator, -delta, 0); err != nil {
+			ctx.Errorf("incrementCreatorNodeCount failed: %v", err)
+		}
+	}()
 	mut := bigtable.NewMutation()
 	mut.Set(metadataFamily, creatorColumn, ts, []byte(creator))
 	mut.Set(metadataFamily, permissionsColumn, bigtable.ServerTime, jsonPerms)
@@ -364,5 +373,6 @@
 	if exists {
 		return verror.New(errConcurrentAccess, ctx, name)
 	}
-	return incrementCreatorNodeCount(ctx, b, creator, 1)
+	delta = 0
+	return nil
 }
diff --git a/services/mounttable/btmtd/internal/concurrency_test.go b/services/mounttable/btmtd/internal/concurrency_test.go
index fc0a819..7e41a07 100644
--- a/services/mounttable/btmtd/internal/concurrency_test.go
+++ b/services/mounttable/btmtd/internal/concurrency_test.go
@@ -19,7 +19,7 @@
 	rootCtx, _, _, shutdown := initTest()
 	defer shutdown()
 
-	stop, mtAddr, bt, _ := newMT(t, "", rootCtx)
+	stop, mtAddr, bt, _ := newMT(t, "", rootCtx, nil)
 	defer stop()
 
 	var wg sync.WaitGroup
@@ -59,7 +59,7 @@
 	rootCtx, _, _, shutdown := initTest()
 	defer shutdown()
 
-	stop, mtAddr, bt, _ := newMT(t, "", rootCtx)
+	stop, mtAddr, bt, _ := newMT(t, "", rootCtx, nil)
 	defer stop()
 
 	var wg sync.WaitGroup
@@ -102,7 +102,7 @@
 	rootCtx, _, _, shutdown := initTest()
 	defer shutdown()
 
-	stop, mtAddr, bt, clock := newMT(t, "", rootCtx)
+	stop, mtAddr, bt, clock := newMT(t, "", rootCtx, nil)
 	defer stop()
 
 	const N = 100
diff --git a/services/mounttable/btmtd/internal/counters.go b/services/mounttable/btmtd/internal/counters.go
index 022e2bb..df6ed6d 100644
--- a/services/mounttable/btmtd/internal/counters.go
+++ b/services/mounttable/btmtd/internal/counters.go
@@ -14,6 +14,11 @@
 	"v.io/v23/verror"
 )
 
+var (
+	errNodeLimitExceeded   = verror.Register(pkgPath+".errNodeLimitExceeded", verror.NoRetry, "{1:}{2:} node limit per user ({3}) exceeded{:_}")
+	errServerLimitExceeded = verror.Register(pkgPath+".errServerLimitExceeded", verror.NoRetry, "{1:}{2:} mounted server limit per user ({3}) exceeded{:_}")
+)
+
 func incrementCounter(ctx *context.T, bt *BigTable, name string, delta int64) (int64, error) {
 	bctx, cancel := btctx(ctx)
 	defer cancel()
@@ -36,18 +41,27 @@
 	return
 }
 
-func incrementCreatorNodeCount(ctx *context.T, bt *BigTable, creator string, delta int64) error {
+func incrementWithLimit(ctx *context.T, bt *BigTable, name string, delta, limit int64, overLimitError verror.IDAction) error {
 	if delta == 0 {
 		return nil
 	}
-	_, err := incrementCounter(ctx, bt, "num-nodes-per-user:"+creator, delta)
-	return err
+	c, err := incrementCounter(ctx, bt, name, delta)
+	if err != nil {
+		return err
+	}
+	if delta > 0 && limit > 0 && c > limit {
+		if _, err := incrementCounter(ctx, bt, name, -delta); err != nil {
+			ctx.Errorf("incrementCounter(%q, %v) failed: %v", name, -delta, err)
+		}
+		return verror.New(overLimitError, ctx, limit)
+	}
+	return nil
 }
 
-func incrementCreatorServerCount(ctx *context.T, bt *BigTable, creator string, delta int64) error {
-	if delta == 0 {
-		return nil
-	}
-	_, err := incrementCounter(ctx, bt, "num-servers-per-user:"+creator, delta)
-	return err
+func incrementCreatorNodeCount(ctx *context.T, bt *BigTable, creator string, delta, limit int64) error {
+	return incrementWithLimit(ctx, bt, "num-nodes-per-user:"+creator, delta, limit, errNodeLimitExceeded)
+}
+
+func incrementCreatorServerCount(ctx *context.T, bt *BigTable, creator string, delta, limit int64) error {
+	return incrementWithLimit(ctx, bt, "num-servers-per-user:"+creator, delta, limit, errServerLimitExceeded)
 }
diff --git a/services/mounttable/btmtd/internal/dispatcher.go b/services/mounttable/btmtd/internal/dispatcher.go
index aebf694..7521e6b 100644
--- a/services/mounttable/btmtd/internal/dispatcher.go
+++ b/services/mounttable/btmtd/internal/dispatcher.go
@@ -16,6 +16,12 @@
 	"v.io/x/ref/lib/timekeeper"
 )
 
+type Config struct {
+	GlobalAcl         access.AccessList
+	MaxNodesPerUser   int64
+	MaxServersPerUser int64
+}
+
 var clock = timekeeper.RealTime()
 
 // For testing only.
@@ -23,13 +29,13 @@
 	clock = c
 }
 
-func NewDispatcher(bt *BigTable, globalAcl *access.AccessList) rpc.Dispatcher {
-	return &dispatcher{bt, globalAcl}
+func NewDispatcher(bt *BigTable, config *Config) rpc.Dispatcher {
+	return &dispatcher{bt, config}
 }
 
 type dispatcher struct {
-	bt        *BigTable
-	globalAcl *access.AccessList
+	bt     *BigTable
+	config *Config
 }
 
 func (d *dispatcher) Lookup(ctx *context.T, suffix string) (interface{}, security.Authorizer, error) {
@@ -37,8 +43,8 @@
 		suffix = path.Clean(suffix)
 	}
 	return v23mt.MountTableServer(&mounttable{
-		suffix:    suffix,
-		globalAcl: d.globalAcl,
-		bt:        d.bt,
+		suffix: suffix,
+		config: d.config,
+		bt:     d.bt,
 	}), security.AllowEveryone(), nil
 }
diff --git a/services/mounttable/btmtd/internal/mounttable.go b/services/mounttable/btmtd/internal/mounttable.go
index 46f186f..e7c1406 100644
--- a/services/mounttable/btmtd/internal/mounttable.go
+++ b/services/mounttable/btmtd/internal/mounttable.go
@@ -55,9 +55,9 @@
 }
 
 type mounttable struct {
-	suffix    string
-	globalAcl *access.AccessList
-	bt        *BigTable
+	suffix string
+	config *Config
+	bt     *BigTable
 
 	rbn    []string
 	rbnInv []security.RejectedBlessing
@@ -95,7 +95,7 @@
 		if err := mt.authorize(ctx, call.Security(), n.permissions, v23mt.Mount, v23mt.Admin); err != nil {
 			return err
 		}
-		return n.mount(ctx, server, clock.Now().Add(time.Duration(ttl)*time.Second), flags)
+		return n.mount(ctx, server, clock.Now().Add(time.Duration(ttl)*time.Second), flags, mt.config.MaxServersPerUser)
 	})
 }
 
@@ -306,10 +306,8 @@
 			return nil
 		}
 	}
-	if mt.globalAcl != nil {
-		if mt.globalAcl.Includes(blessings...) {
-			return nil
-		}
+	if mt.config.GlobalAcl.Includes(blessings...) {
+		return nil
 	}
 	return access.NewErrNoPermissions(ctx, blessings, invalid, string(tags[0]))
 }
@@ -372,7 +370,7 @@
 				}
 			}
 			perms.Normalize()
-			n, err = parent.createChild(ctx, elem, perms, pickCreator(ctx, call))
+			n, err = parent.createChild(ctx, elem, perms, pickCreator(ctx, call), mt.config.MaxNodesPerUser)
 			if err != nil {
 				return nil, "", err
 			}
diff --git a/services/mounttable/btmtd/internal/mounttable_test.go b/services/mounttable/btmtd/internal/mounttable_test.go
index b59784c..3d0a9cb 100644
--- a/services/mounttable/btmtd/internal/mounttable_test.go
+++ b/services/mounttable/btmtd/internal/mounttable_test.go
@@ -172,7 +172,7 @@
 	}
 }
 
-func newMT(t *testing.T, permsFile string, rootCtx *context.T) (func(), string, *internal.BigTable, timekeeper.ManualTime) {
+func newMT(t *testing.T, permsFile string, rootCtx *context.T, config *internal.Config) (func(), string, *internal.BigTable, timekeeper.ManualTime) {
 	reservedDisp := debuglib.NewDispatcher(nil)
 	ctx := v23.WithReservedNameDispatcher(rootCtx, reservedDisp)
 
@@ -197,7 +197,10 @@
 	// Add mount table service.
 	internal.SetClock(clock)
 	internal.SetGcGracePeriod(0)
-	mt := internal.NewDispatcher(bt, nil)
+	if config == nil {
+		config = &internal.Config{}
+	}
+	mt := internal.NewDispatcher(bt, config)
 
 	// Start serving on a loopback address.
 	ctx, cancel := context.WithCancel(ctx)
@@ -235,7 +238,7 @@
 	rootCtx, aliceCtx, bobCtx, shutdown := initTest()
 	defer shutdown()
 
-	stop, mtAddr, _, clock := newMT(t, "testdata/test.perms", rootCtx)
+	stop, mtAddr, _, clock := newMT(t, "testdata/test.perms", rootCtx, nil)
 	defer stop()
 	stop, collectionAddr := newCollection(t, rootCtx)
 	defer stop()
@@ -407,7 +410,7 @@
 	rootCtx, shutdown := test.V23InitWithMounttable()
 	defer shutdown()
 
-	stop, estr, _, _ := newMT(t, "", rootCtx)
+	stop, estr, _, _ := newMT(t, "", rootCtx, nil)
 	defer stop()
 
 	// set up a mount space
@@ -524,7 +527,7 @@
 	rootCtx, aliceCtx, bobCtx, shutdown := initTest()
 	defer shutdown()
 
-	stop, estr, _, _ := newMT(t, "testdata/test.perms", rootCtx)
+	stop, estr, _, _ := newMT(t, "testdata/test.perms", rootCtx, nil)
 	defer stop()
 	fakeServer := naming.JoinAddressName(estr, "quux")
 
@@ -581,7 +584,7 @@
 	rootCtx, aliceCtx, bobCtx, shutdown := initTest()
 	defer shutdown()
 
-	stop, estr, _, _ := newMT(t, "testdata/test.perms", rootCtx)
+	stop, estr, _, _ := newMT(t, "testdata/test.perms", rootCtx, nil)
 	defer stop()
 
 	// set up a mount space
@@ -614,7 +617,7 @@
 	rootCtx, shutdown := test.V23InitWithMounttable()
 	defer shutdown()
 
-	stop, estr, _, _ := newMT(t, "", rootCtx)
+	stop, estr, _, _ := newMT(t, "", rootCtx, nil)
 	defer stop()
 
 	// Set up one mount.
@@ -642,7 +645,7 @@
 	rootCtx, aliceCtx, bobCtx, shutdown := initTest()
 	defer shutdown()
 
-	stop, estr, _, _ := newMT(t, "testdata/test.perms", rootCtx)
+	stop, estr, _, _ := newMT(t, "testdata/test.perms", rootCtx, nil)
 	defer stop()
 
 	// set up a mount space
@@ -675,7 +678,7 @@
 	rootCtx, shutdown := test.V23InitWithMounttable()
 	defer shutdown()
 
-	stop, estr, _, _ := newMT(t, "", rootCtx)
+	stop, estr, _, _ := newMT(t, "", rootCtx, nil)
 	defer stop()
 
 	doMount(t, rootCtx, estr, "endpoint", naming.JoinAddressName(estr, "life/on/the/mississippi"), true)
@@ -689,7 +692,7 @@
 	rootCtx, shutdown := test.V23InitWithMounttable()
 	defer shutdown()
 
-	stop, estr, _, clock := newMT(t, "", rootCtx)
+	stop, estr, _, clock := newMT(t, "", rootCtx, nil)
 	defer stop()
 	stop, collectionAddr := newCollection(t, rootCtx)
 	defer stop()
@@ -719,7 +722,7 @@
 	rootCtx, shutdown := test.V23InitWithMounttable()
 	defer shutdown()
 
-	stop, estr, bt, clock := newMT(t, "", rootCtx)
+	stop, estr, bt, clock := newMT(t, "", rootCtx, nil)
 	defer stop()
 
 	nodeCount := func() int64 {
@@ -837,7 +840,7 @@
 	rootCtx, _, _, shutdown := initTest()
 	defer shutdown()
 
-	stop, estr, _, _ := newMT(t, "testdata/intermediate.perms", rootCtx)
+	stop, estr, _, _ := newMT(t, "testdata/intermediate.perms", rootCtx, nil)
 	defer stop()
 
 	// x and x/y should have the same permissions at the root.
@@ -853,6 +856,64 @@
 	}
 }
 
+func TestLimits(t *testing.T) {
+	ctx, _, _, shutdown := initTest()
+	defer shutdown()
+
+	stop, estr, bt, _ := newMT(t, "", ctx, &internal.Config{MaxNodesPerUser: 10, MaxServersPerUser: 15})
+	defer stop()
+
+	fakeServer := naming.JoinAddressName(estr, "quux")
+
+	checkCounters := func(n, s int) {
+		counters, err := bt.Counters(ctx)
+		if err != nil {
+			t.Fatalf("bt.Counters failed: %v", err)
+		}
+		numNodes, numServers := counters["num-nodes-per-user:root"], counters["num-servers-per-user:root"]
+		if numNodes != int64(n) {
+			t.Errorf("Unexpected number of nodes. Got %d, expected %d", numNodes, n)
+		}
+		if numServers != int64(s) {
+			t.Errorf("Unexpected number of servers. Got %d, expected %d", numServers, s)
+		}
+	}
+
+	// User is allowed to created 10 nodes.
+	for i := 1; i <= 10; i++ {
+		doMount(t, ctx, estr, fmt.Sprintf("%d", i), fakeServer, true)
+		checkCounters(i, i)
+	}
+	// But not 11.
+	doMount(t, ctx, estr, "foo", fakeServer, false)
+	checkCounters(10, 10)
+
+	// User is allowed to mount 15 servers.
+	for i := 1; i <= 5; i++ {
+		doMount(t, ctx, estr, fmt.Sprintf("%d", i), naming.Join(fakeServer, "foo"), true)
+		checkCounters(10, 10+i)
+	}
+	// But not 16.
+	doMount(t, ctx, estr, "6", naming.Join(fakeServer, "foo"), false)
+	checkCounters(10, 15)
+
+	// Freeing one node allows to create one.
+	doUnmount(t, ctx, estr, "10", "", true)
+	checkCounters(9, 14)
+	doMount(t, ctx, estr, "10", fakeServer, true)
+	checkCounters(10, 15)
+
+	// Unmount everything.
+	for i := 1; i <= 10; i++ {
+		doUnmount(t, ctx, estr, fmt.Sprintf("%d", i), "", true)
+		if i <= 5 {
+			checkCounters(10-i, 15-2*i)
+		} else {
+			checkCounters(10-i, 10-i)
+		}
+	}
+}
+
 func initTest() (rootCtx *context.T, aliceCtx *context.T, bobCtx *context.T, shutdown v23.Shutdown) {
 	ctx, shutdown := test.V23InitWithMounttable()
 	var err error
diff --git a/services/mounttable/btmtd/internal/node.go b/services/mounttable/btmtd/internal/node.go
index d428909..ffb96c1 100644
--- a/services/mounttable/btmtd/internal/node.go
+++ b/services/mounttable/btmtd/internal/node.go
@@ -130,7 +130,7 @@
 	return n
 }
 
-func (n *mtNode) createChild(ctx *context.T, child string, perms access.Permissions, creator string) (*mtNode, error) {
+func (n *mtNode) createChild(ctx *context.T, child string, perms access.Permissions, creator string, limit int64) (*mtNode, error) {
 	ts := n.bt.now()
 	mut := bigtable.NewMutation()
 	mut.Set(childrenFamily, child, ts, []byte{1})
@@ -148,7 +148,7 @@
 	childName := naming.Join(n.name, child)
 	longCtx, cancel := longTimeout(ctx)
 	defer cancel()
-	if err := n.bt.createRow(longCtx, childName, perms, creator, ts); err != nil {
+	if err := n.bt.createRow(longCtx, childName, perms, creator, ts, limit); err != nil {
 		return nil, err
 	}
 	n, err := getNode(ctx, n.bt, childName)
@@ -161,7 +161,7 @@
 	return n, nil
 }
 
-func (n *mtNode) mount(ctx *context.T, server string, deadline time.Time, flags naming.MountFlag) error {
+func (n *mtNode) mount(ctx *context.T, server string, deadline time.Time, flags naming.MountFlag, limit int64) error {
 	delta := int64(1)
 	mut := bigtable.NewMutation()
 	for _, s := range n.servers {
@@ -173,6 +173,14 @@
 		delta--
 		mut.DeleteCellsInColumn(serversFamily, s.Server)
 	}
+	if err := incrementCreatorServerCount(ctx, n.bt, n.creator, delta, limit); err != nil {
+		return err
+	}
+	defer func() {
+		if err := incrementCreatorServerCount(ctx, n.bt, n.creator, -delta, 0); err != nil {
+			ctx.Errorf("incrementCreatorServerCount failed: %v", err)
+		}
+	}()
 	f := mtFlags{
 		MT:   flags&naming.MT != 0,
 		Leaf: flags&naming.Leaf != 0,
@@ -185,7 +193,8 @@
 	if err := n.mutate(ctx, mut, false); err != nil {
 		return err
 	}
-	return incrementCreatorServerCount(ctx, n.bt, n.creator, delta)
+	delta = 0
+	return nil
 }
 
 func (n *mtNode) unmount(ctx *context.T, server string) error {
@@ -209,7 +218,7 @@
 	if n, err := getNode(ctx, n.bt, n.name); err == nil {
 		n.gc(ctx)
 	}
-	return incrementCreatorServerCount(ctx, n.bt, n.creator, delta)
+	return incrementCreatorServerCount(ctx, n.bt, n.creator, delta, 0)
 }
 
 func (n *mtNode) gc(ctx *context.T) (deletedSomething bool, err error) {
@@ -223,7 +232,7 @@
 				break
 			}
 			delta := -int64(len(n.expiredServers))
-			if err = incrementCreatorServerCount(ctx, n.bt, n.creator, delta); err != nil {
+			if err = incrementCreatorServerCount(ctx, n.bt, n.creator, delta, 0); err != nil {
 				// TODO(rthellend): Since counters are stored in different rows,
 				// there is no way to update them atomically, e.g. if the server
 				// dies here, or if incrementCreatorServerCount returns an error,
@@ -288,7 +297,7 @@
 			// exist. It could be that it is being created or
 			// deleted concurrently. To be sure, we have to create
 			// it before deleting it.
-			if cn, err = n.createChild(ctx, c, n.permissions, ""); err != nil {
+			if cn, err = n.createChild(ctx, c, n.permissions, "", 0); err != nil {
 				return err
 			}
 		}
@@ -327,10 +336,10 @@
 	// called directly without gc(), we need to include the expired servers
 	// in the counter adjustment.
 	delta := -int64(len(n.servers) + len(n.expiredServers))
-	if err := incrementCreatorServerCount(ctx, n.bt, n.creator, delta); err != nil {
+	if err := incrementCreatorServerCount(ctx, n.bt, n.creator, delta, 0); err != nil {
 		return err
 	}
-	return incrementCreatorNodeCount(ctx, n.bt, n.creator, -1)
+	return incrementCreatorNodeCount(ctx, n.bt, n.creator, -1, 0)
 }
 
 func (n *mtNode) setPermissions(ctx *context.T, perms access.Permissions) error {
@@ -416,7 +425,7 @@
 	for _, node := range sortedNodes {
 		perms := nodes[node]
 		if node == "" {
-			if err := bt.createRow(ctx, "", perms, "", ts); err != nil {
+			if err := bt.createRow(ctx, "", perms, "", ts, 0); err != nil {
 				return err
 			}
 			continue
@@ -432,7 +441,7 @@
 				if err != nil {
 					return err
 				}
-				if n, err = parent.createChild(ctx, e, parent.permissions, ""); err != nil {
+				if n, err = parent.createChild(ctx, e, parent.permissions, "", 0); err != nil {
 					return err
 				}
 			}
diff --git a/services/mounttable/btmtd/internal/node_test.go b/services/mounttable/btmtd/internal/node_test.go
index a3426ca..711d45d 100644
--- a/services/mounttable/btmtd/internal/node_test.go
+++ b/services/mounttable/btmtd/internal/node_test.go
@@ -51,7 +51,7 @@
 			return true
 		},
 		func(i int, n *mtNode) bool {
-			if child, err := n.createChild(ctx, "Y", n.permissions, ""); err != nil {
+			if child, err := n.createChild(ctx, "Y", n.permissions, "", 0); err != nil {
 				t.Logf("[%d] createChild: (%v, %v)", i, child, err)
 				return false
 			}
@@ -73,7 +73,7 @@
 		},
 		func(i int, n *mtNode) bool {
 			const serverAddr = "/example.com:1234"
-			if err := n.mount(ctx, serverAddr, clock.Now().Add(time.Minute), 0); err != nil {
+			if err := n.mount(ctx, serverAddr, clock.Now().Add(time.Minute), 0, 0); err != nil {
 				t.Logf("[%d] mount: %v", i, err)
 				return false
 			}
@@ -118,7 +118,7 @@
 	for i := 1; i <= 100; i++ {
 		// Create X.
 		root, _ = getNode(ctx, bt, "")
-		n, err := root.createChild(ctx, "X", root.permissions, "")
+		n, err := root.createChild(ctx, "X", root.permissions, "", 0)
 		if err != nil || n == nil {
 			t.Fatalf("createChild(X): (%v, %v)", n, err)
 		}
diff --git a/services/mounttable/btmtd/main.go b/services/mounttable/btmtd/main.go
index 750df0b..2b6b88e 100644
--- a/services/mounttable/btmtd/main.go
+++ b/services/mounttable/btmtd/main.go
@@ -61,6 +61,9 @@
 
 	permissionsFileFlag string
 	mountNameFlag       string
+
+	maxNodesPerUserFlag   int
+	maxServersPerUserFlag int
 )
 
 func main() {
@@ -74,6 +77,9 @@
 	cmdRoot.Flags.StringVar(&permissionsFileFlag, "permissions-file", "", "The file that contains the initial node permissions.")
 	cmdRoot.Flags.StringVar(&mountNameFlag, "name", "", "If provided, causes the mount table to mount itself under this name.")
 
+	cmdRoot.Flags.IntVar(&maxNodesPerUserFlag, "max-nodes-per-user", 10000, "The maximum number of nodes that a single user can create.")
+	cmdRoot.Flags.IntVar(&maxServersPerUserFlag, "max-servers-per-user", 10000, "The maximum number of servers that a single user can mount.")
+
 	cmdline.HideGlobalFlagsExcept()
 	cmdline.Main(cmdRoot)
 }
@@ -126,8 +132,12 @@
 	if err != nil {
 		return err
 	}
-	acl := globalPerms["Admin"]
-	disp := internal.NewDispatcher(bt, &acl)
+	cfg := &internal.Config{
+		GlobalAcl:         globalPerms["Admin"],
+		MaxNodesPerUser:   int64(maxNodesPerUserFlag),
+		MaxServersPerUser: int64(maxServersPerUserFlag),
+	}
+	disp := internal.NewDispatcher(bt, cfg)
 	ctx, server, err := v23.WithNewDispatchingServer(ctx, mountNameFlag, disp, options.ServesMountTable(true), options.LameDuckTimeout(30*time.Second))
 	if err != nil {
 		return err