Use stats to monitor LevelDB file usage.

This adds the following four stats for a syncbase service:
  syncbase/leveldb/service/{hash}/file_count
  syncbase/leveldb/service/{hash}/filesystem_bytes
  syncbase/leveldb/blobmap/{hash}/file_count
  syncbase/leveldb/blobmap/{hash}/filesystem_bytes
and the following two stats per database:
  syncbase/leveldb/db/{blessing}/{DB-name}/{hash}/file_count
  syncbase/leveldb/db/{blessing}/{DB-name}/{hash}/filesystem_bytes
where {hash} is a hash of the file path of the store.
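
For illustration, a caller in the same process could read one of these
stats with the v.io/x/ref/lib/stats library (a minimal sketch; the
{hash} component is derived from the store path, so callers typically
discover keys with stats.Glob, as the new tests do; the prefix and
helper name here are hypothetical):

  // fileCount reads the file_count stat registered under prefix,
  // e.g. "syncbase/leveldb/service/{hash}".
  func fileCount(prefix string) (int64, error) {
      obj, err := stats.GetStatsObject(prefix + "/file_count")
      if err != nil {
          return 0, err
      }
      return obj.Value().(int64), nil
  }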

Also fixes a bug: the service store was not being closed when the
service was closed.

Change-Id: I844786c105d2671df1013cd29b2534eeedefb495
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index 0143749..7941db5 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -274,6 +274,7 @@
 func (s *service) Close() {
 	s.vclockD.Close()
 	vsync.Close(s.sync)
+	s.st.Close()
 }
 
 ////////////////////////////////////////
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index eae2e99..fbb09aa 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -11,12 +11,18 @@
 // #include "syncbase_leveldb.h"
 import "C"
 import (
+	"encoding/base64"
 	"fmt"
+	"hash/fnv"
+	"regexp"
 	"strings"
 	"sync"
 	"unsafe"
 
+	"v.io/v23/naming"
 	"v.io/v23/verror"
+	"v.io/x/lib/vlog"
+	"v.io/x/ref/lib/stats"
 	"v.io/x/ref/services/syncbase/store"
 	"v.io/x/ref/services/syncbase/store/transactions"
 )
@@ -32,6 +38,7 @@
 	readOptions  *C.leveldb_readoptions_t
 	writeOptions *C.leveldb_writeoptions_t
 	err          error
+	statsPrefix  string
 }
 
 const defaultMaxOpenFiles = 100
@@ -43,6 +50,16 @@
 }
 
 // Open opens the database located at the given path.
+//
+// This adds the following four stats for a syncbase service:
+//   syncbase/leveldb/service/{hash}/file_count
+//   syncbase/leveldb/service/{hash}/filesystem_bytes
+//   syncbase/leveldb/blobmap/{hash}/file_count
+//   syncbase/leveldb/blobmap/{hash}/filesystem_bytes
+// and the following two stats per database:
+//   syncbase/leveldb/db/{blessing}/{DB-name}/{hash}/file_count
+//   syncbase/leveldb/db/{blessing}/{DB-name}/{hash}/filesystem_bytes
+// where {hash} is a hash of the file path of the store.
 func Open(path string, opts OpenOptions) (store.Store, error) {
 	bs, err := openBatchStore(path, opts)
 	if err != nil {
@@ -51,6 +68,14 @@
 	return transactions.Wrap(bs), nil
 }
 
+// addStatCallback registers a callback-backed stats object and returns its key.
+func addStatCallback(prefix string, metric string, callback func() int64) string {
+	statsKey := naming.Join(prefix, metric)
+	stats.NewIntegerFunc(statsKey, callback)
+	vlog.VI(1).Infof("Adding stats callback at %q", statsKey)
+	return statsKey
+}
+
 // openBatchStore opens the non-transactional leveldb database that supports
 // batch writes at the given path.
 func openBatchStore(path string, opts OpenOptions) (*db, error) {
@@ -87,18 +112,28 @@
 	}
 	readOptions := C.leveldb_readoptions_create()
 	C.leveldb_readoptions_set_verify_checksums(readOptions, 1)
-	return &db{
+
+	d := &db{
 		node:         store.NewResourceNode(),
 		cDb:          cDb,
 		readOptions:  readOptions,
 		writeOptions: C.leveldb_writeoptions_create(),
-	}, nil
+		statsPrefix:  statsPrefixFromPath(path),
+	}
+
+	addStatCallback(d.statsPrefix, "file_count", func() int64 { return int64(d.fileCount()) })
+	addStatCallback(d.statsPrefix, "filesystem_bytes", func() int64 { return int64(d.filesystemBytes()) })
+
+	return d, nil
 }
 
 // Close implements the store.Store interface.
 func (d *db) Close() error {
 	d.mu.Lock()
 	defer d.mu.Unlock()
+	if err := stats.Delete(d.statsPrefix); err != nil {
+		vlog.Errorf("Problem deleting stats %q: %v", d.statsPrefix, err)
+	}
 	if d.err != nil {
 		return store.ConvertError(d.err)
 	}
@@ -217,10 +252,10 @@
 	return C.GoString(value)
 }
 
-// stats returns per-level statistics as a set of parallel slices of the same
-// length.  For small databases this call is about 2X slower than a Get; for
-// large databases it is about one order of magnitude faster than a Get.
-func (d *db) stats() (fileCounts, fileMBs, readMBs, writeMBs []int, err error) {
+// levelInfo returns per-level statistics as a set of parallel slices of the
+// same length.  For small databases this call is about 2X slower than a Get;
+// for large databases it is about one order of magnitude faster than a Get.
+func (d *db) levelInfo() (fileCounts, fileMBs, readMBs, writeMBs []int, err error) {
 	// It's unfortunate that LevelDB only provides the stats in a formatted
 	// string, so we have to parse them out.
 	text := d.property("leveldb.stats")
@@ -267,3 +302,67 @@
 func (d *db) fileCount() int {
 	return int(C.syncbase_leveldb_file_count(d.cDb))
 }
+
+// statsPrefixFromPath transforms a leveldb file path into a string that is
+// more convenient as a stats key in the following ways:
+//
+// * it is shorter, filtering out long hex strings and redundant fixed
+// strings;
+//
+// * it is easy to query using the stats glob API, always starting with
+// "syncbase/leveldb/{storeType}", where {storeType} is one of "service",
+// "blobmap", "db", "test", or "other";
+//
+// * it preserves human-readable database names, blessings, and syncbase
+// service IDs if they appear in the file path;
+//
+// * it is unique to a service instance with high probability, by including
+// a short hash of the path.
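+//
+// For example, a database store at a path like (path is illustrative only)
+//   .../apps/v_io_a_xyz-1f2e/dbs/mydb-1f2e-3a4b/leveldb
+// yields the prefix
+//   syncbase/leveldb/db/v_io_a_xyz/mydb/{hash}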
+func statsPrefixFromPath(path string) string {
+	return naming.Join("syncbase", "leveldb", applyRules(path), hash(path))
+}
+
+func applyRules(path string) string {
+	for _, rule := range rules {
+		if rule.from.MatchString(path) {
+			return rule.from.ReplaceAllString(path, rule.to)
+		}
+	}
+	return fmt.Sprintf("other/%s", defaultRuleDelete.ReplaceAllString(path, ""))
+}
+
+var rules = []struct {
+	from *regexp.Regexp
+	to   string
+}{
+	// From v.io/x/ref/services/syncbase/server.rootDirForDb
+	{regexp.MustCompile(
+		`.*/apps/(.+)\-[0-9a-f]+/dbs/(.+)\-[0-9a-f]+\-[0-9a-f]+/leveldb$`),
+		"db/$1/$2"},
+
+	// From v.io/x/ref/services/syncbase/localblobstore/fs_cablobstore.Create
+	{regexp.MustCompile(
+		`.*/blobs/chunk$`),
+		"blobmap"},
+
+	// From v.io/x/ref/services/syncbase/server.NewService
+	{regexp.MustCompile(
+		`.*/leveldb$`),
+		"service"},
+
+	// From v.io/x/ref/services/syncbase/store/leveldb.newBatchStore
+	{regexp.MustCompile(
+		`.*/syncbase_leveldb[0-9]+$`),
+		"test"},
+}
+
+// defaultRuleDelete is applied only if none of the above rules match.
+var defaultRuleDelete = regexp.MustCompile(`(/tmp|/leveldb|-[0-9a-f]{30,})`)
+
+// hash returns a hash string of s.  A small 32-bit non-crypto hash is good
+// enough, as it is just a fallback for the unlikely case we get a collision in
+// the filtered pathname.
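+//
+// The result is 6 characters of unpadded URL-safe base64 (the 4 hash bytes).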
+func hash(s string) string {
+	h := fnv.New32()
+	h.Write([]byte(s))
+	return base64.RawURLEncoding.EncodeToString(h.Sum(nil))
+}
diff --git a/services/syncbase/store/leveldb/stats_benchmark_test.go b/services/syncbase/store/leveldb/stats_benchmark_test.go
index 9824a73..52085b5 100644
--- a/services/syncbase/store/leveldb/stats_benchmark_test.go
+++ b/services/syncbase/store/leveldb/stats_benchmark_test.go
@@ -6,6 +6,8 @@
 
 import (
 	"testing"
+
+	"v.io/x/ref/lib/stats"
 )
 
 var (
@@ -53,6 +55,20 @@
 
 }
 
+func BenchmarkStats_FileSystemBytesViaStats_LotsOfData(b *testing.B) {
+	bs, cleanup := newBatchStore()
+	defer cleanup()
+	defer b.StopTimer() // Called before cleanup() to avoid timing it.
+	write100MB(bs)
+	b.ResetTimer() // Ignore timing of init stuff above.
+
+	for i := 0; i < b.N; i++ {
+		fileBytes, _ := stats.GetStatsObject(
+			bs.statsPrefix + "/filesystem_bytes")
+		accumulate += int(fileBytes.Value().(int64))
+	}
+}
+
 func BenchmarkStats_FileCount_NoData(b *testing.B) {
 	bs, cleanup := newBatchStore()
 	defer cleanup()
@@ -91,7 +107,21 @@
 
 }
 
-func BenchmarkStats_Stats_NoData(b *testing.B) {
+func BenchmarkStats_FileCountViaStats_LotsOfData(b *testing.B) {
+	bs, cleanup := newBatchStore()
+	defer cleanup()
+	defer b.StopTimer() // Called before cleanup() to avoid timing it.
+	write100MB(bs)
+	b.ResetTimer() // Ignore timing of init stuff above.
+
+	for i := 0; i < b.N; i++ {
+		fileCount, _ := stats.GetStatsObject(
+			bs.statsPrefix + "/file_count")
+		accumulate += int(fileCount.Value().(int64))
+	}
+}
+
+func BenchmarkStats_LevelInfo_NoData(b *testing.B) {
 	bs, cleanup := newBatchStore()
 	defer cleanup()
 	defer b.StopTimer() // Called before cleanup() to avoid timing it.
@@ -99,13 +129,13 @@
 
 	for i := 0; i < b.N; i++ {
 		var c, s, r, w []int
-		c, s, r, w, err = bs.stats()
+		c, s, r, w, err = bs.levelInfo()
 		accumulate += len(c) + len(s) + len(r) + len(w)
 	}
 
 }
 
-func BenchmarkStats_Stats_SomeData(b *testing.B) {
+func BenchmarkStats_LevelInfo_SomeData(b *testing.B) {
 	bs, cleanup := newBatchStore()
 	defer cleanup()
 	defer b.StopTimer() // Called before cleanup() to avoid timing it.
@@ -114,13 +144,13 @@
 
 	for i := 0; i < b.N; i++ {
 		var c, s, r, w []int
-		c, s, r, w, err = bs.stats()
+		c, s, r, w, err = bs.levelInfo()
 		accumulate += len(c) + len(s) + len(r) + len(w)
 	}
 
 }
 
-func BenchmarkStats_Stats_LotsOfData(b *testing.B) {
+func BenchmarkStats_LevelInfo_LotsOfData(b *testing.B) {
 	bs, cleanup := newBatchStore()
 	defer cleanup()
 	defer b.StopTimer() // Called before cleanup() to avoid timing it.
@@ -129,7 +159,7 @@
 
 	for i := 0; i < b.N; i++ {
 		var c, s, r, w []int
-		c, s, r, w, err = bs.stats()
+		c, s, r, w, err = bs.levelInfo()
 		accumulate += c[0] + s[0] + r[0] + w[0]
 	}
 
diff --git a/services/syncbase/store/leveldb/stats_blackbox_test.go b/services/syncbase/store/leveldb/stats_blackbox_test.go
new file mode 100644
index 0000000..71172b0
--- /dev/null
+++ b/services/syncbase/store/leveldb/stats_blackbox_test.go
@@ -0,0 +1,193 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb_test
+
+import (
+	"crypto/rand"
+	"fmt"
+	"log"
+	"testing"
+	"time"
+
+	"v.io/v23/context"
+	"v.io/v23/security"
+	"v.io/v23/security/access"
+	"v.io/v23/syncbase"
+	"v.io/x/ref/lib/stats"
+	_ "v.io/x/ref/runtime/factories/generic"
+	tu "v.io/x/ref/services/syncbase/testutil"
+)
+
+// This file contains integration tests of the leveldb stats via the public
+// Syncbase API.  For lower-level tests, see stats_test.go.
+
+func TestEmpty(t *testing.T) {
+	_, serverName, cleanup := tu.SetupOrDie(nil)
+	defer cleanup()
+
+	since := time.Now()
+	syncbase.NewService(serverName)
+
+	count := 0
+	for got := range statsValues("service/*/*", since) {
+		count++
+
+		// All stats values should be zero (see stats_test.go)
+		if got.(int64) != 0 {
+			t.Errorf("Got %v, want 0", got)
+		}
+	}
+	// Want 2 stats keys for the service
+	if count != 2 {
+		t.Errorf("Got %d service stats keys, want 2", count)
+	}
+
+	count = 0
+	for got := range statsValues("blobmap/*/*", since) {
+		count++
+
+		// All stats values should be zero (see stats_test.go)
+		if got.(int64) != 0 {
+			t.Errorf("Got %v, want 0", got)
+		}
+	}
+	// Want 2 stats keys for the blobmap
+	if count != 2 {
+		t.Errorf("Got %d blobmap stats keys, want 2", count)
+	}
+
+	for range statsValues("db/v_io_a_xyz/*/*/*", since) {
+		t.Error("Expected no database-specific stats")
+	}
+}
+
+func TestWithMultipleDbs(t *testing.T) {
+	ctx, serverName, cleanup := tu.SetupOrDie(nil)
+	defer cleanup()
+
+	since := time.Now()
+	service := syncbase.NewService(serverName)
+	tu.CreateDatabase(t, ctx, service, "empty_db_0")
+	tu.CreateDatabase(t, ctx, service, "empty_db_1")
+	tu.CreateDatabase(t, ctx, service, "empty_db_2")
+
+	count := 0
+	for got := range statsValues("service/*/*", since) {
+		count++
+
+		// All stats values should be zero (see stats_test.go)
+		if got.(int64) != 0 {
+			t.Errorf("Got %v, want 0", got)
+		}
+	}
+	// Want 2 stats keys for the service
+	if count != 2 {
+		t.Errorf("Got %d service stats keys, want 2", count)
+	}
+
+	count = 0
+	for got := range statsValues("blobmap/*/*", since) {
+		count++
+
+		// All stats values should be zero (see stats_test.go)
+		if got.(int64) != 0 {
+			t.Errorf("Got %v, want 0", got)
+		}
+	}
+	// Want 2 stats keys for the blobmap
+	if count != 2 {
+		t.Errorf("Got %d blobmap stats keys, want 2", count)
+	}
+
+	count = 0
+	for got := range statsValues("db/v_io_a_xyz/*/*/*", since) {
+		count++
+
+		// All stats values should be zero (see stats_test.go)
+		if got.(int64) != 0 {
+			t.Errorf("Got %v, want 0", got)
+		}
+	}
+	// Want 2 stats keys per each of 3 DBs
+	if count != 6 {
+		t.Errorf("Got %d DB stats keys, want 6", count)
+	}
+}
+
+func TestWithLotsOfData(t *testing.T) {
+	ctx, serverName, cleanup := tu.SetupOrDie(nil)
+	defer cleanup()
+
+	since := time.Now()
+	service := syncbase.NewService(serverName)
+	db := tu.CreateDatabase(t, ctx, service, "big_db")
+
+	write100MB(ctx, db)
+
+	// Expect at least 50 MB (see stats_test.go)
+	fileBytes := <-statsValues("db/v_io_a_xyz/big_db/*/filesystem_bytes", since)
+	if fileBytes.(int64) < 50*1024*1024 {
+		t.Errorf("Got %v, want more than 50 MB", fileBytes.(int64))
+	}
+	// Expect 25 or more (see stats_test.go)
+	fileCount := <-statsValues("db/v_io_a_xyz/big_db/*/file_count", since)
+	if fileCount.(int64) < 25 {
+		t.Errorf("Got %v, want 25 or more", fileCount.(int64))
+	}
+}
+
+// statsValues returns a channel of the values, updated since the given time,
+// of the stats keys that match the given glob under the root
+// "syncbase/leveldb/".
+func statsValues(pattern string, since time.Time) <-chan interface{} {
+	ch := make(chan interface{})
+	go func() {
+		iter := stats.Glob("syncbase/leveldb/", pattern, since, true)
+		for iter.Advance() {
+			if err := iter.Err(); err != nil {
+				panic(err)
+			}
+			if iter.Value().Value == nil {
+				log.Printf("Ignoring empty stats %q", iter.Value().Key)
+			} else {
+				ch <- iter.Value().Value
+			}
+		}
+		close(ch)
+	}()
+	return ch
+}
+
+// write100MB writes 100 KB of random data to each of 1024 keys (100 MB total).
+func write100MB(ctx *context.T, db syncbase.Database) {
+	coll := db.Collection(ctx, "the_collection")
+	err := coll.Create(ctx, permissions)
+	if err != nil {
+		panic(err)
+	}
+
+	hundredKB := make([]byte, 100*1024)
+
+	// Write 1024 x 100 KB (100 MB).
+	for i := 0; i < 1024; i++ {
+		rand.Read(hundredKB) // Randomize so it won't compress.
+		key := fmt.Sprintf("foo-%d", i)
+		err := coll.Put(ctx, key, hundredKB)
+		if err != nil {
+			panic(err)
+		}
+	}
+}
+
+var (
+	allAccess   = access.AccessList{In: []security.BlessingPattern{"..."}}
+	permissions = access.Permissions{
+		"Admin":   allAccess,
+		"Write":   allAccess,
+		"Read":    allAccess,
+		"Resolve": allAccess,
+		"Debug":   allAccess,
+	}
+)
diff --git a/services/syncbase/store/leveldb/stats_test.go b/services/syncbase/store/leveldb/stats_test.go
index 3d845ab..a882137 100644
--- a/services/syncbase/store/leveldb/stats_test.go
+++ b/services/syncbase/store/leveldb/stats_test.go
@@ -11,9 +11,14 @@
 	"math/rand"
 	"testing"
 
+	"v.io/x/ref/lib/stats"
 	"v.io/x/ref/services/syncbase/store/transactions"
 )
 
+// This file contains tests that use private functions in the batchstore
+// wrapper around LevelDB.  For higher-level tests via the syncbase public
+// API, see stats_blackbox_test.go.
+
 func TestWithNoData(t *testing.T) {
 	bs, cleanup := newBatchStore()
 	defer cleanup()
@@ -30,7 +35,7 @@
 		t.Errorf("Got %d files from fileCount(), want 0", fileCount)
 	}
 
-	fileCounts, fileMBs, readMBs, writeMBs, err := bs.stats()
+	fileCounts, fileMBs, readMBs, writeMBs, err := bs.levelInfo()
 	if err != nil {
 		t.Errorf("Problem getting stats: %v", err)
 	}
@@ -38,7 +43,7 @@
 		t.Errorf("Got %d levels, want 0", len(fileCounts))
 	}
 	if sum(fileCounts) != 0 {
-		t.Errorf("Got %d files from stats(), want 0", sum(fileCounts))
+		t.Errorf("Got %d files from levelInfo(), want 0", sum(fileCounts))
 	}
 	if sum(fileMBs) != 0 {
 		t.Errorf("Got %d MB files, want 0", sum(fileMBs))
@@ -51,6 +56,29 @@
 	}
 }
 
+func TestWithNoData_ViaStats(t *testing.T) {
+	bs, cleanup := newBatchStore()
+	defer cleanup()
+
+	// New db with no data added.
+
+	byteCount, err := stats.GetStatsObject(bs.statsPrefix + "/filesystem_bytes")
+	if err != nil {
+		t.Fatalf("Problem getting stats object: %v", err)
+	}
+	if byteCount.Value().(int64) != 0 {
+		t.Errorf("Got %v filesystem_bytes for a new database, want 0", byteCount.Value())
+	}
+
+	fileCount, err := stats.GetStatsObject(bs.statsPrefix + "/file_count")
+	if err != nil {
+		t.Fatalf("Problem getting stats object: %v", err)
+	}
+	if fileCount.Value().(int64) != 0 {
+		t.Errorf("Got %v from the file_count stat, want 0", fileCount.Value())
+	}
+}
+
 func TestWithLotsOfData(t *testing.T) {
 	bs, cleanup := newBatchStore()
 	defer cleanup()
@@ -81,7 +109,7 @@
 		t.Errorf("Got %d files from fileCount(), want 10 or more", fileCount)
 	}
 
-	fileCountsFromStats, fileMBs, readMBs, writeMBs, err := bs.stats()
+	fileCountsFromStats, fileMBs, readMBs, writeMBs, err := bs.levelInfo()
 	if err != nil {
 		t.Errorf("Problem getting stats: %v", err)
 	}
@@ -99,7 +127,7 @@
 	// ways match, at least approximately.
 	totFileBytes := uint64(sum(fileMBs)) * 1024 * 1024
 	if !approxEquals(totFileBytes, byteCount) {
-		t.Errorf("Got %d B of files, want approximately %s B",
+		t.Errorf("Got %d B of files, want approximately %d B",
 			totFileBytes, byteCount)
 	}
 	// As LevelDB does compression, we assume it has to read a significant
@@ -114,6 +142,43 @@
 	}
 }
 
+func TestWithLotsOfData_ViaStats(t *testing.T) {
+	bs, cleanup := newBatchStore()
+	defer cleanup()
+
+	keys := write100MB(bs)
+
+	val, err := bs.Get(keys[0], nil)
+	if err != nil {
+		t.Errorf("Problem reading from database: %v", err)
+	}
+	if len(val) != 100*1024 {
+		t.Errorf("Got %d, want %d", len(val), 100*1024)
+	}
+
+	// Assume at least half the 100 MB of data made it out to level files.
+	byteCount, err := stats.GetStatsObject(bs.statsPrefix + "/filesystem_bytes")
+	if err != nil {
+		t.Fatalf("Problem getting stats object: %v", err)
+	}
+	if byteCount.Value().(int64) < 50*1024*1024 {
+		t.Errorf("Got %v B, want more than 50 MB", byteCount.Value())
+	}
+
+	// According to https://rawgit.com/google/leveldb/master/doc/impl.html
+	// the files are of size 2 MB, so because we have written 100 MB we
+	// expect that eventually there will be 50 files.  But let's be
+	// conservative and assume only half of the data has made it out to the
+	// level files.
+	fileCount, err := stats.GetStatsObject(bs.statsPrefix + "/file_count")
+	if err != nil {
+		t.Fatalf("Problem getting stats object: %v", err)
+	}
+	if fileCount.Value().(int64) < 25 {
+		t.Errorf("Got %v from the file_count stat, want 25 or more", fileCount.Value())
+	}
+}
+
 // True if within 10%.
 func approxEquals(a, b uint64) bool {
 	diff := math.Abs(float64(a) - float64(b))
@@ -121,6 +186,9 @@
 	return diff/mean < 0.1
 }
 
+var nextTestNum = 0
+
+// newBatchStore returns an empty leveldb store in a new temp directory.
 func newBatchStore() (bs *db, cleanup func()) {
 	path, err := ioutil.TempDir("", "syncbase_leveldb")
 	if err != nil {