vmon/lib: add vmon check for cloud syncbase instances.

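The check globs "home/*/syncbased" for all cloud syncbase instances,
fans the per-instance stat queries (CPU, memory, disk, latency, and
QPS) out to a pool of workers, and reports both the per-instance
values and their aggregations to GCM.

The new metric descriptors and the check can be exercised with:

  vmon md create cloud-syncbase cloud-syncbase-agg
  vmon check run cloud-syncbase
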
Change-Id: Iaaf2c16304d415b15be9ac97c14ff06282213fe8
diff --git a/internal/monitoring/monitoring.go b/internal/monitoring/monitoring.go
index 8b10445..8081feb 100644
--- a/internal/monitoring/monitoring.go
+++ b/internal/monitoring/monitoring.go
@@ -82,8 +82,10 @@
 		return i, nil
 	case int64:
 		return float64(i), nil
+	case uint64:
+		return float64(i), nil
 	default:
-		return 0, fmt.Errorf("invalid value: %v", sv.Value)
+		return 0, fmt.Errorf("invalid value: %v, %T", sv.Value, sv.Value)
 	}
 }
 
@@ -107,6 +109,15 @@
 // customMetricDescriptors is a map from metric's short names to their
 // MetricDescriptor definitions.
 var customMetricDescriptors = map[string]*cloudmonitoring.MetricDescriptor{
+	// Custom metrics for recording stats of cloud syncbase instances.
+	"cloud-syncbase": createMetric("cloud-syncbase", "Stats of cloud syncbase instances.", "double", false, []labelData{
+		{
+			key:         "mounted_name",
+			description: "The relative mounted name of the instance.",
+		},
+	}),
+	"cloud-syncbase-agg": createMetric("cloud-syncbase-agg", "The aggregated stats of cloud syncbase instances.", "double", false, aggLabelData),
+
 	// Custom metrics for recording check latency and its aggregation
 	// of vanadium production services.
 	"service-latency":     createMetric("service/latency", "The check latency (ms) of vanadium production services.", "double", true, nil),
diff --git a/vmon/check.go b/vmon/check.go
index a628334..1b46d09 100644
--- a/vmon/check.go
+++ b/vmon/check.go
@@ -20,6 +20,7 @@
 
 // checkFunctions is a map from check names to the corresponding check functions.
 var checkFunctions = map[string]func(*context.T, *tool.Context, *cloudmonitoring.Service) error{
+	"cloud-syncbase":            checkCloudSyncbaseInstances,
 	"jenkins":                   checkJenkins,
 	"service-latency":           checkServiceLatency,
 	"service-permethod-latency": checkServicePerMethodLatency,
diff --git a/vmon/cloudsyncbase.go b/vmon/cloudsyncbase.go
new file mode 100644
index 0000000..9b086a5
--- /dev/null
+++ b/vmon/cloudsyncbase.go
@@ -0,0 +1,234 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"fmt"
+	"runtime"
+	"time"
+
+	cloudmonitoring "google.golang.org/api/monitoring/v3"
+
+	"v.io/jiri/tool"
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/naming"
+	"v.io/x/devtools/internal/monitoring"
+	"v.io/x/devtools/internal/test"
+)
+
+const (
+	cloudSyncbasePattern    = "home/*/syncbased"
+	sysStatCPUUsagePercent  = "__debug/stats/system/syscpu/Percent"
+	sysStatMemUsagePercent  = "__debug/stats/system/sysmem/UsedPercent"
+	sysStatMemUsageBytes    = "__debug/stats/system/sysmem/Used"
+	sysStatDiskUsagePercent = "__debug/stats/system/sysdisk/%2Fdata/UsedPercent"
+	sysStatDiskUsageBytes   = "__debug/stats/system/sysdisk/%2Fdata/Used"
+)
+
+var (
+	cloudSyncbaseTimeout = 20 * time.Second
+	// aggsMu guards the aggregators map, which the stats workers
+	// update concurrently.
+	aggsMu sync.Mutex
+)
+
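+// cloudSyncbaseStatsTask identifies a single stat query (one task
+// type) against a single cloud syncbase instance.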
+type cloudSyncbaseStatsTask struct {
+	mountedName  string
+	sbMountEntry *naming.MountEntry
+	taskType     cloudSyncbaseStatsTaskType
+}
+
+type cloudSyncbaseStatsTaskType int
+
+const (
+	taskTypeCpu cloudSyncbaseStatsTaskType = iota
+	taskTypeMem
+	taskTypeDisk
+	taskTypeLatency
+	taskTypeQPS
+)
+
+type cloudSyncbaseStatsResult struct {
+	err error
+}
+
+type metricData struct {
+	metricLabel string
+	value       float64
+}
+
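+// checkCloudSyncbaseInstances globs for all cloud syncbase instances,
+// fans the stat queries out to a pool of workers, and sends both the
+// per-instance values and their aggregations to GCM.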
+func checkCloudSyncbaseInstances(v23ctx *context.T, ctx *tool.Context, s *cloudmonitoring.Service) error {
+	v23ctx, cancel := context.WithTimeout(v23ctx, cloudSyncbaseTimeout)
+	defer cancel()
+
+	// Find all cloud syncbase instances.
+	glob, err := v23.GetNamespace(v23ctx).Glob(v23ctx, cloudSyncbasePattern)
+	if err != nil {
+		return err
+	}
+	sbInstances := []*naming.MountEntry{}
+	for e := range glob {
+		switch e := e.(type) {
+		case *naming.GlobReplyEntry:
+			sbInstances = append(sbInstances, &e.Value)
+		case *naming.GlobReplyError:
+			fmt.Fprintf(ctx.Stderr(), "%v\n", e.Value.Error)
+		}
+	}
+
+	// Query stats from each instance.
+	taskTypes := []cloudSyncbaseStatsTaskType{taskTypeCpu, taskTypeMem, taskTypeDisk, taskTypeLatency, taskTypeQPS}
+	numTasks := len(sbInstances) * len(taskTypes)
+	tasks := make(chan cloudSyncbaseStatsTask, numTasks)
+	taskResults := make(chan cloudSyncbaseStatsResult, numTasks)
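+	// aggs accumulates values per metric label across all instances,
+	// feeding the cloud-syncbase-agg metric.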
+	aggs := map[string]*aggregator{}
+	md, err := monitoring.GetMetric("cloud-syncbase", projectFlag)
+	if err != nil {
+		return err
+	}
+	now := time.Now().UTC().Format(time.RFC3339)
+	// Start workers and distribute work to them.
+	for i := 0; i < runtime.NumCPU(); i++ {
+		go statsWorker(v23ctx, ctx, s, now, aggs, md, tasks, taskResults)
+	}
+	for _, sb := range sbInstances {
+		for _, t := range taskTypes {
+			tasks <- cloudSyncbaseStatsTask{
+				mountedName:  sb.Name,
+				sbMountEntry: sb,
+				taskType:     t,
+			}
+		}
+	}
+	close(tasks)
+
+	// Wait for all workers to finish, then send the aggregated data to GCM.
+	for i := 0; i < numTasks; i++ {
+		result := <-taskResults
+		if result.err != nil {
+			fmt.Fprintf(ctx.Stderr(), "%v\n", result.err)
+		}
+	}
+	mdAgg, err := monitoring.GetMetric("cloud-syncbase-agg", projectFlag)
+	if err != nil {
+		return err
+	}
+	for metricLabel, agg := range aggs {
+		if err := sendAggregatedDataToGCM(ctx, s, mdAgg, agg, now, metricLabel); err != nil {
+			fmt.Fprintf(ctx.Stderr(), "%v\n", err)
+		}
+	}
+
+	return nil
+}
+
+// statsWorker queries stats based on the task type, sends the results to GCM,
+// and updates the corresponding aggregator.
+func statsWorker(
+	v23ctx *context.T, ctx *tool.Context, s *cloudmonitoring.Service,
+	now string, aggs map[string]*aggregator, md *cloudmonitoring.MetricDescriptor,
+	tasks <-chan cloudSyncbaseStatsTask, results chan<- cloudSyncbaseStatsResult) {
+	for t := range tasks {
+		result := cloudSyncbaseStatsResult{}
+		metrics := []metricData{}
+		switch t.taskType {
+		case taskTypeCpu:
+			if cpuUsagePct, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatCPUUsagePercent); err != nil {
+				result.err = err
+			} else {
+				metrics = append(metrics, metricData{
+					metricLabel: "syscpu-usage-pct",
+					value:       cpuUsagePct,
+				})
+			}
+		case taskTypeMem:
+			if memUsagePct, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatMemUsagePercent); err != nil {
+				result.err = err
+			} else {
+				metrics = append(metrics,
+					metricData{
+						metricLabel: "sysmem-usage-pct",
+						value:       memUsagePct,
+					})
+			}
+			if memUsageBytes, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatMemUsageBytes); err != nil {
+				result.err = err
+			} else {
+				metrics = append(metrics,
+					metricData{
+						metricLabel: "sysmem-usage-bytes",
+						value:       memUsageBytes,
+					})
+			}
+		case taskTypeDisk:
+			if diskUsagePct, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatDiskUsagePercent); err != nil {
+				result.err = err
+			} else {
+				metrics = append(metrics,
+					metricData{
+						metricLabel: "sysdisk-usage-pct",
+						value:       diskUsagePct,
+					})
+			}
+			if diskUsageBytes, err := getSysStat(v23ctx, ctx, t.sbMountEntry, sysStatDiskUsageBytes); err != nil {
+				result.err = err
+			} else {
+				metrics = append(metrics,
+					metricData{
+						metricLabel: "sysdisk-usage-bytes",
+						value:       diskUsageBytes,
+					})
+			}
+		case taskTypeLatency:
+			if lat, err := getLatency(v23ctx, t.sbMountEntry); err != nil {
+				result.err = err
+			} else {
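+				// Report the latency in milliseconds, matching the
+				// other latency metrics.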
+				metrics = append(metrics, metricData{
+					metricLabel: "latency",
+					value:       float64(lat.Nanoseconds()) / 1000000.0,
+				})
+			}
+		case taskTypeQPS:
+			if _, totalQPS, err := getQPS(v23ctx, ctx, t.sbMountEntry); err != nil {
+				result.err = err
+			} else {
+				metrics = append(metrics, metricData{
+					metricLabel: "qps",
+					value:       totalQPS,
+				})
+			}
+		}
+		for _, metric := range metrics {
+			// The aggregators map is shared by all workers, so guard
+			// the lookup and update with a mutex.
+			aggsMu.Lock()
+			getAggregator(aggs, metric.metricLabel).add(metric.value)
+			aggsMu.Unlock()
+			if err := sendDataToGCM(s, md, metric.value, now, "", "", metric.metricLabel, t.mountedName); err != nil {
+				fmt.Fprintf(ctx.Stderr(), "%v\n", err)
+			} else {
+				test.Pass(ctx, "%s, %s: %v\n", t.mountedName, metric.metricLabel, metric.value)
+			}
+		}
+		results <- result
+	}
+}
+
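+// getAggregator returns the aggregator for the given metric label,
+// creating it on first use. Callers must hold aggsMu.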
+func getAggregator(aggs map[string]*aggregator, metricLabel string) *aggregator {
+	agg, ok := aggs[metricLabel]
+	if !ok {
+		agg = newAggregator()
+		aggs[metricLabel] = agg
+	}
+	return agg
+}
+
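+// getSysStat reads a single system stat (CPU, memory, or disk usage)
+// from the given instance's __debug/stats tree.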
+func getSysStat(v23ctx *context.T, ctx *tool.Context, me *naming.MountEntry, stat string) (float64, error) {
+	// Clear the entry's name so the stat suffix is resolved relative
+	// to the server root rather than the mounted name.
+	me.Name = ""
+	values, err := monitoring.GetStat(v23ctx, ctx, *me, stat)
+	if err != nil {
+		return -1, err
+	}
+	if len(values) == 0 {
+		return -1, fmt.Errorf("no value found for stat %q", stat)
+	}
+	v := values[0]
+	fv, err := v.GetFloat64Value()
+	if err != nil {
+		return -1, err
+	}
+	return fv, nil
+}
diff --git a/vmon/doc.go b/vmon/doc.go
index 3972f6c..9d6524e 100644
--- a/vmon/doc.go
+++ b/vmon/doc.go
@@ -115,11 +115,12 @@
 Usage:
    vmon md create [flags] <names>
 
-<names> is a list of metric descriptor names to create. Available: gce-instance,
-jenkins, nginx, rpc-load-test, service-counters, service-counters-agg,
-service-latency, service-latency-agg, service-metadata, service-metadata-agg,
-service-permethod-latency, service-permethod-latency-agg, service-qps-method,
-service-qps-method-agg, service-qps-total, service-qps-total-agg
+<names> is a list of metric descriptor names to create. Available:
+cloud-syncbase, cloud-syncbase-agg, gce-instance, jenkins, nginx, rpc-load-test,
+service-counters, service-counters-agg, service-latency, service-latency-agg,
+service-metadata, service-metadata-agg, service-permethod-latency,
+service-permethod-latency-agg, service-qps-method, service-qps-method-agg,
+service-qps-total, service-qps-total-agg
 
 The vmon md create flags are:
  -color=true
@@ -138,11 +139,12 @@
 Usage:
    vmon md delete [flags] <names>
 
-<names> is a list of metric descriptor names to delete. Available: gce-instance,
-jenkins, nginx, rpc-load-test, service-counters, service-counters-agg,
-service-latency, service-latency-agg, service-metadata, service-metadata-agg,
-service-permethod-latency, service-permethod-latency-agg, service-qps-method,
-service-qps-method-agg, service-qps-total, service-qps-total-agg
+<names> is a list of metric descriptor names to delete. Available:
+cloud-syncbase, cloud-syncbase-agg, gce-instance, jenkins, nginx, rpc-load-test,
+service-counters, service-counters-agg, service-latency, service-latency-agg,
+service-metadata, service-metadata-agg, service-permethod-latency,
+service-permethod-latency-agg, service-qps-method, service-qps-method-agg,
+service-qps-total, service-qps-total-agg
 
 The vmon md delete flags are:
  -color=true
@@ -254,8 +256,8 @@
    vmon check run [flags] <names>
 
 <names> is a list of names identifying the checks to run. Available:
-gce-instance, jenkins, rpc-load-test, service-counters, service-latency,
-service-metadata, service-permethod-latency, service-qps
+cloud-syncbase, gce-instance, jenkins, rpc-load-test, service-counters,
+service-latency, service-metadata, service-permethod-latency, service-qps
 
 The vmon check run flags are:
  -bin-dir=
diff --git a/vmon/servicecommon.go b/vmon/servicecommon.go
index b27a403..f40c9db 100644
--- a/vmon/servicecommon.go
+++ b/vmon/servicecommon.go
@@ -112,7 +112,7 @@
 				},
 			},
 		}}).Do(); err != nil {
-		return fmt.Errorf("Timeseries Write failed for metric %q with value %f: %v", md.Name, value, err)
+		return fmt.Errorf("Timeseries Write failed for metric %q with value %f and labels %v: %v", md.Name, value, labels, err)
 	}
 	return nil
 }
diff --git a/vmon/servicelatency.go b/vmon/servicelatency.go
index 4719871..d999475 100644
--- a/vmon/servicelatency.go
+++ b/vmon/servicelatency.go
@@ -12,6 +12,7 @@
 
 	"v.io/jiri/tool"
 	"v.io/v23/context"
+	"v.io/v23/naming"
 	"v.io/v23/options"
 	"v.io/v23/rpc/reserved"
 	"v.io/v23/verror"
@@ -113,17 +114,12 @@
 	latencies := []latencyData{}
 	errors := []error{}
 	for _, group := range groups {
-		latency := timeout
 		v23ctx, cancel := context.WithTimeout(v23ctx, timeout)
 		defer cancel()
-		start := time.Now()
-		if _, err := reserved.Signature(v23ctx, "", options.Preresolved{&group}); err != nil {
-			if verror.ErrorID(err) != verror.ErrTimeout.ID {
-				errors = append(errors, err)
-				continue
-			}
-		} else {
-			latency = time.Now().Sub(start)
+		latency, err := getLatency(v23ctx, &group)
+		if err != nil {
+			errors = append(errors, err)
+			continue
 		}
 		location, err := monitoring.GetServiceLocation(v23ctx, ctx, group)
 		if err != nil {
@@ -141,3 +137,16 @@
 
 	return latencies, nil
 }
+
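+// getLatency measures how long it takes to fetch the service's
+// signature from the given mount entry. A timed-out request is
+// reported as the full timeout value rather than as an error.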
+func getLatency(v23ctx *context.T, me *naming.MountEntry) (time.Duration, error) {
+	latency := timeout
+	start := time.Now()
+	if _, err := reserved.Signature(v23ctx, "", options.Preresolved{me}); err != nil {
+		if verror.ErrorID(err) != verror.ErrTimeout.ID {
+			return -1, err
+		}
+	} else {
+		latency = time.Since(start)
+	}
+	return latency, nil
+}
diff --git a/vmon/serviceqps.go b/vmon/serviceqps.go
index 7044731..f7c076d 100644
--- a/vmon/serviceqps.go
+++ b/vmon/serviceqps.go
@@ -14,6 +14,7 @@
 
 	"v.io/jiri/tool"
 	"v.io/v23/context"
+	"v.io/v23/naming"
 	"v.io/x/devtools/internal/monitoring"
 	"v.io/x/devtools/internal/test"
 	"v.io/x/ref/services/stats"
@@ -141,31 +142,11 @@
 	qps := []qpsData{}
 	errors := []error{}
 	for _, group := range groups {
-		perMethodQPS := map[string]float64{}
-		totalQPS := 0.0
-		qpsResults, err := monitoring.GetStat(v23ctx, ctx, group, qpsSuffix)
+		perMethodQPS, totalQPS, err := getQPS(v23ctx, ctx, &group)
 		if err != nil {
 			errors = append(errors, err)
 			continue
 		}
-		curPerMethodQPS := map[string]float64{}
-		curTotalQPS := 0.0
-		for _, r := range qpsResults {
-			data, ok := r.Value.(stats.HistogramValue)
-			if !ok {
-				return nil, fmt.Errorf("invalid qps data: %v", r)
-			}
-			matches := qpsRE.FindStringSubmatch(r.Name)
-			if matches == nil {
-				continue
-			}
-			method := matches[1]
-			qps := (float64)(data.Count) / 60.0
-			curPerMethodQPS[method] += qps
-			curTotalQPS += qps
-		}
-		perMethodQPS = curPerMethodQPS
-		totalQPS = curTotalQPS
 		if len(perMethodQPS) == 0 {
 			errors = append(errors, fmt.Errorf("failed to check qps for service %q", serviceName))
 			continue
@@ -188,3 +169,27 @@
 
 	return qps, nil
 }
+
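+// getQPS reads the per-method RPC histograms from the given mount
+// entry and returns the per-method and total QPS, derived by dividing
+// each method's count by the 60-second histogram window.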
+func getQPS(v23ctx *context.T, ctx *tool.Context, me *naming.MountEntry) (map[string]float64, float64, error) {
+	qpsResults, err := monitoring.GetStat(v23ctx, ctx, *me, qpsSuffix)
+	if err != nil {
+		return nil, -1, err
+	}
+	perMethodQPS := map[string]float64{}
+	totalQPS := 0.0
+	for _, r := range qpsResults {
+		data, ok := r.Value.(stats.HistogramValue)
+		if !ok {
+			return nil, -1, fmt.Errorf("invalid qps data: %v", r)
+		}
+		matches := qpsRE.FindStringSubmatch(r.Name)
+		if matches == nil {
+			continue
+		}
+		method := matches[1]
+		qps := float64(data.Count) / 60.0
+		perMethodQPS[method] += qps
+		totalQPS += qps
+	}
+	return perMethodQPS, totalQPS, nil
+}