blob: 3b57034997e2fd707727281f32fce716b3196b86 [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"fmt"
"runtime"
"sync"
"time"
cloudmonitoring "google.golang.org/api/monitoring/v3"
"v.io/jiri/tool"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/x/devtools/internal/monitoring"
"v.io/x/devtools/internal/test"
"v.io/x/lib/gcm"
)
const (
// cloudSyncbasePattern is the glob pattern handed to the namespace to find
// cloud syncbase instances.
cloudSyncbasePattern = "sb/syncbased-*"
// The sysStat* constants are the stat names queried from each instance
// through getSysStat: CPU usage percent, memory usage (percent and bytes)
// and disk usage (percent and bytes).
// NOTE(review): "%2F" looks like an escaped "/", which would make the disk
// stats refer to a "/data" mount point — confirm.
sysStatCPUUsagePercent = "__debug/stats/system/syscpu/Percent"
sysStatMemUsagePercent = "__debug/stats/system/sysmem/UsedPercent"
sysStatMemUsageBytes = "__debug/stats/system/sysmem/Used"
sysStatDiskUsagePercent = "__debug/stats/system/sysdisk/%2Fdata/UsedPercent"
sysStatDiskUsageBytes = "__debug/stats/system/sysdisk/%2Fdata/Used"
)
var (
// cloudSyncbaseTimeout is the timeout placed on the context used by
// checkCloudSyncbaseInstances; the same context is passed on to the glob
// and to every stats worker.
cloudSyncbaseTimeout = 20 * time.Second
)
// cloudSyncbaseStatsTask is one unit of work for statsWorker: query the stats
// selected by taskType from a single cloud syncbase instance.
type cloudSyncbaseStatsTask struct {
// mountedName is the instance's mount entry name, read when the task is
// created. statsWorker uses it to label the data sent to GCM and in its
// progress output.
mountedName string
// sbMountEntry is the instance's mount entry, handed to the query functions.
// getSysStat clears its Name field, which is presumably why the name is
// also carried separately in mountedName.
sbMountEntry *naming.MountEntry
// taskType selects which stats are queried.
taskType cloudSyncbaseStatsTaskType
}
// cloudSyncbaseStatsTaskType identifies the kind of stats a task queries.
type cloudSyncbaseStatsTaskType int
const (
// taskTypeCpu queries the CPU usage percentage.
taskTypeCpu cloudSyncbaseStatsTaskType = iota
// taskTypeMem queries memory usage, both as a percentage and in bytes.
taskTypeMem
// taskTypeDisk queries disk usage, both as a percentage and in bytes.
taskTypeDisk
// taskTypeLatency measures latency via getLatency.
taskTypeLatency
// taskTypeQPS queries the total QPS via getQPS.
taskTypeQPS
)
// cloudSyncbaseStatsResult is what statsWorker reports back for each task it
// has processed.
type cloudSyncbaseStatsResult struct {
// err is set when a query for the task failed and is nil otherwise.
err error
}
// metricData is a single measured value together with the label it is
// reported and aggregated under.
type metricData struct {
// metricLabel names the metric, e.g. "syscpu-usage-pct" or "latency".
metricLabel string
// value is the measurement itself.
value float64
}
// checkCloudSyncbaseInstances finds the cloud syncbase instances matching
// cloudSyncbasePattern, reports their count to GCM, queries per-instance stats
// with a pool of statsWorker goroutines, and finally sends the per-label
// aggregates to GCM.
//
// The whole check runs under a context limited by cloudSyncbaseTimeout. Only a
// failed glob or a failed metric descriptor lookup is returned; glob reply
// errors, per-task errors and GCM send errors are written to ctx.Stderr() and
// the check carries on.
func checkCloudSyncbaseInstances(v23ctx *context.T, ctx *tool.Context, s *cloudmonitoring.Service) error {
	v23ctx, cancel := context.WithTimeout(v23ctx, cloudSyncbaseTimeout)
	defer cancel()

	// Find all cloud syncbase instances.
	glob, err := v23.GetNamespace(v23ctx).Glob(v23ctx, cloudSyncbasePattern)
	if err != nil {
		return err
	}
	var sbInstances []*naming.MountEntry
	for e := range glob {
		switch e := e.(type) {
		case *naming.GlobReplyEntry:
			sbInstances = append(sbInstances, &e.Value)
		case *naming.GlobReplyError:
			fmt.Fprintf(ctx.Stderr(), "%v\n", e.Value.Error)
		}
	}

	// Send number of instances to GCM.
	md, err := gcm.GetMetric("cloud-syncbase", projectFlag)
	if err != nil {
		return err
	}
	now := time.Now().UTC().Format(time.RFC3339)
	numInstances := len(sbInstances)
	if err := sendDataToGCM(s, md, float64(numInstances), now, "", "", "count", "_"); err != nil {
		fmt.Fprintf(ctx.Stderr(), "%v\n", err)
	} else {
		test.Pass(ctx, "number of instances: %v\n", numInstances)
	}

	// Query stats from each instance. Both channels are buffered for the full
	// task count, so neither queueing tasks nor reporting results can block.
	taskTypes := []cloudSyncbaseStatsTaskType{taskTypeCpu, taskTypeMem, taskTypeDisk, taskTypeLatency, taskTypeQPS}
	numTasks := numInstances * len(taskTypes)
	tasks := make(chan cloudSyncbaseStatsTask, numTasks)
	taskResults := make(chan cloudSyncbaseStatsResult, numTasks)
	var aggsMu sync.Mutex // protects aggs
	aggs := map[string]*aggregator{}

	// Start workers and distribute work to them.
	for i := 0; i < runtime.NumCPU(); i++ {
		go statsWorker(v23ctx, ctx, s, now, &aggsMu, aggs, md, tasks, taskResults)
	}
	for _, sb := range sbInstances {
		// Read the name once, before any task for this instance is queued. The
		// workers are already running and getSysStat clears Name on the entry
		// it is given, so reading sb.Name per task could yield an empty name.
		mountedName := sb.Name
		for _, t := range taskTypes {
			// Each task gets its own copy of the entry so that concurrent
			// workers never write to the same MountEntry. Name is cleared up
			// front, which is the state getSysStat leaves the entry in anyway;
			// this keeps what the query functions see independent of timing.
			entry := *sb
			entry.Name = ""
			tasks <- cloudSyncbaseStatsTask{
				mountedName:  mountedName,
				sbMountEntry: &entry,
				taskType:     t,
			}
		}
	}
	close(tasks)

	// Wait for all results to come back. Receiving every result also
	// guarantees the workers are done updating aggs before it is read below.
	for i := 0; i < numTasks; i++ {
		if result := <-taskResults; result.err != nil {
			fmt.Fprintf(ctx.Stderr(), "%v\n", result.err)
		}
	}

	// Send the aggregated data to GCM.
	mdAgg, err := gcm.GetMetric("cloud-syncbase-agg", projectFlag)
	if err != nil {
		return err
	}
	for metricLabel, agg := range aggs {
		if err := sendAggregatedDataToGCM(ctx, s, mdAgg, agg, now, metricLabel); err != nil {
			fmt.Fprintf(ctx.Stderr(), "%v\n", err)
		}
	}
	return nil
}
// statsWorker processes tasks until the tasks channel is closed. For each task
// it queries the stats selected by the task type, sends every value it obtained
// to GCM under md, adds the value to the aggregator for its label in aggs, and
// then posts exactly one result on results.
//
// A task may involve more than one query (memory and disk issue two each).
// Values that were obtained are still reported when a sibling query fails, and
// every failure of the task is kept in the result's err instead of only the
// last one. Errors from sending to GCM are written to ctx.Stderr() and do not
// affect the result.
func statsWorker(
	v23ctx *context.T, ctx *tool.Context, s *cloudmonitoring.Service,
	now string, aggsMu *sync.Mutex, aggs map[string]*aggregator, md *cloudmonitoring.MetricDescriptor,
	tasks <-chan cloudSyncbaseStatsTask, results chan<- cloudSyncbaseStatsResult) {
	for t := range tasks {
		var result cloudSyncbaseStatsResult
		var metrics []metricData

		// fail records err for this task without dropping an error that was
		// recorded earlier; a single error is kept unchanged.
		fail := func(err error) {
			if result.err == nil {
				result.err = err
				return
			}
			result.err = fmt.Errorf("%v; %v", result.err, err)
		}
		// keep remembers a successfully measured value for reporting below.
		keep := func(label string, value float64) {
			metrics = append(metrics, metricData{metricLabel: label, value: value})
		}
		// sysStat queries one system stat and files the outcome under label.
		sysStat := func(label, stat string) {
			value, err := getSysStat(v23ctx, ctx, t.sbMountEntry, stat)
			if err != nil {
				fail(err)
				return
			}
			keep(label, value)
		}

		switch t.taskType {
		case taskTypeCpu:
			sysStat("syscpu-usage-pct", sysStatCPUUsagePercent)
		case taskTypeMem:
			sysStat("sysmem-usage-pct", sysStatMemUsagePercent)
			sysStat("sysmem-usage-bytes", sysStatMemUsageBytes)
		case taskTypeDisk:
			sysStat("sysdisk-usage-pct", sysStatDiskUsagePercent)
			sysStat("sysdisk-usage-bytes", sysStatDiskUsageBytes)
		case taskTypeLatency:
			if lat, err := getLatency(v23ctx, t.sbMountEntry); err != nil {
				fail(err)
			} else {
				// Nanoseconds divided by 1e6: the latency is reported in
				// milliseconds.
				keep("latency", float64(lat.Nanoseconds())/1000000.0)
			}
		case taskTypeQPS:
			if _, totalQPS, err := getQPS(v23ctx, ctx, t.sbMountEntry); err != nil {
				fail(err)
			} else {
				keep("qps", totalQPS)
			}
		}

		for _, metric := range metrics {
			getAggregator(aggsMu, aggs, metric.metricLabel).add(metric.value)
			if err := sendDataToGCM(s, md, metric.value, now, "", "", metric.metricLabel, t.mountedName); err != nil {
				fmt.Fprintf(ctx.Stderr(), "%v\n", err)
			} else {
				test.Pass(ctx, "%s, %s: %v\n", t.mountedName, metric.metricLabel, metric.value)
			}
		}
		results <- result
	}
}
// getAggregator returns the aggregator registered in aggs under metricLabel,
// creating and registering a new one first when the label has no entry yet.
// The lookup and the insertion both happen while aggsMu is held.
func getAggregator(aggsMu *sync.Mutex, aggs map[string]*aggregator, metricLabel string) *aggregator {
	aggsMu.Lock()
	defer aggsMu.Unlock()

	agg, found := aggs[metricLabel]
	if found {
		return agg
	}
	agg = newAggregator()
	aggs[metricLabel] = agg
	return agg
}
// getSysStat queries the stat named by stat from the instance described by me
// and returns the first value of the reply as a float64.
//
// It returns -1 together with an error when the query fails, when the query
// yields no values at all, or when the value cannot be read as a float64.
//
// Note that Name is cleared on the entry me points to, not on a copy, so the
// caller's entry is modified.
// NOTE(review): callers that share one entry between goroutines will write it
// concurrently through this function — confirm whether the other query
// helpers rely on the cleared name before switching this to a local copy.
func getSysStat(v23ctx *context.T, ctx *tool.Context, me *naming.MountEntry, stat string) (float64, error) {
	me.Name = "" // Need this to make monitoring.GetStat work correctly.
	values, err := monitoring.GetStat(v23ctx, ctx, *me, stat)
	if err != nil {
		return -1, err
	}
	// Guard the index below: an empty reply would otherwise panic, and a panic
	// in a worker goroutine takes the whole process down.
	if len(values) == 0 {
		return -1, fmt.Errorf("no values returned for stat %q", stat)
	}
	fv, err := values[0].GetFloat64Value()
	if err != nil {
		return -1, err
	}
	return fv, nil
}