blob: 6af72996dc920ac62fdf8ca11d31dbddf163bf4b [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"bytes"
"encoding/json"
"fmt"
"math"
"os"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"time"
cloudmonitoring "google.golang.org/api/monitoring/v3"
"v.io/jiri/tool"
"v.io/x/devtools/internal/monitoring"
"v.io/x/lib/cmdline"
)
// Google Cloud Monitoring metric type names and label keys used when
// querying GCM, plus constants controlling how service status is derived
// from latency samples.
const (
	cloudServiceLatencyMetric  = "custom.cloudmonitoring.googleapis.com/vanadium/service/latency"
	cloudServiceCountersMetric = "custom.cloudmonitoring.googleapis.com/vanadium/service/counters"
	cloudServiceQPSMetric      = "custom.cloudmonitoring.googleapis.com/vanadium/service/qps/total"
	nginxStatsMetric           = "custom.cloudmonitoring.googleapis.com/vanadium/nginx/stats"
	gceStatsMetric             = "custom.cloudmonitoring.googleapis.com/vanadium/gce-instance/stats"

	// Custom label keys carried on each time series identifying where the
	// sample came from.
	metricNameLabelKey  = "custom.cloudmonitoring.googleapis.com/metric-name"
	gceInstanceLabelKey = "custom.cloudmonitoring.googleapis.com/gce-instance"
	gceZoneLabelKey     = "custom.cloudmonitoring.googleapis.com/gce-zone"

	// How far back in time getMetricData queries.
	historyDuration = time.Hour

	// Service status classification derived from latency samples; see
	// statusForLatency.
	serviceStatusOK      = "serviceStatusOK"
	serviceStatusWarning = "serviceStatusWarning"
	serviceStatusDown    = "serviceStatusDown"
	warningLatency       = 2000
	criticalLatency      = 5000
)
// Alerting thresholds. A metric is considered unhealthy when it stays at
// or above its threshold for thresholdHoldMinutes (see overThresholdFor).
const (
	thresholdHoldMinutes    = 5
	thresholdCPU            = 90
	thresholdDisk           = 85
	thresholdMounttableQPS  = 150
	thresholdPing           = 500
	thresholdRam            = 90
	thresholdServiceLatency = 2000.0
	thresholdTCPConn        = 200

	// Glob pattern used with "debug stats read" to fetch build metadata.
	buildInfoEndpointPrefix = "devmgr/apps/*/*/*/stats/system/metadata"
	// Namespace root of the vanadium cell being monitored.
	namespaceRoot = "/ns.dev.v.io:8151"
)
var (
	// Command-line flags; registered on cmdCollect in init().
	binDirFlag      string
	credentialsFlag string
	keyFileFlag     string
	projectFlag     string
	// NOTE(review): serviceAccountFlag is declared but not registered or
	// read in this file — confirm it is used elsewhere before removing.
	serviceAccountFlag string

	// Running debug stats read
	// devmgr/apps/*/*/*/stats/system/metadata/build.[TPUM]* takes > 1
	// minute on the jenkins corp nodes with the RPC backward compatibility
	// retry change.
	debugCommandTimeout = 2 * time.Minute
	debugRPCTimeout     = 90 * time.Second

	// buildInfoRE extracts (service, metadata-name, value) triples from
	// "debug stats read" output lines.
	buildInfoRE = regexp.MustCompile(`devmgr/apps/([^/]*)/.*/stats/system/metadata/build.(Pristine|Time|User|Manifest):\s*(.*)`)
	// manifestRE pulls the label attribute out of a Manifest value.
	manifestRE = regexp.MustCompile(`.*label="(.*)">`)
)
// oncallData is the top-level payload persisted for the internal oncall
// dashboard.
type oncallData struct {
	CollectionTimestamp int64
	Zones               map[string]*zoneData // Indexed by zone names.
	OncallIDs           string               // IDs separated by ",".
}

// zoneData holds all metric data collected for a single GCE zone.
type zoneData struct {
	Instances map[string]*allMetricsData // Indexed by instance names.
	// Max and Average are synthetic per-zone aggregates holding the
	// per-timestamp max/average across the zone's real instances.
	Max     *allMetricsData
	Average *allMetricsData
}

// allMetricsData groups every metric family tracked for one instance (or
// for a zone-level Max/Average aggregate).
type allMetricsData struct {
	CloudServiceLatency   map[string]*metricData // Indexed by metric names. Same below.
	CloudServiceStats     map[string]*metricData
	CloudServiceGCE       map[string]*metricData
	CloudServiceBuildInfo map[string]*buildInfoData
	NginxLoad             map[string]*metricData
	NginxGCE              map[string]*metricData
	GCEInfo               *gceInfoData
	Range                 *rangeData
}
// metricData is one metric time series for one instance, together with
// summary statistics over the collected window and a health verdict.
type metricData struct {
	ZoneName     string
	InstanceName string
	Name         string
	CurrentValue float64 // Most recent sample.
	MinTime      int64   // Unix seconds.
	MaxTime      int64   // Unix seconds.
	MinValue     float64
	MaxValue     float64
	// Parallel arrays, sorted oldest-first (see getMetricData).
	HistoryTimestamps []int64
	HistoryValues     []float64
	Threshold         float64 // -1 means the metric has no threshold.
	Healthy           bool
}

// metricDataMap is a map with the following structure:
// {zoneName, {instanceName, {metricName, *metricData}}}.
type metricDataMap map[string]map[string]map[string]*metricData
// gceInfoData records a GCE instance's status and ID as reported by
// "gcloud compute instances list".
type gceInfoData struct {
	Status string
	Id     string
}

// buildInfoData captures build metadata read from a service's debug
// stats: snapshot label, pristine flag, build time, and builder.
type buildInfoData struct {
	ZoneName     string
	InstanceName string
	ServiceName  string
	IsPristine   string
	Snapshot     string
	Time         string // Unix seconds, stored as a decimal string.
	User         string
}
// rangeData tracks the overall [MinTime, MaxTime] span (Unix seconds)
// covered by a set of metric samples.
type rangeData struct {
	MinTime int64
	MaxTime int64
}

// update widens the range so it covers [newMinTime, newMaxTime]; the
// span never shrinks.
func (r *rangeData) update(newMinTime, newMaxTime int64) {
	if r.MinTime > newMinTime {
		r.MinTime = newMinTime
	}
	if r.MaxTime < newMaxTime {
		r.MaxTime = newMaxTime
	}
}
// aggMetricData collects, for one metric name, every sample value seen at
// each timestamp across multiple instances (input to max/average
// calculation).
type aggMetricData struct {
	TimestampsToValues map[int64][]float64
}

// aggAllMetricsData groups aggregation buffers per metric family.
// NOTE(review): not referenced anywhere in this file — confirm usage
// elsewhere in the package before removing.
type aggAllMetricsData struct {
	CloudServiceLatency map[string]*aggMetricData // Indexed by metric names. Same below.
	CloudServiceStats   map[string]*aggMetricData
	CloudServiceGCE     map[string]*aggMetricData
	NginxLoad           map[string]*aggMetricData
	NginxGCE            map[string]*aggMetricData
}
// serviceStatusData is the payload persisted for the external service
// status dashboard.
type serviceStatusData struct {
	CollectionTimestamp int64
	Status              []statusData
}

// statusData describes one service's current status, build information,
// and incident history.
type statusData struct {
	Name           string
	BuildTimestamp string
	SnapshotLabel  string
	CurrentStatus  string
	Incidents      []incidentData
}

// incidentData is one contiguous span of non-OK status. Start and
// Duration are in Unix seconds.
type incidentData struct {
	Start    int64
	Duration int64
	Status   string
}
// int64arr implements sort.Interface for a slice of int64 timestamps.
type int64arr []int64

func (a int64arr) Len() int           { return len(a) }
func (a int64arr) Swap(i, j int)      { a[j], a[i] = a[i], a[j] }
func (a int64arr) Less(i, j int) bool { return a[j] > a[i] }

// Sort orders the slice ascending, in place.
func (a int64arr) Sort() { sort.Sort(a) }
// init registers the collect command's flags.
func init() {
	cmdCollect.Flags.StringVar(&binDirFlag, "bin-dir", "", "The path where all binaries are downloaded.")
	cmdCollect.Flags.StringVar(&keyFileFlag, "key", "", "The path to the service account's JSON credentials file.")
	cmdCollect.Flags.StringVar(&projectFlag, "project", "", "The GCM's corresponding GCE project ID.")
	cmdCollect.Flags.StringVar(&credentialsFlag, "v23.credentials", "", "The path to v23 credentials.")
}
// cmdCollect represents the 'collect' command of the oncall tool. Its
// entry point is runCollect; its flags are registered in init().
var cmdCollect = &cmdline.Command{
	Name:  "collect",
	Short: "Collect data for oncall dashboard",
	Long: `
This subcommand collects data from Google Cloud Monitoring and stores the
processed data to Google Storage.
`,
	Runner: cmdline.RunnerFunc(runCollect),
}
// runCollect implements the "collect" command: it gathers metrics from
// Google Cloud Monitoring and auxiliary sources (debug stats, gcloud,
// jiri oncall), then persists the processed results to Google Storage.
func runCollect(env *cmdline.Env, _ []string) error {
	ctx := tool.NewContextFromEnv(env)
	s, err := monitoring.Authenticate(keyFileFlag)
	if err != nil {
		return err
	}
	now := time.Now()

	// Collect oncall related data used in the internal oncall dashboard.
	zones := map[string]*zoneData{}
	oncall := &oncallData{
		CollectionTimestamp: now.Unix(),
		Zones:               zones,
	}
	if err := collectCloudServicesData(ctx, s, now, zones); err != nil {
		return err
	}
	if err := collectCloudServicesBuildInfo(ctx, zones); err != nil {
		return err
	}
	if err := collectNginxData(ctx, s, now, zones); err != nil {
		return err
	}
	if err := collectGCEInstancesData(ctx, s, now, zones); err != nil {
		return err
	}
	if err := collectOncallIDsData(ctx, oncall); err != nil {
		return err
	}

	// Collect service status data used in the external dashboard. The
	// build info lives on the master cell instance; guard against the
	// zone or instance being absent from the collected data so a partial
	// collection does not panic on a nil map entry.
	buildInfo := map[string]*buildInfoData{}
	if zone := zones["us-central1-c"]; zone != nil {
		if inst := zone.Instances["vanadium-cell-master"]; inst != nil {
			buildInfo = inst.CloudServiceBuildInfo
		}
	}
	statusData, err := collectServiceStatusData(ctx, s, now, buildInfo)
	if err != nil {
		return err
	}
	return persistOncallData(ctx, statusData, oncall, now)
}
// collectServiceStatusData queries GCM for 8 days of service latency
// history (max-aligned into 10-minute buckets), classifies each service's
// current status, computes its incident history, and attaches build info
// (build timestamp and snapshot label) looked up by the service's binary
// name.
func collectServiceStatusData(ctx *tool.Context, s *cloudmonitoring.Service, now time.Time, buildInfo map[string]*buildInfoData) (*serviceStatusData, error) {
	// Collect data for the last 8 days and aggregate data every 10 minutes.
	resp, err := s.Projects.TimeSeries.List(fmt.Sprintf("projects/%s", projectFlag)).
		Filter(fmt.Sprintf("metric.type=%s", cloudServiceLatencyMetric)).
		AggregationAlignmentPeriod(fmt.Sprintf("%ds", 10*60)).
		AggregationPerSeriesAligner("ALIGN_MAX").
		IntervalStartTime(now.AddDate(0, 0, -8).Format(time.RFC3339)).
		IntervalEndTime(now.Format(time.RFC3339)).Do()
	if err != nil {
		return nil, fmt.Errorf("List failed: %v", err)
	}
	status := []statusData{}
	for _, t := range resp.TimeSeries {
		serviceName := t.Metric.Labels[metricNameLabelKey]
		curStatusData := statusData{
			Name:          serviceName,
			CurrentStatus: statusForLatency(t.Points[0].Value.DoubleValue), // t.Points[0] is the latest
		}
		incidents, err := calcIncidents(t.Points)
		if err != nil {
			return nil, err
		}
		curStatusData.Incidents = incidents
		// Map the dashboard-facing service name to the binary name keying
		// buildInfo; several services can share one binary (e.g. the
		// identity-related services all run in identityd).
		buildInfoServiceName := serviceName
		switch serviceName {
		case "binary discharger", "google identity service", "macaroon service":
			buildInfoServiceName = "identityd"
		case "mounttable":
			buildInfoServiceName = "mounttabled"
		case "proxy service":
			buildInfoServiceName = "proxyd"
		case "binary repository":
			buildInfoServiceName = "binaryd"
		case "application repository":
			buildInfoServiceName = "applicationd"
		}
		curBuildInfo := buildInfo[buildInfoServiceName]
		if curBuildInfo != nil {
			// buildInfoData.Time holds Unix seconds as a decimal string.
			ts, err := strconv.ParseInt(curBuildInfo.Time, 10, 64)
			if err != nil {
				return nil, fmt.Errorf("ParseInt(%s) failed: %v", curBuildInfo.Time, err)
			}
			curStatusData.BuildTimestamp = time.Unix(ts, 0).Format(time.RFC822)
			curStatusData.SnapshotLabel = strings.Replace(curBuildInfo.Snapshot, "snapshot/labels/", "", -1)
		}
		status = append(status, curStatusData)
	}
	return &serviceStatusData{
		CollectionTimestamp: now.Unix(),
		Status:              status,
	}, nil
}
// calcIncidents scans a latency time series (sorted newest-first) and
// returns the incidents: maximal spans during which the status derived
// from latency was not OK. An incident still open at the newest point is
// closed using that point's start time.
func calcIncidents(points []*cloudmonitoring.Point) ([]incidentData, error) {
	lastStatus := serviceStatusOK
	incidents := []incidentData{}
	var curIncident incidentData
	// "points" are sorted from now to past. To calculate incidents, we iterate
	// through them backwards.
	for i := len(points) - 1; i >= 0; i-- {
		point := points[i]
		value := point.Value.DoubleValue
		curStatus := statusForLatency(value)
		// Only status transitions matter; consecutive points with the same
		// status extend the current state.
		if curStatus != lastStatus {
			pointTime, err := time.Parse(time.RFC3339, point.Interval.StartTime)
			if err != nil {
				return nil, fmt.Errorf("time.Parse(%s) failed: %v", point.Interval.StartTime, err)
			}
			// Set the duration of the last incident.
			// A non-empty Status marks an incident currently in progress.
			if curIncident.Status != "" {
				curIncident.Duration = pointTime.Unix() - curIncident.Start
				incidents = append(incidents, curIncident)
				curIncident.Status = ""
			}
			// At the start of an incident, create a new incidentData object, and
			// record the incident start time and status.
			if curStatus != serviceStatusOK {
				curIncident = incidentData{}
				curIncident.Start = pointTime.Unix()
				curIncident.Status = curStatus
			}
			lastStatus = curStatus
		}
	}
	// Process the possible last incident: if the series ends in a non-OK
	// state, close it at the newest point's start time (points[0]).
	if lastStatus != serviceStatusOK {
		strLastPointTime := points[0].Interval.StartTime
		pointTime, err := time.Parse(time.RFC3339, strLastPointTime)
		if err != nil {
			return nil, fmt.Errorf("time.Parse(%q) failed: %v", strLastPointTime, err)
		}
		curIncident.Duration = pointTime.Unix() - curIncident.Start
		incidents = append(incidents, curIncident)
	}
	return incidents, nil
}
// statusForLatency maps a latency sample to a service status string: OK
// below warningLatency, warning below criticalLatency, down otherwise.
func statusForLatency(latency float64) string {
	switch {
	case latency < warningLatency:
		return serviceStatusOK
	case latency < criticalLatency:
		return serviceStatusWarning
	default:
		return serviceStatusDown
	}
}
// collectCloudServicesData queries GCM for cloud-service latency,
// counter, and QPS metrics, stores the per-instance results in zones, and
// computes per-zone max/average aggregates.
func collectCloudServicesData(ctx *tool.Context, s *cloudmonitoring.Service, now time.Time, zones map[string]*zoneData) error {
	// Collect and add latency data.
	latencyMetrics, err := getMetricData(ctx, s, cloudServiceLatencyMetric, now, "latency")
	if err != nil {
		return err
	}
	for zone, instanceMap := range latencyMetrics {
		if zones[zone] == nil {
			zones[zone] = newZoneData(zone)
		}
		zoneData := zones[zone]
		aggData := map[string]*aggMetricData{}
		for instance, metricMap := range instanceMap {
			if zoneData.Instances[instance] == nil {
				zoneData.Instances[instance] = newInstanceData()
			}
			zoneData.Instances[instance].CloudServiceLatency = metricMap
			for _, metric := range metricMap {
				// All latency metrics share the fixed service latency
				// threshold, so health is always evaluated (the original
				// "!= -1" check here was dead code).
				metric.Threshold = thresholdServiceLatency
				metric.Healthy = !overThresholdFor(metric.HistoryTimestamps, metric.HistoryValues, thresholdServiceLatency, thresholdHoldMinutes)
				aggregateMetricData(aggData, metric)
			}
		}
		maxData, maxRangeData, averageData, averageRangeData := calculateMaxAndAverageData(aggData, zone)
		zoneData.Max.CloudServiceLatency, zoneData.Average.CloudServiceLatency = maxData, averageData
		zoneData.Max.Range.update(maxRangeData.MinTime, maxRangeData.MaxTime)
		zoneData.Average.Range.update(averageRangeData.MinTime, averageRangeData.MaxTime)
	}

	// Collect and add stats (counters + qps) data.
	counterMetrics, err := getMetricData(ctx, s, cloudServiceCountersMetric, now, "")
	if err != nil {
		return err
	}
	qpsMetrics, err := getMetricData(ctx, s, cloudServiceQPSMetric, now, "qps")
	if err != nil {
		return err
	}
	// Counters and qps both land in CloudServiceStats, so they are
	// aggregated together per zone before computing max/average.
	aggDataByZone := map[string]map[string]*aggMetricData{}
	addStatsFn := func(metrics metricDataMap) {
		for zone, instanceMap := range metrics {
			if zones[zone] == nil {
				zones[zone] = newZoneData(zone)
			}
			zoneData := zones[zone]
			aggData := aggDataByZone[zone]
			if aggData == nil {
				aggData = map[string]*aggMetricData{}
			}
			aggDataByZone[zone] = aggData
			for instance, metricMap := range instanceMap {
				if zoneData.Instances[instance] == nil {
					zoneData.Instances[instance] = newInstanceData()
				}
				stats := zoneData.Instances[instance].CloudServiceStats
				if stats == nil {
					stats = map[string]*metricData{}
					zoneData.Instances[instance].CloudServiceStats = stats
				}
				for metricName, metric := range metricMap {
					metric.Threshold = getThreshold(metricName)
					// A threshold of -1 means "no threshold"; leave Healthy
					// at its default (true) in that case.
					if metric.Threshold != -1 {
						metric.Healthy = !overThresholdFor(metric.HistoryTimestamps, metric.HistoryValues, metric.Threshold, thresholdHoldMinutes)
					}
					stats[metricName] = metric
					aggregateMetricData(aggData, metric)
				}
			}
		}
	}
	addStatsFn(counterMetrics)
	addStatsFn(qpsMetrics)
	for zone, aggData := range aggDataByZone {
		zoneData := zones[zone]
		maxData, maxRangeData, averageData, averageRangeData := calculateMaxAndAverageData(aggData, zone)
		zoneData.Max.CloudServiceStats, zoneData.Average.CloudServiceStats = maxData, averageData
		zoneData.Max.Range.update(maxRangeData.MinTime, maxRangeData.MaxTime)
		zoneData.Average.Range.update(averageRangeData.MinTime, averageRangeData.MaxTime)
	}
	return nil
}
// collectCloudServicesBuildInfo runs "debug stats read" against the
// vanadium cell to read each service's build metadata (manifest snapshot,
// pristine flag, build time, builder) and stores the results on the
// instance that hosts the namespace root.
func collectCloudServicesBuildInfo(ctx *tool.Context, zones map[string]*zoneData) error {
	serviceLocation := monitoring.ServiceLocationMap[namespaceRoot]
	if serviceLocation == nil {
		return fmt.Errorf("failed to find service location for %q", namespaceRoot)
	}
	zone := serviceLocation.Zone
	instance := serviceLocation.Instance
	if zones[zone] == nil {
		zones[zone] = newZoneData(zone)
	}
	// Run "debug stats read" command to query build info data.
	debug := filepath.Join(binDirFlag, "debug")
	var stdoutBuf, stderrBuf bytes.Buffer
	if err := ctx.NewSeq().
		Capture(&stdoutBuf, &stderrBuf).
		Timeout(debugCommandTimeout).
		Last(debug,
			"--timeout", debugRPCTimeout.String(),
			"--v23.namespace.root", namespaceRoot,
			"--v23.credentials", credentialsFlag, "stats", "read", fmt.Sprintf("%s/build.[TPUM]*", buildInfoEndpointPrefix)); err != nil {
		return fmt.Errorf("debug command failed: %v\nSTDERR:\n%s\nSTDOUT:\n%s\nEND\n", err, stderrBuf.String(), stdoutBuf.String())
	}
	// Parse output. Each line matching buildInfoRE yields a
	// (service, metadataName, value) triple.
	lines := strings.Split(stdoutBuf.String(), "\n")
	buildInfoByServiceName := map[string]*buildInfoData{}
	for _, line := range lines {
		matches := buildInfoRE.FindStringSubmatch(line)
		if matches != nil {
			service := matches[1]
			metadataName := matches[2]
			value := matches[3]
			if _, ok := buildInfoByServiceName[service]; !ok {
				buildInfoByServiceName[service] = &buildInfoData{
					ZoneName:     zone,
					InstanceName: instance,
					ServiceName:  service,
				}
			}
			curBuildInfo := buildInfoByServiceName[service]
			switch metadataName {
			case "Manifest":
				// Keep only the snapshot label from the manifest reference.
				manifestMatches := manifestRE.FindStringSubmatch(value)
				if manifestMatches != nil {
					curBuildInfo.Snapshot = strings.Replace(manifestMatches[1], "snapshot/labels/", "", -1)
				}
			case "Pristine":
				curBuildInfo.IsPristine = value
			case "Time":
				// Normalize the RFC3339 build time to Unix seconds, stored
				// as a decimal string (parsed back in collectServiceStatusData).
				t, err := time.Parse(time.RFC3339, value)
				if err != nil {
					return fmt.Errorf("Parse(%s) failed: %v", value, err)
				}
				curBuildInfo.Time = fmt.Sprintf("%d", t.Unix())
			case "User":
				curBuildInfo.User = value
			}
		}
	}
	if zones[zone].Instances[instance] == nil {
		zones[zone].Instances[instance] = newInstanceData()
	}
	zones[zone].Instances[instance].CloudServiceBuildInfo = buildInfoByServiceName
	return nil
}
// collectNginxData queries GCM for nginx load metrics, stores them on
// nginx-* instances, and computes per-zone max/average aggregates.
func collectNginxData(ctx *tool.Context, s *cloudmonitoring.Service, now time.Time, zones map[string]*zoneData) error {
	nginxMetrics, err := getMetricData(ctx, s, nginxStatsMetric, now, "")
	if err != nil {
		return err
	}
	for zone, instanceMap := range nginxMetrics {
		if zones[zone] == nil {
			zones[zone] = newZoneData(zone)
		}
		zoneData := zones[zone]
		aggData := map[string]*aggMetricData{}
		for instance, metricMap := range instanceMap {
			// Only nginx-* instances carry meaningful nginx load data.
			if !strings.HasPrefix(instance, "nginx") {
				continue
			}
			if zoneData.Instances[instance] == nil {
				zoneData.Instances[instance] = newInstanceData()
			}
			zoneData.Instances[instance].NginxLoad = metricMap
			for _, metric := range metricMap {
				aggregateMetricData(aggData, metric)
			}
		}
		maxData, maxRangeData, averageData, averageRangeData := calculateMaxAndAverageData(aggData, zone)
		zoneData.Max.NginxLoad, zoneData.Average.NginxLoad = maxData, averageData
		zoneData.Max.Range.update(maxRangeData.MinTime, maxRangeData.MaxTime)
		zoneData.Average.Range.update(averageRangeData.MinTime, averageRangeData.MaxTime)
	}
	return nil
}
// collectGCEInstancesData queries GCM for GCE machine stats, splits them
// into cloud-service (vanadium-*) vs nginx (nginx-*) groups per instance,
// computes per-zone max/average aggregates, and then uses
// "gcloud compute instances list" to attach each instance's status and ID.
func collectGCEInstancesData(ctx *tool.Context, s *cloudmonitoring.Service, now time.Time, zones map[string]*zoneData) error {
	// Query and add GCE stats.
	gceMetrics, err := getMetricData(ctx, s, gceStatsMetric, now, "")
	if err != nil {
		return err
	}
	for zone, instanceMap := range gceMetrics {
		if zones[zone] == nil {
			zones[zone] = newZoneData(zone)
		}
		zoneData := zones[zone]
		aggDataCloudServicesGCE := map[string]*aggMetricData{}
		aggDataNginxGCE := map[string]*aggMetricData{}
		for instance, metricMap := range instanceMap {
			if zoneData.Instances[instance] == nil {
				zoneData.Instances[instance] = newInstanceData()
			}
			cloudServiceGCE := zoneData.Instances[instance].CloudServiceGCE
			nginxGCE := zoneData.Instances[instance].NginxGCE
			if cloudServiceGCE == nil {
				cloudServiceGCE = map[string]*metricData{}
				zoneData.Instances[instance].CloudServiceGCE = cloudServiceGCE
			}
			if nginxGCE == nil {
				nginxGCE = map[string]*metricData{}
				zoneData.Instances[instance].NginxGCE = nginxGCE
			}
			// Set thresholds and calculate health.
			for metricName, metric := range metricMap {
				metric.Threshold = getThreshold(metricName)
				if metric.Threshold != -1 {
					metric.Healthy = !overThresholdFor(metric.HistoryTimestamps, metric.HistoryValues, metric.Threshold, thresholdHoldMinutes)
				}
				// vanadium-* instances are grouped as cloud services,
				// nginx-* as nginx; other instances are ignored.
				if strings.HasPrefix(instance, "vanadium") {
					cloudServiceGCE[metricName] = metric
					aggregateMetricData(aggDataCloudServicesGCE, metric)
				} else if strings.HasPrefix(instance, "nginx") {
					nginxGCE[metricName] = metric
					aggregateMetricData(aggDataNginxGCE, metric)
				}
			}
		}
		maxData, maxRangeData1, averageData, averageRangeData1 := calculateMaxAndAverageData(aggDataCloudServicesGCE, zone)
		zoneData.Max.CloudServiceGCE, zoneData.Average.CloudServiceGCE = maxData, averageData
		maxData, maxRangeData2, averageData, averageRangeData2 := calculateMaxAndAverageData(aggDataNginxGCE, zone)
		zoneData.Max.NginxGCE, zoneData.Average.NginxGCE = maxData, averageData
		zoneData.Max.Range.update(maxRangeData1.MinTime, maxRangeData1.MaxTime)
		zoneData.Max.Range.update(maxRangeData2.MinTime, maxRangeData2.MaxTime)
		zoneData.Average.Range.update(averageRangeData1.MinTime, averageRangeData1.MaxTime)
		zoneData.Average.Range.update(averageRangeData2.MinTime, averageRangeData2.MaxTime)
	}

	// Use "gcloud compute instances list" to get instances status.
	var out bytes.Buffer
	if err := ctx.NewSeq().Capture(&out, &out).Last("gcloud", "-q", "--project="+projectFlag, "compute", "instances", "list", "--format=json"); err != nil {
		return err
	}
	type instanceData struct {
		Name   string
		Zone   string
		Status string
		Id     string
	}
	var instances []instanceData
	if err := json.Unmarshal(out.Bytes(), &instances); err != nil {
		return fmt.Errorf("Unmarshal() failed: %v", err)
	}
	instancesByZone := map[string][]instanceData{}
	for _, instance := range instances {
		if strings.HasPrefix(instance.Name, "nginx") || strings.HasPrefix(instance.Name, "vanadium") {
			instancesByZone[instance.Zone] = append(instancesByZone[instance.Zone], instance)
		}
	}
	// Add instance status.
	for zone, instances := range instancesByZone {
		curZone := zones[zone]
		if curZone == nil {
			continue
		}
		for _, instance := range instances {
			// An instance known to gcloud may have produced no metric data;
			// skip it instead of dereferencing a nil map entry (which
			// previously panicked).
			curData := curZone.Instances[instance.Name]
			if curData == nil {
				continue
			}
			curData.GCEInfo = &gceInfoData{
				Status: instance.Status,
				Id:     instance.Id,
			}
		}
	}
	return nil
}
// collectOncallIDsData runs "jiri oncall" and stores its trimmed output
// (comma-separated oncall IDs) in the given oncallData.
func collectOncallIDsData(ctx *tool.Context, oncall *oncallData) error {
	var buf bytes.Buffer
	err := ctx.NewSeq().Capture(&buf, &buf).Last("jiri", "oncall")
	if err != nil {
		return err
	}
	oncall.OncallIDs = strings.TrimSpace(buf.String())
	return nil
}
// overThresholdFor checks whether the most recent values of the given
// time series have been at or above the given threshold for at least the
// given number of minutes. It assumes the parallel timestamps/values
// arrays are sorted by time, oldest first. An empty series is never over
// threshold (the original indexed timestamps[-1] and panicked).
func overThresholdFor(timestamps []int64, values []float64, threshold float64, holdMinutes int) bool {
	numPoints := len(timestamps)
	if numPoints == 0 {
		return false
	}
	maxTime := timestamps[numPoints-1]
	holdSeconds := int64(holdMinutes * 60)
	// Walk backwards from the newest point through the contiguous run of
	// over-threshold values; succeed once that run spans the hold window.
	for i := numPoints - 1; i >= 0; i-- {
		if values[i] < threshold {
			return false
		}
		if maxTime-timestamps[i] >= holdSeconds {
			return true
		}
	}
	return false
}
// newZoneData returns an empty zoneData with its instance map and the
// Max/Average aggregate holders initialized. The zone argument is
// currently unused but kept for interface stability.
func newZoneData(zone string) *zoneData {
	z := &zoneData{}
	z.Instances = map[string]*allMetricsData{}
	z.Max = newInstanceData()
	z.Average = newInstanceData()
	return z
}
// newInstanceData returns an allMetricsData with every metric map
// initialized empty and the time range reset to its sentinel values.
func newInstanceData() *allMetricsData {
	d := allMetricsData{
		Range:                 newRangeData(),
		CloudServiceLatency:   map[string]*metricData{},
		CloudServiceStats:     map[string]*metricData{},
		CloudServiceGCE:       map[string]*metricData{},
		CloudServiceBuildInfo: map[string]*buildInfoData{},
		NginxLoad:             map[string]*metricData{},
		NginxGCE:              map[string]*metricData{},
	}
	return &d
}
// newRangeData returns a rangeData primed for update(): MinTime starts at
// the largest int64 and MaxTime at zero so the first update sets both.
func newRangeData() *rangeData {
	r := rangeData{MaxTime: 0, MinTime: math.MaxInt64}
	return &r
}
// getMetricData queries GCM with the given metric, and returns metric items
// (metricData) organized in metricDataMap, covering the last
// historyDuration. When metricNameSuffix is non-empty (e.g. "latency",
// "qps") it is appended to each metric name to disambiguate names across
// metric types.
func getMetricData(ctx *tool.Context, s *cloudmonitoring.Service, metric string, now time.Time, metricNameSuffix string) (metricDataMap, error) {
	// Query the given metric.
	resp, err := s.Projects.TimeSeries.List(fmt.Sprintf("projects/%s", projectFlag)).
		Filter(fmt.Sprintf("metric.type=%s", metric)).
		IntervalStartTime(now.Add(-historyDuration).Format(time.RFC3339)).
		IntervalEndTime(now.Format(time.RFC3339)).Do()
	if err != nil {
		return nil, fmt.Errorf("List() failed: %v", err)
	}
	// Populate metric items and put them into a metricDataMap.
	data := metricDataMap{}
	for _, t := range resp.TimeSeries {
		// Zone, instance, and metric name are carried as custom labels on
		// each time series.
		zone := t.Metric.Labels[gceZoneLabelKey]
		instance := t.Metric.Labels[gceInstanceLabelKey]
		metricName := t.Metric.Labels[metricNameLabelKey]
		if metricNameSuffix != "" {
			metricName = fmt.Sprintf("%s %s", metricName, metricNameSuffix)
		}
		instanceMap := data[zone]
		if instanceMap == nil {
			instanceMap = map[string]map[string]*metricData{}
			data[zone] = instanceMap
		}
		metricMap := instanceMap[instance]
		if metricMap == nil {
			metricMap = map[string]*metricData{}
			instanceMap[instance] = metricMap
		}
		curMetricData := metricMap[metricName]
		if curMetricData == nil {
			curMetricData = &metricData{
				ZoneName:     zone,
				InstanceName: instance,
				Name:         metricName,
				// t.Points[0] is the most recent sample.
				CurrentValue: t.Points[0].Value.DoubleValue,
				// Min/Max start at extreme sentinels and are tightened in
				// the loop below.
				MinTime:  math.MaxInt64,
				MaxTime:  0,
				MinValue: math.MaxFloat64,
				MaxValue: 0,
				// -1 means "no threshold"; callers may overwrite.
				Threshold: -1,
				Healthy:   true,
			}
			metricMap[metricName] = curMetricData
		}
		numPoints := len(t.Points)
		timestamps := []int64{}
		values := []float64{}
		// t.Points are sorted from now to past. We process them starting with the
		// latest and going back in time, so History* arrays end up sorted
		// oldest-first.
		for i := numPoints - 1; i >= 0; i-- {
			point := t.Points[i]
			epochTime, err := time.Parse(time.RFC3339, point.Interval.StartTime)
			if err != nil {
				// Skip unparseable points rather than failing the whole query.
				fmt.Fprintf(ctx.Stderr(), "%v\n", err)
				continue
			}
			timestamp := epochTime.Unix()
			timestamps = append(timestamps, timestamp)
			values = append(values, point.Value.DoubleValue)
			curMetricData.MinTime = int64(math.Min(float64(curMetricData.MinTime), float64(timestamp)))
			curMetricData.MaxTime = int64(math.Max(float64(curMetricData.MaxTime), float64(timestamp)))
			curMetricData.MinValue = math.Min(curMetricData.MinValue, point.Value.DoubleValue)
			curMetricData.MaxValue = math.Max(curMetricData.MaxValue, point.Value.DoubleValue)
		}
		// NOTE(review): if two time series share the same zone/instance/name,
		// the later one replaces the earlier history here — confirm that
		// cannot happen for these metrics.
		curMetricData.HistoryTimestamps = timestamps
		curMetricData.HistoryValues = values
	}
	return data, nil
}
// aggregateMetricData folds the history of the given metric into aggData
// (a map from metric name to per-timestamp sample collections), creating
// the entry for the metric's name on first use.
func aggregateMetricData(aggData map[string]*aggMetricData, metric *metricData) {
	entry := aggData[metric.Name]
	if entry == nil {
		entry = &aggMetricData{TimestampsToValues: map[int64][]float64{}}
		aggData[metric.Name] = entry
	}
	for i, ts := range metric.HistoryTimestamps {
		entry.TimestampsToValues[ts] = append(entry.TimestampsToValues[ts], metric.HistoryValues[i])
	}
}
// calculateMaxAndAverageData reduces aggregated per-timestamp samples to
// two synthetic series per metric name — the per-timestamp max and the
// per-timestamp average across instances — together with the time range
// each covers. It returns (maxData, maxRange, averageData, averageRange).
func calculateMaxAndAverageData(aggData map[string]*aggMetricData, zone string) (map[string]*metricData, *rangeData, map[string]*metricData, *rangeData) {
	maxData := map[string]*metricData{}
	maxRangeData := newRangeData()
	averageData := map[string]*metricData{}
	averageRangeData := newRangeData()
	for metricName, metricAggData := range aggData {
		// Visit timestamps in ascending order so history arrays are sorted.
		sortedTimestamps := int64arr{}
		for timestamp := range metricAggData.TimestampsToValues {
			sortedTimestamps = append(sortedTimestamps, timestamp)
		}
		if len(sortedTimestamps) == 0 {
			// No samples for this metric; indexing below would panic.
			continue
		}
		sortedTimestamps.Sort()
		maxHistoryValues := []float64{}
		averageHistoryValues := []float64{}
		minMaxValue := math.MaxFloat64
		maxMaxValue := 0.0
		minAverageValue := math.MaxFloat64
		maxAverageValue := 0.0
		for _, timestamp := range sortedTimestamps {
			values := metricAggData.TimestampsToValues[timestamp]
			curMax := values[0]
			curSum := 0.0
			for _, v := range values {
				if v > curMax {
					curMax = v
				}
				curSum += v
			}
			curAverage := curSum / float64(len(values))
			maxHistoryValues = append(maxHistoryValues, curMax)
			averageHistoryValues = append(averageHistoryValues, curAverage)
			minMaxValue = math.Min(minMaxValue, curMax)
			maxMaxValue = math.Max(maxMaxValue, curMax)
			minAverageValue = math.Min(minAverageValue, curAverage)
			maxAverageValue = math.Max(maxAverageValue, curAverage)
		}
		minTime := sortedTimestamps[0]
		maxTime := sortedTimestamps[len(sortedTimestamps)-1]
		threshold := getThreshold(metricName)
		maxData[metricName] = &metricData{
			ZoneName:          zone,
			Name:              metricName,
			CurrentValue:      maxHistoryValues[len(maxHistoryValues)-1],
			MinTime:           minTime,
			MaxTime:           maxTime,
			MinValue:          minMaxValue,
			MaxValue:          maxMaxValue,
			HistoryTimestamps: sortedTimestamps,
			HistoryValues:     maxHistoryValues,
			Threshold:         threshold,
			Healthy:           true,
		}
		if threshold != -1 {
			maxData[metricName].Healthy = !overThresholdFor(sortedTimestamps, maxHistoryValues, threshold, thresholdHoldMinutes)
		}
		maxRangeData.update(minTime, maxTime)
		averageData[metricName] = &metricData{
			ZoneName: zone,
			Name:     metricName,
			// Fixed: the original indexed averageHistoryValues with
			// len(maxHistoryValues)-1 (harmless only because the two
			// slices always have equal length).
			CurrentValue:      averageHistoryValues[len(averageHistoryValues)-1],
			MinTime:           minTime,
			MaxTime:           maxTime,
			MinValue:          minAverageValue,
			MaxValue:          maxAverageValue,
			HistoryTimestamps: sortedTimestamps,
			HistoryValues:     averageHistoryValues,
			Threshold:         threshold,
			Healthy:           true,
		}
		if threshold != -1 {
			averageData[metricName].Healthy = !overThresholdFor(sortedTimestamps, averageHistoryValues, threshold, thresholdHoldMinutes)
		}
		averageRangeData.update(minTime, maxTime)
	}
	return maxData, maxRangeData, averageData, averageRangeData
}
// getThreshold returns the alert threshold for the given metric name, or
// -1 when the metric has no associated threshold. Any metric whose name
// ends in "latency" shares the service latency threshold.
func getThreshold(metricName string) float64 {
	switch {
	case strings.HasSuffix(metricName, "latency"):
		return thresholdServiceLatency
	case metricName == "mounttable qps":
		return thresholdMounttableQPS
	case metricName == "cpu-usage":
		return thresholdCPU
	case metricName == "disk-usage":
		return thresholdDisk
	case metricName == "memory-usage":
		return thresholdRam
	case metricName == "ping":
		return thresholdPing
	case metricName == "tcpconn":
		return thresholdTCPConn
	default:
		return -1.0
	}
}
// persistOncallData serializes the status and oncall data to JSON, stages
// the files in a temporary directory named by the current minute, writes
// a "latest" marker, and uploads everything to Google Storage via gsutil.
// Status and oncall data are stored in separate files for efficiency.
func persistOncallData(ctx *tool.Context, statusData *serviceStatusData, oncall *oncallData, now time.Time) error {
	statusJSON, err := json.MarshalIndent(statusData, "", " ")
	if err != nil {
		return fmt.Errorf("MarshalIndent() failed: %v", err)
	}
	oncallJSON, err := json.MarshalIndent(oncall, "", " ")
	if err != nil {
		return fmt.Errorf("MarshalIndent() failed: %v", err)
	}
	s := ctx.NewSeq()
	// Write data to a temporary directory, then upload it in one shot.
	tmpDir, err := s.TempDir("", "")
	if err != nil {
		return err
	}
	defer ctx.NewSeq().RemoveAll(tmpDir)
	// Timestamp (down to the minute) is the base name for all files.
	timestamp := now.Format("200601021504")
	statusPath := filepath.Join(tmpDir, fmt.Sprintf("%s.status", timestamp))
	oncallPath := filepath.Join(tmpDir, fmt.Sprintf("%s.oncall", timestamp))
	latestPath := filepath.Join(tmpDir, "latest")
	// Upload data to Google Storage.
	return s.
		WriteFile(statusPath, statusJSON, os.FileMode(0600)).
		WriteFile(oncallPath, oncallJSON, os.FileMode(0600)).
		WriteFile(latestPath, []byte(timestamp), os.FileMode(0600)).
		Last("gsutil", "-q", "cp", filepath.Join(tmpDir, "*"), bucketData+"/")
}