services/mounttable/btmtd: mounttable on bigtable

This is a proof of concept for implementing a mounttable server on top
of Cloud Bigtable.

Everything works except:
  - Accounting and quotas
  - Exported stats

Most of the original mounttabled tests pass with this new
implementation.

See README.md for more details.

Change-Id: I93637a47f2ed7dec474f832d88d29eb1431293c5
diff --git a/cmd/mounttable/doc.go b/cmd/mounttable/doc.go
index 0bd4ca8..935802c 100644
--- a/cmd/mounttable/doc.go
+++ b/cmd/mounttable/doc.go
@@ -87,7 +87,7 @@
 Mounts a server <name> onto a mount table
 
 Usage:
-   mounttable mount [flags] <mount name> <name> <ttl> [M|R]
+   mounttable mount [flags] <mount name> <name> <ttl> [L|M|R]
 
 <mount name> is a mount name on a mount table.
 
@@ -96,8 +96,8 @@
 <ttl> is the TTL of the new entry. It is a decimal number followed by a unit
 suffix (s, m, h). A value of 0s represents an infinite duration.
 
-[M|R] are mount options. M indicates that <name> is a mounttable. R indicates
-that existing entries should be removed.
+[L|M|R] are mount options. L indicates that <name> is a leaf. M indicates that
+<name> is a mounttable. R indicates that existing entries should be removed.
 
 Mounttable unmount
 
diff --git a/cmd/mounttable/impl.go b/cmd/mounttable/impl.go
index 25a0c80..cf1e781 100644
--- a/cmd/mounttable/impl.go
+++ b/cmd/mounttable/impl.go
@@ -89,7 +89,7 @@
 	Name:     "mount",
 	Short:    "Mounts a server <name> onto a mount table",
 	Long:     "Mounts a server <name> onto a mount table",
-	ArgsName: "<mount name> <name> <ttl> [M|R]",
+	ArgsName: "<mount name> <name> <ttl> [L|M|R]",
 	ArgsLong: `
 <mount name> is a mount name on a mount table.
 
@@ -98,8 +98,8 @@
 <ttl> is the TTL of the new entry. It is a decimal number followed by a unit
 suffix (s, m, h). A value of 0s represents an infinite duration.
 
-[M|R] are mount options. M indicates that <name> is a mounttable. R indicates
-that existing entries should be removed.
+[L|M|R] are mount options. L indicates that <name> is a leaf. M indicates that
+<name> is a mounttable. R indicates that existing entries should be removed.
 `,
 }
 
@@ -123,6 +123,8 @@
 	if got >= 4 {
 		for _, c := range args[3] {
 			switch c {
+			case 'L':
+				flags |= naming.MountFlag(naming.Leaf)
 			case 'M':
 				flags |= naming.MountFlag(naming.MT)
 			case 'R':
diff --git a/services/mounttable/btmtd/README.md b/services/mounttable/btmtd/README.md
new file mode 100644
index 0000000..d08ce9a
--- /dev/null
+++ b/services/mounttable/btmtd/README.md
@@ -0,0 +1,110 @@
+# Mounttable on Cloud Bigtable
+
+This package and its sub-packages contain a [mounttable server] implementation
+that uses Google's [Cloud Bigtable] service for storage. It is fast and scalable
+to millions of nodes and, with enough replicas, millions of requests per second.
+
+## Schema
+
+Bigtable is not a relational database. Each table is indexed by a single row
+key, and operations are atomic only at the row level. There is no way to
+mutate multiple rows together atomically.
+
+See the [Overview of Cloud Bigtable] for more information.
+
+Our table has one row per node. The row key is a hash of the node name followed
+by the name itself. This spreads rows evenly across all tablet servers with no
+risk of name collision.
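+
+A minimal sketch of how such a row key might be constructed (the actual hash
+function is not part of this change; a hex-encoded 32-bit FNV-1a hash is
+assumed here only because the key prefixes in the example below are 8 hex
+characters long):
+
+```go
+package sketch
+
+import (
+	"fmt"
+	"hash/fnv"
+)
+
+// rowKeySketch prefixes the node name with a short hash of that name so that
+// rows spread evenly across tablet servers. The hash choice is illustrative.
+func rowKeySketch(name string) string {
+	h := fnv.New32a()
+	h.Write([]byte(name))
+	return fmt.Sprintf("%08x/%s", h.Sum32(), name)
+}
+```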
+
+The table has three column families:
+
+   * Metadata `m`: used to store information about the row:
+      * Version `v`: The version changes every time the row is mutated. It is
+        used to detect conflicts related to concurrent access.
+      * Timestamp `t`: The cell's timestamp is the node creation time.
+      * Sticky `s`: When this column is present, the node is not automatically
+        garbage collected.
+      * Permissions `p`: The [access.Permissions] of the node.
+   * Servers `s`: Each mounted server has its own column. The column name is the
+     server's address. The value contains the [mount flags]. The timestamp is
+     the mount deadline.
+   * Children `c`: Each child has its own column. The column name is the name of
+     the child, without the path. The timestamp is the child creation time.
+
+Example:
+
+| Key              | Version | Timestamp | Sticky | Permissions  | Mounted Server...           | Child...  |
+| ---              | ---     | ---       | ---    | ---          | ---                         | ---       |
+| 540f1a56/        | 54321   | (ts)      | 1      | {"Admin":... |                             | foo (ts1) |
+| 1234abcd/foo     | 123     | (ts1)     |        | {"Admin":... |                             | bar (ts2) |
+| 46d523e3/foo/bar | 5436    | (ts2)     |        | {"Admin":... | /example.com:123 (deadline) |           |
+
+## Mutations
+
+All operations use optimistic concurrency control. If a conflicting
+change happens during a mutation, the whole operation is restarted.
+
+  * Mutation on N
+    * Get Node N
+    * Check caller's permissions
+    * Apply mutation on N if node version hasn't changed
+
+If the node version changed, it means that another mutation was applied between
+the time when we retrieved the node and the time when we tried to apply our
+mutation. When that happens, we restart the whole operation, starting with
+retrieving the node again.
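+
+The retry loop looks roughly like the sketch below (a simplified, standalone
+version of the `op` helper in `internal/mounttable.go`; the error value and
+the simulated mutation are illustrative):
+
+```go
+package main
+
+import (
+	"errors"
+	"fmt"
+	"math/rand"
+	"time"
+)
+
+// errConflict stands in for the concurrent-access error used by the server.
+var errConflict = errors.New("concurrent access caused conflict")
+
+// mutateWithRetry retries f until it succeeds or fails with a non-conflict
+// error. f is expected to re-read the node, check permissions, and apply the
+// mutation only if the node version is unchanged.
+func mutateWithRetry(f func() error) error {
+	for {
+		err := f()
+		if err != errConflict {
+			return err
+		}
+		// Back off briefly before re-reading the node and retrying.
+		time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
+	}
+}
+
+func main() {
+	attempts := 0
+	err := mutateWithRetry(func() error {
+		attempts++
+		if attempts < 3 {
+			return errConflict // simulate two conflicting writers
+		}
+		return nil
+	})
+	fmt.Println(attempts, err) // 3 <nil>
+}
+```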
+
+### Mounting / Unmounting a server
+
+Mounting a server consists of adding a new cell to the node's row. The column
+family is `s`, the column name is the address of the server, the timestamp is
+the mount deadline, and the value contains the [mount flags].
+
+Unmounting a server consists of deleting the server's column.
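+
+A sketch of the corresponding mutations, assuming the `s` column family
+described above (the mount-flag encoding is not shown in this README and is
+treated as an opaque byte slice here):
+
+```go
+package sketch
+
+import (
+	"time"
+
+	"google.golang.org/cloud/bigtable"
+)
+
+// mountMutation adds one cell per mounted server: the column name is the
+// server's address, the timestamp is the mount deadline, and the value holds
+// the encoded mount flags.
+func mountMutation(server string, deadline time.Time, encodedFlags []byte) *bigtable.Mutation {
+	mut := bigtable.NewMutation()
+	mut.Set("s", server, bigtable.Time(deadline), encodedFlags)
+	return mut
+}
+
+// unmountMutation deletes the server's column.
+func unmountMutation(server string) *bigtable.Mutation {
+	mut := bigtable.NewMutation()
+	mut.DeleteCellsInColumn("s", server)
+	return mut
+}
+```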
+
+### Adding / Removing a child node
+
+Adding or removing a node requires two mutations: one on the parent, one on the
+child.
+
+When adding a node, we first add the child column to the parent row, and then
+create the child's row with the same timestamp.
+
+When deleting a node, we first delete the child's row, and then delete the
+child column on the parent.
+
+If the server process dies between the two mutations, it will leave the parent
+with a reference to a child row that doesn't exist. As a consequence, the parent
+will never be seen as "empty" and will not be automatically garbage collected.
+This will be corrected when:
+
+  * the child is re-created, or
+  * the parent is forcibly deleted.
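+
+A sketch of the ordering when adding a child, under the schema above (deletion
+simply reverses the order, removing the child's row before the parent's child
+column):
+
+```go
+package sketch
+
+import (
+	"path"
+
+	"golang.org/x/net/context"
+	"google.golang.org/cloud/bigtable"
+)
+
+// addChild first records the child on the parent row (family "c"), then
+// creates the child's own row with the same timestamp. rowKey is passed in
+// for illustration and is assumed to build keys as described in the Schema
+// section. Error handling, the version column, and permissions are omitted.
+func addChild(ctx context.Context, tbl *bigtable.Table, rowKey func(string) string, parent, child string, ts bigtable.Timestamp) error {
+	ref := bigtable.NewMutation()
+	ref.Set("c", child, ts, []byte{1})
+	if err := tbl.Apply(ctx, rowKey(parent), ref); err != nil {
+		return err
+	}
+	row := bigtable.NewMutation()
+	row.Set("m", "t", ts, []byte{1}) // creation time; other metadata omitted
+	return tbl.Apply(ctx, rowKey(path.Join(parent, child)), row)
+}
+```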
+
+## Hot rows & caching
+
+Some nodes are expected to be accessed significantly more often than others,
+e.g. the root node and its immediate children are traversed more often than
+nodes further down the tree. The bigtable rows associated with these nodes are
+"hotter", which can lead to traffic imbalance and poor performance.
+
+This problem is alleviated with a small cache in the bigtable client.
+High-frequency or concurrent requests for the same row are bundled together,
+which reduces both latency and bigtable load.
+
+## Opportunistic garbage collection
+
+A node can be garbage-collected when it has no children, no mounted servers, and
+hasn't been marked as _sticky_. A node is _sticky_ when someone explicitly
+called [SetPermissions] on it.
+
+The garbage collection happens opportunistically. When a mounttable server
+accesses a node that is eligible for garbage collection while processing a
+request, the node is removed before the ongoing request completes.
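+
+The eligibility rule can be summarized by a small predicate (field names below
+are illustrative, not the ones used by the implementation):
+
+```go
+package sketch
+
+// node is a simplified view of a mounttable node for this example.
+type node struct {
+	children []string
+	servers  []string
+	sticky   bool // set by an explicit SetPermissions call
+}
+
+// eligibleForGC reports whether a node can be reclaimed: no children, no
+// mounted servers, and never explicitly made sticky.
+func eligibleForGC(n node) bool {
+	return len(n.children) == 0 && len(n.servers) == 0 && !n.sticky
+}
+```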
+
+[Cloud Bigtable]: https://cloud.google.com/bigtable/docs/
+[Overview of Cloud Bigtable]: https://cloud.google.com/bigtable/docs/api-overview
+[mounttable server]: https://github.com/vanadium/go.v23/blob/master/services/mounttable/service.vdl
+[SetPermissions]: https://github.com/vanadium/go.v23/blob/master/services/permissions/service.vdl#L58
+[access.Permissions]: https://github.com/vanadium/go.v23/blob/master/security/access/types.vdl#L130
+[mount flags]: https://github.com/vanadium/go.v23/blob/master/naming/types.vdl#L9
diff --git a/services/mounttable/btmtd/doc.go b/services/mounttable/btmtd/doc.go
new file mode 100644
index 0000000..3882d49
--- /dev/null
+++ b/services/mounttable/btmtd/doc.go
@@ -0,0 +1,94 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated via go generate.
+// DO NOT UPDATE MANUALLY
+
+/*
+Runs the mounttable service.
+
+Usage:
+   btmtd [flags]
+   btmtd [flags] <command>
+
+The btmtd commands are:
+   setup       Creates and sets up the table
+   destroy     Destroy the table
+   dump        Dump the table
+   help        Display help for commands or topics
+
+The btmtd flags are:
+ -cluster=
+   The Cloud Bigtable cluster name
+ -in-memory-test=false
+   If true, use an in-memory bigtable server (for testing only)
+ -key-file=
+   The file that contains the Google Cloud JSON credentials to use
+ -permissions-file=
+   The file that contains the initial node permissions.
+ -project=
+   The Google Cloud project of the Cloud Bigtable cluster
+ -table=mounttable
+   The name of the table to use
+ -zone=
+   The Google Cloud zone of the Cloud Bigtable cluster
+
+The global flags are:
+ -alsologtostderr=true
+   log to standard error as well as files
+ -log_backtrace_at=:0
+   when logging hits line file:N, emit a stack trace
+ -log_dir=
+   if non-empty, write log files to this directory
+ -logtostderr=false
+   log to standard error instead of files
+ -max_stack_buf_size=4292608
+   max size in bytes of the buffer to use for logging stack traces
+ -metadata=<just specify -metadata to activate>
+   Displays metadata for the program and exits.
+ -stderrthreshold=2
+   logs at or above this threshold go to stderr
+ -time=false
+   Dump timing information to stderr before exiting the program.
+ -v=0
+   log level for V logs
+ -v23.credentials=
+   directory to use for storing security credentials
+ -v23.i18n-catalogue=
+   i18n catalogue files to load, comma separated
+ -v23.namespace.root=[/(dev.v.io:r:vprod:service:mounttabled)@ns.dev.v.io:8101]
+   local namespace root; can be repeated to provide multiple roots
+ -v23.permissions.file=map[]
+   specify a perms file as <name>:<permsfile>
+ -v23.permissions.literal=
+   explicitly specify the runtime perms as a JSON-encoded access.Permissions.
+   Overrides all --v23.permissions.file flags.
+ -v23.proxy=
+   object name of proxy service to use to export services across network
+   boundaries
+ -v23.tcp.address=
+   address to listen on
+ -v23.tcp.protocol=wsh
+   protocol to listen with
+ -v23.vtrace.cache-size=1024
+   The number of vtrace traces to store in memory.
+ -v23.vtrace.collect-regexp=
+   Spans and annotations that match this regular expression will trigger trace
+   collection.
+ -v23.vtrace.dump-on-shutdown=true
+   If true, dump all stored traces on runtime shutdown.
+ -v23.vtrace.sample-rate=0
+   Rate (from 0.0 to 1.0) to sample vtrace traces.
+ -v23.vtrace.v=0
+   The verbosity level of the log messages to be captured in traces
+ -vmodule=
+   comma-separated list of globpattern=N settings for filename-filtered logging
+   (without the .go suffix).  E.g. foo/bar/baz.go is matched by patterns baz or
+   *az or b* but not by bar/baz or baz.go or az or b.*
+ -vpath=
+   comma-separated list of regexppattern=N settings for file pathname-filtered
+   logging (without the .go suffix).  E.g. foo/bar/baz.go is matched by patterns
+   foo/bar/baz or fo.*az or oo/ba or b.z but not by foo/bar/baz.go or fo*az
+*/
+package main
diff --git a/services/mounttable/btmtd/internal/bt.go b/services/mounttable/btmtd/internal/bt.go
new file mode 100644
index 0000000..140a7a1
--- /dev/null
+++ b/services/mounttable/btmtd/internal/bt.go
@@ -0,0 +1,298 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"math/rand"
+	"strconv"
+	"strings"
+	"time"
+
+	netcontext "golang.org/x/net/context"
+	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/google"
+	"google.golang.org/cloud"
+	"google.golang.org/cloud/bigtable"
+	"google.golang.org/cloud/bigtable/bttest"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+
+	"v.io/v23/context"
+	"v.io/v23/security"
+	"v.io/v23/security/access"
+	v23mt "v.io/v23/services/mounttable"
+
+	"v.io/x/ref/lib/timekeeper"
+)
+
+const (
+	metadataFamily = "m"
+	serversFamily  = "s"
+	childrenFamily = "c"
+
+	versionColumn     = "v"
+	permissionsColumn = "p"
+	stickyColumn      = "s"
+	timestampColumn   = "t"
+)
+
+// NewBigTable returns a BigTable object that abstracts some aspects of the
+// Cloud Bigtable API.
+func NewBigTable(keyFile, project, zone, cluster, tableName string) (*BigTable, error) {
+	ctx := netcontext.Background()
+	tk, err := getTokenSource(ctx, bigtable.Scope, keyFile)
+	if err != nil {
+		return nil, err
+	}
+	client, err := bigtable.NewClient(ctx, project, zone, cluster, cloud.WithTokenSource(tk))
+	if err != nil {
+		return nil, err
+	}
+
+	return &BigTable{
+		tableName: tableName,
+		cache:     &rowCache{},
+		tbl:       client.Open(tableName),
+		createAdminClient: func() (*bigtable.AdminClient, error) {
+			return bigtable.NewAdminClient(ctx, project, zone, cluster, cloud.WithTokenSource(tk))
+		},
+	}, nil
+}
+
+// NewTestBigTable returns a BigTable object that is connected to an in-memory
+// fake bigtable cluster.
+func NewTestBigTable(tableName string) (*BigTable, func(), error) {
+	srv, err := bttest.NewServer("127.0.0.1:0")
+	if err != nil {
+		return nil, nil, err
+	}
+	ctx := netcontext.Background()
+	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
+	if err != nil {
+		return nil, nil, err
+	}
+	client, err := bigtable.NewClient(ctx, "", "", "", cloud.WithBaseGRPC(conn))
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return &BigTable{
+		tableName: tableName,
+		testMode:  true,
+		cache:     &rowCache{},
+		tbl:       client.Open(tableName),
+		createAdminClient: func() (*bigtable.AdminClient, error) {
+			conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
+			if err != nil {
+				return nil, err
+			}
+			return bigtable.NewAdminClient(ctx, "", "", "", cloud.WithBaseGRPC(conn))
+		},
+	}, func() { srv.Close() }, nil
+}
+
+type BigTable struct {
+	tableName         string
+	testMode          bool
+	tbl               *bigtable.Table
+	cache             *rowCache
+	createAdminClient func() (*bigtable.AdminClient, error)
+}
+
+// SetupTable creates the table, column families, and GC policies.
+func (b *BigTable) SetupTable(ctx *context.T, permissionsFile string) error {
+	bctx, cancel := btctx(ctx)
+	defer cancel()
+
+	client, err := b.createAdminClient()
+	if err != nil {
+		return err
+	}
+	defer client.Close()
+
+	if err := client.CreateTable(bctx, b.tableName); err != nil {
+		return err
+	}
+
+	families := []struct {
+		name     string
+		gcPolicy bigtable.GCPolicy
+	}{
+		{serversFamily, bigtable.UnionPolicy(bigtable.MaxVersionsPolicy(1), bigtable.MaxAgePolicy(time.Second))},
+		{metadataFamily, bigtable.MaxVersionsPolicy(1)},
+		{childrenFamily, bigtable.MaxVersionsPolicy(1)},
+	}
+	for _, f := range families {
+		if err := client.CreateColumnFamily(bctx, b.tableName, f.name); err != nil {
+			return err
+		}
+		if err := client.SetGCPolicy(bctx, b.tableName, f.name, f.gcPolicy); err != nil {
+			return err
+		}
+	}
+
+	if permissionsFile != "" {
+		return createNodesFromFile(ctx, b, permissionsFile)
+	}
+	perms := make(access.Permissions)
+	perms.Add(security.AllPrincipals, string(v23mt.Admin))
+	return b.createRow(ctx, "", perms, b.now())
+}
+
+func (b *BigTable) timeFloor(t bigtable.Timestamp) bigtable.Timestamp {
+	// The bigtable server expects millisecond granularity, but
+	// bigtable.Now() returns a timestamp with microsecond granularity.
+	//
+	// https://github.com/GoogleCloudPlatform/gcloud-golang/blob/master/bigtable/bttest/inmem.go#L734
+	// https://github.com/GoogleCloudPlatform/gcloud-golang/blob/master/bigtable/bigtable.go#L531
+	return (t / 1000) * 1000
+}
+
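+// timeNext returns the smallest millisecond-granularity timestamp that is
+// strictly greater than t.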
+func (b *BigTable) timeNext(t bigtable.Timestamp) bigtable.Timestamp {
+	return (t/1000 + 1) * 1000
+}
+
+func (b *BigTable) now() bigtable.Timestamp {
+	return b.timeFloor(bigtable.Now())
+}
+
+func (b *BigTable) time(t time.Time) bigtable.Timestamp {
+	return b.timeFloor(bigtable.Time(t))
+}
+
+// DeleteTable deletes the table.
+func (b *BigTable) DeleteTable(ctx *context.T) error {
+	bctx, cancel := btctx(ctx)
+	defer cancel()
+
+	client, err := b.createAdminClient()
+	if err != nil {
+		return err
+	}
+	defer client.Close()
+	return client.DeleteTable(bctx, b.tableName)
+}
+
+// DumpTable prints all the mounttable nodes stored in the bigtable.
+func (b *BigTable) DumpTable(ctx *context.T) error {
+	bctx, cancel := btctx(ctx)
+	defer cancel()
+
+	clock := timekeeper.RealTime()
+	return b.tbl.ReadRows(bctx, bigtable.InfiniteRange(""),
+		func(row bigtable.Row) bool {
+			n := nodeFromRow(ctx, b, row, clock)
+			if n.name == "" {
+				n.name = "(root)"
+			}
+			fmt.Printf("%s version: %s", n.name, n.version)
+			if n.sticky {
+				fmt.Printf(" sticky")
+			}
+			fmt.Printf(" perms: %s", n.permissions)
+			if len(n.servers) > 0 {
+				fmt.Printf(" servers:")
+				for _, s := range n.servers {
+					delta := s.Deadline.Time.Sub(clock.Now())
+					fmt.Printf(" [%s %ds]", s.Server, int(delta.Seconds()))
+				}
+				fmt.Printf(" flags: %+v", n.mountFlags)
+			}
+			if len(n.children) > 0 {
+				fmt.Printf(" children: [%s]", strings.Join(n.children, " "))
+			}
+			fmt.Println()
+			return true
+		},
+		bigtable.RowFilter(bigtable.LatestNFilter(1)),
+	)
+}
+
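+// CountRows returns the number of rows in the table.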
+func (b *BigTable) CountRows(ctx *context.T) (int, error) {
+	bctx, cancel := btctx(ctx)
+	defer cancel()
+
+	count := 0
+	if err := b.tbl.ReadRows(bctx, bigtable.InfiniteRange(""),
+		func(row bigtable.Row) bool {
+			count++
+			return true
+		},
+		bigtable.RowFilter(bigtable.LatestNFilter(1)),
+	); err != nil {
+		return 0, err
+	}
+	return count, nil
+}
+
+func getTokenSource(ctx netcontext.Context, scope, keyFile string) (oauth2.TokenSource, error) {
+	if len(keyFile) == 0 {
+		return google.DefaultTokenSource(ctx, scope)
+	}
+	data, err := ioutil.ReadFile(keyFile)
+	if err != nil {
+		return nil, err
+	}
+	config, err := google.JWTConfigFromJSON(data, scope)
+	if err != nil {
+		return nil, err
+	}
+	return config.TokenSource(ctx), nil
+}
+
+func btctx(ctx *context.T) (netcontext.Context, func()) {
+	deadline, hasDeadline := ctx.Deadline()
+	now := time.Now()
+	if !hasDeadline || deadline.Sub(now) < time.Minute {
+		deadline = now.Add(time.Minute)
+	}
+	return netcontext.WithDeadline(netcontext.Background(), deadline)
+}
+
+func (b *BigTable) apply(ctx *context.T, row string, m *bigtable.Mutation, opts ...bigtable.ApplyOption) error {
+	bctx, cancel := btctx(ctx)
+	defer cancel()
+	// The local cache entry for this row is invalidated after each
+	// mutation, whether it succeeds or not.
+	// If it succeeds, the row has changed and the cached data is stale.
+	// If it fails, it's likely because of a concurrent mutation by another
+	// server.
+	// Either way, we can't use the cached version anymore.
+	defer b.cache.invalidate(row)
+	return b.tbl.Apply(bctx, row, m, opts...)
+}
+
+func (b *BigTable) readRow(ctx *context.T, key string, opts ...bigtable.ReadOption) (bigtable.Row, error) {
+	row, err := b.cache.getRefresh(key,
+		func() (bigtable.Row, error) {
+			bctx, cancel := btctx(ctx)
+			defer cancel()
+			return b.tbl.ReadRow(bctx, key, opts...)
+		},
+	)
+	if grpc.Code(err) == codes.DeadlineExceeded {
+		ctx.Errorf("Received DeadlineExceeded for %s", key)
+		if b.testMode {
+			panic("DeadlineExceeded from testserver")
+		}
+	}
+	return row, err
+}
+
+func (b *BigTable) createRow(ctx *context.T, name string, perms access.Permissions, ts bigtable.Timestamp) error {
+	jsonPerms, err := json.Marshal(perms)
+	if err != nil {
+		return err
+	}
+	mut := bigtable.NewMutation()
+	mut.Set(metadataFamily, timestampColumn, ts, []byte{1})
+	mut.Set(metadataFamily, permissionsColumn, bigtable.ServerTime, jsonPerms)
+	mut.Set(metadataFamily, versionColumn, bigtable.ServerTime, []byte(strconv.FormatUint(uint64(rand.Uint32()), 10)))
+	return b.apply(ctx, rowKey(name), mut)
+}
diff --git a/services/mounttable/btmtd/internal/cache.go b/services/mounttable/btmtd/internal/cache.go
new file mode 100644
index 0000000..1664217
--- /dev/null
+++ b/services/mounttable/btmtd/internal/cache.go
@@ -0,0 +1,86 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+	"sync"
+	"time"
+
+	"github.com/golang/groupcache/lru"
+	"google.golang.org/cloud/bigtable"
+)
+
+// The purpose of this cache is to minimize the impact of "hot" rows, i.e.
+// rows that get accessed significantly more often than others.
+//
+// For example, rows associated with the root node and its immediate children
+// are expected to be hotter than rows associated with nodes further down the
+// tree.
+//
+// The cache also makes it so that concurrent read requests for the same row
+// are bundled together and only one read request is sent.
+//
+// The cacheTimeout controls how often rows can be refetched, which directly
+// affects how quickly mutations made by one server become visible to the
+// others. A short timeout means that mutations propagate faster.
+//
+// With the current value, the bigtable server will see at most 1 request per
+// node per server per second, assuming that each server accesses at most
+// 'cacheSize' distinct nodes per second.
+
+const (
+	cacheSize    = 1000
+	cacheTimeout = time.Second
+)
+
+type rowCache struct {
+	// mu guards cache insertions and lookups.
+	mu    sync.Mutex
+	cache *lru.Cache
+}
+
+type cacheEntry struct {
+	// mu guards 'row' and 'updated', and prevents concurrent rpc requests
+	// for the same row. It is locked when the refresh rpc is running.
+	mu      sync.Mutex
+	row     bigtable.Row
+	updated time.Time
+}
+
+func (c *rowCache) getRefresh(key string, getRow func() (bigtable.Row, error)) (bigtable.Row, error) {
+	c.mu.Lock()
+	if c.cache == nil {
+		c.cache = lru.New(cacheSize)
+	}
+	entry, ok := c.cache.Get(key)
+	if !ok {
+		entry = &cacheEntry{}
+		c.cache.Add(key, entry)
+	}
+	c.mu.Unlock()
+
+	e := entry.(*cacheEntry)
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	if clock.Now().Sub(e.updated) < cacheTimeout {
+		return e.row, nil
+	}
+	var err error
+	if e.row, err = getRow(); err != nil || e.row.Key() == "" {
+		return e.row, err
+	}
+	e.updated = clock.Now()
+	return e.row, nil
+}
+
+func (c *rowCache) invalidate(key string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.cache == nil {
+		return
+	}
+	c.cache.Remove(key)
+}
diff --git a/services/mounttable/btmtd/internal/cache_test.go b/services/mounttable/btmtd/internal/cache_test.go
new file mode 100644
index 0000000..be79e23
--- /dev/null
+++ b/services/mounttable/btmtd/internal/cache_test.go
@@ -0,0 +1,97 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"google.golang.org/cloud/bigtable"
+
+	"v.io/x/ref/test/timekeeper"
+)
+
+func TestRowCache(t *testing.T) {
+	clock := timekeeper.NewManualTime()
+	clock.AdvanceTime(2 * cacheTimeout)
+	SetClock(clock)
+	rowCache := &rowCache{}
+
+	testcases := []struct {
+		key             string
+		returnedVersion string
+		expectedVersion string
+		expectedCalls   int
+		skip            time.Duration
+	}{
+		// Time 0
+		{"one", "100", "100", 1, 0},
+		{"two", "200", "200", 1, cacheTimeout/2 + 1},
+		// Time 0.5 sec
+		{"one", "XXX", "100", 0, 0},
+		{"two", "XXX", "200", 0, cacheTimeout/2 + 1},
+		// Time 1 sec
+		{"one", "101", "101", 1, 0},
+		{"two", "201", "201", 1, cacheTimeout/2 + 1},
+		// Time 1.5 sec
+		{"one", "XXX", "101", 0, 0},
+		{"two", "XXX", "201", 0, cacheTimeout/2 + 1},
+		// Time 2 sec
+		{"one", "102", "102", 1, 0},
+		{"two", "202", "202", 1, cacheTimeout/2 + 1},
+	}
+
+	for i, tc := range testcases {
+		calls := 0
+		getRow := func() (bigtable.Row, error) {
+			calls++
+			row := bigtable.Row{
+				metadataFamily: []bigtable.ReadItem{
+					{
+						Row:    tc.key,
+						Column: versionColumn,
+						Value:  []byte(tc.returnedVersion),
+					},
+				},
+			}
+			return row, nil
+		}
+		const N = 5
+		var wg sync.WaitGroup
+		wg.Add(N)
+		for x := 0; x < N; x++ {
+			go func() {
+				defer wg.Done()
+				row, err := rowCache.getRefresh(tc.key, getRow)
+				if err != nil {
+					t.Errorf("Unexpected error for #%d: %v", i, err)
+					return
+				}
+				if got := row.Key(); got != tc.key {
+					t.Errorf("Unexpected key for #%d. Got %q, expected %q", i, got, tc.key)
+				}
+				meta := row[metadataFamily]
+				if meta == nil {
+					t.Errorf("Missing metadata family for #%d", i)
+					return
+				}
+				if len(meta) != 1 {
+					t.Errorf("Unexpected number of cells for #%d: %v", i, meta)
+					return
+				}
+				if got := string(meta[0].Value); got != tc.expectedVersion {
+					t.Errorf("Unexpected version for #%d. Got %q, expected %q", i, got, tc.expectedVersion)
+				}
+			}()
+		}
+		wg.Wait()
+		if got := calls; got != tc.expectedCalls {
+			t.Errorf("Unexpected function call count for #%d. Got %d, expected %d", i, got, tc.expectedCalls)
+		}
+		clock.AdvanceTime(tc.skip)
+	}
+
+}
diff --git a/services/mounttable/btmtd/internal/collectionserver_test.go b/services/mounttable/btmtd/internal/collectionserver_test.go
new file mode 100644
index 0000000..17b7396
--- /dev/null
+++ b/services/mounttable/btmtd/internal/collectionserver_test.go
@@ -0,0 +1,59 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal_test
+
+import (
+	"sync"
+
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+	"v.io/v23/security"
+	"v.io/v23/verror"
+)
+
+// collectionServer is a very simple collection server implementation for testing, with sufficient debugging to help
+// when there are problems.
+type collectionServer struct {
+	sync.Mutex
+	contents map[string][]byte
+}
+type rpcContext struct {
+	name string
+	*collectionServer
+}
+
+var instance collectionServer
+
+func newCollectionServer() *collectionServer {
+	return &collectionServer{contents: make(map[string][]byte)}
+}
+
+// Lookup implements rpc.Dispatcher.Lookup.
+func (s *collectionServer) Lookup(_ *context.T, name string) (interface{}, security.Authorizer, error) {
+	rpcc := &rpcContext{name: name, collectionServer: s}
+	return rpcc, security.AllowEveryone(), nil
+}
+
+// Export implements CollectionServerMethods.Export.
+func (c *rpcContext) Export(ctx *context.T, _ rpc.ServerCall, val string, overwrite bool) error {
+	c.Lock()
+	defer c.Unlock()
+	if b := c.contents[c.name]; overwrite || b == nil {
+		c.contents[c.name] = []byte(val)
+		return nil
+	}
+	return verror.New(naming.ErrNameExists, ctx, c.name)
+}
+
+// Lookup implements CollectionServerMethods.Lookup.
+func (c *rpcContext) Lookup(ctx *context.T, _ rpc.ServerCall) ([]byte, error) {
+	c.Lock()
+	defer c.Unlock()
+	if val := c.contents[c.name]; val != nil {
+		return val, nil
+	}
+	return nil, verror.New(naming.ErrNoSuchName, ctx, c.name)
+}
diff --git a/services/mounttable/btmtd/internal/concurrency_test.go b/services/mounttable/btmtd/internal/concurrency_test.go
new file mode 100644
index 0000000..fc0a819
--- /dev/null
+++ b/services/mounttable/btmtd/internal/concurrency_test.go
@@ -0,0 +1,138 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal_test
+
+import (
+	"fmt"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"v.io/v23/naming"
+	"v.io/v23/verror"
+)
+
+func TestConcurrentMountUnmount(t *testing.T) {
+	rootCtx, _, _, shutdown := initTest()
+	defer shutdown()
+
+	stop, mtAddr, bt, _ := newMT(t, "", rootCtx)
+	defer stop()
+
+	var wg sync.WaitGroup
+	busyMountUnmount := func(name string) {
+		defer wg.Done()
+		for i := 0; i < 250; i++ {
+			n := fmt.Sprintf("%s/%d", name, rand.Intn(10))
+			doMount(t, rootCtx, mtAddr, n, "/example.com:12345", true)
+			if _, err := resolve(rootCtx, naming.JoinAddressName(mtAddr, n)); err != nil {
+				t.Errorf("resolve(%q) failed: %v", n, err)
+			}
+			doUnmount(t, rootCtx, mtAddr, n, "", true)
+		}
+	}
+
+	for i := 1; i <= 5; i++ {
+		wg.Add(3)
+		go busyMountUnmount(fmt.Sprintf("a/%d", i))
+		go busyMountUnmount(fmt.Sprintf("a/b/%d", i))
+		go busyMountUnmount(fmt.Sprintf("a/c/d/e/%d", i))
+	}
+	wg.Wait()
+
+	checkMatch(t, []string{}, doGlob(t, rootCtx, mtAddr, "", "*"))
+
+	count, err := bt.CountRows(rootCtx)
+	if err != nil {
+		t.Errorf("bt.CountRows failed: %v", err)
+	}
+	if expected := 1; count != expected {
+		t.Errorf("Unexpected number of rows. Got %d, expected %d", count, expected)
+		bt.DumpTable(rootCtx)
+	}
+}
+
+func TestConcurrentMountDelete(t *testing.T) {
+	rootCtx, _, _, shutdown := initTest()
+	defer shutdown()
+
+	stop, mtAddr, bt, _ := newMT(t, "", rootCtx)
+	defer stop()
+
+	var wg sync.WaitGroup
+	busyMount := func(name string) {
+		defer wg.Done()
+		for i := 0; i < 250; i++ {
+			doMount(t, rootCtx, mtAddr, name, "/example.com:12345", true)
+		}
+	}
+	busyDelete := func(name string) {
+		defer wg.Done()
+		for i := 0; i < 250; i++ {
+			doDeleteSubtree(t, rootCtx, mtAddr, name, true)
+		}
+	}
+
+	for i := 1; i <= 5; i++ {
+		wg.Add(4)
+		go busyMount(fmt.Sprintf("a/%d", i))
+		go busyDelete(fmt.Sprintf("a/%d", i))
+		go busyMount(fmt.Sprintf("b/%d/c/d/e/f/g/h/i", i))
+		go busyDelete(fmt.Sprintf("b/%d", i))
+	}
+	wg.Wait()
+
+	doDeleteSubtree(t, rootCtx, mtAddr, "a", true)
+	doDeleteSubtree(t, rootCtx, mtAddr, "b", true)
+
+	count, err := bt.CountRows(rootCtx)
+	if err != nil {
+		t.Errorf("bt.CountRows failed: %v", err)
+	}
+	if expected := 1; count != expected {
+		t.Errorf("Unexpected number of rows. Got %d, expected %d", count, expected)
+		bt.DumpTable(rootCtx)
+	}
+}
+
+func TestConcurrentExpiry(t *testing.T) {
+	rootCtx, _, _, shutdown := initTest()
+	defer shutdown()
+
+	stop, mtAddr, bt, clock := newMT(t, "", rootCtx)
+	defer stop()
+
+	const N = 100
+	name := func(i int) string { return fmt.Sprintf("a/b/c/d/e/f/g/h/i/j/k/l/%05d/%05d", i, i) }
+	for i := 0; i < N; i++ {
+		doMount(t, rootCtx, mtAddr, name(i), "/example.com:12345", true)
+	}
+	clock.AdvanceTime(time.Duration(ttlSecs+4) * time.Second)
+
+	var wg sync.WaitGroup
+	concurrentResolve := func(name string) {
+		defer wg.Done()
+		if _, err := resolve(rootCtx, naming.JoinAddressName(mtAddr, name)); verror.ErrorID(err) != naming.ErrNoSuchName.ID {
+			t.Errorf("resolve(%q) returned unexpected error: %v", name, err)
+		}
+	}
+
+	for i := 0; i < N; i++ {
+		wg.Add(2)
+		go concurrentResolve(name(i))
+		go concurrentResolve(name(i))
+	}
+	wg.Wait()
+
+	count, err := bt.CountRows(rootCtx)
+	if err != nil {
+		t.Errorf("bt.CountRows failed: %v", err)
+	}
+	if expected := 1; count != expected {
+		t.Errorf("Unexpected number of rows. Got %d, expected %d", count, expected)
+		bt.DumpTable(rootCtx)
+	}
+}
diff --git a/services/mounttable/btmtd/internal/dispatcher.go b/services/mounttable/btmtd/internal/dispatcher.go
new file mode 100644
index 0000000..3ac4168
--- /dev/null
+++ b/services/mounttable/btmtd/internal/dispatcher.go
@@ -0,0 +1,40 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+	"path"
+
+	"v.io/v23/context"
+	"v.io/v23/rpc"
+	"v.io/v23/security"
+	"v.io/v23/security/access"
+	v23mt "v.io/v23/services/mounttable"
+
+	"v.io/x/ref/lib/timekeeper"
+)
+
+var clock = timekeeper.RealTime()
+
+// For testing only.
+func SetClock(c timekeeper.TimeKeeper) {
+	clock = c
+}
+
+func NewDispatcher(bt *BigTable, globalAcl *access.AccessList) rpc.Dispatcher {
+	return &dispatcher{bt, globalAcl}
+}
+
+type dispatcher struct {
+	bt        *BigTable
+	globalAcl *access.AccessList
+}
+
+func (d *dispatcher) Lookup(ctx *context.T, suffix string) (interface{}, security.Authorizer, error) {
+	if suffix != "" {
+		suffix = path.Clean(suffix)
+	}
+	return v23mt.MountTableServer(&mounttable{suffix, d.globalAcl, d.bt}), security.AllowEveryone(), nil
+}
diff --git a/services/mounttable/btmtd/internal/mounttable.go b/services/mounttable/btmtd/internal/mounttable.go
new file mode 100644
index 0000000..c744974
--- /dev/null
+++ b/services/mounttable/btmtd/internal/mounttable.go
@@ -0,0 +1,425 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+	"math"
+	"math/rand"
+	"path"
+	"reflect"
+	"strings"
+	"time"
+
+	"v.io/v23/context"
+	"v.io/v23/glob"
+	"v.io/v23/naming"
+	"v.io/v23/rpc"
+	"v.io/v23/security"
+	"v.io/v23/security/access"
+	v23mt "v.io/v23/services/mounttable"
+	"v.io/v23/verror"
+)
+
+const pkgPath = "v.io/x/ref/services/mounttable/btmtd/internal"
+
+var (
+	errMalformedAddress = verror.Register(pkgPath+".errMalformedAddress", verror.NoRetry, "{1:}{2:} malformed address {3} for mounted server {4}{:_}")
+	errNotEmpty         = verror.Register(pkgPath+".errNotEmpty", verror.NoRetry, "{1:}{2:} cannot delete {3}: has children{:_}")
+	errConcurrentAccess = verror.Register(pkgPath+".errConcurrentAccess", verror.RetryRefetch, "{1:}{2:} concurrent access caused conflict{:_}")
+)
+
+func shouldAbort(ctx *context.T) error {
+	select {
+	case <-ctx.Done():
+		return verror.NewErrAborted(ctx)
+	default:
+		return nil
+	}
+}
+
+func op(ctx *context.T, f func() error) (err error) {
+	for {
+		if err = shouldAbort(ctx); err != nil {
+			return
+		}
+		if err = f(); verror.ErrorID(err) == errConcurrentAccess.ID {
+			ctx.Infof("Concurrent access conflict detected: %v", err)
+			time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
+			continue
+		}
+		return
+	}
+}
+
+type mounttable struct {
+	suffix    string
+	globalAcl *access.AccessList
+	bt        *BigTable
+}
+
+// Mount a server onto the name in the receiver.
+func (mt *mounttable) Mount(ctx *context.T, call rpc.ServerCall, server string, ttl uint32, flags naming.MountFlag) error {
+	ctx.Infof("%q.Mount(%q, %d, %v)", mt.suffix, server, ttl, flags)
+	if ttl == 0 {
+		ttl = math.MaxUint32
+	}
+	// Make sure the server address is reasonable.
+	ep := server
+	if naming.Rooted(server) {
+		ep, _ = naming.SplitAddressName(server)
+	}
+	if _, err := naming.ParseEndpoint(ep); err != nil {
+		return verror.New(errMalformedAddress, ctx, ep, server)
+	}
+
+	return op(ctx, func() error {
+		n, err := mt.accessNodeCreate(ctx, mt.suffix, call.Security())
+		if err != nil {
+			return err
+		}
+		if err := mt.authorize(ctx, call.Security(), n.permissions, v23mt.Mount, v23mt.Admin); err != nil {
+			return err
+		}
+		return n.mount(ctx, server, clock.Now().Add(time.Duration(ttl)*time.Second), flags)
+	})
+}
+
+// Unmount removes servers from the name in the receiver. If server is specified, only that
+// server is removed.
+func (mt *mounttable) Unmount(ctx *context.T, call rpc.ServerCall, server string) error {
+	ctx.Infof("%q.Unmount(%q)", mt.suffix, server)
+
+	return op(ctx, func() error {
+		n, err := mt.accessNodeRead(ctx, mt.suffix, call.Security())
+		if err != nil {
+			return err
+		}
+		if err := mt.authorize(ctx, call.Security(), n.permissions, v23mt.Mount, v23mt.Admin); err != nil {
+			return err
+		}
+		return n.unmount(ctx, server)
+	})
+}
+
+// Delete removes the receiver.  If the receiver has children, it will not
+// be removed unless deleteSubtree is true in which case the whole subtree is
+// removed.
+func (mt *mounttable) Delete(ctx *context.T, call rpc.ServerCall, deleteSubtree bool) error {
+	ctx.Infof("%q.Delete(%v)", mt.suffix, deleteSubtree)
+
+	if mt.suffix == "" {
+		return verror.New(verror.ErrNoAccess, ctx, "")
+	}
+
+	return op(ctx, func() error {
+		n, err := mt.accessNodeRead(ctx, mt.suffix, call.Security())
+		if verror.ErrorID(err) == naming.ErrNoSuchName.ID {
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+		// Caller must have Admin access on either the node itself or
+		// its parent.
+		if err := mt.authorize(ctx, call.Security(), n.permissions, v23mt.Admin); err != nil {
+			parent, _ := path.Split(mt.suffix)
+			if p, _ := mt.accessNodeRead(ctx, parent, call.Security()); p == nil {
+				return err
+			} else if err2 := mt.authorize(ctx, call.Security(), p.permissions, v23mt.Admin); err2 != nil {
+				return err
+			}
+		}
+		return n.deleteAndGC(ctx, deleteSubtree)
+	})
+}
+
+// ResolveStep takes the next step in resolving a name.  Returns the next
+// servers to query and the suffix at those servers.
+func (mt *mounttable) ResolveStep(ctx *context.T, call rpc.ServerCall) (entry naming.MountEntry, err error) {
+	ctx.Infof("%q.ResolveStep()", mt.suffix)
+
+	n, remainder, err := mt.accessNodeWithServers(ctx, mt.suffix, call.Security())
+	if err != nil {
+		return entry, err
+	}
+	if n == nil {
+		entry.Name = mt.suffix
+		err = verror.New(naming.ErrNoSuchName, ctx, mt.suffix)
+		return
+	}
+	entry.Name = remainder
+	entry.Servers = make([]naming.MountedServer, len(n.servers))
+	for i, s := range n.servers {
+		entry.Servers[i] = s
+	}
+	entry.ServesMountTable = n.mountFlags.MT
+	entry.IsLeaf = n.mountFlags.Leaf
+	return entry, nil
+}
+
+// Glob__ finds matches in the namespace.  If we reach a mount point before
+// matching the whole pattern, return that mount point.
+func (mt *mounttable) Glob__(ctx *context.T, call rpc.GlobServerCall, g *glob.Glob) error {
+	ctx.Infof("%q.Glob__(%q)", mt.suffix, g)
+
+	n, remainder, err := mt.accessNodeWithServers(ctx, mt.suffix, call.Security())
+	if err != nil {
+		return err
+	}
+	if n != nil {
+		me := naming.MountEntry{
+			Name:             "",
+			Servers:          make([]naming.MountedServer, len(n.servers)),
+			ServesMountTable: n.mountFlags.MT,
+			IsLeaf:           n.mountFlags.Leaf,
+		}
+		for i, s := range n.servers {
+			me.Servers[i].Server = naming.Join(s.Server, remainder)
+			me.Servers[i].Deadline = s.Deadline
+		}
+		call.SendStream().Send(naming.GlobReplyEntry{Value: me})
+		return nil
+	}
+
+	type gState struct {
+		name string
+		g    *glob.Glob
+	}
+	queue := []gState{gState{"", g}}
+	for len(queue) != 0 {
+		if err := shouldAbort(ctx); err != nil {
+			return err
+		}
+		state := queue[0]
+		queue = queue[1:]
+
+		n, err := mt.accessNodeRead(ctx, naming.Join(mt.suffix, state.name), call.Security())
+		if err != nil {
+			ctx.VI(2).Infof("caller doesn't have access to %s", state.name)
+			continue
+		}
+		if n == nil {
+			continue
+		}
+		if len(n.servers) > 0 {
+			me := naming.MountEntry{
+				Name:             state.name,
+				Servers:          make([]naming.MountedServer, len(n.servers)),
+				ServesMountTable: n.mountFlags.MT,
+				IsLeaf:           n.mountFlags.Leaf,
+			}
+			for i, s := range n.servers {
+				me.Servers[i] = s
+			}
+			call.SendStream().Send(naming.GlobReplyEntry{Value: me})
+			continue
+		}
+		if state.g.Len() == 0 {
+			call.SendStream().Send(naming.GlobReplyEntry{Value: naming.MountEntry{Name: state.name}})
+		}
+		if state.g.Empty() {
+			continue
+		}
+		matcher, left := state.g.Head(), state.g.Tail()
+		for _, child := range n.children {
+			if matcher.Match(child) {
+				// TODO(rthellend): We should protect against large query results
+				// that could blow up memory usage.
+				queue = append(queue, gState{naming.Join(state.name, child), left})
+			}
+		}
+	}
+	return nil
+}
+
+// SetPermissions replaces the current Permissions for an object.  version
+// allows for optional, optimistic concurrency control.  If non-empty,
+// version's value must come from GetPermissions.  If any client has
+// successfully called SetPermissions in the meantime, the version will be
+// stale and SetPermissions will fail.  If empty, SetPermissions performs an
+// unconditional update.
+func (mt *mounttable) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+	ctx.Infof("%q.SetPermissions(%#v, %q)", mt.suffix, perms, version)
+
+	// If the user is trying to set permissions on a node such that the
+	// user will no longer have admin access, add the user as Admin.
+	if err := mt.authorize(ctx, call.Security(), perms, v23mt.Admin); err != nil {
+		blessings, _ := security.RemoteBlessingNames(ctx, call.Security())
+		for _, b := range blessings {
+			perms.Add(security.BlessingPattern(b), string(v23mt.Admin))
+		}
+	}
+	perms.Normalize()
+
+	return op(ctx, func() error {
+		n, err := mt.accessNodeCreate(ctx, mt.suffix, call.Security())
+		if err != nil {
+			return err
+		}
+		if err := mt.authorize(ctx, call.Security(), n.permissions, v23mt.Admin); err != nil {
+			return err
+		}
+		if version != "" && n.version != version {
+			return verror.NewErrBadVersion(ctx)
+		}
+		return n.setPermissions(ctx, perms)
+	})
+}
+
+// GetPermissions returns the complete, current Permissions for an object. The
+// returned version can be passed to a subsequent call to SetPermissions for
+// optimistic concurrency control. A successful call to SetPermissions will
+// invalidate version, and the client must call GetPermissions again to get
+// the current version.
+func (mt *mounttable) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+	ctx.Infof("%q.GetPermissions()", mt.suffix)
+
+	var n *mtNode
+	if n, err = mt.accessNodeRead(ctx, mt.suffix, call.Security()); err != nil {
+		return
+	}
+	return n.permissions, n.version, nil
+}
+
+func (mt *mounttable) authorize(ctx *context.T, call security.Call, perms access.Permissions, tags ...v23mt.Tag) error {
+	if l, r := call.LocalBlessings().PublicKey(), call.RemoteBlessings().PublicKey(); l != nil && r != nil && reflect.DeepEqual(l, r) {
+		return nil
+	}
+	blessings, invalid := security.RemoteBlessingNames(ctx, call)
+	for _, tag := range tags {
+		if acl, exists := perms[string(tag)]; exists && acl.Includes(blessings...) {
+			return nil
+		}
+	}
+	if mt.globalAcl != nil {
+		if mt.globalAcl.Includes(blessings...) {
+			return nil
+		}
+	}
+	return access.NewErrNoPermissions(ctx, blessings, invalid, string(tags[0]))
+}
+
+// accessNode returns the requested mounttable node after verifying that the
+// caller is authorized to access it. Missing nodes are created if 'create' is
+// true and the caller has 'Create' permission on the parent.
+//
+// If withServers is true, accessNode returns the first node within 'name' that
+// has mounted servers, along with the remaining part of the name.
+//
+// Example:
+//   a -> b -> c
+// If b has a mounted server,
+//  accessNode(.., "a/b", false, true, ..) returns (b, "")
+//  accessNode(.., "a/b/c", false, true, ..) returns (b, "c")
+//  accessNode(.., "a/b", false, false, ..) returns (b, "")
+//  accessNode(.., "a/b/c", false, false, ..) returns (c, "")
+func (mt *mounttable) accessNode(ctx *context.T, name string, create, withServers bool, call security.Call) (*mtNode, string, error) {
+start:
+	var (
+		n, parent *mtNode
+		current   string
+		elems     = append([]string{""}, strings.Split(name, "/")...)
+	)
+
+	for len(elems) > 0 {
+		if err := shouldAbort(ctx); err != nil {
+			return nil, "", err
+		}
+
+		elem := elems[0]
+		elems = elems[1:]
+		current = naming.Join(current, elem)
+
+		var err error
+		n, err = getNode(ctx, mt.bt, current)
+
+		switch {
+		case err != nil:
+			return nil, "", err
+		case n == nil:
+			if parent == nil {
+				return nil, "", verror.New(naming.ErrNoSuchNameRoot, ctx, current)
+			}
+			if !create {
+				return nil, "", verror.New(naming.ErrNoSuchName, ctx, current)
+			}
+			if err := mt.authorize(ctx, call, parent.permissions, v23mt.Create, v23mt.Admin); err != nil {
+				return nil, "", err
+			}
+			var perms access.Permissions
+			if perms = templatePermissions(parent.permissions, elem); perms == nil {
+				perms = parent.permissions.Copy()
+				if current == name {
+					names, _ := security.RemoteBlessingNames(ctx, call)
+					for _, n := range names {
+						perms.Add(security.BlessingPattern(n), string(v23mt.Admin))
+					}
+				}
+			}
+			perms.Normalize()
+			n, err = parent.createChild(ctx, elem, perms)
+			if err != nil {
+				return nil, "", err
+			}
+		default:
+			deleted, err := n.gc(ctx)
+			if deleted || verror.ErrorID(err) == errConcurrentAccess.ID {
+				goto start
+			}
+			if err != nil {
+				return nil, "", err
+			}
+		}
+		if err := mt.authorize(ctx, call, n.permissions, v23mt.Resolve, v23mt.Read, v23mt.Admin); err != nil {
+			return nil, "", err
+		}
+		if withServers && len(n.servers) > 0 {
+			return n, naming.Join(elems...), nil
+		}
+		parent = n
+	}
+	if withServers {
+		return nil, "", nil
+	}
+	return n, "", nil
+}
+
+func (mt *mounttable) accessNodeRead(ctx *context.T, name string, call security.Call) (*mtNode, error) {
+	n, _, err := mt.accessNode(ctx, name, false, false, call)
+	return n, err
+}
+
+func (mt *mounttable) accessNodeCreate(ctx *context.T, name string, call security.Call) (*mtNode, error) {
+	n, _, err := mt.accessNode(ctx, name, true, false, call)
+	return n, err
+}
+
+func (mt *mounttable) accessNodeWithServers(ctx *context.T, name string, call security.Call) (*mtNode, string, error) {
+	return mt.accessNode(ctx, name, false, true, call)
+}
+
+func templatePermissions(perms access.Permissions, child string) access.Permissions {
+	var templatePerms access.Permissions
+	for tag, acl := range perms {
+		if !strings.HasPrefix(tag, "%%/") {
+			continue
+		}
+		tmpAcl := access.AccessList{
+			In:    make([]security.BlessingPattern, len(acl.In)),
+			NotIn: make([]string, len(acl.NotIn)),
+		}
+		for i, b := range acl.In {
+			tmpAcl.In[i] = security.BlessingPattern(strings.Replace(string(b), "%%", child, -1))
+		}
+		for i, b := range acl.NotIn {
+			tmpAcl.NotIn[i] = strings.Replace(b, "%%", child, -1)
+		}
+		if templatePerms == nil {
+			templatePerms = make(access.Permissions)
+		}
+		templatePerms[strings.TrimPrefix(tag, "%%/")] = tmpAcl
+	}
+	return templatePerms
+}
diff --git a/services/mounttable/btmtd/internal/mounttable_test.go b/services/mounttable/btmtd/internal/mounttable_test.go
new file mode 100644
index 0000000..e591686
--- /dev/null
+++ b/services/mounttable/btmtd/internal/mounttable_test.go
@@ -0,0 +1,764 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal_test
+
+import (
+	"errors"
+	"io"
+	"reflect"
+	"runtime/debug"
+	"sort"
+	"testing"
+	"time"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/options"
+	"v.io/v23/rpc"
+	"v.io/v23/security"
+	"v.io/v23/security/access"
+	_ "v.io/x/ref/runtime/factories/roaming"
+	"v.io/x/ref/services/debug/debuglib"
+	"v.io/x/ref/services/mounttable/btmtd/internal"
+	"v.io/x/ref/test"
+	"v.io/x/ref/test/testutil"
+	"v.io/x/ref/test/timekeeper"
+)
+
+// Simulate different processes with different runtimes.
+// rootCtx is the one running the mounttable service.
+const ttlSecs = 60 * 60
+
+func boom(t *testing.T, f string, v ...interface{}) {
+	t.Logf(f, v...)
+	t.Fatal(string(debug.Stack()))
+}
+
+func doMount(t *testing.T, ctx *context.T, ep, suffix, service string, shouldSucceed bool) {
+	name := naming.JoinAddressName(ep, suffix)
+	client := v23.GetClient(ctx)
+	if err := client.Call(ctx, name, "Mount", []interface{}{service, uint32(ttlSecs), 0}, nil, options.Preresolved{}); err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Mount %s onto %s: %s", service, name, err)
+	}
+}
+
+func doUnmount(t *testing.T, ctx *context.T, ep, suffix, service string, shouldSucceed bool) {
+	name := naming.JoinAddressName(ep, suffix)
+	client := v23.GetClient(ctx)
+	if err := client.Call(ctx, name, "Unmount", []interface{}{service}, nil, options.Preresolved{}); err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Unmount %s off of %s: %s", service, name, err)
+	}
+}
+
+func doGetPermissions(t *testing.T, ctx *context.T, ep, suffix string, shouldSucceed bool) (perms access.Permissions, version string) {
+	name := naming.JoinAddressName(ep, suffix)
+	client := v23.GetClient(ctx)
+	if err := client.Call(ctx, name, "GetPermissions", nil, []interface{}{&perms, &version}, options.Preresolved{}); err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to GetPermissions %s: %s", name, err)
+	}
+	return
+}
+
+func doSetPermissions(t *testing.T, ctx *context.T, ep, suffix string, perms access.Permissions, version string, shouldSucceed bool) {
+	name := naming.JoinAddressName(ep, suffix)
+	client := v23.GetClient(ctx)
+	if err := client.Call(ctx, name, "SetPermissions", []interface{}{perms, version}, nil, options.Preresolved{}); err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to SetPermissions %s: %s", name, err)
+	}
+}
+
+func doDeleteNode(t *testing.T, ctx *context.T, ep, suffix string, shouldSucceed bool) {
+	name := naming.JoinAddressName(ep, suffix)
+	client := v23.GetClient(ctx)
+	if err := client.Call(ctx, name, "Delete", []interface{}{false}, nil, options.Preresolved{}); err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Delete node %s: %s", name, err)
+	}
+}
+
+func doDeleteSubtree(t *testing.T, ctx *context.T, ep, suffix string, shouldSucceed bool) {
+	name := naming.JoinAddressName(ep, suffix)
+	client := v23.GetClient(ctx)
+	if err := client.Call(ctx, name, "Delete", []interface{}{true}, nil, options.Preresolved{}); err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Delete subtree %s: %s", name, err)
+	}
+}
+
+func mountentry2names(e *naming.MountEntry) []string {
+	names := make([]string, len(e.Servers))
+	for idx, s := range e.Servers {
+		names[idx] = naming.JoinAddressName(s.Server, e.Name)
+	}
+	return names
+}
+
+func resolve(ctx *context.T, name string) (*naming.MountEntry, error) {
+	// Resolve the name one level.
+	var entry naming.MountEntry
+	client := v23.GetClient(ctx)
+	if err := client.Call(ctx, name, "ResolveStep", nil, []interface{}{&entry}, options.Preresolved{}); err != nil {
+		return nil, err
+	}
+	if len(entry.Servers) < 1 {
+		return nil, errors.New("resolve returned no servers")
+	}
+	return &entry, nil
+}
+
+func export(t *testing.T, ctx *context.T, name, contents string) {
+	// Resolve the name.
+	resolved, err := resolve(ctx, name)
+	if err != nil {
+		boom(t, "Failed to Export.Resolve %s: %s", name, err)
+	}
+	// Export the value.
+	client := v23.GetClient(ctx)
+	if err := client.Call(ctx, mountentry2names(resolved)[0], "Export", []interface{}{contents, true}, nil, options.Preresolved{resolved}); err != nil {
+		boom(t, "Failed to Export.Call %s to %s: %s", name, contents, err)
+	}
+}
+
+func checkContents(t *testing.T, ctx *context.T, name, expected string, shouldSucceed bool) {
+	// Resolve the name.
+	resolved, err := resolve(ctx, name)
+	if err != nil {
+		if !shouldSucceed {
+			return
+		}
+		boom(t, "Failed to Resolve %s: %s", name, err)
+	}
+	// Look up the value.
+	client := v23.GetClient(ctx)
+	call, err := client.StartCall(ctx, mountentry2names(resolved)[0], "Lookup", nil, options.Preresolved{resolved})
+	if err != nil {
+		if shouldSucceed {
+			boom(t, "Failed Lookup.StartCall %s: %s", name, err)
+		}
+		return
+	}
+	var contents []byte
+	if err := call.Finish(&contents); err != nil {
+		if shouldSucceed {
+			boom(t, "Failed to Lookup %s: %s", name, err)
+		}
+		return
+	}
+	if string(contents) != expected {
+		boom(t, "Lookup %s, expected %q, got %q", name, expected, contents)
+	}
+	if !shouldSucceed {
+		boom(t, "Lookup %s, expected failure, got %q", name, contents)
+	}
+}
+
+func newMT(t *testing.T, permsFile string, rootCtx *context.T) (func(), string, *internal.BigTable, timekeeper.ManualTime) {
+	reservedDisp := debuglib.NewDispatcher(nil)
+	ctx := v23.WithReservedNameDispatcher(rootCtx, reservedDisp)
+
+	bt, shutdownBT, err := internal.NewTestBigTable("mounttable")
+	if err != nil {
+		boom(t, "NewTestBigTable: %s", err)
+	}
+	if err := bt.SetupTable(ctx, permsFile); err != nil {
+		boom(t, "bt.SetupTable: %s", err)
+	}
+
+	// Set clock slightly in the future to avoid triggering bigtable's GC.
+	clock := timekeeper.NewManualTime()
+	now := time.Now().UTC().Add(time.Hour)
+	for {
+		delta := now.Sub(clock.Now())
+		if delta < time.Second {
+			break
+		}
+		clock.AdvanceTime(delta)
+	}
+	// Add mount table service.
+	internal.SetClock(clock)
+	mt := internal.NewDispatcher(bt, nil)
+
+	// Start serving on a loopback address.
+	ctx, cancel := context.WithCancel(ctx)
+	ctx, server, err := v23.WithNewDispatchingServer(ctx, "", mt, options.ServesMountTable(true))
+	if err != nil {
+		boom(t, "r.NewServer: %s", err)
+	}
+
+	estr := server.Status().Endpoints[0].String()
+	t.Logf("endpoint %s", estr)
+	return func() {
+		cancel()
+		<-server.Closed()
+		shutdownBT()
+	}, estr, bt, clock
+}
+
+func newCollection(t *testing.T, rootCtx *context.T) (func(), string) {
+	// Start serving a collection service on a loopback address.  This
+	// is just a service we can mount and test against.
+	ctx, cancel := context.WithCancel(rootCtx)
+	_, server, err := v23.WithNewDispatchingServer(ctx, "collection", newCollectionServer())
+	if err != nil {
+		boom(t, "r.NewServer: %s", err)
+	}
+	estr := server.Status().Endpoints[0].String()
+	t.Logf("endpoint %s", estr)
+	return func() {
+		cancel()
+		<-server.Closed()
+	}, estr
+}
+
+func TestMountTable(t *testing.T) {
+	rootCtx, aliceCtx, bobCtx, shutdown := initTest()
+	defer shutdown()
+
+	stop, mtAddr, _, clock := newMT(t, "testdata/test.perms", rootCtx)
+	defer stop()
+	stop, collectionAddr := newCollection(t, rootCtx)
+	defer stop()
+
+	collectionName := naming.JoinAddressName(collectionAddr, "collection")
+
+	// Mount the collection server into the mount table.
+	rootCtx.Infof("Mount the collection server into the mount table.")
+	doMount(t, rootCtx, mtAddr, "stuff", collectionName, true)
+
+	// Create a few objects and make sure we can read them.
+	rootCtx.Infof("Create a few objects.")
+	export(t, rootCtx, naming.JoinAddressName(mtAddr, "stuff/the/rain"), "the rain")
+	export(t, rootCtx, naming.JoinAddressName(mtAddr, "stuff/in/spain"), "in spain")
+	export(t, rootCtx, naming.JoinAddressName(mtAddr, "stuff/falls"), "falls mainly on the plain")
+	rootCtx.Infof("Make sure we can read them.")
+	checkContents(t, rootCtx, naming.JoinAddressName(mtAddr, "stuff/the/rain"), "the rain", true)
+	checkContents(t, rootCtx, naming.JoinAddressName(mtAddr, "stuff/in/spain"), "in spain", true)
+	checkContents(t, rootCtx, naming.JoinAddressName(mtAddr, "stuff/falls"), "falls mainly on the plain", true)
+	checkContents(t, rootCtx, naming.JoinAddressName(mtAddr, "/stuff/falls"), "falls mainly on the plain", true)
+	checkContents(t, rootCtx, naming.JoinAddressName(mtAddr, "stuff/nonexistant"), "falls mainly on the plain", false)
+	checkContents(t, bobCtx, naming.JoinAddressName(mtAddr, "stuff/the/rain"), "the rain", true)
+	checkContents(t, aliceCtx, naming.JoinAddressName(mtAddr, "stuff/the/rain"), "the rain", false)
+
+	// Test name element too long.
+	rootCtx.Infof("Name element too long.")
+	doMount(t, rootCtx, mtAddr, "a/abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnopqrstuvwxyz0123456789abcdefghijklmnop", collectionName, false)
+
+	// Test multiple mounts.
+	rootCtx.Infof("Multiple mounts.")
+	doMount(t, rootCtx, mtAddr, "a/b", collectionName, true)
+	doMount(t, rootCtx, mtAddr, "x/y", collectionName, true)
+	doMount(t, rootCtx, mtAddr, "alpha//beta", collectionName, true)
+	rootCtx.Infof("Make sure we can read them.")
+	checkContents(t, rootCtx, naming.JoinAddressName(mtAddr, "stuff/falls"), "falls mainly on the plain", true)
+	checkContents(t, rootCtx, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", true)
+	checkContents(t, rootCtx, naming.JoinAddressName(mtAddr, "x/y/falls"), "falls mainly on the plain", true)
+	checkContents(t, rootCtx, naming.JoinAddressName(mtAddr, "alpha/beta/falls"), "falls mainly on the plain", true)
+	checkContents(t, aliceCtx, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", true)
+	checkContents(t, bobCtx, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", false)
+
+	// Test getting/setting AccessLists.
+	perms, version := doGetPermissions(t, rootCtx, mtAddr, "stuff", true)
+	doSetPermissions(t, rootCtx, mtAddr, "stuff", perms, "xyzzy", false) // bad version
+	doSetPermissions(t, rootCtx, mtAddr, "stuff", perms, version, true)  // correct version
+	_, nversion := doGetPermissions(t, rootCtx, mtAddr, "stuff", true)
+	if nversion == version {
+		boom(t, "version didn't change after SetPermissions: %s", nversion)
+	}
+	doSetPermissions(t, rootCtx, mtAddr, "stuff", perms, "", true) // no version
+
+	// Bob should be able to create nodes under the mounttable root, but alice should not.
+	doSetPermissions(t, aliceCtx, mtAddr, "onlybob", perms, "", false)
+	doSetPermissions(t, bobCtx, mtAddr, "onlybob", perms, "", true)
+
+	// Test that setting Permissions that don't include the setter's blessings
+	// in Admin automatically adds their blessings to Admin, to prevent locking
+	// everyone out.
+	perms, _ = doGetPermissions(t, bobCtx, mtAddr, "onlybob", true)
+	noRootPerms := perms.Copy()
+	noRootPerms.Clear("bob", "Admin")
+	doSetPermissions(t, bobCtx, mtAddr, "onlybob", noRootPerms, "", true)
+	// This should succeed, because "bob" should automatically be added to "Admin"
+	// even though he cleared himself from "Admin".
+	doSetPermissions(t, bobCtx, mtAddr, "onlybob", perms, "", true)
+	// Test that non-standard perms (e.g. duplicate patterns) are normalized when retrieved.
+	admin := perms["Admin"]
+	admin.In = []security.BlessingPattern{"bob", "bob"}
+	perms["Admin"] = admin
+	doSetPermissions(t, bobCtx, mtAddr, "onlybob", perms, "", true)
+	perms, _ = doGetPermissions(t, bobCtx, mtAddr, "onlybob", true)
+	if got, want := perms["Admin"].In, []security.BlessingPattern{"bob"}; !reflect.DeepEqual(got, want) {
+		boom(t, "got %v, want %v", got, want)
+	}
+
+	// Test generic unmount.
+	rootCtx.Info("Test generic unmount.")
+	doUnmount(t, rootCtx, mtAddr, "a/b", "", true)
+	checkContents(t, rootCtx, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", false)
+
+	// Test specific unmount.
+	rootCtx.Info("Test specific unmount.")
+	doMount(t, rootCtx, mtAddr, "a/b", collectionName, true)
+	doUnmount(t, rootCtx, mtAddr, "a/b", collectionName, true)
+	checkContents(t, rootCtx, naming.JoinAddressName(mtAddr, "a/b/falls"), "falls mainly on the plain", false)
+
+	// Try timing out a mount.
+	rootCtx.Info("Try timing out a mount.")
+	doMount(t, rootCtx, mtAddr, "stuffWithTTL", collectionName, true)
+	checkContents(t, rootCtx, naming.JoinAddressName(mtAddr, "stuffWithTTL/the/rain"), "the rain", true)
+	clock.AdvanceTime(time.Duration(ttlSecs+4) * time.Second)
+	checkContents(t, rootCtx, naming.JoinAddressName(mtAddr, "stuffWithTTL/the/rain"), "the rain", false)
+
+	// Test unauthorized mount.
+	rootCtx.Info("Test unauthorized mount.")
+	doMount(t, bobCtx, mtAddr, "/a/b", collectionName, false)
+	doMount(t, aliceCtx, mtAddr, "/a/b", collectionName, false)
+
+	doUnmount(t, bobCtx, mtAddr, "x/y", collectionName, false)
+}
+
+func doGlobX(t *testing.T, ctx *context.T, ep, suffix, pattern string, joinServer bool) []string {
+	name := naming.JoinAddressName(ep, suffix)
+	client := v23.GetClient(ctx)
+	call, err := client.StartCall(ctx, name, rpc.GlobMethod, []interface{}{pattern}, options.Preresolved{})
+	if err != nil {
+		boom(t, "Glob.StartCall %s %s: %s", name, pattern, err)
+	}
+	var reply []string
+	for {
+		var gr naming.GlobReply
+		err := call.Recv(&gr)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			boom(t, "Glob.StartCall %s: %s", name, pattern, err)
+		}
+		switch v := gr.(type) {
+		case naming.GlobReplyEntry:
+			if joinServer && len(v.Value.Servers) > 0 {
+				reply = append(reply, naming.JoinAddressName(v.Value.Servers[0].Server, v.Value.Name))
+			} else {
+				reply = append(reply, v.Value.Name)
+			}
+		}
+	}
+	if err := call.Finish(); err != nil {
+		boom(t, "Glob.Finish %s: %s", name, pattern, err)
+	}
+	return reply
+}
+
+func doGlob(t *testing.T, ctx *context.T, ep, suffix, pattern string) []string {
+	return doGlobX(t, ctx, ep, suffix, pattern, false)
+}
+
+// checkMatch verifies that the two slices contain the same string items,
+// though not necessarily in the same order.  Repeated items are allowed, but
+// their counts must match.
+func checkMatch(t *testing.T, want []string, got []string) {
+	if len(want) == 0 && len(got) == 0 {
+		return
+	}
+	w := sort.StringSlice(want)
+	w.Sort()
+	g := sort.StringSlice(got)
+	g.Sort()
+	if !reflect.DeepEqual(w, g) {
+		boom(t, "Glob expected %v got %v", want, got)
+	}
+}
+
+// checkExists makes sure a name exists (or not).
+func checkExists(t *testing.T, ctx *context.T, ep, suffix string, shouldSucceed bool) {
+	x := doGlobX(t, ctx, ep, "", suffix, false)
+	if len(x) != 1 || x[0] != suffix {
+		if shouldSucceed {
+			boom(t, "Failed to find %s", suffix)
+		}
+		return
+	}
+	if !shouldSucceed {
+		boom(t, "%s exists but shouldn't", suffix)
+	}
+}
+
+func TestGlob(t *testing.T) {
+	rootCtx, shutdown := test.V23InitWithMounttable()
+	defer shutdown()
+
+	stop, estr, _, _ := newMT(t, "", rootCtx)
+	defer stop()
+
+	// set up a mount space
+	fakeServer := naming.JoinAddressName(estr, "quux")
+	doMount(t, rootCtx, estr, "one/bright/day", fakeServer, true)
+	doMount(t, rootCtx, estr, "in/the/middle", fakeServer, true)
+	doMount(t, rootCtx, estr, "of/the/night", fakeServer, true)
+
+	// Try various globs.
+	tests := []struct {
+		in       string
+		expected []string
+	}{
+		{"*", []string{"one", "in", "of"}},
+		{"...", []string{"", "one", "in", "of", "one/bright", "in/the", "of/the", "one/bright/day", "in/the/middle", "of/the/night"}},
+		{"*/...", []string{"one", "in", "of", "one/bright", "in/the", "of/the", "one/bright/day", "in/the/middle", "of/the/night"}},
+		{"one/...", []string{"one", "one/bright", "one/bright/day"}},
+		{"of/the/night/two/dead/boys", []string{"of/the/night"}},
+		{"*/the", []string{"in/the", "of/the"}},
+		{"*/the/...", []string{"in/the", "of/the", "in/the/middle", "of/the/night"}},
+		{"o*", []string{"one", "of"}},
+		{"", []string{""}},
+	}
+	for _, test := range tests {
+		out := doGlob(t, rootCtx, estr, "", test.in)
+		checkMatch(t, test.expected, out)
+	}
+
+	// Test Glob on a name that is under a mounted server. The result should be
+	// the address of the mounted server with the extra suffix.
+	{
+		results := doGlobX(t, rootCtx, estr, "of/the/night/two/dead/boys/got/up/to/fight", "*", true)
+		if len(results) != 1 {
+			boom(t, "Unexpected number of results. Got %v, want 1", len(results))
+		}
+		_, suffix := naming.SplitAddressName(results[0])
+		if expected := "quux/two/dead/boys/got/up/to/fight"; suffix != expected {
+			boom(t, "Unexpected suffix. Got %v, want %v", suffix, expected)
+		}
+	}
+}
+
+type fakeServerCall struct {
+	sendCount int
+}
+
+func (fakeServerCall) Security() security.Call              { return security.NewCall(&security.CallParams{}) }
+func (fakeServerCall) Suffix() string                       { return "" }
+func (fakeServerCall) LocalEndpoint() naming.Endpoint       { return naming.Endpoint{} }
+func (fakeServerCall) RemoteEndpoint() naming.Endpoint      { return naming.Endpoint{} }
+func (fakeServerCall) GrantedBlessings() security.Blessings { return security.Blessings{} }
+func (fakeServerCall) Server() rpc.Server                   { return nil }
+func (c *fakeServerCall) SendStream() interface {
+	Send(naming.GlobReply) error
+} {
+	return c
+}
+func (c *fakeServerCall) Send(reply naming.GlobReply) error {
+	c.sendCount++
+	return nil
+}
+
+/*
+func TestGlobAborts(t *testing.T) {
+	ctx, shutdown := test.V23InitWithMounttable()
+	defer shutdown()
+
+	mt, err := mounttablelib.NewMountTableDispatcher(ctx, "", "", "")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	mount := func(name string) error {
+		invoker, _, _ := mt.Lookup(ctx, name)
+		server := naming.FormatEndpoint("tcp", name)
+		return invoker.(mounttable.MountTableServerStub).Mount(ctx, fakeServerCall{}, server, 0, 0)
+	}
+	// Mount 125 entries: 5 "directories" with 25 entries each.
+	for i := 0; i < 5; i++ {
+		for j := 0; j < 25; j++ {
+			if err := mount(fmt.Sprintf("%d/%d", i, j)); err != nil {
+				t.Fatalf("%v (%d, %d)", err, i, j)
+			}
+		}
+	}
+
+	glob := func(ctx *context.T) (int, error) {
+		root, _, _ := mt.Lookup(ctx, "")
+		g, _ := glob.Parse("...")
+		fCall := &fakeServerCall{}
+		root.(rpc.Globber).Globber().AllGlobber.Glob__(ctx, fCall, g)
+		return fCall.sendCount, nil
+	}
+
+	got, err := glob(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if want := 5 + 125 + 1; got != want { // 5 "directories", 125 entries, 1 root entry
+		t.Errorf("Got %d want %d", got, want)
+	}
+	canceled, cancel := context.WithCancel(ctx)
+	cancel()
+	if got, err = glob(canceled); err != nil {
+		t.Fatal(err)
+	}
+	if got != 0 {
+		t.Errorf("Glob returned entries even though the context was cancelled first (returned %d)", got)
+	}
+}
+*/
+
+func TestAccessListTemplate(t *testing.T) {
+	rootCtx, aliceCtx, bobCtx, shutdown := initTest()
+	defer shutdown()
+
+	stop, estr, _, _ := newMT(t, "testdata/test.perms", rootCtx)
+	defer stop()
+	fakeServer := naming.JoinAddressName(estr, "quux")
+
+	// No one should be able to mount on someone else's names.
+	doMount(t, aliceCtx, estr, "users/ted", fakeServer, false)
+	doMount(t, bobCtx, estr, "users/carol", fakeServer, false)
+	doMount(t, rootCtx, estr, "users/george", fakeServer, false)
+
+	// Anyone should be able to mount on their own names.
+	doMount(t, aliceCtx, estr, "users/alice", fakeServer, true)
+	doMount(t, bobCtx, estr, "users/bob", fakeServer, true)
+	doMount(t, rootCtx, estr, "users/root", fakeServer, true)
+
+	// Make sure the counter works.
+	doUnmount(t, aliceCtx, estr, "users/alice", "", true)
+	doUnmount(t, bobCtx, estr, "users/bob", "", true)
+	doUnmount(t, rootCtx, estr, "users/root", "", true)
+	perms := access.Permissions{"Admin": access.AccessList{In: []security.BlessingPattern{security.AllPrincipals}}}
+	doSetPermissions(t, aliceCtx, estr, "users/alice/a/b/c/d", perms, "", true)
+	doSetPermissions(t, aliceCtx, estr, "users/alice/a/b/c/d", perms, "", true)
+
+	/*
+		// Do we obey limits?
+		for i := 0; i < mounttablelib.DefaultMaxNodesPerUser()-5; i++ {
+			node := fmt.Sprintf("users/alice/a/b/c/d/%d", i)
+			doSetPermissions(t, aliceCtx, estr, node, perms, "", true)
+		}
+		doSetPermissions(t, aliceCtx, estr, "users/alice/a/b/c/d/straw", perms, "", false)
+
+		// See if the stats numbers are correct.
+		testcases := []struct {
+			key      string
+			expected interface{}
+		}{
+			{"alice", int64(mounttablelib.DefaultMaxNodesPerUser())},
+			{"bob", int64(0)},
+			{"root", int64(0)},
+			{conventions.ServerUser, int64(3)},
+		}
+		for _, tc := range testcases {
+			name := "testAccessListTemplate/num-nodes-per-user/" + tc.key
+			got, err := libstats.Value(name)
+			if err != nil {
+				t.Errorf("unexpected error getting map entry for %s: %s", name, err)
+			}
+			if got != tc.expected {
+				t.Errorf("unexpected getting map entry for %s. Got %v, want %v", name, got, tc.expected)
+			}
+		}
+	*/
+}
+
+func TestGlobAccessLists(t *testing.T) {
+	rootCtx, aliceCtx, bobCtx, shutdown := initTest()
+	defer shutdown()
+
+	stop, estr, _, _ := newMT(t, "testdata/test.perms", rootCtx)
+	defer stop()
+
+	// set up a mount space
+	fakeServer := naming.JoinAddressName(estr, "quux")
+	doMount(t, aliceCtx, estr, "one/bright/day", fakeServer, false) // Fails because alice can't mount there.
+	doMount(t, bobCtx, estr, "one/bright/day", fakeServer, true)
+	doMount(t, rootCtx, estr, "a/b/c", fakeServer, true)
+
+	// Try various globs.
+	tests := []struct {
+		ctx      *context.T
+		in       string
+		expected []string
+	}{
+		{rootCtx, "*", []string{"one", "a", "stuff", "users"}},
+		{aliceCtx, "*", []string{"one", "a", "users"}},
+		{bobCtx, "*", []string{"one", "stuff", "users"}},
+		// bob, alice, and root have different visibility to the space.
+		{rootCtx, "*/...", []string{"one", "a", "one/bright", "a/b", "one/bright/day", "a/b/c", "stuff", "users"}},
+		{aliceCtx, "*/...", []string{"one", "a", "one/bright", "a/b", "one/bright/day", "a/b/c", "users"}},
+		{bobCtx, "*/...", []string{"one", "one/bright", "one/bright/day", "stuff", "users"}},
+	}
+	for _, test := range tests {
+		out := doGlob(t, test.ctx, estr, "", test.in)
+		checkMatch(t, test.expected, out)
+	}
+}
+
+func TestCleanup(t *testing.T) {
+	rootCtx, shutdown := test.V23InitWithMounttable()
+	defer shutdown()
+
+	stop, estr, _, _ := newMT(t, "", rootCtx)
+	defer stop()
+
+	// Set up one mount.
+	fakeServer := naming.JoinAddressName(estr, "quux")
+	doMount(t, rootCtx, estr, "one/bright/day", fakeServer, true)
+	checkMatch(t, []string{"one", "one/bright", "one/bright/day"}, doGlob(t, rootCtx, estr, "", "*/..."))
+
+	// After the unmount, nothing should be left.
+	doUnmount(t, rootCtx, estr, "one/bright/day", "", true)
+	checkMatch(t, nil, doGlob(t, rootCtx, estr, "", "one"))
+	checkMatch(t, nil, doGlob(t, rootCtx, estr, "", "*/..."))
+
+	// Set up a mount, then set the AccessList.
+	doMount(t, rootCtx, estr, "one/bright/day", fakeServer, true)
+	checkMatch(t, []string{"one", "one/bright", "one/bright/day"}, doGlob(t, rootCtx, estr, "", "*/..."))
+	perms := access.Permissions{"Read": access.AccessList{In: []security.BlessingPattern{security.AllPrincipals}}}
+	doSetPermissions(t, rootCtx, estr, "one/bright", perms, "", true)
+
+	// After the unmount we should still have everything above the AccessList.
+	doUnmount(t, rootCtx, estr, "one/bright/day", "", true)
+	checkMatch(t, []string{"one", "one/bright"}, doGlob(t, rootCtx, estr, "", "*/..."))
+}
+
+func TestDelete(t *testing.T) {
+	rootCtx, aliceCtx, bobCtx, shutdown := initTest()
+	defer shutdown()
+
+	stop, estr, _, _ := newMT(t, "testdata/test.perms", rootCtx)
+	defer stop()
+
+	// set up a mount space
+	fakeServer := naming.JoinAddressName(estr, "quux")
+	doMount(t, bobCtx, estr, "one/bright/day", fakeServer, true)
+	doMount(t, rootCtx, estr, "a/b/c", fakeServer, true)
+
+	// It shouldn't be possible to delete anything with children unless explicitly requested.
+	doDeleteNode(t, rootCtx, estr, "a/b", false)
+	checkExists(t, rootCtx, estr, "a/b", true)
+	doDeleteSubtree(t, rootCtx, estr, "a/b", true)
+	checkExists(t, rootCtx, estr, "a/b", false)
+
+	// Alice shouldn't be able to delete what bob created but bob and root should.
+	doDeleteNode(t, aliceCtx, estr, "one/bright/day", false)
+	checkExists(t, rootCtx, estr, "one/bright/day", true)
+	doDeleteNode(t, rootCtx, estr, "one/bright/day", true)
+	checkExists(t, rootCtx, estr, "one/bright/day", false)
+	doDeleteNode(t, bobCtx, estr, "one/bright", true)
+	checkExists(t, rootCtx, estr, "one/bright", false)
+
+	// Make sure directory admin can delete directory children.
+	perms := access.Permissions{"Admin": access.AccessList{In: []security.BlessingPattern{"bob"}}}
+	doSetPermissions(t, bobCtx, estr, "hoohaa", perms, "", false)
+	doDeleteNode(t, rootCtx, estr, "hoohaa", true)
+	checkExists(t, rootCtx, estr, "hoohaa", false)
+}
+
+func TestServerFormat(t *testing.T) {
+	rootCtx, shutdown := test.V23InitWithMounttable()
+	defer shutdown()
+
+	stop, estr, _, _ := newMT(t, "", rootCtx)
+	defer stop()
+
+	doMount(t, rootCtx, estr, "endpoint", naming.JoinAddressName(estr, "life/on/the/mississippi"), true)
+	doMount(t, rootCtx, estr, "hostport", "/atrampabroad:8000", true)
+	doMount(t, rootCtx, estr, "invalid/not/rooted", "atrampabroad:8000", false)
+	doMount(t, rootCtx, estr, "invalid/no/port", "/atrampabroad", false)
+	doMount(t, rootCtx, estr, "invalid/endpoint", "/@following the equator:8000@@@", false)
+}
+
+func TestExpiry(t *testing.T) {
+	rootCtx, shutdown := test.V23InitWithMounttable()
+	defer shutdown()
+
+	stop, estr, _, clock := newMT(t, "", rootCtx)
+	defer stop()
+	stop, collectionAddr := newCollection(t, rootCtx)
+	defer stop()
+
+	collectionName := naming.JoinAddressName(collectionAddr, "collection")
+
+	doMount(t, rootCtx, estr, "a1/b1", collectionName, true)
+	doMount(t, rootCtx, estr, "a1/b2", collectionName, true)
+	doMount(t, rootCtx, estr, "a2/b1", collectionName, true)
+	doMount(t, rootCtx, estr, "a2/b2/c", collectionName, true)
+
+	checkMatch(t, []string{"a1/b1", "a2/b1"}, doGlob(t, rootCtx, estr, "", "*/b1/..."))
+	clock.AdvanceTime(time.Duration(ttlSecs/2) * time.Second)
+	checkMatch(t, []string{"a1/b1", "a2/b1"}, doGlob(t, rootCtx, estr, "", "*/b1/..."))
+	checkMatch(t, []string{"c"}, doGlob(t, rootCtx, estr, "a2/b2", "*"))
+	// Refresh only a1/b1.  All the other mounts will expire when the fake
+	// clock next advances.
+	doMount(t, rootCtx, estr, "a1/b1", collectionName, true)
+	clock.AdvanceTime(time.Duration(ttlSecs/2+4) * time.Second)
+	resolve(rootCtx, naming.JoinAddressName(estr, "a2/b2/c"))
+	resolve(rootCtx, naming.JoinAddressName(estr, "a2/b1"))
+	checkMatch(t, []string{"a1", "a1/b1"}, doGlob(t, rootCtx, estr, "", "*/..."))
+	checkMatch(t, []string{"a1/b1"}, doGlob(t, rootCtx, estr, "", "*/b1/..."))
+}
+
+func TestIntermediateNodesCreatedFromConfig(t *testing.T) {
+	rootCtx, _, _, shutdown := initTest()
+	defer shutdown()
+
+	stop, estr, _, _ := newMT(t, "testdata/intermediate.perms", rootCtx)
+	defer stop()
+
+	// x and x/y should have the same permissions at the root.
+	rootPerms, _ := doGetPermissions(t, rootCtx, estr, "", true)
+	if perms, _ := doGetPermissions(t, rootCtx, estr, "x", true); !reflect.DeepEqual(rootPerms, perms) {
+		boom(t, "for x got %v, want %v", perms, rootPerms)
+	}
+	if perms, _ := doGetPermissions(t, rootCtx, estr, "x/y", true); !reflect.DeepEqual(rootPerms, perms) {
+		boom(t, "for x/y got %v, want %v", perms, rootPerms)
+	}
+	if perms, _ := doGetPermissions(t, rootCtx, estr, "x/y/z", true); reflect.DeepEqual(rootPerms, perms) {
+		boom(t, "for x/y/z got %v, don't want %v", perms, rootPerms)
+	}
+}
+
+func initTest() (rootCtx *context.T, aliceCtx *context.T, bobCtx *context.T, shutdown v23.Shutdown) {
+	ctx, shutdown := test.V23InitWithMounttable()
+	var err error
+	if rootCtx, err = v23.WithPrincipal(ctx, testutil.NewPrincipal("root")); err != nil {
+		panic("failed to set root principal")
+	}
+	if aliceCtx, err = v23.WithPrincipal(ctx, testutil.NewPrincipal("alice")); err != nil {
+		panic("failed to set alice principal")
+	}
+	if bobCtx, err = v23.WithPrincipal(ctx, testutil.NewPrincipal("bob")); err != nil {
+		panic("failed to set bob principal")
+	}
+	for _, r := range []*context.T{rootCtx, aliceCtx, bobCtx} {
+		// A hack to clear the namespace roots so that relative names can't resolve.
+		v23.GetNamespace(r).SetRoots()
+		// And have all principals recognize each other's blessings.
+		p1 := v23.GetPrincipal(r)
+		for _, other := range []*context.T{rootCtx, aliceCtx, bobCtx} {
+			// testutil.NewPrincipal has already set up each
+			// principal to use the same blessing for both server
+			// and client activities.
+			bother, _ := v23.GetPrincipal(other).BlessingStore().Default()
+			if err := security.AddToRoots(p1, bother); err != nil {
+				panic(err)
+			}
+		}
+	}
+	return rootCtx, aliceCtx, bobCtx, shutdown
+}
diff --git a/services/mounttable/btmtd/internal/node.go b/services/mounttable/btmtd/internal/node.go
new file mode 100644
index 0000000..d26aa3a
--- /dev/null
+++ b/services/mounttable/btmtd/internal/node.go
@@ -0,0 +1,392 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+	"encoding/hex"
+	"encoding/json"
+	"hash/fnv"
+	"io/ioutil"
+	"path"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
+
+	"google.golang.org/cloud/bigtable"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/v23/security/access"
+	vdltime "v.io/v23/vdlroot/time"
+	"v.io/v23/verror"
+
+	"v.io/x/ref/lib/timekeeper"
+)
+
+type mtNode struct {
+	bt           *BigTable
+	name         string
+	sticky       bool
+	creationTime bigtable.Timestamp
+	permissions  access.Permissions
+	version      string
+	mountFlags   mtFlags
+	servers      []naming.MountedServer
+	children     []string
+}
+
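+// mtFlags holds the mount flags that are JSON-encoded into each server cell.
+// With omitempty, a plain mount encodes as {}, a mounttable mount as
+// {"mt":true}, and a leaf mount as {"leaf":true}.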
+type mtFlags struct {
+	MT   bool `json:"mt,omitempty"`
+	Leaf bool `json:"leaf,omitempty"`
+}
+
+func longTimeout(ctx *context.T) (*context.T, func()) {
+	return context.WithTimeout(v23.GetBackgroundContext(ctx), time.Hour)
+}
+
+func getNode(ctx *context.T, bt *BigTable, name string) (*mtNode, error) {
+	row, err := bt.readRow(ctx, rowKey(name), bigtable.RowFilter(bigtable.LatestNFilter(1)))
+	if err != nil {
+		return nil, err
+	}
+	return nodeFromRow(ctx, bt, row, clock), nil
+}
+
+func rowKey(name string) string {
+	// The row key is a hash of the node name followed by the node name
+	// itself.
+	// This spreads the rows evenly across the tablet servers to avoid
+	// traffic imbalance.
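+	// For example, the node "a/b" is cleaned to "/a/b" and stored under a
+	// key of the form "<8 hex digits of fnv32("/a/b")>/a/b".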
+	name = naming.Clean("/" + name)
+	h := fnv.New32()
+	h.Write([]byte(name))
+	return hex.EncodeToString(h.Sum(nil)) + name
+}
+
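+// nodeFromRow converts a Bigtable row into an mtNode, dropping mounted
+// servers whose deadline has already passed. It returns nil if the row is
+// empty or its metadata cannot be decoded.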
+func nodeFromRow(ctx *context.T, bt *BigTable, row bigtable.Row, clock timekeeper.TimeKeeper) *mtNode {
+	const offset = 9 // 8 hex digits of the 32-bit hash + the leading '/'
+	name := row.Key()
+	if len(name) < offset {
+		return nil
+	}
+	n := &mtNode{
+		bt:   bt,
+		name: name[offset:],
+	}
+	for _, i := range row[metadataFamily] {
+		col := strings.TrimPrefix(i.Column, metadataFamily+":")
+		switch col {
+		case stickyColumn:
+			n.sticky = true
+		case versionColumn:
+			n.version = string(i.Value)
+		case permissionsColumn:
+			if err := json.Unmarshal(i.Value, &n.permissions); err != nil {
+				ctx.Errorf("Failed to decode permissions for %s", name)
+				return nil
+			}
+		case timestampColumn:
+			n.creationTime = i.Timestamp
+		}
+	}
+	n.servers = make([]naming.MountedServer, 0, len(row[serversFamily]))
+	for _, i := range row[serversFamily] {
+		deadline := i.Timestamp.Time()
+		if deadline.Before(clock.Now()) {
+			continue
+		}
+		if err := json.Unmarshal(i.Value, &n.mountFlags); err != nil {
+			ctx.Errorf("Failed to decode mount flags for %s", name)
+			return nil
+		}
+		n.servers = append(n.servers, naming.MountedServer{
+			Server:   i.Column[2:],
+			Deadline: vdltime.Deadline{Time: deadline},
+		})
+	}
+	n.children = make([]string, 0, len(row[childrenFamily]))
+	for _, i := range row[childrenFamily] {
+		child := strings.TrimPrefix(i.Column, childrenFamily+":")
+		n.children = append(n.children, child)
+	}
+	return n
+}
+
+func (n *mtNode) createChild(ctx *context.T, child string, perms access.Permissions) (*mtNode, error) {
+	ts := n.bt.now()
+	mut := bigtable.NewMutation()
+	mut.Set(childrenFamily, child, ts, []byte{1})
+	if err := n.mutate(ctx, mut, false); err != nil {
+		return nil, err
+	}
+
+	// If the current process dies right here, it will leave the parent with
+	// a reference to a child row that doesn't exist. This means that the
+	// parent will never be seen as "empty" and will not be garbage
+	// collected. This will be corrected when:
+	//  - the child is created again, or
+	//  - the parent is forcibly deleted with Delete().
+
+	childName := naming.Join(n.name, child)
+	longCtx, cancel := longTimeout(ctx)
+	defer cancel()
+	if err := n.bt.createRow(longCtx, childName, perms, ts); err != nil {
+		return nil, err
+	}
+	n, err := getNode(ctx, n.bt, childName)
+	if err != nil {
+		return nil, err
+	}
+	if n == nil {
+		return nil, verror.New(errConcurrentAccess, ctx, childName)
+	}
+	return n, nil
+}
+
+func (n *mtNode) mount(ctx *context.T, server string, deadline time.Time, flags naming.MountFlag) error {
+	mut := bigtable.NewMutation()
+	if flags&naming.Replace != 0 {
+		mut.DeleteCellsInFamily(serversFamily)
+	}
+	f := mtFlags{
+		MT:   flags&naming.MT != 0,
+		Leaf: flags&naming.Leaf != 0,
+	}
+	jsonValue, err := json.Marshal(f)
+	if err != nil {
+		return err
+	}
+	mut.Set(serversFamily, server, n.bt.time(deadline), jsonValue)
+	if err := n.mutate(ctx, mut, false); err != nil {
+		return err
+	}
+	if flags&naming.Replace != 0 {
+		n.servers = nil
+	}
+	return nil
+}
+
+func (n *mtNode) unmount(ctx *context.T, server string) error {
+	mut := bigtable.NewMutation()
+	if server == "" {
+		// HACK ALERT
+		// The bttest server doesn't support DeleteCellsInFamily
+		if !n.bt.testMode {
+			mut.DeleteCellsInFamily(serversFamily)
+		} else {
+			for _, s := range n.servers {
+				mut.DeleteCellsInColumn(serversFamily, s.Server)
+			}
+		}
+	} else {
+		mut.DeleteCellsInColumn(serversFamily, server)
+	}
+	if err := n.mutate(ctx, mut, false); err != nil {
+		return err
+	}
+	if n, err := getNode(ctx, n.bt, n.name); err == nil {
+		n.gc(ctx)
+	}
+	return nil
+}
+
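+// gc walks up the tree starting at n, deleting every node that is empty
+// (no children and no mounted servers) and not sticky. It stops at the
+// first node that must be kept, or at the root.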
+func (n *mtNode) gc(ctx *context.T) (deletedAtLeastOne bool, err error) {
+	for n != nil && n.name != "" && !n.sticky && len(n.children) == 0 && len(n.servers) == 0 {
+		if err = n.delete(ctx, false); err != nil {
+			break
+		}
+		ctx.Infof("Deleted empty node %q", n.name)
+		deletedAtLeastOne = true
+		parent := path.Dir(n.name)
+		if parent == "." {
+			break
+		}
+		if n, err = getNode(ctx, n.bt, parent); err != nil {
+			break
+		}
+	}
+	return
+}
+
+func (n *mtNode) deleteAndGC(ctx *context.T, deleteSubtree bool) error {
+	if err := n.delete(ctx, deleteSubtree); err != nil {
+		return err
+	}
+	parentName, _ := path.Split(n.name)
+	if parent, err := getNode(ctx, n.bt, parentName); err == nil {
+		parent.gc(ctx)
+	}
+	return nil
+}
+
+func (n *mtNode) delete(ctx *context.T, deleteSubtree bool) error {
+	if !deleteSubtree && len(n.children) > 0 {
+		return verror.New(errNotEmpty, ctx, n.name)
+	}
+
+	// TODO(rthellend): This naive approach could be very expensive in
+	// terms of memory. A smarter, but slower, approach would be to walk
+	// the tree without holding on to all the node data.
+	for _, c := range n.children {
+		cn, err := getNode(ctx, n.bt, naming.Join(n.name, c))
+		if err != nil {
+			return err
+		}
+		if cn == nil {
+			// Node 'n' has a reference to a child that doesn't
+			// exist. It could be that it is being created or
+			// deleted concurrently. To be sure, we have to create
+			// it before deleting it.
+			if cn, err = n.createChild(ctx, c, n.permissions); err != nil {
+				return err
+			}
+		}
+		if err := cn.delete(ctx, true); err != nil {
+			return err
+		}
+	}
+
+	mut := bigtable.NewMutation()
+	mut.DeleteRow()
+	if err := n.mutate(ctx, mut, true); err != nil {
+		return err
+	}
+
+	// If the current process dies right here, it will leave the parent with
+	// a reference to a child row that no longer exists. This means that the
+	// parent will never be seen as "empty" and will not be garbage
+	// collected. This will be corrected when:
+	//  - the child is re-created, or
+	//  - the parent is forcibly deleted with Delete().
+
+	// Delete from parent node.
+	parent, child := path.Split(n.name)
+	mut = bigtable.NewMutation()
+	// DeleteTimestampRange deletes the cells whose timestamp is in the
+	// half-open range [start,end). We need to delete the cell with
+	// timestamp n.creationTime (and any older ones).
+	mut.DeleteTimestampRange(childrenFamily, child, 0, n.bt.timeNext(n.creationTime))
+
+	longCtx, cancel := longTimeout(ctx)
+	defer cancel()
+	return n.bt.apply(longCtx, rowKey(parent), mut)
+}
+
+func (n *mtNode) setPermissions(ctx *context.T, perms access.Permissions) error {
+	jsonPerms, err := json.Marshal(perms)
+	if err != nil {
+		return err
+	}
+	mut := bigtable.NewMutation()
+	mut.Set(metadataFamily, permissionsColumn, bigtable.ServerTime, jsonPerms)
+	mut.Set(metadataFamily, stickyColumn, bigtable.ServerTime, []byte{1})
+	if err := n.mutate(ctx, mut, false); err != nil {
+		return err
+	}
+	return nil
+}
+
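+// mutate applies mut to n's row as a conditional mutation that only succeeds
+// if the row still has n's version. Unless the mutation deletes the row, the
+// version is bumped as part of the mutation. A version mismatch is reported
+// as errConcurrentAccess.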
+func (n *mtNode) mutate(ctx *context.T, mut *bigtable.Mutation, delete bool) error {
+	if !delete {
+		v, err := strconv.ParseUint(n.version, 10, 64)
+		if err != nil {
+			return err
+		}
+		newVersion := strconv.FormatUint(v+1, 10)
+		mut.Set(metadataFamily, versionColumn, bigtable.ServerTime, []byte(newVersion))
+	}
+
+	// The mutation will succeed iff the row already exists with the
+	// expected version.
+	filter := bigtable.ChainFilters(
+		bigtable.FamilyFilter(metadataFamily),
+		bigtable.ColumnFilter(versionColumn),
+		bigtable.LatestNFilter(1),
+		bigtable.ValueFilter(n.version),
+	)
+	condMut := bigtable.NewCondMutation(filter, mut, nil)
+	var success bool
+	if err := n.bt.apply(ctx, rowKey(n.name), condMut, bigtable.GetCondMutationResult(&success)); err != nil {
+		return err
+	}
+	if !success {
+		return verror.New(errConcurrentAccess, ctx, n.name)
+	}
+	return nil
+}
+
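+// createNodesFromFile creates the nodes listed in the given JSON file, which
+// maps node names to access.Permissions (see testdata/*.perms for examples).
+// Each listed node gets its permissions set, which also makes it sticky.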
+func createNodesFromFile(ctx *context.T, bt *BigTable, fileName string) error {
+	var nodes map[string]access.Permissions
+	data, err := ioutil.ReadFile(fileName)
+	if err != nil {
+		return err
+	}
+	if err := json.Unmarshal(data, &nodes); err != nil {
+		return err
+	}
+	// This loop adds backward compatibility with the older template format,
+	// e.g. "a/b/%%" { "Admin": { "In": [ "%%" ] } }
+	// With the new format, this is equivalent to:
+	// "a/b" { "%%/Admin": { "In": [ "%%" ] } }
+	for node, perms := range nodes {
+		if strings.HasSuffix(node, "/%%") {
+			delete(nodes, node)
+			node = strings.TrimSuffix(node, "/%%")
+			p := nodes[node]
+			if p == nil {
+				p = make(access.Permissions)
+			}
+			for tag := range perms {
+				p["%%/"+tag] = perms[tag]
+			}
+			nodes[node] = p
+		}
+	}
+
+	// Create the nodes in alphanumeric order so that children are
+	// created after their parents.
+	sortedNodes := []string{}
+	for node := range nodes {
+		sortedNodes = append(sortedNodes, node)
+	}
+	sort.Strings(sortedNodes)
+
+	ts := bt.now()
+	for _, node := range sortedNodes {
+		perms := nodes[node]
+		if node == "" {
+			if err := bt.createRow(ctx, "", perms, ts); err != nil {
+				return err
+			}
+			continue
+		}
+		parentName := ""
+		for _, e := range strings.Split(node, "/") {
+			n, err := getNode(ctx, bt, naming.Join(parentName, e))
+			if err != nil {
+				return err
+			}
+			if n == nil {
+				parent, err := getNode(ctx, bt, parentName)
+				if err != nil {
+					return err
+				}
+				if n, err = parent.createChild(ctx, e, parent.permissions); err != nil {
+					return err
+				}
+			}
+			if n.name == node {
+				// setPermissions also makes the node sticky.
+				if err := n.setPermissions(ctx, perms); err != nil {
+					return err
+				}
+			}
+			parentName = n.name
+		}
+	}
+	return nil
+}
diff --git a/services/mounttable/btmtd/internal/testdata/intermediate.perms b/services/mounttable/btmtd/internal/testdata/intermediate.perms
new file mode 100644
index 0000000..438d3f1
--- /dev/null
+++ b/services/mounttable/btmtd/internal/testdata/intermediate.perms
@@ -0,0 +1,10 @@
+{
+"": {
+	"Read":  { "In": ["..."] },
+	"Admin": { "In": ["root"] }
+},
+"x/y/z": {
+	"Read":  { "In": ["bob", "root"] },
+        "Mount": { "In": ["root"] }
+}
+}
diff --git a/services/mounttable/btmtd/internal/testdata/test.perms b/services/mounttable/btmtd/internal/testdata/test.perms
new file mode 100644
index 0000000..270628e
--- /dev/null
+++ b/services/mounttable/btmtd/internal/testdata/test.perms
@@ -0,0 +1,22 @@
+{
+"": {
+	"Read":  { "In": ["..."] },
+	"Admin": { "In": ["root"] },
+	"Create": { "In": ["root", "bob"] }
+},
+"stuff": {
+	"Read":  { "In": ["bob", "root"] },
+        "Mount": { "In": ["root"] }
+},
+"a": {
+	"Read":  { "In": ["alice", "root"] },
+        "Create": { "In": ["root"] }
+},
+"users": {
+	"Create": { "In": ["..."] },
+	"Resolve": { "In": ["..."] }
+},
+"users/%%": {
+	"Admin":  { "In": ["%%"] }
+}
+}
diff --git a/services/mounttable/btmtd/main.go b/services/mounttable/btmtd/main.go
new file mode 100644
index 0000000..2e5676a
--- /dev/null
+++ b/services/mounttable/btmtd/main.go
@@ -0,0 +1,137 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// The following enables go generate to generate the doc.go file.
+//go:generate go run $JIRI_ROOT/release/go/src/v.io/x/lib/cmdline/testdata/gendoc.go . -help
+
+package main
+
+import (
+	"math/rand"
+	"time"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/options"
+
+	"v.io/x/lib/cmdline"
+	"v.io/x/ref/lib/security/securityflag"
+	"v.io/x/ref/lib/signals"
+	"v.io/x/ref/lib/v23cmd"
+	_ "v.io/x/ref/runtime/factories/roaming"
+	"v.io/x/ref/services/mounttable/btmtd/internal"
+)
+
+var (
+	cmdRoot = &cmdline.Command{
+		Runner: v23cmd.RunnerFunc(runMT),
+		Name:   "btmtd",
+		Short:  "Runs the mounttable service",
+		Long:   "Runs the mounttable service.",
+		Children: []*cmdline.Command{
+			cmdSetup, cmdDestroy, cmdDump,
+		},
+	}
+	cmdSetup = &cmdline.Command{
+		Runner: v23cmd.RunnerFunc(runSetup),
+		Name:   "setup",
+		Short:  "Creates and sets up the table",
+		Long:   "Creates and sets up the table.",
+	}
+	cmdDestroy = &cmdline.Command{
+		Runner: v23cmd.RunnerFunc(runDestroy),
+		Name:   "destroy",
+		Short:  "Destroy the table",
+		Long:   "Destroy the table. All data will be lost.",
+	}
+	cmdDump = &cmdline.Command{
+		Runner: v23cmd.RunnerFunc(runDump),
+		Name:   "dump",
+		Short:  "Dump the table",
+		Long:   "Dump the table.",
+	}
+
+	keyFileFlag      string
+	projectFlag      string
+	zoneFlag         string
+	clusterFlag      string
+	tableFlag        string
+	inMemoryTestFlag bool
+
+	permissionsFileFlag string
+)
+
+func main() {
+	rand.Seed(time.Now().UnixNano())
+	cmdRoot.Flags.StringVar(&keyFileFlag, "key-file", "", "The file that contains the Google Cloud JSON credentials to use")
+	cmdRoot.Flags.StringVar(&projectFlag, "project", "", "The Google Cloud project of the Cloud Bigtable cluster")
+	cmdRoot.Flags.StringVar(&zoneFlag, "zone", "", "The Google Cloud zone of the Cloud Bigtable cluster")
+	cmdRoot.Flags.StringVar(&clusterFlag, "cluster", "", "The Cloud Bigtable cluster name")
+	cmdRoot.Flags.StringVar(&tableFlag, "table", "mounttable", "The name of the table to use")
+	cmdRoot.Flags.BoolVar(&inMemoryTestFlag, "in-memory-test", false, "If true, use an in-memory bigtable server (for testing only)")
+	cmdRoot.Flags.StringVar(&permissionsFileFlag, "permissions-file", "", "The file that contains the initial node permissions.")
+
+	cmdline.HideGlobalFlagsExcept()
+	cmdline.Main(cmdRoot)
+}
+
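+// runSetup creates the Bigtable table and, if --permissions-file is set,
+// seeds the initial nodes from it. A sketch of an invocation (all flag
+// values below are hypothetical placeholders):
+//
+//   btmtd --key-file=key.json --project=my-project --zone=us-central1-c \
+//     --cluster=my-cluster setup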
+func runSetup(ctx *context.T, env *cmdline.Env, args []string) error {
+	bt, err := internal.NewBigTable(keyFileFlag, projectFlag, zoneFlag, clusterFlag, tableFlag)
+	if err != nil {
+		return err
+	}
+	return bt.SetupTable(ctx, permissionsFileFlag)
+}
+
+func runDestroy(ctx *context.T, env *cmdline.Env, args []string) error {
+	bt, err := internal.NewBigTable(keyFileFlag, projectFlag, zoneFlag, clusterFlag, tableFlag)
+	if err != nil {
+		return err
+	}
+	return bt.DeleteTable(ctx)
+}
+
+func runDump(ctx *context.T, env *cmdline.Env, args []string) error {
+	bt, err := internal.NewBigTable(keyFileFlag, projectFlag, zoneFlag, clusterFlag, tableFlag)
+	if err != nil {
+		return err
+	}
+	return bt.DumpTable(ctx)
+}
+
+func runMT(ctx *context.T, env *cmdline.Env, args []string) error {
+	var (
+		bt  *internal.BigTable
+		err error
+	)
+	if inMemoryTestFlag {
+		var shutdown func()
+		if bt, shutdown, err = internal.NewTestBigTable(tableFlag); err != nil {
+			return err
+		}
+		defer shutdown()
+		if err := bt.SetupTable(ctx, permissionsFileFlag); err != nil {
+			return err
+		}
+	} else {
+		if bt, err = internal.NewBigTable(keyFileFlag, projectFlag, zoneFlag, clusterFlag, tableFlag); err != nil {
+			return err
+		}
+	}
+
+	globalPerms, err := securityflag.PermissionsFromFlag()
+	if err != nil {
+		return err
+	}
+	acl := globalPerms["Admin"]
+	disp := internal.NewDispatcher(bt, &acl)
+	ctx, server, err := v23.WithNewDispatchingServer(ctx, "", disp, options.ServesMountTable(true))
+	if err != nil {
+		return err
+	}
+	ctx.Infof("Listening on: %v", server.Status().Endpoints)
+
+	<-signals.ShutdownOnSignals(ctx)
+	return nil
+}