services/mounttable/btmtd: Add fsck command

The fsck command scans the node table for inconsistencies:
  - orphan nodes, i.e. parent doesn't exist, or parent doesn't have a
    reference to it
  - dangling references, i.e. a node has a reference to a node that
    doesn't exist.

Optionally, fsck will delete orphan rows and dangling references when
--fix is used, in which case, all the counters will also be
recalculated.

The --fix flag should not be used while other processes could be
mutating the nodes.

Change-Id: I7eeed693c2e7717285f4a632b09648971377c3d0
diff --git a/services/mounttable/btmtd/doc.go b/services/mounttable/btmtd/doc.go
index 47eb159..5034fda 100644
--- a/services/mounttable/btmtd/doc.go
+++ b/services/mounttable/btmtd/doc.go
@@ -16,6 +16,7 @@
    setup       Creates and sets up the table
    destroy     Destroy the table
    dump        Dump the table
+   fsck        Check the table consistency
    help        Display help for commands or topics
 
 The btmtd flags are:
diff --git a/services/mounttable/btmtd/internal/bt.go b/services/mounttable/btmtd/internal/bt.go
index 9396673..04bf264 100644
--- a/services/mounttable/btmtd/internal/bt.go
+++ b/services/mounttable/btmtd/internal/bt.go
@@ -249,6 +249,61 @@
 	return nil
 }
 
+func (b *BigTable) Fsck(ctx *context.T, fix bool) error {
+	bctx, cancel := btctx(ctx)
+	defer cancel()
+
+	ctx.Infof("Checking table consistency...")
+
+	inconsistentTotal := 0
+	for pass := 1; ; pass++ {
+		ctx.Infof("Starting pass #%d", pass)
+		inconsistentCount := 0
+		if err := b.nodeTbl.ReadRows(
+			bctx,
+			bigtable.InfiniteRange(""),
+			func(row bigtable.Row) bool {
+				n := nodeFromRow(ctx, b, row, clock)
+				if err := n.checkInvariants(ctx, fix); err != nil {
+					inconsistentCount++
+					ctx.Infof("checkInvariants: %v", err)
+				}
+				return true
+			},
+			bigtable.RowFilter(bigtable.LatestNFilter(1)),
+		); err != nil {
+			return err
+		}
+		ctx.Infof("Completed pass #%d - inconsistent nodes: %d", pass, inconsistentCount)
+		inconsistentTotal += inconsistentCount
+		if !fix || inconsistentCount == 0 {
+			break
+		}
+	}
+
+	if !fix {
+		if inconsistentTotal > 0 {
+			return fmt.Errorf("found inconsistent nodes: %d", inconsistentTotal)
+		}
+		ctx.Infof("All nodes are consistent.")
+		return nil
+	}
+
+	if inconsistentTotal > 0 {
+		ctx.Infof("Found and fixed inconsistent nodes: %d\n", inconsistentTotal)
+	} else {
+		ctx.Infof("All nodes are consistent.")
+	}
+
+	ctx.Infof("Recalculating counters...")
+	if err := recalculateCounters(ctx, b); err != nil {
+		return err
+	}
+	ctx.Infof("Done.")
+
+	return nil
+}
+
 func (b *BigTable) CountRows(ctx *context.T) (int, error) {
 	bctx, cancel := btctx(ctx)
 	defer cancel()
diff --git a/services/mounttable/btmtd/internal/counters.go b/services/mounttable/btmtd/internal/counters.go
index df6ed6d..bf8713d 100644
--- a/services/mounttable/btmtd/internal/counters.go
+++ b/services/mounttable/btmtd/internal/counters.go
@@ -65,3 +65,40 @@
 func incrementCreatorServerCount(ctx *context.T, bt *BigTable, creator string, delta, limit int64) error {
 	return incrementWithLimit(ctx, bt, "num-servers-per-user:"+creator, delta, limit, errServerLimitExceeded)
 }
+
+func recalculateCounters(ctx *context.T, bt *BigTable) error {
+	bctx, cancel := btctx(ctx)
+	defer cancel()
+
+	// Delete all the counters.
+	if err := bt.counterTbl.ReadRows(bctx, bigtable.InfiniteRange(""),
+		func(row bigtable.Row) bool {
+			mut := bigtable.NewMutation()
+			mut.DeleteRow()
+			if err := bt.counterTbl.Apply(ctx, row.Key(), mut); err != nil {
+				ctx.Errorf("apply delete row (%q) failed: %v", row.Key(), err)
+				return false
+			}
+			return true
+		},
+	); err != nil {
+		return err
+	}
+
+	// Re-create all the counters.
+	return bt.nodeTbl.ReadRows(bctx, bigtable.InfiniteRange(""),
+		func(row bigtable.Row) bool {
+			n := nodeFromRow(ctx, bt, row, clock)
+			if err := incrementCreatorNodeCount(ctx, bt, n.creator, 1, 0); err != nil {
+				ctx.Errorf("incrementCreatorNodeCount(%q) failed: %v", n.name, err)
+				return false
+			}
+			if err := incrementCreatorServerCount(ctx, bt, n.creator, int64(len(n.servers)+len(n.expiredServers)), 0); err != nil {
+				ctx.Errorf("incrementCreatorServerCount(%q) failed: %v", n.name, err)
+				return false
+			}
+			return true
+		},
+		bigtable.RowFilter(bigtable.LatestNFilter(1)),
+	)
+}
diff --git a/services/mounttable/btmtd/internal/node.go b/services/mounttable/btmtd/internal/node.go
index c999369..723462d 100644
--- a/services/mounttable/btmtd/internal/node.go
+++ b/services/mounttable/btmtd/internal/node.go
@@ -515,3 +515,58 @@
 	}
 	return nil
 }
+
+func (n *mtNode) checkInvariants(ctx *context.T, fix bool) error {
+	// Is this an orphan node, i.e. either the parent doesn't exist, or the
+	// parent doesn't have a reference to it?
+	if n.name != "" {
+		parentName, childName := path.Split(n.name)
+		pn, err := getNode(ctx, n.bt, parentName)
+		if err != nil {
+			return err
+		}
+		orphan := true
+		if pn != nil {
+			for _, c := range pn.children {
+				if c.name() == childName && c.id() == n.id {
+					orphan = false
+					break
+				}
+			}
+		}
+		if orphan {
+			if fix {
+				if err := n.delete(ctx, true); err != nil {
+					return err
+				}
+				return fmt.Errorf("deleted orphan node %q", n.name)
+			}
+			return fmt.Errorf("found orphan node %q", n.name)
+		}
+	}
+	// Does this node have references to nodes that don't exis?
+	var mut *bigtable.Mutation
+	for _, c := range n.children {
+		cn, err := getNode(ctx, n.bt, naming.Join(n.name, c.name()))
+		if err != nil {
+			return err
+		}
+		if cn != nil && cn.id == c.id() {
+			continue
+		}
+		if mut == nil {
+			mut = bigtable.NewMutation()
+		}
+		mut.DeleteCellsInColumn(childrenFamily, string(c))
+	}
+	if mut != nil {
+		if fix {
+			if err := n.mutate(ctx, mut, false); err != nil {
+				return err
+			}
+			return fmt.Errorf("deleted dangling reference on %q", n.name)
+		}
+		return fmt.Errorf("found dangling reference on %q", n.name)
+	}
+	return nil
+}
diff --git a/services/mounttable/btmtd/internal/node_test.go b/services/mounttable/btmtd/internal/node_test.go
index aafbc1a..8d4bd21 100644
--- a/services/mounttable/btmtd/internal/node_test.go
+++ b/services/mounttable/btmtd/internal/node_test.go
@@ -5,10 +5,14 @@
 package internal
 
 import (
+	"fmt"
 	"math/rand"
 	"testing"
 	"time"
 
+	"google.golang.org/cloud/bigtable"
+
+	"v.io/v23/naming"
 	"v.io/v23/security"
 	"v.io/v23/security/access"
 	"v.io/x/ref/test"
@@ -151,3 +155,92 @@
 		bt.DumpTable(ctx)
 	}
 }
+
+func TestFsck(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
+	bt, shutdownBT, err := NewTestBigTable("test")
+	if err != nil {
+		t.Fatalf("NewTestBigTable: %s", err)
+	}
+	defer shutdownBT()
+	if err := bt.SetupTable(ctx, ""); err != nil {
+		t.Fatalf("bt.SetupTable: %s", err)
+	}
+
+	checkRowCount := func(expected int) {
+		count, err := bt.CountRows(ctx)
+		if err != nil {
+			t.Errorf("CountRows failed: %v", err)
+		}
+		if count != expected {
+			t.Errorf("Unexpected number of rows: got %d, expected %d", count, expected)
+		}
+	}
+
+	root, err := getNode(ctx, bt, "")
+	if err != nil {
+		t.Fatalf("getNode: %v", err)
+	}
+
+	// Create a few children.
+	for i := 1; i <= 5; i++ {
+		if _, err := root.createChild(ctx, fmt.Sprintf("child%d", i), root.permissions, "", 0); err != nil {
+			t.Fatalf("createChild failed: %v", err)
+		}
+		if root, err = getNode(ctx, bt, ""); err != nil {
+			t.Fatalf("getNode: %v", err)
+		}
+	}
+	checkRowCount(6)
+
+	// Create a few orphan rows.
+	for i := 1; i <= 5; i++ {
+		name := fmt.Sprintf("orphan%d", i)
+		childRef, err := newChild("XXXXXXXX", name)
+		if err != nil {
+			t.Fatalf("newChild failed: %v", err)
+		}
+		if err := bt.createRow(ctx, name, root.permissions, "", childRef, 0); err != nil {
+			t.Fatalf("createRow failed: %v", err)
+		}
+		if childRef, err = newChild("XXXXXXXX", "otherorphan"); err != nil {
+			t.Fatalf("newChild failed: %v", err)
+		}
+		if err := bt.createRow(ctx, naming.Join(name, childRef.name()), root.permissions, "", childRef, 0); err != nil {
+			t.Fatalf("createRow failed: %v", err)
+		}
+		for j := 1; j <= 5; j++ {
+			n, err := getNode(ctx, bt, name)
+			if err != nil {
+				t.Fatalf("getNode: %v", err)
+			}
+			if _, err := n.createChild(ctx, fmt.Sprintf("child%d", j), n.permissions, "", 0); err != nil {
+				t.Fatalf("createChild failed: %v", err)
+			}
+		}
+	}
+	checkRowCount(41)
+
+	// Add reference to a row that doesn't exist.
+	mut := bigtable.NewMutation()
+	mut.Set(childrenFamily, "XXXXXXXXwhodat", bigtable.ServerTime, []byte{1})
+	if err := bt.apply(ctx, rowKey("child1"), mut); err != nil {
+		t.Fatalf("unexpected failure: %v", err)
+	}
+	checkRowCount(41)
+
+	if err := bt.Fsck(ctx, false); err == nil {
+		t.Errorf("Fsck did not find any inconsistencies")
+	}
+	if err := bt.Fsck(ctx, true); err != nil {
+		t.Errorf("Fsck failed: %v", err)
+	}
+	if err := bt.Fsck(ctx, false); err != nil {
+		t.Errorf("Fsck failed: %v", err)
+	}
+
+	// root + 5 children
+	checkRowCount(6)
+}
diff --git a/services/mounttable/btmtd/main.go b/services/mounttable/btmtd/main.go
index 2b6b88e..7ea36bc 100644
--- a/services/mounttable/btmtd/main.go
+++ b/services/mounttable/btmtd/main.go
@@ -8,7 +8,9 @@
 package main
 
 import (
+	"fmt"
 	"math/rand"
+	"strings"
 	"time"
 
 	"v.io/v23"
@@ -30,7 +32,7 @@
 		Short:  "Runs the mounttable service",
 		Long:   "Runs the mounttable service.",
 		Children: []*cmdline.Command{
-			cmdSetup, cmdDestroy, cmdDump,
+			cmdSetup, cmdDestroy, cmdDump, cmdFsck,
 		},
 	}
 	cmdSetup = &cmdline.Command{
@@ -51,6 +53,12 @@
 		Short:  "Dump the table",
 		Long:   "Dump the table.",
 	}
+	cmdFsck = &cmdline.Command{
+		Runner: v23cmd.RunnerFunc(runFsck),
+		Name:   "fsck",
+		Short:  "Check the table consistency",
+		Long:   "Check the table consistency.",
+	}
 
 	keyFileFlag      string
 	projectFlag      string
@@ -64,6 +72,8 @@
 
 	maxNodesPerUserFlag   int
 	maxServersPerUserFlag int
+
+	fixFlag bool
 )
 
 func main() {
@@ -80,6 +90,8 @@
 	cmdRoot.Flags.IntVar(&maxNodesPerUserFlag, "max-nodes-per-user", 10000, "The maximum number of nodes that a single user can create.")
 	cmdRoot.Flags.IntVar(&maxServersPerUserFlag, "max-servers-per-user", 10000, "The maximum number of servers that a single user can mount.")
 
+	cmdFsck.Flags.BoolVar(&fixFlag, "fix", false, "Whether to fix consistency errors and recreate all the counters.")
+
 	cmdline.HideGlobalFlagsExcept()
 	cmdline.Main(cmdRoot)
 }
@@ -108,6 +120,23 @@
 	return bt.DumpTable(ctx)
 }
 
+func runFsck(ctx *context.T, env *cmdline.Env, args []string) error {
+	bt, err := internal.NewBigTable(keyFileFlag, projectFlag, zoneFlag, clusterFlag, tableFlag)
+	if err != nil {
+		return err
+	}
+	if fixFlag {
+		fmt.Fprintln(env.Stdout, "WARNING: Make sure nothing else is modifying the table while fsck is running")
+		fmt.Fprint(env.Stdout, "Continue [y/N]? ")
+		var answer string
+		if _, err := fmt.Scanf("%s", &answer); err != nil || strings.ToUpper(answer) != "Y" {
+			fmt.Fprintln(env.Stdout, "Aborted")
+			return nil
+		}
+	}
+	return bt.Fsck(ctx, fixFlag)
+}
+
 func runMT(ctx *context.T, env *cmdline.Env, args []string) error {
 	var (
 		bt  *internal.BigTable