Merge "mounttablelib: Limit users to 1000 nodes apiece.  A user is identified by the patterns over blessings used in the templates that restrict some directories to only contain nodes matching user names."
diff --git a/services/mounttable/mounttablelib/mounttable.go b/services/mounttable/mounttablelib/mounttable.go
index 3b87098..4fb98d6 100644
--- a/services/mounttable/mounttablelib/mounttable.go
+++ b/services/mounttable/mounttablelib/mounttable.go
@@ -28,6 +28,7 @@
 )
 
 const pkgPath = "v.io/x/ref/services/mounttable/mounttablelib"
+const defaultMaxNodesPerUser = 1000
 
 var (
 	errMalformedAddress = verror.Register(pkgPath+".errMalformedAddress", verror.NoRetry, "{1:}{2:} malformed address {3} for mounted server {4}{:_}")
@@ -36,6 +37,8 @@
 	errCantDeleteRoot   = verror.Register(pkgPath+".errCantDeleteRoot", verror.NoRetry, "{1:}{2:} cannot delete root node{:_}")
 	errNotEmpty         = verror.Register(pkgPath+".errNotEmpty", verror.NoRetry, "{1:}{2:} cannot delete {3}: has children{:_}")
 	errNamingLoop       = verror.Register(pkgPath+".errNamingLoop", verror.NoRetry, "{1:}{2:} Loop in namespace{:_}")
+	errTooManyNodes     = verror.Register(pkgPath+".errTooManyNodes", verror.NoRetry, "{1:}{2:} User has exceeded his node limit {:_}")
+	errNoSharedRoot     = verror.Register(pkgPath+".errNoSharedRoot", verror.NoRetry, "{1:}{2:} Server and User share no blessing root {:_}")
 )
 
 var (
@@ -51,19 +54,23 @@
 )
 
 type persistence interface {
-	persistPerms(name string, perm *VersionedPermissions) error
+	persistPerms(name, creator string, perm *VersionedPermissions) error
 	persistDelete(name string) error
 	close()
 }
 
 // mountTable represents a namespace.  One exists per server instance.
 type mountTable struct {
-	root          *node
-	superUsers    access.AccessList
-	persisting    bool
-	persist       persistence
-	nodeCounter   *stats.Integer
-	serverCounter *stats.Integer
+	sync.Mutex
+	root               *node
+	superUsers         access.AccessList
+	persisting         bool
+	persist            persistence
+	nodeCounter        *stats.Integer
+	serverCounter      *stats.Integer
+	perUserNodeCounter *stats.Map
+	maxNodesPerUser    int64
+	userPrefixes       []string
 }
 
 var _ rpc.Dispatcher = (*mountTable)(nil)
@@ -93,6 +100,7 @@
 	vPerms              *VersionedPermissions
 	permsTemplate       access.Permissions
 	explicitPermissions bool
+	creator             string
 }
 
 const templateVar = "%%"
@@ -109,9 +117,11 @@
 // statsPrefix is the prefix for for exported statistics objects.
 func NewMountTableDispatcher(permsFile, persistDir, statsPrefix string) (rpc.Dispatcher, error) {
 	mt := &mountTable{
-		root:          new(node),
-		nodeCounter:   stats.NewInteger(naming.Join(statsPrefix, "num-nodes")),
-		serverCounter: stats.NewInteger(naming.Join(statsPrefix, "num-mounted-servers")),
+		root:               new(node),
+		nodeCounter:        stats.NewInteger(naming.Join(statsPrefix, "num-nodes")),
+		serverCounter:      stats.NewInteger(naming.Join(statsPrefix, "num-mounted-servers")),
+		perUserNodeCounter: stats.NewMap(naming.Join(statsPrefix, "num-nodes-per-user")),
+		maxNodesPerUser:    defaultMaxNodesPerUser,
 	}
 	mt.root.parent = mt.newNode() // just for its lock
 	if persistDir != "" {
@@ -140,6 +150,7 @@
 	if n == nil {
 		return
 	}
+	mt.credit(n)
 	nodeCount := int64(0)
 	serverCount := int64(0)
 	queue := []*node{n}
@@ -151,6 +162,7 @@
 		for _, ch := range n.children {
 			ch.Lock() // Keep locked until it is deleted.
 			queue = append(queue, ch)
+			mt.credit(ch)
 		}
 	}
 
@@ -315,8 +327,14 @@
 				return nil, nil, err
 			}
 		}
+		// Obey account limits.
+		creator, err := mt.debit(ctx, call)
+		if err != nil {
+			return nil, nil, err
+		}
 		// At this point cur is still locked, OK to use and change it.
 		next := mt.newNode()
+		next.creator = creator
 		next.parent = cur
 		if cur.permsTemplate != nil {
 			next.vPerms = createVersionedPermissionsFromTemplate(cur.permsTemplate, e)
@@ -805,7 +823,7 @@
 	n.vPerms, err = n.vPerms.Set(ctx, version, perms)
 	if err == nil {
 		if mt.persisting {
-			mt.persist.persistPerms(ms.name, n.vPerms)
+			mt.persist.persistPerms(ms.name, n.creator, n.vPerms)
 		}
 		n.explicitPermissions = true
 	}
@@ -829,3 +847,23 @@
 	version, perms := n.vPerms.Get()
 	return perms, version, nil
 }
+
+// credit user for node deletion.
+func (mt *mountTable) credit(n *node) {
+	mt.perUserNodeCounter.Incr(n.creator, -1)
+}
+
+// debit user for node creation.
+func (mt *mountTable) debit(ctx *context.T, call security.Call) (string, error) {
+	creator := mt.pickCreator(ctx, call)
+	count, ok := mt.perUserNodeCounter.Incr(creator, 1).(int64)
+	if !ok {
+		return "", verror.New(errTooManyNodes, ctx)
+	}
+	// If we have no prefixes defining users, don't bother with checking per user limits.
+	if len(mt.userPrefixes) != 0 && count > mt.maxNodesPerUser {
+		mt.perUserNodeCounter.Incr(creator, -1)
+		return "", verror.New(errTooManyNodes, ctx)
+	}
+	return creator, nil
+}
diff --git a/services/mounttable/mounttablelib/mounttable_test.go b/services/mounttable/mounttablelib/mounttable_test.go
index 5f0da6e..6d27b94 100644
--- a/services/mounttable/mounttablelib/mounttable_test.go
+++ b/services/mounttable/mounttablelib/mounttable_test.go
@@ -5,6 +5,7 @@
 package mounttablelib
 
 import (
+	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
@@ -25,6 +26,7 @@
 	"v.io/v23/vdl"
 	"v.io/x/lib/vlog"
 
+	libstats "v.io/x/ref/lib/stats"
 	"v.io/x/ref/services/debug/debuglib"
 	"v.io/x/ref/test"
 	"v.io/x/ref/test/testutil"
@@ -177,7 +179,7 @@
 	}
 }
 
-func newMT(t *testing.T, permsFile, persistDir string, rootCtx *context.T) (rpc.Server, string) {
+func newMT(t *testing.T, permsFile, persistDir, statsDir string, rootCtx *context.T) (rpc.Server, string) {
 	reservedDisp := debuglib.NewDispatcher(vlog.Log.LogDir, nil)
 	ctx := v23.WithReservedNameDispatcher(rootCtx, reservedDisp)
 	server, err := v23.NewServer(ctx, options.ServesMountTable(true))
@@ -185,7 +187,7 @@
 		boom(t, "r.NewServer: %s", err)
 	}
 	// Add mount table service.
-	mt, err := NewMountTableDispatcher(permsFile, persistDir, "mounttable")
+	mt, err := NewMountTableDispatcher(permsFile, persistDir, statsDir)
 	if err != nil {
 		boom(t, "NewMountTableDispatcher: %v", err)
 	}
@@ -227,7 +229,7 @@
 	rootCtx, aliceCtx, bobCtx, shutdown := initTest()
 	defer shutdown()
 
-	mt, mtAddr := newMT(t, "testdata/test.perms", "", rootCtx)
+	mt, mtAddr := newMT(t, "testdata/test.perms", "", "testMountTable", rootCtx)
 	defer mt.Stop()
 	collection, collectionAddr := newCollection(t, rootCtx)
 	defer collection.Stop()
@@ -397,7 +399,7 @@
 	rootCtx, shutdown := test.InitForTest()
 	defer shutdown()
 
-	server, estr := newMT(t, "", "", rootCtx)
+	server, estr := newMT(t, "", "", "testGlob", rootCtx)
 	defer server.Stop()
 
 	// set up a mount space
@@ -444,7 +446,7 @@
 	rootCtx, aliceCtx, bobCtx, shutdown := initTest()
 	defer shutdown()
 
-	server, estr := newMT(t, "testdata/test.perms", "", rootCtx)
+	server, estr := newMT(t, "testdata/test.perms", "", "testAccessListTemplate", rootCtx)
 	defer server.Stop()
 	fakeServer := naming.JoinAddressName(estr, "quux")
 
@@ -457,13 +459,67 @@
 	doMount(t, aliceCtx, estr, "users/alice", fakeServer, true)
 	doMount(t, bobCtx, estr, "users/bob", fakeServer, true)
 	doMount(t, rootCtx, estr, "users/root", fakeServer, true)
+
+	// Make sure the counter works.
+	doUnmount(t, aliceCtx, estr, "users/alice", "", true)
+	doUnmount(t, bobCtx, estr, "users/bob", "", true)
+	doUnmount(t, rootCtx, estr, "users/root", "", true)
+	perms := access.Permissions{"Admin": access.AccessList{In: []security.BlessingPattern{security.AllPrincipals}}}
+	doSetPermissions(t, aliceCtx, estr, "users/alice/a/b/c/d", perms, "", true)
+	doSetPermissions(t, aliceCtx, estr, "users/alice/a/b/c/d", perms, "", true)
+
+	// Do we obey limits?
+	for i := 0; i < defaultMaxNodesPerUser-5; i++ {
+		node := fmt.Sprintf("users/alice/a/b/c/d/%d", i)
+		doSetPermissions(t, aliceCtx, estr, node, perms, "", true)
+	}
+	doSetPermissions(t, aliceCtx, estr, "users/alice/a/b/c/d/straw", perms, "", false)
+
+	// See if the stats numbers are correct.
+	testcases := []struct {
+		key      string
+		expected interface{}
+	}{
+		{"alice", int64(defaultMaxNodesPerUser)},
+		{"bob", int64(0)},
+		{"root", int64(0)},
+		{localUser, int64(3)},
+	}
+	for _, tc := range testcases {
+		name := "testAccessListTemplate/num-nodes-per-user/" + tc.key
+		got, err := libstats.Value(name)
+		if err != nil {
+			t.Errorf("unexpected error getting map entry for %s: %s", name, err)
+		}
+		if got != tc.expected {
+			t.Errorf("unexpected getting map entry for %s. Got %v, want %v", name, got, tc.expected)
+		}
+	}
+}
+
+func getUserNodeCounts(t *testing.T) (counts map[string]int32) {
+	s, err := libstats.Value("mounttable/num-nodes-per-user")
+	if err != nil {
+		boom(t, "Can't get mounttable statistics")
+	}
+	// This string is a json encoded map.  Decode.
+	switch v := s.(type) {
+	default:
+		boom(t, "Wrong type for mounttable statistics")
+	case string:
+		err = json.Unmarshal([]byte(v), &counts)
+		if err != nil {
+			boom(t, "Can't unmarshal mounttable statistics")
+		}
+	}
+	return
 }
 
 func TestGlobAccessLists(t *testing.T) {
 	rootCtx, aliceCtx, bobCtx, shutdown := initTest()
 	defer shutdown()
 
-	server, estr := newMT(t, "testdata/test.perms", "", rootCtx)
+	server, estr := newMT(t, "testdata/test.perms", "", "testGlobAccessLists", rootCtx)
 	defer server.Stop()
 
 	// set up a mount space
@@ -496,7 +552,7 @@
 	rootCtx, shutdown := test.InitForTest()
 	defer shutdown()
 
-	server, estr := newMT(t, "", "", rootCtx)
+	server, estr := newMT(t, "", "", "testCleanup", rootCtx)
 	defer server.Stop()
 
 	// Set up one mount.
@@ -524,7 +580,7 @@
 	rootCtx, aliceCtx, bobCtx, shutdown := initTest()
 	defer shutdown()
 
-	server, estr := newMT(t, "testdata/test.perms", "", rootCtx)
+	server, estr := newMT(t, "testdata/test.perms", "", "testDelete", rootCtx)
 	defer server.Stop()
 
 	// set up a mount space
@@ -551,7 +607,7 @@
 	rootCtx, shutdown := test.InitForTest()
 	defer shutdown()
 
-	server, estr := newMT(t, "", "", rootCtx)
+	server, estr := newMT(t, "", "", "testerverFormat", rootCtx)
 	defer server.Stop()
 
 	doMount(t, rootCtx, estr, "endpoint", naming.JoinAddressName(estr, "life/on/the/mississippi"), true)
@@ -565,7 +621,7 @@
 	rootCtx, shutdown := test.InitForTest()
 	defer shutdown()
 
-	server, estr := newMT(t, "", "", rootCtx)
+	server, estr := newMT(t, "", "", "testExpiry", rootCtx)
 	defer server.Stop()
 	collection, collectionAddr := newCollection(t, rootCtx)
 	defer collection.Stop()
@@ -633,7 +689,7 @@
 	ft := NewFakeTimeClock()
 	setServerListClock(ft)
 
-	server, estr := newMT(t, "", "", rootCtx)
+	server, estr := newMT(t, "", "", "mounttable", rootCtx)
 	defer server.Stop()
 
 	// Test flat tree
diff --git a/services/mounttable/mounttablelib/persist_test.go b/services/mounttable/mounttablelib/persist_test.go
index a6b936b..c6b7aca 100644
--- a/services/mounttable/mounttablelib/persist_test.go
+++ b/services/mounttable/mounttablelib/persist_test.go
@@ -25,7 +25,7 @@
 	}
 	defer os.RemoveAll(td)
 	fmt.Printf("temp persist dir %s\n", td)
-	mt, mtAddr := newMT(t, "", td, rootCtx)
+	mt, mtAddr := newMT(t, "", td, "testPersistence", rootCtx)
 
 	perms1 := access.Permissions{
 		"Read":    access.AccessList{In: []security.BlessingPattern{security.AllPrincipals}},
@@ -53,7 +53,7 @@
 	mt.Stop()
 
 	// Restart with the persisted data.
-	mt, mtAddr = newMT(t, "", td, rootCtx)
+	mt, mtAddr = newMT(t, "", td, "testPersistence", rootCtx)
 
 	// Add root as Admin to each of the perms since the mounttable itself will.
 	perms1["Admin"] = access.AccessList{In: []security.BlessingPattern{"root"}}
diff --git a/services/mounttable/mounttablelib/persistentstore.go b/services/mounttable/mounttablelib/persistentstore.go
index 1dfd2f8..0cb614d 100644
--- a/services/mounttable/mounttablelib/persistentstore.go
+++ b/services/mounttable/mounttablelib/persistentstore.go
@@ -26,7 +26,8 @@
 type storeElement struct {
 	N string // Name of affected node
 	V VersionedPermissions
-	D bool // True if the subtree at N has been deleted
+	D bool   // True if the subtree at N has been deleted
+	C string // Creator
 }
 
 // newPersistentStore will read the permissions log from the directory and apply them to the
@@ -132,6 +133,7 @@
 		elems := strings.Split(e.N, "/")
 		n, err := mt.findNode(nil, nil, elems, true, nil)
 		if n != nil || err == nil {
+			n.creator = e.C
 			if e.D {
 				mt.deleteNode(n.parent, elems[len(elems)-1])
 				vlog.VI(2).Infof("deleted %s", e.N)
@@ -152,7 +154,7 @@
 // any duplicate or deleted entries disappear.
 func (s *store) depthFirstPersist(n *node, name string) {
 	if n.explicitPermissions {
-		s.persistPerms(name, n.vPerms)
+		s.persistPerms(name, n.creator, n.vPerms)
 	}
 	for nodeName, c := range n.children {
 		s.depthFirstPersist(c, path.Join(name, nodeName))
@@ -160,10 +162,10 @@
 }
 
 // persistPerms appends a changed permission to the log.
-func (s *store) persistPerms(name string, vPerms *VersionedPermissions) error {
+func (s *store) persistPerms(name, creator string, vPerms *VersionedPermissions) error {
 	s.l.Lock()
 	defer s.l.Unlock()
-	e := storeElement{N: name, V: *vPerms}
+	e := storeElement{N: name, V: *vPerms, C: creator}
 	return s.enc.Encode(&e)
 }
 
diff --git a/services/mounttable/mounttablelib/versionedpermissions.go b/services/mounttable/mounttablelib/versionedpermissions.go
index f68401f..867be02 100644
--- a/services/mounttable/mounttablelib/versionedpermissions.go
+++ b/services/mounttable/mounttablelib/versionedpermissions.go
@@ -8,6 +8,7 @@
 	"encoding/json"
 	"io"
 	"os"
+	"reflect"
 	"strconv"
 	"strings"
 
@@ -19,6 +20,12 @@
 	"v.io/x/lib/vlog"
 )
 
+// Blessings can't include a comma so we use them in made up user ids.  The following distinctions are
+// made so that we can account for them differently.
+const localUser = ",LOCAL,"     // a client that has our public key but no blessing from which we can extract a user name
+const blessedUser = ",BLESSED," // a client with blessings we trust but from which we can't extract a user name
+const unknownUser = ",UNKNOWN," // a client which presents no blessing we trust
+
 // VersionedPermissions associates a Version with a Permissions
 type VersionedPermissions struct {
 	V int32
@@ -45,8 +52,8 @@
 		}
 	}
 	b.P = perm
+	// Increment with possible wrap.
 	b.V++
-	// Protect against wrap.
 	if b.V < 0 {
 		b.V = 0
 	}
@@ -132,6 +139,12 @@
 				vlog.VI(2).Infof("added perms %v to %s", perms, name)
 				if isPattern {
 					n.permsTemplate = perms
+					// Save the pattern prefix as a prefix describing a user.
+					prefix := strings.Join(elems[:len(elems)-1], "/")
+					if prefix != "" {
+						prefix += "/"
+					}
+					mt.userPrefixes = append(mt.userPrefixes, prefix)
 				} else {
 					n.vPerms, _ = n.vPerms.Set(nil, "", perms)
 					n.explicitPermissions = true
@@ -143,3 +156,38 @@
 	}
 	return nil
 }
+
+// pickCreator returns a string matching the blessing of the user performing the creation.  We do this using
+// the user prefixes found when parsing the config.  Eventually we may need a better way to define user
+// prefixes.
+//
+// TODO(p): readdress this argument after we have some experience with real users.
+func (mt *mountTable) pickCreator(ctx *context.T, call security.Call) string {
+	// For each blessing, look for one that has a matching user prefix.  The creator is the perfix
+	// plus one more element.
+	//
+	// The prefixes themselves come from the templates in the config that constrain node names to
+	// match the user.
+	blessings, _ := security.RemoteBlessingNames(ctx, call)
+	for _, b := range blessings {
+		for _, p := range mt.userPrefixes {
+			sb := string(b)
+			if !strings.HasPrefix(sb, p) {
+				continue
+			}
+			suffix := strings.TrimPrefix(sb, p)
+			elems := strings.Split(suffix, "/")
+			return p + elems[0]
+		}
+	}
+	if ctx == nil || call == nil {
+		return localUser
+	}
+	if l, r := call.LocalBlessings().PublicKey(), call.RemoteBlessings().PublicKey(); l != nil && reflect.DeepEqual(l, r) {
+		return localUser
+	}
+	if len(blessings) > 0 {
+		return blessedUser
+	}
+	return unknownUser
+}