services/mounttable/btmtd: Keep track of nodes per user

Keep track of who created each node and how many they've created. This
is only accounting. There is no quota yet.

We use the same logic as mounttabled to identify the creator of a node.

Each node now contains the name of its creator, and we store one counter
per creator in a separate table. The counter is incremented when a node
is created, and decremented when a node is deleted.

Change-Id: I8a0045c393b00cf48aaaec76dbf08ff2d9cd2ccd
diff --git a/services/mounttable/btmtd/README.md b/services/mounttable/btmtd/README.md
index d08ce9a..8de7315 100644
--- a/services/mounttable/btmtd/README.md
+++ b/services/mounttable/btmtd/README.md
@@ -21,7 +21,8 @@
    * Metadata `m`: used to store information about the row:
       * Version `v`: The version changes every time the row is mutated. It is
         used to detect conflicts related to concurrent access.
-      * Timestamp `t`: The cell's timestamp is the node creation time.
+      * Creator `c`: The cell's value is the name of its creator and its
+        timestamp is the node creation time.
       * Sticky `s`: When this column is present, the node is not automatically
         garbage collected.
       * Permissions `p`: The [access.Permissions] of the node.
@@ -33,11 +34,11 @@
 
 Example:
 
-| Key              | Version | Timestamp | Sticky | Permissions  | Mounted Server...           | Child...  |
-| ---              | ---     | ---       | ---    | ---          | ---                         | ---       |
-| 540f1a56/        | 54321   | (ts)      | 1      | {"Admin":... |                             | foo (ts1) |
-| 1234abcd/foo     | 123     | (ts1)     |        | {"Admin":... |                             | bar (ts2) |
-| 46d523e3/foo/bar | 5436    | (ts2)     |        | {"Admin":... | /example.com:123 (deadline) |           |
+| Key              | Version | Creator    | Sticky | Permissions  | Mounted Server...           | Child...  |
+| ---              | ---     | ---        | ---    | ---          | ---                         | ---       |
+| 540f1a56/        | 54321   | user (ts)  | 1      | {"Admin":... |                             | foo (ts1) |
+| 1234abcd/foo     | 123     | user (ts1) |        | {"Admin":... |                             | bar (ts2) |
+| 46d523e3/foo/bar | 5436    | user (ts2) |        | {"Admin":... | /example.com:123 (deadline) |           |
 
 ## Mutations
 
diff --git a/services/mounttable/btmtd/internal/bt.go b/services/mounttable/btmtd/internal/bt.go
index 140a7a1..de89f98 100644
--- a/services/mounttable/btmtd/internal/bt.go
+++ b/services/mounttable/btmtd/internal/bt.go
@@ -5,6 +5,8 @@
 package internal
 
 import (
+	"bytes"
+	"encoding/binary"
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
@@ -23,6 +25,7 @@
 	"google.golang.org/grpc/codes"
 
 	"v.io/v23/context"
+	"v.io/v23/conventions"
 	"v.io/v23/security"
 	"v.io/v23/security/access"
 	v23mt "v.io/v23/services/mounttable"
@@ -35,10 +38,10 @@
 	serversFamily  = "s"
 	childrenFamily = "c"
 
-	versionColumn     = "v"
+	creatorColumn     = "c"
 	permissionsColumn = "p"
 	stickyColumn      = "s"
-	timestampColumn   = "t"
+	versionColumn     = "v"
 )
 
 // NewBigTable returns a BigTable object that abstracts some aspects of the
@@ -54,15 +57,17 @@
 		return nil, err
 	}
 
-	return &BigTable{
+	bt := &BigTable{
 		tableName: tableName,
 		cache:     &rowCache{},
-		tbl:       client.Open(tableName),
 		createAdminClient: func() (*bigtable.AdminClient, error) {
 			return bigtable.NewAdminClient(ctx, project, zone, cluster, cloud.WithTokenSource(tk))
 
 		},
-	}, nil
+	}
+	bt.nodeTbl = client.Open(bt.nodeTableName())
+	bt.counterTbl = client.Open(bt.counterTableName())
+	return bt, nil
 }
 
 // NewTestBigTable returns a BigTable object that is connected to an in-memory
@@ -82,11 +87,10 @@
 		return nil, nil, err
 	}
 
-	return &BigTable{
+	bt := &BigTable{
 		tableName: tableName,
 		testMode:  true,
 		cache:     &rowCache{},
-		tbl:       client.Open(tableName),
 		createAdminClient: func() (*bigtable.AdminClient, error) {
 			conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
 			if err != nil {
@@ -94,17 +98,29 @@
 			}
 			return bigtable.NewAdminClient(ctx, "", "", "", cloud.WithBaseGRPC(conn))
 		},
-	}, func() { srv.Close() }, nil
+	}
+	bt.nodeTbl = client.Open(bt.nodeTableName())
+	bt.counterTbl = client.Open(bt.counterTableName())
+	return bt, func() { srv.Close() }, nil
 }
 
 type BigTable struct {
 	tableName         string
 	testMode          bool
-	tbl               *bigtable.Table
+	nodeTbl           *bigtable.Table
+	counterTbl        *bigtable.Table
 	cache             *rowCache
 	createAdminClient func() (*bigtable.AdminClient, error)
 }
 
+func (b *BigTable) nodeTableName() string {
+	return b.tableName
+}
+
+func (b *BigTable) counterTableName() string {
+	return b.tableName + "-counters"
+}
+
 // SetupTable creates the table, column families, and GC policies.
 func (b *BigTable) SetupTable(ctx *context.T, permissionsFile string) error {
 	bctx, cancel := btctx(ctx)
@@ -116,23 +132,28 @@
 	}
 	defer client.Close()
 
-	if err := client.CreateTable(bctx, b.tableName); err != nil {
+	if err := client.CreateTable(bctx, b.counterTableName()); err != nil {
+		return err
+	}
+	if err := client.CreateTable(bctx, b.nodeTableName()); err != nil {
 		return err
 	}
 
 	families := []struct {
-		name     string
-		gcPolicy bigtable.GCPolicy
+		tableName  string
+		familyName string
+		gcPolicy   bigtable.GCPolicy
 	}{
-		{serversFamily, bigtable.UnionPolicy(bigtable.MaxVersionsPolicy(1), bigtable.MaxAgePolicy(time.Second))},
-		{metadataFamily, bigtable.MaxVersionsPolicy(1)},
-		{childrenFamily, bigtable.MaxVersionsPolicy(1)},
+		{b.counterTableName(), metadataFamily, bigtable.MaxVersionsPolicy(1)},
+		{b.nodeTableName(), metadataFamily, bigtable.MaxVersionsPolicy(1)},
+		{b.nodeTableName(), serversFamily, bigtable.UnionPolicy(bigtable.MaxVersionsPolicy(1), bigtable.MaxAgePolicy(time.Second))},
+		{b.nodeTableName(), childrenFamily, bigtable.MaxVersionsPolicy(1)},
 	}
 	for _, f := range families {
-		if err := client.CreateColumnFamily(bctx, b.tableName, f.name); err != nil {
+		if err := client.CreateColumnFamily(bctx, f.tableName, f.familyName); err != nil {
 			return err
 		}
-		if err := client.SetGCPolicy(bctx, b.tableName, f.name, f.gcPolicy); err != nil {
+		if err := client.SetGCPolicy(bctx, f.tableName, f.familyName, f.gcPolicy); err != nil {
 			return err
 		}
 	}
@@ -142,7 +163,7 @@
 	}
 	perms := make(access.Permissions)
 	perms.Add(security.AllPrincipals, string(v23mt.Admin))
-	return b.createRow(ctx, "", perms, b.now())
+	return b.createRow(ctx, "", perms, "", b.now())
 }
 
 func (b *BigTable) timeFloor(t bigtable.Timestamp) bigtable.Timestamp {
@@ -176,7 +197,10 @@
 		return err
 	}
 	defer client.Close()
-	return client.DeleteTable(bctx, b.tableName)
+	if err := client.DeleteTable(bctx, b.counterTableName()); err != nil {
+		return err
+	}
+	return client.DeleteTable(bctx, b.nodeTableName())
 }
 
 // DumpTable prints all the mounttable nodes stored in the bigtable.
@@ -185,7 +209,7 @@
 	defer cancel()
 
 	clock := timekeeper.RealTime()
-	return b.tbl.ReadRows(bctx, bigtable.InfiniteRange(""),
+	return b.nodeTbl.ReadRows(bctx, bigtable.InfiniteRange(""),
 		func(row bigtable.Row) bool {
 			n := nodeFromRow(ctx, b, row, clock)
 			if n.name == "" {
@@ -219,7 +243,7 @@
 	defer cancel()
 
 	count := 0
-	if err := b.tbl.ReadRows(bctx, bigtable.InfiniteRange(""),
+	if err := b.nodeTbl.ReadRows(bctx, bigtable.InfiniteRange(""),
 		func(row bigtable.Row) bool {
 			count++
 			return true
@@ -265,7 +289,7 @@
 	// server.
 	// Either way, we can't used the cached version anymore.
 	defer b.cache.invalidate(row)
-	return b.tbl.Apply(bctx, row, m, opts...)
+	return b.nodeTbl.Apply(bctx, row, m, opts...)
 }
 
 func (b *BigTable) readRow(ctx *context.T, key string, opts ...bigtable.ReadOption) (bigtable.Row, error) {
@@ -273,7 +297,7 @@
 		func() (bigtable.Row, error) {
 			bctx, cancel := btctx(ctx)
 			defer cancel()
-			return b.tbl.ReadRow(bctx, key, opts...)
+			return b.nodeTbl.ReadRow(bctx, key, opts...)
 		},
 	)
 	if grpc.Code(err) == codes.DeadlineExceeded {
@@ -285,14 +309,42 @@
 	return row, err
 }
 
-func (b *BigTable) createRow(ctx *context.T, name string, perms access.Permissions, ts bigtable.Timestamp) error {
+func (b *BigTable) createRow(ctx *context.T, name string, perms access.Permissions, creator string, ts bigtable.Timestamp) error {
 	jsonPerms, err := json.Marshal(perms)
 	if err != nil {
 		return err
 	}
+	if creator == "" {
+		creator = conventions.ServerUser
+	}
 	mut := bigtable.NewMutation()
-	mut.Set(metadataFamily, timestampColumn, ts, []byte{1})
+	mut.Set(metadataFamily, creatorColumn, ts, []byte(creator))
 	mut.Set(metadataFamily, permissionsColumn, bigtable.ServerTime, jsonPerms)
 	mut.Set(metadataFamily, versionColumn, bigtable.ServerTime, []byte(strconv.FormatUint(uint64(rand.Uint32()), 10)))
-	return b.apply(ctx, rowKey(name), mut)
+	if err := b.apply(ctx, rowKey(name), mut); err != nil {
+		return err
+	}
+	return b.incrementCreatorNodeCount(ctx, creator, 1)
+}
+
+func (b *BigTable) incrementCreatorNodeCount(ctx *context.T, creator string, delta int64) error {
+	bctx, cancel := btctx(ctx)
+	defer cancel()
+
+	key := "num-nodes-per-user:" + creator
+	m := bigtable.NewReadModifyWrite()
+	m.Increment(metadataFamily, "c", delta)
+	row, err := b.counterTbl.ApplyReadModifyWrite(bctx, key, m)
+	if err != nil {
+		return err
+	}
+	if len(row[metadataFamily]) == 1 {
+		var c int64
+		b := row[metadataFamily][0].Value
+		if err := binary.Read(bytes.NewReader(b), binary.BigEndian, &c); err != nil {
+			return err
+		}
+		ctx.Infof("Counter %s = %d", key, c)
+	}
+	return nil
 }
diff --git a/services/mounttable/btmtd/internal/mounttable.go b/services/mounttable/btmtd/internal/mounttable.go
index c744974..1100ebe 100644
--- a/services/mounttable/btmtd/internal/mounttable.go
+++ b/services/mounttable/btmtd/internal/mounttable.go
@@ -13,6 +13,7 @@
 	"time"
 
 	"v.io/v23/context"
+	"v.io/v23/conventions"
 	"v.io/v23/glob"
 	"v.io/v23/naming"
 	"v.io/v23/rpc"
@@ -359,7 +360,7 @@
 				}
 			}
 			perms.Normalize()
-			n, err = parent.createChild(ctx, elem, perms)
+			n, err = parent.createChild(ctx, elem, perms, pickCreator(ctx, call))
 			if err != nil {
 				return nil, "", err
 			}
@@ -423,3 +424,7 @@
 	}
 	return templatePerms
 }
+
+func pickCreator(ctx *context.T, call security.Call) string {
+	return conventions.GetClientUserIds(ctx, call)[0]
+}
diff --git a/services/mounttable/btmtd/internal/node.go b/services/mounttable/btmtd/internal/node.go
index d26aa3a..f07fd9e 100644
--- a/services/mounttable/btmtd/internal/node.go
+++ b/services/mounttable/btmtd/internal/node.go
@@ -34,6 +34,7 @@
 	creationTime bigtable.Timestamp
 	permissions  access.Permissions
 	version      string
+	creator      string
 	mountFlags   mtFlags
 	servers      []naming.MountedServer
 	children     []string
@@ -89,8 +90,9 @@
 				ctx.Errorf("Failed to decode permissions for %s", name)
 				return nil
 			}
-		case timestampColumn:
+		case creatorColumn:
 			n.creationTime = i.Timestamp
+			n.creator = string(i.Value)
 		}
 	}
 	n.servers = make([]naming.MountedServer, 0, len(row[serversFamily]))
@@ -116,7 +118,7 @@
 	return n
 }
 
-func (n *mtNode) createChild(ctx *context.T, child string, perms access.Permissions) (*mtNode, error) {
+func (n *mtNode) createChild(ctx *context.T, child string, perms access.Permissions, creator string) (*mtNode, error) {
 	ts := n.bt.now()
 	mut := bigtable.NewMutation()
 	mut.Set(childrenFamily, child, ts, []byte{1})
@@ -134,7 +136,7 @@
 	childName := naming.Join(n.name, child)
 	longCtx, cancel := longTimeout(ctx)
 	defer cancel()
-	if err := n.bt.createRow(longCtx, childName, perms, ts); err != nil {
+	if err := n.bt.createRow(longCtx, childName, perms, creator, ts); err != nil {
 		return nil, err
 	}
 	n, err := getNode(ctx, n.bt, childName)
@@ -241,7 +243,7 @@
 			// exist. It could be that it is being created or
 			// deleted concurrently. To be sure, we have to create
 			// it before deleting it.
-			if cn, err = n.createChild(ctx, c, n.permissions); err != nil {
+			if cn, err = n.createChild(ctx, c, n.permissions, ""); err != nil {
 				return err
 			}
 		}
@@ -273,7 +275,10 @@
 
 	longCtx, cancel := longTimeout(ctx)
 	defer cancel()
-	return n.bt.apply(longCtx, rowKey(parent), mut)
+	if err := n.bt.apply(longCtx, rowKey(parent), mut); err != nil {
+		return err
+	}
+	return n.bt.incrementCreatorNodeCount(ctx, n.creator, -1)
 }
 
 func (n *mtNode) setPermissions(ctx *context.T, perms access.Permissions) error {
@@ -359,7 +364,7 @@
 	for _, node := range sortedNodes {
 		perms := nodes[node]
 		if node == "" {
-			if err := bt.createRow(ctx, "", perms, ts); err != nil {
+			if err := bt.createRow(ctx, "", perms, "", ts); err != nil {
 				return err
 			}
 			continue
@@ -375,7 +380,7 @@
 				if err != nil {
 					return err
 				}
-				if n, err = parent.createChild(ctx, e, parent.permissions); err != nil {
+				if n, err = parent.createChild(ctx, e, parent.permissions, ""); err != nil {
 					return err
 				}
 			}