Use stats to monitor LevelDb file usage.
This adds the following four stats for a syncbase service:
syncbase/leveldb/service/{hash}/file_count
syncbase/leveldb/service/{hash}/filesystem_bytes
syncbase/leveldb/blobmap/{hash}/file_count
syncbase/leveldb/blobmap/{hash}/filesystem_bytes
and the following two stats metrics per database:
syncbase/leveldb/db/{blessing}/{DB-name}/{hash}/file_count
syncbase/leveldb/db/{blessing}/{DB-name}/{hash}/filesystem_bytes
where {hash} is a hash of the file path of the store.
Also fixes a bug: the service store was not being closed when these
service was closed.
Change-Id: I844786c105d2671df1013cd29b2534eeedefb495
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index 0143749..7941db5 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -274,6 +274,7 @@
func (s *service) Close() {
s.vclockD.Close()
vsync.Close(s.sync)
+ s.st.Close()
}
////////////////////////////////////////
diff --git a/services/syncbase/store/leveldb/db.go b/services/syncbase/store/leveldb/db.go
index eae2e99..fbb09aa 100644
--- a/services/syncbase/store/leveldb/db.go
+++ b/services/syncbase/store/leveldb/db.go
@@ -11,12 +11,18 @@
// #include "syncbase_leveldb.h"
import "C"
import (
+ "encoding/base64"
"fmt"
+ "hash/fnv"
+ "regexp"
"strings"
"sync"
"unsafe"
+ "v.io/v23/naming"
"v.io/v23/verror"
+ "v.io/x/lib/vlog"
+ "v.io/x/ref/lib/stats"
"v.io/x/ref/services/syncbase/store"
"v.io/x/ref/services/syncbase/store/transactions"
)
@@ -32,6 +38,7 @@
readOptions *C.leveldb_readoptions_t
writeOptions *C.leveldb_writeoptions_t
err error
+ statsPrefix string
}
const defaultMaxOpenFiles = 100
@@ -43,6 +50,16 @@
}
// Open opens the database located at the given path.
+//
+// This adds the following four stats for a syncbase service:
+// syncbase/leveldb/service/{hash}/file_count
+// syncbase/leveldb/service/{hash}/filesystem_bytes
+// syncbase/leveldb/blobmap/{hash}/file_count
+// syncbase/leveldb/blobmap/{hash}/filesystem_bytes
+// and the following two stats metrics per database:
+// syncbase/leveldb/db/{blessing}/{DB-name}/{hash}/file_count
+// syncbase/leveldb/db/{blessing}/{DB-name}/{hash}/filesystem_bytes
+// where {hash} is a hash of the file path of the store.
func Open(path string, opts OpenOptions) (store.Store, error) {
bs, err := openBatchStore(path, opts)
if err != nil {
@@ -51,6 +68,14 @@
return transactions.Wrap(bs), nil
}
+// addStatCallback creates a stats object and returns its key
+func addStatCallback(prefix string, metric string, callback func() int64) string {
+ statsKey := naming.Join(prefix, metric)
+ stats.NewIntegerFunc(statsKey, callback)
+ vlog.VI(1).Infof("Adding stats call at %q", statsKey)
+ return statsKey
+}
+
// openBatchStore opens the non-transactional leveldb database that supports
// batch writes at the given path.
func openBatchStore(path string, opts OpenOptions) (*db, error) {
@@ -87,18 +112,28 @@
}
readOptions := C.leveldb_readoptions_create()
C.leveldb_readoptions_set_verify_checksums(readOptions, 1)
- return &db{
+
+ d := &db{
node: store.NewResourceNode(),
cDb: cDb,
readOptions: readOptions,
writeOptions: C.leveldb_writeoptions_create(),
- }, nil
+ statsPrefix: statsPrefixFromPath(path),
+ }
+
+ addStatCallback(d.statsPrefix, "file_count", func() int64 { return int64(d.fileCount()) })
+ addStatCallback(d.statsPrefix, "filesystem_bytes", func() int64 { return int64(d.filesystemBytes()) })
+
+ return d, nil
}
// Close implements the store.Store interface.
func (d *db) Close() error {
d.mu.Lock()
defer d.mu.Unlock()
+ if err := stats.Delete(d.statsPrefix); err != nil {
+ vlog.Errorf("Problem deleting stats %q: %v", d.statsPrefix, err)
+ }
if d.err != nil {
return store.ConvertError(d.err)
}
@@ -217,10 +252,10 @@
return C.GoString(value)
}
-// stats returns per-level statistics as a set of parallel slices of the same
-// length. For small databases this call is about 2X slower than a Get; for
-// large databases it is about one order of magnitude faster than a Get.
-func (d *db) stats() (fileCounts, fileMBs, readMBs, writeMBs []int, err error) {
+// levelInfo returns per-level statistics as a set of parallel slices of the
+// same length. For small databases this call is about 2X slower than a Get;
+// for large databases it is about one order of magnitude faster than a Get.
+func (d *db) levelInfo() (fileCounts, fileMBs, readMBs, writeMBs []int, err error) {
// It's unfortunate that LevelDB only provides the stats in a formatted
// string, so we have to parse them out.
text := d.property("leveldb.stats")
@@ -267,3 +302,67 @@
func (d *db) fileCount() int {
return int(C.syncbase_leveldb_file_count(d.cDb))
}
+
+// statsPrefixFromPath transforms a leveldb file path into a string that is more
+// convenient as a stats key in the following ways:
+//
+// * it is shorter, filtering out long hex strings and redundant fixed strings
+//
+// * it is easy to query using the stats glob API, always starting with
+// "syncbase/leveldb/{storeType}", where store type is one of {"service",
+// "blobmap", "db", "test", "other"}
+//
+// * it preserves human-readable database name, blessing, and syncbase service
+// IDs if they appear in the file path.
+//
+// * it is unique to a service instance with high probability, by including a
+// short hash of the path
+func statsPrefixFromPath(path string) string {
+ return naming.Join("syncbase", "leveldb", applyRules(path), hash(path))
+}
+
+func applyRules(path string) string {
+ for _, rule := range rules {
+ if rule.from.MatchString(path) {
+ return rule.from.ReplaceAllString(path, rule.to)
+ }
+ }
+ return fmt.Sprintf("other/%s", defaultRuleDelete.ReplaceAllString(path, ""))
+}
+
+var rules = []struct {
+ from *regexp.Regexp
+ to string
+}{
+ // From v.io/x/ref/services/syncbase/server.rootDirForDb
+ {regexp.MustCompile(
+ `.*/apps/(.+)\-[0-9a-f]+/dbs/(.+)\-[0-9a-f]+\-[0-9a-f]+/leveldb$`),
+ "db/$1/$2"},
+
+ // From v.io/x/ref/services/syncbase/localblobstore/fs_cablobstore.Create
+ {regexp.MustCompile(
+ `.*/blobs/chunk$`),
+ "blobmap"},
+
+ // From v.io/x/ref/services/syncbase/server.NewService
+ {regexp.MustCompile(
+ `.*/leveldb$`),
+ "service"},
+
+ // From v.io/x/ref/services/syncbase/store/leveldb.newBatchStore
+ {regexp.MustCompile(
+ `.*/syncbase_leveldb[0-9]+$`),
+ "test"},
+}
+
+// Used only if none of the above rules match
+var defaultRuleDelete = regexp.MustCompile(`(/tmp|/leveldb|-[0-9a-f]{30,})`)
+
+// hash returns a hash string of s. A small 32-bit non-crypto hash is good
+// enough, as it is just a fallback for the unlikely case we get a collision in
+// the filtered pathname.
+func hash(s string) string {
+ h := fnv.New32()
+ h.Write([]byte(s))
+ return base64.RawURLEncoding.EncodeToString(h.Sum([]byte{}))
+}
diff --git a/services/syncbase/store/leveldb/stats_benchmark_test.go b/services/syncbase/store/leveldb/stats_benchmark_test.go
index 9824a73..52085b5 100644
--- a/services/syncbase/store/leveldb/stats_benchmark_test.go
+++ b/services/syncbase/store/leveldb/stats_benchmark_test.go
@@ -6,6 +6,8 @@
import (
"testing"
+
+ "v.io/x/ref/lib/stats"
)
var (
@@ -53,6 +55,20 @@
}
+func BenchmarkStats_FileSystemBytesViaStats_LotsOfData(b *testing.B) {
+ bs, cleanup := newBatchStore()
+ defer cleanup()
+ defer b.StopTimer() // Called before cleanup() to avoid timing it.
+ write100MB(bs)
+ b.ResetTimer() // Ignore timing of init stuff above.
+
+ for i := 0; i < b.N; i++ {
+ fileBytes, _ := stats.GetStatsObject(
+ bs.statsPrefix + "/filesystem_bytes")
+ accumulate += int(fileBytes.Value().(int64))
+ }
+}
+
func BenchmarkStats_FileCount_NoData(b *testing.B) {
bs, cleanup := newBatchStore()
defer cleanup()
@@ -91,7 +107,21 @@
}
-func BenchmarkStats_Stats_NoData(b *testing.B) {
+func BenchmarkStats_FileCountViaStats_LotsOfData(b *testing.B) {
+ bs, cleanup := newBatchStore()
+ defer cleanup()
+ defer b.StopTimer() // Called before cleanup() to avoid timing it.
+ write100MB(bs)
+ b.ResetTimer() // Ignore timing of init stuff above.
+
+ for i := 0; i < b.N; i++ {
+ fileCount, _ := stats.GetStatsObject(
+ bs.statsPrefix + "/file_count")
+ accumulate += int(fileCount.Value().(int64))
+ }
+}
+
+func BenchmarkStats_LevelInfo_NoData(b *testing.B) {
bs, cleanup := newBatchStore()
defer cleanup()
defer b.StopTimer() // Called before cleanup() to avoid timing it.
@@ -99,13 +129,13 @@
for i := 0; i < b.N; i++ {
var c, s, r, w []int
- c, s, r, w, err = bs.stats()
+ c, s, r, w, err = bs.levelInfo()
accumulate += len(c) + len(s) + len(r) + len(w)
}
}
-func BenchmarkStats_Stats_SomeData(b *testing.B) {
+func BenchmarkStats_LevelInfo_SomeData(b *testing.B) {
bs, cleanup := newBatchStore()
defer cleanup()
defer b.StopTimer() // Called before cleanup() to avoid timing it.
@@ -114,13 +144,13 @@
for i := 0; i < b.N; i++ {
var c, s, r, w []int
- c, s, r, w, err = bs.stats()
+ c, s, r, w, err = bs.levelInfo()
accumulate += len(c) + len(s) + len(r) + len(w)
}
}
-func BenchmarkStats_Stats_LotsOfData(b *testing.B) {
+func BenchmarkStats_LevelInfo_LotsOfData(b *testing.B) {
bs, cleanup := newBatchStore()
defer cleanup()
defer b.StopTimer() // Called before cleanup() to avoid timing it.
@@ -129,7 +159,7 @@
for i := 0; i < b.N; i++ {
var c, s, r, w []int
- c, s, r, w, err = bs.stats()
+ c, s, r, w, err = bs.levelInfo()
accumulate += c[0] + s[0] + r[0] + w[0]
}
diff --git a/services/syncbase/store/leveldb/stats_blackbox_test.go b/services/syncbase/store/leveldb/stats_blackbox_test.go
new file mode 100644
index 0000000..71172b0
--- /dev/null
+++ b/services/syncbase/store/leveldb/stats_blackbox_test.go
@@ -0,0 +1,193 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package leveldb_test
+
+import (
+ "crypto/rand"
+ "fmt"
+ "log"
+ "testing"
+ "time"
+
+ "v.io/v23/context"
+ "v.io/v23/security"
+ "v.io/v23/security/access"
+ "v.io/v23/syncbase"
+ "v.io/x/ref/lib/stats"
+ _ "v.io/x/ref/runtime/factories/generic"
+ tu "v.io/x/ref/services/syncbase/testutil"
+)
+
+// In this file we do integration tests of the leveldb stats via the public
+// Syncbase API. For lower level tests see the stats_test.go file.
+
+func TestEmpty(t *testing.T) {
+ _, serverName, cleanup := tu.SetupOrDie(nil)
+ defer cleanup()
+
+ since := time.Now()
+ syncbase.NewService(serverName)
+
+ count := 0
+ for got := range statsValues("service/*/*", since) {
+ count++
+
+ // All stats values should be zero (see stats_test.go)
+ if got.(int64) != 0 {
+ t.Errorf("Got %v, want 0", got)
+ }
+ }
+ // Want 2 stats keys for the service
+ if count != 2 {
+ t.Errorf("Got %d service stats keys, want 2", count)
+ }
+
+ count = 0
+ for got := range statsValues("blobmap/*/*", since) {
+ count++
+
+ // All stats values should be zero (see stats_test.go)
+ if got.(int64) != 0 {
+ t.Errorf("Got %v, want 0", got)
+ }
+ }
+ // Want 2 stats keys for the blobmap
+ if count != 2 {
+ t.Errorf("Got %d blobmap stats keys, want 2", count)
+ }
+
+ for range statsValues("db/v_io_a_xyz/*/*/*", since) {
+ t.Error("Expected no database-specific stats")
+ }
+}
+
+func TestWithMultipleDbs(t *testing.T) {
+ ctx, serverName, cleanup := tu.SetupOrDie(nil)
+ defer cleanup()
+
+ since := time.Now()
+ service := syncbase.NewService(serverName)
+ tu.CreateDatabase(t, ctx, service, "empty_db_0")
+ tu.CreateDatabase(t, ctx, service, "empty_db_1")
+ tu.CreateDatabase(t, ctx, service, "empty_db_2")
+
+ count := 0
+ for got := range statsValues("service/*/*", since) {
+ count++
+
+ // All stats values should be zero (see stats_test.go)
+ if got.(int64) != 0 {
+ t.Errorf("Got %v, want 0", got)
+ }
+ }
+ // Want 2 stats keys for the service
+ if count != 2 {
+ t.Errorf("Got %d service stats keys, want 2", count)
+ }
+
+ count = 0
+ for got := range statsValues("blobmap/*/*", since) {
+ count++
+
+ // All stats values should be zero (see stats_test.go)
+ if got.(int64) != 0 {
+ t.Errorf("Got %v, want 0", got)
+ }
+ }
+ // Want 2 stats keys for the blobmap
+ if count != 2 {
+ t.Errorf("Got %d blobmap stats keys, want 2", count)
+ }
+
+ count = 0
+ for got := range statsValues("db/v_io_a_xyz/*/*/*", since) {
+ count++
+
+ // All stats values should be zero (see stats_test.go)
+ if got.(int64) != 0 {
+ t.Errorf("Got %v, want 0", got)
+ }
+ }
+ // Want 2 stats keys per each of 3 DBs
+ if count != 6 {
+ t.Errorf("Got %d DB stats keys, want 6", count)
+ }
+}
+
+func TestWithLotsOfData(t *testing.T) {
+ ctx, serverName, cleanup := tu.SetupOrDie(nil)
+ defer cleanup()
+
+ since := time.Now()
+ service := syncbase.NewService(serverName)
+ db := tu.CreateDatabase(t, ctx, service, "big_db")
+
+ write100MB(ctx, db)
+
+ // Expect at least 50 MB (see stats_test.go)
+ fileBytes := <-statsValues("db/v_io_a_xyz/big_db/*/filesystem_bytes", since)
+ if fileBytes.(int64) < 50*1024*1024 {
+ t.Errorf("Got %v, want more than 50 MB", fileBytes.(int64))
+ }
+ // Expect 25 or more (see stats_test.go)
+ fileCount := <-statsValues("db/v_io_a_xyz/big_db/*/file_count", since)
+ if fileCount.(int64) < 25 {
+ t.Errorf("Got %v, want 25 or more", fileCount.(int64))
+ }
+}
+
+// statsValues returns a channel with all the values since the given time of the
+// sync keys that match the given glob under the root "syncbase/leveldb/".
+func statsValues(pattern string, since time.Time) <-chan interface{} {
+ ch := make(chan interface{})
+ go func() {
+ iter := stats.Glob("syncbase/leveldb/", pattern, since, true)
+ for iter.Advance() {
+ if err := iter.Err(); err != nil {
+ panic(err)
+ }
+ if iter.Value().Value == nil {
+ log.Printf("Ignoring empty stats %q", iter.Value().Key)
+ } else {
+ ch <- iter.Value().Value
+ }
+ }
+ close(ch)
+ }()
+ return ch
+}
+
+// Write 100 kB each to 1024 keys, returning keys written.
+func write100MB(ctx *context.T, db syncbase.Database) {
+ coll := db.Collection(ctx, "the_collection")
+ err := coll.Create(ctx, permissions)
+ if err != nil {
+ panic(err)
+ }
+
+ hundredKB := make([]byte, 100*1024)
+
+ // Write 1024 X 100 KB (100 MB).
+ for i := 0; i < 1024; i++ {
+ rand.Read(hundredKB) // Randomize so it won't compress.
+ key := fmt.Sprintf("foo-%d", i)
+ err := coll.Put(ctx, key, hundredKB)
+ if err != nil {
+ panic(err)
+ }
+ }
+ return
+}
+
+var (
+ allAccess = access.AccessList{In: []security.BlessingPattern{"..."}}
+ permissions = access.Permissions{
+ "Admin": allAccess,
+ "Write": allAccess,
+ "Read": allAccess,
+ "Resolve": allAccess,
+ "Debug": allAccess,
+ }
+)
diff --git a/services/syncbase/store/leveldb/stats_test.go b/services/syncbase/store/leveldb/stats_test.go
index 3d845ab..a882137 100644
--- a/services/syncbase/store/leveldb/stats_test.go
+++ b/services/syncbase/store/leveldb/stats_test.go
@@ -11,9 +11,14 @@
"math/rand"
"testing"
+ "v.io/x/ref/lib/stats"
"v.io/x/ref/services/syncbase/store/transactions"
)
+// In this file we tests using private function in the batchstore wrapper around
+// LevelDB. For higher-level tests via the syncbase public API see the
+// stats_black_box_test.go file.
+
func TestWithNoData(t *testing.T) {
bs, cleanup := newBatchStore()
defer cleanup()
@@ -30,7 +35,7 @@
t.Errorf("Got %d files from fileCount(), want 0", fileCount)
}
- fileCounts, fileMBs, readMBs, writeMBs, err := bs.stats()
+ fileCounts, fileMBs, readMBs, writeMBs, err := bs.levelInfo()
if err != nil {
t.Errorf("Problem getting stats: %v", err)
}
@@ -38,7 +43,7 @@
t.Errorf("Got %d levels, want 0", len(fileCounts))
}
if sum(fileCounts) != 0 {
- t.Errorf("Got %d files from stats(), want 0", sum(fileCounts))
+ t.Errorf("Got %d files from levelInfo(), want 0", sum(fileCounts))
}
if sum(fileMBs) != 0 {
t.Errorf("Got %d MB files, want 0", sum(fileMBs))
@@ -51,6 +56,29 @@
}
}
+func TestWithNoData_ViaStats(t *testing.T) {
+ bs, cleanup := newBatchStore()
+ defer cleanup()
+
+ // New db with no data added.
+
+ byteCount, err := stats.GetStatsObject(bs.statsPrefix + "/filesystem_bytes")
+ if err != nil {
+ t.Fatalf("Problem getting stats object: %v", err)
+ }
+ if byteCount.Value().(int64) != 0 {
+ t.Errorf("Wanted zero byteCount for new database. Got %v", byteCount.Value())
+ }
+
+ fileCount, err := stats.GetStatsObject(bs.statsPrefix + "/file_count")
+ if err != nil {
+ t.Fatalf("Problem getting stats object: %v", err)
+ }
+ if fileCount.Value().(int64) != 0 {
+ t.Errorf("Got %v files from fileCount(), want 0", fileCount.Value())
+ }
+}
+
func TestWithLotsOfData(t *testing.T) {
bs, cleanup := newBatchStore()
defer cleanup()
@@ -81,7 +109,7 @@
t.Errorf("Got %d files from fileCount(), want 10 or more", fileCount)
}
- fileCountsFromStats, fileMBs, readMBs, writeMBs, err := bs.stats()
+ fileCountsFromStats, fileMBs, readMBs, writeMBs, err := bs.levelInfo()
if err != nil {
t.Errorf("Problem getting stats: %v", err)
}
@@ -99,7 +127,7 @@
// ways match, at least approximately.
totFileBytes := uint64(sum(fileMBs)) * 1024 * 1024
if !approxEquals(totFileBytes, byteCount) {
- t.Errorf("Got %d B of files, want approximately %s B",
+ t.Errorf("Got %d B of files, want approximately %d B",
totFileBytes, byteCount)
}
// As LevelDB does compression, we assume it has to read a significant
@@ -114,6 +142,43 @@
}
}
+func TestWithLotsOfData_ViaStats(t *testing.T) {
+ bs, cleanup := newBatchStore()
+ defer cleanup()
+
+ keys := write100MB(bs)
+
+ val, err := bs.Get(keys[0], nil)
+ if err != nil {
+ t.Errorf("Problem reading from database: %v", err)
+ }
+ if len(val) != 100*1024 {
+ t.Errorf(`Got %d, wanted %d`, len(val), 100*1024)
+ }
+
+ // Assume at least half the 100 MB of data made it out to level files.
+ byteCount, err := stats.GetStatsObject(bs.statsPrefix + "/filesystem_bytes")
+ if err != nil {
+ t.Fatalf("Problem getting stats object: %v", err)
+ }
+ if byteCount.Value().(int64) < 50*1024*1024 {
+ t.Errorf("Wanted more than 50 MB. Got %v B", byteCount.Value())
+ }
+
+ // According to https://rawgit.com/google/leveldb/master/doc/impl.html
+ // the files are of size 2 MB, so because we have writen 100 MB we
+ // expect that eventually that will be 50 files. But let's be
+ // conservative and assume only half of the data has made it out to the
+ // level files.
+ fileCount, err := stats.GetStatsObject(bs.statsPrefix + "/file_count")
+ if err != nil {
+ t.Fatalf("Problem getting stats object: %v", err)
+ }
+ if fileCount.Value().(int64) < 25 {
+ t.Errorf("Got %v files from fileCount(), want 25 or more", fileCount.Value())
+ }
+}
+
// True if within 10%.
func approxEquals(a, b uint64) bool {
diff := math.Abs(float64(a) - float64(b))
@@ -121,6 +186,9 @@
return diff/mean < 0.1
}
+var nextTestNum = 0
+
+// newBatchStore returns an empty leveldb store in a new temp directory.
func newBatchStore() (bs *db, cleanup func()) {
path, err := ioutil.TempDir("", "syncbase_leveldb")
if err != nil {